mirror of
https://github.com/Mesteriis/hassio-addons-avm.git
synced 2026-01-09 23:11:02 +01:00
1181 lines
47 KiB
Python
1181 lines
47 KiB
Python
from datetime import timedelta
|
|
import os
|
|
import json
|
|
from os.path import abspath, join
|
|
from shutil import copyfile
|
|
from urllib.parse import quote
|
|
|
|
import aiohttp
|
|
import pytest
|
|
import asyncio
|
|
import base64
|
|
from aiohttp import BasicAuth
|
|
from aiohttp.client import ClientSession
|
|
|
|
from backup.file import File
|
|
from backup.util import AsyncHttpGetter, GlobalInfo, DataCache, UpgradeFlags
|
|
from backup.ui import UiServer, Restarter
|
|
from backup.config import Config, Setting, CreateOptions
|
|
from backup.const import (ERROR_CREDS_EXPIRED, ERROR_EXISTING_FOLDER,
|
|
ERROR_MULTIPLE_DELETES, ERROR_NO_BACKUP,
|
|
SOURCE_GOOGLE_DRIVE, SOURCE_HA)
|
|
from backup.creds import Creds
|
|
from backup.model import Coordinator, Backup
|
|
from backup.drive import DriveSource, FolderFinder, OOB_CRED_CUTOFF
|
|
from backup.drive.drivesource import FOLDER_MIME_TYPE, DriveRequests
|
|
from backup.ha import HaSource, HaUpdater
|
|
from backup.config import VERSION
|
|
from .faketime import FakeTime
|
|
from .helpers import compareStreams
|
|
from yarl import URL
|
|
from dev.ports import Ports
|
|
from dev.simulated_supervisor import SimulatedSupervisor
|
|
from dev.simulationserver import SimulationServer
|
|
from dev.simulated_google import SimulatedGoogle
|
|
from bs4 import BeautifulSoup
|
|
from .conftest import ReaderHelper
|
|
|
|
|
|
@pytest.fixture
def source(ha):
    """Expose the Home Assistant fixture under the generic 'source' name."""
    backup_source = ha
    return backup_source
|
|
|
|
|
|
@pytest.fixture
def dest(drive):
    """Expose the Google Drive fixture under the generic 'dest' name."""
    backup_dest = drive
    return backup_dest
|
|
|
|
|
|
@pytest.fixture
def simple_config(config):
    """Alias for the standard config fixture."""
    cfg = config
    return cfg
