# ---------------------------------------------------------------------------
# Tests for the Google Drive backup source (DriveSource), exercised against a
# simulated Google API server.
# ---------------------------------------------------------------------------
import os
import json
from time import sleep
import pytest
import asyncio
from yarl import URL
from aiohttp.client_exceptions import ClientResponseError
from backup.config import Config, Setting
from dev.simulationserver import SimulationServer
from dev.simulated_google import SimulatedGoogle, URL_MATCH_UPLOAD_PROGRESS, URL_MATCH_FILE
from dev.request_interceptor import RequestInterceptor
from backup.drive import DriveSource, FolderFinder, DriveRequests, RETRY_SESSION_ATTEMPTS, UPLOAD_SESSION_EXPIRATION_DURATION, URL_START_UPLOAD
from backup.drive.driverequests import (BASE_CHUNK_SIZE, CHUNK_UPLOAD_TARGET_SECONDS)
from backup.drive.drivesource import FOLDER_MIME_TYPE
from backup.exceptions import (BackupFolderInaccessible, BackupFolderMissingError,
DriveQuotaExceeded, ExistingBackupFolderError,
GoogleCantConnect, GoogleCredentialsExpired,
GoogleInternalError, GoogleUnexpectedError,
GoogleSessionError, GoogleTimeoutError, CredRefreshMyError, CredRefreshGoogleError)
from backup.creds import Creds
from backup.model import DriveBackup, DummyBackup
from .faketime import FakeTime
from .helpers import compareStreams, createBackupTar
# Expected exponential-backoff sleep durations (seconds) recorded by FakeTime
# when retries against Google are exhausted.
RETRY_EXHAUSTION_SLEEPS = [2, 4, 8, 16, 32]
class BackupHelper():
    """Builds dummy backups plus their uploaded tar data for tests."""

    def __init__(self, uploader, time):
        self.uploader = uploader
        self.time = time

    async def createFile(self, size=1024 * 1024 * 2, slug="testslug", name="Test Name", note=None):
        """Create a DummyBackup and upload a matching tar; return (backup, data)."""
        created = self.time.toUtc(self.time.local(1985, 12, 6))
        from_backup: DummyBackup = DummyBackup(
            name, created, "fake source", slug, note=note, size=size)
        tar = createBackupTar(slug, name, self.time.now(), size)
        data = await self.uploader.upload(tar)
        return from_backup, data
@pytest.fixture
def backup_helper(uploader, time):
    """Fixture providing a BackupHelper bound to the test uploader and fake time."""
    return BackupHelper(uploader, time)
@pytest.mark.asyncio
async def test_sync_empty(drive) -> None:
    """A fresh Drive source syncs to an empty backup list."""
    assert len(await drive.get()) == 0
@pytest.mark.asyncio
async def test_CRUD(backup_helper, drive, time, session) -> None:
    """Exercise save, read, list, retain, and delete of a backup in Drive."""
    from_backup, data = await backup_helper.createFile()
    backup: DriveBackup = await drive.save(from_backup, data)
    assert backup.name() == "Test Name"
    assert backup.date() == time.local(1985, 12, 6)
    assert not backup.retained()
    assert backup.size() == data.size()
    assert backup.slug() == "testslug"
    assert len(backup.id()) > 0
    assert backup.backupType() == from_backup.backupType()
    assert backup.protected() == from_backup.protected()
    from_backup.addSource(backup)

    # download the item, its bytes should match up
    download = await drive.read(from_backup)
    data.position(0)
    await compareStreams(data, download)

    # read the item, make sure its data matches up
    backups = await drive.get()
    assert len(backups) == 1
    backup = backups[from_backup.slug()]
    assert backup.name() == "Test Name"
    assert backup.date() == time.local(1985, 12, 6)
    assert not backup.retained()
    assert backup.size() == data.size()
    assert backup.slug() == "testslug"
    assert len(backup.id()) > 0
    assert backup.backupType() == from_backup.backupType()
    assert backup.protected() == from_backup.protected()

    # update retention
    assert not backup.retained()
    await drive.retain(from_backup, True)
    assert (await drive.get())[from_backup.slug()].retained()
    await drive.retain(from_backup, False)
    assert not (await drive.get())[from_backup.slug()].retained()

    # Delete the item, make sure its gone
    await drive.delete(from_backup)
    backups = await drive.get()
    assert len(backups) == 0
@pytest.mark.asyncio
async def test_folder_creation(drive, time, config):
    """The backup folder is created on first sync, reused, and recreated when trashed or deleted."""
    assert len(await drive.get()) == 0

    folderId = await drive.getFolderId()
    assert len(folderId) > 0

    item = await drive.drivebackend.get(folderId)
    assert not item["trashed"]
    assert item["name"] == "Home Assistant Backups"
    assert item["mimeType"] == FOLDER_MIME_TYPE
    assert item["appProperties"]['backup_folder'] == 'true'

    # sync again, assert the folder is reused
    time.advanceDay()
    os.remove(config.get(Setting.FOLDER_FILE_PATH))
    assert len(await drive.get()) == 0
    assert await drive.getFolderId() == folderId

    # trash the folder, assert we create a new one on sync
    await drive.drivebackend.update(folderId, {"trashed": True})
    assert (await drive.drivebackend.get(folderId))["trashed"] is True
    assert len(await drive.get()) == 0
    time.advanceDay()
    assert await drive.getFolderId() != folderId

    # delete the folder, assert we create a new one
    folderId = await drive.getFolderId()
    await drive.drivebackend.delete(folderId)
    time.advanceDay()
    assert len(await drive.get()) == 0
    time.advanceDay()
    assert await drive.getFolderId() != folderId
