import logging from datetime import datetime, timezone from app.celery_app import celery_app logger = logging.getLogger(__name__) @celery_app.task def aggregate_daily_stats(): """Aggregate daily statistics and store to Redis for dashboard consumption.""" from sqlalchemy import func, select from sqlalchemy.orm import Session from app.db.session import sync_engine from app.models.comment import Comment from app.models.event import Event from app.models.favorite import Favorite from app.models.rating import Rating from app.models.shooting import ShootingRequest from app.models.spot import Spot from app.models.user import User try: with Session(sync_engine) as session: stats = { "users": session.execute(select(func.count(User.id))).scalar() or 0, "spots": session.execute(select(func.count(Spot.id))).scalar() or 0, "approved_spots": session.execute( select(func.count(Spot.id)).where(Spot.audit_status == "approved") ).scalar() or 0, "comments": session.execute(select(func.count(Comment.id))).scalar() or 0, "ratings": session.execute(select(func.count(Rating.id))).scalar() or 0, "favorites": session.execute(select(func.count(Favorite.id))).scalar() or 0, "shootings": session.execute(select(func.count(ShootingRequest.id))).scalar() or 0, "events": session.execute(select(func.count(Event.id))).scalar() or 0, "generated_at": datetime.now(timezone.utc).isoformat(), } import redis r = redis.from_url(str(_get_redis_url())) r.hset("ciyuan:daily_stats", mapping={k: str(v) for k, v in stats.items()}) r.expire("ciyuan:daily_stats", 86400 * 2) logger.info("Daily stats aggregated: %s", stats) return stats except Exception: logger.exception("Failed to aggregate daily stats") raise def _get_redis_url(): from app.core.config import settings return settings.REDIS_URL