switch everywhere to use Snapshot.pk and ArchiveResult.pk instead of id

This commit is contained in:
Nick Sweeting 2024-05-13 05:12:12 -07:00
parent 9733b8d04c
commit 0420662174
No known key found for this signature in database
15 changed files with 175 additions and 104 deletions

View file

@ -12,7 +12,7 @@ from signal_webhooks.models import WebhookBase
from django_stubs_ext.db.models import TypedModelMeta
from abid_utils.models import ABIDModel
from abid_utils.models import ABIDModel, ABIDField
def generate_secret_token() -> str:
@ -21,7 +21,15 @@ def generate_secret_token() -> str:
class APIToken(ABIDModel):
abid_prefix = 'apt'
abid_ts_src = 'self.created'
abid_uri_src = 'self.token'
abid_subtype_src = 'self.user_id'
abid_rand_src = 'self.id'
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=True)
uuid = models.UUIDField(blank=True, null=True, editable=True, unique=True)
abid = ABIDField(prefix=abid_prefix)
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
token = models.CharField(max_length=32, default=generate_secret_token, unique=True)
@ -42,7 +50,8 @@ class APIToken(ABIDModel):
def __json__(self) -> dict:
return {
"TYPE": "APIToken",
"id": str(self.id),
"uuid": str(self.id),
"abid": str(self.calculate_abid()),
"user_id": str(self.user.id),
"user_username": self.user.username,
"token": self.token,
@ -77,9 +86,14 @@ class OutboundWebhook(ABIDModel, WebhookBase):
Model used in place of (extending) signals_webhooks.models.WebhookModel. Swapped using:
settings.SIGNAL_WEBHOOKS_CUSTOM_MODEL = 'api.models.OutboundWebhook'
"""
ID_PREFIX = 'whk'
abid_prefix = 'whk'
abid_ts_src = 'self.created'
abid_uri_src = 'self.endpoint'
abid_subtype_src = 'self.ref'
abid_rand_src = 'self.id'
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=True)
uuid = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=True)
abid = ABIDField(prefix=abid_prefix)
WebhookBase._meta.get_field('name').help_text = (
'Give your webhook a descriptive name (e.g. Notify ACME Slack channel of any new ArchiveResults).')
@ -92,3 +106,4 @@ class OutboundWebhook(ABIDModel, WebhookBase):
class Meta(WebhookBase.Meta):
verbose_name = 'API Outbound Webhook'

View file

@ -47,6 +47,6 @@ def check_api_token(request, token_data: TokenAuthSchema):
request=request,
)
if user:
return {"success": True, "user_id": str(user.id)}
return {"success": True, "user_id": str(user.pk)}
return {"success": False, "user_id": None}

View file

@ -10,7 +10,7 @@ from ninja import Router, Schema, FilterSchema, Field, Query
from ninja.pagination import paginate
from core.models import Snapshot, ArchiveResult, Tag
from abid_utils.abid import ABID
router = Router(tags=['Core Models'])
@ -20,9 +20,12 @@ router = Router(tags=['Core Models'])
### ArchiveResult #########################################################################
class ArchiveResultSchema(Schema):
id: UUID
pk: str
uuid: UUID
abid: str
snapshot_abid: str
snapshot_id: UUID
snapshot_url: str
snapshot_tags: str
@ -36,8 +39,16 @@ class ArchiveResultSchema(Schema):
created: datetime
@staticmethod
def resolve_id(obj):
return obj.uuid
def resolve_pk(obj):
return str(obj.pk)
@staticmethod
def resolve_uuid(obj):
return str(obj.uuid)
@staticmethod
def resolve_abid(obj):
return str(obj.ABID)
@staticmethod
def resolve_created(obj):
@ -47,16 +58,21 @@ class ArchiveResultSchema(Schema):
def resolve_snapshot_url(obj):
return obj.snapshot.url
@staticmethod
def resolve_snapshot_abid(obj):
return str(obj.snapshot.ABID)
@staticmethod
def resolve_snapshot_tags(obj):
return obj.snapshot.tags_str()
class ArchiveResultFilterSchema(FilterSchema):
id: Optional[UUID] = Field(None, q='uuid')
uuid: Optional[UUID] = Field(None, q='uuid')
# abid: Optional[str] = Field(None, q='abid')
search: Optional[str] = Field(None, q=['snapshot__url__icontains', 'snapshot__title__icontains', 'snapshot__tags__name__icontains', 'extractor', 'output__icontains'])
snapshot_id: Optional[UUID] = Field(None, q='snapshot_id')
snapshot_uuid: Optional[UUID] = Field(None, q='snapshot_uuid')
snapshot_url: Optional[str] = Field(None, q='snapshot__url')
snapshot_tag: Optional[str] = Field(None, q='snapshot__tags__name')
@ -115,7 +131,9 @@ def get_archiveresult(request, archiveresult_id: str):
class SnapshotSchema(Schema):
id: UUID
pk: str
uuid: UUID
abid: str
url: str
tags: str
@ -128,9 +146,17 @@ class SnapshotSchema(Schema):
archiveresults: List[ArchiveResultSchema]
# @staticmethod
# def resolve_id(obj):
# return str(obj.id)
@staticmethod
def resolve_pk(obj):
return str(obj.pk)
@staticmethod
def resolve_uuid(obj):
return str(obj.uuid)
@staticmethod
def resolve_abid(obj):
return str(obj.ABID)
@staticmethod
def resolve_tags(obj):
@ -167,10 +193,10 @@ def list_snapshots(request, filters: SnapshotFilterSchema = Query(...), with_arc
results = filters.filter(qs)
return results
@router.get("/snapshot/{snapshot_id}", response=SnapshotSchema)
def get_snapshot(request, snapshot_id: str, with_archiveresults: bool=True):
@router.get("/snapshot/{snapshot_uuid}", response=SnapshotSchema)
def get_snapshot(request, snapshot_uuid: str, with_archiveresults: bool=True):
request.with_archiveresults = with_archiveresults
snapshot = get_object_or_404(Snapshot, id=snapshot_id)
snapshot = get_object_or_404(Snapshot, uuid=snapshot_uuid)
return snapshot
@ -179,9 +205,9 @@ def get_snapshot(request, snapshot_id: str, with_archiveresults: bool=True):
# snapshot = Snapshot.objects.create(**payload.dict())
# return snapshot
#
# @router.put("/snapshot/{snapshot_id}", response=SnapshotSchema)
# def update_snapshot(request, snapshot_id: str, payload: SnapshotSchema):
# snapshot = get_object_or_404(Snapshot, id=snapshot_id)
# @router.put("/snapshot/{snapshot_uuid}", response=SnapshotSchema)
# def update_snapshot(request, snapshot_uuid: str, payload: SnapshotSchema):
# snapshot = get_object_or_404(Snapshot, uuid=snapshot_uuid)
#
# for attr, value in payload.dict().items():
# setattr(snapshot, attr, value)
@ -189,9 +215,9 @@ def get_snapshot(request, snapshot_id: str, with_archiveresults: bool=True):
#
# return snapshot
#
# @router.delete("/snapshot/{snapshot_id}")
# def delete_snapshot(request, snapshot_id: str):
# snapshot = get_object_or_404(Snapshot, id=snapshot_id)
# @router.delete("/snapshot/{snapshot_uuid}")
# def delete_snapshot(request, snapshot_uuid: str):
# snapshot = get_object_or_404(Snapshot, uuid=snapshot_uuid)
# snapshot.delete()
# return {"success": True}