@pytest.mark.asyncio
async def test_folder_selection(drive, time):
    """When multiple candidate backup folders exist, the newest one is chosen."""
    folder_metadata = {
        'name': "Junk Data",
        'mimeType': FOLDER_MIME_TYPE,
        'appProperties': {
            "backup_folder": "true",
        },
    }

    # create two folders at different times
    id_old = (await drive.drivebackend.createFolder(folder_metadata))['id']
    sleep(2)
    id_new = (await drive.drivebackend.createFolder(folder_metadata))['id']

    # Verify we use the newest
    await drive.get()
    assert await drive.getFolderId() == id_new
    assert await drive.getFolderId() != id_old
@pytest.mark.asyncio
async def test_bad_auth_creds(drive: DriveSource, time):
    """An invalid refresh token raises GoogleCredentialsExpired without retrying."""
    drive.drivebackend.creds._refresh_token = "not_allowed"
    with pytest.raises(GoogleCredentialsExpired):
        await drive.get()
    # No backoff sleeps: credential failures are not retried.
    assert time.sleeps == []
@pytest.mark.asyncio
async def test_out_of_space(backup_helper, drive: DriveSource, google: SimulatedGoogle):
    """Saving into a nearly-full Drive raises DriveQuotaExceeded with usage details."""
    google.setDriveSpaceAvailable(100)
    from_backup, data = await backup_helper.createFile()
    # Syncing first gives the source knowledge of Drive's space usage.
    await drive.get()
    with pytest.raises(DriveQuotaExceeded) as e:
        await drive.save(from_backup, data)
    assert e.value.data() == {
        'backup_size': '2 MB',
        'free_space': '100 B'
    }
@pytest.mark.asyncio
async def test_out_of_space_error(backup_helper, drive: DriveSource, google: SimulatedGoogle):
    """Quota errors before any sync report 'Error' for the unknown sizes."""
    google.setDriveSpaceAvailable(100)
    from_backup, data = await backup_helper.createFile()
    # Without calling drive.get(), the parser has no info about drive's space usage
    with pytest.raises(DriveQuotaExceeded) as e:
        await drive.save(from_backup, data)
    assert e.value.data() == {
        'backup_size': 'Error',
        'free_space': 'Error'
    }
@pytest.mark.asyncio
async def test_drive_dns_resolution_error(drive: DriveSource, config: Config, time):
    """An unresolvable Drive host raises GoogleCantConnect without retrying."""
    config.override(Setting.DRIVE_URL,
                    "http://fsdfsdasdasdf.saasdsdfsdfsd.com:2567")
    with pytest.raises(GoogleCantConnect):
        await drive.get()
    assert time.sleeps == []
@pytest.mark.asyncio
async def test_drive_connect_error(drive: DriveSource, config: Config, time):
    """A refused connection raises GoogleCantConnect without retrying."""
    config.override(Setting.DRIVE_URL, "http://localhost:1034")
    with pytest.raises(GoogleCantConnect):
        await drive.get()
    assert time.sleeps == []
@pytest.mark.asyncio
async def test_upload_session_expired(drive, time, backup_helper, interceptor: RequestInterceptor):
    """HTTP 404 on upload progress means the session expired: GoogleSessionError, no retry."""
    from_backup, data = await backup_helper.createFile()
    interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, status=404)
    with pytest.raises(GoogleSessionError):
        await drive.save(from_backup, data)
    assert time.sleeps == []
@pytest.mark.asyncio
async def test_upload_resume(drive: DriveSource, time, backup_helper: BackupHelper, google: SimulatedGoogle, interceptor: RequestInterceptor):
    """A failed upload resumes from the last uploaded chunk on retry."""
    from_backup, data = await backup_helper.createFile()
    interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, fail_after=1, status=500)

    # Upload, which will fail
    with pytest.raises(GoogleInternalError):
        await drive.save(from_backup, data)

    # Verify we uploaded one chunk
    assert google.chunks == [BASE_CHUNK_SIZE]

    # Retry the upload, which should now pass
    interceptor.clear()
    data.position(0)
    drive_backup = await drive.save(from_backup, data)
    from_backup.addSource(drive_backup)
    # The retry resumes: one already-uploaded chunk, then the remainder.
    assert google.chunks == [BASE_CHUNK_SIZE,
                             BASE_CHUNK_SIZE, (data.size()) - BASE_CHUNK_SIZE * 2]

    # Verify the data is correct
    data.position(0)
    await compareStreams(data, await drive.read(from_backup))
def test_chunk_size(drive: DriveSource, config: Config):
    """Verify _getNextChunkSize scales the chunk count with observed upload speed.

    Fixes: removed a verbatim-duplicated assertion and renamed the local
    ``max``, which shadowed the builtin.
    """
    # Largest permitted chunk, expressed as a multiple of BASE_CHUNK_SIZE.
    max_multiple = config.get(Setting.MAXIMUM_UPLOAD_CHUNK_BYTES) / BASE_CHUNK_SIZE
    # An instantaneous upload jumps straight to the maximum chunk size.
    assert drive.drivebackend._getNextChunkSize(
        1000000000, 0) == max_multiple
    # An upload right at the target duration keeps the minimum chunk count...
    assert drive.drivebackend._getNextChunkSize(
        1, CHUNK_UPLOAD_TARGET_SECONDS) == 1
    # ...but the maximum chunk size is clamped regardless of the last size.
    assert drive.drivebackend._getNextChunkSize(
        1000000000, CHUNK_UPLOAD_TARGET_SECONDS) == max_multiple
    # Speeds in between scale the chunk count proportionally toward the target.
    assert drive.drivebackend._getNextChunkSize(
        1, 1) == CHUNK_UPLOAD_TARGET_SECONDS
    assert drive.drivebackend._getNextChunkSize(
        1, 1.01) == CHUNK_UPLOAD_TARGET_SECONDS - 1
