Files

1350 lines
52 KiB
Python

from datetime import datetime, timedelta, timezone
import pytest
from dateutil.tz import gettz
from backup.config import Config, Setting, CreateOptions
from backup.exceptions import DeleteMutlipleBackupsError
from backup.util import GlobalInfo, DataCache
from backup.model import Model, BackupSource
from .faketime import FakeTime
from .helpers import HelperTestSource, IntentionalFailure
test_tz = gettz('EST')
default_source = BackupSource()
@pytest.fixture
def source():
return HelperTestSource("Source", is_destination=False)
@pytest.fixture
def dest():
return HelperTestSource("Dest", is_destination=True)
@pytest.fixture
def simple_config() -> Config:
config = createConfig()
return config
@pytest.fixture
def model(source, dest, time, simple_config, global_info, estimator, data_cache):
return Model(simple_config, time, source, dest, global_info, estimator, data_cache)
def createConfig() -> Config:
return Config().override(Setting.BACKUP_STARTUP_DELAY_MINUTES, 0)
def test_timeOfDay(estimator, model: Model) -> None:
assert model.getTimeOfDay() is None
model.config.override(Setting.BACKUP_TIME_OF_DAY, '00:00')
model.reinitialize()
assert model.getTimeOfDay() == (0, 0)
model.config.override(Setting.BACKUP_TIME_OF_DAY, '23:59')
model.reinitialize()
assert model.getTimeOfDay() == (23, 59)
model.config.override(Setting.BACKUP_TIME_OF_DAY, '24:59')
model.reinitialize()
assert model.getTimeOfDay() is None
model.config.override(Setting.BACKUP_TIME_OF_DAY, '24:60')
model.reinitialize()
assert model.getTimeOfDay() is None
model.config.override(Setting.BACKUP_TIME_OF_DAY, '-1:60')
model.reinitialize()
assert model.getTimeOfDay() is None
model.config.override(Setting.BACKUP_TIME_OF_DAY, '24:-1')
model.reinitialize()
assert model.getTimeOfDay() is None
model.config.override(Setting.BACKUP_TIME_OF_DAY, 'boop:60')
model.reinitialize()
assert model.getTimeOfDay() is None
model.config.override(Setting.BACKUP_TIME_OF_DAY, '24:boop')
model.reinitialize()
assert model.getTimeOfDay() is None
model.config.override(Setting.BACKUP_TIME_OF_DAY, '24:10:22')
model.reinitialize()
assert model.getTimeOfDay() is None
model.config.override(Setting.BACKUP_TIME_OF_DAY, '10')
model.reinitialize()
assert model.getTimeOfDay() is None
def test_next_time(estimator, data_cache):
time: FakeTime = FakeTime()
now: datetime = datetime(1985, 12, 6, 1, 0, 0).astimezone(timezone.utc)
time.setNow(now)
info = GlobalInfo(time)
config: Config = createConfig().override(Setting.DAYS_BETWEEN_BACKUPS, 0)
model: Model = Model(config, time, default_source,
default_source, info, estimator, data_cache)
assert model._nextBackup(now=now, last_backup=None) is None
assert model._nextBackup(now=now, last_backup=now) is None
config: Config = createConfig().override(Setting.DAYS_BETWEEN_BACKUPS, 1)
model: Model = Model(config, time, default_source,
default_source, info, estimator, data_cache)
assert model._nextBackup(
now=now, last_backup=None) == now
assert model._nextBackup(
now=now, last_backup=now) == now + timedelta(days=1)
assert model._nextBackup(
now=now, last_backup=now - timedelta(days=1)) == now
assert model._nextBackup(
now=now, last_backup=now + timedelta(days=1)) == now + timedelta(days=2)
def test_next_time_of_day(estimator, data_cache):
time: FakeTime = FakeTime()
now: datetime = datetime(1985, 12, 6, 1, 0, 0).astimezone(timezone.utc)
time.setNow(now)
info = GlobalInfo(time)
config: Config = createConfig().override(Setting.DAYS_BETWEEN_BACKUPS, 1).override(
Setting.BACKUP_TIME_OF_DAY, '08:00')
model: Model = Model(config, time, default_source,
default_source, info, estimator, data_cache)
assert model._nextBackup(
now=now, last_backup=None) == now
assert model._nextBackup(
now=now, last_backup=now - timedelta(days=1)) == now
assert model._nextBackup(now=now, last_backup=now) == datetime(
1985, 12, 6, 8, 0, tzinfo=test_tz)
assert model._nextBackup(now=now, last_backup=datetime(
1985, 12, 6, 8, 0, tzinfo=test_tz)) == datetime(1985, 12, 7, 8, 0, tzinfo=test_tz)
assert model._nextBackup(now=datetime(1985, 12, 6, 8, 0, tzinfo=test_tz), last_backup=datetime(
1985, 12, 6, 8, 0, tzinfo=test_tz)) == datetime(1985, 12, 7, 8, 0, tzinfo=test_tz)
def test_next_time_of_day_drift(estimator, data_cache):
time: FakeTime = FakeTime()
now: datetime = datetime(1985, 12, 6, 1, 0, 0).astimezone(timezone.utc)
time.setNow(now)
info = GlobalInfo(time)
# ignore the backup cooldown for this test
info.triggerBackupCooldown(timedelta(days=-7))
config: Config = createConfig().override(Setting.DAYS_BETWEEN_BACKUPS, 1).override(
Setting.BACKUP_TIME_OF_DAY, '08:00')
model: Model = Model(config, time, default_source,
default_source, info, estimator, data_cache)
assert model._nextBackup(
now=now, last_backup=None) == now
assert model._nextBackup(
now=now, last_backup=now - timedelta(days=1) + timedelta(minutes=1)) == datetime(1985, 12, 5, 8, 0, tzinfo=test_tz)
def test_next_time_of_day_dest_disabled(model, time, source, dest):
dest.setEnabled(True)
assert model._nextBackup(
now=time.now(), last_backup=None) == time.now()
dest.setEnabled(False)
assert model._nextBackup(now=time.now(), last_backup=None) is None
@pytest.mark.asyncio
async def test_sync_empty(model, time, source, dest):
source.setEnabled(False)
dest.setEnabled(False)
await model.sync(time.now())
assert len(model.backups) == 0
@pytest.mark.asyncio
async def test_sync_single_source(model: Model, source, dest, time):
backup = await source.create(CreateOptions(time.now(), "name"))
dest.setEnabled(False)
await model.sync(time.now())
assert len(model.backups) == 1
assert backup.slug() in model.backups
assert model.backups[backup.slug()].getSource(
source.name()) is backup
assert model.backups[backup.slug()].getSource(dest.name()) is None
@pytest.mark.asyncio
async def test_sync_source_and_dest(model: Model, time, source, dest: HelperTestSource):
backup_source = await source.create(CreateOptions(time.now(), "name"))
await model._syncBackups([source, dest], time.now())
assert len(model.backups) == 1
backup_dest = await dest.save(model.backups[backup_source.slug()])
await model._syncBackups([source, dest], time.now())
assert len(model.backups) == 1
assert model.backups[backup_source.slug()].getSource(
source.name()) is backup_source
assert model.backups[backup_source.slug()].getSource(
dest.name()) is backup_dest
@pytest.mark.asyncio
async def test_sync_different_sources(model: Model, time, source, dest):
backup_source = await source.create(CreateOptions(time.now(), "name"))
backup_dest = await dest.create(CreateOptions(time.now(), "name"))
await model._syncBackups([source, dest], time.now())
assert len(model.backups) == 2
assert model.backups[backup_source.slug()].getSource(
source.name()) is backup_source
assert model.backups[backup_dest.slug()].getSource(
dest.name()) is backup_dest
@pytest.mark.asyncio
async def test_removal(model: Model, time, source, dest):
await source.create(CreateOptions(time.now(), "name"))
await model._syncBackups([source, dest], time.now())
assert len(model.backups) == 1
source.current = {}
await model._syncBackups([source, dest], time.now())
assert len(model.backups) == 0
@pytest.mark.asyncio
async def test_new_backup(model: Model, source, dest, time):
await model.sync(time.now())
assert len(model.backups) == 1
assert len(source.created) == 1
assert source.created[0].date() == time.now()
assert len(source.current) == 1
assert len(dest.current) == 1
@pytest.mark.asyncio
async def test_upload_backup(time, model: Model, dest, source):
dest.setEnabled(True)
await model.sync(time.now())
assert len(model.backups) == 1
source.assertThat(created=1, current=1)
assert len(source.created) == 1
assert source.created[0].date() == time.now()
assert len(source.current) == 1
assert len(dest.current) == 1
assert len(dest.saved) == 1
@pytest.mark.asyncio
async def test_disabled(time, model: Model, source, dest):
# create two disabled sources
source.setEnabled(False)
source.insert("newer", time.now(), "slug1")
dest.setEnabled(False)
dest.insert("s2", time.now(), "slug2")
await model.sync(time.now())
source.assertUnchanged()
dest.assertUnchanged()
assert len(model.backups) == 0
@pytest.mark.asyncio
async def test_delete_source(time, model: Model, source, dest):
time = FakeTime()
now = time.now()
# create two source backups
source.setMax(1)
older = source.insert("older", now - timedelta(minutes=1), "older")
newer = source.insert("newer", now, "newer")
# configure only one to be kept
await model.sync(now)
assert len(model.backups) == 1
assert len(source.saved) == 0
assert source.deleted == [older]
assert len(source.saved) == 0
assert newer.slug() in model.backups
assert model.backups[newer.slug()].getSource(source.name()) == newer
@pytest.mark.asyncio
async def test_delete_dest(time, model: Model, source, dest):
now = time.now()
# create two source backups
dest.setMax(1)
older = dest.insert("older", now - timedelta(minutes=1), "older")
newer = dest.insert("newer", now, "newer")
# configure only one to be kept
await model.sync(now)
assert len(model.backups) == 1
assert len(dest.saved) == 0
assert dest.deleted == [older]
assert len(source.saved) == 0
assert newer.slug() in model.backups
assert model.backups[newer.slug()].getSource(dest.name()) == newer
source.assertUnchanged()
@pytest.mark.asyncio
async def test_new_upload_with_delete(time, model: Model, source, dest, simple_config):
now = time.now()
# create a single old backups
source.setMax(1)
dest.setMax(1)
backup_dest = dest.insert("older", now - timedelta(days=1), "older")
backup_source = source.insert("older", now - timedelta(days=1), "older")
# configure only one to be kept in both places
simple_config.config.update({
"days_between_backups": 1
})
model.reinitialize()
await model.sync(now)
# Old snapshto shoudl be deleted, new one shoudl be created and uploaded.
source.assertThat(current=1, created=1, deleted=1)
dest.assertThat(current=1, saved=1, deleted=1)
assert dest.deleted == [backup_dest]
assert source.deleted == [backup_source]
assert len(model.backups) == 1
assertBackup(model, [source.created[0], dest.saved[0]])
@pytest.mark.asyncio
async def test_new_upload_no_delete(time, model: Model, source, dest, simple_config):
now = time.now()
# create a single old backup
source.setMax(2)
dest.setMax(2)
backup_dest = dest.insert("older", now - timedelta(days=1), "older")
backup_source = source.insert("older", now - timedelta(days=1), "older")
# configure keeping two in both places
simple_config.config.update({
"days_between_backups": 1
})
model.reinitialize()
await model.sync(now)
# Another backup should have been created and saved
source.assertThat(current=2, created=1)
dest.assertThat(current=2, saved=1)
assert len(model.backups) == 2
assertBackup(model, [source.created[0], dest.saved[0]])
assertBackup(model, [backup_dest, backup_source])
@pytest.mark.asyncio
async def test_multiple_deletes_allowed(time, model: Model, source, dest, simple_config):
now = time.now()
simple_config.config.update({"confirm_multiple_deletes": False})
# create 4 backups in dest
dest.setMax(1)
current = dest.insert("current", now, "current")
old = dest.insert("old", now - timedelta(days=1), "old")
older = dest.insert("older", now - timedelta(days=2), "older")
oldest = dest.insert("oldest", now - timedelta(days=3), "oldest")
# configure keeping 1
simple_config.config.update({
"max_backups_in_google_drive": 1,
})
model.reinitialize()
await model.sync(now)
source.assertUnchanged()
dest.assertThat(current=1, deleted=3)
assert dest.deleted == [oldest, older, old]
assert len(model.backups) == 1
assertBackup(model, [current])
@pytest.mark.asyncio
async def test_confirm_multiple_deletes(time, model: Model, source, dest, simple_config):
now = time.now()
dest.setMax(1)
source.setMax(1)
dest.insert("current", now, "current")
dest.insert("old", now - timedelta(days=1), "old")
dest.insert("older", now - timedelta(days=2), "older")
dest.insert("oldest", now - timedelta(days=2), "olderest")
source.insert("current", now, "current")
source.insert("old", now - timedelta(days=1), "old")
source.insert("older", now - timedelta(days=2), "older")
with pytest.raises(DeleteMutlipleBackupsError) as thrown:
await model.sync(now)
thrown.value.data() == {
source.name(): 2,
dest.name(): 3
}
source.assertUnchanged()
dest.assertUnchanged()
@pytest.mark.asyncio
async def test_dont_upload_deletable(time, model: Model, source, dest):
now = time.now()
# a new backup in Drive and an old backup in HA
dest.setMax(1)
current = dest.insert("current", now, "current")
old = source.insert("old", now - timedelta(days=1), "old")
# configure keeping 1
await model.sync(now)
# Nothing should happen, because the upload from hassio would have to be deleted right after it's uploaded.
source.assertUnchanged()
dest.assertUnchanged()
assert len(model.backups) == 2
assertBackup(model, [current])
assertBackup(model, [old])
@pytest.mark.asyncio
async def test_dont_upload_when_disabled(time, model: Model, source, dest):
now = time.now()
# Make an enabled destination but with upload diabled.
dest.setMax(1)
dest.setUpload(False)
await model.sync(now)
# Verify the backup was created at the source but not uploaded.
source.assertThat(current=1, created=1)
dest.assertUnchanged()
assert len(model.backups) == 1
@pytest.mark.asyncio
async def test_dont_delete_purgable(time, model: Model, source, dest, simple_config):
now = time.now()
# create a single old backup, retained
source.setMax(1)
dest.setMax(1)
backup_dest = dest.insert("older", now - timedelta(days=1), "older")
backup_dest.setRetained(True)
backup_source = source.insert("older", now - timedelta(days=1), "older")
backup_source.setRetained(True)
# configure only one to be kept in both places
simple_config.config.update({
"days_between_backups": 1
})
model.reinitialize()
await model.sync(now)
# Old snapshto shoudl be kept, new one should be created and uploaded.
source.assertThat(current=2, created=1)
dest.assertThat(current=2, saved=1)
assert len(model.backups) == 2
assertBackup(model, [backup_dest, backup_source])
assertBackup(model, [source.created[0], dest.saved[0]])
@pytest.mark.asyncio
async def test_generational_delete(time, model: Model, dest, source, simple_config):
time.setNow(time.local(2019, 5, 10))
now = time.now()
# Create 4 backups, configured to keep 3
source.setMax(3)
source.insert("Fri", time.local(2019, 5, 10, 1))
source.insert("Thu", time.local(2019, 5, 9, 1))
wed = source.insert("Wed", time.local(2019, 5, 8, 1))
source.insert("Mon", time.local(2019, 5, 6, 1))
# configure only one to be kept in both places
simple_config.config.update({
"days_between_backups": 1,
"generational_weeks": 1,
"generational_days": 2
})
model.reinitialize()
await model.sync(now)
# Shoud only delete wed, since it isn't kept in the generational backup config
source.assertThat(current=3, deleted=1)
assert source.deleted == [wed]
assert len(model.backups) == 3
dest.assertThat(current=3, saved=3)
@pytest.mark.asyncio
async def test_delete_when_drive_disabled(time, model: Model, dest: HelperTestSource, source: HelperTestSource, simple_config):
time.setNow(time.local(2019, 5, 10))
now = time.now()
dest.setEnabled(False)
dest.setNeedsConfiguration(False)
# Create 4 backups, configured to keep 3
source.setMax(3)
source.insert("Fri", time.local(2019, 5, 10, 1))
source.insert("Thu", time.local(2019, 5, 9, 1))
source.insert("Wed", time.local(2019, 5, 8, 1))
mon = source.insert("Mon", time.local(2019, 5, 7, 1))
await model.sync(now)
# Shoud only delete mon, the oldest one
source.assertThat(current=3, deleted=1)
assert source.deleted == [mon]
assert len(model.backups) == 3
dest.assertThat(current=0)
@pytest.mark.asyncio
async def test_wait_for_startup_no_backup(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource, global_info: GlobalInfo):
time.setNow(time.toUtc(time.local(2019, 5, 10)))
global_info.__init__(time)
global_info.triggerBackupCooldown(timedelta(minutes=10))
assert model.nextBackup(time.now()) == time.now() + timedelta(minutes=10)
assert model.nextBackup(time.now()) == global_info.backupCooldownTime()
assert model.waiting_for_startup
time.advance(minutes=10)
assert model.nextBackup(time.now()) == time.now()
assert not model.waiting_for_startup
@pytest.mark.asyncio
async def test_wait_for_startup_with_backup(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource, global_info: GlobalInfo):
time.setNow(time.local(2019, 5, 10))
global_info.__init__(time)
global_info.triggerBackupCooldown(timedelta(minutes=10))
source.setMax(3)
source.insert("old", time.now() - timedelta(days=7))
assert model.nextBackup(time.now()) == time.now() + timedelta(minutes=10)
assert model.nextBackup(time.now()) == global_info.backupCooldownTime()
assert model.waiting_for_startup
time.advance(minutes=10)
assert model.nextBackup(time.now()) == time.now()
assert not model.waiting_for_startup
@pytest.mark.asyncio
async def test_wait_for_startup_with_backup_during_cooldown(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource, global_info: GlobalInfo, simple_config: Config):
global_info.triggerBackupCooldown(timedelta(minutes=10))
source.setMax(3)
source.insert("old", time.now())
backup_time = time.toLocal(time.now()) + timedelta(minutes=5)
simple_config.override(Setting.BACKUP_TIME_OF_DAY, f"{backup_time.hour}:{backup_time.minute}")
model.reinitialize()
await model.sync(time.now())
assert model.nextBackup(time.now()) == global_info.backupCooldownTime()
assert model.waiting_for_startup
time.advance(minutes=15)
assert model.nextBackup(time.now()) == global_info.backupCooldownTime()
assert not model.waiting_for_startup
@pytest.mark.asyncio
async def test_ignore_startup_delay(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource, global_info: GlobalInfo):
time.setNow(time.local(2019, 5, 10))
global_info.__init__(time)
global_info.triggerBackupCooldown(timedelta(minutes=10))
model.ignore_startup_delay = True
assert model.nextBackup(time.now()) == time.now()
assert not model.waiting_for_startup
def assertBackup(model, sources):
matches = {}
for source in sources:
matches[source.source()] = source
slug = source.slug()
assert slug in model.backups
assert model.backups[slug].sources == matches
@pytest.mark.asyncio
async def test_delete_after_upload(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource, global_info: GlobalInfo):
model.config.override(Setting.DELETE_AFTER_UPLOAD, True)
source.setMax(100)
dest.setMax(100)
dest.insert("Destination 1", time.now())
dest.reset()
# Nothing should happen on a sync, the backup is already backed up.
await model.sync(time.now())
dest.assertThat(current=1)
source.assertThat()
time.advance(days=7)
source.insert("Source 1", time.now())
source.reset()
# Source backup should get backed up and the deleted
await model.sync(time.now())
source.assertThat(deleted=1, current=0)
dest.assertThat(saved=1, current=2)
@pytest.mark.asyncio
async def test_delete_after_upload_multiple_deletes(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource, global_info: GlobalInfo):
model.config.override(Setting.DELETE_AFTER_UPLOAD, True)
source.setMax(100)
dest.setMax(100)
source.insert("Src 1", time.now())
time.advance(days=1)
source.insert("Src 2", time.now())
source.reset()
# Deleteing multiple backups should still fail with DELETE_AFTER_UPLOAD:True
with pytest.raises(DeleteMutlipleBackupsError):
await model.sync(time.now())
# But the backup should still get backed up
source.assertThat(current=2)
dest.assertThat(saved=2, current=2)
@pytest.mark.asyncio
async def test_delete_after_upload_simple_sync(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource, global_info: GlobalInfo):
model.config.override(Setting.DELETE_AFTER_UPLOAD, True)
source.setMax(100)
dest.setMax(100)
# A sync should create a backup, back it up to dest, and then delete it from source.
await model.sync(time.now())
source.assertThat(created=1, deleted=1, current=0)
dest.assertThat(saved=1, current=1)
time.advance(hours=1)
source.reset()
dest.reset()
# Next sync should do nothing
await model.sync(time.now())
source.assertThat()
dest.assertThat(current=1)
@pytest.mark.asyncio
async def test_never_delete_ignored_backups(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource):
source.setMax(1)
dest.setMax(1)
# A sync should create a backup and back it up to dest.
await model.sync(time.now())
source.assertThat(created=1, current=1)
dest.assertThat(saved=1, current=1)
source.reset()
dest.reset()
# Another sync shoudl delete a backup, which is just a sanity check.
time.advance(days=5)
await model.sync(time.now())
source.assertThat(created=1, current=1, deleted=1)
dest.assertThat(saved=1, current=1, deleted=1)
assert model.nextBackup(time.now()) == time.now() + timedelta(days=3)
source.reset()
dest.reset()
# Make the backup ignored, which should cause a new backup to be created
# and synced without the ignored one getting deleted.
next(iter((await dest.get()).values())).setIgnore(True)
next(iter((await source.get()).values())).setIgnore(True)
assert model.nextBackup(time.now()) < time.now()
await model.sync(time.now())
source.assertThat(created=1, current=2)
dest.assertThat(saved=1, current=2)
@pytest.mark.asyncio
async def test_ignored_backups_dont_upload(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource):
source.setMax(2)
dest.setMax(2)
older = source.insert("older", time.now() - timedelta(days=1), slug="older")
older.setIgnore(True)
source.insert("newer", time.now(), slug="newer")
source.reset()
# A sync should backup the last backup and ignore the older one
await model.sync(time.now())
source.assertThat(created=0, current=2)
dest.assertThat(saved=1, current=1)
uploaded = await dest.get()
assert len(uploaded) == 1
assert next(iter(uploaded.values())).name() == "newer"
@pytest.mark.asyncio
async def test_dirty_cache_gets_saved(time: FakeTime, model: Model, data_cache: DataCache):
data_cache.makeDirty()
await model.sync(time.now())
assert not data_cache.dirty
@pytest.mark.asyncio
async def test_delete_after_upload_with_no_backups(source: HelperTestSource, dest: HelperTestSource, time: FakeTime, model: Model, data_cache: DataCache, simple_config: Config):
source.setMax(0)
dest.setMax(2)
simple_config.override(Setting.DELETE_AFTER_UPLOAD, True)
source.insert("older", time.now() - timedelta(days=1), slug="older")
source.insert("newer", time.now(), slug="newer")
source.reset()
with pytest.raises(DeleteMutlipleBackupsError):
await model.sync(time.now())
simple_config.override(Setting.CONFIRM_MULTIPLE_DELETES, False)
await model.sync(time.now())
dest.assertThat(saved=2, current=2)
source.assertThat(deleted=2, current=0)
@pytest.mark.asyncio
async def test_purge_before_upload(source: HelperTestSource, dest: HelperTestSource, time: FakeTime, model: Model, data_cache: DataCache, simple_config: Config):
source.setMax(2)
dest.setMax(2)
older = source.insert("older", time.now() - timedelta(days=7), slug="older")
oldest = source.insert("oldest", time.now() - timedelta(days=14), slug="oldest")
await model.sync(time.now() - timedelta(days=7))
source.allow_create = False
dest.allow_save = False
dest.reset()
source.reset()
# trying to sync now should do nothing.
with pytest.raises(IntentionalFailure):
await model.sync(time.now())
source.assertThat(current=2)
dest.assertThat(current=2)
simple_config.override(Setting.DELETE_BEFORE_NEW_BACKUP, True)
# Trying to sync should delete the backup before syncing and then fail to create a new one.
with pytest.raises(IntentionalFailure):
await model.sync(time.now())
source.assertThat(deleted=1, current=1)
assert oldest.slug() not in (await source.get()).keys()
dest.assertThat(current=2)
# Trying to do it again should do nothing (eg not delete another backup)
with pytest.raises(IntentionalFailure):
await model.sync(time.now())
source.assertThat(deleted=1, current=1)
dest.assertThat(current=2)
# let the new source backup get created, which then deletes the destination but fails to save
source.allow_create = True
with pytest.raises(IntentionalFailure):
await model.sync(time.now())
source.assertThat(deleted=1, current=2, created=1)
dest.assertThat(current=1, deleted=1)
assert oldest.slug() not in (await dest.get()).keys()
# now let the new backup get saved.
dest.allow_save = True
await model.sync(time.now())
source.assertThat(deleted=1, current=2, created=1)
dest.assertThat(current=2, deleted=1, saved=1)
assert oldest.slug() not in (await source.get()).keys()
assert older.slug() in (await source.get()).keys()
assert oldest.slug() not in (await dest.get()).keys()
assert older.slug() in (await dest.get()).keys()
@pytest.mark.asyncio
async def test_generational_empty(time, model: Model, dest, source, simple_config: Config):
time.setNow(time.local(2019, 5, 10))
now = time.now()
simple_config.config.update({
"days_between_backups": 1,
"generational_weeks": 1,
"generational_days": 2
})
simple_config.override(Setting.DAYS_BETWEEN_BACKUPS, 1)
model.reinitialize()
assert len(model.backups) == 0
await model.sync(now)
assert len(model.backups) == 1
@pytest.mark.asyncio
async def test_delete_ignored_other_backup_after_some_time(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource, simple_config: Config):
# Create a few backups, one ignored
source.setMax(2)
dest.setMax(2)
simple_config.override(Setting.DAYS_BETWEEN_BACKUPS, 0)
simple_config.override(Setting.IGNORE_OTHER_BACKUPS, True)
simple_config.override(Setting.DELETE_IGNORED_AFTER_DAYS, 1)
ignored = source.insert("Ignored", time.now())
source.insert("Existing", time.now())
ignored.setIgnore(True)
await model.sync(time.now())
source.assertThat(current=2)
dest.assertThat(saved=1, current=1)
source.reset()
dest.reset()
time.advance(hours=23)
await model.sync(time.now())
source.assertThat(current=2)
dest.assertThat(current=1)
time.advance(hours=2)
await model.sync(time.now())
source.assertThat(current=1, deleted=1)
dest.assertThat(current=1)
@pytest.mark.asyncio
async def test_delete_ignored_upgrade_backup_after_some_time(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource, simple_config: Config):
# Create a few backups, one ignored
source.setMax(2)
dest.setMax(2)
simple_config.override(Setting.DAYS_BETWEEN_BACKUPS, 0)
simple_config.override(Setting.IGNORE_UPGRADE_BACKUPS, True)
simple_config.override(Setting.DELETE_IGNORED_AFTER_DAYS, 1)
ignored = source.insert("Ignored", time.now())
source.insert("Existing", time.now())
ignored.setIgnore(True)
await model.sync(time.now())
source.assertThat(current=2)
dest.assertThat(saved=1, current=1)
source.reset()
dest.reset()
time.advance(hours=23)
await model.sync(time.now())
source.assertThat(current=2)
dest.assertThat(current=1)
time.advance(hours=2)
await model.sync(time.now())
source.assertThat(current=1, deleted=1)
dest.assertThat(current=1)
@pytest.mark.asyncio
async def test_zero_config_whiled_deleting_backups(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource, simple_config: Config):
"""
Issue #745 identified that setting setting destination max backups to 0 and "delete after upload"=True would cause destination
backups to get deleted due to an error in the logic for handling purges. This test verifies that no longer happens.
"""
source.setMax(1)
dest.setMax(1)
simple_config.override(Setting.DAYS_BETWEEN_BACKUPS, 1)
simple_config.override(Setting.DELETE_AFTER_UPLOAD, True)
source.insert("Backup", time.now())
await model.sync(time.now())
source.assertThat(current=0, deleted=1)
dest.assertThat(current=1, saved=1)
source.reset()
dest.reset()
dest.setMax(0)
await model.sync(time.now())
dest.assertThat(current=1)
source.assertThat()
async def simulate_backups_timeline(time: FakeTime, model: Model, start: datetime, end: datetime):
time.setNow(time.toUtc(start))
await model.sync(time.now())
while time.now() < end:
next = model.nextBackup(time.now())
assert next is not None
assert next > time.now()
time.setNow(next)
await model.sync(time.now())
@pytest.mark.asyncio
async def test_generational_delete_issue602(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource, simple_config: Config):
time.setTimeZone("Europe/Rome")
start = time.local(2021, 1, 1, 1)
end = time.local(2023, 5, 1, 1)
simple_config.override(Setting.MAX_BACKUPS_IN_HA, 1)
simple_config.override(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE, 50)
simple_config.override(Setting.DAYS_BETWEEN_BACKUPS, 1)
simple_config.override(Setting.IGNORE_OTHER_BACKUPS, True)
simple_config.override(Setting.IGNORE_UPGRADE_BACKUPS, True)
simple_config.override(Setting.BACKUP_TIME_OF_DAY, "00:00")
simple_config.override(Setting.GENERATIONAL_DAYS, 14)
simple_config.override(Setting.GENERATIONAL_WEEKS, 5)
simple_config.override(Setting.GENERATIONAL_MONTHS, 13)
simple_config.override(Setting.GENERATIONAL_YEARS, 1)
simple_config.override(Setting.GENERATIONAL_DELETE_EARLY, True)
source.setMax(1)
dest.setMax(30)
model.reinitialize()
await simulate_backups_timeline(time, model, start, end)
dates = list([x.date() for x in dest.current.values()])
dates.sort()
assert dates == [time.parse('2022-04-30T22:00:00+00:00'),
time.parse('2022-05-31T22:00:00+00:00'),
time.parse('2022-06-30T22:00:00+00:00'),
time.parse('2022-07-31T22:00:00+00:00'),
time.parse('2022-08-31T22:00:00+00:00'),
time.parse('2022-09-30T22:00:00+00:00'),
time.parse('2022-10-31T23:00:00+00:00'),
time.parse('2022-11-30T23:00:00+00:00'),
time.parse('2022-12-31T23:00:00+00:00'),
time.parse('2023-01-31T23:00:00+00:00'),
time.parse('2023-02-28T23:00:00+00:00'),
time.parse('2023-03-31T22:00:00+00:00'),
time.parse('2023-04-02T22:00:00+00:00'),
time.parse('2023-04-09T22:00:00+00:00'),
time.parse('2023-04-16T22:00:00+00:00'),
time.parse('2023-04-18T22:00:00+00:00'),
time.parse('2023-04-19T22:00:00+00:00'),
time.parse('2023-04-20T22:00:00+00:00'),
time.parse('2023-04-21T22:00:00+00:00'),
time.parse('2023-04-22T22:00:00+00:00'),
time.parse('2023-04-23T22:00:00+00:00'),
time.parse('2023-04-24T22:00:00+00:00'),
time.parse('2023-04-25T22:00:00+00:00'),
time.parse('2023-04-26T22:00:00+00:00'),
time.parse('2023-04-27T22:00:00+00:00'),
time.parse('2023-04-28T22:00:00+00:00'),
time.parse('2023-04-29T22:00:00+00:00'),
time.parse('2023-04-30T22:00:00+00:00'),
time.parse('2023-05-01T22:00:00+00:00')]
@pytest.mark.asyncio
async def test_generational_delete_issue809(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource, simple_config: Config):
time.setTimeZone("America/Los_Angeles")
simple_config.override(Setting.MAX_BACKUPS_IN_HA, 10)
simple_config.override(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE, 32)
simple_config.override(Setting.DAYS_BETWEEN_BACKUPS, 1)
simple_config.override(Setting.BACKUP_TIME_OF_DAY, "02:00")
simple_config.override(Setting.GENERATIONAL_DAYS, 7)
simple_config.override(Setting.GENERATIONAL_WEEKS, 4)
simple_config.override(Setting.GENERATIONAL_MONTHS, 6)
simple_config.override(Setting.GENERATIONAL_YEARS, 10)
source.setMax(simple_config.get(Setting.MAX_BACKUPS_IN_HA))
dest.setMax(simple_config.get(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE))
model.reinitialize()
start = time.local(2021, 1, 1)
end = time.local(2024, 5, 1)
await simulate_backups_timeline(time, model, start, end)
dates = list([x.date() for x in dest.current.values()])
dates.sort()
assert dates == [time.parse('2021-01-01 10:00:00+00:00'),
time.parse('2022-01-01 10:00:00+00:00'),
time.parse('2023-01-01 10:00:00+00:00'),
time.parse('2023-12-01 10:00:00+00:00'),
time.parse('2024-01-01 10:00:00+00:00'),
time.parse('2024-02-01 10:00:00+00:00'),
time.parse('2024-03-01 10:00:00+00:00'),
time.parse('2024-04-01 09:00:00+00:00'),
time.parse('2024-04-08 09:00:00+00:00'),
time.parse('2024-04-09 09:00:00+00:00'),
time.parse('2024-04-10 09:00:00+00:00'),
time.parse('2024-04-11 09:00:00+00:00'),
time.parse('2024-04-12 09:00:00+00:00'),
time.parse('2024-04-13 09:00:00+00:00'),
time.parse('2024-04-14 09:00:00+00:00'),
time.parse('2024-04-15 09:00:00+00:00'),
time.parse('2024-04-16 09:00:00+00:00'),
time.parse('2024-04-17 09:00:00+00:00'),
time.parse('2024-04-18 09:00:00+00:00'),
time.parse('2024-04-19 09:00:00+00:00'),
time.parse('2024-04-20 09:00:00+00:00'),
time.parse('2024-04-21 09:00:00+00:00'),
time.parse('2024-04-22 09:00:00+00:00'),
time.parse('2024-04-23 09:00:00+00:00'),
time.parse('2024-04-24 09:00:00+00:00'),
time.parse('2024-04-25 09:00:00+00:00'),
time.parse('2024-04-26 09:00:00+00:00'),
time.parse('2024-04-27 09:00:00+00:00'),
time.parse('2024-04-28 09:00:00+00:00'),
time.parse('2024-04-29 09:00:00+00:00'),
time.parse('2024-04-30 09:00:00+00:00'),
time.parse('2024-05-01 09:00:00+00:00')]
async def test_generational_delete_dst_start_rome(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource, simple_config: Config):
time.setTimeZone("Europe/Rome")
simple_config.override(Setting.MAX_BACKUPS_IN_HA, 1)
simple_config.override(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE, 8)
simple_config.override(Setting.DAYS_BETWEEN_BACKUPS, 1)
simple_config.override(Setting.BACKUP_TIME_OF_DAY, "02:00")
simple_config.override(Setting.GENERATIONAL_DAYS, 3)
simple_config.override(Setting.GENERATIONAL_WEEKS, 3)
simple_config.override(Setting.GENERATIONAL_MONTHS, 2)
simple_config.override(Setting.GENERATIONAL_DELETE_EARLY, True)
source.setMax(simple_config.get(Setting.MAX_BACKUPS_IN_HA))
dest.setMax(simple_config.get(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE))
model.reinitialize()
start = time.local(2023, 1, 1)
end = time.local(2023, 3, 25)
await simulate_backups_timeline(time, model, start, end)
dates = list([x.date() for x in dest.current.values()])
dates.sort()
assert dates == [time.local(2023, 2, 1, 2),
time.local(2023, 3, 1, 2),
time.local(2023, 3, 6, 2),
time.local(2023, 3, 13, 2),
time.local(2023, 3, 20, 2),
time.local(2023, 3, 23, 2),
time.local(2023, 3, 24, 2),
time.local(2023, 3, 25, 2)]
assert time.now() == time.local(2023, 3, 25, 2)
assert model.nextBackup(time.now()) == time.local(2023, 3, 26, 2)
for x in range(0, 24 * 15):
time.advance(minutes=15)
assert model.nextBackup(time.now()) == time.local(2023, 3, 26, 2)
time.setNow(time.toUtc(time.local(2023, 3, 26, 2)))
await model.sync(time.now())
assert max([x.date() for x in dest.current.values()]) == time.local(2023, 3, 26, 2)
for x in range(0, 24 * 15):
time.advance(minutes=15)
assert model.nextBackup(time.now()) == time.local(2023, 3, 27, 2)
async def test_generational_delete_dst_start_los_angeles(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource, simple_config: Config):
time.setTimeZone("America/Los_Angeles")
simple_config.override(Setting.MAX_BACKUPS_IN_HA, 1)
simple_config.override(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE, 8)
simple_config.override(Setting.DAYS_BETWEEN_BACKUPS, 1)
simple_config.override(Setting.BACKUP_TIME_OF_DAY, "02:00")
simple_config.override(Setting.GENERATIONAL_DAYS, 3)
simple_config.override(Setting.GENERATIONAL_WEEKS, 3)
simple_config.override(Setting.GENERATIONAL_MONTHS, 2)
simple_config.override(Setting.GENERATIONAL_DELETE_EARLY, True)
source.setMax(simple_config.get(Setting.MAX_BACKUPS_IN_HA))
dest.setMax(simple_config.get(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE))
model.reinitialize()
start = time.local(2023, 1, 1)
end = time.local(2023, 3, 11)
await simulate_backups_timeline(time, model, start, end)
dates = list([x.date() for x in dest.current.values()])
dates.sort()
assert dates == [time.local(2023, 2, 1, 2),
time.local(2023, 2, 20, 2),
time.local(2023, 2, 27, 2),
time.local(2023, 3, 1, 2),
time.local(2023, 3, 6, 2),
time.local(2023, 3, 9, 2),
time.local(2023, 3, 10, 2),
time.local(2023, 3, 11, 2)]
assert time.now() == time.local(2023, 3, 11, 2)
assert model.nextBackup(time.now()) == time.local(2023, 3, 12, 2)
for x in range(0, 24 * 15):
time.advance(minutes=15)
assert model.nextBackup(time.now()) == time.local(2023, 3, 12, 2)
time.setNow(time.toUtc(time.local(2023, 3, 12, 2)))
await model.sync(time.now())
assert max([x.date() for x in dest.current.values()]) == time.local(2023, 3, 12, 2)
for x in range(0, 24 * 15):
time.advance(minutes=15)
assert model.nextBackup(time.now()) == time.local(2023, 3, 13, 2)
async def test_generational_delete_dst_start_rome_2_30(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource, simple_config: Config):
time.setTimeZone("Europe/Rome")
simple_config.override(Setting.MAX_BACKUPS_IN_HA, 1)
simple_config.override(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE, 8)
simple_config.override(Setting.DAYS_BETWEEN_BACKUPS, 1)
simple_config.override(Setting.BACKUP_TIME_OF_DAY, "02:30")
simple_config.override(Setting.GENERATIONAL_DAYS, 3)
simple_config.override(Setting.GENERATIONAL_WEEKS, 3)
simple_config.override(Setting.GENERATIONAL_MONTHS, 2)
simple_config.override(Setting.GENERATIONAL_DELETE_EARLY, True)
source.setMax(simple_config.get(Setting.MAX_BACKUPS_IN_HA))
dest.setMax(simple_config.get(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE))
model.reinitialize()
start = time.local(2023, 1, 1)
end = time.local(2023, 3, 25)
await simulate_backups_timeline(time, model, start, end)
dates = list([x.date() for x in dest.current.values()])
dates.sort()
assert dates == [time.local(2023, 2, 1, 2, 30),
time.local(2023, 3, 1, 2, 30),
time.local(2023, 3, 6, 2, 30),
time.local(2023, 3, 13, 2, 30),
time.local(2023, 3, 20, 2, 30),
time.local(2023, 3, 23, 2, 30),
time.local(2023, 3, 24, 2, 30),
time.local(2023, 3, 25, 2, 30)]
assert time.now() == time.local(2023, 3, 25, 2, 30)
assert model.nextBackup(time.now()) == time.local(2023, 3, 26, 2, 30)
for x in range(0, 24 * 15):
time.advance(minutes=15)
assert model.nextBackup(time.now()) == time.local(2023, 3, 26, 2, 30)
time.setNow(time.toUtc(time.local(2023, 3, 26, 2, 30)))
await model.sync(time.now())
assert max([x.date() for x in dest.current.values()]) == time.local(2023, 3, 26, 2, 30)
for x in range(0, 24 * 15):
time.advance(minutes=15)
assert model.nextBackup(time.now()) == time.local(2023, 3, 27, 2, 30)
async def test_generational_delete_dst_start_rome_3_00(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource, simple_config: Config):
time.setTimeZone("Europe/Rome")
simple_config.override(Setting.MAX_BACKUPS_IN_HA, 1)
simple_config.override(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE, 8)
simple_config.override(Setting.DAYS_BETWEEN_BACKUPS, 1)
simple_config.override(Setting.BACKUP_TIME_OF_DAY, "03:00")
simple_config.override(Setting.GENERATIONAL_DAYS, 3)
simple_config.override(Setting.GENERATIONAL_WEEKS, 3)
simple_config.override(Setting.GENERATIONAL_MONTHS, 2)
simple_config.override(Setting.GENERATIONAL_DELETE_EARLY, True)
source.setMax(simple_config.get(Setting.MAX_BACKUPS_IN_HA))
dest.setMax(simple_config.get(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE))
model.reinitialize()
start = time.local(2023, 1, 1)
end = time.local(2023, 3, 25)
await simulate_backups_timeline(time, model, start, end)
dates = list([x.date() for x in dest.current.values()])
dates.sort()
assert dates == [time.local(2023, 2, 1, 3),
time.local(2023, 3, 1, 3),
time.local(2023, 3, 6, 3),
time.local(2023, 3, 13, 3),
time.local(2023, 3, 20, 3),
time.local(2023, 3, 23, 3),
time.local(2023, 3, 24, 3),
time.local(2023, 3, 25, 3)]
assert time.now() == time.local(2023, 3, 25, 3)
assert model.nextBackup(time.now()) == time.local(2023, 3, 26, 3)
for x in range(0, 24 * 15):
time.advance(minutes=15)
assert model.nextBackup(time.now()) == time.local(2023, 3, 26, 3)
time.setNow(time.toUtc(time.local(2023, 3, 26, 3)))
await model.sync(time.now())
assert max([x.date() for x in dest.current.values()]) == time.local(2023, 3, 26, 3)
for x in range(0, 24 * 15):
time.advance(minutes=15)
assert model.nextBackup(time.now()) == time.local(2023, 3, 27, 3)
async def test_generational_delete_dst_end_rome(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource, simple_config: Config):
time.setTimeZone("Europe/Rome")
simple_config.override(Setting.MAX_BACKUPS_IN_HA, 1)
simple_config.override(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE, 8)
simple_config.override(Setting.DAYS_BETWEEN_BACKUPS, 1)
simple_config.override(Setting.BACKUP_TIME_OF_DAY, "02:00")
simple_config.override(Setting.GENERATIONAL_DAYS, 3)
simple_config.override(Setting.GENERATIONAL_WEEKS, 3)
simple_config.override(Setting.GENERATIONAL_MONTHS, 2)
simple_config.override(Setting.GENERATIONAL_DELETE_EARLY, True)
source.setMax(simple_config.get(Setting.MAX_BACKUPS_IN_HA))
dest.setMax(simple_config.get(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE))
model.reinitialize()
start = time.local(2023, 6, 1)
end = time.local(2023, 10, 28)
await simulate_backups_timeline(time, model, start, end)
dates = list([x.date() for x in dest.current.values()])
dates.sort()
assert dates == [time.local(2023, 9, 1, 2),
time.local(2023, 10, 1, 2),
time.local(2023, 10, 9, 2),
time.local(2023, 10, 16, 2),
time.local(2023, 10, 23, 2),
time.local(2023, 10, 26, 2),
time.local(2023, 10, 27, 2),
time.local(2023, 10, 28, 2)]
assert time.now() == time.local(2023, 10, 28, 2)
assert model.nextBackup(time.now()) == time.local(2023, 10, 29, 2)
for x in range(0, 24 * 15):
time.advance(minutes=15)
assert model.nextBackup(time.now()) == time.local(2023, 10, 29, 2)
time.setNow(time.toUtc(time.local(2023, 10, 29, 2)))
await model.sync(time.now())
assert max([x.date() for x in dest.current.values()]) == time.local(2023, 10, 29, 2)
for x in range(0, 24 * 15):
time.advance(minutes=15)
assert model.nextBackup(time.now()) == time.local(2023, 10, 30, 2)
async def test_generational_delete_dst_end_rome_2_30(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource, simple_config: Config):
time.setTimeZone("Europe/Rome")
simple_config.override(Setting.MAX_BACKUPS_IN_HA, 1)
simple_config.override(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE, 8)
simple_config.override(Setting.DAYS_BETWEEN_BACKUPS, 1)
simple_config.override(Setting.BACKUP_TIME_OF_DAY, "02:30")
simple_config.override(Setting.GENERATIONAL_DAYS, 3)
simple_config.override(Setting.GENERATIONAL_WEEKS, 3)
simple_config.override(Setting.GENERATIONAL_MONTHS, 2)
simple_config.override(Setting.GENERATIONAL_DELETE_EARLY, True)
source.setMax(simple_config.get(Setting.MAX_BACKUPS_IN_HA))
dest.setMax(simple_config.get(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE))
model.reinitialize()
start = time.local(2023, 6, 1)
end = time.local(2023, 10, 28)
await simulate_backups_timeline(time, model, start, end)
dates = list([x.date() for x in dest.current.values()])
dates.sort()
assert dates == [time.local(2023, 9, 1, 2, 30),
time.local(2023, 10, 1, 2, 30),
time.local(2023, 10, 9, 2, 30),
time.local(2023, 10, 16, 2, 30),
time.local(2023, 10, 23, 2, 30),
time.local(2023, 10, 26, 2, 30),
time.local(2023, 10, 27, 2, 30),
time.local(2023, 10, 28, 2, 30)]
assert time.now() == time.local(2023, 10, 28, 2, 30)
assert model.nextBackup(time.now()) == time.local(2023, 10, 29, 2, 30)
for x in range(0, 24 * 15):
time.advance(minutes=15)
assert model.nextBackup(time.now()) == time.local(2023, 10, 29, 2, 30)
time.setNow(time.toUtc(time.local(2023, 10, 29, 2, 30)))
await model.sync(time.now())
assert max([x.date() for x in dest.current.values()]) == time.local(2023, 10, 29, 2, 30)
for x in range(0, 24 * 15):
time.advance(minutes=15)
assert model.nextBackup(time.now()) == time.local(2023, 10, 30, 2, 30)
async def test_generational_delete_dst_end_rome_3_00(time: FakeTime, model: Model, dest: HelperTestSource, source: HelperTestSource, simple_config: Config):
time.setTimeZone("Europe/Rome")
simple_config.override(Setting.MAX_BACKUPS_IN_HA, 1)
simple_config.override(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE, 8)
simple_config.override(Setting.DAYS_BETWEEN_BACKUPS, 1)
simple_config.override(Setting.BACKUP_TIME_OF_DAY, "03:00")
simple_config.override(Setting.GENERATIONAL_DAYS, 3)
simple_config.override(Setting.GENERATIONAL_WEEKS, 3)
simple_config.override(Setting.GENERATIONAL_MONTHS, 2)
simple_config.override(Setting.GENERATIONAL_DELETE_EARLY, True)
source.setMax(simple_config.get(Setting.MAX_BACKUPS_IN_HA))
dest.setMax(simple_config.get(Setting.MAX_BACKUPS_IN_GOOGLE_DRIVE))
model.reinitialize()
start = time.local(2023, 6, 1)
end = time.local(2023, 10, 28)
await simulate_backups_timeline(time, model, start, end)
dates = list([x.date() for x in dest.current.values()])
dates.sort()
assert dates == [time.local(2023, 9, 1, 3),
time.local(2023, 10, 1, 3),
time.local(2023, 10, 9, 3),
time.local(2023, 10, 16, 3),
time.local(2023, 10, 23, 3),
time.local(2023, 10, 26, 3),
time.local(2023, 10, 27, 3),
time.local(2023, 10, 28, 3)]
assert time.now() == time.local(2023, 10, 28, 3)
assert model.nextBackup(time.now()) == time.local(2023, 10, 29, 3)
for x in range(0, 24 * 15):
time.advance(minutes=15)
assert model.nextBackup(time.now()) == time.local(2023, 10, 29, 3)
time.setNow(time.toUtc(time.local(2023, 10, 29, 3)))
await model.sync(time.now())
assert max([x.date() for x in dest.current.values()]) == time.local(2023, 10, 29, 3)
for x in range(0, 24 * 15):
time.advance(minutes=15)
assert model.nextBackup(time.now()) == time.local(2023, 10, 30, 3)
def test_next_time_over_a_day(estimator, data_cache):
time: FakeTime = FakeTime()
now: datetime = time.localize(datetime(1985, 12, 6))
time.setNow(now)
info = GlobalInfo(time)
config: Config = createConfig()
config.override(Setting.BACKUP_TIME_OF_DAY, "00:00")
config.override(Setting.DAYS_BETWEEN_BACKUPS, 2.0)
model: Model = Model(config, time, default_source,
default_source, info, estimator, data_cache)
assert model._nextBackup(
now=now, last_backup=None) == now
assert model._nextBackup(
now=now, last_backup=now) == now + timedelta(days=2)