Files
hassio-addons-avm/hassio-google-drive-backup/tests/helpers.py

220 lines
6.4 KiB
Python

import json
import tarfile
import pytest
import platform
import os
from datetime import datetime
from io import BytesIO, IOBase
from aiohttp import ClientSession
from injector import inject, singleton
from backup.util import AsyncHttpGetter
from backup.model import SimulatedSource
from backup.time import Time
from backup.config import CreateOptions
all_folders = [
"share",
"ssl",
"addons/local"
]
all_addons = [
{
"name": "Sexy Robots",
"slug": "sexy_robots",
"description": "The robots you already know, but sexier. See what they don't want you to see.",
"version": "0.69",
"size": 1,
"logo": True,
"state": "started"
},
{
"name": "Particle Accelerator",
"slug": "particla_accel",
"description": "What CAN'T you do with Home Assistant?",
"version": "0.5",
"size": 500.3,
"logo": True,
"state": "started"
},
{
"name": "Empty Addon",
"slug": "addon_empty",
"description": "Explore the meaning of the universe by contemplating whats missing.",
"version": "0.-1",
"size": 1024 * 1024 * 1024 * 21.2,
"logo": False,
"state": "started"
}
]
def skipForWindows():
if platform.system() == "Windows":
pytest.skip("This test can't be run in windows environments")
def skipForRoot():
if os.getuid() == 0:
pytest.skip("This test can't be run as root")
def createBackupTar(slug: str, name: str, date: datetime, padSize: int, included_folders=None, included_addons=None, password=None) -> BytesIO:
backup_type = "full"
haVersion = None
if included_folders is not None:
folders = []
for folder in included_folders:
if folder == "homeassistant":
haVersion = "0.92.2"
else:
folders.append(folder)
else:
folders = all_folders.copy()
haVersion = "0.92.2"
if included_addons is not None:
backup_type = "partial"
addons = []
for addon in all_addons:
if addon['slug'] in included_addons:
addons.append(addon)
else:
addons = all_addons.copy()
backup_info = {
"slug": slug,
"name": name,
"date": date.isoformat(),
"type": backup_type,
"protected": password is not None,
"homeassistant": haVersion,
"folders": folders,
"addons": addons,
"repositories": [
"https://github.com/hassio-addons/repository"
]
}
stream = BytesIO()
tar = tarfile.open(fileobj=stream, mode="w")
add(tar, "backup.json", BytesIO(json.dumps(backup_info).encode()))
add(tar, "padding.dat", getTestStream(padSize))
tar.close()
stream.seek(0)
stream.size = lambda: len(stream.getbuffer())
return stream
def add(tar, name, stream):
info = tarfile.TarInfo(name)
info.size = len(stream.getbuffer())
stream.seek(0)
tar.addfile(info, stream)
def parseBackupInfo(stream: BytesIO):
with tarfile.open(fileobj=stream, mode="r") as tar:
info = tar.getmember("backup.json")
with tar.extractfile(info) as f:
backup_data = json.load(f)
backup_data['size'] = float(
round(len(stream.getbuffer()) / 1024.0 / 1024.0, 2))
backup_data['version'] = 'dev'
return backup_data
def getTestStream(size: int):
"""
Produces a stream of repeating prime sequences to avoid accidental repetition
"""
arr = bytearray()
while True:
for prime in [4759, 4783, 4787, 4789, 4793, 4799, 4801, 4813, 4817, 4831, 4861, 4871, 4877, 4889, 4903, 4909, 4919, 4931, 4933, 4937]:
for x in range(prime):
if len(arr) < size:
arr.append(x % 255)
else:
break
if len(arr) >= size:
break
if len(arr) >= size:
break
return BytesIO(arr)
async def compareStreams(left, right):
await left.setup()
await right.setup()
while True:
from_left = await left.read(1024 * 1024)
from_right = await right.read(1024 * 1024)
if len(from_left.getbuffer()) == 0:
assert len(from_right.getbuffer()) == 0
break
if from_left.getbuffer() != from_right.getbuffer():
print("break!")
assert from_left.getbuffer() == from_right.getbuffer()
class IntentionalFailure(Exception):
pass
class HelperTestSource(SimulatedSource):
def __init__(self, name, is_destination=False):
super().__init__(name, is_destination=is_destination)
self.allow_create = True
self.allow_save = True
self.queries = 0
def reset(self):
self.saved = []
self.deleted = []
self.created = []
self.queries = 0
@property
def query_count(self):
return self.queries
async def get(self):
self.queries += 1
return await super().get()
def assertThat(self, created=0, deleted=0, saved=0, current=0):
assert len(self.saved) == saved
assert len(self.deleted) == deleted
assert len(self.created) == created
assert len(self.current) == current
return self
def assertUnchanged(self):
self.assertThat(current=len(self.current))
return self
async def create(self, options: CreateOptions):
if not self.allow_create:
raise IntentionalFailure()
return await super().create(options)
async def save(self, backup, bytes: IOBase = None):
if not self.allow_save:
raise IntentionalFailure()
return await super().save(backup, bytes=bytes)
@singleton
class Uploader():
@inject
def __init__(self, host, session: ClientSession, time: Time):
self.host = host
self.session = session
self.time = time
async def upload(self, data) -> AsyncHttpGetter:
async with await self.session.post(self.host + "/uploadfile", data=data) as resp:
resp.raise_for_status()
source = AsyncHttpGetter(self.host + "/readfile", {}, self.session, time=self.time)
return source