def test_chunk_size_limits(drive: DriveSource, config: Config):
    """Chunk sizing clamps to the configured maximum, rounded down to whole base chunks."""
    config.override(Setting.MAXIMUM_UPLOAD_CHUNK_BYTES, 1)
    assert drive.drivebackend._getNextChunkSize(1000000000, 0) == 1
    assert drive.drivebackend._getNextChunkSize(1, 1000000) == 1

    # A max of 1.5 base chunks still rounds down to a single chunk.
    config.override(Setting.MAXIMUM_UPLOAD_CHUNK_BYTES, BASE_CHUNK_SIZE * 1.5)
    assert drive.drivebackend._getNextChunkSize(1000000000, 0) == 1
    assert drive.drivebackend._getNextChunkSize(1, 1000000) == 1

    # A max of 3.5 base chunks rounds down to three.
    config.override(Setting.MAXIMUM_UPLOAD_CHUNK_BYTES, BASE_CHUNK_SIZE * 3.5)
    assert drive.drivebackend._getNextChunkSize(1000000000, 0) == 3
    assert drive.drivebackend._getNextChunkSize(1, 1000000) == 1
@pytest.mark.asyncio
async def test_working_through_upload(drive: DriveSource, server: SimulationServer, backup_helper: BackupHelper, interceptor: RequestInterceptor):
    """drive.isWorking() reports True while an upload is mid-flight and False after."""
    assert not drive.isWorking()

    # Let a single chunk upload, then wait
    matcher = interceptor.setWaiter(URL_MATCH_UPLOAD_PROGRESS, attempts=1)
    from_backup, data = await backup_helper.createFile(size=1024 * 1024 * 10)
    save_task = asyncio.create_task(drive.save(from_backup, data))
    await matcher.waitForCall()
    assert drive.isWorking()

    # let it complete
    matcher.clear()
    await save_task
    assert not drive.isWorking()
@pytest.mark.asyncio
async def test_drive_timeout(drive, config, time: FakeTime):
    """A tiny request timeout raises GoogleTimeoutError without retrying."""
    # Ensure we have credentials
    await drive.get()

    config.override(Setting.GOOGLE_DRIVE_TIMEOUT_SECONDS, 0.000001)
    with pytest.raises(GoogleTimeoutError):
        await drive.get()
    assert time.sleeps == []
@pytest.mark.asyncio
async def test_resume_upload_attempts_exhausted(drive: DriveSource, time, backup_helper, interceptor: RequestInterceptor, google: SimulatedGoogle):
    """A resume session is reused up to RETRY_SESSION_ATTEMPTS, then abandoned for a new one."""
    # Allow an upload to update one chunk and then fail.
    from_backup, data = await backup_helper.createFile()
    interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, fail_after=1, status=500)
    with pytest.raises(GoogleInternalError):
        await drive.save(from_backup, data)
    assert google.chunks == [BASE_CHUNK_SIZE]

    # Verify we have a cached location
    assert drive.drivebackend.last_attempt_location is not None
    assert drive.drivebackend.last_attempt_count == 1
    last_location = drive.drivebackend.last_attempt_location

    for x in range(1, RETRY_SESSION_ATTEMPTS):
        data.position(0)
        with pytest.raises(GoogleInternalError):
            await drive.save(from_backup, data)
        assert drive.drivebackend.last_attempt_count == x + 1
        # We should still be using the same location url
        assert drive.drivebackend.last_attempt_location == last_location

    # Another attempt should use another location url
    with pytest.raises(GoogleInternalError):
        data.position(0)
        await drive.save(from_backup, data)
    assert drive.drivebackend.last_attempt_count == 0
    assert drive.drivebackend.last_attempt_location is not None
    assert drive.drivebackend.last_attempt_location != last_location

    # Now let it succeed
    interceptor.clear()
    data.position(0)
    drive_backup = await drive.save(from_backup, data)
    from_backup.addSource(drive_backup)

    # And verify the bytes are correct
    data.position(0)
    await compareStreams(data, await drive.read(from_backup))
@pytest.mark.asyncio
async def test_google_internal_error(drive, server, time: FakeTime, interceptor: RequestInterceptor):
    """HTTP 500 from Drive is retried with exponential backoff, then raised."""
    interceptor.setError(URL_MATCH_FILE, 500)
    with pytest.raises(GoogleInternalError):
        await drive.get()
    assert time.sleeps == RETRY_EXHAUSTION_SLEEPS
    time.clearSleeps()

    # A fresh failure repeats the same backoff pattern.
    interceptor.clear()
    interceptor.setError(URL_MATCH_FILE, 500)
    with pytest.raises(GoogleInternalError):
        await drive.get()
    assert time.sleeps == RETRY_EXHAUSTION_SLEEPS
@pytest.mark.asyncio
async def test_check_time(drive: DriveSource, drive_creds: Creds):
    """drive.check() is False until credentials are saved."""
    assert not await drive.check()
    drive.saveCreds(drive_creds)
    assert await drive.check()
@pytest.mark.asyncio
async def test_disable_upload(drive: DriveSource, config: Config):
    """The ENABLE_DRIVE_UPLOAD setting toggles drive.upload()."""
    assert drive.upload()
    config.override(Setting.ENABLE_DRIVE_UPLOAD, False)
    assert not drive.upload()
