Files
shiran 6050d11f27 feat: 添加微服务模板基础架构
- 创建基于 CloudWego Hertz 的 Go 微服务脚手架
- 集成 Nacos 服务注册/发现功能
- 添加 gRPC 客户端支持
- 实现环境变量配置管理 (.env.example)
- 添加 HTTP 中间件 (Recovery, AccessLog, CORS)
- 配置 Gitea CI/CD 构建部署流程

BREAKING CHANGE: 项目结构调整,从简单的 API 服务升级为完整的微服务架构
2026-04-15 11:13:38 +08:00

286 lines
6.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package httplog
import (
"context"
"encoding/json"
"os"
"sync"
"sync/atomic"
"time"
"github.com/cloudwego/hertz/pkg/app"
"github.com/cloudwego/hertz/pkg/common/tracer/stats"
"github.com/go-redis/redis/v8"
)
type AccessEvent struct {
Timestamp string `json:"@timestamp"`
TsMs int64 `json:"ts_ms"`
Service string `json:"service,omitempty"`
Instance string `json:"instance,omitempty"`
Method string `json:"method"`
Host string `json:"host,omitempty"`
Path string `json:"path"`
UserID uint `json:"user_id,omitempty"`
Body string `json:"body,omitempty"`
Query string `json:"query,omitempty"`
Result string `json:"result,omitempty"`
ResultTruncated bool `json:"result_truncated,omitempty"`
RequestURI string `json:"request_uri,omitempty"`
Route string `json:"route,omitempty"`
Status int `json:"status"`
CostMs int64 `json:"cost_ms"`
RecvBytes int `json:"recv_bytes"`
SendBytes int `json:"send_bytes"`
RemoteAddr string `json:"remote_addr,omitempty"`
ClientIP string `json:"client_ip,omitempty"`
UserAgent string `json:"ua,omitempty"`
Referer string `json:"referer,omitempty"`
RequestID string `json:"request_id,omitempty"`
Error string `json:"error,omitempty"`
Panicked bool `json:"panicked"`
PanicValue string `json:"panic_value"`
}
// UserIDExtractor 从请求上下文中提取用户 ID 的函数签名。
// 不同项目可根据自身认证方式实现此函数。
type UserIDExtractor func(c *app.RequestContext) uint
type RedisListWriter struct {
rdb *redis.Client
key string
ch chan []byte
flushInterval time.Duration
maxBatch int
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
dropped uint64
}
func NewRedisListWriter(rdb *redis.Client, key string, queueSize int, maxBatch int, flushInterval time.Duration) *RedisListWriter {
ctx, cancel := context.WithCancel(context.Background())
w := &RedisListWriter{
rdb: rdb,
key: key,
ch: make(chan []byte, queueSize),
flushInterval: flushInterval,
maxBatch: maxBatch,
ctx: ctx,
cancel: cancel,
}
w.wg.Add(1)
go w.loop()
return w
}
func (w *RedisListWriter) Enqueue(b []byte) {
select {
case w.ch <- b:
default:
atomic.AddUint64(&w.dropped, 1)
}
}
func (w *RedisListWriter) Dropped() uint64 { return atomic.LoadUint64(&w.dropped) }
func (w *RedisListWriter) Close(ctx context.Context) error {
w.cancel()
done := make(chan struct{})
go func() {
w.wg.Wait()
close(done)
}()
select {
case <-ctx.Done():
return ctx.Err()
case <-done:
return nil
}
}
func (w *RedisListWriter) loop() {
defer w.wg.Done()
ticker := time.NewTicker(w.flushInterval)
defer ticker.Stop()
flush := func(buf [][]byte) {
if len(buf) == 0 {
return
}
args := make([]interface{}, 0, len(buf))
for _, b := range buf {
args = append(args, string(b))
}
cctx, cancel := context.WithTimeout(w.ctx, 2*time.Second)
_ = w.rdb.RPush(cctx, w.key, args...).Err()
cancel()
}
var buf [][]byte
for {
select {
case <-w.ctx.Done():
flush(buf)
return
case b := <-w.ch:
buf = append(buf, b)
if len(buf) >= w.maxBatch {
flush(buf)
buf = buf[:0]
}
case <-ticker.C:
flush(buf)
buf = buf[:0]
}
}
}
type TracerOption func(*RedisAccessLogTracer)
// WithUserIDExtractor 设置自定义的用户 ID 提取函数
func WithUserIDExtractor(fn UserIDExtractor) TracerOption {
return func(t *RedisAccessLogTracer) {
t.userIDFn = fn
}
}
// WithSkipPrefix 设置需要跳过日志记录的路径前缀(如 /health, /metrics
func WithSkipPrefix(prefix string) TracerOption {
return func(t *RedisAccessLogTracer) {
t.skipPrefix = prefix
}
}
// WithMaxResponseBody 设置响应体采集的最大字节数
func WithMaxResponseBody(n int) TracerOption {
return func(t *RedisAccessLogTracer) {
t.maxRespBody = n
}
}
type RedisAccessLogTracer struct {
writer *RedisListWriter
service string
instance string
skipPrefix string
maxRespBody int
userIDFn UserIDExtractor
}
func NewRedisAccessLogTracer(rdb *redis.Client, redisKey string, service string, opts ...TracerOption) *RedisAccessLogTracer {
host, _ := os.Hostname()
t := &RedisAccessLogTracer{
writer: NewRedisListWriter(rdb, redisKey, 8192, 512, 200*time.Millisecond),
service: service,
instance: host,
maxRespBody: 2048,
}
for _, opt := range opts {
opt(t)
}
return t
}
func (t *RedisAccessLogTracer) Start(ctx context.Context, _ *app.RequestContext) context.Context {
return ctx
}
func (t *RedisAccessLogTracer) Finish(_ context.Context, c *app.RequestContext) {
if t.skipPrefix != "" {
p := string(c.Request.URI().PathOriginal())
if len(p) >= len(t.skipPrefix) && p[:len(t.skipPrefix)] == t.skipPrefix {
return
}
}
ti := c.GetTraceInfo()
st := ti.Stats()
var cost time.Duration
if rpcStart := st.GetEvent(stats.HTTPStart); rpcStart != nil {
if rpcFinish := st.GetEvent(stats.HTTPFinish); rpcFinish != nil {
cost = rpcFinish.Time().Sub(rpcStart.Time())
}
}
now := time.Now().UTC()
req := &c.Request
uri := req.URI()
remoteAddr := ""
if ra := c.RemoteAddr(); ra != nil {
remoteAddr = ra.String()
}
errStr := ""
if st.Error() != nil {
errStr = st.Error().Error()
}
panicked, panicVal := st.Panicked()
panicStr := ""
if panicVal != nil {
panicStr = anyToString(panicVal)
}
ev := AccessEvent{
Timestamp: now.Format(time.RFC3339Nano),
TsMs: now.UnixMilli(),
Service: t.service,
Instance: t.instance,
Method: string(req.Method()),
Host: string(req.Host()),
Path: string(uri.PathOriginal()),
Query: string(uri.QueryString()),
RequestURI: string(uri.RequestURI()),
Status: c.Response.StatusCode(),
CostMs: cost.Milliseconds(),
RecvBytes: st.RecvSize(),
SendBytes: st.SendSize(),
RemoteAddr: remoteAddr,
ClientIP: c.ClientIP(),
UserAgent: string(req.Header.UserAgent()),
Referer: string(req.Header.Peek("Referer")),
RequestID: string(req.Header.Peek("X-Request-Id")),
Error: errStr,
Panicked: panicked,
PanicValue: panicStr,
}
if t.userIDFn != nil {
ev.UserID = t.userIDFn(c)
}
if formJSON, ok, _ := FormBodyToJSONWithFilePlaceholder(c); ok {
ev.Body = string(formJSON)
}
if respBody, ok, trunc := ResponseBodySnippet(c, t.maxRespBody); ok {
ev.Result = respBody
ev.ResultTruncated = trunc
}
b, err := json.Marshal(ev)
if err != nil {
return
}
t.writer.Enqueue(b)
}
func (t *RedisAccessLogTracer) Close(ctx context.Context) error {
return t.writer.Close(ctx)
}
func anyToString(v interface{}) string {
switch x := v.(type) {
case string:
return x
default:
b, _ := json.Marshal(x)
return string(b)
}
}