import asyncio
from datetime import timedelta
import os

import pytest
from aiohttp.client_exceptions import ClientResponseError

from backup.config import Config, Setting, CreateOptions, Version
from backup.const import SOURCE_HA
from backup.exceptions import (HomeAssistantDeleteError, BackupInProgress,
                               BackupPasswordKeyInvalid, UploadFailed, SupervisorConnectionError, SupervisorPermissionError, SupervisorTimeoutError, UnknownNetworkStorageError, InactiveNetworkStorageError)
from backup.util import GlobalInfo, DataCache, KEY_CREATED, KEY_LAST_SEEN, KEY_NAME
from backup.ha import HaSource, PendingBackup, EVENT_BACKUP_END, EVENT_BACKUP_START, HABackup, Password, AddonStopper
from backup.model import DummyBackup
from dev.simulationserver import SimulationServer
from .faketime import FakeTime
from .helpers import all_addons, all_folders, createBackupTar, getTestStream
from dev.simulated_supervisor import SimulatedSupervisor, URL_MATCH_SELF_OPTIONS, URL_MATCH_START_ADDON, URL_MATCH_STOP_ADDON, URL_MATCH_BACKUP_FULL, URL_MATCH_BACKUP_DELETE, URL_MATCH_MISC_INFO, URL_MATCH_BACKUP_DOWNLOAD, URL_MATCH_BACKUPS, URL_MATCH_SNAPSHOT, URL_MATCH_MOUNT
from dev.request_interceptor import RequestInterceptor
from backup.model import Model
from backup.time import Time
from yarl import URL

@pytest.mark.asyncio
async def test_sync_empty(ha) -> None:
    assert len(await ha.get()) == 0

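# Create a backup, verify its metadata matches what the supervisor reports,
# download and byte-compare it, toggle retention, then delete it and verify it's gone.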
@pytest.mark.asyncio
async def test_CRUD(ha: HaSource, time, interceptor: RequestInterceptor, data_cache: DataCache) -> None:
    backup: HABackup = await ha.create(CreateOptions(time.now(), "Test Name"))

    assert backup.name() == "Test Name"
    assert type(backup) is HABackup
    assert not backup.retained()
    assert backup.backupType() == "full"
    assert not backup.protected()
    assert backup.name() == "Test Name"
    assert backup.source() == SOURCE_HA
    assert not backup.ignore()
    assert backup.madeByTheAddon()
    assert "pending" not in data_cache.backups

    # read the item directly, its metadata should match
    from_ha = await ha.harequests.backup(backup.slug())
    assert from_ha.size() == backup.size()
    assert from_ha.slug() == backup.slug()
    assert from_ha.source() == SOURCE_HA

    backups = await ha.get()
    assert len(backups) == 1
    assert backup.slug() in backups

    full = DummyBackup(from_ha.name(), from_ha.date(),
                       from_ha.size(), from_ha.slug(), "dummy")
    full.addSource(backup)

    # download the item, its bytes should match up
    download = await ha.read(full)
    await download.setup()
    direct_download = await ha.harequests.download(backup.slug())
    await direct_download.setup()
    while True:
        from_file = await direct_download.read(1024 * 1024)
        from_download = await download.read(1024 * 1024)
        if len(from_file.getbuffer()) == 0:
            assert len(from_download.getbuffer()) == 0
            break
        assert from_file.getbuffer() == from_download.getbuffer()

    # update retention
    assert not backup.retained()
    await ha.retain(full, True)
    assert (await ha.get())[full.slug()].retained()
    await ha.retain(full, False)
    assert not (await ha.get())[full.slug()].retained()

    # Delete the item, make sure it's gone
    await ha.delete(full)
    assert full.getSource(ha.name()) is None
    backups = await ha.get()
    assert len(backups) == 0

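# Make backup creation slow; create() should return a PendingBackup placeholder
# immediately, which gets replaced by the real backup once the supervisor finishes.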
@pytest.mark.asyncio
@pytest.mark.timeout(10)
async def test_pending_backup_nowait(ha: HaSource, time: Time, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor, config: Config, data_cache: DataCache):
    interceptor.setSleep(URL_MATCH_BACKUP_FULL, sleep=5)
    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 0.1)
    backup_immediate: PendingBackup = await ha.create(CreateOptions(time.now(), "Test Name"))
    assert isinstance(backup_immediate, PendingBackup)
    backup_pending: HABackup = (await ha.get())['pending']

    assert isinstance(backup_immediate, PendingBackup)
    assert isinstance(backup_pending, PendingBackup)
    assert backup_immediate is backup_pending
    assert backup_immediate.name() == "Test Name"
    assert backup_immediate.slug() == "pending"
    assert not backup_immediate.uploadable()
    assert backup_immediate.backupType() == "Full"
    assert backup_immediate.source() == SOURCE_HA
    assert backup_immediate.date() == time.now()
    assert not backup_immediate.protected()
    assert not backup_immediate.ignore()
    assert backup_immediate.madeByTheAddon()
    assert data_cache.backup("pending") == {
        KEY_CREATED: time.now().isoformat(),
        KEY_LAST_SEEN: time.now().isoformat(),
        KEY_NAME: "Test Name"
    }

    # Might be a little flaky but...whatever
    await asyncio.wait({ha._pending_backup_task})

    backups = await ha.get()
    assert 'pending' not in backups
    assert len(backups) == 1
    backup = next(iter(backups.values()))
    assert isinstance(backup, HABackup)
    assert not backup.ignore()
    assert backup.madeByTheAddon()
    assert data_cache.backup(backup.slug())[KEY_LAST_SEEN] == time.now().isoformat()
    assert "pending" not in data_cache.backups

    # ignore events for now; the asserts below are intentionally unreachable
    return
    assert supervisor.getEvents() == [
        (EVENT_BACKUP_START, {
            'backup_name': backup_immediate.name(),
            'backup_time': str(backup_immediate.date())})]
    ha.backup_thread.join()
    assert supervisor.getEvents() == [
        (EVENT_BACKUP_START, {
            'backup_name': backup_immediate.name(),
            'backup_time': str(backup_immediate.date())}),
        (EVENT_BACKUP_END, {
            'completed': True,
            'backup_name': backup_immediate.name(),
            'backup_time': str(backup_immediate.date())})]

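# When the supervisor already has a backup in progress, create() raises
# BackupInProgress and the in-progress backup shows up as a pending placeholder.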
@pytest.mark.asyncio
async def test_pending_backup_already_in_progress(ha, time, config: Config, supervisor: SimulatedSupervisor):
    await ha.create(CreateOptions(time.now(), "Test Name"))
    assert len(await ha.get()) == 1

    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 100)
    await supervisor.toggleBlockBackup()
    with pytest.raises(BackupInProgress):
        await ha.create(CreateOptions(time.now(), "Test Name"))
    backups = list((await ha.get()).values())
    assert len(backups) == 2
    backup = backups[1]

    assert isinstance(backup, PendingBackup)
    assert backup.name() == "Pending Backup"
    assert backup.slug() == "pending"
    assert not backup.uploadable()
    assert backup.backupType() == "unknown"
    assert backup.source() == SOURCE_HA
    assert backup.date() == time.now()
    assert not backup.protected()

    with pytest.raises(BackupInProgress):
        await ha.create(CreateOptions(time.now(), "Test Name"))

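# Excluding a folder or addon should produce a partial backup missing exactly
# the excluded item; excluding names that don't exist yields a full backup.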
@pytest.mark.asyncio
async def test_partial_backup(ha, time, server, config: Config):
    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 100)
    for folder in all_folders:
        config.override(Setting.EXCLUDE_FOLDERS, folder)
        backup: HABackup = await ha.create(CreateOptions(time.now(), "Test Name"))

        assert backup.backupType() == "partial"
        for search in all_folders:
            if search == folder:
                assert search not in backup.details()['folders']
            else:
                assert search in backup.details()['folders']

    for addon in all_addons:
        config.override(Setting.EXCLUDE_ADDONS, addon['slug'])
        backup: HABackup = await ha.create(CreateOptions(time.now(), "Test Name"))
        assert backup.backupType() == "partial"
        list_of_addons = []
        for included in backup.details()['addons']:
            list_of_addons.append(included['slug'])
        # mirror the folder check above: the excluded addon's slug should be
        # absent and every other addon's slug should be present
        for search in [item['slug'] for item in all_addons]:
            if search == addon['slug']:
                assert search not in list_of_addons
            else:
                assert search in list_of_addons

    # excluding addons/folders that don't exist should actually make a full backup
    config.override(Setting.EXCLUDE_ADDONS, "none,of.these,are.addons")
    config.override(Setting.EXCLUDE_FOLDERS, "not,folders,either")
    backup: HABackup = await ha.create(CreateOptions(time.now(), "Test Name"))
    assert backup.backupType() == "full"

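# Backup passwords should resolve either literally or through !secret
# references; bad secret keys or paths raise BackupPasswordKeyInvalid.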
@pytest.mark.asyncio
async def test_backup_password(ha: HaSource, config: Config, time):
    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 100)
    backup: HABackup = await ha.create(CreateOptions(time.now(), "Test Name"))
    assert not backup.protected()

    config.override(Setting.BACKUP_PASSWORD, 'test')
    backup = await ha.create(CreateOptions(time.now(), "Test Name"))
    assert backup.protected()

    config.override(Setting.BACKUP_PASSWORD, 'test')
    assert Password(ha.config).resolve() == 'test'

    config.override(Setting.BACKUP_PASSWORD, '!secret for_unit_tests')
    assert Password(ha.config).resolve() == 'password value'

    config.override(Setting.BACKUP_PASSWORD, '!secret bad_key')
    with pytest.raises(BackupPasswordKeyInvalid):
        Password(config).resolve()

    config.override(Setting.SECRETS_FILE_PATH, "/bad/file/path")
    config.override(Setting.BACKUP_PASSWORD, '!secret for_unit_tests')
    with pytest.raises(BackupPasswordKeyInvalid):
        Password(ha.config).resolve()

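# Each template variable in the backup name should interpolate to the expected
# value for a fixed point in time.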
@pytest.mark.asyncio
async def test_backup_name(time: FakeTime, ha):
    time.setNow(time.local(1985, 12, 6, 15, 8, 9, 10))
    await assertName(ha, time.now(), "{type}", "Full")
    await assertName(ha, time.now(), "{year}", "1985")
    await assertName(ha, time.now(), "{year_short}", "85")
    await assertName(ha, time.now(), "{weekday}", "Friday")
    await assertName(ha, time.now(), "{weekday_short}", "Fri")
    await assertName(ha, time.now(), "{month}", "12")
    await assertName(ha, time.now(), "{month_long}", "December")
    await assertName(ha, time.now(), "{month_short}", "Dec")
    await assertName(ha, time.now(), "{ms}", "000010")
    await assertName(ha, time.now(), "{day}", "06")
    await assertName(ha, time.now(), "{hr24}", "15")
    await assertName(ha, time.now(), "{hr12}", "03")
    await assertName(ha, time.now(), "{min}", "08")
    await assertName(ha, time.now(), "{sec}", "09")
    await assertName(ha, time.now(), "{ampm}", "PM")
    await assertName(ha, time.now(), "{version_ha}", "ha version")
    await assertName(ha, time.now(), "{version_hassos}", "hassos version")
    await assertName(ha, time.now(), "{version_super}", "super version")
    await assertName(ha, time.now(), "{date}", "12/06/85")
    await assertName(ha, time.now(), "{time}", "15:08:09")
    await assertName(ha, time.now(), "{datetime}", "Fri Dec 6 15:08:09 1985")
    await assertName(ha, time.now(), "{isotime}", "1985-12-06T15:08:09.000010-05:00")

async def assertName(ha: HaSource, time, template: str, expected: str):
    backup: HABackup = await ha.create(CreateOptions(time, template))
    assert backup.name() == expected

@pytest.mark.asyncio
async def test_default_name(time: FakeTime, ha, server):
    backup = await ha.create(CreateOptions(time.now(), ""))
    assert backup.name() == "Full Backup 1985-12-06 00:00:00"

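# A pending backup that never completes should be flagged stale and cleaned up
# once the configured timeouts elapse.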
@pytest.mark.asyncio
async def test_pending_backup_timeout(time: FakeTime, ha: HaSource, config: Config, interceptor: RequestInterceptor, supervisor: SimulatedSupervisor):
    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 1)
    config.override(Setting.FAILED_BACKUP_TIMEOUT_SECONDS, 1)
    config.override(Setting.PENDING_BACKUP_TIMEOUT_SECONDS, 1)

    async with supervisor._backup_inner_lock:
        backup_immediate: PendingBackup = await ha.create(CreateOptions(time.now(), "Test Name"))
        assert isinstance(backup_immediate, PendingBackup)
        assert backup_immediate.name() == "Test Name"
        assert not await ha.check()
        assert ha.pending_backup is backup_immediate

    await asyncio.wait({ha._pending_backup_task})
    assert ha.pending_backup is backup_immediate
    assert await ha.check()
    assert not await ha.check()

    time.advance(minutes=1)
    assert await ha.check()
    assert len(await ha.get()) == 0
    assert not await ha.check()
    assert ha.pending_backup is None
    assert backup_immediate.isStale()

@pytest.mark.asyncio
async def test_pending_backup_timeout_external(time, config, ha: HaSource, supervisor: SimulatedSupervisor):
    # now configure a backup to start outside of the addon
    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 100)
    await supervisor.toggleBlockBackup()
    with pytest.raises(BackupInProgress):
        await ha.create(CreateOptions(time.now(), "Ignored"))
    backup_immediate = (await ha.get())['pending']
    await supervisor.toggleBlockBackup()
    assert isinstance(backup_immediate, PendingBackup)
    assert backup_immediate.name() == "Pending Backup"
    assert await ha.check()
    assert not await ha.check()
    assert ha.pending_backup is backup_immediate

    # should clean up after a day, since we're still waiting on the backup thread.
    time.advanceDay()
    assert await ha.check()
    assert len(await ha.get()) == 0

@pytest.mark.asyncio
async def test_pending_backup_replaces_original(time, ha: HaSource, config: Config, supervisor: SimulatedSupervisor):
    # now configure a backup to start outside of the addon
    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 100)
    await supervisor.toggleBlockBackup()
    with pytest.raises(BackupInProgress):
        await ha.create(CreateOptions(time.now(), "Ignored"))
    backup_immediate = (await ha.get())['pending']
    await supervisor.toggleBlockBackup()
    assert isinstance(backup_immediate, PendingBackup)
    assert backup_immediate.name() == "Pending Backup"
    assert await ha.check()
    assert ha.pending_backup is backup_immediate
    assert await ha.get() == {backup_immediate.slug(): backup_immediate}

    # create a new backup behind the scenes, the pending backup should get replaced with the new one
    slug = (await ha.harequests.createBackup({'name': "Suddenly Appears", "hardlock": True}))['slug']
    results = await ha.get()
    assert len(results) == 1
    assert slug in results
    assert results[slug].name() == "Suddenly Appears"
    assert not results[slug].retained()

def test_retryable_errors():
    # SOMEDAY: retryable errors should be retried in the future
    pass

@pytest.mark.asyncio
async def test_retained_on_finish(ha: HaSource, server, time, config: Config, supervisor: SimulatedSupervisor):
    async with supervisor._backup_inner_lock:
        retention = {ha.name(): True}
        config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 0.0001)
        pending = await ha.create(CreateOptions(time.now(), "Test Name", retention))
        results = await ha.get()
        assert pending.name() == "Test Name"
        assert results == {pending.slug(): pending}
        assert type(pending) == PendingBackup
        assert not ha._pending_backup_task.done()

    await asyncio.wait({ha._pending_backup_task})
    results = list((await ha.get()).values())
    assert len(results) == 1
    assert results[0].name() == "Test Name"
    assert type(results[0]) == HABackup
    assert results[0].retained()
    assert config.isRetained(results[0].slug())

@pytest.mark.asyncio
async def test_upload(time, ha, server, uploader):
    data = await uploader.upload(createBackupTar("slug", "Test Name", time.now(), 1024 * 1024))
    dummy = DummyBackup("Test Name", time.now(), "src", "slug", "dummy")
    backup: HABackup = await ha.save(dummy, data)
    assert backup.name() == "Test Name"
    assert backup.slug() == "slug"
    assert backup.size() == round(data.size() / 1024.0 / 1024.0, 2) * 1024 * 1024
    assert backup.retained()
    # ensure it's still retained on a refresh
    assert list((await ha.get()).values())[0].retained()

@pytest.mark.asyncio
async def test_corrupt_upload(time, ha, server, uploader):
    # verify a corrupt backup throws the right exception
    bad_data = await uploader.upload(getTestStream(100))
    dummy = DummyBackup("Test Name", time.now(), "src", "slug2", "dummy")

    with pytest.raises(UploadFailed):
        await ha.save(dummy, bad_data)

@pytest.mark.asyncio
async def test_upload_wrong_slug(time, ha, server, uploader):
    # verify a backup with the wrong slug also throws
    bad_data = await uploader.upload(createBackupTar("wrongslug", "Test Name", time.now(), 1024 * 1024))
    dummy = DummyBackup("Test Name", time.now(), "src", "slug", "dummy")
    with pytest.raises(UploadFailed):
        await ha.save(dummy, bad_data)

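# A backup that fails on the supervisor's end should surface as a failed
# pending backup, then allow a new backup immediately once the error clears.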
@pytest.mark.asyncio
async def test_failed_backup(time, ha: HaSource, supervisor: SimulatedSupervisor, config: Config, interceptor: RequestInterceptor):
    # create a blocking backup
    interceptor.setError(URL_MATCH_BACKUP_FULL, 524)
    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 0)
    await supervisor.toggleBlockBackup()
    backup_immediate = await ha.create(CreateOptions(time.now(), "Some Name"))
    assert isinstance(backup_immediate, PendingBackup)
    assert backup_immediate.name() == "Some Name"
    assert not await ha.check()
    assert not backup_immediate.isFailed()
    await supervisor.toggleBlockBackup()

    # let the backup attempt to complete
    await asyncio.wait({ha._pending_backup_task})

    # verify it failed with the expected http error
    assert backup_immediate.isFailed()
    assert backup_immediate._exception.status == 524

    backups = list((await ha.get()).values())
    assert len(backups) == 1
    assert backups[0] is backup_immediate

    # verify we can create a new backup immediately
    interceptor.clear()
    await ha.create(CreateOptions(time.now(), "Some Name"))
    assert len(await ha.get()) == 1

@pytest.mark.asyncio
async def test_failed_backup_retry(ha: HaSource, time: FakeTime, config: Config, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor):
    # create a blocking backup
    interceptor.setError(URL_MATCH_BACKUP_FULL, 524)
    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 0)
    await supervisor.toggleBlockBackup()
    backup_immediate = await ha.create(CreateOptions(time.now(), "Some Name"))
    assert isinstance(backup_immediate, PendingBackup)
    assert backup_immediate.name() == "Some Name"
    assert not await ha.check()
    assert not backup_immediate.isFailed()
    await supervisor.toggleBlockBackup()

    # let the backup attempt to complete
    await asyncio.wait({ha._pending_backup_task})

    # verify it failed with the expected http error
    assert backup_immediate.isFailed()
    assert backup_immediate._exception.status == 524

    assert await ha.check()
    assert not await ha.check()
    time.advance(seconds=config.get(Setting.FAILED_BACKUP_TIMEOUT_SECONDS))

    # should trigger a sync after the failed backup timeout
    assert await ha.check()
    await ha.get()
    assert not await ha.check()

@pytest.mark.asyncio
async def test_immediate_backup_failure(time: FakeTime, ha: HaSource, config: Config, interceptor: RequestInterceptor):
    interceptor.setError(URL_MATCH_BACKUP_FULL, 524)
    with pytest.raises(ClientResponseError) as thrown:
        await ha.create(CreateOptions(time.now(), "Some Name"))
    assert thrown.value.status == 524

    assert ha.pending_backup is not None
    backups = list((await ha.get()).values())
    assert len(backups) == 1
    assert backups[0].isFailed()

    # Failed backup should go away after it times out
    assert await ha.check()
    assert not await ha.check()
    time.advance(seconds=config.get(Setting.FAILED_BACKUP_TIMEOUT_SECONDS) + 1)
    assert await ha.check()

    assert len(await ha.get()) == 0
    assert not await ha.check()

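# Delete failures from the supervisor should be wrapped in HomeAssistantDeleteError.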
@pytest.mark.asyncio
async def test_delete_error(time, ha: HaSource, interceptor: RequestInterceptor):
    backup = await ha.create(CreateOptions(time.now(), "Some Name"))
    full = DummyBackup(backup.name(), backup.date(),
                       backup.size(), backup.slug(), "dummy")
    full.addSource(backup)
    interceptor.setError(URL_MATCH_BACKUP_DELETE, 400)
    with pytest.raises(HomeAssistantDeleteError):
        await ha.delete(full)

    interceptor.clear()
    await ha.delete(full)

@pytest.mark.asyncio
async def test_hostname(time, ha: HaSource, server, global_info: GlobalInfo):
    await ha.init()
    assert global_info.url == "/hassio/ingress/self_slug"

@pytest.mark.asyncio
async def test_supervisor_error(time, ha: HaSource, server: SimulationServer, global_info: GlobalInfo):
    await server.stop()
    with pytest.raises(SupervisorConnectionError):
        await ha.init()

@pytest.mark.asyncio
async def test_supervisor_permission_error(time, ha: HaSource, interceptor: RequestInterceptor, global_info: GlobalInfo):
    interceptor.setError(URL_MATCH_MISC_INFO, 403)
    with pytest.raises(SupervisorPermissionError):
        await ha.init()

    interceptor.clear()
    interceptor.setError(URL_MATCH_MISC_INFO, 404)
    with pytest.raises(ClientResponseError):
        await ha.init()

@pytest.mark.asyncio
async def test_download_timeout(ha: HaSource, time, interceptor: RequestInterceptor, config: Config) -> None:
    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 100)
    backup: HABackup = await ha.create(CreateOptions(time.now(), "Test Name"))
    from_ha = await ha.harequests.backup(backup.slug())
    full = DummyBackup(from_ha.name(), from_ha.date(),
                       from_ha.size(), from_ha.slug(), "dummy")
    full.addSource(backup)

    interceptor.setSleep(URL_MATCH_BACKUP_DOWNLOAD, sleep=100)
    config.override(Setting.DOWNLOAD_TIMEOUT_SECONDS, 1)
    direct_download = await ha.harequests.download(backup.slug())

    with pytest.raises(SupervisorTimeoutError):
        await direct_download.setup()
        await direct_download.read(1)

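# Addons listed in STOP_ADDONS should be stopped while the backup runs and
# started again once it completes.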
@pytest.mark.asyncio
async def test_start_and_stop_addon(ha: HaSource, time, interceptor: RequestInterceptor, config: Config, supervisor: SimulatedSupervisor, addon_stopper: AddonStopper) -> None:
    addon_stopper.allowRun()
    slug = "test_slug"
    supervisor.installAddon(slug, "Test description")
    config.override(Setting.STOP_ADDONS, slug)
    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 0.001)

    assert supervisor.addon(slug)["state"] == "started"
    async with supervisor._backup_inner_lock:
        await ha.create(CreateOptions(time.now(), "Test Name"))
        assert supervisor.addon(slug)["state"] == "stopped"
    await ha._pending_backup_task
    assert supervisor.addon(slug)["state"] == "started"

@pytest.mark.asyncio
async def test_start_and_stop_two_addons(ha: HaSource, time, interceptor: RequestInterceptor, config: Config, supervisor: SimulatedSupervisor, addon_stopper: AddonStopper) -> None:
    addon_stopper.allowRun()
    slug1 = "test_slug_1"
    supervisor.installAddon(slug1, "Test description")

    slug2 = "test_slug_2"
    supervisor.installAddon(slug2, "Test description")
    config.override(Setting.STOP_ADDONS, ",".join([slug1, slug2]))
    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 0.001)

    assert supervisor.addon(slug1)["state"] == "started"
    assert supervisor.addon(slug2)["state"] == "started"
    async with supervisor._backup_inner_lock:
        await ha.create(CreateOptions(time.now(), "Test Name"))
        assert supervisor.addon(slug1)["state"] == "stopped"
        assert supervisor.addon(slug2)["state"] == "stopped"
    await ha._pending_backup_task
    assert supervisor.addon(slug1)["state"] == "started"
    assert supervisor.addon(slug2)["state"] == "started"

@pytest.mark.asyncio
async def test_stop_addon_failure(ha: HaSource, time, interceptor: RequestInterceptor, config: Config, supervisor: SimulatedSupervisor, addon_stopper: AddonStopper) -> None:
    addon_stopper.allowRun()
    slug = "test_slug"
    supervisor.installAddon(slug, "Test description")
    config.override(Setting.STOP_ADDONS, slug)
    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 0.001)
    interceptor.setError(URL_MATCH_STOP_ADDON, 400)

    assert supervisor.addon(slug)["state"] == "started"
    async with supervisor._backup_inner_lock:
        await ha.create(CreateOptions(time.now(), "Test Name"))
        assert supervisor.addon(slug)["state"] == "started"
    await ha._pending_backup_task
    assert supervisor.addon(slug)["state"] == "started"
    assert len(await ha.get()) == 1

@pytest.mark.asyncio
async def test_start_addon_failure(ha: HaSource, time, interceptor: RequestInterceptor, config: Config, supervisor: SimulatedSupervisor, addon_stopper: AddonStopper) -> None:
    addon_stopper.allowRun()
    slug = "test_slug"
    supervisor.installAddon(slug, "Test description")
    config.override(Setting.STOP_ADDONS, slug)
    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 0.001)
    interceptor.setError(URL_MATCH_START_ADDON, 400)

    assert supervisor.addon(slug)["state"] == "started"
    async with supervisor._backup_inner_lock:
        await ha.create(CreateOptions(time.now(), "Test Name"))
        assert supervisor.addon(slug)["state"] == "stopped"
    await ha._pending_backup_task
    assert supervisor.addon(slug)["state"] == "stopped"
    assert len(await ha.get()) == 1

@pytest.mark.asyncio
async def test_ignore_self_when_stopping(ha: HaSource, time, interceptor: RequestInterceptor, config: Config, supervisor: SimulatedSupervisor, addon_stopper: AddonStopper) -> None:
    addon_stopper.allowRun()
    slug = supervisor._addon_slug
    config.override(Setting.STOP_ADDONS, slug)
    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 0.001)
    interceptor.setError(URL_MATCH_START_ADDON, 400)

    assert supervisor.addon(slug)["state"] == "started"
    async with supervisor._backup_inner_lock:
        await ha.create(CreateOptions(time.now(), "Test Name"))
        assert supervisor.addon(slug)["state"] == "started"
    await ha._pending_backup_task
    assert supervisor.addon(slug)["state"] == "started"
    assert not interceptor.urlWasCalled(URL_MATCH_START_ADDON)
    assert not interceptor.urlWasCalled(URL_MATCH_STOP_ADDON)
    assert len(await ha.get()) == 1

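# A pending backup must never be counted against MAX_BACKUPS_IN_HA, so nothing
# gets purged until it completes.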
@pytest.mark.asyncio
async def test_dont_purge_pending_backup(ha: HaSource, time, config: Config, supervisor: SimulatedSupervisor, model: Model, interceptor):
    config.override(Setting.MAX_BACKUPS_IN_HA, 4)
    await ha.create(CreateOptions(time.now(), "Test Name 1"))
    await ha.create(CreateOptions(time.now(), "Test Name 2"))
    await ha.create(CreateOptions(time.now(), "Test Name 3"))
    await ha.create(CreateOptions(time.now(), "Test Name 4"))
    await model.sync(time.now())

    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 0.1)
    interceptor.setSleep(URL_MATCH_BACKUP_FULL, sleep=2)
    await ha.create(CreateOptions(time.now(), "Test Name"))
    backups = list((await ha.get()).values())
    assert len(backups) == 5
    backup = backups[4]
    assert isinstance(backup, PendingBackup)

    # no backup should get purged yet because the pending backup isn't considered for purging.
    await model.sync(time.now())
    backups = list((await ha.get()).values())
    assert len(backups) == 5

    # Wait for the backup to finish, then verify one gets purged.
    await ha._pending_backup_task
    await model.sync(time.now())
    backups = list((await ha.get()).values())
    assert len(backups) == 4

@pytest.mark.asyncio
async def test_matching_pending_backup(ha: HaSource, time: Time, config: Config, supervisor: SimulatedSupervisor, model: Model, interceptor, data_cache: DataCache):
    '''
    A pending backup with the same name and within a day of the backup time should be
    considered made by the addon
    '''
    data_cache.backup("pending")[KEY_NAME] = "Test Backup"
    data_cache.backup("pending")[KEY_CREATED] = time.now().isoformat()
    data_cache.backup("pending")[KEY_LAST_SEEN] = time.now().isoformat()

    await supervisor.createBackup({"name": "Test Backup"}, date=time.now() - timedelta(hours=12))

    backups = await ha.get()
    assert len(backups) == 1
    backup = next(iter(backups.values()))
    assert backup.madeByTheAddon()

@pytest.mark.asyncio
async def test_date_match_wrong_pending_backup(ha: HaSource, time: Time, config: Config, supervisor: SimulatedSupervisor, model: Model, interceptor, data_cache: DataCache):
    '''
    A pending backup with the same name but the wrong date should not be considered made by the addon
    '''
    data_cache.backup("pending")[KEY_NAME] = "Test Backup"
    data_cache.backup("pending")[KEY_CREATED] = time.now().isoformat()
    data_cache.backup("pending")[KEY_LAST_SEEN] = time.now().isoformat()

    await supervisor.createBackup({"name": "Test Backup"}, date=time.now() - timedelta(hours=25))

    backups = await ha.get()
    assert len(backups) == 1
    backup = next(iter(backups.values()))
    assert not backup.madeByTheAddon()

@pytest.mark.asyncio
async def test_name_wrong_match_pending_backup(ha: HaSource, time: Time, config: Config, supervisor: SimulatedSupervisor, model: Model, interceptor, data_cache: DataCache):
    '''
    A pending backup with the wrong name should not be considered made by the addon
    '''
    data_cache.backup("pending")[KEY_NAME] = "Test Backup"
    data_cache.backup("pending")[KEY_CREATED] = time.now().isoformat()
    data_cache.backup("pending")[KEY_LAST_SEEN] = time.now().isoformat()

    await supervisor.createBackup({"name": "Wrong Name"}, date=time.now() - timedelta(hours=12))

    backups = await ha.get()
    assert len(backups) == 1
    backup = next(iter(backups.values()))
    assert not backup.madeByTheAddon()

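# Each sync should bump the backup's last-seen timestamp in the data cache.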
@pytest.mark.asyncio
async def test_bump_last_seen(ha: HaSource, time: Time, config: Config, supervisor: SimulatedSupervisor, model: Model, interceptor, data_cache: DataCache):
    backup = await ha.create(CreateOptions(time.now(), "Test Name"))
    time.advance(days=1)
    assert backup.slug() in await ha.get()
    assert data_cache.backup(backup.slug())[KEY_LAST_SEEN] == time.now().isoformat()

    time.advance(days=1)
    assert backup.slug() in await ha.get()
    assert data_cache.backup(backup.slug())[KEY_LAST_SEEN] == time.now().isoformat()

@pytest.mark.asyncio
async def test_backup_supervisor_path(ha: HaSource, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor):
    supervisor._super_version = Version(2021, 8)
    await ha.get()
    assert interceptor.urlWasCalled(URL_MATCH_BACKUPS)
    assert not interceptor.urlWasCalled(URL_MATCH_SNAPSHOT)

@pytest.mark.asyncio
async def test_backup_supervisor_path_old_version(ha: HaSource, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor):
    supervisor._super_version = Version(2021, 7)
    await ha.get()
    assert not interceptor.urlWasCalled(URL_MATCH_BACKUPS)
    assert interceptor.urlWasCalled(URL_MATCH_SNAPSHOT)

@pytest.mark.asyncio
async def test_supervisor_host(ha: HaSource, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor, config: Config, server_url):
    assert ha.harequests.getSupervisorURL() == server_url

    config.override(Setting.SUPERVISOR_URL, "")
    assert ha.harequests.getSupervisorURL() == URL("http://hassio")

    os.environ['SUPERVISOR_TOKEN'] = "test"
    assert ha.harequests.getSupervisorURL() == URL("http://supervisor")

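# The next few tests exercise upgrading deprecated addon config options to
# their current names through ha.init(). Note: the misspelled DEPRECTAED_*
# members (sic) are the actual identifiers defined on Setting in backup.config.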
@pytest.mark.asyncio
async def test_upgrade_default_config(ha: HaSource, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor, config: Config, server_url):
    """Verify that converting the original default config options works as expected"""

    # overwrite the addon options with old values
    supervisor._options = {
        Setting.DEPRECTAED_MAX_BACKUPS_IN_HA.value: 4,
        Setting.DEPRECTAED_MAX_BACKUPS_IN_GOOGLE_DRIVE.value: 4,
        Setting.DEPRECATED_DAYS_BETWEEN_BACKUPS.value: 3,
        Setting.USE_SSL.value: False,
    }

    await ha.init()

    assert not config.mustSaveUpgradeChanges()
    assert interceptor.urlWasCalled(URL_MATCH_SELF_OPTIONS)

    # Verify the config was upgraded
    assert supervisor._options == {
        Setting.MAX_BACKUPS_IN_HA.value: 4,
        Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE.value: 4,
        Setting.DAYS_BETWEEN_BACKUPS.value: 3,
        Setting.CALL_BACKUP_SNAPSHOT.value: True,
    }

@pytest.mark.asyncio
async def test_upgrade_all_config(ha: HaSource, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor, config: Config, server_url):
    """Verify that converting all upgradeable config options works as expected"""

    # overwrite the addon options with old values
    supervisor._options = {
        Setting.DEPRECTAED_MAX_BACKUPS_IN_HA.value: 1,
        Setting.DEPRECTAED_MAX_BACKUPS_IN_GOOGLE_DRIVE.value: 2,
        Setting.DEPRECATED_DAYS_BETWEEN_BACKUPS.value: 5,
        Setting.DEPRECTAED_IGNORE_OTHER_BACKUPS.value: True,
        Setting.DEPRECTAED_IGNORE_UPGRADE_BACKUPS.value: True,
        Setting.DEPRECTAED_BACKUP_TIME_OF_DAY.value: "01:11",
        Setting.DEPRECTAED_DELETE_BEFORE_NEW_BACKUP.value: True,
        Setting.DEPRECTAED_BACKUP_NAME.value: "test",
        Setting.DEPRECTAED_SPECIFY_BACKUP_FOLDER.value: True,
        Setting.DEPRECTAED_NOTIFY_FOR_STALE_BACKUPS.value: False,
        Setting.DEPRECTAED_ENABLE_BACKUP_STALE_SENSOR.value: False,
        Setting.DEPRECTAED_ENABLE_BACKUP_STATE_SENSOR.value: False,
        Setting.DEPRECATED_BACKUP_PASSWORD.value: "test password",
    }

    await ha.init()
    assert not config.mustSaveUpgradeChanges()
    assert interceptor.urlWasCalled(URL_MATCH_SELF_OPTIONS)

    # Verify the config was upgraded
    assert supervisor._options == {
        Setting.MAX_BACKUPS_IN_HA.value: 1,
        Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE.value: 2,
        Setting.DAYS_BETWEEN_BACKUPS.value: 5,
        Setting.IGNORE_OTHER_BACKUPS.value: True,
        Setting.IGNORE_UPGRADE_BACKUPS.value: True,
        Setting.BACKUP_TIME_OF_DAY.value: "01:11",
        Setting.DELETE_BEFORE_NEW_BACKUP.value: True,
        Setting.BACKUP_NAME.value: "test",
        Setting.SPECIFY_BACKUP_FOLDER.value: True,
        Setting.NOTIFY_FOR_STALE_BACKUPS.value: False,
        Setting.ENABLE_BACKUP_STALE_SENSOR.value: False,
        Setting.ENABLE_BACKUP_STATE_SENSOR.value: False,
        Setting.BACKUP_PASSWORD.value: "test password",
        Setting.CALL_BACKUP_SNAPSHOT.value: True,
    }

    interceptor.clear()

    await ha.init()
    assert not interceptor.urlWasCalled(URL_MATCH_SELF_OPTIONS)

@pytest.mark.asyncio
async def test_upgrade_some_config(ha: HaSource, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor, config: Config, server_url):
    """Verify that converting a mix of upgradeable and not upgradeable config works"""

    # overwrite the addon options with old values
    supervisor._options = {
        Setting.DEPRECTAED_MAX_BACKUPS_IN_HA.value: 4,
        Setting.DEPRECTAED_MAX_BACKUPS_IN_GOOGLE_DRIVE.value: 4,
        Setting.DEPRECATED_DAYS_BETWEEN_BACKUPS.value: 3,
        Setting.DEPRECTAED_BACKUP_TIME_OF_DAY.value: "01:11",
        Setting.EXCLUDE_ADDONS.value: "test",
        Setting.USE_SSL.value: False,
    }

    await ha.init()

    assert not config.mustSaveUpgradeChanges()
    assert interceptor.urlWasCalled(URL_MATCH_SELF_OPTIONS)

    # Verify the config was upgraded
    assert supervisor._options == {
        Setting.MAX_BACKUPS_IN_HA.value: 4,
        Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE.value: 4,
        Setting.DAYS_BETWEEN_BACKUPS.value: 3,
        Setting.EXCLUDE_ADDONS.value: "test",
        Setting.BACKUP_TIME_OF_DAY.value: "01:11",
        Setting.CALL_BACKUP_SNAPSHOT.value: True,
    }

@pytest.mark.asyncio
async def test_upgrade_no_config(ha: HaSource, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor, config: Config, server_url):
    """Verifies that config not in need of an upgrade doesn't get upgraded"""

    # overwrite the addon options with current values
    supervisor._options = {
        Setting.MAX_BACKUPS_IN_HA.value: 4,
        Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE.value: 4,
        Setting.DAYS_BETWEEN_BACKUPS.value: 3,
        Setting.BACKUP_TIME_OF_DAY.value: "01:11",
        Setting.EXCLUDE_ADDONS.value: "test"
    }

    await ha.init()

    assert not config.mustSaveUpgradeChanges()
    assert not interceptor.urlWasCalled(URL_MATCH_SELF_OPTIONS)

    # Verify the config was left unchanged
    assert supervisor._options == {
        Setting.MAX_BACKUPS_IN_HA.value: 4,
        Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE.value: 4,
        Setting.DAYS_BETWEEN_BACKUPS.value: 3,
        Setting.BACKUP_TIME_OF_DAY.value: "01:11",
        Setting.EXCLUDE_ADDONS.value: "test",
    }

@pytest.mark.asyncio
async def test_old_delete_path(ha: HaSource, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor, time: FakeTime):
    supervisor._super_version = Version(2020, 8)
    await ha.get()
    backup: HABackup = await ha.create(CreateOptions(time.now(), "Test Name"))
    full = DummyBackup(backup.name(), backup.date(),
                       backup.size(), backup.slug(), "dummy")
    full.addSource(backup)
    await ha.delete(full)
    assert interceptor.urlWasCalled("/snapshots/{0}/remove".format(backup.slug()))

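# The next several tests check the IGNORE_UPGRADE_BACKUPS heuristic: a backup
# containing exactly one folder or one addon and nothing else is treated as an
# upgrade backup and ignored; anything else is not.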
@pytest.mark.asyncio
async def test_ignore_upgrade_backup_ha_config(ha: HaSource, time: Time, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor, config: Config, data_cache: DataCache):
    config.override(Setting.IGNORE_UPGRADE_BACKUPS, True)
    slug = (await ha.harequests.createBackup({'name': "Suddenly Appears", 'folders': ['homeassistant'], 'addons': []}))['slug']

    backups = await ha.get()
    assert len(backups) == 1
    assert slug in backups
    assert backups[slug].ignore()

@pytest.mark.asyncio
async def test_ignore_upgrade_backup_single_folder(ha: HaSource, time: Time, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor, config: Config, data_cache: DataCache):
    config.override(Setting.IGNORE_UPGRADE_BACKUPS, True)
    slug = (await ha.harequests.createBackup({'name': "Suddenly Appears", 'folders': ['share'], 'addons': []}))['slug']

    backups = await ha.get()
    assert len(backups) == 1
    assert slug in backups
    assert backups[slug].ignore()

@pytest.mark.asyncio
async def test_ignore_upgrade_backup_single_addon(ha: HaSource, time: Time, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor, config: Config, data_cache: DataCache):
    config.override(Setting.IGNORE_UPGRADE_BACKUPS, True)
    slug = (await ha.harequests.createBackup({'name': "Suddenly Appears", 'folders': [], 'addons': ["particla_accel"]}))['slug']

    backups = await ha.get()
    assert len(backups) == 1
    assert slug in backups
    assert backups[slug].ignore()

@pytest.mark.asyncio
async def test_ignore_upgrade_backup_two_folders(ha: HaSource, time: Time, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor, config: Config, data_cache: DataCache):
    config.override(Setting.IGNORE_UPGRADE_BACKUPS, True)
    slug = (await ha.harequests.createBackup({'name': "Suddenly Appears", 'folders': ['homeassistant', "share"], 'addons': []}))['slug']

    backups = await ha.get()
    assert len(backups) == 1
    assert slug in backups
    assert not backups[slug].ignore()

@pytest.mark.asyncio
async def test_ignore_upgrade_backup_empty(ha: HaSource, time: Time, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor, config: Config, data_cache: DataCache):
    config.override(Setting.IGNORE_UPGRADE_BACKUPS, True)
    slug = (await ha.harequests.createBackup({'name': "Suddenly Appears", 'folders': [], 'addons': []}))['slug']

    backups = await ha.get()
    assert len(backups) == 1
    assert slug in backups
    assert not backups[slug].ignore()

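# A backup that runs for many hours should stay visible as pending, keep
# blocking new backups, and still be attributed to the addon once it finishes.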
@pytest.mark.asyncio
async def test_very_long_running_backup(time, config, ha: HaSource, supervisor: SimulatedSupervisor):
    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 1)
    config.override(Setting.PENDING_BACKUP_TIMEOUT_SECONDS, 2)
    config.override(Setting.IGNORE_OTHER_BACKUPS, True)

    async with supervisor._backup_inner_lock:
        await ha.create(CreateOptions(time.now(), "Actually gets made"))

        for _ in range(4):
            time.advance(hours=1)
            assert "pending" in (await ha.get())

        # Let the task fail
        await ha._pending_backup_task

        # after 4 hours the pending backup should be assumed to have failed and cleaned up
        time.advance(hours=1)
        assert len(await ha.get()) == 0

        for _ in range(4):
            time.advance(hours=1)
            # Making a backup should keep failing
            with pytest.raises(BackupInProgress):
                await ha.create(CreateOptions(time.now(), "Ignored"))
            backups = await ha.get()
            assert len(backups) == 1
            assert "pending" in backups

    # Wait for the backup to complete
    async with supervisor._backup_lock:
        pass

    time.advance(hours=1)
    backups = await ha.get()
    assert len(backups) == 1
    assert "pending" not in backups
    assert list(backups.values())[0].madeByTheAddon()

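# The note tests below verify that backup notes can be set at creation,
# changed while a backup is still pending, and survive the transition to a
# completed backup.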
@pytest.mark.asyncio
async def test_note(time, config, ha: HaSource, supervisor: SimulatedSupervisor):
    backup = await ha.create(CreateOptions(time.now(), "Backup"))
    assert isinstance(backup, HABackup)
    assert backup.note() is None

    full = DummyBackup(backup.name(), backup.date(),
                       backup.size(), backup.slug(), "dummy")
    full.addSource(backup)
    await ha.note(full, "new note")
    assert backup.note() == "new note"

    await ha.note(full, None)
    assert backup.note() is None

@pytest.mark.asyncio
async def test_note_creation(time, config, ha: HaSource, supervisor: SimulatedSupervisor):
    backup = await ha.create(CreateOptions(time.now(), "Backup", note="creation note"))
    assert isinstance(backup, HABackup)
    assert backup.note() == "creation note"
    assert (await ha.get())[backup.slug()].note() == "creation note"

    full = DummyBackup(backup.name(), backup.date(),
                       backup.size(), backup.slug(), "dummy")
    full.addSource(backup)
    await ha.note(full, "new note")
    assert backup.note() == "new note"
    assert (await ha.get())[full.slug()].note() == "new note"

    await ha.note(full, None)
    assert backup.note() is None

@pytest.mark.asyncio
async def test_note_long_backup(time, config, ha: HaSource, supervisor: SimulatedSupervisor):
    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 1)

    async with supervisor._backup_inner_lock:
        backup = await ha.create(CreateOptions(time.now(), "Backup", note="Creation note"))
        assert isinstance(backup, PendingBackup)
        assert backup.note() == "Creation note"

        pending = (await ha.get())[backup.slug()]
        assert pending.note() == "Creation note"

    assert ha._pending_backup_task is not None
    await ha._pending_backup_task
    completed = next(iter((await ha.get()).values()))
    assert not isinstance(completed, PendingBackup)
    assert completed.note() == "Creation note"

@pytest.mark.asyncio
async def test_note_long_backup_changed_during_creation(time, config, ha: HaSource, supervisor: SimulatedSupervisor):
    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 1)

    async with supervisor._backup_inner_lock:
        backup = await ha.create(CreateOptions(time.now(), "Backup", note="Creation note"))
        assert isinstance(backup, PendingBackup)
        assert backup.note() == "Creation note"

        pending = (await ha.get())[backup.slug()]
        assert pending.note() == "Creation note"

        full = DummyBackup(pending.name(), pending.date(), pending.size(), pending.slug(), "dummy")
        full.addSource(pending)
        await ha.note(full, "changed")

        still_pending = next(iter((await ha.get()).values()))
        assert isinstance(still_pending, PendingBackup)
        assert still_pending.note() == "changed"

    assert ha._pending_backup_task is not None
    await ha._pending_backup_task
    completed = next(iter((await ha.get()).values()))
    assert not isinstance(completed, PendingBackup)
    assert completed.note() == "changed"

@pytest.mark.asyncio
async def test_note_change_external_backup(time, config, ha: HaSource, supervisor: SimulatedSupervisor):
    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 100)

    await supervisor.toggleBlockBackup()
    with pytest.raises(BackupInProgress):
        await ha.create(CreateOptions(time.now(), "Ignored", note="ignored"))
    pending = next(iter((await ha.get()).values()))
    assert isinstance(pending, PendingBackup)
    assert pending.note() is None

    full = DummyBackup(pending.name(), pending.date(), pending.size(), pending.slug(), "dummy")
    full.addSource(pending)
    await ha.note(full, "changed note")
    assert full.note() == "changed note"

    # create a new backup in the background, this should get the note of the pending backup.
    await supervisor.toggleBlockBackup()
    await supervisor.createBackup({"name": "Test Backup"}, date=time.now() - timedelta(hours=12))
    completed = next(iter((await ha.get()).values()))
    assert not isinstance(completed, PendingBackup)
    assert completed.note() == "changed note"

# Verify that if the supervisor is below the minimum version, we don't query for mount information and it's populated with a reasonable default.
@pytest.mark.asyncio
async def test_mount_info_old_supervisor(time, config, ha: HaSource, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor):
    supervisor._super_version = Version.parse("2023.5")
    await ha.refresh()
    assert not interceptor.urlWasCalled(URL_MATCH_MOUNT)
    assert len(ha.mount_info.get("mounts")) == 0

# Verify that if the supervisor is above the minimum version, we do query for mount info and populate it
@pytest.mark.asyncio
async def test_mount_info_new_supervisor(time, config, ha: HaSource, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor):
    supervisor._super_version = Version.parse("2023.6")
    await ha.refresh()
    assert interceptor.urlWasCalled(URL_MATCH_MOUNT)
    assert len(ha.mount_info.get("mounts")) > 0

# Verify that the default backup location is HA's configured default if the backup location is unspecified
# and the supervisor is above the minimum version
@pytest.mark.asyncio
async def test_default_backup_location_new_supervisor(time, config, ha: HaSource, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor):
    await ha.refresh()
    req, _name, _protected = ha._buildBackupInfo(CreateOptions(time.now(), "Backup"))
    assert req.get("location") is None
    assert 'location' in req

    supervisor._mounts["default_backup_mount"] = "my_backup_share"

    await ha.refresh()
    req, _name, _protected = ha._buildBackupInfo(CreateOptions(time.now(), "Backup"))
    assert req.get("location") == "my_backup_share"

# Verify that having a backup storage location of "local-disk" always uses the default even if HA has another default configured
@pytest.mark.asyncio
async def test_default_backup_location_local_disk(time, config: Config, ha: HaSource, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor):
    config.override(Setting.BACKUP_STORAGE, "local-disk")
    await ha.refresh()
    req, _name, _protected = ha._buildBackupInfo(CreateOptions(time.now(), "Backup"))
    assert req.get("location") is None
    assert 'location' in req

    supervisor._mounts["default_backup_mount"] = "my_backup_share"
    await ha.refresh()
    req, _name, _protected = ha._buildBackupInfo(CreateOptions(time.now(), "Backup"))
    assert req.get("location") is None
    assert 'location' in req

# Verify that using a non-active share results in an error before attempting to request the backup
@pytest.mark.asyncio
async def test_inactive_backup_location(time, config: Config, ha: HaSource, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor):
    config.override(Setting.BACKUP_STORAGE, "my_backup_share")
    supervisor._mounts["mounts"][1]["state"] = "starting"
    await ha.refresh()
    with pytest.raises(InactiveNetworkStorageError):
        ha._buildBackupInfo(CreateOptions(time.now(), "Backup"))

# Verify that using a non-existent share results in an error before attempting to request the backup
@pytest.mark.asyncio
async def test_unknown_backup_location(time, config: Config, ha: HaSource, supervisor: SimulatedSupervisor, interceptor: RequestInterceptor):
    config.override(Setting.BACKUP_STORAGE, "doesn't_exists")
    await ha.refresh()
    with pytest.raises(UnknownNetworkStorageError):
        ha._buildBackupInfo(CreateOptions(time.now(), "Backup"))

@pytest.mark.asyncio
async def test_exclude_database(ha: HaSource, time, config: Config, supervisor: SimulatedSupervisor) -> None:
    config.override(Setting.EXCLUDE_HA_DATABASE, True)
    config.override(Setting.NEW_BACKUP_TIMEOUT_SECONDS, 0.1)
    async with supervisor._backup_inner_lock:
        backup = await ha.create(CreateOptions(time.now(), "Test Name"))
        assert isinstance(backup, PendingBackup)
        assert backup._request_info['homeassistant_exclude_database']