Create data archive and use it to test

This commit is contained in:
Andre Basche 2023-06-25 17:29:04 +02:00
parent 66cb7bcc24
commit ef67188b93
6 changed files with 248 additions and 102 deletions

View file

@ -1,8 +1,9 @@
import json
import logging
from datetime import datetime
from pathlib import Path
from pprint import pformat
from typing import Dict, Optional
from typing import Dict, Optional, Any, List, no_type_check
from aiohttp import ClientSession
from typing_extensions import Self
@ -66,11 +67,13 @@ class HonAPI:
).create()
return self
async def load_appliances(self) -> Dict:
async def load_appliances(self) -> List[Dict[str, Any]]:
async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp:
return await resp.json()
if result := await resp.json():
return result.get("payload", {}).get("appliances", {})
return []
async def load_commands(self, appliance: HonAppliance) -> Dict:
async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]:
params: Dict = {
"applianceType": appliance.appliance_type,
"applianceModelId": appliance.appliance_model_id,
@ -93,27 +96,29 @@ class HonAPI:
return {}
return result
async def command_history(self, appliance: HonAppliance) -> Dict:
async def load_command_history(
self, appliance: HonAppliance
) -> List[Dict[str, Any]]:
url: str = (
f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history"
)
async with self._hon.get(url) as response:
result: Dict = await response.json()
if not result or not result.get("payload"):
return {}
return []
return result["payload"]["history"]
async def command_favourites(self, appliance: HonAppliance) -> Dict:
async def load_favourites(self, appliance: HonAppliance) -> List[Dict[str, Any]]:
url: str = (
f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/favourite"
)
async with self._hon.get(url) as response:
result: Dict = await response.json()
if not result or not result.get("payload"):
return {}
return []
return result["payload"]["favourites"]
async def last_activity(self, appliance: HonAppliance) -> Dict:
async def load_last_activity(self, appliance: HonAppliance) -> Dict[str, Any]:
url: str = f"{const.API_URL}/commands/v1/retrieve-last-activity"
params: Dict = {"macAddress": appliance.mac_address}
async with self._hon.get(url, params=params) as response:
@ -122,19 +127,19 @@ class HonAPI:
return activity
return {}
async def appliance_model(self, appliance: HonAppliance) -> Dict:
async def load_appliance_data(self, appliance: HonAppliance) -> Dict[str, Any]:
url: str = f"{const.API_URL}/commands/v1/appliance-model"
params: Dict = {
"code": appliance.info["code"],
"code": appliance.code,
"macAddress": appliance.mac_address,
}
async with self._hon.get(url, params=params) as response:
result: Dict = await response.json()
if result and (activity := result.get("attributes")):
return activity
if result:
return result.get("payload", {}).get("applianceModel", {})
return {}
async def load_attributes(self, appliance: HonAppliance) -> Dict:
async def load_attributes(self, appliance: HonAppliance) -> Dict[str, Any]:
params: Dict = {
"macAddress": appliance.mac_address,
"applianceType": appliance.appliance_type,
@ -144,7 +149,7 @@ class HonAPI:
async with self._hon.get(url, params=params) as response:
return (await response.json()).get("payload", {})
async def load_statistics(self, appliance: HonAppliance) -> Dict:
async def load_statistics(self, appliance: HonAppliance) -> Dict[str, Any]:
params: Dict = {
"macAddress": appliance.mac_address,
"applianceType": appliance.appliance_type,
@ -153,7 +158,7 @@ class HonAPI:
async with self._hon.get(url, params=params) as response:
return (await response.json()).get("payload", {})
async def load_maintenance(self, appliance: HonAppliance):
async def load_maintenance(self, appliance: HonAppliance) -> Dict[str, Any]:
url = f"{const.API_URL}/commands/v1/maintenance-cycle"
params = {"macAddress": appliance.mac_address}
async with self._hon.get(url, params=params) as response:
@ -192,7 +197,7 @@ class HonAPI:
_LOGGER.error("%s - Payload:\n%s", url, pformat(data))
return False
async def appliance_configuration(self) -> Dict:
async def appliance_configuration(self) -> Dict[str, Any]:
url: str = f"{const.API_URL}/config/v1/program-list-rules"
async with self._hon_anonymous.get(url) as response:
result: Dict = await response.json()
@ -200,7 +205,9 @@ class HonAPI:
return data
return {}
async def app_config(self, language: str = "en", beta: bool = True) -> Dict:
async def app_config(
self, language: str = "en", beta: bool = True
) -> Dict[str, Any]:
url: str = f"{const.API_URL}/app-config"
payload_data: Dict = {
"languageCode": language,
@ -214,7 +221,7 @@ class HonAPI:
return data
return {}
async def translation_keys(self, language: str = "en") -> Dict:
async def translation_keys(self, language: str = "en") -> Dict[str, Any]:
config = await self.app_config(language=language)
if url := config.get("language", {}).get("jsonPath"):
async with self._hon_anonymous.get(url) as response:
@ -227,3 +234,61 @@ class HonAPI:
await self._hon_handler.close()
if self._hon_anonymous_handler is not None:
await self._hon_anonymous_handler.close()
class TestAPI(HonAPI):
def __init__(self, path):
super().__init__()
self._anonymous = True
self._path: Path = path
def _load_json(self, appliance: HonAppliance, file) -> Dict[str, Any]:
directory = f"{appliance.appliance_type}_{appliance.appliance_model_id}".lower()
path = f"{self._path}/{directory}/{file}.json"
with open(path, "r", encoding="utf-8") as json_file:
return json.loads(json_file.read())
async def load_appliances(self) -> List[Dict[str, Any]]:
result = []
for appliance in self._path.glob("*/"):
with open(
appliance / "appliance_data.json", "r", encoding="utf-8"
) as json_file:
result.append(json.loads(json_file.read()))
return result
async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]:
return self._load_json(appliance, "commands")
@no_type_check
async def load_command_history(
self, appliance: HonAppliance
) -> List[Dict[str, Any]]:
return self._load_json(appliance, "command_history")
async def load_favourites(self, appliance: HonAppliance) -> List[Dict[str, Any]]:
return []
async def load_last_activity(self, appliance: HonAppliance) -> Dict[str, Any]:
return {}
async def load_appliance_data(self, appliance: HonAppliance) -> Dict[str, Any]:
return self._load_json(appliance, "appliance_data")
async def load_attributes(self, appliance: HonAppliance) -> Dict[str, Any]:
return self._load_json(appliance, "attributes")
async def load_statistics(self, appliance: HonAppliance) -> Dict[str, Any]:
return self._load_json(appliance, "statistics")
async def load_maintenance(self, appliance: HonAppliance) -> Dict[str, Any]:
return self._load_json(appliance, "maintenance")
async def send_command(
self,
appliance: HonAppliance,
command: str,
parameters: Dict,
ancillary_parameters: Dict,
) -> bool:
return True