Files

428 lines
12 KiB
Python

import json
import logging
import os
import tempfile
import asyncio
import platform
import aiohttp
from yarl import URL
import pytest
from aiohttp import ClientSession
from injector import (ClassAssistedBuilder, Injector, Module, inject, provider,
singleton)
from backup.config import Config, Setting
from backup.model import Coordinator
from dev.simulationserver import SimulationServer
from backup.drive import DriveRequests, DriveSource, FolderFinder, AuthCodeQuery
from backup.util import GlobalInfo, Estimator, Resolver, DataCache
from backup.ha import HaRequests, HaSource, HaUpdater
from backup.logger import reset
from backup.model import DummyBackup, DestinationPrecache, Model
from backup.time import Time
from backup.module import BaseModule
from backup.debugworker import DebugWorker
from backup.creds import Creds, DriveRequester
from backup.server import ErrorStore
from backup.ha import AddonStopper
from backup.ui import UiServer
from backup.watcher import Watcher
from .faketime import FakeTime
from .helpers import Uploader, createBackupTar
from dev.ports import Ports
from dev.simulated_google import SimulatedGoogle
from dev.request_interceptor import RequestInterceptor
from dev.simulated_supervisor import SimulatedSupervisor
@singleton
class FsFaker():
@inject
def __init__(self):
self.bytes_free = 1024 * 1024 * 1024
self.bytes_total = 1024 * 1024 * 1024
self.old_method = None
def start(self):
if platform.system() != "Windows":
self.old_method = os.statvfs
os.statvfs = self._hijack
def stop(self):
if platform.system() != "Windows":
os.statvfs = self.old_method
def _hijack(self, path):
return os.statvfs_result((0, 1, int(self.bytes_total), int(self.bytes_free), int(self.bytes_free), 0, 0, 0, 0, 255))
def setFreeBytes(self, bytes_free, bytes_total=1):
self.bytes_free = bytes_free
self.bytes_total = bytes_total
if self.bytes_free > self.bytes_total:
self.bytes_total = self.bytes_free
class ReaderHelper:
def __init__(self, session, ui_port, ingress_port):
self.session = session
self.ui_port = ui_port
self.ingress_port = ingress_port
self.timeout = aiohttp.ClientTimeout(total=20)
def getUrl(self, ingress=True, ssl=False):
if ssl:
protocol = "https"
else:
protocol = "http"
if ingress:
return protocol + "://localhost:" + str(self.ingress_port) + "/"
else:
return protocol + "://localhost:" + str(self.ui_port) + "/"
async def getjson(self, path, status=200, json=None, auth=None, ingress=True, ssl=False, sslcontext=None):
async with self.session.get(self.getUrl(ingress, ssl) + path, json=json, auth=auth, ssl=sslcontext, timeout=self.timeout) as resp:
assert resp.status == status
return await resp.json()
async def get(self, path, status=200, json=None, auth=None, ingress=True, ssl=False):
async with self.session.get(self.getUrl(ingress, ssl) + path, json=json, auth=auth, timeout=self.timeout) as resp:
if resp.status != status:
import logging
logging.getLogger().error(resp.text())
assert resp.status == status
return await resp.text()
async def postjson(self, path, status=200, json=None, ingress=True):
async with self.session.post(self.getUrl(ingress) + path, json=json, timeout=self.timeout) as resp:
assert resp.status == status
return await resp.json()
async def assertError(self, path, error_type="generic_error", status=500, ingress=True, json=None):
logging.getLogger().info("Requesting " + path)
data = await self.getjson(path, status=status, ingress=ingress, json=json)
assert data['error_type'] == error_type
# This module should onyl ever have bindings that can also be satisfied by MainModule
class TestModule(Module):
def __init__(self, config: Config, ports: Ports):
self.ports = ports
self.config = config
@provider
@singleton
def getDriveCreds(self, time: Time) -> Creds:
return Creds(time, "test_client_id", time.now(), "test_access_token", "test_refresh_token", "test_client_secret")
@provider
@singleton
def getTime(self) -> Time:
return FakeTime()
@provider
@singleton
def getPorts(self) -> Ports:
return self.ports
@provider
@singleton
def getConfig(self) -> Config:
return self.config
@pytest.fixture
def event_loop():
if platform.system() == "Windows":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
return asyncio.new_event_loop()
@pytest.fixture
async def generate_config(server_url: URL, ports, cleandir):
return Config.withOverrides({
Setting.DRIVE_URL: str(server_url),
Setting.SUPERVISOR_URL: str(server_url) + "/",
Setting.AUTHORIZATION_HOST: str(server_url),
Setting.TOKEN_SERVER_HOSTS: str(server_url),
Setting.DRIVE_REFRESH_URL: str(server_url.with_path("/oauth2/v4/token")),
Setting.DRIVE_AUTHORIZE_URL: str(server_url.with_path("/o/oauth2/v2/auth")),
Setting.DRIVE_TOKEN_URL: str(server_url.with_path("/token")),
Setting.DRIVE_DEVICE_CODE_URL: str(server_url.with_path("/device/code")),
Setting.SUPERVISOR_TOKEN: "test_header",
Setting.SECRETS_FILE_PATH: "secrets.yaml",
Setting.CREDENTIALS_FILE_PATH: "credentials.dat",
Setting.FOLDER_FILE_PATH: "folder.dat",
Setting.RETAINED_FILE_PATH: "retained.json",
Setting.ID_FILE_PATH: "id.json",
Setting.DATA_CACHE_FILE_PATH: "data_cache.json",
Setting.STOP_ADDON_STATE_PATH: "stop_addon.json",
Setting.INGRESS_TOKEN_FILE_PATH: "ingress.dat",
Setting.DEFAULT_DRIVE_CLIENT_ID: "test_client_id",
Setting.DEFAULT_DRIVE_CLIENT_SECRET: "test_client_secret",
Setting.BACKUP_DIRECTORY_PATH: os.path.join(cleandir, "backups"),
Setting.PORT: ports.ui,
Setting.INGRESS_PORT: ports.ingress,
Setting.BACKUP_STARTUP_DELAY_MINUTES: 0,
Setting.PING_TIMEOUT: 0.1,
})
@pytest.fixture
async def injector(cleandir, ports, generate_config):
drive_creds = Creds(FakeTime(), "test_client_id", None, "test_access_token", "test_refresh_token")
os.mkdir(os.path.join(cleandir, "backups"))
with open(os.path.join(cleandir, "secrets.yaml"), "w") as f:
f.write("for_unit_tests: \"password value\"\n")
with open(os.path.join(cleandir, "credentials.dat"), "w") as f:
f.write(json.dumps(drive_creds.serialize()))
return Injector([BaseModule(), TestModule(generate_config, ports)])
@pytest.fixture
async def ui_server(injector, server):
os.mkdir("static")
server = injector.get(UiServer)
await server.run()
yield server
await server.shutdown()
@pytest.fixture
def reader(server, ui_server, session, ui_port, ingress_port):
return ReaderHelper(session, ui_port, ingress_port)
@pytest.fixture
async def uploader(injector: Injector, server_url):
return injector.get(ClassAssistedBuilder[Uploader]).build(host=str(server_url))
@pytest.fixture
async def google(injector: Injector):
return injector.get(SimulatedGoogle)
@pytest.fixture
async def interceptor(injector: Injector):
return injector.get(RequestInterceptor)
@pytest.fixture
async def supervisor(injector: Injector, server, session):
return injector.get(SimulatedSupervisor)
@pytest.fixture
async def addon_stopper(injector: Injector):
return injector.get(AddonStopper)
@pytest.fixture
async def server(injector, port, drive_creds: Creds, session):
server = injector.get(SimulationServer)
# start the server
logging.getLogger().info("Starting SimulationServer on port " + str(port))
await server.start(port)
yield server
await server.stop()
@pytest.fixture
async def data_cache(injector):
return injector.get(DataCache)
@pytest.fixture
async def session(injector):
async with injector.get(ClientSession) as session:
yield session
@pytest.fixture
async def precache(injector):
return injector.get(DestinationPrecache)
@pytest.fixture
async def backup(coord, source, dest):
await coord.sync()
assert len(coord.backups()) == 1
return coord.backups()[0]
@pytest.fixture
async def fs(injector):
faker = injector.get(FsFaker)
faker.start()
yield faker
faker.stop()
@pytest.fixture
async def estimator(injector, fs):
return injector.get(Estimator)
@pytest.fixture
async def device_code(injector):
return injector.get(AuthCodeQuery)
@pytest.fixture
async def error_store(injector):
return injector.get(ErrorStore)
@pytest.fixture
async def model(injector):
return injector.get(Model)
@pytest.fixture
async def global_info(injector):
return injector.get(GlobalInfo)
@pytest.fixture
async def server_url(port):
return URL("http://localhost:").with_port(port)
@pytest.fixture
async def ports(unused_tcp_port_factory):
return Ports(unused_tcp_port_factory(), unused_tcp_port_factory(), unused_tcp_port_factory())
@pytest.fixture
async def port(ports: Ports):
return ports.server
@pytest.fixture
async def ui_url(ports: Ports):
return URL("http://localhost").with_port(ports.ingress)
@pytest.fixture
async def ui_port(ports: Ports):
return ports.ui
@pytest.fixture
async def ingress_port(ports: Ports):
return ports.ingress
@pytest.fixture
async def coord(injector):
return injector.get(Coordinator)
@pytest.fixture()
async def updater(injector):
return injector.get(HaUpdater)
@pytest.fixture()
async def cleandir():
newpath = tempfile.mkdtemp()
os.chdir(newpath)
return newpath
@pytest.fixture
async def time(injector):
reset()
return injector.get(Time)
@pytest.fixture
async def config(injector):
return injector.get(Config)
@pytest.fixture
async def drive_creds(injector):
return injector.get(Creds)
@pytest.fixture
async def drive(injector, server, session):
return injector.get(DriveSource)
@pytest.fixture
async def ha(injector, server, session):
return injector.get(HaSource)
@pytest.fixture
async def ha_requests(injector, server):
return injector.get(HaRequests)
@pytest.fixture
async def drive_requests(injector, server):
return injector.get(DriveRequests)
@pytest.fixture
async def drive_requester(injector, server):
return injector.get(DriveRequester)
@pytest.fixture(autouse=True)
def verify_closed_responses(drive_requester: DriveRequester):
yield "unused"
for resp in drive_requester.all_resposnes:
assert resp.closed
@pytest.fixture
async def resolver(injector):
return injector.get(Resolver)
@pytest.fixture
async def client_identifier(injector):
return injector.get(Config).clientIdentifier()
@pytest.fixture
async def debug_worker(injector):
return injector.get(DebugWorker)
@pytest.fixture()
async def folder_finder(injector):
return injector.get(FolderFinder)
@pytest.fixture()
async def watcher(injector):
watcher = injector.get(Watcher)
yield watcher
await watcher.stop()
class BackupHelper():
def __init__(self, uploader, time):
self.time = time
self.uploader = uploader
async def createFile(self, size=1024 * 1024 * 2, slug="testslug", name="Test Name"):
from_backup: DummyBackup = DummyBackup(
name, self.time.toUtc(self.time.local(1985, 12, 6)), "fake source", slug)
data = await self.uploader.upload(createBackupTar(slug, name, self.time.now(), size))
return from_backup, data
@pytest.fixture
def backup_helper(uploader, time):
return BackupHelper(uploader, time)