|
|
|
|
|
|
@pytest.fixture
async def restarter(injector, server):
    """Build and start a Restarter instance for tests that trigger server restarts."""
    instance = injector.get(Restarter)
    await instance.start()
    return instance
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_uiserver_start(ui_server: UiServer):
    """The UI server fixture should already be running once created."""
    assert ui_server.running
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.timeout(10)
async def test_uiserver_static_files(reader: ReaderHelper):
    """Each static page served by the UI server should load without error."""
    for page in ("", "reauthenticate", "pp", "tos"):
        await reader.get(page)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_getstatus(reader, config: Config, ha, server, ports: Ports):
    """Check the getstatus payload for a fresh install before the first sync."""
    # Create the ingress token file before initializing the HA source.
    File.touch(config.get(Setting.INGRESS_TOKEN_FILE_PATH))
    await ha.init()
    data = await reader.getjson("getstatus")
    assert data['ask_error_reports'] is True
    assert data['cred_version'] == 0
    assert data['firstSync'] is True
    assert data['folder_id'] is None
    assert data['last_error'] is None
    assert data['last_backup_text'] == "Never"
    assert data['next_backup_text'] == "right now"
    assert data['backup_name_template'] == config.get(Setting.BACKUP_NAME)
    assert data['warn_ingress_upgrade'] is False
    assert len(data['backups']) == 0
    # Both sources report empty, enabled state with their configured maximums.
    assert data['sources'][SOURCE_GOOGLE_DRIVE] == {
        'deletable': 0,
        'name': SOURCE_GOOGLE_DRIVE,
        'retained': 0,
        'backups': 0,
        'latest': None,
        'size': '0.0 B',
        'enabled': True,
        'max': config.get(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE),
        'title': "Google Drive",
        'icon': 'google-drive',
        'ignored': 0,
        'ignored_size': '0.0 B',
        'detail': "",
    }
    assert data['sources'][SOURCE_HA] == {
        'deletable': 0,
        'name': SOURCE_HA,
        'retained': 0,
        'backups': 0,
        'latest': None,
        'size': '0.0 B',
        'enabled': True,
        'max': config.get(Setting.MAX_BACKUPS_IN_HA),
        'title': "Home Assistant",
        'free_space': "0.0 B",
        'icon': 'home-assistant',
        'ignored': 0,
        'ignored_size': '0.0 B',
        'detail': "",
    }
    assert len(data['sources']) == 2
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_getstatus_sync(reader, config: Config, backup: Backup, time: FakeTime):
    """Check the getstatus payload once a backup exists in both sources."""
    data = await reader.getjson("getstatus")
    assert data['firstSync'] is False
    assert data['folder_id'] is not None
    assert data['last_error'] is None
    assert data['last_backup_text'] != "Never"
    assert data['next_backup_text'] != "right now"
    assert len(data['backups']) == 1
    # 'size' (and HA's 'free_space') are echoed back from the response itself
    # because their exact values aren't pinned by this test.
    assert data['sources'][SOURCE_GOOGLE_DRIVE] == {
        'deletable': 1,
        'name': SOURCE_GOOGLE_DRIVE,
        'retained': 0,
        'backups': 1,
        'latest': time.asRfc3339String(time.now()),
        'size': data['sources'][SOURCE_GOOGLE_DRIVE]['size'],
        'enabled': True,
        'max': config.get(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE),
        'title': "Google Drive",
        'icon': 'google-drive',
        'free_space': "5.0 GB",
        'ignored': 0,
        'ignored_size': '0.0 B',
        'detail': "testing@no.where",
    }
    assert data['sources'][SOURCE_HA] == {
        'deletable': 1,
        'name': SOURCE_HA,
        'retained': 0,
        'backups': 1,
        'latest': time.asRfc3339String(time.now()),
        'size': data['sources'][SOURCE_HA]['size'],
        'enabled': True,
        'max': config.get(Setting.MAX_BACKUPS_IN_HA),
        'title': "Home Assistant",
        'free_space': data['sources'][SOURCE_HA]['free_space'],
        'icon': 'home-assistant',
        'ignored': 0,
        'ignored_size': '0.0 B',
        'detail': "",
    }
    assert len(data['sources']) == 2
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_retain(reader: ReaderHelper, config: Config, backup: Backup, coord: Coordinator, time: FakeTime):
    """Exercise the 'retain' endpoint: retained backups are excluded from the deletable count.

    Fixes from review: removed a duplicated assertion at the end (the Drive
    'backups' count was asserted twice in a row) and corrected a comment typo.
    """
    slug = backup.slug()
    # Retain the backup in both sources.
    assert await reader.getjson("retain", json={'slug': slug, 'sources': {"GoogleDrive": True, "HomeAssistant": True}}) == {
        'message': "Updated the backup's settings"
    }
    status = await reader.getjson("getstatus")
    assert status['sources'][SOURCE_GOOGLE_DRIVE] == {
        'deletable': 0,
        'name': SOURCE_GOOGLE_DRIVE,
        'retained': 1,
        'backups': 1,
        'latest': time.asRfc3339String(backup.date()),
        'size': status['sources'][SOURCE_GOOGLE_DRIVE]['size'],
        'enabled': True,
        'max': config.get(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE),
        'title': "Google Drive",
        'icon': 'google-drive',
        'free_space': "5.0 GB",
        'ignored': 0,
        'ignored_size': '0.0 B',
        'detail': "testing@no.where",
    }
    assert status['sources'][SOURCE_HA] == {
        'deletable': 0,
        'name': SOURCE_HA,
        'retained': 1,
        'backups': 1,
        'latest': time.asRfc3339String(backup.date()),
        'size': status['sources'][SOURCE_HA]['size'],
        'enabled': True,
        'max': config.get(Setting.MAX_BACKUPS_IN_HA),
        'title': "Home Assistant",
        'free_space': status['sources'][SOURCE_HA]["free_space"],
        'icon': 'home-assistant',
        'ignored': 0,
        'ignored_size': '0.0 B',
        'detail': "",
    }

    # Un-retain in both sources: the backup becomes deletable again.
    await reader.getjson("retain", json={'slug': slug, 'sources': {"GoogleDrive": False, "HomeAssistant": False}})
    status = await reader.getjson("getstatus")
    assert status['sources'][SOURCE_GOOGLE_DRIVE] == {
        'deletable': 1,
        'name': SOURCE_GOOGLE_DRIVE,
        'retained': 0,
        'backups': 1,
        'latest': time.asRfc3339String(backup.date()),
        'size': status['sources'][SOURCE_GOOGLE_DRIVE]['size'],
        'enabled': True,
        'max': config.get(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE),
        'title': "Google Drive",
        'icon': 'google-drive',
        'free_space': "5.0 GB",
        'ignored': 0,
        'ignored_size': '0.0 B',
        'detail': "testing@no.where",
    }
    assert status['sources'][SOURCE_HA] == {
        'deletable': 1,
        'name': SOURCE_HA,
        'retained': 0,
        'backups': 1,
        'latest': time.asRfc3339String(backup.date()),
        'size': status['sources'][SOURCE_HA]['size'],
        'enabled': True,
        'max': config.get(Setting.MAX_BACKUPS_IN_HA),
        'title': "Home Assistant",
        'free_space': status['sources'][SOURCE_HA]["free_space"],
        'icon': 'home-assistant',
        'ignored': 0,
        'ignored_size': '0.0 B',
        'detail': "",
    }

    # Delete the Drive copy, then retain only the remaining HA copy.
    delete_req = {
        "slug": slug,
        "sources": ["GoogleDrive"]
    }
    await reader.getjson("deleteSnapshot", json=delete_req)
    await reader.getjson("retain", json={'slug': slug, 'sources': {"HomeAssistant": True}})
    status = await reader.getjson("getstatus")
    assert status['sources'][SOURCE_GOOGLE_DRIVE] == {
        'deletable': 0,
        'name': SOURCE_GOOGLE_DRIVE,
        'retained': 0,
        'backups': 0,
        'latest': None,
        'size': status['sources'][SOURCE_GOOGLE_DRIVE]['size'],
        'enabled': True,
        'max': config.get(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE),
        'title': "Google Drive",
        'icon': 'google-drive',
        'free_space': "5.0 GB",
        'ignored': 0,
        'ignored_size': '0.0 B',
        'detail': "testing@no.where",
    }
    assert status['sources'][SOURCE_HA] == {
        'deletable': 0,
        'name': SOURCE_HA,
        'retained': 1,
        'backups': 1,
        'latest': time.asRfc3339String(backup.date()),
        'size': status['sources'][SOURCE_HA]['size'],
        'enabled': True,
        'max': config.get(Setting.MAX_BACKUPS_IN_HA),
        'title': "Home Assistant",
        'free_space': status['sources'][SOURCE_HA]["free_space"],
        'icon': 'home-assistant',
        'ignored': 0,
        'ignored_size': '0.0 B',
        'detail': "",
    }

    # sync again, which should upload the backup to Drive
    await coord.sync()
    status = await reader.getjson("getstatus")
    assert status['sources'][SOURCE_GOOGLE_DRIVE]['backups'] == 1
    # The re-uploaded Drive copy is not retained.
    assert status['sources'][SOURCE_GOOGLE_DRIVE]['retained'] == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_note(reader: ReaderHelper, config: Config, backup: Backup, coord: Coordinator, time: FakeTime):
    """A note set through the 'note' endpoint should stick to the backup and appear in getstatus."""
    slug = backup.slug()
    assert backup.note() is None
    response = await reader.getjson("note", json={'slug': slug, 'note': "This is the note"})
    assert response == {
        'message': "Updated the backup's settings"
    }
    status = await reader.getjson("getstatus")
    assert backup.note() == "This is the note"
    assert status['backups'][0]['note'] == "This is the note"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_sync(reader, ui_server, coord: Coordinator, time: FakeTime, session):
    """A 'sync' request creates a backup and returns the same payload as 'getstatus'."""
    assert len(coord.backups()) == 0
    sync_status = await reader.getjson("sync")
    assert len(coord.backups()) == 1
    assert sync_status == await reader.getjson("getstatus")
    # A week later, another sync should produce a second backup.
    time.advance(days=7)
    second_sync = await reader.getjson("sync")
    assert len(second_sync['backups']) == 2
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_delete(reader: ReaderHelper, ui_server, backup):
    """Delete a backup from each source in turn via the 'deleteSnapshot' endpoint."""
    slug = backup.slug()

    # A bad slug is rejected and nothing is deleted.
    data = {"slug": "bad_slug", "sources": ["GoogleDrive"]}
    await reader.assertError("deleteSnapshot", json=data, error_type=ERROR_NO_BACKUP)
    status = await reader.getjson("getstatus")
    assert len(status['backups']) == 1
    # Deleting the Drive copy succeeds once, then errors on a retry.
    data["slug"] = slug
    assert await reader.getjson("deleteSnapshot", json=data) == {"message": "Deleted from 1 place(s)"}
    await reader.assertError("deleteSnapshot", json=data, error_type=ERROR_NO_BACKUP)
    status = await reader.getjson("getstatus")
    assert len(status['backups']) == 1
    assert status['sources'][SOURCE_GOOGLE_DRIVE]['backups'] == 0
    # Deleting the Home Assistant copy removes the last trace of the backup.
    data["sources"] = ["HomeAssistant"]
    assert await reader.getjson("deleteSnapshot", json=data) == {"message": "Deleted from 1 place(s)"}
    status = await reader.getjson("getstatus")
    assert len(status['backups']) == 0
    # With no copies left, any further delete is an error.
    data["sources"] = []
    await reader.assertError("deleteSnapshot", json=data, error_type=ERROR_NO_BACKUP)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_backup_now(reader, ui_server, time: FakeTime, backup: Backup, coord: Coordinator):
    """Request backups through the 'backup' endpoint with each retain-flag combination."""
    assert len(coord.backups()) == 1
    assert (await reader.getjson("getstatus"))["backups"][0]["date"] == time.toLocal(time.now()).strftime("%c")

    # retain_drive=False, retain_ha=False
    time.advance(hours=1)
    assert await reader.getjson("backup?custom_name=TestName&retain_drive=False&retain_ha=False") == {
        'message': "Requested backup 'TestName'"
    }
    status = await reader.getjson('getstatus')
    assert len(status["backups"]) == 2
    assert status["backups"][1]["date"] == time.toLocal(time.now()).strftime("%c")
    assert status["backups"][1]["name"] == "TestName"
    assert status["backups"][1]["note"] is None
    assert status["backups"][1]['sources'][0]['retained'] is False
    # No sync was run yet, so the new backup exists in only one source.
    assert len(status["backups"][1]['sources']) == 1

    # retain_drive=True, retain_ha=False (sync so both sources have a copy)
    time.advance(hours=1)
    assert await reader.getjson("backup?custom_name=TestName2&retain_drive=True&retain_ha=False") == {
        'message': "Requested backup 'TestName2'"
    }
    await coord.sync()
    status = await reader.getjson('getstatus')
    assert len(status["backups"]) == 3
    assert status["backups"][2]["date"] == time.toLocal(time.now()).strftime("%c")
    assert status["backups"][2]["name"] == "TestName2"
    assert status["backups"][2]['sources'][0]['retained'] is False
    assert status["backups"][2]['sources'][1]['retained'] is True

    # retain_drive=False, retain_ha=True
    time.advance(hours=1)
    assert await reader.getjson("backup?custom_name=TestName3&retain_drive=False&retain_ha=True") == {
        'message': "Requested backup 'TestName3'"
    }
    await coord.sync()
    status = await reader.getjson('getstatus')
    assert len(status["backups"]) == 4
    assert status["backups"][3]['sources'][0]['retained'] is True
    assert status["backups"][3]['sources'][1]['retained'] is False
    assert status["backups"][3]["date"] == time.toLocal(time.now()).strftime("%c")
    assert status["backups"][3]["name"] == "TestName3"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_backup_now_with_note(reader, ui_server, time: FakeTime, coord: Coordinator):
    """A note passed to the 'backup' endpoint should appear on the created backup.

    Fix: the query string had been mangled by an HTML-entity collapse
    ("&note=" had become "¬e="); restored the intended "&note=" parameter,
    which is what the final assertion on the note value checks.
    """
    assert len(coord.backups()) == 0

    time.advance(hours=1)
    assert await reader.getjson("backup?custom_name=TestName&retain_drive=False&retain_ha=False&note=ThisIsTheNote") == {
        'message': "Requested backup 'TestName'"
    }
    await coord.sync()
    status = await reader.getjson('getstatus')
    assert status['backups'][0]["note"] == "ThisIsTheNote"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_config(reader, ui_server, config: Config, supervisor: SimulatedSupervisor):
    """Saving settings updates both the add-on config and supervisor options without a restart."""
    update = {
        "config": {
            "days_between_backups": 20,
            "drive_ipv4": ""
        },
        "backup_folder": "unused"
    }
    assert ui_server._starts == 1
    result = await reader.postjson("saveconfig", json=update)
    assert result == {'message': 'Settings saved', "reload_page": False}
    assert config.get(Setting.DAYS_BETWEEN_BACKUPS) == 20
    assert supervisor._options["days_between_backups"] == 20
    # These settings should not trigger a server restart.
    assert ui_server._starts == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_auth_and_restart(reader, ui_server, config: Config, restarter, coord: Coordinator, supervisor: SimulatedSupervisor):
    """Enabling require_login restarts the server and enforces basic auth on the extra server only."""
    update = {"config": {"require_login": True,
                         "expose_extra_server": True}, "backup_folder": "unused"}
    assert ui_server._starts == 1
    assert not config.get(Setting.REQUIRE_LOGIN)
    assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
    await restarter.waitForRestart()
    assert config.get(Setting.REQUIRE_LOGIN)
    assert supervisor._options['require_login']
    # Saving these settings restarts the UI server.
    assert ui_server._starts == 2

    # The extra (non-ingress) server now rejects missing or wrong credentials.
    await reader.get("getstatus", status=401, ingress=False)
    await reader.get("getstatus", auth=BasicAuth("user", "badpassword"), status=401, ingress=False)
    await reader.get("getstatus", auth=BasicAuth("user", "pass"), ingress=False)
    await coord.waitForSyncToFinish()
    status = await reader.getjson("getstatus", auth=BasicAuth("user", "pass"), ingress=False)

    # verify the sync succeeded (no errors)
    assert status["last_error"] is None

    # The ingress server shouldn't require login, even though it's turned on for the extra server
    await reader.get("getstatus")
    # even a bad user/pass should work
    await reader.get("getstatus", auth=BasicAuth("baduser", "badpassword"))
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.timeout(100)
async def test_expose_extra_server_option(reader, ui_server: UiServer, config: Config):
    """Toggling EXPOSE_EXTRA_SERVER should start and stop the non-ingress server."""
    # Off by default: the extra server refuses connections.
    with pytest.raises(aiohttp.client_exceptions.ClientConnectionError):
        await reader.getjson("sync", ingress=False)
    config.override(Setting.EXPOSE_EXTRA_SERVER, True)
    await ui_server.run()
    await reader.getjson("sync", ingress=False)
    # Re-running the server with the option still enabled keeps it reachable.
    await ui_server.run()
    await reader.getjson("sync", ingress=False)
    config.override(Setting.EXPOSE_EXTRA_SERVER, False)
    await ui_server.run()
    with pytest.raises(aiohttp.client_exceptions.ClientConnectionError):
        await reader.getjson("sync", ingress=False)
    # The ingress endpoint stays available throughout.
    await reader.getjson("sync")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_error_reports_true(reader, ui_server, config: Config, supervisor: SimulatedSupervisor):
    """Opting in to error reports persists the setting explicitly and updates supervisor options."""
    assert config.get(Setting.SEND_ERROR_REPORTS) is False
    assert not config.isExplicit(Setting.SEND_ERROR_REPORTS)
    response = await reader.getjson("errorreports?send=true")
    assert response == {'message': 'Configuration updated'}
    assert config.get(Setting.SEND_ERROR_REPORTS) is True
    assert config.isExplicit(Setting.SEND_ERROR_REPORTS)
    assert supervisor._options["send_error_reports"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_error_reports_false(reader, ui_server, config: Config, supervisor: SimulatedSupervisor):
    """Opting out of error reports keeps the value False but marks it as explicitly set."""
    assert config.get(Setting.SEND_ERROR_REPORTS) is False
    assert not config.isExplicit(Setting.SEND_ERROR_REPORTS)
    response = await reader.getjson("errorreports?send=false")
    assert response == {'message': 'Configuration updated'}
    assert config.get(Setting.SEND_ERROR_REPORTS) is False
    assert config.isExplicit(Setting.SEND_ERROR_REPORTS)
    assert supervisor._options["send_error_reports"] is False
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_drive_cred_generation(reader: ReaderHelper, ui_server: UiServer, backup, config: Config, global_info: GlobalInfo, session: ClientSession, google):
    """Walk the full Drive reauthorization flow after the stored creds expire."""
    status = await reader.getjson("getstatus")
    assert len(status["backups"]) == 1
    assert global_info.credVersion == 0
    # Invalidate the drive creds, sync, then verify we see an error
    google.expireCreds()
    status = await reader.getjson("sync")
    assert status["last_error"]["error_type"] == ERROR_CREDS_EXPIRED

    # simulate the user going through the Drive authentication workflow
    auth_url = URL(status['authenticate_url']).with_query({
        "redirectbacktoken": reader.getUrl(True) + "token",
        "version": VERSION,
        "return": reader.getUrl(True)
    })
    async with session.get(auth_url) as resp:
        resp.raise_for_status()
        html = await resp.text()
        page = BeautifulSoup(html, 'html.parser')
        # The auth page presents the serialized creds in a textarea.
        area = page.find("textarea")
        creds = str(area.getText()).strip()

    # Hand the creds back to the addon's "token" endpoint.
    cred_url = URL(reader.getUrl(True) + "token").with_query({"creds": creds, "host": reader.getUrl(True)})
    async with session.get(cred_url) as resp:
        resp.raise_for_status()
        # verify we got redirected to the addon main page.
        assert resp.url == URL(reader.getUrl(True))
    await ui_server.sync(None)
    assert global_info._last_error is None
    # Installing new creds bumps the credential version.
    assert global_info.credVersion == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_confirm_multiple_deletes(reader, ui_server, server, config: Config, time: FakeTime, ha: HaSource):
    """Syncing with multiple pending deletes should error until the user confirms them."""
    # reconfigure to only store 1 backup
    config.override(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE, 1)
    config.override(Setting.MAX_BACKUPS_IN_HA, 1)

    # create three backups
    await ha.create(CreateOptions(time.now(), "Name1"))
    await ha.create(CreateOptions(time.now(), "Name2"))
    await ha.create(CreateOptions(time.now(), "Name3"))

    # verify we have 3 backups and the multiple-delete error
    status = await reader.getjson("sync")
    assert len(status['backups']) == 3
    assert status["last_error"]["error_type"] == ERROR_MULTIPLE_DELETES
    # The error payload lists, per source, how many backups would be deleted.
    assert status["last_error"]["data"] == {
        SOURCE_GOOGLE_DRIVE: 0,
        SOURCE_GOOGLE_DRIVE + "_desc": '',
        SOURCE_HA: 2,
        SOURCE_HA + "_desc": "Name1\nName2"
    }

    # request that multiple deletes be allowed (one time only)
    assert await reader.getjson("confirmdelete?always=false") == {
        'message': 'Backups deleted this one time'
    }
    # The confirmation setting itself stays enabled.
    assert config.get(Setting.CONFIRM_MULTIPLE_DELETES)

    # sync again, verify the deletes go through
    status = await reader.getjson("sync")
    assert status["last_error"] is None
    assert len(status["backups"]) == 1

    # create another backup, verify we delete the one
    await ha.create(CreateOptions(time.now(), "Name1"))
    status = await reader.getjson("sync")
    assert len(status['backups']) == 1
    assert status["last_error"] is None

    # create two more backups, verify we see the error again
    await ha.create(CreateOptions(time.now(), "Name1"))
    await ha.create(CreateOptions(time.now(), "Name2"))
    status = await reader.getjson("sync")
    assert len(status['backups']) == 3
    assert status["last_error"]["error_type"] == ERROR_MULTIPLE_DELETES
    assert status["last_error"]["data"] == {
        SOURCE_GOOGLE_DRIVE: 0,
        SOURCE_GOOGLE_DRIVE + "_desc": '',
        SOURCE_HA: 2,
        SOURCE_HA + "_desc": "Name1\nName1"
    }
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_multiple_deletes_setting(reader, ui_server, server, config: Config, time: FakeTime, ha: HaSource, global_info: GlobalInfo):
    """'confirmdelete?always=true' permanently disables the multiple-delete confirmation."""
    expected = {
        'message': 'Configuration updated, I\'ll never ask again'
    }
    assert await reader.getjson("confirmdelete?always=true") == expected
    assert not config.get(Setting.CONFIRM_MULTIPLE_DELETES)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_resolve_folder_reuse(reader, config: Config, backup, time, drive):
    """Resolving the existing-folder error with use_existing=true keeps the old Drive folder."""
    # Forget the cached folder id so the next sync hits the existing-folder error.
    previous_folder = await drive.getFolderId()
    File.delete(config.get(Setting.FOLDER_FILE_PATH))
    time.advance(days=1)
    status = await reader.getjson("sync")
    assert status["last_error"]["error_type"] == ERROR_EXISTING_FOLDER

    resolution = await reader.getjson("resolvefolder?use_existing=true")
    assert resolution == {'message': 'Done'}
    status = await reader.getjson("sync")
    assert status["last_error"] is None
    assert previous_folder == await drive.getFolderId()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_resolve_folder_new(reader, config: Config, backup, time, drive):
    """Resolving the existing-folder error with use_existing=false creates a new Drive folder."""
    # Forget the cached folder id so the next sync hits the existing-folder error.
    previous_folder = await drive.getFolderId()
    File.delete(config.get(Setting.FOLDER_FILE_PATH))
    time.advance(days=1)
    status = await reader.getjson("sync")
    assert status["last_error"]["error_type"] == ERROR_EXISTING_FOLDER

    resolution = await reader.getjson("resolvefolder?use_existing=false")
    assert resolution == {'message': 'Done'}
    status = await reader.getjson("sync")
    assert status["last_error"] is None
    assert previous_folder != await drive.getFolderId()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ssl_server(reader: ReaderHelper, ui_server: UiServer, config, server, cleandir, restarter):
    """Enabling SSL with a valid cert/key pair should restart the server cleanly."""
    # Copy the dev certificate pair into the test's working directory.
    ssl_dir = abspath(join(__file__, "..", "..", "dev", "ssl"))
    copyfile(join(ssl_dir, "localhost.crt"), join(cleandir, "localhost.crt"))
    copyfile(join(ssl_dir, "localhost.key"), join(cleandir, "localhost.key"))
    update = {
        "config": {
            "use_ssl": True,
            "expose_extra_server": True,
            "certfile": join(cleandir, "localhost.crt"),
            "keyfile": join(cleandir, "localhost.key")
        },
        "backup_folder": "unused"
    }
    assert ui_server._starts == 1
    assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
    await restarter.waitForRestart()
    # Saving these settings restarts the UI server.
    assert ui_server._starts == 2
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_bad_ssl_config_missing_files(reader: ReaderHelper, ui_server: UiServer, config, server, cleandir, restarter):
    """With SSL enabled but the cert/key files absent, only the ingress endpoint serves."""
    # NOTE: unlike test_ssl_server, the cert/key files are deliberately never created.
    update = {
        "config": {
            "use_ssl": True,
            "expose_extra_server": True,
            "certfile": join(cleandir, "localhost.crt"),
            "keyfile": join(cleandir, "localhost.key")
        },
        "backup_folder": "unused"
    }
    assert ui_server._starts == 1
    assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
    await restarter.waitForRestart()
    assert ui_server._starts == 2

    # Verify the ingress endpoint is still up, but not the SSL one
    await reader.getjson("getstatus")
    with pytest.raises(aiohttp.client_exceptions.ClientConnectionError):
        await reader.getjson("getstatus", ingress=False, ssl=True, sslcontext=False)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_bad_ssl_config_wrong_files(reader: ReaderHelper, ui_server: UiServer, config, server, cleandir, restarter):
    """With cert and key files swapped, only the ingress endpoint serves."""
    ssl_dir = abspath(join(__file__, "..", "..", "dev", "ssl"))
    copyfile(join(ssl_dir, "localhost.crt"), join(cleandir, "localhost.crt"))
    copyfile(join(ssl_dir, "localhost.key"), join(cleandir, "localhost.key"))
    update = {
        "config": {
            "use_ssl": True,
            "expose_extra_server": True,
            # NOTE: certfile/keyfile are deliberately swapped to make the SSL setup fail.
            "certfile": join(cleandir, "localhost.key"),
            "keyfile": join(cleandir, "localhost.crt")
        },
        "backup_folder": "unused"
    }
    assert ui_server._starts == 1
    assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
    await restarter.waitForRestart()
    assert ui_server._starts == 2

    # Verify the ingress endpoint is still up, but not the SSL one
    await reader.getjson("getstatus")
    with pytest.raises(aiohttp.client_exceptions.ClientConnectionError):
        await reader.getjson("getstatus", ingress=False, ssl=True, sslcontext=False)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_download_drive(reader, ui_server, backup, drive: DriveSource, ha: HaSource, session, time):
    """Bytes served by the 'download' endpoint should match the copy stored in Google Drive."""
    # Remove the HA copy so the download has to come from Drive.
    await ha.delete(backup)
    # download the item from Google Drive
    from_drive = await drive.read(backup)
    # Download from the web server
    from_server = AsyncHttpGetter(
        reader.getUrl() + "download?slug=" + backup.slug(), {}, session, time=time)
    await compareStreams(from_drive, from_server)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_download_home_assistant(reader: ReaderHelper, ui_server, backup, drive: DriveSource, ha: HaSource, session, time):
    """Bytes served by the 'download' endpoint should match the copy stored in Home Assistant."""
    # Remove the Drive copy so the download has to come from Home Assistant.
    await drive.delete(backup)
    # read the item from Home Assistant
    from_ha = await ha.read(backup)
    # Download from the web server
    from_server = AsyncHttpGetter(
        reader.getUrl() + "download?slug=" + backup.slug(), {}, session, time=time)
    await compareStreams(from_ha, from_server)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_cancel_and_startsync(reader: ReaderHelper, coord: Coordinator):
    """'startSync' begins a sync; 'cancelSync' aborts it with a 'cancelled' error."""
    # Hold the sync open so there is something to cancel.
    coord._sync_wait.set()
    started = await reader.getjson("startSync")
    assert started["syncing"]
    cancelled = await reader.getjson('cancelSync')
    assert not cancelled["syncing"]
    assert cancelled["last_error"]["error_type"] == "cancelled"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_token(reader: ReaderHelper, coord: Coordinator, ha, drive: DriveSource):
    """Base64-encoded creds posted to the 'token' endpoint get installed on the drive backend."""
    creds = {
        "client_id": "new_access_token",
        "access_token": "new_access_token",
        "refresh_token": "new_refresh_token",
        "token_expiry": "2022-01-01T00:00:00"
    }
    encoded = base64.b64encode(json.dumps(creds).encode("utf-8")).decode("utf-8")
    await reader.get("token?creds={0}&host={1}".format(quote(encoded), quote(reader.getUrl(True))))
    assert drive.drivebackend.creds.access_token == 'new_access_token'
    assert drive.drivebackend.creds.refresh_token == 'new_refresh_token'
    # No client_secret was supplied, so none should be stored.
    assert drive.drivebackend.creds.secret is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_token_with_secret(reader: ReaderHelper, coord: Coordinator, ha, drive: DriveSource):
    """Creds that include a client_secret get it installed along with the tokens."""
    creds = {
        "client_id": "new_access_token",
        "client_secret": "new_client_secret",
        "access_token": "new_access_token",
        "refresh_token": "new_refresh_token",
        "token_expiry": "2022-01-01T00:00:00"
    }
    encoded = base64.b64encode(json.dumps(creds).encode("utf-8")).decode("utf-8")
    await reader.get("token?creds={0}&host={1}".format(quote(encoded), quote(reader.getUrl(True))))
    assert drive.drivebackend.creds.access_token == 'new_access_token'
    assert drive.drivebackend.creds.refresh_token == 'new_refresh_token'
    assert drive.drivebackend.creds.secret == 'new_client_secret'
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_token_extra_server(reader: ReaderHelper, coord: Coordinator, ha, drive: DriveSource, restarter, time):
    """The 'token' endpoint should also work over the extra (non-ingress) server."""
    # Turn on the extra server and wait for the resulting restart.
    update = {
        "config": {
            "expose_extra_server": True
        },
        "backup_folder": "unused"
    }
    assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
    await restarter.waitForRestart()
    # Serialize a Creds object the same way the auth workflow would.
    creds = Creds(time, "id", time.now(), "token", "refresh")
    serialized = str(base64.b64encode(json.dumps(creds.serialize()).encode("utf-8")), "utf-8")
    await reader.get("token?creds={0}&host={1}".format(quote(serialized), quote(reader.getUrl(False))), ingress=False)
    assert drive.drivebackend.creds.access_token == 'token'
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_changefolder(reader: ReaderHelper, coord: Coordinator, ha, ui_server, folder_finder: FolderFinder):
    """'changefolder' should update the folder finder with the requested id."""
    response = await reader.get("changefolder?id=12345")
    assert response == '{}'
    assert await folder_finder.get() == "12345"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_changefolder_extra_server(reader: ReaderHelper, coord: Coordinator, ha, drive: DriveSource, restarter, ui_server, folder_finder: FolderFinder):
    """'changefolder' should also work over the extra (non-ingress) server.

    Fixes from review: the local variable 'id' shadowed the builtin; renamed
    to 'folder_id'. Also corrected a stale comment that claimed two folders
    were created when only one is.
    """
    # Turn on the extra server and wait for the resulting restart.
    update = {
        "config": {
            "expose_extra_server": True
        },
        "backup_folder": "unused"
    }
    assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
    await restarter.waitForRestart()

    # Create a folder in Drive that can be selected as the backup folder.
    folder_metadata = {
        'name': "Other Folder",
        'mimeType': FOLDER_MIME_TYPE,
        'appProperties': {
            "backup_folder": "true",
        },
    }
    folder_id = (await drive.drivebackend.createFolder(folder_metadata))['id']

    await reader.get("changefolder?id=" + str(folder_id), ingress=False)
    assert await folder_finder.get() == folder_id
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_sync_interval(reader, ui_server, config: Config, supervisor: SimulatedSupervisor):
    """Default sync-interval values aren't written to supervisor options; custom values are."""
    # Make sure the default saves nothing
    update = {
        "config": {
            "max_sync_interval_seconds": '3 hours',
        },
        "backup_folder": "unused"
    }
    assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
    # The human-readable duration is parsed into seconds.
    assert config.get(Setting.MAX_SYNC_INTERVAL_SECONDS) == 60 * 60 * 3
    assert "max_sync_interval_seconds" not in supervisor._options

    # Update custom
    update = {
        "config": {
            "max_sync_interval_seconds": '2 hours',
        },
        "backup_folder": "unused"
    }
    assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
    assert config.get(Setting.MAX_SYNC_INTERVAL_SECONDS) == 60 * 60 * 2
    assert supervisor._options["max_sync_interval_seconds"] == 60 * 60 * 2
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_manual_creds(reader: ReaderHelper, ui_server: UiServer, config: Config, server: SimulationServer, session, drive: DriveSource):
    """Manually entered credentials should re-enable Drive with custom creds."""
    # With no auth in flight, the periodic check reports nothing pending.
    periodic_check = await reader.getjson("checkManualAuth")
    assert periodic_check['message'] == "No request for authorization is in progress."
    # Drop the current creds so Drive becomes disabled.
    drive.saveCreds(None)
    assert not drive.enabled()

    # Helper defined elsewhere in this file runs the manual auth workflow.
    await setup_manual_creds(reader, server, drive, session)

    # Verify creds are saved and drive is enabled
    assert drive.enabled()
    assert drive.isCustomCreds()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_manual_creds_failure(reader: ReaderHelper, ui_server: UiServer, config: Config, server: SimulationServer, session, drive: DriveSource):
    """Bad manual credentials should surface errors and leave Drive disabled."""
    drive.saveCreds(None)
    assert not drive.enabled()

    # Try with a bad client_id
    req_path = URL("manualauth").with_query({
        "client_id": "wrong_id",
        "client_secret": server.google._custom_drive_client_secret})
    data = await reader.getjson(str(req_path), status=500)
    assert data["message"] == "Google responded with error status HTTP 401. Please verify your credentials are set up correctly."

    # Try with a bad client_secret
    req_path = URL("manualauth").with_query({
        "client_id": server.google._custom_drive_client_id,
        "client_secret": "wrong_secret"})
    data = await reader.getjson(str(req_path))

    # Give the background auth check time to fail.
    await asyncio.sleep(2)

    periodic_check = await reader.getjson("checkManualAuth", status=500)
    assert periodic_check['message'] == "Failed unexpectedly while trying to reach Google. See the add-on logs for details."

    # verify no creds were saved and drive remains disabled
    assert not drive.enabled()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_setting_cancels_and_resyncs(reader: ReaderHelper, ui_server: UiServer, config: Config, server, session, drive: DriveSource, coord: Coordinator):
    """Saving config while a sync is in flight cancels that sync and starts a new one."""
    # Create a blocking sync task.  _sync_wait makes the coordinator pause
    # mid-sync; _sync_start lets us wait until the sync has actually begun,
    # so the saveconfig below is guaranteed to race with a running sync.
    coord._sync_wait.set()
    sync = asyncio.create_task(coord.sync(), name="Sync from saving settings")
    await coord._sync_start.wait()
    assert not sync.cancelled()
    assert not sync.done()

    # Change some config through the UI endpoint while the sync is blocked
    update = {
        "config": {
            "days_between_backups": 20,
            "drive_ipv4": ""
        },
        "backup_folder": "unused"
    }
    assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}

    # verify the previous sync is done and another one is running
    assert sync.done()
    assert coord.isSyncing()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_change_specify_folder_setting(reader: ReaderHelper, server, session, coord: Coordinator, folder_finder: FolderFinder):
    """Enabling specify_backup_folder without choosing a folder keeps the cached one."""
    await coord.sync()
    assert folder_finder.getCachedFolder() is not None
    previous_folder = folder_finder.getCachedFolder()

    # Turn on the "specify folder" option without selecting a folder
    payload = {
        "config": {
            "specify_backup_folder": True
        },
        "backup_folder": ""
    }
    assert await reader.postjson("saveconfig", json=payload) == {'message': 'Settings saved', "reload_page": False}

    # The cached folder is unchanged and the follow-up sync completes cleanly
    assert folder_finder.getCachedFolder() == previous_folder

    await coord.waitForSyncToFinish()
    status = await reader.postjson("getstatus")
    assert status["last_error"] is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_change_specify_folder_setting_with_manual_creds(reader: ReaderHelper, google: SimulatedGoogle, session, coord: Coordinator, folder_finder: FolderFinder, drive: DriveSource, config):
    """Changing specify_backup_folder with custom creds caches (and keeps) the chosen folder."""
    google.resetDriveAuth()
    drive.saveCreds(None)
    assert not drive.enabled()

    # get the auth url
    req_path = URL("manualauth").with_query({
        "client_id": google._custom_drive_client_id,
        "client_secret": google._custom_drive_client_secret})
    data = await reader.getjson(str(req_path))

    # Authorize the device using the url and device code provided
    drive._cred_trigger.clear()
    authorize_url = URL(data["auth_url"]).with_query({"code": data['code']})
    async with session.get(str(authorize_url), allow_redirects=False) as resp:
        resp.raise_for_status()

    # Wait deterministically for the add-on to pick up the new credentials
    # (previously a fixed 2 second sleep, which was slow and flaky; this is
    # the same pattern setup_manual_creds uses).
    await drive.debug_wait_for_credentials()

    assert drive.enabled()
    assert drive.isCustomCreds()

    await coord.sync()
    assert folder_finder.getCachedFolder() is not None

    # Specify the backup folder, which should cache the new one
    update = {
        "config": {
            Setting.SPECIFY_BACKUP_FOLDER.value: True
        },
        "backup_folder": "12345"
    }
    assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
    assert folder_finder.getCachedFolder() == "12345"

    # Un-setting the option should keep the previously chosen folder
    update = {
        "config": {
            Setting.SPECIFY_BACKUP_FOLDER.value: False
        },
        "backup_folder": ""
    }
    assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
    assert folder_finder.getCachedFolder() == "12345"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_non_ui_setting(reader: ReaderHelper, server, session, coord: Coordinator, folder_finder: FolderFinder, config: Config):
    """A setting not exposed in the UI must survive later saves of other settings."""
    await coord.sync()

    # Save a setting that the UI does not surface
    payload = {
        "config": {
            Setting.NEW_BACKUP_TIMEOUT_SECONDS.value: 10
        },
        "backup_folder": ""
    }
    assert await reader.postjson("saveconfig", json=payload) == {'message': 'Settings saved', "reload_page": False}

    assert config.get(Setting.NEW_BACKUP_TIMEOUT_SECONDS) == 10

    # Saving an unrelated setting must not clobber it
    payload = {
        "config": {
            Setting.MAX_BACKUPS_IN_HA.value: 1
        },
        "backup_folder": ""
    }
    assert await reader.postjson("saveconfig", json=payload) == {'message': 'Settings saved', "reload_page": False}
    assert config.get(Setting.NEW_BACKUP_TIMEOUT_SECONDS) == 10
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_disable_drive(reader: ReaderHelper, server, coord: Coordinator, config: Config, drive_requests: DriveRequests):
    """Disabling Drive upload re-enables the add-on even without Drive credentials."""
    # Remove credentials so the coordinator becomes disabled
    drive_requests.creds = None
    os.remove(config.get(Setting.CREDENTIALS_FILE_PATH))
    assert not coord.enabled()
    await coord.sync()
    assert len(coord.backups()) == 0

    # Turn off uploading to Drive entirely
    payload = {
        "config": {
            Setting.ENABLE_DRIVE_UPLOAD.value: False
        },
        "backup_folder": ""
    }
    assert await reader.postjson("saveconfig", json=payload) == {'message': 'Settings saved', "reload_page": True}
    assert config.get(Setting.ENABLE_DRIVE_UPLOAD) is False

    # With upload disabled the add-on works again and syncs a backup
    assert coord.enabled()
    await coord.waitForSyncToFinish()
    assert len(coord.backups()) == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_ignore(reader: ReaderHelper, time: FakeTime, coord: Coordinator, config: Config, supervisor: SimulatedSupervisor, ha: HaSource, drive: DriveSource):
    """Un-ignoring a backup through the UI should cause it to be uploaded to Drive."""
    config.override(Setting.IGNORE_UPGRADE_BACKUPS, True)
    config.override(Setting.DAYS_BETWEEN_BACKUPS, 0)

    # Make a backup the add-on will treat as ignored
    slug = await supervisor.createBackup({'name': "Ignore_me", 'folders': ['homeassistant'], 'addons': []}, date=time.now())

    await coord.sync()
    assert len(await drive.get()) == 0
    assert len(await ha.get()) == 1
    assert len(coord.backups()) == 1

    # Flip the backup to "not ignored" through the ignore endpoint
    await reader.postjson("ignore", json={
        "ignore": False,
        "slug": slug,
    })
    await coord.waitForSyncToFinish()
    assert len(coord.backups()) == 1
    assert len(await drive.get()) == 1
    assert len(await ha.get()) == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_check_ignored_backup_notification(reader: ReaderHelper, time: FakeTime, coord: Coordinator, config: Config, supervisor: SimulatedSupervisor, ha: HaSource, drive: DriveSource):
    """The UI should only prompt about ignored backups that predate the v0.104.0 upgrade."""
    # Create an "ignored" backup after upgrade to the current version.
    time.advance(days=1)
    await supervisor.createBackup({'name': "test_name"}, date=time.now())

    # create one that isn't ignored.
    time.advance(days=1)
    await ha.create(CreateOptions(time.now(), name_template=None))

    update = {
        "config": {
            Setting.IGNORE_OTHER_BACKUPS.value: True
        },
        "backup_folder": ""
    }
    await reader.postjson("saveconfig", json=update)
    await coord.waitForSyncToFinish()

    # A post-upgrade ignored backup should not trigger the notification
    status = await reader.getjson("getstatus")
    assert status["backups"][0]["ignored"]
    assert not status["backups"][1]["ignored"]
    assert not status["notify_check_ignored"]

    # Create an ignored backup from "before" the addon was upgraded to v0.104.0
    await supervisor.createBackup({'name': "test_name"}, date=time.now() - timedelta(days=10))
    await coord.sync()

    # The UI should notify about checking ignored backups
    status = await reader.getjson("getstatus")
    assert status["backups"][0]["ignored"]
    assert status["backups"][1]["ignored"]
    assert not status["backups"][2]["ignored"]
    assert status["notify_check_ignored"]

    # Acknowledge the notification.  Note: the original code discarded the
    # comparison result here (missing `assert`), so a wrong response from
    # ackignorecheck would never have failed the test.
    assert await reader.postjson("ackignorecheck") == {'message': "Acknowledged."}
    status = await reader.getjson("getstatus")
    assert not status["notify_check_ignored"]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_snapshot_to_backup_upgrade_use_new_values(reader: ReaderHelper, time: FakeTime, coord: Coordinator, config: Config, supervisor: SimulatedSupervisor, ha: HaSource, drive: DriveSource, data_cache: DataCache, updater: HaUpdater):
    """ Test the path where a user upgrades from the addon before the backup rename and then chooses to use the new names"""
    status = await reader.getjson("getstatus")
    assert not status["warn_backup_upgrade"]

    # Simulate a config left over from before the snapshot -> backup rename
    supervisor._options = {
        Setting.DEPRECTAED_MAX_BACKUPS_IN_HA.value: 7
    }
    await coord.sync()
    assert Setting.CALL_BACKUP_SNAPSHOT.value in supervisor._options
    assert Setting.DEPRECTAED_MAX_BACKUPS_IN_HA.value not in supervisor._options
    assert config.get(Setting.CALL_BACKUP_SNAPSHOT)

    # The upgrade warning should now be shown, nothing acknowledged yet
    status = await reader.getjson("getstatus")
    assert status["warn_backup_upgrade"]
    assert not data_cache.checkFlag(UpgradeFlags.NOTIFIED_ABOUT_BACKUP_RENAME)
    assert not updater._trigger_once

    # The user clicks the button choosing the new "backup" terminology
    assert await reader.getjson("callbackupsnapshot?switch=true") == {'message': 'Configuration updated'}
    assert data_cache.checkFlag(UpgradeFlags.NOTIFIED_ABOUT_BACKUP_RENAME)
    assert not config.get(Setting.CALL_BACKUP_SNAPSHOT)
    status = await reader.getjson("getstatus")
    assert not status["warn_backup_upgrade"]
    assert updater._trigger_once
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_snapshot_to_backup_upgrade_use_old_values(reader: ReaderHelper, time: FakeTime, coord: Coordinator, config: Config, supervisor: SimulatedSupervisor, ha: HaSource, drive: DriveSource, data_cache: DataCache, updater: HaUpdater):
    """ Test the path where a user upgrades from the addon before the backup rename and then chooses to use the old names"""
    status = await reader.getjson("getstatus")
    assert not status["warn_backup_upgrade"]

    # Simulate a config left over from before the snapshot -> backup rename
    supervisor._options = {
        Setting.DEPRECTAED_MAX_BACKUPS_IN_HA.value: 7
    }
    await coord.sync()
    assert Setting.CALL_BACKUP_SNAPSHOT.value in supervisor._options
    assert config.get(Setting.CALL_BACKUP_SNAPSHOT)

    # The upgrade warning should now be shown, nothing acknowledged yet
    status = await reader.getjson("getstatus")
    assert status["warn_backup_upgrade"]
    assert not data_cache.checkFlag(UpgradeFlags.NOTIFIED_ABOUT_BACKUP_RENAME)
    assert not updater._trigger_once

    # The user clicks the button choosing to keep the old "snapshot" terminology
    assert await reader.getjson("callbackupsnapshot?switch=false") == {'message': 'Configuration updated'}
    assert data_cache.checkFlag(UpgradeFlags.NOTIFIED_ABOUT_BACKUP_RENAME)
    status = await reader.getjson("getstatus")
    assert not status["warn_backup_upgrade"]
    assert config.get(Setting.CALL_BACKUP_SNAPSHOT)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_snapshot_to_backup_upgrade_avoid_default_overwrite(reader: ReaderHelper, time: FakeTime, coord: Coordinator, config: Config, supervisor: SimulatedSupervisor, ha: HaSource, drive: DriveSource, data_cache: DataCache, updater: HaUpdater):
    """ Test the path where a user upgrades from the addon but a new value with a default value gets overwritten"""
    status = await reader.getjson("getstatus")
    assert not status["warn_backup_upgrade"]

    # Simulate upgrading: the deprecated key is set and the new key holds its
    # default value, so the deprecated value should win after migration.
    supervisor._options = {
        Setting.DEPRECTAED_MAX_BACKUPS_IN_HA.value: 7,
        Setting.MAX_BACKUPS_IN_HA.value: 4  # default, should get overridden
    }
    await coord.sync()
    assert Setting.CALL_BACKUP_SNAPSHOT.value in supervisor._options
    assert config.get(Setting.CALL_BACKUP_SNAPSHOT)
    assert config.get(Setting.MAX_BACKUPS_IN_HA) == 7
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ha_upload(reader: ReaderHelper, backup_helper, ui_server: UiServer, drive: DriveSource, ha: HaSource, config: Config, model, time):
    """Requesting an upload should copy a Drive-only backup back into Home Assistant."""
    source_backup, stream = await backup_helper.createFile()
    backup = await drive.save(source_backup, stream)

    # Sync so the backup exists only in Drive
    config.override(Setting.DAYS_BETWEEN_BACKUPS, 0)
    await model.sync(time.now())
    assert len(await ha.get()) == 0
    assert len(await drive.get()) == 1

    # Ask the UI to bring it back into Home Assistant
    reply = await reader.getjson(str(URL("upload").with_query({"slug": backup.slug()})))
    assert reply['message'] == "Uploading backup in the background"
    await ui_server.waitForUpload()
    assert len(await ha.get()) == 1