@pytest.mark.asyncio
async def test_resume_session_abandoned_on_http4XX(time, drive: DriveSource, config: Config, server, backup_helper, interceptor: RequestInterceptor):
    """A 4XX chunk failure discards the resume session; the retry starts a new upload."""
    from_backup, data = await backup_helper.createFile()

    # Configure the upload to fail after the first upload chunk
    interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, 402, 1)
    with pytest.raises(ClientResponseError):
        await drive.save(from_backup, data)

    # Verify a request was made to start the upload but not cached
    assert server.wasUrlRequested(
        "/upload/drive/v3/files/?uploadType=resumable&supportsAllDrives=true")
    assert drive.drivebackend.last_attempt_count == 1
    assert drive.drivebackend.last_attempt_location is None
    assert drive.drivebackend.last_attempt_metadata is None

    # upload again, which should retry
    server.urls.clear()
    interceptor.clear()
    data.position(0)
    backup = await drive.save(from_backup, data)
    assert server.wasUrlRequested(URL_START_UPLOAD)

    # Verify the uploaded bytes are identical
    from_backup.addSource(backup)
    download = await drive.read(from_backup)
    data.position(0)
    await compareStreams(data, download)
@pytest.mark.asyncio
async def test_resume_session_abandoned_after_a_long_time(time: FakeTime, drive: DriveSource, config: Config, server: SimulationServer, backup_helper, interceptor: RequestInterceptor):
    """A cached resume session is abandoned once it is older than the expiration duration."""
    from_backup, data = await backup_helper.createFile()

    # Configure the upload to fail after the first upload chunk
    interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, 501, 1)
    with pytest.raises(ClientResponseError):
        await drive.save(from_backup, data)

    # Verify it reuses the session a few times
    assert server.wasUrlRequested(URL_START_UPLOAD)
    assert drive.drivebackend.last_attempt_count == 1
    assert drive.drivebackend.last_attempt_location is not None
    assert drive.drivebackend.last_attempt_metadata is not None
    data.position(0)
    with pytest.raises(ClientResponseError):
        await drive.save(from_backup, data)
    assert drive.drivebackend.last_attempt_count == 2
    assert drive.drivebackend.last_attempt_location is not None
    assert drive.drivebackend.last_attempt_metadata is not None
    last_location = drive.drivebackend.last_attempt_location

    # Fast forward a lot, then verify the session is restarted
    server.urls.clear()
    interceptor.clear()
    time.advance(duration=UPLOAD_SESSION_EXPIRATION_DURATION)
    data.position(0)
    await drive.save(from_backup, data)
    assert interceptor.urlWasCalled(URL_START_UPLOAD)
    assert not interceptor.urlWasCalled(last_location)
@pytest.mark.asyncio
async def test_chunk_upload_resets_attempt_counter(time: FakeTime, drive: DriveSource, config: Config, server: SimulationServer, backup_helper: BackupHelper, interceptor: RequestInterceptor):
    """Any successfully uploaded chunk resets the resume-session attempt counter."""
    from_backup, data = await backup_helper.createFile(size=1024 * 1024 * 10)

    # Configure the upload to fail after the first upload chunk
    interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, 501, 1)
    with pytest.raises(ClientResponseError):
        await drive.save(from_backup, data)
    data.position(0)
    with pytest.raises(ClientResponseError):
        await drive.save(from_backup, data)

    # Verify the session was started
    assert interceptor.urlWasCalled(URL_START_UPLOAD)
    assert interceptor.urlWasCalled(URL_MATCH_UPLOAD_PROGRESS)
    assert drive.drivebackend.last_attempt_count == 2
    location = drive.drivebackend.last_attempt_location
    assert location is not None

    # Allow one more chunk to succeed
    interceptor.clear()
    interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, 501, 2)
    data.position(0)
    with pytest.raises(ClientResponseError):
        await drive.save(from_backup, data)

    # Verify the session was reused and the attempt counter was reset
    assert not interceptor.urlWasCalled(URL_START_UPLOAD)
    assert interceptor.urlWasCalled(URL_MATCH_UPLOAD_PROGRESS)
    assert interceptor.urlWasCalled(URL(location).path)
    assert drive.drivebackend.last_attempt_count == 1
    assert drive.drivebackend.last_attempt_location == location
@pytest.mark.asyncio
async def test_resume_session_reused_on_http5XX(time, drive: DriveSource, config: Config, server, backup_helper, interceptor: RequestInterceptor):
    """A 5XX chunk failure keeps the resume session; see verify_upload_resumed."""
    await verify_upload_resumed(time, drive, config, server, interceptor, 550, backup_helper)
@pytest.mark.asyncio
async def test_resume_session_reused_abonded_after_retries(time, drive: DriveSource, config: Config, server: SimulationServer, backup_helper, interceptor: RequestInterceptor):
    """A 5XX-failing session is reused RETRY_SESSION_ATTEMPTS times, then a new upload is started."""
    from_backup, data = await backup_helper.createFile()

    # Configure the upload to fail after the first upload chunk
    interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, 501, 1)
    with pytest.raises(ClientResponseError):
        await drive.save(from_backup, data)

    # Verify a request was made to start the upload but not cached
    assert server.wasUrlRequested(URL_START_UPLOAD)
    assert drive.drivebackend.last_attempt_count == 1
    assert drive.drivebackend.last_attempt_location is not None
    assert drive.drivebackend.last_attempt_metadata is not None
    last_location = drive.drivebackend.last_attempt_location

    for x in range(1, RETRY_SESSION_ATTEMPTS):
        server.urls.clear()
        interceptor.clear()
        interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, 501)
        data.position(0)
        with pytest.raises(ClientResponseError):
            await drive.save(from_backup, data)
        assert not server.wasUrlRequested(URL_START_UPLOAD)
        assert server.wasUrlRequested(last_location)
        assert drive.drivebackend.last_attempt_count == x + 1
        assert drive.drivebackend.last_attempt_location is last_location
        assert drive.drivebackend.last_attempt_metadata is not None

    # Next attempt should give up and restart the upload
    server.urls.clear()
    interceptor.clear()
    interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, 501, 1)
    data.position(0)
    with pytest.raises(ClientResponseError):
        await drive.save(from_backup, data)
    assert server.wasUrlRequested(URL_START_UPLOAD)
    assert not server.wasUrlRequested(last_location)
    assert drive.drivebackend.last_attempt_count == 1

    # upload again, which should retry
    server.urls.clear()
    interceptor.clear()
    data.position(0)
    backup = await drive.save(from_backup, data)
    assert not server.wasUrlRequested(URL_START_UPLOAD)

    # Verify the uploaded bytes are identical
    from_backup.addSource(backup)
    download = await drive.read(from_backup)
    data.position(0)
    await compareStreams(data, download)
