package httplog import ( "apiServer_service/utils/logger" "apiServer_service/utils/redis_tools" "bytes" "context" "crypto/sha1" "encoding/hex" "encoding/json" "fmt" "io" "net/http" "os" "strconv" "time" "github.com/go-redis/redis/v8" ) type AccessEventData struct { Timestamp string `json:"@timestamp"` TsMs int64 `json:"ts_ms"` } var popBatchLua = redis.NewScript(` local key = KEYS[1] local n = tonumber(ARGV[1]) local res = redis.call("LRANGE", key, 0, n-1) if (#res > 0) then redis.call("LTRIM", key, n, -1) end return res `) func getenv(key, def string) string { if v := os.Getenv(key); v != "" { return v } return def } func getenvInt(key string, def int) int { v := os.Getenv(key) if v == "" { return def } i, err := strconv.Atoi(v) if err != nil { return def } return i } func Updater(redisKey, esIndexPrefix string) { if redisKey == "" { redisKey = getenv("ES_REDIS_KEY", "access_log") } esBulkURL := getenv("ES_BULK_URL", "https://elasticsearch.hostidc.net/_bulk") if esIndexPrefix == "" { esIndexPrefix = getenv("ES_INDEX_PREFIX", "access") } esUser := getenv("ES_USERNAME", "") esPass := getenv("ES_PASSWORD", "") batchSize := getenvInt("ES_BATCH_SIZE", 1000) pollIntervalMs := getenvInt("ES_POLL_INTERVAL_MS", 200) httpTimeoutMs := getenvInt("ES_HTTP_TIMEOUT_MS", 5000) rdb := redis_tools.ConnectRedis() httpClient := &http.Client{Timeout: time.Duration(httpTimeoutMs) * time.Millisecond} ctx := context.Background() for { items, err := popBatchLua.Run(ctx, rdb, []string{redisKey}, batchSize).StringSlice() if err != nil { time.Sleep(time.Second) continue } if len(items) == 0 { time.Sleep(time.Duration(pollIntervalMs) * time.Millisecond) continue } body, _ := buildBulkBody(items, esIndexPrefix) ok, _, err := postBulk(ctx, httpClient, esBulkURL, esUser, esPass, body) if err != nil || !ok { logger.Error("ESBulk", "写入失败: ", err) continue } } } func buildBulkBody(items []string, indexPrefix string) ([]byte, int) { var buf bytes.Buffer count := 0 for _, line := range items { idx := indexPrefix + "-" + time.Now().Format("2006.01.02") var ev AccessEventData if json.Unmarshal([]byte(line), &ev) == nil { if ev.TsMs > 0 { idx = indexPrefix + "-" + time.UnixMilli(ev.TsMs).UTC().Format("2006.01.02") } else if ev.Timestamp != "" { if ts, err := time.Parse(time.RFC3339Nano, ev.Timestamp); err == nil { idx = indexPrefix + "-" + ts.UTC().Format("2006.01.02") } } } sum := sha1.Sum([]byte(line)) docID := hex.EncodeToString(sum[:]) meta := fmt.Sprintf(`{"index":{"_index":"%s","_id":"%s"}}`+"\n", idx, docID) buf.WriteString(meta) buf.WriteString(line) buf.WriteByte('\n') count++ } return buf.Bytes(), count } func postBulk(ctx context.Context, client *http.Client, url, user, pass string, body []byte) (bool, []byte, error) { req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) if err != nil { return false, nil, err } req.Header.Set("Content-Type", "application/x-ndjson") if user != "" || pass != "" { req.SetBasicAuth(user, pass) } resp, err := client.Do(req) if err != nil { return false, nil, err } defer resp.Body.Close() b, _ := io.ReadAll(resp.Body) if resp.StatusCode < 200 || resp.StatusCode >= 300 { return false, b, fmt.Errorf("bulk http status=%d", resp.StatusCode) } var out struct { Errors bool `json:"errors"` } if json.Unmarshal(b, &out) == nil && out.Errors { return false, b, fmt.Errorf("bulk response errors=true") } return true, b, nil }