|
|
|
|
|
|
async def setup_manual_creds(reader: ReaderHelper, server: SimulationServer, drive: DriveSource, session: ClientSession):
    """Walk the manual credential flow end-to-end and verify Drive ends up enabled."""
    # Request the device authorization url from the add-on
    request_url = URL("manualauth").with_query({
        "client_id": server.google._custom_drive_client_id,
        "client_secret": server.google._custom_drive_client_secret})
    data = await reader.getjson(str(request_url))
    for key in ("auth_url", "code", "expires"):
        assert key in data

    check = await reader.getjson("checkManualAuth")
    assert check['message'] == "Waiting for you to authorize the add-on."

    # Authorize the device using the url and device code provided
    drive._cred_trigger.clear()
    authorize_url = URL(data["auth_url"]).with_query({"code": data['code']})
    async with session.get(str(authorize_url), allow_redirects=False) as resp:
        resp.raise_for_status()

    await drive.debug_wait_for_credentials()

    # The new credentials should be saved and in use
    assert drive.enabled()
    assert drive.isCustomCreds()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_oob_warning(reader: ReaderHelper, ui_server: UiServer, config: Config, server: SimulationServer, session, drive: DriveSource, data_cache: DataCache):
    """Only credentials created before the OOB cutoff should trigger the oauth warning."""
    # Creds newer than the cutoff: the notified flag is set but no warning shown
    server.google._custom_drive_client_expiration = OOB_CRED_CUTOFF + timedelta(seconds=1)
    assert not data_cache.checkFlag(UpgradeFlags.NOTIFIED_ABOUT_OOB_FLOW)
    await setup_manual_creds(reader, server, drive, session)
    assert data_cache.checkFlag(UpgradeFlags.NOTIFIED_ABOUT_OOB_FLOW)
    data_cache.TESTS_ONLY_clearFlags()
    status = await reader.getjson("getstatus")
    assert status['warn_oob_oauth'] is False

    # Creds older than the cutoff should produce the warning
    server.google._custom_drive_client_expiration = OOB_CRED_CUTOFF - timedelta(seconds=1)
    await setup_manual_creds(reader, server, drive, session)
    data_cache.TESTS_ONLY_clearFlags()
    status = await reader.getjson("getstatus")
    assert status['warn_oob_oauth'] is True

    # Once the user has been notified, the warning goes away
    data_cache.addFlag(UpgradeFlags.NOTIFIED_ABOUT_OOB_FLOW)
    status = await reader.getjson("getstatus")
    assert status['warn_oob_oauth'] is False
|