async def verify_upload_resumed(time, drive: DriveSource, config: Config, server: SimulationServer, interceptor: RequestInterceptor, status, backup_helper, expected=ClientResponseError):
    """Shared helper: fail an upload chunk with `status`, expect `expected`, then
    verify the retry resumes the cached session (no new session start) and the
    uploaded bytes match."""
    from_backup, data = await backup_helper.createFile()

    # Configure the upload to fail after the first upload chunk
    interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, status, 1)
    with pytest.raises(expected):
        await drive.save(from_backup, data)

    # Verify a request was made to start the upload
    assert server.wasUrlRequested(URL_START_UPLOAD)
    assert drive.drivebackend.last_attempt_location is not None
    assert drive.drivebackend.last_attempt_metadata is not None
    last_location = drive.drivebackend.last_attempt_location

    # Retry the upload and let it succeed
    server.urls.clear()
    interceptor.clear()
    data.position(0)
    backup = await drive.save(from_backup, data)

    # We should not see the upload "initialize" url
    assert not server.wasUrlRequested(URL_START_UPLOAD)

    # We should see the last location url (which has a unique token) reused to resume the upload
    assert server.wasUrlRequested(last_location)

    # The saved metadata should be cleared out.
    assert drive.drivebackend.last_attempt_count == 1
    assert drive.drivebackend.last_attempt_location is None
    assert drive.drivebackend.last_attempt_metadata is None

    # Verify the uploaded bytes are identical
    from_backup.addSource(backup)
    download = await drive.read(from_backup)
    data.position(0)
    await compareStreams(data, download)
@pytest.mark.asyncio
async def test_recreate_folder_when_deleted(time, drive: DriveSource, config: Config, backup_helper, folder_finder: FolderFinder):
    """Deleting the backup folder causes a new one to be created on the next sync."""
    await drive.get()
    id = await drive.getFolderId()
    await drive.drivebackend.delete(id)
    assert len(await drive.get()) == 0
    assert id != await drive.getFolderId()
@pytest.mark.asyncio
async def test_recreate_folder_when_losing_permissions(time, drive: DriveSource, config: Config, backup_helper, google: SimulatedGoogle):
    """Losing access to the backup folder causes a new one to be created on sync."""
    await drive.get()
    id = await drive.getFolderId()
    google.lostPermission.append(id)
    assert len(await drive.get()) == 0
    assert id != await drive.getFolderId()
@pytest.mark.asyncio
async def test_folder_missing_on_upload(time, drive: DriveSource, config: Config, backup_helper):
    """With a user-specified folder that has been deleted, saving raises BackupFolderInaccessible."""
    # Make the folder
    await drive.get()

    # Require a specified folder so we don't query
    config.override(Setting.SPECIFY_BACKUP_FOLDER, True)

    # Delete the folder
    await drive.drivebackend.delete(await drive.getFolderId())

    # Then try to make one
    from_backup, data = await backup_helper.createFile()
    with pytest.raises(BackupFolderInaccessible):
        await drive.save(from_backup, data)
@pytest.mark.asyncio
async def test_folder_error_on_upload_lost_permission(time, drive: DriveSource, config: Config, google: SimulatedGoogle, backup_helper, session):
    """With a specified folder whose permission was lost, saving raises BackupFolderInaccessible."""
    # Make the folder
    await drive.get()

    # Require a specified folder so we don't query
    config.override(Setting.SPECIFY_BACKUP_FOLDER, True)

    # Make the folder inaccessible
    google.lostPermission.append(await drive.getFolderId())
    time.advanceDay()

    # Fail to upload
    with pytest.raises(BackupFolderInaccessible):
        await drive.save(*await backup_helper.createFile())
@pytest.mark.asyncio
async def test_folder_error_on_upload_lost_permission_custom_client(time, drive: DriveSource, config: Config, google: SimulatedGoogle, backup_helper, session):
    """Same as the lost-permission case, but when using a non-default OAuth client id."""
    # Make the folder
    await drive.get()

    # Require a specified folder so we don't query
    config.override(Setting.SPECIFY_BACKUP_FOLDER, True)
    # Make the simulated server accept the old id while the addon uses a new one.
    google._client_id_hack = config.get(Setting.DEFAULT_DRIVE_CLIENT_ID)
    config.override(Setting.DEFAULT_DRIVE_CLIENT_ID, "something-else")

    # Make the folder inaccessible
    google.lostPermission.append(await drive.getFolderId())
    time.advanceDay()

    # Fail to upload
    with pytest.raises(BackupFolderInaccessible):
        await drive.save(*await backup_helper.createFile())
@pytest.mark.asyncio
async def test_folder_error_on_query_lost_permission(time, drive: DriveSource, config: Config, google: SimulatedGoogle):
    """Querying with a specified folder whose permission was lost raises BackupFolderInaccessible."""
    # Make the folder
    await drive.get()

    # Require a specified folder so we don't query
    config.override(Setting.SPECIFY_BACKUP_FOLDER, "true")
    config.override(Setting.DEFAULT_DRIVE_CLIENT_ID, "something")

    # Make the folder inaccessible
    google.lostPermission.append(await drive.getFolderId())

    # It should fail!
    with pytest.raises(BackupFolderInaccessible):
        await drive.get()
