56 lines
2.1 KiB
Python
56 lines
2.1 KiB
Python
import logging
|
|
from datetime import datetime, timezone
|
|
|
|
from app.celery_app import celery_app
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@celery_app.task
def aggregate_daily_stats():
    """Aggregate daily statistics and store them in Redis for dashboard consumption.

    Counts rows for users, spots (all and approved), comments, ratings,
    favorites, shooting requests and events, stamps the result with the
    current UTC time, and writes everything as a hash to the
    ``ciyuan:daily_stats`` key with a two-day expiry.

    Returns:
        dict: One integer count per entity plus ``generated_at``, an
        ISO 8601 UTC timestamp string.

    Raises:
        Exception: Any error raised while querying the database or writing
        to Redis is logged with its traceback and re-raised unchanged.
    """
    from sqlalchemy import func, select
    from sqlalchemy.orm import Session

    from app.db.session import sync_engine
    from app.models.comment import Comment
    from app.models.event import Event
    from app.models.favorite import Favorite
    from app.models.rating import Rating
    from app.models.shooting import ShootingRequest
    from app.models.spot import Spot
    from app.models.user import User

    redis_key = "ciyuan:daily_stats"
    ttl_seconds = 86400 * 2  # two days

    try:
        # One COUNT statement per dashboard field; insertion order is the
        # order of the fields in the returned dict.
        count_queries = {
            "users": select(func.count(User.id)),
            "spots": select(func.count(Spot.id)),
            "approved_spots": select(func.count(Spot.id)).where(
                Spot.audit_status == "approved"
            ),
            "comments": select(func.count(Comment.id)),
            "ratings": select(func.count(Rating.id)),
            "favorites": select(func.count(Favorite.id)),
            "shootings": select(func.count(ShootingRequest.id)),
            "events": select(func.count(Event.id)),
        }

        with Session(sync_engine) as session:
            stats = {
                field: session.execute(statement).scalar() or 0
                for field, statement in count_queries.items()
            }
        stats["generated_at"] = datetime.now(timezone.utc).isoformat()

        import redis

        # The DB session is already released here, so no database connection
        # is held while talking to Redis.
        with redis.from_url(str(_get_redis_url())) as client:
            # Send HSET and EXPIRE in a single MULTI/EXEC transaction so the
            # hash can never be left behind without its expiry.
            with client.pipeline(transaction=True) as pipe:
                pipe.hset(
                    redis_key,
                    mapping={field: str(value) for field, value in stats.items()},
                )
                pipe.expire(redis_key, ttl_seconds)
                pipe.execute()

        logger.info("Daily stats aggregated: %s", stats)
        return stats

    except Exception:
        logger.exception("Failed to aggregate daily stats")
        raise
|
|
|
|
|
|
def _get_redis_url():
    """Return the ``REDIS_URL`` value from the application settings.

    The settings object is imported inside the function, so the import runs
    when the function is first called rather than when this module loads.
    """
    from app.core.config import settings as app_settings

    redis_url = app_settings.REDIS_URL
    return redis_url
|