idconvert/backend/core/pocketbase.py

103 lines
4.1 KiB
Python

"""Async HTTP client for PocketBase REST API.
All PocketBase I/O goes through this module. Repositories import functions
from here — they do not construct httpx clients directly.
"""
from __future__ import annotations
import httpx
import structlog
from config import settings
log = structlog.get_logger()
# ── Internal helpers ──────────────────────────────────────────────────────────
def _client() -> httpx.AsyncClient:
return httpx.AsyncClient(base_url=settings.pocketbase_url, timeout=10.0)
# ── User operations ───────────────────────────────────────────────────────────
async def get_user_by_token(token: str) -> dict:
"""Validate a PocketBase auth token and return the user record.
Raises httpx.HTTPStatusError if the token is invalid or expired.
"""
async with _client() as client:
resp = await client.get(
'/api/collections/users/auth-refresh',
headers={'Authorization': f'Bearer {token}'},
)
resp.raise_for_status()
body = resp.json()
record = body['record']
record['token'] = body['token'] # refreshed token
return record
async def get_user_by_id(user_id: str, admin_token: str) -> dict:
"""Fetch a user record by ID using an admin token."""
async with _client() as client:
resp = await client.get(
f'/api/collections/users/records/{user_id}',
headers={'Authorization': f'Bearer {admin_token}'},
)
resp.raise_for_status()
return resp.json()
# ── Conversion operations ─────────────────────────────────────────────────────
async def create_conversion(user_id: str, file_hash: str, admin_token: str) -> str:
"""Create a conversion record. Triggers the credit-deduction hook atomically.
Returns the new record ID. Raises httpx.HTTPStatusError on failure,
including INSUFFICIENT_CREDITS from the hook.
"""
async with _client() as client:
resp = await client.post(
'/api/collections/conversions/records',
json={'user': user_id, 'file_hash': file_hash, 'status': 'pending'},
headers={'Authorization': f'Bearer {admin_token}'},
)
resp.raise_for_status()
return resp.json()['id']
async def update_conversion_status(record_id: str, status: str, admin_token: str) -> None:
"""Update conversion status to 'complete' or 'failed'."""
async with _client() as client:
resp = await client.patch(
f'/api/collections/conversions/records/{record_id}',
json={'status': status},
headers={'Authorization': f'Bearer {admin_token}'},
)
resp.raise_for_status()
async def update_conversion_download_url(record_id: str, url: str, admin_token: str) -> None:
"""Store the MinIO presigned download URL on the conversion record."""
async with _client() as client:
resp = await client.patch(
f'/api/collections/conversions/records/{record_id}',
json={'status': 'complete', 'download_url': url},
headers={'Authorization': f'Bearer {admin_token}'},
)
resp.raise_for_status()
# ── Purchase operations ───────────────────────────────────────────────────────
async def create_purchase(user_id: str, credits: int, stripe_id: str, admin_token: str) -> str:
"""Create a purchase record. Triggers the credit top-up hook atomically."""
async with _client() as client:
resp = await client.post(
'/api/collections/purchases/records',
json={'user': user_id, 'credits': credits, 'stripe_id': stripe_id},
headers={'Authorization': f'Bearer {admin_token}'},
)
resp.raise_for_status()
return resp.json()['id']