Files
hassio-addons-avm/hassio-google-drive-backup/tests/test_uiserver.py

1181 lines
47 KiB
Python

from datetime import timedelta
import os
import json
from os.path import abspath, join
from shutil import copyfile
from urllib.parse import quote
import aiohttp
import pytest
import asyncio
import base64
from aiohttp import BasicAuth
from aiohttp.client import ClientSession
from backup.file import File
from backup.util import AsyncHttpGetter, GlobalInfo, DataCache, UpgradeFlags
from backup.ui import UiServer, Restarter
from backup.config import Config, Setting, CreateOptions
from backup.const import (ERROR_CREDS_EXPIRED, ERROR_EXISTING_FOLDER,
ERROR_MULTIPLE_DELETES, ERROR_NO_BACKUP,
SOURCE_GOOGLE_DRIVE, SOURCE_HA)
from backup.creds import Creds
from backup.model import Coordinator, Backup
from backup.drive import DriveSource, FolderFinder, OOB_CRED_CUTOFF
from backup.drive.drivesource import FOLDER_MIME_TYPE, DriveRequests
from backup.ha import HaSource, HaUpdater
from backup.config import VERSION
from .faketime import FakeTime
from .helpers import compareStreams
from yarl import URL
from dev.ports import Ports
from dev.simulated_supervisor import SimulatedSupervisor
from dev.simulationserver import SimulationServer
from dev.simulated_google import SimulatedGoogle
from bs4 import BeautifulSoup
from .conftest import ReaderHelper
@pytest.fixture
def source(ha):
return ha
@pytest.fixture
def dest(drive):
return drive
@pytest.fixture
def simple_config(config):
return config
@pytest.fixture
async def restarter(injector, server):
restarter = injector.get(Restarter)
await restarter.start()
return restarter
@pytest.mark.asyncio
async def test_uiserver_start(ui_server: UiServer):
assert ui_server.running
@pytest.mark.asyncio
@pytest.mark.timeout(10)
async def test_uiserver_static_files(reader: ReaderHelper):
await reader.get("")
await reader.get("reauthenticate")
await reader.get("pp")
await reader.get("tos")
@pytest.mark.asyncio
async def test_getstatus(reader, config: Config, ha, server, ports: Ports):
File.touch(config.get(Setting.INGRESS_TOKEN_FILE_PATH))
await ha.init()
data = await reader.getjson("getstatus")
assert data['ask_error_reports'] is True
assert data['cred_version'] == 0
assert data['firstSync'] is True
assert data['folder_id'] is None
assert data['last_error'] is None
assert data['last_backup_text'] == "Never"
assert data['next_backup_text'] == "right now"
assert data['backup_name_template'] == config.get(Setting.BACKUP_NAME)
assert data['warn_ingress_upgrade'] is False
assert len(data['backups']) == 0
assert data['sources'][SOURCE_GOOGLE_DRIVE] == {
'deletable': 0,
'name': SOURCE_GOOGLE_DRIVE,
'retained': 0,
'backups': 0,
'latest': None,
'size': '0.0 B',
'enabled': True,
'max': config.get(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE),
'title': "Google Drive",
'icon': 'google-drive',
'ignored': 0,
'ignored_size': '0.0 B',
'detail': "",
}
assert data['sources'][SOURCE_HA] == {
'deletable': 0,
'name': SOURCE_HA,
'retained': 0,
'backups': 0,
'latest': None,
'size': '0.0 B',
'enabled': True,
'max': config.get(Setting.MAX_BACKUPS_IN_HA),
'title': "Home Assistant",
'free_space': "0.0 B",
'icon': 'home-assistant',
'ignored': 0,
'ignored_size': '0.0 B',
'detail': "",
}
assert len(data['sources']) == 2
@pytest.mark.asyncio
async def test_getstatus_sync(reader, config: Config, backup: Backup, time: FakeTime):
data = await reader.getjson("getstatus")
assert data['firstSync'] is False
assert data['folder_id'] is not None
assert data['last_error'] is None
assert data['last_backup_text'] != "Never"
assert data['next_backup_text'] != "right now"
assert len(data['backups']) == 1
assert data['sources'][SOURCE_GOOGLE_DRIVE] == {
'deletable': 1,
'name': SOURCE_GOOGLE_DRIVE,
'retained': 0,
'backups': 1,
'latest': time.asRfc3339String(time.now()),
'size': data['sources'][SOURCE_GOOGLE_DRIVE]['size'],
'enabled': True,
'max': config.get(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE),
'title': "Google Drive",
'icon': 'google-drive',
'free_space': "5.0 GB",
'ignored': 0,
'ignored_size': '0.0 B',
'detail': "testing@no.where",
}
assert data['sources'][SOURCE_HA] == {
'deletable': 1,
'name': SOURCE_HA,
'retained': 0,
'backups': 1,
'latest': time.asRfc3339String(time.now()),
'size': data['sources'][SOURCE_HA]['size'],
'enabled': True,
'max': config.get(Setting.MAX_BACKUPS_IN_HA),
'title': "Home Assistant",
'free_space': data['sources'][SOURCE_HA]['free_space'],
'icon': 'home-assistant',
'ignored': 0,
'ignored_size': '0.0 B',
'detail': "",
}
assert len(data['sources']) == 2
@pytest.mark.asyncio
async def test_retain(reader: ReaderHelper, config: Config, backup: Backup, coord: Coordinator, time: FakeTime):
slug = backup.slug()
assert await reader.getjson("retain", json={'slug': slug, 'sources': {"GoogleDrive": True, "HomeAssistant": True}}) == {
'message': "Updated the backup's settings"
}
status = await reader.getjson("getstatus")
assert status['sources'][SOURCE_GOOGLE_DRIVE] == {
'deletable': 0,
'name': SOURCE_GOOGLE_DRIVE,
'retained': 1,
'backups': 1,
'latest': time.asRfc3339String(backup.date()),
'size': status['sources'][SOURCE_GOOGLE_DRIVE]['size'],
'enabled': True,
'max': config.get(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE),
'title': "Google Drive",
'icon': 'google-drive',
'free_space': "5.0 GB",
'ignored': 0,
'ignored_size': '0.0 B',
'detail': "testing@no.where",
}
assert status['sources'][SOURCE_HA] == {
'deletable': 0,
'name': SOURCE_HA,
'retained': 1,
'backups': 1,
'latest': time.asRfc3339String(backup.date()),
'size': status['sources'][SOURCE_HA]['size'],
'enabled': True,
'max': config.get(Setting.MAX_BACKUPS_IN_HA),
'title': "Home Assistant",
'free_space': status['sources'][SOURCE_HA]["free_space"],
'icon': 'home-assistant',
'ignored': 0,
'ignored_size': '0.0 B',
'detail': "",
}
await reader.getjson("retain", json={'slug': slug, 'sources': {"GoogleDrive": False, "HomeAssistant": False}})
status = await reader.getjson("getstatus")
assert status['sources'][SOURCE_GOOGLE_DRIVE] == {
'deletable': 1,
'name': SOURCE_GOOGLE_DRIVE,
'retained': 0,
'backups': 1,
'latest': time.asRfc3339String(backup.date()),
'size': status['sources'][SOURCE_GOOGLE_DRIVE]['size'],
'enabled': True,
'max': config.get(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE),
'title': "Google Drive",
'icon': 'google-drive',
'free_space': "5.0 GB",
'ignored': 0,
'ignored_size': '0.0 B',
'detail': "testing@no.where",
}
assert status['sources'][SOURCE_HA] == {
'deletable': 1,
'name': SOURCE_HA,
'retained': 0,
'backups': 1,
'latest': time.asRfc3339String(backup.date()),
'size': status['sources'][SOURCE_HA]['size'],
'enabled': True,
'max': config.get(Setting.MAX_BACKUPS_IN_HA),
'title': "Home Assistant",
'free_space': status['sources'][SOURCE_HA]["free_space"],
'icon': 'home-assistant',
'ignored': 0,
'ignored_size': '0.0 B',
'detail': "",
}
delete_req = {
"slug": slug,
"sources": ["GoogleDrive"]
}
await reader.getjson("deleteSnapshot", json=delete_req)
await reader.getjson("retain", json={'slug': slug, 'sources': {"HomeAssistant": True}})
status = await reader.getjson("getstatus")
assert status['sources'][SOURCE_GOOGLE_DRIVE] == {
'deletable': 0,
'name': SOURCE_GOOGLE_DRIVE,
'retained': 0,
'backups': 0,
'latest': None,
'size': status['sources'][SOURCE_GOOGLE_DRIVE]['size'],
'enabled': True,
'max': config.get(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE),
'title': "Google Drive",
'icon': 'google-drive',
'free_space': "5.0 GB",
'ignored': 0,
'ignored_size': '0.0 B',
'detail': "testing@no.where",
}
assert status['sources'][SOURCE_HA] == {
'deletable': 0,
'name': SOURCE_HA,
'retained': 1,
'backups': 1,
'latest': time.asRfc3339String(backup.date()),
'size': status['sources'][SOURCE_HA]['size'],
'enabled': True,
'max': config.get(Setting.MAX_BACKUPS_IN_HA),
'title': "Home Assistant",
'free_space': status['sources'][SOURCE_HA]["free_space"],
'icon': 'home-assistant',
'ignored': 0,
'ignored_size': '0.0 B',
'detail': "",
}
# sync again, which should upoload the backup to Drive
await coord.sync()
status = await reader.getjson("getstatus")
assert status['sources'][SOURCE_GOOGLE_DRIVE]['backups'] == 1
assert status['sources'][SOURCE_GOOGLE_DRIVE]['retained'] == 0
assert status['sources'][SOURCE_GOOGLE_DRIVE]['backups'] == 1
@pytest.mark.asyncio
async def test_note(reader: ReaderHelper, config: Config, backup: Backup, coord: Coordinator, time: FakeTime):
slug = backup.slug()
assert backup.note() is None
assert await reader.getjson("note", json={'slug': slug, 'note': "This is the note"}) == {
'message': "Updated the backup's settings"
}
status = await reader.getjson("getstatus")
assert backup.note() == "This is the note"
assert status['backups'][0]['note'] == "This is the note"
@pytest.mark.asyncio
async def test_sync(reader, ui_server, coord: Coordinator, time: FakeTime, session):
assert len(coord.backups()) == 0
status = await reader.getjson("sync")
assert len(coord.backups()) == 1
assert status == await reader.getjson("getstatus")
time.advance(days=7)
assert len((await reader.getjson("sync"))['backups']) == 2
@pytest.mark.asyncio
async def test_delete(reader: ReaderHelper, ui_server, backup):
slug = backup.slug()
data = {"slug": "bad_slug", "sources": ["GoogleDrive"]}
await reader.assertError("deleteSnapshot", json=data, error_type=ERROR_NO_BACKUP)
status = await reader.getjson("getstatus")
assert len(status['backups']) == 1
data["slug"] = slug
assert await reader.getjson("deleteSnapshot", json=data) == {"message": "Deleted from 1 place(s)"}
await reader.assertError("deleteSnapshot", json=data, error_type=ERROR_NO_BACKUP)
status = await reader.getjson("getstatus")
assert len(status['backups']) == 1
assert status['sources'][SOURCE_GOOGLE_DRIVE]['backups'] == 0
data["sources"] = ["HomeAssistant"]
assert await reader.getjson("deleteSnapshot", json=data) == {"message": "Deleted from 1 place(s)"}
status = await reader.getjson("getstatus")
assert len(status['backups']) == 0
data["sources"] = []
await reader.assertError("deleteSnapshot", json=data, error_type=ERROR_NO_BACKUP)
@pytest.mark.asyncio
async def test_backup_now(reader, ui_server, time: FakeTime, backup: Backup, coord: Coordinator):
assert len(coord.backups()) == 1
assert (await reader.getjson("getstatus"))["backups"][0]["date"] == time.toLocal(time.now()).strftime("%c")
time.advance(hours=1)
assert await reader.getjson("backup?custom_name=TestName&retain_drive=False&retain_ha=False") == {
'message': "Requested backup 'TestName'"
}
status = await reader.getjson('getstatus')
assert len(status["backups"]) == 2
assert status["backups"][1]["date"] == time.toLocal(time.now()).strftime("%c")
assert status["backups"][1]["name"] == "TestName"
assert status["backups"][1]["note"] is None
assert status["backups"][1]['sources'][0]['retained'] is False
assert len(status["backups"][1]['sources']) == 1
time.advance(hours=1)
assert await reader.getjson("backup?custom_name=TestName2&retain_drive=True&retain_ha=False") == {
'message': "Requested backup 'TestName2'"
}
await coord.sync()
status = await reader.getjson('getstatus')
assert len(status["backups"]) == 3
assert status["backups"][2]["date"] == time.toLocal(time.now()).strftime("%c")
assert status["backups"][2]["name"] == "TestName2"
assert status["backups"][2]['sources'][0]['retained'] is False
assert status["backups"][2]['sources'][1]['retained'] is True
time.advance(hours=1)
assert await reader.getjson("backup?custom_name=TestName3&retain_drive=False&retain_ha=True") == {
'message': "Requested backup 'TestName3'"
}
await coord.sync()
status = await reader.getjson('getstatus')
assert len(status["backups"]) == 4
assert status["backups"][3]['sources'][0]['retained'] is True
assert status["backups"][3]['sources'][1]['retained'] is False
assert status["backups"][3]["date"] == time.toLocal(time.now()).strftime("%c")
assert status["backups"][3]["name"] == "TestName3"
@pytest.mark.asyncio
async def test_backup_now_with_note(reader, ui_server, time: FakeTime, coord: Coordinator):
assert len(coord.backups()) == 0
time.advance(hours=1)
assert await reader.getjson("backup?custom_name=TestName&retain_drive=False&retain_ha=False&note=ThisIsTheNote") == {
'message': "Requested backup 'TestName'"
}
await coord.sync()
status = await reader.getjson('getstatus')
assert status['backups'][0]["note"] == "ThisIsTheNote"
@pytest.mark.asyncio
async def test_config(reader, ui_server, config: Config, supervisor: SimulatedSupervisor):
update = {
"config": {
"days_between_backups": 20,
"drive_ipv4": ""
},
"backup_folder": "unused"
}
assert ui_server._starts == 1
assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
assert config.get(Setting.DAYS_BETWEEN_BACKUPS) == 20
assert supervisor._options["days_between_backups"] == 20
assert ui_server._starts == 1
@pytest.mark.asyncio
async def test_auth_and_restart(reader, ui_server, config: Config, restarter, coord: Coordinator, supervisor: SimulatedSupervisor):
update = {"config": {"require_login": True,
"expose_extra_server": True}, "backup_folder": "unused"}
assert ui_server._starts == 1
assert not config.get(Setting.REQUIRE_LOGIN)
assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
await restarter.waitForRestart()
assert config.get(Setting.REQUIRE_LOGIN)
assert supervisor._options['require_login']
assert ui_server._starts == 2
await reader.get("getstatus", status=401, ingress=False)
await reader.get("getstatus", auth=BasicAuth("user", "badpassword"), status=401, ingress=False)
await reader.get("getstatus", auth=BasicAuth("user", "pass"), ingress=False)
await coord.waitForSyncToFinish()
status = await reader.getjson("getstatus", auth=BasicAuth("user", "pass"), ingress=False)
# verify a the sync succeeded (no errors)
assert status["last_error"] is None
# The ingress server shouldn't require login, even though its turned on for the extra server
await reader.get("getstatus")
# even a bad user/pass should work
await reader.get("getstatus", auth=BasicAuth("baduser", "badpassword"))
@pytest.mark.asyncio
@pytest.mark.timeout(100)
async def test_expose_extra_server_option(reader, ui_server: UiServer, config: Config):
with pytest.raises(aiohttp.client_exceptions.ClientConnectionError):
await reader.getjson("sync", ingress=False)
config.override(Setting.EXPOSE_EXTRA_SERVER, True)
await ui_server.run()
await reader.getjson("sync", ingress=False)
await ui_server.run()
await reader.getjson("sync", ingress=False)
config.override(Setting.EXPOSE_EXTRA_SERVER, False)
await ui_server.run()
with pytest.raises(aiohttp.client_exceptions.ClientConnectionError):
await reader.getjson("sync", ingress=False)
await reader.getjson("sync")
@pytest.mark.asyncio
async def test_update_error_reports_true(reader, ui_server, config: Config, supervisor: SimulatedSupervisor):
assert config.get(Setting.SEND_ERROR_REPORTS) is False
assert not config.isExplicit(Setting.SEND_ERROR_REPORTS)
assert await reader.getjson("errorreports?send=true") == {'message': 'Configuration updated'}
assert config.get(Setting.SEND_ERROR_REPORTS) is True
assert config.isExplicit(Setting.SEND_ERROR_REPORTS)
assert supervisor._options["send_error_reports"] is True
@pytest.mark.asyncio
async def test_update_error_reports_false(reader, ui_server, config: Config, supervisor: SimulatedSupervisor):
assert config.get(Setting.SEND_ERROR_REPORTS) is False
assert not config.isExplicit(Setting.SEND_ERROR_REPORTS)
assert await reader.getjson("errorreports?send=false") == {'message': 'Configuration updated'}
assert config.get(Setting.SEND_ERROR_REPORTS) is False
assert config.isExplicit(Setting.SEND_ERROR_REPORTS)
assert supervisor._options["send_error_reports"] is False
@pytest.mark.asyncio
async def test_drive_cred_generation(reader: ReaderHelper, ui_server: UiServer, backup, config: Config, global_info: GlobalInfo, session: ClientSession, google):
status = await reader.getjson("getstatus")
assert len(status["backups"]) == 1
assert global_info.credVersion == 0
# Invalidate the drive creds, sync, then verify we see an error
google.expireCreds()
status = await reader.getjson("sync")
assert status["last_error"]["error_type"] == ERROR_CREDS_EXPIRED
# simulate the user going through the Drive authentication workflow
auth_url = URL(status['authenticate_url']).with_query({
"redirectbacktoken": reader.getUrl(True) + "token",
"version": VERSION,
"return": reader.getUrl(True)
})
async with session.get(auth_url) as resp:
resp.raise_for_status()
html = await resp.text()
page = BeautifulSoup(html, 'html.parser')
area = page.find("textarea")
creds = str(area.getText()).strip()
cred_url = URL(reader.getUrl(True) + "token").with_query({"creds": creds, "host": reader.getUrl(True)})
async with session.get(cred_url) as resp:
resp.raise_for_status()
# verify we got redirected to the addon main page.
assert resp.url == URL(reader.getUrl(True))
await ui_server.sync(None)
assert global_info._last_error is None
assert global_info.credVersion == 1
@pytest.mark.asyncio
async def test_confirm_multiple_deletes(reader, ui_server, server, config: Config, time: FakeTime, ha: HaSource):
# reconfigure to only store 1 backup
config.override(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE, 1)
config.override(Setting.MAX_BACKUPS_IN_HA, 1)
# create three backups
await ha.create(CreateOptions(time.now(), "Name1"))
await ha.create(CreateOptions(time.now(), "Name2"))
await ha.create(CreateOptions(time.now(), "Name3"))
# verify we have 3 backups an the multiple delete error
status = await reader.getjson("sync")
assert len(status['backups']) == 3
assert status["last_error"]["error_type"] == ERROR_MULTIPLE_DELETES
assert status["last_error"]["data"] == {
SOURCE_GOOGLE_DRIVE: 0,
SOURCE_GOOGLE_DRIVE + "_desc": '',
SOURCE_HA: 2,
SOURCE_HA + "_desc": "Name1\nName2"
}
# request that multiple deletes be allowed
assert await reader.getjson("confirmdelete?always=false") == {
'message': 'Backups deleted this one time'
}
assert config.get(Setting.CONFIRM_MULTIPLE_DELETES)
# backup, verify the deletes go through
status = await reader.getjson("sync")
assert status["last_error"] is None
assert len(status["backups"]) == 1
# create another backup, verify we delete the one
await ha.create(CreateOptions(time.now(), "Name1"))
status = await reader.getjson("sync")
assert len(status['backups']) == 1
assert status["last_error"] is None
# create two more backups, verify we see the error again
await ha.create(CreateOptions(time.now(), "Name1"))
await ha.create(CreateOptions(time.now(), "Name2"))
status = await reader.getjson("sync")
assert len(status['backups']) == 3
assert status["last_error"]["error_type"] == ERROR_MULTIPLE_DELETES
assert status["last_error"]["data"] == {
SOURCE_GOOGLE_DRIVE: 0,
SOURCE_GOOGLE_DRIVE + "_desc": '',
SOURCE_HA: 2,
SOURCE_HA + "_desc": "Name1\nName1"
}
@pytest.mark.asyncio
async def test_update_multiple_deletes_setting(reader, ui_server, server, config: Config, time: FakeTime, ha: HaSource, global_info: GlobalInfo):
assert await reader.getjson("confirmdelete?always=true") == {
'message': 'Configuration updated, I\'ll never ask again'
}
assert not config.get(Setting.CONFIRM_MULTIPLE_DELETES)
@pytest.mark.asyncio
async def test_resolve_folder_reuse(reader, config: Config, backup, time, drive):
# Simulate an existing folder error
old_folder = await drive.getFolderId()
File.delete(config.get(Setting.FOLDER_FILE_PATH))
time.advance(days=1)
status = await reader.getjson("sync")
assert status["last_error"]["error_type"] == ERROR_EXISTING_FOLDER
assert (await reader.getjson("resolvefolder?use_existing=true")) == {'message': 'Done'}
status = await reader.getjson("sync")
assert status["last_error"] is None
assert old_folder == await drive.getFolderId()
@pytest.mark.asyncio
async def test_resolve_folder_new(reader, config: Config, backup, time, drive):
# Simulate an existing folder error
old_folder = await drive.getFolderId()
File.delete(config.get(Setting.FOLDER_FILE_PATH))
time.advance(days=1)
status = await reader.getjson("sync")
assert status["last_error"]["error_type"] == ERROR_EXISTING_FOLDER
assert (await reader.getjson("resolvefolder?use_existing=false")) == {'message': 'Done'}
status = await reader.getjson("sync")
assert status["last_error"] is None
assert old_folder != await drive.getFolderId()
@pytest.mark.asyncio
async def test_ssl_server(reader: ReaderHelper, ui_server: UiServer, config, server, cleandir, restarter):
ssl_dir = abspath(join(__file__, "..", "..", "dev", "ssl"))
copyfile(join(ssl_dir, "localhost.crt"), join(cleandir, "localhost.crt"))
copyfile(join(ssl_dir, "localhost.key"), join(cleandir, "localhost.key"))
update = {
"config": {
"use_ssl": True,
"expose_extra_server": True,
"certfile": join(cleandir, "localhost.crt"),
"keyfile": join(cleandir, "localhost.key")
},
"backup_folder": "unused"
}
assert ui_server._starts == 1
assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
await restarter.waitForRestart()
assert ui_server._starts == 2
@pytest.mark.asyncio
async def test_bad_ssl_config_missing_files(reader: ReaderHelper, ui_server: UiServer, config, server, cleandir, restarter):
update = {
"config": {
"use_ssl": True,
"expose_extra_server": True,
"certfile": join(cleandir, "localhost.crt"),
"keyfile": join(cleandir, "localhost.key")
},
"backup_folder": "unused"
}
assert ui_server._starts == 1
assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
await restarter.waitForRestart()
assert ui_server._starts == 2
# Verify the ingress endpoint is still up, but not the SSL one
await reader.getjson("getstatus")
with pytest.raises(aiohttp.client_exceptions.ClientConnectionError):
await reader.getjson("getstatus", ingress=False, ssl=True, sslcontext=False)
@pytest.mark.asyncio
async def test_bad_ssl_config_wrong_files(reader: ReaderHelper, ui_server: UiServer, config, server, cleandir, restarter):
ssl_dir = abspath(join(__file__, "..", "..", "dev", "ssl"))
copyfile(join(ssl_dir, "localhost.crt"), join(cleandir, "localhost.crt"))
copyfile(join(ssl_dir, "localhost.key"), join(cleandir, "localhost.key"))
update = {
"config": {
"use_ssl": True,
"expose_extra_server": True,
"certfile": join(cleandir, "localhost.key"),
"keyfile": join(cleandir, "localhost.crt")
},
"backup_folder": "unused"
}
assert ui_server._starts == 1
assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
await restarter.waitForRestart()
assert ui_server._starts == 2
# Verify the ingress endpoint is still up, but not the SSL one
await reader.getjson("getstatus")
with pytest.raises(aiohttp.client_exceptions.ClientConnectionError):
await reader.getjson("getstatus", ingress=False, ssl=True, sslcontext=False)
@pytest.mark.asyncio
async def test_download_drive(reader, ui_server, backup, drive: DriveSource, ha: HaSource, session, time):
await ha.delete(backup)
# download the item from Google Drive
from_drive = await drive.read(backup)
# Download rom the web server
from_server = AsyncHttpGetter(
reader.getUrl() + "download?slug=" + backup.slug(), {}, session, time=time)
await compareStreams(from_drive, from_server)
@pytest.mark.asyncio
async def test_download_home_assistant(reader: ReaderHelper, ui_server, backup, drive: DriveSource, ha: HaSource, session, time):
await drive.delete(backup)
# download the item from Google Drive
from_ha = await ha.read(backup)
# Download rom the web server
from_server = AsyncHttpGetter(
reader.getUrl() + "download?slug=" + backup.slug(), {}, session, time=time)
await compareStreams(from_ha, from_server)
@pytest.mark.asyncio
async def test_cancel_and_startsync(reader: ReaderHelper, coord: Coordinator):
coord._sync_wait.set()
status = await reader.getjson("startSync")
assert status["syncing"]
cancel = await reader.getjson('cancelSync')
assert not cancel["syncing"]
assert cancel["last_error"]["error_type"] == "cancelled"
@pytest.mark.asyncio
async def test_token(reader: ReaderHelper, coord: Coordinator, ha, drive: DriveSource):
creds = {
"client_id": "new_access_token",
"access_token": "new_access_token",
"refresh_token": "new_refresh_token",
"token_expiry": "2022-01-01T00:00:00"
}
serialized = str(base64.b64encode(json.dumps(creds).encode("utf-8")), "utf-8")
await reader.get("token?creds={0}&host={1}".format(quote(serialized), quote(reader.getUrl(True))))
assert drive.drivebackend.creds.access_token == 'new_access_token'
assert drive.drivebackend.creds.refresh_token == 'new_refresh_token'
assert drive.drivebackend.creds.secret is None
@pytest.mark.asyncio
async def test_token_with_secret(reader: ReaderHelper, coord: Coordinator, ha, drive: DriveSource):
creds = {
"client_id": "new_access_token",
"client_secret": "new_client_secret",
"access_token": "new_access_token",
"refresh_token": "new_refresh_token",
"token_expiry": "2022-01-01T00:00:00"
}
serialized = str(base64.b64encode(json.dumps(creds).encode("utf-8")), "utf-8")
await reader.get("token?creds={0}&host={1}".format(quote(serialized), quote(reader.getUrl(True))))
assert drive.drivebackend.creds.access_token == 'new_access_token'
assert drive.drivebackend.creds.refresh_token == 'new_refresh_token'
assert drive.drivebackend.creds.secret == 'new_client_secret'
@pytest.mark.asyncio
async def test_token_extra_server(reader: ReaderHelper, coord: Coordinator, ha, drive: DriveSource, restarter, time):
update = {
"config": {
"expose_extra_server": True
},
"backup_folder": "unused"
}
assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
await restarter.waitForRestart()
creds = Creds(time, "id", time.now(), "token", "refresh")
serialized = str(base64.b64encode(json.dumps(creds.serialize()).encode("utf-8")), "utf-8")
await reader.get("token?creds={0}&host={1}".format(quote(serialized), quote(reader.getUrl(False))), ingress=False)
assert drive.drivebackend.creds.access_token == 'token'
@pytest.mark.asyncio
async def test_changefolder(reader: ReaderHelper, coord: Coordinator, ha, ui_server, folder_finder: FolderFinder):
assert await reader.get("changefolder?id=12345") == '{}'
assert await folder_finder.get() == "12345"
@pytest.mark.asyncio
async def test_changefolder_extra_server(reader: ReaderHelper, coord: Coordinator, ha, drive: DriveSource, restarter, ui_server, folder_finder: FolderFinder):
update = {
"config": {
"expose_extra_server": True
},
"backup_folder": "unused"
}
assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
await restarter.waitForRestart()
# create a folder
folder_metadata = {
'name': "Other Folder",
'mimeType': FOLDER_MIME_TYPE,
'appProperties': {
"backup_folder": "true",
},
}
# create two folders at different times
id = (await drive.drivebackend.createFolder(folder_metadata))['id']
await reader.get("changefolder?id=" + str(id), ingress=False)
assert await folder_finder.get() == id
@pytest.mark.asyncio
async def test_update_sync_interval(reader, ui_server, config: Config, supervisor: SimulatedSupervisor):
# Make sure the default saves nothing
update = {
"config": {
"max_sync_interval_seconds": '3 hours',
},
"backup_folder": "unused"
}
assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
assert config.get(Setting.MAX_SYNC_INTERVAL_SECONDS) == 60 * 60 * 3
assert "max_sync_interval_seconds" not in supervisor._options
# Update custom
update = {
"config": {
"max_sync_interval_seconds": '2 hours',
},
"backup_folder": "unused"
}
assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
assert config.get(Setting.MAX_SYNC_INTERVAL_SECONDS) == 60 * 60 * 2
assert supervisor._options["max_sync_interval_seconds"] == 60 * 60 * 2
@pytest.mark.asyncio
async def test_manual_creds(reader: ReaderHelper, ui_server: UiServer, config: Config, server: SimulationServer, session, drive: DriveSource):
periodic_check = await reader.getjson("checkManualAuth")
assert periodic_check['message'] == "No request for authorization is in progress."
drive.saveCreds(None)
assert not drive.enabled()
await setup_manual_creds(reader, server, drive, session)
# Verify creds are saved and drive is enabled
assert drive.enabled()
assert drive.isCustomCreds()
@pytest.mark.asyncio
async def test_manual_creds_failure(reader: ReaderHelper, ui_server: UiServer, config: Config, server: SimulationServer, session, drive: DriveSource):
drive.saveCreds(None)
assert not drive.enabled()
# Try with a bad client_id
req_path = URL("manualauth").with_query({
"client_id": "wrong_id",
"client_secret": server.google._custom_drive_client_secret})
data = await reader.getjson(str(req_path), status=500)
assert data["message"] == "Google responded with error status HTTP 401. Please verify your credentials are set up correctly."
# Try with a bad client_secret
req_path = URL("manualauth").with_query({
"client_id": server.google._custom_drive_client_id,
"client_secret": "wrong_secret"})
data = await reader.getjson(str(req_path))
await asyncio.sleep(2)
periodic_check = await reader.getjson("checkManualAuth", status=500)
assert periodic_check['message'] == "Failed unexpectedly while trying to reach Google. See the add-on logs for details."
# verify creds are saved and drive is enabled
assert not drive.enabled()
@pytest.mark.asyncio
async def test_setting_cancels_and_resyncs(reader: ReaderHelper, ui_server: UiServer, config: Config, server, session, drive: DriveSource, coord: Coordinator):
# Create a blocking sync task
coord._sync_wait.set()
sync = asyncio.create_task(coord.sync(), name="Sync from saving settings")
await coord._sync_start.wait()
assert not sync.cancelled()
assert not sync.done()
# Change some config
update = {
"config": {
"days_between_backups": 20,
"drive_ipv4": ""
},
"backup_folder": "unused"
}
assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
# verify the previous sync is done and another one is running
assert sync.done()
assert coord.isSyncing()
@pytest.mark.asyncio
async def test_change_specify_folder_setting(reader: ReaderHelper, server, session, coord: Coordinator, folder_finder: FolderFinder):
await coord.sync()
assert folder_finder.getCachedFolder() is not None
old_folder = folder_finder.getCachedFolder()
# Change some config
update = {
"config": {
"specify_backup_folder": True
},
"backup_folder": ""
}
assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
# verify the backup folder was reset, which triggers the error dialog to find a new folder
assert folder_finder.getCachedFolder() == old_folder
await coord.waitForSyncToFinish()
result = await reader.postjson("getstatus")
assert result["last_error"] is None
@pytest.mark.asyncio
async def test_change_specify_folder_setting_with_manual_creds(reader: ReaderHelper, google: SimulatedGoogle, session, coord: Coordinator, folder_finder: FolderFinder, drive: DriveSource, config):
google.resetDriveAuth()
drive.saveCreds(None)
assert not drive.enabled()
# get the auth url
req_path = URL("manualauth").with_query({
"client_id": google._custom_drive_client_id,
"client_secret": google._custom_drive_client_secret})
data = await reader.getjson(str(req_path))
# Authorize the device using the url and device code provided
authorize_url = URL(data["auth_url"]).with_query({"code": data['code']})
async with session.get(str(authorize_url), allow_redirects=False) as resp:
resp.raise_for_status()
# TODO: wait for creds in a smarter way
await asyncio.sleep(2)
assert drive.enabled()
assert drive.isCustomCreds()
await coord.sync()
assert folder_finder.getCachedFolder() is not None
# Specify the backup folder, which should cache the new one
update = {
"config": {
Setting.SPECIFY_BACKUP_FOLDER.value: True
},
"backup_folder": "12345"
}
assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
assert folder_finder.getCachedFolder() == "12345"
# Un change the folder, which should keep the existing folder
update = {
"config": {
Setting.SPECIFY_BACKUP_FOLDER.value: False
},
"backup_folder": ""
}
assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
assert folder_finder.getCachedFolder() == "12345"
@pytest.mark.asyncio
async def test_update_non_ui_setting(reader: ReaderHelper, server, session, coord: Coordinator, folder_finder: FolderFinder, config: Config):
await coord.sync()
# Change some config
update = {
"config": {
Setting.NEW_BACKUP_TIMEOUT_SECONDS.value: 10
},
"backup_folder": ""
}
assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
assert config.get(Setting.NEW_BACKUP_TIMEOUT_SECONDS) == 10
update = {
"config": {
Setting.MAX_BACKUPS_IN_HA.value: 1
},
"backup_folder": ""
}
assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": False}
assert config.get(Setting.NEW_BACKUP_TIMEOUT_SECONDS) == 10
@pytest.mark.asyncio
async def test_update_disable_drive(reader: ReaderHelper, server, coord: Coordinator, config: Config, drive_requests: DriveRequests):
# Disable drive
drive_requests.creds = None
os.remove(config.get(Setting.CREDENTIALS_FILE_PATH))
assert not coord.enabled()
await coord.sync()
assert len(coord.backups()) == 0
# Disable Drive Upload
update = {
"config": {
Setting.ENABLE_DRIVE_UPLOAD.value: False
},
"backup_folder": ""
}
assert await reader.postjson("saveconfig", json=update) == {'message': 'Settings saved', "reload_page": True}
assert config.get(Setting.ENABLE_DRIVE_UPLOAD) is False
# Verify the app is working fine.
assert coord.enabled()
await coord.waitForSyncToFinish()
assert len(coord.backups()) == 1
@pytest.mark.asyncio
async def test_update_ignore(reader: ReaderHelper, time: FakeTime, coord: Coordinator, config: Config, supervisor: SimulatedSupervisor, ha: HaSource, drive: DriveSource):
config.override(Setting.IGNORE_UPGRADE_BACKUPS, True)
config.override(Setting.DAYS_BETWEEN_BACKUPS, 0)
# make an ignored_backup
slug = await supervisor.createBackup({'name': "Ignore_me", 'folders': ['homeassistant'], 'addons': []}, date=time.now())
await coord.sync()
assert len(await drive.get()) == 0
assert len(await ha.get()) == 1
assert len(coord.backups()) == 1
# Disable Drive Upload
update = {
"ignore": False,
"slug": slug,
}
await reader.postjson("ignore", json=update)
await coord.waitForSyncToFinish()
assert len(coord.backups()) == 1
assert len(await drive.get()) == 1
assert len(await ha.get()) == 1
@pytest.mark.asyncio
async def test_check_ignored_backup_notification(reader: ReaderHelper, time: FakeTime, coord: Coordinator, config: Config, supervisor: SimulatedSupervisor, ha: HaSource, drive: DriveSource):
# Create an "ignored" backup after upgrade to the current version.
time.advance(days=1)
await supervisor.createBackup({'name': "test_name"}, date=time.now())
# cerate one that isn't ignored.
time.advance(days=1)
await ha.create(CreateOptions(time.now(), name_template=None))
update = {
"config": {
Setting.IGNORE_OTHER_BACKUPS.value: True
},
"backup_folder": ""
}
await reader.postjson("saveconfig", json=update)
await coord.waitForSyncToFinish()
status = await reader.getjson("getstatus")
assert status["backups"][0]["ignored"]
assert not status["backups"][1]["ignored"]
assert not status["notify_check_ignored"]
# Create an ignored backup from "before" the addon was upgraded to v0.104.0
await supervisor.createBackup({'name': "test_name"}, date=time.now() - timedelta(days=10))
await coord.sync()
# The UI should nofify about checking ignored backups
status = await reader.getjson("getstatus")
assert status["backups"][0]["ignored"]
assert status["backups"][1]["ignored"]
assert not status["backups"][2]["ignored"]
assert status["notify_check_ignored"]
# Acknowledge the notification
await reader.postjson("ackignorecheck") == {'message': "Acknowledged."}
status = await reader.getjson("getstatus")
assert not status["notify_check_ignored"]
@pytest.mark.asyncio
async def test_snapshot_to_backup_upgrade_use_new_values(reader: ReaderHelper, time: FakeTime, coord: Coordinator, config: Config, supervisor: SimulatedSupervisor, ha: HaSource, drive: DriveSource, data_cache: DataCache, updater: HaUpdater):
""" Test the path where a user upgrades from the addon before the backup rename and then chooses to use the new names"""
status = await reader.getjson("getstatus")
assert not status["warn_backup_upgrade"]
# simulate upgrading config
supervisor._options = {
Setting.DEPRECTAED_MAX_BACKUPS_IN_HA.value: 7
}
await coord.sync()
assert Setting.CALL_BACKUP_SNAPSHOT.value in supervisor._options
assert Setting.DEPRECTAED_MAX_BACKUPS_IN_HA.value not in supervisor._options
assert config.get(Setting.CALL_BACKUP_SNAPSHOT)
status = await reader.getjson("getstatus")
assert status["warn_backup_upgrade"]
assert not data_cache.checkFlag(UpgradeFlags.NOTIFIED_ABOUT_BACKUP_RENAME)
assert not updater._trigger_once
# simulate user clicking the button to use new names
assert await reader.getjson("callbackupsnapshot?switch=true") == {'message': 'Configuration updated'}
assert data_cache.checkFlag(UpgradeFlags.NOTIFIED_ABOUT_BACKUP_RENAME)
assert not config.get(Setting.CALL_BACKUP_SNAPSHOT)
status = await reader.getjson("getstatus")
assert not status["warn_backup_upgrade"]
assert updater._trigger_once
@pytest.mark.asyncio
async def test_snapshot_to_backup_upgrade_use_old_values(reader: ReaderHelper, time: FakeTime, coord: Coordinator, config: Config, supervisor: SimulatedSupervisor, ha: HaSource, drive: DriveSource, data_cache: DataCache, updater: HaUpdater):
""" Test the path where a user upgrades from the addon before the backup rename and then chooses to use the old names"""
status = await reader.getjson("getstatus")
assert not status["warn_backup_upgrade"]
# simulate upgrading config
supervisor._options = {
Setting.DEPRECTAED_MAX_BACKUPS_IN_HA.value: 7
}
await coord.sync()
assert Setting.CALL_BACKUP_SNAPSHOT.value in supervisor._options
assert config.get(Setting.CALL_BACKUP_SNAPSHOT)
status = await reader.getjson("getstatus")
assert status["warn_backup_upgrade"]
assert not data_cache.checkFlag(UpgradeFlags.NOTIFIED_ABOUT_BACKUP_RENAME)
assert not updater._trigger_once
# simulate user clicking the button to use new names
assert await reader.getjson("callbackupsnapshot?switch=false") == {'message': 'Configuration updated'}
assert data_cache.checkFlag(UpgradeFlags.NOTIFIED_ABOUT_BACKUP_RENAME)
status = await reader.getjson("getstatus")
assert not status["warn_backup_upgrade"]
assert config.get(Setting.CALL_BACKUP_SNAPSHOT)
@pytest.mark.asyncio
async def test_snapshot_to_backup_upgrade_avoid_default_overwrite(reader: ReaderHelper, time: FakeTime, coord: Coordinator, config: Config, supervisor: SimulatedSupervisor, ha: HaSource, drive: DriveSource, data_cache: DataCache, updater: HaUpdater):
""" Test the path where a user upgrades from the addon but a new value with a default value gets overwritten"""
status = await reader.getjson("getstatus")
assert not status["warn_backup_upgrade"]
# simulate upgrading config
supervisor._options = {
Setting.DEPRECTAED_MAX_BACKUPS_IN_HA.value: 7,
Setting.MAX_BACKUPS_IN_HA.value: 4 # defuault, should get overridden
}
await coord.sync()
assert Setting.CALL_BACKUP_SNAPSHOT.value in supervisor._options
assert config.get(Setting.CALL_BACKUP_SNAPSHOT)
assert config.get(Setting.MAX_BACKUPS_IN_HA) == 7
@pytest.mark.asyncio
async def test_ha_upload(reader: ReaderHelper, backup_helper, ui_server: UiServer, drive: DriveSource, ha: HaSource, config: Config, model, time):
from_backup, data = await backup_helper.createFile()
backup = await drive.save(from_backup, data)
config.override(Setting.DAYS_BETWEEN_BACKUPS, 0)
await model.sync(time.now())
assert len(await ha.get()) == 0
assert len(await drive.get()) == 1
reply = await reader.getjson(str(URL("upload").with_query({"slug": backup.slug()})))
assert reply['message'] == "Uploading backup in the background"
await ui_server.waitForUpload()
assert len(await ha.get()) == 1
async def setup_manual_creds(reader: ReaderHelper, server: SimulationServer, drive: DriveSource, session: ClientSession):
# get the auth url
req_path = URL("manualauth").with_query({
"client_id": server.google._custom_drive_client_id,
"client_secret": server.google._custom_drive_client_secret})
data = await reader.getjson(str(req_path))
assert "auth_url" in data
assert "code" in data
assert "expires" in data
periodic_check = await reader.getjson("checkManualAuth")
assert periodic_check['message'] == "Waiting for you to authorize the add-on."
# Authorize the device using the url and device code provided
drive._cred_trigger.clear()
authorize_url = URL(data["auth_url"]).with_query({"code": data['code']})
async with session.get(str(authorize_url), allow_redirects=False) as resp:
resp.raise_for_status()
await drive.debug_wait_for_credentials()
# verify creds are saved and drive is enabled
assert drive.enabled()
assert drive.isCustomCreds()
@pytest.mark.asyncio
async def test_oob_warning(reader: ReaderHelper, ui_server: UiServer, config: Config, server: SimulationServer, session, drive: DriveSource, data_cache: DataCache):
server.google._custom_drive_client_expiration = OOB_CRED_CUTOFF + timedelta(seconds=1)
assert not data_cache.checkFlag(UpgradeFlags.NOTIFIED_ABOUT_OOB_FLOW)
await setup_manual_creds(reader, server, drive, session)
assert data_cache.checkFlag(UpgradeFlags.NOTIFIED_ABOUT_OOB_FLOW)
data_cache.TESTS_ONLY_clearFlags()
status = await reader.getjson("getstatus")
assert status['warn_oob_oauth'] is False
server.google._custom_drive_client_expiration = OOB_CRED_CUTOFF - timedelta(seconds=1)
await setup_manual_creds(reader, server, drive, session)
data_cache.TESTS_ONLY_clearFlags()
status = await reader.getjson("getstatus")
assert status['warn_oob_oauth'] is True
data_cache.addFlag(UpgradeFlags.NOTIFIED_ABOUT_OOB_FLOW)
status = await reader.getjson("getstatus")
assert status['warn_oob_oauth'] is False