package httplog import ( "context" "encoding/json" "os" "sync" "sync/atomic" "time" "github.com/cloudwego/hertz/pkg/app" "github.com/cloudwego/hertz/pkg/common/tracer/stats" "github.com/go-redis/redis/v8" ) type AccessEvent struct { Timestamp string `json:"@timestamp"` TsMs int64 `json:"ts_ms"` Service string `json:"service,omitempty"` Instance string `json:"instance,omitempty"` Method string `json:"method"` Host string `json:"host,omitempty"` Path string `json:"path"` UserID uint `json:"user_id,omitempty"` Body string `json:"body,omitempty"` Query string `json:"query,omitempty"` Result string `json:"result,omitempty"` ResultTruncated bool `json:"result_truncated,omitempty"` RequestURI string `json:"request_uri,omitempty"` Route string `json:"route,omitempty"` Status int `json:"status"` CostMs int64 `json:"cost_ms"` RecvBytes int `json:"recv_bytes"` SendBytes int `json:"send_bytes"` RemoteAddr string `json:"remote_addr,omitempty"` ClientIP string `json:"client_ip,omitempty"` UserAgent string `json:"ua,omitempty"` Referer string `json:"referer,omitempty"` RequestID string `json:"request_id,omitempty"` Error string `json:"error,omitempty"` Panicked bool `json:"panicked"` PanicValue string `json:"panic_value"` } // UserIDExtractor 从请求上下文中提取用户 ID 的函数签名。 // 不同项目可根据自身认证方式实现此函数。 type UserIDExtractor func(c *app.RequestContext) uint type RedisListWriter struct { rdb *redis.Client key string ch chan []byte flushInterval time.Duration maxBatch int ctx context.Context cancel context.CancelFunc wg sync.WaitGroup dropped uint64 } func NewRedisListWriter(rdb *redis.Client, key string, queueSize int, maxBatch int, flushInterval time.Duration) *RedisListWriter { ctx, cancel := context.WithCancel(context.Background()) w := &RedisListWriter{ rdb: rdb, key: key, ch: make(chan []byte, queueSize), flushInterval: flushInterval, maxBatch: maxBatch, ctx: ctx, cancel: cancel, } w.wg.Add(1) go w.loop() return w } func (w *RedisListWriter) Enqueue(b []byte) { select { case w.ch <- b: default: atomic.AddUint64(&w.dropped, 1) } } func (w *RedisListWriter) Dropped() uint64 { return atomic.LoadUint64(&w.dropped) } func (w *RedisListWriter) Close(ctx context.Context) error { w.cancel() done := make(chan struct{}) go func() { w.wg.Wait() close(done) }() select { case <-ctx.Done(): return ctx.Err() case <-done: return nil } } func (w *RedisListWriter) loop() { defer w.wg.Done() ticker := time.NewTicker(w.flushInterval) defer ticker.Stop() flush := func(buf [][]byte) { if len(buf) == 0 { return } args := make([]interface{}, 0, len(buf)) for _, b := range buf { args = append(args, string(b)) } cctx, cancel := context.WithTimeout(w.ctx, 2*time.Second) _ = w.rdb.RPush(cctx, w.key, args...).Err() cancel() } var buf [][]byte for { select { case <-w.ctx.Done(): flush(buf) return case b := <-w.ch: buf = append(buf, b) if len(buf) >= w.maxBatch { flush(buf) buf = buf[:0] } case <-ticker.C: flush(buf) buf = buf[:0] } } } type TracerOption func(*RedisAccessLogTracer) // WithUserIDExtractor 设置自定义的用户 ID 提取函数 func WithUserIDExtractor(fn UserIDExtractor) TracerOption { return func(t *RedisAccessLogTracer) { t.userIDFn = fn } } // WithSkipPrefix 设置需要跳过日志记录的路径前缀(如 /health, /metrics) func WithSkipPrefix(prefix string) TracerOption { return func(t *RedisAccessLogTracer) { t.skipPrefix = prefix } } // WithMaxResponseBody 设置响应体采集的最大字节数 func WithMaxResponseBody(n int) TracerOption { return func(t *RedisAccessLogTracer) { t.maxRespBody = n } } type RedisAccessLogTracer struct { writer *RedisListWriter service string instance string skipPrefix string maxRespBody int userIDFn UserIDExtractor } func NewRedisAccessLogTracer(rdb *redis.Client, redisKey string, service string, opts ...TracerOption) *RedisAccessLogTracer { host, _ := os.Hostname() t := &RedisAccessLogTracer{ writer: NewRedisListWriter(rdb, redisKey, 8192, 512, 200*time.Millisecond), service: service, instance: host, maxRespBody: 2048, } for _, opt := range opts { opt(t) } return t } func (t *RedisAccessLogTracer) Start(ctx context.Context, _ *app.RequestContext) context.Context { return ctx } func (t *RedisAccessLogTracer) Finish(_ context.Context, c *app.RequestContext) { if t.skipPrefix != "" { p := string(c.Request.URI().PathOriginal()) if len(p) >= len(t.skipPrefix) && p[:len(t.skipPrefix)] == t.skipPrefix { return } } ti := c.GetTraceInfo() st := ti.Stats() var cost time.Duration if rpcStart := st.GetEvent(stats.HTTPStart); rpcStart != nil { if rpcFinish := st.GetEvent(stats.HTTPFinish); rpcFinish != nil { cost = rpcFinish.Time().Sub(rpcStart.Time()) } } now := time.Now().UTC() req := &c.Request uri := req.URI() remoteAddr := "" if ra := c.RemoteAddr(); ra != nil { remoteAddr = ra.String() } errStr := "" if st.Error() != nil { errStr = st.Error().Error() } panicked, panicVal := st.Panicked() panicStr := "" if panicVal != nil { panicStr = anyToString(panicVal) } ev := AccessEvent{ Timestamp: now.Format(time.RFC3339Nano), TsMs: now.UnixMilli(), Service: t.service, Instance: t.instance, Method: string(req.Method()), Host: string(req.Host()), Path: string(uri.PathOriginal()), Query: string(uri.QueryString()), RequestURI: string(uri.RequestURI()), Status: c.Response.StatusCode(), CostMs: cost.Milliseconds(), RecvBytes: st.RecvSize(), SendBytes: st.SendSize(), RemoteAddr: remoteAddr, ClientIP: c.ClientIP(), UserAgent: string(req.Header.UserAgent()), Referer: string(req.Header.Peek("Referer")), RequestID: string(req.Header.Peek("X-Request-Id")), Error: errStr, Panicked: panicked, PanicValue: panicStr, } if t.userIDFn != nil { ev.UserID = t.userIDFn(c) } if formJSON, ok, _ := FormBodyToJSONWithFilePlaceholder(c); ok { ev.Body = string(formJSON) } if respBody, ok, trunc := ResponseBodySnippet(c, t.maxRespBody); ok { ev.Result = respBody ev.ResultTruncated = trunc } b, err := json.Marshal(ev) if err != nil { return } t.writer.Enqueue(b) } func (t *RedisAccessLogTracer) Close(ctx context.Context) error { return t.writer.Close(ctx) } func anyToString(v interface{}) string { switch x := v.(type) { case string: return x default: b, _ := json.Marshal(x) return string(b) } }