Refactor, add hon auth handler

This commit is contained in:
Andre Basche 2023-04-15 14:22:04 +02:00
parent d52d622785
commit 4a0ee8569b
7 changed files with 186 additions and 136 deletions

View file

@ -5,13 +5,14 @@ import secrets
import urllib
from datetime import datetime, timedelta
from pprint import pformat
from typing import List, Tuple
from urllib import parse
from urllib.parse import quote
from aiohttp import ClientResponse
from yarl import URL
from pyhon import const, exceptions
from pyhon.connection.handler.auth import HonAuthConnectionHandler
_LOGGER = logging.getLogger(__name__)
@ -22,6 +23,7 @@ class HonAuth:
def __init__(self, session, email, password, device) -> None:
self._session = session
self._request = HonAuthConnectionHandler(session)
self._email = email
self._password = password
self._access_token = ""
@ -29,26 +31,25 @@ class HonAuth:
self._cognito_token = ""
self._id_token = ""
self._device = device
self._called_urls: List[Tuple[int, str]] = []
self._expires: datetime = datetime.utcnow()
@property
def cognito_token(self):
def cognito_token(self) -> str:
return self._cognito_token
@property
def id_token(self):
def id_token(self) -> str:
return self._id_token
@property
def access_token(self):
def access_token(self) -> str:
return self._access_token
@property
def refresh_token(self):
def refresh_token(self) -> str:
return self._refresh_token
def _check_token_expiration(self, hours):
def _check_token_expiration(self, hours: int) -> bool:
return datetime.utcnow() >= self._expires + timedelta(hours=hours)
@property
@ -59,34 +60,38 @@ class HonAuth:
def token_expires_soon(self) -> bool:
return self._check_token_expiration(self._TOKEN_EXPIRE_WARNING_HOURS)
async def _error_logger(self, response, fail=True):
result = "hOn Authentication Error\n"
for i, (status, url) in enumerate(self._called_urls):
result += f" {i + 1: 2d} {status} - {url}\n"
result += f"ERROR - {response.status} - {response.request_info.url}\n"
result += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}"
_LOGGER.error(result)
async def _error_logger(self, response: ClientResponse, fail: bool = True) -> None:
output = "hOn Authentication Error\n"
for i, (status, url) in enumerate(self._request.called_urls):
output += f" {i + 1: 2d} {status} - {url}\n"
output += f"ERROR - {response.status} - {response.request_info.url}\n"
output += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}"
_LOGGER.error(output)
if fail:
raise exceptions.HonAuthenticationError("Can't login")
async def _load_login(self):
def _generate_nonce(self) -> str:
nonce = secrets.token_hex(16)
nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}"
return f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}"
async def _load_login(self):
login_url = await self._introduce()
login_url = await self._handle_redirects(login_url)
return await self._login_url(login_url)
async def _introduce(self) -> str:
redirect_uri = urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done")
params = {
"response_type": "token+id_token",
"client_id": const.CLIENT_ID,
"redirect_uri": urllib.parse.quote(
f"{const.APP}://mobilesdk/detect/oauth/done"
),
"redirect_uri": redirect_uri,
"display": "touch",
"scope": "api openid refresh_token web",
"nonce": nonce,
"nonce": self._generate_nonce(),
}
params = "&".join([f"{k}={v}" for k, v in params.items()])
async with self._session.get(
f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}"
) as response:
self._called_urls.append((response.status, response.request_info.url))
url = f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}"
async with self._request.get(url) as response:
text = await response.text()
self._expires = datetime.utcnow()
if not (login_url := re.findall("url = '(.+?)'", text)):
@ -94,37 +99,30 @@ class HonAuth:
self._parse_token_data(text)
raise exceptions.HonNoAuthenticationNeeded()
await self._error_logger(response)
return False
async with self._session.get(login_url[0], allow_redirects=False) as redirect1:
self._called_urls.append((redirect1.status, redirect1.request_info.url))
if not (url := redirect1.headers.get("Location")):
await self._error_logger(redirect1)
return False
async with self._session.get(url, allow_redirects=False) as redirect2:
self._called_urls.append((redirect2.status, redirect2.request_info.url))
if not (
url := redirect2.headers.get("Location")
+ "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
):
await self._error_logger(redirect2)
return False
async with self._session.get(
URL(url, encoded=True), headers={"user-agent": const.USER_AGENT}
) as login_screen:
self._called_urls.append(
(login_screen.status, login_screen.request_info.url)
)
if context := re.findall(
'"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()
):
return login_url[0]
async def _manual_redirect(self, url: str) -> str:
async with self._request.get(url, allow_redirects=False) as response:
if not (new_location := response.headers.get("Location")):
await self._error_logger(response)
return new_location
async def _handle_redirects(self, login_url) -> str:
redirect1 = await self._manual_redirect(login_url)
redirect2 = await self._manual_redirect(redirect1)
return f"{redirect2}&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
async def _login_url(self, login_url: str) -> str:
headers = {"user-agent": const.USER_AGENT}
url = URL(login_url, encoded=True)
async with self._request.get(url, headers=headers) as response:
text = await response.text()
if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', text):
fw_uid, loaded_str = context[0]
loaded = json.loads(loaded_str)
login_url = login_url[0].replace(
"/".join(const.AUTH_API.split("/")[:-1]), ""
)
return fw_uid, loaded, login_url
await self._error_logger(login_screen)
return False
result = login_url.replace("/".join(const.AUTH_API.split("/")[:-1]), "")
return fw_uid, loaded, result
await self._error_logger(response)
async def _login(self, fw_uid, loaded, login_url):
data = {
@ -157,13 +155,12 @@ class HonAuth:
"aura.token": None,
}
params = {"r": 3, "other.LightningLoginCustom.login": 1}
async with self._session.post(
async with self._request.post(
const.AUTH_API + "/s/sfsites/aura",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()),
params=params,
) as response:
self._called_urls.append((response.status, response.request_info.url))
if response.status == 200:
try:
data = await response.json()
@ -186,8 +183,7 @@ class HonAuth:
self._id_token = id_token[0]
async def _get_token(self, url):
async with self._session.get(url) as response:
self._called_urls.append((response.status, response.request_info.url))
async with self._request.get(url) as response:
if response.status != 200:
await self._error_logger(response)
return False
@ -196,15 +192,13 @@ class HonAuth:
await self._error_logger(response)
return False
if "ProgressiveLogin" in url[0]:
async with self._session.get(url[0]) as response:
self._called_urls.append((response.status, response.request_info.url))
async with self._request.get(url[0]) as response:
if response.status != 200:
await self._error_logger(response)
return False
url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text())
url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0]
async with self._session.get(url) as response:
self._called_urls.append((response.status, response.request_info.url))
async with self._request.get(url) as response:
if response.status != 200:
await self._error_logger(response)
return False
@ -214,10 +208,9 @@ class HonAuth:
async def _api_auth(self):
post_headers = {"id-token": self._id_token}
data = self._device.get()
async with self._session.post(
async with self._request.post(
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
) as response:
self._called_urls.append((response.status, response.request_info.url))
try:
json_data = await response.json()
except json.JSONDecodeError:
@ -246,10 +239,9 @@ class HonAuth:
"refresh_token": self._refresh_token,
"grant_type": "refresh_token",
}
async with self._session.post(
async with self._request.post(
f"{const.AUTH_API}/services/oauth2/token", params=params
) as response:
self._called_urls.append((response.status, response.request_info.url))
if response.status >= 400:
await self._error_logger(response, fail=False)
return False
@ -261,7 +253,7 @@ class HonAuth:
def clear(self):
self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2])
self._called_urls = []
self._request.called_urls = []
self._cognito_token = ""
self._id_token = ""
self._access_token = ""