6050d11f27
- 创建基于 CloudWego Hertz 的 Go 微服务脚手架 - 集成 Nacos 服务注册/发现功能 - 添加 gRPC 客户端支持 - 实现环境变量配置管理 (.env.example) - 添加 HTTP 中间件 (Recovery, AccessLog, CORS) - 配置 Gitea CI/CD 构建部署流程 BREAKING CHANGE: 项目结构调整,从简单的 API 服务升级为完整的微服务架构
286 lines
6.8 KiB
Go
286 lines
6.8 KiB
Go
package httplog
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"os"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"github.com/cloudwego/hertz/pkg/app"
|
||
"github.com/cloudwego/hertz/pkg/common/tracer/stats"
|
||
"github.com/go-redis/redis/v8"
|
||
)
|
||
|
||
type AccessEvent struct {
|
||
Timestamp string `json:"@timestamp"`
|
||
TsMs int64 `json:"ts_ms"`
|
||
Service string `json:"service,omitempty"`
|
||
Instance string `json:"instance,omitempty"`
|
||
Method string `json:"method"`
|
||
Host string `json:"host,omitempty"`
|
||
Path string `json:"path"`
|
||
UserID uint `json:"user_id,omitempty"`
|
||
Body string `json:"body,omitempty"`
|
||
Query string `json:"query,omitempty"`
|
||
Result string `json:"result,omitempty"`
|
||
ResultTruncated bool `json:"result_truncated,omitempty"`
|
||
RequestURI string `json:"request_uri,omitempty"`
|
||
Route string `json:"route,omitempty"`
|
||
Status int `json:"status"`
|
||
CostMs int64 `json:"cost_ms"`
|
||
RecvBytes int `json:"recv_bytes"`
|
||
SendBytes int `json:"send_bytes"`
|
||
RemoteAddr string `json:"remote_addr,omitempty"`
|
||
ClientIP string `json:"client_ip,omitempty"`
|
||
UserAgent string `json:"ua,omitempty"`
|
||
Referer string `json:"referer,omitempty"`
|
||
RequestID string `json:"request_id,omitempty"`
|
||
Error string `json:"error,omitempty"`
|
||
Panicked bool `json:"panicked"`
|
||
PanicValue string `json:"panic_value"`
|
||
}
|
||
|
||
// UserIDExtractor 从请求上下文中提取用户 ID 的函数签名。
|
||
// 不同项目可根据自身认证方式实现此函数。
|
||
type UserIDExtractor func(c *app.RequestContext) uint
|
||
|
||
type RedisListWriter struct {
|
||
rdb *redis.Client
|
||
key string
|
||
ch chan []byte
|
||
flushInterval time.Duration
|
||
maxBatch int
|
||
|
||
ctx context.Context
|
||
cancel context.CancelFunc
|
||
wg sync.WaitGroup
|
||
|
||
dropped uint64
|
||
}
|
||
|
||
func NewRedisListWriter(rdb *redis.Client, key string, queueSize int, maxBatch int, flushInterval time.Duration) *RedisListWriter {
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
w := &RedisListWriter{
|
||
rdb: rdb,
|
||
key: key,
|
||
ch: make(chan []byte, queueSize),
|
||
flushInterval: flushInterval,
|
||
maxBatch: maxBatch,
|
||
ctx: ctx,
|
||
cancel: cancel,
|
||
}
|
||
w.wg.Add(1)
|
||
go w.loop()
|
||
return w
|
||
}
|
||
|
||
func (w *RedisListWriter) Enqueue(b []byte) {
|
||
select {
|
||
case w.ch <- b:
|
||
default:
|
||
atomic.AddUint64(&w.dropped, 1)
|
||
}
|
||
}
|
||
|
||
func (w *RedisListWriter) Dropped() uint64 { return atomic.LoadUint64(&w.dropped) }
|
||
|
||
func (w *RedisListWriter) Close(ctx context.Context) error {
|
||
w.cancel()
|
||
done := make(chan struct{})
|
||
go func() {
|
||
w.wg.Wait()
|
||
close(done)
|
||
}()
|
||
select {
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
case <-done:
|
||
return nil
|
||
}
|
||
}
|
||
|
||
func (w *RedisListWriter) loop() {
|
||
defer w.wg.Done()
|
||
|
||
ticker := time.NewTicker(w.flushInterval)
|
||
defer ticker.Stop()
|
||
|
||
flush := func(buf [][]byte) {
|
||
if len(buf) == 0 {
|
||
return
|
||
}
|
||
args := make([]interface{}, 0, len(buf))
|
||
for _, b := range buf {
|
||
args = append(args, string(b))
|
||
}
|
||
cctx, cancel := context.WithTimeout(w.ctx, 2*time.Second)
|
||
_ = w.rdb.RPush(cctx, w.key, args...).Err()
|
||
cancel()
|
||
}
|
||
|
||
var buf [][]byte
|
||
for {
|
||
select {
|
||
case <-w.ctx.Done():
|
||
flush(buf)
|
||
return
|
||
case b := <-w.ch:
|
||
buf = append(buf, b)
|
||
if len(buf) >= w.maxBatch {
|
||
flush(buf)
|
||
buf = buf[:0]
|
||
}
|
||
case <-ticker.C:
|
||
flush(buf)
|
||
buf = buf[:0]
|
||
}
|
||
}
|
||
}
|
||
|
||
type TracerOption func(*RedisAccessLogTracer)
|
||
|
||
// WithUserIDExtractor 设置自定义的用户 ID 提取函数
|
||
func WithUserIDExtractor(fn UserIDExtractor) TracerOption {
|
||
return func(t *RedisAccessLogTracer) {
|
||
t.userIDFn = fn
|
||
}
|
||
}
|
||
|
||
// WithSkipPrefix 设置需要跳过日志记录的路径前缀(如 /health, /metrics)
|
||
func WithSkipPrefix(prefix string) TracerOption {
|
||
return func(t *RedisAccessLogTracer) {
|
||
t.skipPrefix = prefix
|
||
}
|
||
}
|
||
|
||
// WithMaxResponseBody 设置响应体采集的最大字节数
|
||
func WithMaxResponseBody(n int) TracerOption {
|
||
return func(t *RedisAccessLogTracer) {
|
||
t.maxRespBody = n
|
||
}
|
||
}
|
||
|
||
type RedisAccessLogTracer struct {
|
||
writer *RedisListWriter
|
||
service string
|
||
instance string
|
||
skipPrefix string
|
||
maxRespBody int
|
||
userIDFn UserIDExtractor
|
||
}
|
||
|
||
func NewRedisAccessLogTracer(rdb *redis.Client, redisKey string, service string, opts ...TracerOption) *RedisAccessLogTracer {
|
||
host, _ := os.Hostname()
|
||
t := &RedisAccessLogTracer{
|
||
writer: NewRedisListWriter(rdb, redisKey, 8192, 512, 200*time.Millisecond),
|
||
service: service,
|
||
instance: host,
|
||
maxRespBody: 2048,
|
||
}
|
||
for _, opt := range opts {
|
||
opt(t)
|
||
}
|
||
return t
|
||
}
|
||
|
||
func (t *RedisAccessLogTracer) Start(ctx context.Context, _ *app.RequestContext) context.Context {
|
||
return ctx
|
||
}
|
||
|
||
func (t *RedisAccessLogTracer) Finish(_ context.Context, c *app.RequestContext) {
|
||
if t.skipPrefix != "" {
|
||
p := string(c.Request.URI().PathOriginal())
|
||
if len(p) >= len(t.skipPrefix) && p[:len(t.skipPrefix)] == t.skipPrefix {
|
||
return
|
||
}
|
||
}
|
||
|
||
ti := c.GetTraceInfo()
|
||
st := ti.Stats()
|
||
|
||
var cost time.Duration
|
||
if rpcStart := st.GetEvent(stats.HTTPStart); rpcStart != nil {
|
||
if rpcFinish := st.GetEvent(stats.HTTPFinish); rpcFinish != nil {
|
||
cost = rpcFinish.Time().Sub(rpcStart.Time())
|
||
}
|
||
}
|
||
|
||
now := time.Now().UTC()
|
||
req := &c.Request
|
||
uri := req.URI()
|
||
|
||
remoteAddr := ""
|
||
if ra := c.RemoteAddr(); ra != nil {
|
||
remoteAddr = ra.String()
|
||
}
|
||
|
||
errStr := ""
|
||
if st.Error() != nil {
|
||
errStr = st.Error().Error()
|
||
}
|
||
panicked, panicVal := st.Panicked()
|
||
panicStr := ""
|
||
if panicVal != nil {
|
||
panicStr = anyToString(panicVal)
|
||
}
|
||
|
||
ev := AccessEvent{
|
||
Timestamp: now.Format(time.RFC3339Nano),
|
||
TsMs: now.UnixMilli(),
|
||
Service: t.service,
|
||
Instance: t.instance,
|
||
Method: string(req.Method()),
|
||
Host: string(req.Host()),
|
||
Path: string(uri.PathOriginal()),
|
||
Query: string(uri.QueryString()),
|
||
RequestURI: string(uri.RequestURI()),
|
||
Status: c.Response.StatusCode(),
|
||
CostMs: cost.Milliseconds(),
|
||
RecvBytes: st.RecvSize(),
|
||
SendBytes: st.SendSize(),
|
||
RemoteAddr: remoteAddr,
|
||
ClientIP: c.ClientIP(),
|
||
UserAgent: string(req.Header.UserAgent()),
|
||
Referer: string(req.Header.Peek("Referer")),
|
||
RequestID: string(req.Header.Peek("X-Request-Id")),
|
||
Error: errStr,
|
||
Panicked: panicked,
|
||
PanicValue: panicStr,
|
||
}
|
||
|
||
if t.userIDFn != nil {
|
||
ev.UserID = t.userIDFn(c)
|
||
}
|
||
|
||
if formJSON, ok, _ := FormBodyToJSONWithFilePlaceholder(c); ok {
|
||
ev.Body = string(formJSON)
|
||
}
|
||
|
||
if respBody, ok, trunc := ResponseBodySnippet(c, t.maxRespBody); ok {
|
||
ev.Result = respBody
|
||
ev.ResultTruncated = trunc
|
||
}
|
||
|
||
b, err := json.Marshal(ev)
|
||
if err != nil {
|
||
return
|
||
}
|
||
t.writer.Enqueue(b)
|
||
}
|
||
|
||
func (t *RedisAccessLogTracer) Close(ctx context.Context) error {
|
||
return t.writer.Close(ctx)
|
||
}
|
||
|
||
func anyToString(v interface{}) string {
|
||
switch x := v.(type) {
|
||
case string:
|
||
return x
|
||
default:
|
||
b, _ := json.Marshal(x)
|
||
return string(b)
|
||
}
|
||
}
|