@pytest.mark.asyncio
async def test_folder_error_on_query_deleted(time, drive: DriveSource, config: Config, server):
    """Querying with a specified folder that was deleted raises BackupFolderInaccessible."""
    # Make the folder
    await drive.get()

    # Require a specified folder so we don't query
    config.override(Setting.SPECIFY_BACKUP_FOLDER, "true")
    config.override(Setting.DEFAULT_DRIVE_CLIENT_ID, "something")

    # Delete the folder
    await drive.drivebackend.delete(await drive.getFolderId())

    # It should fail!
    with pytest.raises(BackupFolderInaccessible):
        await drive.get()
@pytest.mark.asyncio
async def test_backup_folder_not_specified(time, drive: DriveSource, config: Config, server, backup_helper):
    """When a folder must be specified but none is, query and save both raise
    BackupFolderMissingError, with either the default or a custom client id."""
    config.override(Setting.SPECIFY_BACKUP_FOLDER, "true")
    with pytest.raises(BackupFolderMissingError):
        await drive.get()
    from_backup, data = await backup_helper.createFile()
    with pytest.raises(BackupFolderMissingError):
        await drive.save(from_backup, data)

    # Same behavior with a non-default client id.
    config.override(Setting.DEFAULT_DRIVE_CLIENT_ID, "something")
    with pytest.raises(BackupFolderMissingError):
        await drive.get()
    with pytest.raises(BackupFolderMissingError):
        await drive.save(from_backup, data)
@pytest.mark.asyncio
async def test_folder_invalid_when_specified(time, drive: DriveSource, config: Config, server):
    """A specified folder that gets trashed raises BackupFolderInaccessible on sync."""
    await drive.get()
    config.override(Setting.SPECIFY_BACKUP_FOLDER, "true")
    await drive.drivebackend.update(await drive.getFolderId(), {"trashed": True})
    time.advanceDay()
    with pytest.raises(BackupFolderInaccessible):
        await drive.get()
@pytest.mark.asyncio
async def test_no_folder_when_required(time, drive: DriveSource, config: Config):
    """Requiring a specified folder before any exists raises BackupFolderMissingError."""
    config.override(Setting.SPECIFY_BACKUP_FOLDER, "true")
    with pytest.raises(BackupFolderMissingError):
        await drive.get()
@pytest.mark.asyncio
async def test_existing_folder_already_exists(time, drive: DriveSource, config: Config, folder_finder: FolderFinder):
    """Finding a pre-existing backup folder blocks changes until the conflict is resolved."""
    await drive.get()
    drive.checkBeforeChanges()

    # Reset folder, try again
    folder_finder.reset()
    await drive.get()
    with pytest.raises(ExistingBackupFolderError):
        drive.checkBeforeChanges()
@pytest.mark.asyncio
async def test_existing_resolved_use_existing(time, drive: DriveSource, config: Config, folder_finder: FolderFinder):
    """Resolving the existing-folder conflict with 'use existing' keeps the same folder id."""
    await drive.get()
    drive.checkBeforeChanges()
    folder_id = await drive.getFolderId()

    # Reset folder, try again
    folder_finder.reset()
    await drive.get()
    with pytest.raises(ExistingBackupFolderError):
        drive.checkBeforeChanges()

    # Choose to reuse the existing folder.
    folder_finder.resolveExisting(True)
    await drive.get()
    drive.checkBeforeChanges()
    assert await drive.getFolderId() == folder_id
@pytest.mark.asyncio
async def test_existing_resolved_create_new(time, drive: DriveSource, config: Config, folder_finder: FolderFinder):
    """Resolving the existing-folder conflict with 'create new' yields a different folder id."""
    await drive.get()
    drive.checkBeforeChanges()
    folder_id = await drive.getFolderId()

    # Reset folder, try again
    folder_finder.reset()
    await drive.get()
    with pytest.raises(ExistingBackupFolderError):
        drive.checkBeforeChanges()

    # Choose to create a new folder instead.
    folder_finder.resolveExisting(False)
    await drive.get()
    drive.checkBeforeChanges()
    assert await drive.getFolderId() != folder_id
@pytest.mark.asyncio
async def test_cred_refresh_with_secret(drive: DriveSource, google: SimulatedGoogle, time: FakeTime, config: Config):
    """Credentials with a client_secret refresh directly with Google and keep the secret on disk."""
    google.resetDriveAuth()
    with open(config.get(Setting.CREDENTIALS_FILE_PATH), "w") as f:
        creds = google.creds()
        creds._secret = config.get(Setting.DEFAULT_DRIVE_CLIENT_SECRET)
        json.dump(creds.serialize(), f)
    drive.drivebackend.tryLoadCredentials()
    await drive.get()
    old_creds = drive.drivebackend.creds

    # valid creds should be reused
    await drive.get()
    assert old_creds.access_token == drive.drivebackend.creds.access_token

    # then refreshed when they expire
    time.advanceDay()
    await drive.get()
    assert old_creds.access_token != drive.drivebackend.creds.access_token

    # verify the client_secret is kept
    with open(config.get(Setting.CREDENTIALS_FILE_PATH)) as f:
        assert "client_secret" in json.load(f)
@pytest.mark.asyncio
async def test_cred_refresh_no_secret(drive: DriveSource, google: SimulatedGoogle, time: FakeTime, config: Config):
    """Credentials without a client_secret still refresh, and no secret is written to disk."""
    drive.saveCreds(google.creds())
    await drive.get()
    old_creds = drive.drivebackend.creds

    # Valid creds are reused without refreshing.
    await drive.get()
    assert old_creds.access_token == drive.drivebackend.creds.access_token

    # After expiration a refresh produces a new access token.
    time.advanceDay()
    await drive.get()
    assert old_creds.access_token != drive.drivebackend.creds.access_token
    with open(config.get(Setting.CREDENTIALS_FILE_PATH)) as f:
        assert "client_secret" not in json.load(f)
