mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2025-05-21 10:25:11 -04:00
big overhaul of REST API, split into auth, core, and cli methods
This commit is contained in:
parent
e5aba0dc2e
commit
75153252dc
20 changed files with 790 additions and 265 deletions
archivebox/api
|
@ -1,48 +1,107 @@
|
|||
__package__ = 'archivebox.api'
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from django.http import HttpRequest
|
||||
from django.contrib.auth import login
|
||||
from django.contrib.auth import authenticate
|
||||
from ninja import Form, Router, Schema
|
||||
from ninja.security import HttpBearer
|
||||
from django.contrib.auth.models import AbstractBaseUser
|
||||
|
||||
from api.models import Token
|
||||
|
||||
router = Router()
|
||||
from ninja.security import HttpBearer, APIKeyQuery, APIKeyHeader, HttpBasicAuth, django_auth_superuser
|
||||
|
||||
|
||||
class GlobalAuth(HttpBearer):
|
||||
def authenticate(self, request, token):
|
||||
def auth_using_token(token, request: Optional[HttpRequest]=None) -> Optional[AbstractBaseUser]:
|
||||
"""Given an API token string, check if a corresponding non-expired APIToken exists, and return its user"""
|
||||
from api.models import APIToken # lazy import model to avoid loading it at urls.py import time
|
||||
|
||||
user = None
|
||||
|
||||
submitted_empty_form = token in ('string', '', None)
|
||||
if submitted_empty_form:
|
||||
user = request.user # see if user is authed via django session and use that as the default
|
||||
else:
|
||||
try:
|
||||
return Token.objects.get(token=token).user
|
||||
except Token.DoesNotExist:
|
||||
token = APIToken.objects.get(token=token)
|
||||
if token.is_valid():
|
||||
user = token.user
|
||||
except APIToken.DoesNotExist:
|
||||
pass
|
||||
|
||||
if not user:
|
||||
print('[❌] Failed to authenticate API user using API Key:', request)
|
||||
|
||||
class AuthSchema(Schema):
|
||||
email: str
|
||||
password: str
|
||||
return None
|
||||
|
||||
|
||||
@router.post("/authenticate", auth=None) # overriding global auth
|
||||
def get_token(request, auth_data: AuthSchema):
|
||||
user = authenticate(username=auth_data.email, password=auth_data.password)
|
||||
if user:
|
||||
# Assuming a user can have multiple tokens and you want to create a new one every time
|
||||
new_token = Token.objects.create(user=user)
|
||||
return {"token": new_token.token, "expires": new_token.expiry_as_iso8601}
|
||||
def auth_using_password(username, password, request: Optional[HttpRequest]=None) -> Optional[AbstractBaseUser]:
|
||||
"""Given a username and password, check if they are valid and return the corresponding user"""
|
||||
user = None
|
||||
|
||||
submitted_empty_form = (username, password) in (('string', 'string'), ('', ''), (None, None))
|
||||
if submitted_empty_form:
|
||||
user = request.user # see if user is authed via django session and use that as the default
|
||||
else:
|
||||
return {"error": "Invalid credentials"}
|
||||
user = authenticate(
|
||||
username=username,
|
||||
password=password,
|
||||
)
|
||||
|
||||
if not user:
|
||||
print('[❌] Failed to authenticate API user using API Key:', request)
|
||||
|
||||
return user
|
||||
|
||||
|
||||
class TokenValidationSchema(Schema):
|
||||
token: str
|
||||
### Base Auth Types
|
||||
|
||||
class APITokenAuthCheck:
|
||||
"""The base class for authentication methods that use an api.models.APIToken"""
|
||||
def authenticate(self, request: HttpRequest, key: Optional[str]=None) -> Optional[AbstractBaseUser]:
|
||||
user = auth_using_token(
|
||||
token=key,
|
||||
request=request,
|
||||
)
|
||||
if user is not None:
|
||||
login(request, user, backend='django.contrib.auth.backends.ModelBackend')
|
||||
return user
|
||||
|
||||
class UserPassAuthCheck:
|
||||
"""The base class for authentication methods that use a username & password"""
|
||||
def authenticate(self, request: HttpRequest, username: Optional[str]=None, password: Optional[str]=None) -> Optional[AbstractBaseUser]:
|
||||
user = auth_using_password(
|
||||
username=username,
|
||||
password=password,
|
||||
request=request,
|
||||
)
|
||||
if user is not None:
|
||||
login(request, user, backend='django.contrib.auth.backends.ModelBackend')
|
||||
return user
|
||||
|
||||
|
||||
@router.post("/validate_token", auth=None) # No authentication required for this endpoint
|
||||
def validate_token(request, token_data: TokenValidationSchema):
|
||||
try:
|
||||
# Attempt to authenticate using the provided token
|
||||
user = GlobalAuth().authenticate(request, token_data.token)
|
||||
if user:
|
||||
return {"status": "valid"}
|
||||
else:
|
||||
return {"status": "invalid"}
|
||||
except Token.DoesNotExist:
|
||||
return {"status": "invalid"}
|
||||
### Django-Ninja-Provided Auth Methods
|
||||
|
||||
class UsernameAndPasswordAuth(UserPassAuthCheck, HttpBasicAuth):
|
||||
"""Allow authenticating by passing username & password via HTTP Basic Authentication (not recommended)"""
|
||||
pass
|
||||
|
||||
class QueryParamTokenAuth(APITokenAuthCheck, APIKeyQuery):
|
||||
"""Allow authenticating by passing api_key=xyz as a GET/POST query parameter"""
|
||||
param_name = "api_key"
|
||||
|
||||
class HeaderTokenAuth(APITokenAuthCheck, APIKeyHeader):
|
||||
"""Allow authenticating by passing X-API-Key=xyz as a request header"""
|
||||
param_name = "X-API-Key"
|
||||
|
||||
class BearerTokenAuth(APITokenAuthCheck, HttpBearer):
|
||||
"""Allow authenticating by passing Bearer=xyz as a request header"""
|
||||
pass
|
||||
|
||||
|
||||
### Enabled Auth Methods
|
||||
|
||||
API_AUTH_METHODS = [
|
||||
QueryParamTokenAuth(),
|
||||
HeaderTokenAuth(),
|
||||
BearerTokenAuth(),
|
||||
django_auth_superuser,
|
||||
UsernameAndPasswordAuth(),
|
||||
]
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue