"""File storage backends: local filesystem and S3-compatible object stores."""

from __future__ import annotations

import logging
import os
import uuid
from abc import ABC, abstractmethod
from pathlib import Path

import boto3
from botocore.config import Config as BotoConfig

from app.core.config import settings

logger = logging.getLogger(__name__)


class StorageBackend(ABC):
    """Interface that every storage backend implements."""

    @abstractmethod
    def upload(self, file_data: bytes, filename: str, content_type: str = "") -> str:
        """Upload file and return its public URL."""

    @abstractmethod
    def delete(self, key: str) -> None:
        """Delete a file by key/path."""

    @abstractmethod
    def generate_presigned_url(
        self, key: str, content_type: str = "", expires: int = 3600
    ) -> str:
        """Generate a presigned upload URL (S3-like backends only)."""


class LocalStorageBackend(StorageBackend):
    """Store files as randomly named entries in one local directory."""

    # delete() has always stripped this prefix; it is kept as a fallback so
    # existing callers keep working when serve_url_prefix is customised.
    _LEGACY_PREFIX = "uploads/"

    def __init__(self, storage_path: str, serve_url_prefix: str = "/uploads"):
        """Create the storage directory if needed and remember the URL prefix.

        Args:
            storage_path: Directory that uploaded files are written to.
            serve_url_prefix: Prefix prepended to file names in returned URLs;
                a trailing slash is removed.
        """
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.serve_url_prefix = serve_url_prefix.rstrip("/")

    def _make_key(self, filename: str) -> str:
        """Return a random hex name that keeps the extension of *filename*."""
        ext = Path(filename).suffix
        return f"{uuid.uuid4().hex}{ext}"

    def _url_prefixes(self) -> tuple[str, ...]:
        """Return the leading path prefixes that delete() strips from a key."""
        configured = self.serve_url_prefix.lstrip("/")
        if configured:
            return (f"{configured}/", self._LEGACY_PREFIX)
        return (self._LEGACY_PREFIX,)

    def upload(self, file_data: bytes, filename: str, content_type: str = "") -> str:
        """Write *file_data* under a fresh random name and return its URL.

        Args:
            file_data: Raw bytes to store.
            filename: Original name; only its extension is used.
            content_type: Accepted for interface compatibility; not used.

        Returns:
            ``"<serve_url_prefix>/<generated name>"``.
        """
        key = self._make_key(filename)
        dest = self.storage_path / key
        dest.write_bytes(file_data)
        return f"{self.serve_url_prefix}/{key}"

    def delete(self, key: str) -> None:
        """Delete a stored file, given its name or the URL upload() returned.

        Leading slashes and one leading URL prefix (the configured
        ``serve_url_prefix`` or the legacy ``uploads/``) are stripped before
        the name is looked up. A file that does not exist is ignored.

        Args:
            key: File name, relative path, or served URL path.

        Raises:
            ValueError: If *key* resolves to a location outside the storage
                directory.
        """
        relative = key.lstrip("/")
        for prefix in self._url_prefixes():
            if relative.startswith(prefix):
                relative = relative[len(prefix):]
                break

        root = self.storage_path.resolve()
        candidate = self.storage_path / relative
        resolved = candidate.resolve()
        # Reject "../" segments and absolute paths: joining an absolute path
        # onto storage_path would otherwise discard storage_path entirely.
        if resolved != root and root not in resolved.parents:
            raise ValueError(f"Refusing to delete outside storage directory: {key!r}")

        # Only regular files are removed; a key that resolves to the storage
        # directory itself (e.g. an empty key) is a no-op instead of an error.
        if candidate.is_file():
            candidate.unlink()

    def generate_presigned_url(
        self, key: str, content_type: str = "", expires: int = 3600
    ) -> str:
        """Always raise: local storage has no presigned URLs.

        Raises:
            NotImplementedError: On every call.
        """
        raise NotImplementedError("Local storage does not support presigned URLs")


class S3StorageBackend(StorageBackend):
    """Compatible with MinIO, Aliyun OSS, Tencent COS, and AWS S3."""

    def __init__(
        self,
        endpoint: str,
        access_key: str,
        secret_key: str,
        bucket: str,
        region: str = "",
        public_url: str = "",
    ):
        """Create the S3 client and make a best-effort attempt to ensure the bucket.

        Args:
            endpoint: Endpoint URL passed to the boto3 client.
            access_key: Access key id.
            secret_key: Secret access key.
            bucket: Bucket used for every operation.
            region: Optional region name; omitted from the client when empty.
            public_url: Optional base URL used for returned file URLs; a
                trailing slash is removed.
        """
        self.bucket = bucket
        self.public_url = public_url.rstrip("/") if public_url else ""

        kwargs: dict = {
            "endpoint_url": endpoint,
            "aws_access_key_id": access_key,
            "aws_secret_access_key": secret_key,
            "config": BotoConfig(signature_version="s3v4"),
        }
        if region:
            kwargs["region_name"] = region
        self.client = boto3.client("s3", **kwargs)

        # Bucket setup stays best-effort so construction never fails here,
        # but failures are logged instead of being silently discarded.
        try:
            self.client.head_bucket(Bucket=bucket)
        except Exception as head_error:
            logger.info(
                "Bucket %r check failed (%s); attempting to create it",
                bucket,
                head_error,
            )
            try:
                self.client.create_bucket(Bucket=bucket)
            except Exception:
                logger.warning(
                    "Could not create bucket %r; later operations may fail",
                    bucket,
                    exc_info=True,
                )

    def _make_key(self, filename: str) -> str:
        """Return ``images/<random hex><ext>`` using the extension of *filename*."""
        ext = Path(filename).suffix
        return f"images/{uuid.uuid4().hex}{ext}"

    def _url_base(self) -> str:
        """Return the base that object keys are appended to in file URLs."""
        if self.public_url:
            return self.public_url
        return f"{self.client.meta.endpoint_url}/{self.bucket}"

    def _get_url(self, key: str) -> str:
        """Return the URL for *key*: public_url if set, else endpoint/bucket."""
        return f"{self._url_base()}/{key}"

    def _key_from_url(self, key: str) -> str:
        """Strip the URL base added by _get_url, if present; else return *key*."""
        prefix = f"{self._url_base()}/"
        if key.startswith(prefix):
            return key[len(prefix):]
        return key

    def upload(self, file_data: bytes, filename: str, content_type: str = "") -> str:
        """Put *file_data* into the bucket under a fresh key and return its URL.

        Args:
            file_data: Raw bytes to store.
            filename: Original name; only its extension is used.
            content_type: Sent as ``ContentType`` when non-empty.

        Returns:
            The URL built by ``_get_url`` for the generated key.
        """
        key = self._make_key(filename)
        extra: dict = {}
        if content_type:
            extra["ContentType"] = content_type
        self.client.put_object(Bucket=self.bucket, Key=key, Body=file_data, **extra)
        return self._get_url(key)

    def delete(self, key: str) -> None:
        """Delete an object, given its key or the URL upload() returned.

        Args:
            key: Object key, or a URL produced by this backend; the URL base
                is stripped so both forms address the same object.
        """
        self.client.delete_object(Bucket=self.bucket, Key=self._key_from_url(key))

    def generate_presigned_url(
        self, key: str, content_type: str = "", expires: int = 3600
    ) -> str:
        """Return a presigned URL for a ``put_object`` call on *key*.

        Args:
            key: Object key to be uploaded to.
            content_type: Included in the signed parameters when non-empty.
            expires: Passed as ``ExpiresIn``.

        Returns:
            The presigned URL string.
        """
        params: dict = {"Bucket": self.bucket, "Key": key}
        if content_type:
            params["ContentType"] = content_type
        return self.client.generate_presigned_url(
            "put_object", Params=params, ExpiresIn=expires
        )


def get_storage_backend() -> StorageBackend:
    """Build the backend selected by ``settings.STORAGE_BACKEND``.

    ``"s3"`` (case-insensitive) selects :class:`S3StorageBackend`; any other
    value selects :class:`LocalStorageBackend`.
    """
    backend = settings.STORAGE_BACKEND.lower()
    if backend == "s3":
        return S3StorageBackend(
            endpoint=settings.S3_ENDPOINT,
            access_key=settings.S3_ACCESS_KEY,
            secret_key=settings.S3_SECRET_KEY,
            bucket=settings.S3_BUCKET,
            region=settings.S3_REGION,
            public_url=settings.S3_PUBLIC_URL,
        )
    return LocalStorageBackend(
        storage_path=settings.LOCAL_STORAGE_PATH,
    )


# Module-wide backend instance; stays None until init_storage() is called.
storage: StorageBackend | None = None


def init_storage() -> StorageBackend:
    """Create the configured backend, store it in ``storage``, and return it."""
    global storage
    storage = get_storage_backend()
    return storage