@pytest.mark.asyncio
async def test_cred_refresh_upgrade_default_client(drive: DriveSource, server: SimulationServer, time: FakeTime, config: Config):
    """Verify stored default-client credentials get their client_secret stripped.

    Fix: the test was disabled with a bare ``return``, which made it silently
    pass; ``pytest.skip`` reports it honestly as skipped instead.
    """
    # TODO: Enable this when we start removing the default client_secret
    pytest.skip("Disabled until the default client_secret removal ships")
    config.override(Setting.DEFAULT_DRIVE_CLIENT_ID, server.getSetting("drive_client_id"))
    creds = server.getCurrentCreds()
    creds_with_secret = server.getCurrentCreds()
    creds_with_secret._secret = server.getSetting("drive_client_secret")
    with open(config.get(Setting.CREDENTIALS_FILE_PATH), "w") as f:
        json.dump(creds_with_secret.serialize(), f)

    # reload the creds
    drive.drivebackend.tryLoadCredentials()

    # Verify the "client secret" was removed
    with open(config.get(Setting.CREDENTIALS_FILE_PATH)) as f:
        saved_creds = json.load(f)
        assert saved_creds == creds.serialize()

    await drive.get()
    old_creds = drive.drivebackend.cred_bearer
    await drive.get()
    assert old_creds == drive.drivebackend.cred_bearer
    time.advanceDay()
    await drive.get()
    assert old_creds != drive.drivebackend.cred_bearer
@pytest.mark.asyncio
async def test_cant_reach_refresh_server(drive: DriveSource, server: SimulationServer, config: Config, time):
    """An unreachable token server raises CredRefreshMyError naming the host."""
    config.override(Setting.TOKEN_SERVER_HOSTS, "http://lkasdpoiwehjhcty.com")
    # Without a client secret the refresh must go through the token server.
    drive.drivebackend.creds._secret = None
    time.advanceDay()
    with pytest.raises(CredRefreshMyError) as error:
        await drive.get()
    assert error.value.data() == {"reason": "Couldn't communicate with lkasdpoiwehjhcty.com"}
@pytest.mark.asyncio
async def test_refresh_problem_with_google(drive: DriveSource, interceptor: RequestInterceptor, config: Config, time):
    """A Google-side error during token refresh raises CredRefreshGoogleError with the status."""
    time.advanceDay()
    interceptor.setError(".*/oauth2/v4/token.*", status=510)
    # Without a client secret the refresh must go through the token server.
    drive.drivebackend.creds._secret = None
    with pytest.raises(CredRefreshGoogleError) as error:
        await drive.get()
    assert error.value.data() == {"from_google": "Google returned HTTP 510"}
@pytest.mark.asyncio
async def test_ignore_trashed_backups(time, drive: DriveSource, config: Config, server, backup_helper):
    """Backups that have been trashed in Drive are excluded from the backup list."""
    backup = await backup_helper.createFile()
    drive_backup = await drive.save(*backup)
    assert len(await drive.get()) == 1
    await drive.drivebackend.update(drive_backup.id(), {"trashed": True})
    assert len(await drive.get()) == 0
@pytest.mark.asyncio
async def test_download_timeout(time, drive: DriveSource, config: Config, interceptor: RequestInterceptor, backup_helper):
    """A stalled download raises GoogleTimeoutError while reading the stream."""
    config.override(Setting.DOWNLOAD_TIMEOUT_SECONDS, 0.1)
    from_backup, data = await backup_helper.createFile()
    backup = await drive.save(from_backup, data)

    from_backup.addSource(backup)
    # Make the file endpoint hang far longer than the download timeout.
    interceptor.setSleep(URL_MATCH_FILE, sleep=100)
    download = await drive.read(from_backup)
    data.position(0)
    with pytest.raises(GoogleTimeoutError):
        await compareStreams(data, download)
@pytest.mark.asyncio
async def test_resume_session_ignored_on_http404(time, drive: DriveSource, config: Config, server: SimulationServer, backup_helper: BackupHelper, interceptor: RequestInterceptor):
    """HTTP 404 on the resume location invalidates it; the next attempt starts fresh."""
    from_backup, data = await backup_helper.createFile()

    # Configure the upload to fail
    interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, 500)
    with pytest.raises(GoogleInternalError):
        await drive.save(from_backup, data)

    # Verify a request was made to start the upload
    assert server.wasUrlRequested(URL_START_UPLOAD)
    location = drive.drivebackend.last_attempt_location
    assert location is not None

    server.urls.clear()
    interceptor.clear()
    data.position(0)
    interceptor.setError(location, 404)
    with pytest.raises(GoogleUnexpectedError):
        await drive.save(from_backup, data)
    assert interceptor.urlWasCalled(URL(location).path)
    data.position(0)

    # Location should have been reset, and another attempt should succeed using a new url.
    assert drive.drivebackend.last_attempt_location is None
    await drive.save(from_backup, data)
@pytest.mark.asyncio
async def test_resume_session_ignored_on_http410(time, drive: DriveSource, config: Config, server: SimulationServer, backup_helper: BackupHelper, interceptor: RequestInterceptor):
    """HTTP 410 on the resume location invalidates it; the next attempt starts fresh.

    Fix: removed a duplicated ``data.position(0)`` call.
    """
    from_backup, data = await backup_helper.createFile()

    # Configure the upload to fail
    interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, 500)
    with pytest.raises(GoogleInternalError):
        await drive.save(from_backup, data)

    # Verify a request was made to start the upload
    assert server.wasUrlRequested(URL_START_UPLOAD)
    location = drive.drivebackend.last_attempt_location
    assert location is not None

    server.urls.clear()
    interceptor.clear()
    data.position(0)
    interceptor.setError(location, 410)
    with pytest.raises(GoogleUnexpectedError):
        await drive.save(from_backup, data)
    assert interceptor.urlWasCalled(URL(location).path)

    # Location should have been reset, and another attempt should succeed using a new url.
    assert drive.drivebackend.last_attempt_location is None
    await drive.save(from_backup, data)
