mirror of
https://github.com/nathom/streamrip.git
synced 2025-05-15 07:34:48 -04:00
merge
This commit is contained in:
commit
583ae4edc4
8 changed files with 341 additions and 50 deletions
4
setup.py
4
setup.py
|
@ -14,13 +14,13 @@ requirements = read_file("requirements.txt").strip().split()
|
|||
# https://github.com/pypa/sampleproject/blob/main/setup.py
|
||||
setup(
|
||||
name=pkg_name,
|
||||
version="0.2.4",
|
||||
version="0.2.5",
|
||||
author='Nathan',
|
||||
author_email='nathanthomas707@gmail.com',
|
||||
keywords='lossless, hi-res, qobuz, tidal, deezer, audio, convert',
|
||||
description="A stream downloader for Qobuz, Tidal, and Deezer.",
|
||||
long_description=read_file("README.md"),
|
||||
long_description_content_type='text/markdown',
|
||||
long_description_content_type="text/markdown",
|
||||
install_requires=requirements,
|
||||
py_modules=["streamrip"],
|
||||
entry_points={
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import logging
|
||||
from getpass import getpass
|
||||
import os
|
||||
from getpass import getpass
|
||||
|
||||
import click
|
||||
|
||||
|
@ -21,15 +21,13 @@ if not os.path.isdir(CACHE_DIR):
|
|||
@click.group(invoke_without_command=True)
|
||||
@click.option("-c", "--convert", metavar="CODEC")
|
||||
@click.option("-u", "--urls", metavar="URLS")
|
||||
@click.option("-t", "--text", metavar='PATH')
|
||||
@click.option("-t", "--text", metavar="PATH")
|
||||
@click.option("-nd", "--no-db", is_flag=True)
|
||||
@click.option("--debug", is_flag=True)
|
||||
@click.option("--reset-config", is_flag=True)
|
||||
@click.pass_context
|
||||
def cli(ctx, **kwargs):
|
||||
"""
|
||||
|
||||
"""
|
||||
""""""
|
||||
global config
|
||||
global core
|
||||
|
||||
|
@ -53,10 +51,10 @@ def cli(ctx, **kwargs):
|
|||
logger.debug(f"handling {kwargs['urls']}")
|
||||
core.handle_urls(kwargs["urls"])
|
||||
|
||||
if kwargs['text'] is not None:
|
||||
if os.path.isfile(kwargs['text']):
|
||||
if kwargs["text"] is not None:
|
||||
if os.path.isfile(kwargs["text"]):
|
||||
logger.debug(f"Handling {kwargs['text']}")
|
||||
core.handle_txt(kwargs['text'])
|
||||
core.handle_txt(kwargs["text"])
|
||||
else:
|
||||
click.secho(f"Text file {kwargs['text']} does not exist.")
|
||||
|
||||
|
@ -176,24 +174,24 @@ def discover(ctx, **kwargs):
|
|||
|
||||
|
||||
@cli.command()
|
||||
@click.option("-o", "--open", is_flag=True, help='Open the config file')
|
||||
@click.option("-q", "--qobuz", is_flag=True, help='Set Qobuz credentials')
|
||||
@click.option("-t", "--tidal", is_flag=True, help='Set Tidal credentials')
|
||||
@click.option("-o", "--open", is_flag=True, help="Open the config file")
|
||||
@click.option("-q", "--qobuz", is_flag=True, help="Set Qobuz credentials")
|
||||
@click.option("-t", "--tidal", is_flag=True, help="Set Tidal credentials")
|
||||
@click.pass_context
|
||||
def config(ctx, **kwargs):
|
||||
"""Manage the streamrip configuration."""
|
||||
|
||||
if kwargs['open']:
|
||||
if kwargs["open"]:
|
||||
click.launch(CONFIG_PATH)
|
||||
|
||||
if kwargs['qobuz']:
|
||||
config.file['qobuz']['email'] = input("Qobuz email: ")
|
||||
config.file['qobuz']['password'] = getpass()
|
||||
if kwargs["qobuz"]:
|
||||
config.file["qobuz"]["email"] = input("Qobuz email: ")
|
||||
config.file["qobuz"]["password"] = getpass()
|
||||
config.save()
|
||||
|
||||
if kwargs['tidal']:
|
||||
config.file['tidal']['email'] = input("Tidal email: ")
|
||||
config.file['tidal']['password'] = getpass()
|
||||
if kwargs["tidal"]:
|
||||
config.file["tidal"]["email"] = input("Tidal email: ")
|
||||
config.file["tidal"]["password"] = getpass()
|
||||
config.save()
|
||||
|
||||
|
||||
|
|
|
@ -1,13 +1,18 @@
|
|||
import base64
|
||||
import datetime
|
||||
import click
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
# import sys
|
||||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
from pprint import pformat # , pprint
|
||||
from typing import Generator, Sequence, Tuple, Union
|
||||
|
||||
import click
|
||||
import requests
|
||||
from requests.packages import urllib3
|
||||
import tidalapi
|
||||
from dogpile.cache import make_region
|
||||
|
||||
|
@ -29,12 +34,22 @@ from .exceptions import (
|
|||
)
|
||||
from .spoofbuz import Spoofer
|
||||
|
||||
urllib3.disable_warnings()
|
||||
requests.adapters.DEFAULT_RETRIES = 5
|
||||
|
||||
os.makedirs(CACHE_DIR, exist_ok=True)
|
||||
region = make_region().configure(
|
||||
"dogpile.cache.dbm",
|
||||
arguments={"filename": os.path.join(CACHE_DIR, "clients.db")},
|
||||
)
|
||||
|
||||
TIDAL_BASE = "https://api.tidalhifi.com/v1"
|
||||
TIDAL_AUTH_URL = "https://auth.tidal.com/v1/oauth2"
|
||||
TIDAL_CLIENT_INFO = {
|
||||
"id": "aR7gUaTK1ihpXOEP",
|
||||
"secret": "eVWBEkuL2FCjxgjOkR3yK0RYZEbcrMXRc2l8fU3ZCdE=",
|
||||
}
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
TRACK_CACHE_TIME = datetime.timedelta(weeks=2).total_seconds()
|
||||
|
@ -123,7 +138,7 @@ class QobuzClient(ClientInterface):
|
|||
:type pwd: str
|
||||
:param kwargs: app_id: str, secrets: list, return_secrets: bool
|
||||
"""
|
||||
click.secho(f"Logging into {self.source}", fg='green')
|
||||
click.secho(f"Logging into {self.source}", fg="green")
|
||||
if self.logged_in:
|
||||
logger.debug("Already logged in")
|
||||
return
|
||||
|
@ -416,6 +431,7 @@ class DeezerClient(ClientInterface):
|
|||
return url
|
||||
|
||||
|
||||
'''
|
||||
class TidalClient(ClientInterface):
|
||||
source = "tidal"
|
||||
|
||||
|
@ -423,7 +439,7 @@ class TidalClient(ClientInterface):
|
|||
self.logged_in = False
|
||||
|
||||
def login(self, email: str, pwd: str):
|
||||
click.secho(f"Logging into {self.source}", fg='green')
|
||||
click.secho(f"Logging into {self.source}", fg="green")
|
||||
if self.logged_in:
|
||||
return
|
||||
|
||||
|
@ -499,3 +515,217 @@ class TidalClient(ClientInterface):
|
|||
resp = self.session.request("GET", f"tracks/{track_id}/streamUrl", params)
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
'''
|
||||
|
||||
|
||||
class TidalClient(ClientInterface):
|
||||
source = "tidal"
|
||||
|
||||
def __init__(self):
|
||||
self.logged_in = False
|
||||
|
||||
self.device_code = None
|
||||
self.user_code = None
|
||||
self.verification_url = None
|
||||
self.auth_check_timeout = None
|
||||
self.auth_check_interval = None
|
||||
self.user_id = None
|
||||
self.country_code = None
|
||||
self.access_token = None
|
||||
self.refresh_token = None
|
||||
self.expiry = None
|
||||
|
||||
def login(
|
||||
self,
|
||||
user_id=None,
|
||||
country_code=None,
|
||||
access_token=None,
|
||||
token_expiry=None,
|
||||
refresh_token=None,
|
||||
):
|
||||
if access_token is not None:
|
||||
self.token_expiry = token_expiry
|
||||
self.refresh_token = refresh_token
|
||||
if self.token_expiry - time.time() < 86400: # 1 day
|
||||
self._refresh_access_token()
|
||||
else:
|
||||
self._login_by_access_token(access_token, user_id)
|
||||
else:
|
||||
self._login_new_user()
|
||||
|
||||
self.logged_in = True
|
||||
click.secho("Logged into Tidal", fg='green')
|
||||
|
||||
def get(self, item_id, media_type):
|
||||
return self._api_get(item_id, media_type)
|
||||
|
||||
def search(self, query, media_type="album", limit: int = 100):
|
||||
params = {
|
||||
"query": query,
|
||||
"limit": limit,
|
||||
}
|
||||
return self._api_get(f"search/{media_type}s", params=params)
|
||||
|
||||
def get_file_url(self, track_id, quality: int = 7):
|
||||
params = {
|
||||
"audioquality": TIDAL_Q_IDS[quality],
|
||||
"playbackmode": "STREAM",
|
||||
"assetpresentation": "FULL",
|
||||
}
|
||||
resp = self._api_request(f"tracks/{track_id}/playbackinfopostpaywall", params)
|
||||
manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8"))
|
||||
return {
|
||||
"url": manifest["urls"][0],
|
||||
"enc_key": manifest.get("keyId"),
|
||||
"codec": manifest["codecs"],
|
||||
}
|
||||
|
||||
def _login_new_user(self, launch=True):
|
||||
login_link = f"https://{self._get_device_code()}"
|
||||
|
||||
click.secho(
|
||||
f"Go to {login_link} to log into Tidal within 5 minutes.", fg="blue"
|
||||
)
|
||||
if launch:
|
||||
click.launch(login_link)
|
||||
|
||||
start = time.time()
|
||||
elapsed = 0
|
||||
while elapsed < 600: # 5 mins to login
|
||||
elapsed = time.time() - start
|
||||
status = self._check_auth_status()
|
||||
if status == 2:
|
||||
# pending
|
||||
time.sleep(4)
|
||||
continue
|
||||
elif status == 1:
|
||||
# error checking
|
||||
raise Exception
|
||||
elif status == 0:
|
||||
# successful
|
||||
break
|
||||
else:
|
||||
raise Exception
|
||||
|
||||
def _get_device_code(self):
|
||||
data = {
|
||||
"client_id": TIDAL_CLIENT_INFO["id"],
|
||||
"scope": "r_usr+w_usr+w_sub",
|
||||
}
|
||||
resp = self._api_post(f"{TIDAL_AUTH_URL}/device_authorization", data)
|
||||
|
||||
if "status" in resp and resp["status"] != 200:
|
||||
raise Exception(f"Device authorization failed {resp}")
|
||||
|
||||
logger.debug(pformat(resp))
|
||||
self.device_code = resp["deviceCode"]
|
||||
self.user_code = resp["userCode"]
|
||||
self.user_code_expiry = resp["expiresIn"]
|
||||
self.auth_interval = resp["interval"]
|
||||
return resp["verificationUriComplete"]
|
||||
|
||||
def _check_auth_status(self):
|
||||
data = {
|
||||
"client_id": TIDAL_CLIENT_INFO["id"],
|
||||
"device_code": self.device_code,
|
||||
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
|
||||
"scope": "r_usr+w_usr+w_sub",
|
||||
}
|
||||
logger.debug(data)
|
||||
resp = self._api_post(
|
||||
f"{TIDAL_AUTH_URL}/token",
|
||||
data,
|
||||
(TIDAL_CLIENT_INFO["id"], TIDAL_CLIENT_INFO["secret"]),
|
||||
)
|
||||
logger.debug(resp)
|
||||
|
||||
if resp.get("status", 200) != 200:
|
||||
if resp["status"] == 400 and resp["sub_status"] == 1002:
|
||||
return 2
|
||||
else:
|
||||
return 1
|
||||
|
||||
self.user_id = resp["user"]["userId"]
|
||||
self.country_code = resp["user"]["countryCode"]
|
||||
self.access_token = resp["access_token"]
|
||||
self.refresh_token = resp["refresh_token"]
|
||||
self.token_expiry = resp["expires_in"] + time.time()
|
||||
return 0
|
||||
|
||||
def _verify_access_token(self, token):
|
||||
headers = {
|
||||
"authorization": f"Bearer {token}",
|
||||
}
|
||||
r = requests.get("https://api.tidal.com/v1/sessions", headers=headers).json()
|
||||
if r.status != 200:
|
||||
raise Exception("Login failed")
|
||||
|
||||
return True
|
||||
|
||||
def _refresh_access_token(self):
|
||||
data = {
|
||||
"client_id": TIDAL_CLIENT_INFO["id"],
|
||||
"refresh_token": self.refresh_token,
|
||||
"grant_type": "refresh_token",
|
||||
"scope": "r_usr+w_usr+w_sub",
|
||||
}
|
||||
resp = self._api_post(
|
||||
f"{TIDAL_AUTH_URL}/token",
|
||||
data,
|
||||
(TIDAL_CLIENT_INFO["id"], TIDAL_CLIENT_INFO["secret"]),
|
||||
)
|
||||
|
||||
if resp.get("status", 200) != 200:
|
||||
raise Exception("Refresh failed")
|
||||
|
||||
self.user_id = resp["user"]["userId"]
|
||||
self.country_code = resp["user"]["countryCode"]
|
||||
self.access_token = resp["access_token"]
|
||||
self.token_expiry = resp["expires_in"] + time.time()
|
||||
|
||||
def _login_by_access_token(self, token, user_id=None):
|
||||
headers = {"authorization": f"Bearer {token}"}
|
||||
resp = requests.get("https://api.tidal.com/v1/sessions", headers=headers).json()
|
||||
if resp.get("status", 200) != 200:
|
||||
raise Exception(f"Login failed {resp=}")
|
||||
|
||||
if str(resp.get("userId")) != str(user_id):
|
||||
raise Exception(f"User id mismatch {resp['userId']} v {user_id}")
|
||||
|
||||
self.user_id = resp["userId"]
|
||||
self.country_code = resp["countryCode"]
|
||||
self.access_token = token
|
||||
|
||||
def get_tokens(self):
|
||||
return {
|
||||
k: getattr(self, k)
|
||||
for k in (
|
||||
"user_id",
|
||||
"country_code",
|
||||
"access_token",
|
||||
"refresh_token",
|
||||
"token_expiry",
|
||||
)
|
||||
}
|
||||
|
||||
def _api_get(self, item_id: str, media_type: str) -> dict:
|
||||
item = self._api_request(f"{media_type}s/{item_id}")
|
||||
if media_type in ("playlist", "album"):
|
||||
resp = self._api_request(f"{media_type}s/{item_id}/items")
|
||||
item["tracks"] = [item["item"] for item in resp["items"]]
|
||||
|
||||
return item
|
||||
|
||||
def _api_request(self, path, params=None) -> dict:
|
||||
if params is None:
|
||||
params = {}
|
||||
|
||||
headers = {"authorization": f"Bearer {self.access_token}"}
|
||||
params["countryCode"] = self.country_code
|
||||
params["limit"] = 100
|
||||
r = requests.get(f"{TIDAL_BASE}/{path}", headers=headers, params=params).json()
|
||||
return r
|
||||
|
||||
def _api_post(self, url, data, auth=None):
|
||||
r = requests.post(url, data=data, auth=auth, verify=False).json()
|
||||
return r
|
||||
|
|
|
@ -32,14 +32,18 @@ class Config:
|
|||
|
||||
defaults = {
|
||||
"qobuz": {
|
||||
"enabled": True,
|
||||
"email": None,
|
||||
"password": None,
|
||||
"app_id": "", # Avoid NoneType error
|
||||
"secrets": [],
|
||||
},
|
||||
"tidal": {"enabled": True, "email": None, "password": None},
|
||||
"deezer": {"enabled": True},
|
||||
"tidal": {
|
||||
"user_id": None,
|
||||
"country_code": None,
|
||||
"access_token": None,
|
||||
"refresh_token": None,
|
||||
"token_expiry": 0,
|
||||
},
|
||||
"database": {"enabled": True, "path": None},
|
||||
"conversion": {
|
||||
"enabled": False,
|
||||
|
@ -120,10 +124,7 @@ class Config:
|
|||
|
||||
@property
|
||||
def tidal_creds(self):
|
||||
return {
|
||||
"email": self.file["tidal"]["email"],
|
||||
"pwd": self.file["tidal"]["password"],
|
||||
}
|
||||
return self.file["tidal"]
|
||||
|
||||
@property
|
||||
def qobuz_creds(self):
|
||||
|
|
|
@ -4,6 +4,7 @@ import shutil
|
|||
import subprocess
|
||||
from tempfile import gettempdir
|
||||
from typing import Optional
|
||||
|
||||
from mutagen.flac import FLAC as FLAC_META
|
||||
|
||||
from .exceptions import ConversionError
|
||||
|
|
|
@ -63,18 +63,21 @@ class MusicDL(list):
|
|||
:param source:
|
||||
:type source: str
|
||||
"""
|
||||
click.secho(f"Enter {capitalize(source)} email:", fg="green")
|
||||
self.config.file[source]["email"] = input()
|
||||
click.secho(
|
||||
f"Enter {capitalize(source)} password (will not show on screen):",
|
||||
fg="green",
|
||||
)
|
||||
self.config.file[source]["password"] = getpass(
|
||||
prompt=""
|
||||
) # does hashing work for tidal?
|
||||
if source == "qobuz":
|
||||
click.secho(f"Enter {capitalize(source)} email:", fg="green")
|
||||
self.config.file[source]["email"] = input()
|
||||
click.secho(
|
||||
f"Enter {capitalize(source)} password (will not show on screen):",
|
||||
fg="green",
|
||||
)
|
||||
self.config.file[source]["password"] = getpass(
|
||||
prompt=""
|
||||
) # does hashing work for tidal?
|
||||
|
||||
self.config.save()
|
||||
click.secho(f'Credentials saved to config file at "{self.config._path}"')
|
||||
self.config.save()
|
||||
click.secho(f'Credentials saved to config file at "{self.config._path}"')
|
||||
else:
|
||||
raise Exception
|
||||
|
||||
def assert_creds(self, source: str):
|
||||
assert source in ("qobuz", "tidal", "deezer"), f"Invalid source {source}"
|
||||
|
@ -82,7 +85,7 @@ class MusicDL(list):
|
|||
# no login for deezer
|
||||
return
|
||||
|
||||
if (
|
||||
if source == "qobuz" and (
|
||||
self.config.file[source]["email"] is None
|
||||
or self.config.file[source]["password"] is None
|
||||
):
|
||||
|
@ -98,8 +101,13 @@ class MusicDL(list):
|
|||
"""
|
||||
for source, url_type, item_id in self.parse_urls(url):
|
||||
if item_id in self.db:
|
||||
logger.info(f"ID {item_id} already downloaded, use --no-db to override.")
|
||||
click.secho(f"ID {item_id} already downloaded, use --no-db to override.", fg='magenta')
|
||||
logger.info(
|
||||
f"ID {item_id} already downloaded, use --no-db to override."
|
||||
)
|
||||
click.secho(
|
||||
f"ID {item_id} already downloaded, use --no-db to override.",
|
||||
fg="magenta",
|
||||
)
|
||||
continue
|
||||
|
||||
self.handle_item(source, url_type, item_id)
|
||||
|
@ -180,6 +188,9 @@ class MusicDL(list):
|
|||
self.config.file["qobuz"]["secrets"],
|
||||
) = client.get_tokens()
|
||||
self.config.save()
|
||||
elif client.source == 'tidal':
|
||||
self.config.file['tidal'] = client.get_tokens()
|
||||
self.config.save()
|
||||
|
||||
def parse_urls(self, url: str) -> Tuple[str, str]:
|
||||
"""Returns the type of the url and the id.
|
||||
|
|
|
@ -2,6 +2,7 @@ import logging
|
|||
import os
|
||||
import re
|
||||
import shutil
|
||||
import sys
|
||||
from abc import ABC, abstractmethod
|
||||
from pprint import pformat, pprint
|
||||
from tempfile import gettempdir
|
||||
|
@ -32,6 +33,7 @@ from .exceptions import (
|
|||
from .metadata import TrackMetadata
|
||||
from .utils import (
|
||||
clean_format,
|
||||
decrypt_mqa_file,
|
||||
quality_id,
|
||||
safe_get,
|
||||
tidal_cover_url,
|
||||
|
@ -137,9 +139,9 @@ class Track:
|
|||
|
||||
@staticmethod
|
||||
def _get_tracklist(resp, source):
|
||||
if source in ("qobuz", "tidal"):
|
||||
if source == "qobuz":
|
||||
return resp["tracks"]["items"]
|
||||
elif source == "deezer":
|
||||
elif source in ("tidal", "deezer"):
|
||||
return resp["tracks"]
|
||||
|
||||
raise NotImplementedError(source)
|
||||
|
@ -226,7 +228,10 @@ class Track:
|
|||
else:
|
||||
raise InvalidSourceError(self.client.source)
|
||||
|
||||
shutil.move(temp_file, self.final_path)
|
||||
if dl_info.get("enc_key"):
|
||||
decrypt_mqa_file(temp_file, self.final_path, dl_info["enc_key"])
|
||||
else:
|
||||
shutil.move(temp_file, self.final_path)
|
||||
|
||||
if isinstance(database, MusicDB):
|
||||
database.add(self.id)
|
||||
|
@ -288,7 +293,10 @@ class Track:
|
|||
:raises IndexError
|
||||
"""
|
||||
|
||||
track = cls._get_tracklist(album, client.source)[pos]
|
||||
logger.debug(pos)
|
||||
tracklist = cls._get_tracklist(album, client.source)
|
||||
logger.debug(len(tracklist))
|
||||
track = tracklist[pos]
|
||||
meta = TrackMetadata(album=album, track=track, source=client.source)
|
||||
return cls(client=client, meta=meta, id=track["id"])
|
||||
|
||||
|
@ -431,7 +439,7 @@ class Track:
|
|||
sampling_rate=kwargs.get("sampling_rate"),
|
||||
remove_source=kwargs.get("remove_source", True),
|
||||
)
|
||||
click.secho(f"Converting {self!s}", fg='blue')
|
||||
click.secho(f"Converting {self!s}", fg="blue")
|
||||
engine.convert()
|
||||
|
||||
def get(self, *keys, default=None):
|
||||
|
@ -743,7 +751,7 @@ class Album(Tracklist):
|
|||
This uses a classmethod to convert an item into a Track object, which
|
||||
stores the metadata inside a TrackMetadata object.
|
||||
"""
|
||||
logging.debug("Loading tracks to album")
|
||||
logging.debug(f"Loading {self.tracktotal} tracks to album")
|
||||
for i in range(self.tracktotal):
|
||||
# append method inherited from superclass list
|
||||
self.append(
|
||||
|
@ -1091,6 +1099,14 @@ class Artist(Tracklist):
|
|||
def load_meta(self):
|
||||
"""Send an API call to get album info based on id."""
|
||||
self.meta = self.client.get(self.id, media_type="artist")
|
||||
<<<<<<< HEAD
|
||||
||||||| 24bf328
|
||||
# TODO find better fix for this
|
||||
self.name = self.meta['items'][0]['artist']['name']
|
||||
=======
|
||||
# TODO find better fix for this
|
||||
self.name = self.meta["items"][0]["artist"]["name"]
|
||||
>>>>>>> tidalmqa
|
||||
self._load_albums()
|
||||
|
||||
def _load_albums(self):
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import base64
|
||||
import logging
|
||||
import logging.handlers as handlers
|
||||
import os
|
||||
|
@ -5,6 +6,8 @@ from string import Formatter
|
|||
from typing import Optional
|
||||
|
||||
import requests
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Util import Counter
|
||||
from pathvalidate import sanitize_filename
|
||||
from tqdm import tqdm
|
||||
|
||||
|
@ -155,3 +158,34 @@ def init_log(
|
|||
|
||||
def capitalize(s: str) -> str:
|
||||
return s[0].upper() + s[1:]
|
||||
|
||||
|
||||
def decrypt_mqa_file(in_path, out_path, encryption_key):
|
||||
# Do not change this
|
||||
master_key = "UIlTTEMmmLfGowo/UC60x2H45W6MdGgTRfo/umg4754="
|
||||
|
||||
# Decode the base64 strings to ascii strings
|
||||
master_key = base64.b64decode(master_key)
|
||||
security_token = base64.b64decode(encryption_key)
|
||||
|
||||
# Get the IV from the first 16 bytes of the securityToken
|
||||
iv = security_token[:16]
|
||||
encrypted_st = security_token[16:]
|
||||
|
||||
# Initialize decryptor
|
||||
decryptor = AES.new(master_key, AES.MODE_CBC, iv)
|
||||
|
||||
# Decrypt the security token
|
||||
decrypted_st = decryptor.decrypt(encrypted_st)
|
||||
|
||||
# Get the audio stream decryption key and nonce from the decrypted security token
|
||||
key = decrypted_st[:16]
|
||||
nonce = decrypted_st[16:24]
|
||||
|
||||
counter = Counter.new(64, prefix=nonce, initial_value=0)
|
||||
decryptor = AES.new(key, AES.MODE_CTR, counter=counter)
|
||||
|
||||
with open(in_path, "rb") as enc_file:
|
||||
dec_bytes = decryptor.decrypt(enc_file.read())
|
||||
with open(out_path, "wb") as dec_file:
|
||||
dec_file.write(dec_bytes)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue