import os import json from time import sleep import pytest import asyncio from yarl import URL from aiohttp.client_exceptions import ClientResponseError from backup.config import Config, Setting from dev.simulationserver import SimulationServer from dev.simulated_google import SimulatedGoogle, URL_MATCH_UPLOAD_PROGRESS, URL_MATCH_FILE from dev.request_interceptor import RequestInterceptor from backup.drive import DriveSource, FolderFinder, DriveRequests, RETRY_SESSION_ATTEMPTS, UPLOAD_SESSION_EXPIRATION_DURATION, URL_START_UPLOAD from backup.drive.driverequests import (BASE_CHUNK_SIZE, CHUNK_UPLOAD_TARGET_SECONDS) from backup.drive.drivesource import FOLDER_MIME_TYPE from backup.exceptions import (BackupFolderInaccessible, BackupFolderMissingError, DriveQuotaExceeded, ExistingBackupFolderError, GoogleCantConnect, GoogleCredentialsExpired, GoogleInternalError, GoogleUnexpectedError, GoogleSessionError, GoogleTimeoutError, CredRefreshMyError, CredRefreshGoogleError) from backup.creds import Creds from backup.model import DriveBackup, DummyBackup from .faketime import FakeTime from .helpers import compareStreams, createBackupTar RETRY_EXHAUSTION_SLEEPS = [2, 4, 8, 16, 32] class BackupHelper(): def __init__(self, uploader, time): self.time = time self.uploader = uploader async def createFile(self, size=1024 * 1024 * 2, slug="testslug", name="Test Name", note=None): from_backup: DummyBackup = DummyBackup( name, self.time.toUtc(self.time.local(1985, 12, 6)), "fake source", slug, note=note, size=size) data = await self.uploader.upload(createBackupTar(slug, name, self.time.now(), size)) return from_backup, data @pytest.fixture def backup_helper(uploader, time): return BackupHelper(uploader, time) @pytest.mark.asyncio async def test_sync_empty(drive) -> None: assert len(await drive.get()) == 0 @pytest.mark.asyncio async def test_CRUD(backup_helper, drive, time, session) -> None: from_backup, data = await backup_helper.createFile() backup: DriveBackup = await drive.save(from_backup, data) assert backup.name() == "Test Name" assert backup.date() == time.local(1985, 12, 6) assert not backup.retained() assert backup.size() == data.size() assert backup.slug() == "testslug" assert len(backup.id()) > 0 assert backup.backupType() == from_backup.backupType() assert backup.protected() == from_backup.protected() from_backup.addSource(backup) # downlaod the item, its bytes should match up download = await drive.read(from_backup) data.position(0) await compareStreams(data, download) # read the item, make sure its data matches up backups = await drive.get() assert len(backups) == 1 backup = backups[from_backup.slug()] assert backup.name() == "Test Name" assert backup.date() == time.local(1985, 12, 6) assert not backup.retained() assert backup.size() == data.size() assert backup.slug() == "testslug" assert len(backup.id()) > 0 assert backup.backupType() == from_backup.backupType() assert backup.protected() == from_backup.protected() # update retention assert not backup.retained() await drive.retain(from_backup, True) assert (await drive.get())[from_backup.slug()].retained() await drive.retain(from_backup, False) assert not (await drive.get())[from_backup.slug()].retained() # Delete the item, make sure its gone await drive.delete(from_backup) backups = await drive.get() assert len(backups) == 0 @pytest.mark.asyncio async def test_folder_creation(drive, time, config): assert len(await drive.get()) == 0 folderId = await drive.getFolderId() assert len(folderId) > 0 item = await drive.drivebackend.get(folderId) assert not item["trashed"] assert item["name"] == "Home Assistant Backups" assert item["mimeType"] == FOLDER_MIME_TYPE assert item["appProperties"]['backup_folder'] == 'true' # sync again, assert the folder is reused time.advanceDay() os.remove(config.get(Setting.FOLDER_FILE_PATH)) assert len(await drive.get()) == 0 assert await drive.getFolderId() == folderId # trash the folder, assert we create a new one on sync await drive.drivebackend.update(folderId, {"trashed": True}) assert (await drive.drivebackend.get(folderId))["trashed"] is True assert len(await drive.get()) == 0 time.advanceDay() assert await drive.getFolderId() != folderId # delete the folder, assert we create a new one folderId = await drive.getFolderId() await drive.drivebackend.delete(folderId) time.advanceDay() assert len(await drive.get()) == 0 time.advanceDay() assert await drive.getFolderId() != folderId @pytest.mark.asyncio async def test_folder_selection(drive, time): folder_metadata = { 'name': "Junk Data", 'mimeType': FOLDER_MIME_TYPE, 'appProperties': { "backup_folder": "true", }, } # create two fodlers at different times id_old = (await drive.drivebackend.createFolder(folder_metadata))['id'] sleep(2) id_new = (await drive.drivebackend.createFolder(folder_metadata))['id'] # Verify we use the newest await drive.get() assert await drive.getFolderId() == id_new assert await drive.getFolderId() != id_old @pytest.mark.asyncio async def test_bad_auth_creds(drive: DriveSource, time): drive.drivebackend.creds._refresh_token = "not_allowed" with pytest.raises(GoogleCredentialsExpired): await drive.get() assert time.sleeps == [] @pytest.mark.asyncio async def test_out_of_space(backup_helper, drive: DriveSource, google: SimulatedGoogle): google.setDriveSpaceAvailable(100) from_backup, data = await backup_helper.createFile() await drive.get() with pytest.raises(DriveQuotaExceeded) as e: await drive.save(from_backup, data) assert e.value.data() == { 'backup_size': '2 MB', 'free_space': '100 B' } @pytest.mark.asyncio async def test_out_of_space_error(backup_helper, drive: DriveSource, google: SimulatedGoogle): google.setDriveSpaceAvailable(100) from_backup, data = await backup_helper.createFile() # Without calling drive.get(), the parser has no info about drive's space usage with pytest.raises(DriveQuotaExceeded) as e: await drive.save(from_backup, data) assert e.value.data() == { 'backup_size': 'Error', 'free_space': 'Error' } @pytest.mark.asyncio async def test_drive_dns_resolution_error(drive: DriveSource, config: Config, time): config.override(Setting.DRIVE_URL, "http://fsdfsdasdasdf.saasdsdfsdfsd.com:2567") with pytest.raises(GoogleCantConnect): await drive.get() assert time.sleeps == [] @pytest.mark.asyncio async def test_drive_connect_error(drive: DriveSource, config: Config, time): config.override(Setting.DRIVE_URL, "http://localhost:1034") with pytest.raises(GoogleCantConnect): await drive.get() assert time.sleeps == [] @pytest.mark.asyncio async def test_upload_session_expired(drive, time, backup_helper, interceptor: RequestInterceptor): from_backup, data = await backup_helper.createFile() interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, status=404) with pytest.raises(GoogleSessionError): await drive.save(from_backup, data) assert time.sleeps == [] @pytest.mark.asyncio async def test_upload_resume(drive: DriveSource, time, backup_helper: BackupHelper, google: SimulatedGoogle, interceptor: RequestInterceptor): from_backup, data = await backup_helper.createFile() interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, fail_after=1, status=500) # Upload, which will fail with pytest.raises(GoogleInternalError): await drive.save(from_backup, data) # Verify we uploaded one chunk assert google.chunks == [BASE_CHUNK_SIZE] # Retry the upload, which shoudl now pass interceptor.clear() data.position(0) drive_backup = await drive.save(from_backup, data) from_backup.addSource(drive_backup) assert google.chunks == [BASE_CHUNK_SIZE, BASE_CHUNK_SIZE, (data.size()) - BASE_CHUNK_SIZE * 2] # Verify the data is correct data.position(0) await compareStreams(data, await drive.read(from_backup)) def test_chunk_size(drive: DriveSource, config: Config): max = config.get(Setting.MAXIMUM_UPLOAD_CHUNK_BYTES) / BASE_CHUNK_SIZE assert drive.drivebackend._getNextChunkSize( 1000000000, 0) == max assert drive.drivebackend._getNextChunkSize( 1, CHUNK_UPLOAD_TARGET_SECONDS) == 1 assert drive.drivebackend._getNextChunkSize( 1000000000, CHUNK_UPLOAD_TARGET_SECONDS) == max assert drive.drivebackend._getNextChunkSize( 1, CHUNK_UPLOAD_TARGET_SECONDS) == 1 assert drive.drivebackend._getNextChunkSize( 1, 1) == CHUNK_UPLOAD_TARGET_SECONDS assert drive.drivebackend._getNextChunkSize( 1, 1.01) == CHUNK_UPLOAD_TARGET_SECONDS - 1 def test_chunk_size_limits(drive: DriveSource, config: Config): config.override(Setting.MAXIMUM_UPLOAD_CHUNK_BYTES, 1) assert drive.drivebackend._getNextChunkSize(1000000000, 0) == 1 assert drive.drivebackend._getNextChunkSize(1, 1000000) == 1 config.override(Setting.MAXIMUM_UPLOAD_CHUNK_BYTES, BASE_CHUNK_SIZE * 1.5) assert drive.drivebackend._getNextChunkSize(1000000000, 0) == 1 assert drive.drivebackend._getNextChunkSize(1, 1000000) == 1 config.override(Setting.MAXIMUM_UPLOAD_CHUNK_BYTES, BASE_CHUNK_SIZE * 3.5) assert drive.drivebackend._getNextChunkSize(1000000000, 0) == 3 assert drive.drivebackend._getNextChunkSize(1, 1000000) == 1 @pytest.mark.asyncio async def test_working_through_upload(drive: DriveSource, server: SimulationServer, backup_helper: BackupHelper, interceptor: RequestInterceptor): assert not drive.isWorking() # Let a single chunk upload, then wait matcher = interceptor.setWaiter(URL_MATCH_UPLOAD_PROGRESS, attempts=1) from_backup, data = await backup_helper.createFile(size=1024 * 1024 * 10) save_task = asyncio.create_task(drive.save(from_backup, data)) await matcher.waitForCall() assert drive.isWorking() # let it complete matcher.clear() await save_task assert not drive.isWorking() @pytest.mark.asyncio async def test_drive_timeout(drive, config, time: FakeTime): # Ensure we have credentials await drive.get() config.override(Setting.GOOGLE_DRIVE_TIMEOUT_SECONDS, 0.000001) with pytest.raises(GoogleTimeoutError): await drive.get() assert time.sleeps == [] @pytest.mark.asyncio async def test_resume_upload_attempts_exhausted(drive: DriveSource, time, backup_helper, interceptor: RequestInterceptor, google: SimulatedGoogle): # Allow an upload to update one chunk and then fail. from_backup, data = await backup_helper.createFile() interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, fail_after=1, status=500) with pytest.raises(GoogleInternalError): await drive.save(from_backup, data) assert google.chunks == [BASE_CHUNK_SIZE] # Verify we have a cached location assert drive.drivebackend.last_attempt_location is not None assert drive.drivebackend.last_attempt_count == 1 last_location = drive.drivebackend.last_attempt_location for x in range(1, RETRY_SESSION_ATTEMPTS): data.position(0) with pytest.raises(GoogleInternalError): await drive.save(from_backup, data) assert drive.drivebackend.last_attempt_count == x + 1 # We should still be using the same location url assert drive.drivebackend.last_attempt_location == last_location # Another attempt should use another location url with pytest.raises(GoogleInternalError): data.position(0) await drive.save(from_backup, data) assert drive.drivebackend.last_attempt_count == 0 assert drive.drivebackend.last_attempt_location is not None assert drive.drivebackend.last_attempt_location != last_location # Now let it succeed interceptor.clear() data.position(0) drive_backup = await drive.save(from_backup, data) from_backup.addSource(drive_backup) # And verify the bytes are correct data.position(0) await compareStreams(data, await drive.read(from_backup)) @pytest.mark.asyncio async def test_google_internal_error(drive, server, time: FakeTime, interceptor: RequestInterceptor): interceptor.setError(URL_MATCH_FILE, 500) with pytest.raises(GoogleInternalError): await drive.get() assert time.sleeps == RETRY_EXHAUSTION_SLEEPS time.clearSleeps() interceptor.clear() interceptor.setError(URL_MATCH_FILE, 500) with pytest.raises(GoogleInternalError): await drive.get() assert time.sleeps == RETRY_EXHAUSTION_SLEEPS @pytest.mark.asyncio async def test_check_time(drive: DriveSource, drive_creds: Creds): assert not await drive.check() drive.saveCreds(drive_creds) assert await drive.check() @pytest.mark.asyncio async def test_disable_upload(drive: DriveSource, config: Config): assert drive.upload() config.override(Setting.ENABLE_DRIVE_UPLOAD, False) assert not drive.upload() @pytest.mark.asyncio async def test_resume_session_abandoned_on_http4XX(time, drive: DriveSource, config: Config, server, backup_helper, interceptor: RequestInterceptor): from_backup, data = await backup_helper.createFile() # Configure the upload to fail after the first upload chunk interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, 402, 1) with pytest.raises(ClientResponseError): await drive.save(from_backup, data) # Verify a requst was made to start the upload but not cached assert server.wasUrlRequested( "/upload/drive/v3/files/?uploadType=resumable&supportsAllDrives=true") assert drive.drivebackend.last_attempt_count == 1 assert drive.drivebackend.last_attempt_location is None assert drive.drivebackend.last_attempt_metadata is None # upload again, which should retry server.urls.clear() interceptor.clear() data.position(0) backup = await drive.save(from_backup, data) assert server.wasUrlRequested(URL_START_UPLOAD) # Verify the uploaded bytes are identical from_backup.addSource(backup) download = await drive.read(from_backup) data.position(0) await compareStreams(data, download) @pytest.mark.asyncio async def test_resume_session_abandoned_after_a_long_time(time: FakeTime, drive: DriveSource, config: Config, server: SimulationServer, backup_helper, interceptor: RequestInterceptor): from_backup, data = await backup_helper.createFile() # Configure the upload to fail after the first upload chunk interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, 501, 1) with pytest.raises(ClientResponseError): await drive.save(from_backup, data) # Verify it reuses the session a few times assert server.wasUrlRequested(URL_START_UPLOAD) assert drive.drivebackend.last_attempt_count == 1 assert drive.drivebackend.last_attempt_location is not None assert drive.drivebackend.last_attempt_metadata is not None data.position(0) with pytest.raises(ClientResponseError): await drive.save(from_backup, data) assert drive.drivebackend.last_attempt_count == 2 assert drive.drivebackend.last_attempt_location is not None assert drive.drivebackend.last_attempt_metadata is not None last_location = drive.drivebackend.last_attempt_location # Fast forward a lot, then verify the session is restarted server.urls.clear() interceptor.clear() time.advance(duration=UPLOAD_SESSION_EXPIRATION_DURATION) data.position(0) await drive.save(from_backup, data) assert interceptor.urlWasCalled(URL_START_UPLOAD) assert not interceptor.urlWasCalled(last_location) @pytest.mark.asyncio async def test_chunk_upload_resets_attempt_counter(time: FakeTime, drive: DriveSource, config: Config, server: SimulationServer, backup_helper: BackupHelper, interceptor: RequestInterceptor): from_backup, data = await backup_helper.createFile(size=1024 * 1024 * 10) # Configure the upload to fail after the first upload chunk interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, 501, 1) with pytest.raises(ClientResponseError): await drive.save(from_backup, data) data.position(0) with pytest.raises(ClientResponseError): await drive.save(from_backup, data) # Verify the session was started assert interceptor.urlWasCalled(URL_START_UPLOAD) assert interceptor.urlWasCalled(URL_MATCH_UPLOAD_PROGRESS) assert drive.drivebackend.last_attempt_count == 2 location = drive.drivebackend.last_attempt_location assert location is not None # Allow one more chunk to succeed interceptor.clear() interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, 501, 2) data.position(0) with pytest.raises(ClientResponseError): await drive.save(from_backup, data) # Verify the session was reused and the attempt counter was reset assert not interceptor.urlWasCalled(URL_START_UPLOAD) assert interceptor.urlWasCalled(URL_MATCH_UPLOAD_PROGRESS) assert interceptor.urlWasCalled(URL(location).path) assert drive.drivebackend.last_attempt_count == 1 assert drive.drivebackend.last_attempt_location == location @pytest.mark.asyncio async def test_resume_session_reused_on_http5XX(time, drive: DriveSource, config: Config, server, backup_helper, interceptor: RequestInterceptor): await verify_upload_resumed(time, drive, config, server, interceptor, 550, backup_helper) @pytest.mark.asyncio async def test_resume_session_reused_abonded_after_retries(time, drive: DriveSource, config: Config, server: SimulationServer, backup_helper, interceptor: RequestInterceptor): from_backup, data = await backup_helper.createFile() # Configure the upload to fail after the first upload chunk interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, 501, 1) with pytest.raises(ClientResponseError): await drive.save(from_backup, data) # Verify a requst was made to start the upload but not cached assert server.wasUrlRequested(URL_START_UPLOAD) assert drive.drivebackend.last_attempt_count == 1 assert drive.drivebackend.last_attempt_location is not None assert drive.drivebackend.last_attempt_metadata is not None last_location = drive.drivebackend.last_attempt_location for x in range(1, RETRY_SESSION_ATTEMPTS): server.urls.clear() interceptor.clear() interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, 501) data.position(0) with pytest.raises(ClientResponseError): await drive.save(from_backup, data) assert not server.wasUrlRequested(URL_START_UPLOAD) assert server.wasUrlRequested(last_location) assert drive.drivebackend.last_attempt_count == x + 1 assert drive.drivebackend.last_attempt_location is last_location assert drive.drivebackend.last_attempt_metadata is not None # Next attempt should give up and restart the upload server.urls.clear() interceptor.clear() interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, 501, 1) data.position(0) with pytest.raises(ClientResponseError): await drive.save(from_backup, data) assert server.wasUrlRequested(URL_START_UPLOAD) assert not server.wasUrlRequested(last_location) assert drive.drivebackend.last_attempt_count == 1 # upload again, which should retry server.urls.clear() interceptor.clear() data.position(0) backup = await drive.save(from_backup, data) assert not server.wasUrlRequested(URL_START_UPLOAD) # Verify the uploaded bytes are identical from_backup.addSource(backup) download = await drive.read(from_backup) data.position(0) await compareStreams(data, download) async def verify_upload_resumed(time, drive: DriveSource, config: Config, server: SimulationServer, interceptor: RequestInterceptor, status, backup_helper, expected=ClientResponseError): from_backup, data = await backup_helper.createFile() # Configure the upload to fail after the first upload chunk interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, status, 1) with pytest.raises(expected): await drive.save(from_backup, data) # Verify a requst was made to start the upload assert server.wasUrlRequested(URL_START_UPLOAD) assert drive.drivebackend.last_attempt_location is not None assert drive.drivebackend.last_attempt_metadata is not None last_location = drive.drivebackend.last_attempt_location # Retry the upload and let is succeed server.urls.clear() interceptor.clear() data.position(0) backup = await drive.save(from_backup, data) # We shoudl nto see the upload "initialize" url assert not server.wasUrlRequested(URL_START_UPLOAD) # We should see the last location url (which has a unique token) reused to resume the upload assert server.wasUrlRequested(last_location) # The saved metadata should be cleared out. assert drive.drivebackend.last_attempt_count == 1 assert drive.drivebackend.last_attempt_location is None assert drive.drivebackend.last_attempt_metadata is None # Verify the uploaded bytes are identical from_backup.addSource(backup) download = await drive.read(from_backup) data.position(0) await compareStreams(data, download) @pytest.mark.asyncio async def test_recreate_folder_when_deleted(time, drive: DriveSource, config: Config, backup_helper, folder_finder: FolderFinder): await drive.get() id = await drive.getFolderId() await drive.drivebackend.delete(id) assert len(await drive.get()) == 0 assert id != await drive.getFolderId() @pytest.mark.asyncio async def test_recreate_folder_when_losing_permissions(time, drive: DriveSource, config: Config, backup_helper, google: SimulatedGoogle): await drive.get() id = await drive.getFolderId() google.lostPermission.append(id) assert len(await drive.get()) == 0 assert id != await drive.getFolderId() @pytest.mark.asyncio async def test_folder_missing_on_upload(time, drive: DriveSource, config: Config, backup_helper): # Make the folder await drive.get() # Require a specified folder so we don't query config.override(Setting.SPECIFY_BACKUP_FOLDER, True) # Delete the folder await drive.drivebackend.delete(await drive.getFolderId()) # Then try to make one from_backup, data = await backup_helper.createFile() # Configure the upload to fail after the first upload chunk with pytest.raises(BackupFolderInaccessible): await drive.save(from_backup, data) @pytest.mark.asyncio async def test_folder_error_on_upload_lost_permission(time, drive: DriveSource, config: Config, google: SimulatedGoogle, backup_helper, session): # Make the folder await drive.get() # Require a specified folder so we don't query config.override(Setting.SPECIFY_BACKUP_FOLDER, True) # Make the folder inaccessible google.lostPermission.append(await drive.getFolderId()) time.advanceDay() # Fail to upload with pytest.raises(BackupFolderInaccessible): await drive.save(*await backup_helper.createFile()) @pytest.mark.asyncio async def test_folder_error_on_upload_lost_permission_custom_client(time, drive: DriveSource, config: Config, google: SimulatedGoogle, backup_helper, session): # Make the folder await drive.get() # Require a specified folder so we don't query config.override(Setting.SPECIFY_BACKUP_FOLDER, True) google._client_id_hack = config.get(Setting.DEFAULT_DRIVE_CLIENT_ID) config.override(Setting.DEFAULT_DRIVE_CLIENT_ID, "something-else") # Make the folder inaccessible google.lostPermission.append(await drive.getFolderId()) time.advanceDay() # Fail to upload with pytest.raises(BackupFolderInaccessible): await drive.save(*await backup_helper.createFile()) @pytest.mark.asyncio async def test_folder_error_on_query_lost_permission(time, drive: DriveSource, config: Config, google: SimulatedGoogle): # Make the folder await drive.get() # Require a specified folder so we don't query config.override(Setting.SPECIFY_BACKUP_FOLDER, "true") config.override(Setting.DEFAULT_DRIVE_CLIENT_ID, "something") # Make the folder inaccessible google.lostPermission.append(await drive.getFolderId()) # It shoudl fail! with pytest.raises(BackupFolderInaccessible): await drive.get() @pytest.mark.asyncio async def test_folder_error_on_query_deleted(time, drive: DriveSource, config: Config, server): # Make the folder await drive.get() # Require a specified folder so we don't query config.override(Setting.SPECIFY_BACKUP_FOLDER, "true") config.override(Setting.DEFAULT_DRIVE_CLIENT_ID, "something") # Delete the folder await drive.drivebackend.delete(await drive.getFolderId()) # It should fail! with pytest.raises(BackupFolderInaccessible): await drive.get() @pytest.mark.asyncio async def test_backup_folder_not_specified(time, drive: DriveSource, config: Config, server, backup_helper): config.override(Setting.SPECIFY_BACKUP_FOLDER, "true") with pytest.raises(BackupFolderMissingError): await drive.get() from_backup, data = await backup_helper.createFile() with pytest.raises(BackupFolderMissingError): await drive.save(from_backup, data) config.override(Setting.DEFAULT_DRIVE_CLIENT_ID, "something") with pytest.raises(BackupFolderMissingError): await drive.get() with pytest.raises(BackupFolderMissingError): await drive.save(from_backup, data) @pytest.mark.asyncio async def test_folder_invalid_when_specified(time, drive: DriveSource, config: Config, server): await drive.get() config.override(Setting.SPECIFY_BACKUP_FOLDER, "true") await drive.drivebackend.update(await drive.getFolderId(), {"trashed": True}) time.advanceDay() with pytest.raises(BackupFolderInaccessible): await drive.get() @pytest.mark.asyncio async def test_no_folder_when_required(time, drive: DriveSource, config: Config): config.override(Setting.SPECIFY_BACKUP_FOLDER, "true") with pytest.raises(BackupFolderMissingError): await drive.get() @pytest.mark.asyncio async def test_existing_folder_already_exists(time, drive: DriveSource, config: Config, folder_finder: FolderFinder): await drive.get() drive.checkBeforeChanges() # Reset folder, try again folder_finder.reset() await drive.get() with pytest.raises(ExistingBackupFolderError): drive.checkBeforeChanges() @pytest.mark.asyncio async def test_existing_resolved_use_existing(time, drive: DriveSource, config: Config, folder_finder: FolderFinder): await drive.get() drive.checkBeforeChanges() folder_id = await drive.getFolderId() # Reset folder, try again folder_finder.reset() await drive.get() with pytest.raises(ExistingBackupFolderError): drive.checkBeforeChanges() folder_finder.resolveExisting(True) await drive.get() drive.checkBeforeChanges() assert await drive.getFolderId() == folder_id @pytest.mark.asyncio async def test_existing_resolved_create_new(time, drive: DriveSource, config: Config, folder_finder: FolderFinder): await drive.get() drive.checkBeforeChanges() folder_id = await drive.getFolderId() # Reset folder, try again folder_finder.reset() await drive.get() with pytest.raises(ExistingBackupFolderError): drive.checkBeforeChanges() folder_finder.resolveExisting(False) await drive.get() drive.checkBeforeChanges() assert await drive.getFolderId() != folder_id @pytest.mark.asyncio async def test_cred_refresh_with_secret(drive: DriveSource, google: SimulatedGoogle, time: FakeTime, config: Config): google.resetDriveAuth() with open(config.get(Setting.CREDENTIALS_FILE_PATH), "w") as f: creds = google.creds() creds._secret = config.get(Setting.DEFAULT_DRIVE_CLIENT_SECRET) json.dump(creds.serialize(), f) drive.drivebackend.tryLoadCredentials() await drive.get() old_creds = drive.drivebackend.creds # valid creds should be reused await drive.get() assert old_creds.access_token == drive.drivebackend.creds.access_token # then refreshed when they expire time.advanceDay() await drive.get() assert old_creds.access_token != drive.drivebackend.creds.access_token # verify the client_secret is kept with open(config.get(Setting.CREDENTIALS_FILE_PATH)) as f: assert "client_secret" in json.load(f) @pytest.mark.asyncio async def test_cred_refresh_no_secret(drive: DriveSource, google: SimulatedGoogle, time: FakeTime, config: Config): drive.saveCreds(google.creds()) await drive.get() old_creds = drive.drivebackend.creds await drive.get() assert old_creds.access_token == drive.drivebackend.creds.access_token time.advanceDay() await drive.get() assert old_creds.access_token != drive.drivebackend.creds.access_token with open(config.get(Setting.CREDENTIALS_FILE_PATH)) as f: assert "client_secret" not in json.load(f) @pytest.mark.asyncio async def test_cred_refresh_upgrade_default_client(drive: DriveSource, server: SimulationServer, time: FakeTime, config: Config): return # TODO: Enable this when we start removing the default client_secret config.override(Setting.DEFAULT_DRIVE_CLIENT_ID, server.getSetting("drive_client_id")) creds = server.getCurrentCreds() creds_with_secret = server.getCurrentCreds() creds_with_secret._secret = server.getSetting("drive_client_secret") with open(config.get(Setting.CREDENTIALS_FILE_PATH), "w") as f: json.dump(creds_with_secret.serialize(), f) # reload the creds drive.drivebackend.tryLoadCredentials() # Verify the "client secret" was removed with open(config.get(Setting.CREDENTIALS_FILE_PATH)) as f: saved_creds = json.load(f) assert saved_creds == creds.serialize() await drive.get() old_creds = drive.drivebackend.cred_bearer await drive.get() assert old_creds == drive.drivebackend.cred_bearer time.advanceDay() await drive.get() assert old_creds != drive.drivebackend.cred_bearer @pytest.mark.asyncio async def test_cant_reach_refresh_server(drive: DriveSource, server: SimulationServer, config: Config, time): config.override(Setting.TOKEN_SERVER_HOSTS, "http://lkasdpoiwehjhcty.com") drive.drivebackend.creds._secret = None time.advanceDay() with pytest.raises(CredRefreshMyError) as error: await drive.get() assert error.value.data() == {"reason": "Couldn't communicate with lkasdpoiwehjhcty.com"} @pytest.mark.asyncio async def test_refresh_problem_with_google(drive: DriveSource, interceptor: RequestInterceptor, config: Config, time): time.advanceDay() interceptor.setError(".*/oauth2/v4/token.*", status=510) drive.drivebackend.creds._secret = None with pytest.raises(CredRefreshGoogleError) as error: await drive.get() assert error.value.data() == {"from_google": "Google returned HTTP 510"} @pytest.mark.asyncio async def test_ignore_trashed_backups(time, drive: DriveSource, config: Config, server, backup_helper): backup = await backup_helper.createFile() drive_backup = await drive.save(*backup) assert len(await drive.get()) == 1 await drive.drivebackend.update(drive_backup.id(), {"trashed": True}) assert len(await drive.get()) == 0 @pytest.mark.asyncio async def test_download_timeout(time, drive: DriveSource, config: Config, interceptor: RequestInterceptor, backup_helper): config.override(Setting.DOWNLOAD_TIMEOUT_SECONDS, 0.1) from_backup, data = await backup_helper.createFile() backup = await drive.save(from_backup, data) # Verify the uploaded bytes are identical from_backup.addSource(backup) interceptor.setSleep(URL_MATCH_FILE, sleep=100) download = await drive.read(from_backup) data.position(0) with pytest.raises(GoogleTimeoutError): await compareStreams(data, download) @pytest.mark.asyncio async def test_resume_session_ignored_on_http404(time, drive: DriveSource, config: Config, server: SimulationServer, backup_helper: BackupHelper, interceptor: RequestInterceptor): from_backup, data = await backup_helper.createFile() # Configure the upload to fail interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, 500) with pytest.raises(GoogleInternalError): await drive.save(from_backup, data) # Verify a requst was made to start the upload assert server.wasUrlRequested(URL_START_UPLOAD) location = drive.drivebackend.last_attempt_location assert location is not None server.urls.clear() interceptor.clear() data.position(0) interceptor.setError(location, 404) with pytest.raises(GoogleUnexpectedError): await drive.save(from_backup, data) assert interceptor.urlWasCalled(URL(location).path) data.position(0) # Location should have been reset, and another attempt should succeed using a new url. assert drive.drivebackend.last_attempt_location is None await drive.save(from_backup, data) @pytest.mark.asyncio async def test_resume_session_ignored_on_http410(time, drive: DriveSource, config: Config, server: SimulationServer, backup_helper: BackupHelper, interceptor: RequestInterceptor): from_backup, data = await backup_helper.createFile() # Configure the upload to fail interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, 500) with pytest.raises(GoogleInternalError): await drive.save(from_backup, data) # Verify a requst was made to start the upload assert server.wasUrlRequested(URL_START_UPLOAD) location = drive.drivebackend.last_attempt_location assert location is not None server.urls.clear() interceptor.clear() data.position(0) data.position(0) interceptor.setError(location, 410) with pytest.raises(GoogleUnexpectedError): await drive.save(from_backup, data) assert interceptor.urlWasCalled(URL(location).path) # Location should have been reset, and anohter attempt should succeed using a new url. assert drive.drivebackend.last_attempt_location is None await drive.save(from_backup, data) @pytest.mark.asyncio async def test_resume_session_reused_on_http408(time, drive: DriveSource, config: Config, server: SimulationServer, backup_helper: BackupHelper, interceptor: RequestInterceptor): from_backup, data = await backup_helper.createFile() # Configure the upload to fail interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, 408) with pytest.raises(GoogleTimeoutError): await drive.save(from_backup, data) # Verify a requst was made to start the upload assert server.wasUrlRequested(URL_START_UPLOAD) location = drive.drivebackend.last_attempt_location assert location is not None server.urls.clear() interceptor.clear() data.position(0) await drive.save(from_backup, data) assert interceptor.urlWasCalled(URL(location).path) @pytest.mark.asyncio async def test_shared_drive_manager(drive: DriveSource, time: FakeTime, folder_finder: FolderFinder, backup_helper: BackupHelper, drive_requests: DriveRequests): # Make a shared drive folder folder_metadata = { 'name': "Shared Drive", 'mimeType': FOLDER_MIME_TYPE, 'driveId': "test_shared_drive_id", 'appProperties': { "backup_folder": "true", }, } shared_drive_folder_id = (await drive.drivebackend.createFolder(folder_metadata))['id'] await folder_finder.save(shared_drive_folder_id) # Save a backup from_backup, data = await backup_helper.createFile() backup = await drive.save(from_backup, data) assert len(await drive.get()) == 1 from_backup.addSource(backup) # Delete the backup, and verify it was deleted instead of trashed await drive.delete(from_backup) assert len(await drive.get()) == 0 with pytest.raises(ClientResponseError) as exc: await drive_requests.get(backup.id()) assert exc.value.code == 404 @pytest.mark.asyncio async def test_shared_drive_content_manager(drive: DriveSource, time: FakeTime, folder_finder: FolderFinder, backup_helper: BackupHelper, drive_requests: DriveRequests): # Make a shared drive folder where the user has capabilities consistent with a "content manager" role. folder_metadata = { 'name': "Shared Drive", 'mimeType': FOLDER_MIME_TYPE, 'driveId': "test_shared_drive_id", 'appProperties': { "backup_folder": "true", }, 'capabilities': { 'canDeleteChildren': False, 'canTrashChildren': True, 'canDelete': False, 'canTrash': True, } } shared_drive_folder_id = (await drive.drivebackend.createFolder(folder_metadata))['id'] await folder_finder.save(shared_drive_folder_id) # Save a backup from_backup, data = await backup_helper.createFile() backup = await drive.save(from_backup, data) assert len(await drive.get()) == 1 from_backup.addSource(backup) # Delete the backup, and verify it was onyl trashed await drive.delete(from_backup) assert len(await drive.get()) == 0 assert (await drive_requests.get(backup.id()))['trashed'] @pytest.mark.asyncio async def test_note(drive: DriveSource, time: FakeTime, folder_finder: FolderFinder, backup_helper: BackupHelper, drive_requests: DriveRequests): from_backup, data = await backup_helper.createFile() backup = await drive.save(from_backup, data) assert backup.note() is None full = DummyBackup(backup.name(), backup.date(), backup.size(), backup.slug(), "dummy") full.addSource(backup) await drive.note(full, "new note") assert backup.note() == "new note" await drive.note(full, None) assert backup.note() is None @pytest.mark.asyncio async def test_note_truncation(drive: DriveSource, time: FakeTime, folder_finder: FolderFinder, backup_helper: BackupHelper, drive_requests: DriveRequests): from_backup, data = await backup_helper.createFile() backup = await drive.save(from_backup, data) assert backup.note() is None full = DummyBackup(backup.name(), backup.date(), backup.size(), backup.slug(), "dummy") full.addSource(backup) long_note = "".join(["䷷" for x in range(500)]) await drive.note(full, long_note) assert backup.note() == "".join(["䷷" for x in range(38)]) @pytest.mark.asyncio async def test_note_creation(drive: DriveSource, time: FakeTime, folder_finder: FolderFinder, backup_helper: BackupHelper, drive_requests: DriveRequests): from_backup, data = await backup_helper.createFile(note="test") backup = await drive.save(from_backup, data) assert backup.note() == "test"