@pytest.mark.asyncio
async def test_resume_session_reused_on_http408(time, drive: DriveSource, config: Config, server: SimulationServer, backup_helper: BackupHelper, interceptor: RequestInterceptor):
    """HTTP 408 raises GoogleTimeoutError but keeps the resume location for reuse."""
    from_backup, data = await backup_helper.createFile()

    # Configure the upload to fail
    interceptor.setError(URL_MATCH_UPLOAD_PROGRESS, 408)
    with pytest.raises(GoogleTimeoutError):
        await drive.save(from_backup, data)

    # Verify a request was made to start the upload
    assert server.wasUrlRequested(URL_START_UPLOAD)
    location = drive.drivebackend.last_attempt_location
    assert location is not None

    # The retry reuses the cached session location.
    server.urls.clear()
    interceptor.clear()
    data.position(0)
    await drive.save(from_backup, data)
    assert interceptor.urlWasCalled(URL(location).path)
@pytest.mark.asyncio
async def test_shared_drive_manager(time, drive: DriveSource, folder_finder: FolderFinder, backup_helper: BackupHelper, drive_requests: DriveRequests):
    """In a shared drive with full rights, deleting a backup removes it permanently (404)."""
    # Make a shared drive folder
    folder_metadata = {
        'name': "Shared Drive",
        'mimeType': FOLDER_MIME_TYPE,
        'driveId': "test_shared_drive_id",
        'appProperties': {
            "backup_folder": "true",
        },
    }
    shared_drive_folder_id = (await drive.drivebackend.createFolder(folder_metadata))['id']
    await folder_finder.save(shared_drive_folder_id)

    # Save a backup
    from_backup, data = await backup_helper.createFile()
    backup = await drive.save(from_backup, data)
    assert len(await drive.get()) == 1
    from_backup.addSource(backup)

    # Delete the backup, and verify it was deleted instead of trashed
    await drive.delete(from_backup)
    assert len(await drive.get()) == 0
    with pytest.raises(ClientResponseError) as exc:
        await drive_requests.get(backup.id())
    assert exc.value.code == 404
@pytest.mark.asyncio
async def test_shared_drive_content_manager(time, drive: DriveSource, folder_finder: FolderFinder, backup_helper: BackupHelper, drive_requests: DriveRequests):
    """In a shared drive where the user can only trash (not delete), deletion trashes the backup."""
    # Make a shared drive folder where the user has capabilities consistent with a "content manager" role.
    folder_metadata = {
        'name': "Shared Drive",
        'mimeType': FOLDER_MIME_TYPE,
        'driveId': "test_shared_drive_id",
        'appProperties': {
            "backup_folder": "true",
        },
        'capabilities': {
            'canDeleteChildren': False,
            'canTrashChildren': True,
            'canDelete': False,
            'canTrash': True,
        }
    }
    shared_drive_folder_id = (await drive.drivebackend.createFolder(folder_metadata))['id']
    await folder_finder.save(shared_drive_folder_id)

    # Save a backup
    from_backup, data = await backup_helper.createFile()
    backup = await drive.save(from_backup, data)
    assert len(await drive.get()) == 1
    from_backup.addSource(backup)

    # Delete the backup, and verify it was only trashed
    await drive.delete(from_backup)
    assert len(await drive.get()) == 0
    assert (await drive_requests.get(backup.id()))['trashed']
@pytest.mark.asyncio
async def test_note(drive: DriveSource, time: FakeTime, folder_finder: FolderFinder, backup_helper: BackupHelper, drive_requests: DriveRequests):
    """A note can be set on a Drive backup and cleared again."""
    from_backup, data = await backup_helper.createFile()
    backup = await drive.save(from_backup, data)
    assert backup.note() is None

    # NOTE(review): the positional order here (size before slug, "dummy" last)
    # differs from the DummyBackup call in BackupHelper.createFile, which passes
    # (name, date, source, slug). Verify against DummyBackup's signature.
    full = DummyBackup(backup.name(), backup.date(),
                       backup.size(), backup.slug(), "dummy")
    full.addSource(backup)
    await drive.note(full, "new note")
    assert backup.note() == "new note"

    # Setting None clears the note.
    await drive.note(full, None)
    assert backup.note() is None
@pytest.mark.asyncio
async def test_note_truncation(drive: DriveSource, time: FakeTime, folder_finder: FolderFinder, backup_helper: BackupHelper, drive_requests: DriveRequests):
    """An overlong note is truncated when stored in Drive."""
    from_backup, data = await backup_helper.createFile()
    backup = await drive.save(from_backup, data)
    assert backup.note() is None
    # NOTE(review): see test_note about the DummyBackup positional order.
    full = DummyBackup(backup.name(), backup.date(),
                       backup.size(), backup.slug(), "dummy")
    full.addSource(backup)
    # NOTE(review): these joins of empty strings produce "" on both sides, so
    # the assertion is trivially true and exercises no truncation. The literals
    # were likely multibyte characters stripped by a file-encoding mishap —
    # restore the original characters (and the 500/38 lengths' meaning) from
    # version control.
    long_note = "".join(["" for x in range(500)])
    await drive.note(full, long_note)
    assert backup.note() == "".join(["" for x in range(38)])
@pytest.mark.asyncio
async def test_note_creation(drive: DriveSource, time: FakeTime, folder_finder: FolderFinder, backup_helper: BackupHelper, drive_requests: DriveRequests):
    """A note supplied at creation time is preserved through the save."""
    from_backup, data = await backup_helper.createFile(note="test")
    backup = await drive.save(from_backup, data)
    assert backup.note() == "test"