diff --git a/.env b/.env index 81d780d..e55b89d 100644 --- a/.env +++ b/.env @@ -1,8 +1,16 @@ HOST='0.0.0.0' PORT=8081 -SqlName="db.sql" -# redis配置 +# ------------------ 数据库配置 ------------------ +DB_TYPE= +DB_HOST=127.0.0.1 +DB_PORT=3306 +DB_USER=root +DB_PASSWORD= +DB_NAME=app +DB_SCHEMA=public + +# ------------------ Redis 配置 ------------------ REDIS_HOST="127.0.0.1:6379" REDIS_PASSWORD= LOG_LEVEL="debug" @@ -15,6 +23,9 @@ NACOS_GROUP_NAME= NACOS_USER= NACOS_PASSWORD= +# ------------------ SMS 服务 Token ------------------ +SERVICE_TOKEN= + # ------------------ GRPC配置 ------------------ GRPC_TOKEN= diff --git a/.env.example b/.env.example index c9bbb39..b5f6ce4 100644 --- a/.env.example +++ b/.env.example @@ -1,8 +1,16 @@ HOST='0.0.0.0' PORT=8081 -SqlName="db.sql" -# Redis 配置 +# ------------------ 数据库配置 ------------------ +DB_TYPE=mysql +DB_HOST=127.0.0.1 +DB_PORT=3306 +DB_USER=root +DB_PASSWORD= +DB_NAME=app +DB_SCHEMA=public + +# ------------------ Redis 配置 ------------------ REDIS_HOST="127.0.0.1:6379" REDIS_PASSWORD= LOG_LEVEL="debug" @@ -27,6 +35,9 @@ ES_BATCH_SIZE=1000 ES_POLL_INTERVAL_MS=200 ES_HTTP_TIMEOUT_MS=5000 +# ------------------ SMS 服务 Token ------------------ +SERVICE_TOKEN= + # ------------------ GRPC 配置 ------------------ GRPC_TOKEN= diff --git a/.gitea/workflows/build.yaml b/.gitea/workflows/build.yaml index def817c..f0a00dc 100644 --- a/.gitea/workflows/build.yaml +++ b/.gitea/workflows/build.yaml @@ -16,6 +16,7 @@ jobs: run: | go build -ldflags="-s -w" -o server ./cmd/main_program go build -ldflags="-s -w" -o cli ./cmd/cli_control + go build -ldflags="-s -w" -o scheduler ./cmd/scheduler - name: Save artifact uses: actions/upload-artifact@v3 @@ -24,6 +25,7 @@ jobs: path: | ./server ./cli + ./scheduler deploy: needs: build @@ -51,13 +53,10 @@ jobs: while read -r ip; do if [ -n "$ip" ]; then echo "Deploying to $ip..." - scp -o StrictHostKeyChecking=no server ${{ vars.ROOT_USER_NAME }}@"$ip":/root/server.tmp - scp -o StrictHostKeyChecking=no cli ${{ vars.ROOT_USER_NAME }}@"$ip":/root/cli.tmp + scp -o StrictHostKeyChecking=no server cli scheduler ${{ vars.ROOT_USER_NAME }}@"$ip":/root/ ssh -n ${{ vars.ROOT_USER_NAME }}@"$ip" << 'ENDSSH' - mv /root/server.tmp /root/server - mv /root/cli.tmp /root/cli - chmod +x /root/server /root/cli - systemctl restart server cli + chmod +x /root/server /root/cli /root/scheduler + systemctl restart server cli scheduler ENDSSH echo "Deployment to $ip completed" fi diff --git a/README.md b/README.md index 53be7b8..36f640b 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # apiServer 微服务模板 -基于 [CloudWego Hertz](https://github.com/cloudwego/hertz) 的 Go 微服务脚手架,集成 Nacos 服务注册/发现 + gRPC 客户端 + 访问日志上报(Redis → ES)。 +基于 [CloudWego Hertz](https://github.com/cloudwego/hertz) 的 Go 微服务脚手架,集成 Nacos 服务注册/发现 + gRPC 客户端 + 访问日志上报(Redis → ES)+ 定时任务 + 事件 Hook 体系。 ## 项目结构 @@ -8,19 +8,27 @@ ├── apps/ # 业务处理层 (Handler) ├── cmd/ │ ├── main_program/ # 主程序入口 (HTTP 服务) -│ └── cli_control/ # CLI 工具入口 (httplog 上报) +│ ├── cli_control/ # CLI 工具入口 (httplog 上报) +│ └── scheduler/ # 定时任务调度器 (独立进程) +├── crontab/ # 定时任务框架 +│ └── tasks/ # 具体任务实现 +├── hooks/ # 事件 Hook 体系 +│ └── registers/ # Hook 注册入口 ├── middleware/ # HTTP 中间件 (Recovery, AccessLog, CORS) -├── models/request_models/ # 请求参数模型 +├── models/ +│ ├── database/ # 数据库连接 & ORM 模型 (GORM) +│ └── request_models/ # 请求参数模型 ├── proto/ # Protobuf 生成代码 ├── routes/ # 路由定义 ├── utils/ │ ├── httplog/ # HTTP 访问日志采集 & ES 上报 -│ ├── logger/ # 日志工具 (logrus) +│ ├── logger/ # 日志工具 (logrus,主服务/定时任务分离) │ ├── nacos/ # Nacos 服务注册/发现/配置 │ ├── redis_tools/ # Redis 连接 & 通用操作 │ ├── request/ # 请求绑定 & 统一响应 │ └── server_cli/ # gRPC 客户端 -├── start.sh / stop.sh / restart.sh +├── deploy/ # systemd 服务单元 +├── start.sh / stop.sh / restart.sh / install.sh ├── .env.example # 环境变量示例 └── go.mod ``` @@ -38,10 +46,12 @@ go mod tidy # 3. 开发运行 go run ./cmd/main_program # HTTP 服务 go run ./cmd/cli_control # httplog 上报 +go run ./cmd/scheduler # 定时任务调度器 # 4. 构建 go build -ldflags="-s -w" -o server ./cmd/main_program go build -ldflags="-s -w" -o cli ./cmd/cli_control +go build -ldflags="-s -w" -o scheduler ./cmd/scheduler ``` ## 模板使用步骤 @@ -58,111 +68,164 @@ go build -ldflags="-s -w" -o cli ./cmd/cli_control ### cmd/main_program — HTTP 主服务 -启动 Hertz HTTP 服务,绑定路由和中间件,可选注册到 Nacos。 +启动 Hertz HTTP 服务,绑定路由和中间件,可选连接数据库、注册到 Nacos。 ```bash go run ./cmd/main_program ``` -启动流程:加载 `.env` → 校验 `HOST`/`PORT` → 注册中间件(Recovery、AccessLog、CORS)→ 绑定路由 → Nacos 注册(可选)→ 启动 HTTP 监听 → 等待信号优雅关闭。 +启动流程:加载 `.env` → 校验 `HOST`/`PORT` → 连接数据库(可选)→ 注册中间件 → 绑定路由 → Nacos 注册(可选)→ HTTP 监听 → 等待信号优雅关闭。 ### cmd/cli_control — httplog 日志上报 -独立后台进程,从 Redis 队列中消费访问日志,批量写入 Elasticsearch。 +独立后台进程,从 Redis 队列消费访问日志,批量写入 Elasticsearch。 ```bash go run ./cmd/cli_control ``` -启动后会以轮询方式从 `ES_REDIS_KEY` 队列中批量 pop 日志条目,组装 ES `_bulk` 请求写入 `ES_INDEX_PREFIX-YYYY.MM.DD` 索引。 +### cmd/scheduler — 定时任务调度器 + +独立进程运行定时任务,使用专用日志文件 `logs/cron_*.log`。 + +```bash +go run ./cmd/scheduler # 常驻模式 +go run ./cmd/scheduler list # 列出已注册任务 +go run ./cmd/scheduler run-once # 立即执行所有任务一次 +go run ./cmd/scheduler run 健康检查 # 立即执行指定任务 +``` + +### crontab — 定时任务框架 + +基于 `robfig/cron` 的调度器,支持秒级 cron 表达式,内置 panic 恢复。 + +**新增任务三步走:** + +```go +// 1. 在 crontab/tasks/ 下新建文件,实现 Task 接口 +type MyTask struct{} + +func (t *MyTask) Name() string { return "我的任务" } +func (t *MyTask) Spec() string { return "0 */10 * * * *" } // 每10分钟 +func (t *MyTask) Run() { + logger.CronInfo("MyTask", "执行中...") +} + +// 2. 在 crontab/tasks/register.go 注册 +func RegisterTasks(scheduler *crontab.Scheduler) { + scheduler.Register(&HealthCheckTask{}) + scheduler.Register(&MyTask{}) // 新增 +} + +// 3. 完成,scheduler 进程会自动调度 +``` + +定时任务日志独立写入 `logs/cron_YYYYMMDD.log`,不与主服务日志混合。 + +### hooks — 事件 Hook 体系 + +发布-订阅模式的业务事件系统,支持同步/异步处理和优先级排序。 + +**触发事件:** + +```go +import "apiServer_service/hooks" + +hooks.DefaultManager().Trigger(ctx, hooks.EventType("order_paid"), &hooks.EventPayload{ + UserID: 123, + Extra: map[string]any{"order_id": 456, "amount": 99.9}, +}) +``` + +**注册 Handler:** + +```go +// hooks/registers/register.go +func RegisterHooks(m *hooks.Manager) { + // 同步处理(按优先级顺序执行,阻塞直到完成) + m.Register(hooks.EventType("order_paid"), hooks.HandlerFunc(OnOrderPaid)) + + // 异步处理(不阻塞主流程) + m.RegisterAsync(hooks.EventType("order_paid"), hooks.HandlerFunc(SendNotification)) + + // 指定优先级(数值越大越先执行) + m.RegisterWithPriority(hooks.EventType("order_paid"), 10, hooks.HandlerFunc(DeductInventory)) +} + +func OnOrderPaid(ctx context.Context, event hooks.EventType, payload *hooks.EventPayload) error { + // 业务逻辑 + return nil +} +``` + +**特性:** + +| 能力 | 说明 | +|------|------| +| 同步执行 | `Register` — 按优先级顺序执行,阻塞调用方 | +| 异步执行 | `RegisterAsync` — goroutine 执行,不阻塞 | +| 优先级 | `RegisterWithPriority` — 数值越大越先执行 | +| panic 安全 | 异步 handler panic 不影响主流程 | +| 函数适配 | `HandlerFunc` 可直接用匿名函数注册 | ### middleware — HTTP 中间件 在 `cmd/main_program/routs.go` 中统一注册: ```go -r.Use(middleware.Recovery()) // panic 恢复,防止单个请求崩溃整个服务 -r.Use(middleware.AccessLog()) // 请求日志(方法、路径、状态码、耗时) +r.Use(middleware.Recovery()) // panic 恢复 +r.Use(middleware.AccessLog()) // 请求日志 r.Use(middleware.CORS()) // 跨域支持 ``` -### utils/httplog — 访问日志采集 +### models/database — 数据库 (GORM) -Hertz Tracer 实现,在请求完成后采集完整的访问事件(方法、路径、状态码、耗时、请求体、响应体等),通过 Redis List 异步缓冲。 - -**在主服务中接入:** +支持 MySQL 和 PostgreSQL,`DB_TYPE` 为空时跳过连接。 ```go -import ( - "apiServer_service/utils/httplog" - "apiServer_service/utils/redis_tools" - "github.com/cloudwego/hertz/pkg/app/server" -) +import db "apiServer_service/models/database" +db.GetDB().Where("parent_id = ?", 0).Find(&groups) +db.GetDB().Create(&db.HostGroup{Name: "生产环境"}) +``` + +新增模型在 `models/database/` 下定义结构体,在 `init.go` 的 `Migrate()` 中注册。 + +### utils/httplog — 访问日志采集 + +Hertz Tracer 实现,采集访问事件通过 Redis List 异步缓冲,由 `cli` 进程批量写入 ES。 + +```go rdb := redis_tools.ConnectRedis() tracer := httplog.NewRedisAccessLogTracer(rdb, "access_log", "my-service", - httplog.WithSkipPrefix("/health"), // 跳过健康检查路径 - httplog.WithMaxResponseBody(4096), // 响应体最大采集 4KB - httplog.WithUserIDExtractor(func(c *app.RequestContext) uint { - // 根据你的认证方式提取 user_id - return 0 - }), + httplog.WithSkipPrefix("/health"), + httplog.WithMaxResponseBody(4096), + httplog.WithUserIDExtractor(func(c *app.RequestContext) uint { return 0 }), ) h := server.Default(server.WithTracer(tracer)) ``` -**特性:** -- 敏感字段自动脱敏(password, token, secret 等) -- multipart 文件字段替换为 `[file]` 占位符 -- 非文本响应自动跳过(图片、zip 等) -- 异步写入 Redis,队列满时丢弃(不阻塞业务) - ### utils/redis_tools — Redis 工具 -单例连接,提供通用 KV 和 List 操作: - ```go -import "apiServer_service/utils/redis_tools" - -// 连接(全局只初始化一次) -rdb := redis_tools.ConnectRedis() - -// KV 操作 redis_tools.SetCache("key", "value", 10*time.Minute) -val, err := redis_tools.GetCache("key") +val, _ := redis_tools.GetCache("key") redis_tools.Del("key1", "key2") redis_tools.Exists("key") - -// List 操作 redis_tools.AddToList("queue", "item") -items, _ := redis_tools.GetAllFromList("queue") -redis_tools.RemoveFromList("queue", "item") ``` ### utils/nacos — Nacos 服务注册/发现/配置 ```go -import "apiServer_service/utils/nacos" - -// 注册当前服务(读取 NACOS_SERVICE_* 环境变量) nacos.RegisterService() - -// 发现服务(带内存缓存) -instance, err := nacos.DiscoverService("user-service") -addr := instance.Ip + ":" + strconv.Itoa(int(instance.Port)) - -// 配置管理 +instance, _ := nacos.DiscoverService("user-service") content := nacos.GetConfig("app.yaml", "DEFAULT_GROUP") -nacos.AddConfig("app.yaml", "DEFAULT_GROUP", "key: value") ``` ### utils/server_cli — gRPC 客户端 -通过 Nacos 服务发现获取 gRPC 地址,连接复用: - ```go -import "apiServer_service/utils/server_cli" - err := server_cli.ReportVisit(token, note, ip, os, point, userId) defer server_cli.CloseGrpcConn() ``` @@ -170,37 +233,24 @@ defer server_cli.CloseGrpcConn() ### utils/request — 请求绑定 & 统一响应 ```go -import "apiServer_service/utils/request" - -// 参数绑定(失败自动返回 400) -var req MyRequest -if err := request.BindRequestStruct(c, &req); err != nil { - return -} - -// 统一响应 -request.Success(c, data) // 200 {"code":200,"message":"Success","data":...} -request.BadRequest(c, "参数错误") // 400 -request.Unauthorized(c, "未登录") // 401 -request.NotFound(c, "资源不存在") // 404 -request.Error(c, 500, "服务器内部错误") // 自定义状态码 -request.FileResponse(c, "/path/to/file", "download.zip") +request.BindRequestStruct(c, &req) // 失败自动 400 +request.Success(c, data) // 200 +request.BadRequest(c, "参数错误") // 400 +request.Unauthorized(c, "未登录") // 401 +request.NotFound(c, "不存在") // 404 +request.Error(c, 500, "错误") // 自定义 +request.FileResponse(c, path, name) // 文件下载 ``` ### routes — 路由定义 -在 `routes/` 下按模块拆分路由文件,在 `cmd/main_program/routs.go` 中注册: - ```go func SetupRoutes(r *server.Hertz) { - r.Use(middleware.Recovery()) - r.Use(middleware.AccessLog()) - r.Use(middleware.CORS()) - + r.Use(middleware.Recovery(), middleware.AccessLog(), middleware.CORS()) api := r.Group("/api") { routes.RegisterIndexRoutes(api) - // routes.RegisterUserRoutes(api) // 新增模块在此注册 + // routes.RegisterUserRoutes(api) // 新增模块在此注册 } } ``` @@ -214,40 +264,32 @@ func SetupRoutes(r *server.Hertz) { ```bash go build -ldflags="-s -w" -o server ./cmd/main_program go build -ldflags="-s -w" -o cli ./cmd/cli_control +go build -ldflags="-s -w" -o scheduler ./cmd/scheduler ``` ### 首次安装(systemd 服务注册) -将二进制、`.env`、脚本和 `deploy/` 目录上传到服务器后执行: - ```bash -chmod +x install.sh start.sh stop.sh restart.sh server cli - -# 安装 systemd 服务 + 设置开机自启 +chmod +x install.sh start.sh stop.sh restart.sh server cli scheduler sudo bash install.sh ``` -`install.sh` 会自动: -1. 将 `deploy/*.service` 适配当前路径后复制到 `/etc/systemd/system/` -2. 执行 `systemctl daemon-reload` -3. 执行 `systemctl enable server cli` 开机自启 - ### 日常运维 ```bash -bash start.sh # 启动全部服务 -bash stop.sh # 停止全部服务 -bash restart.sh # 重启全部服务 +bash start.sh # 启动全部 (server + cli + scheduler) +bash stop.sh # 停止全部 +bash restart.sh # 重启全部 ``` -也可以直接使用 `systemctl` 管理单个服务: +单独管理: ```bash -systemctl status server # 查看主服务状态 -systemctl status cli # 查看 CLI 状态 -systemctl restart server # 只重启主服务 -journalctl -u server -f # 查看主服务实时日志 -journalctl -u cli -f --since today # 查看 CLI 今日日志 +systemctl status server # 主服务状态 +systemctl status scheduler # 调度器状态 +systemctl restart scheduler # 只重启调度器 +journalctl -u server -f # 主服务实时日志 +journalctl -u scheduler -f # 调度器实时日志 ``` ### systemd 服务特性 @@ -255,41 +297,43 @@ journalctl -u cli -f --since today # 查看 CLI 今日日志 | 特性 | 说明 | |------|------| | 开机自启 | `install.sh` 执行后自动启用 | -| 崩溃自动重启 | `Restart=always`,server 间隔 3s,cli 间隔 5s | -| 优雅关闭 | `KillSignal=SIGTERM`,等待 10s 超时后 SIGKILL | -| 环境变量 | 通过 `EnvironmentFile` 加载 `.env` | +| 崩溃自动重启 | `Restart=always`,server 3s / cli 5s / scheduler 5s | +| 优雅关闭 | `SIGTERM`,server 10s / scheduler 15s 超时 | +| 环境变量 | `EnvironmentFile` 加载 `.env` | | 文件描述符 | `LimitNOFILE=65536` | -| 日志 | 同时写入 `logs/*.out` 和 `journalctl` | ### 部署目录结构 ``` /root/ -├── server # HTTP 主服务二进制 -├── cli # httplog 上报二进制 +├── server # HTTP 主服务 +├── cli # httplog 上报 +├── scheduler # 定时任务调度器 ├── .env # 环境配置 ├── deploy/ -│ ├── server.service # systemd 服务单元(模板) -│ └── cli.service -├── install.sh # 首次安装脚本 -├── start.sh # 启动 -├── stop.sh # 停止 -├── restart.sh # 重启 +│ ├── server.service +│ ├── cli.service +│ └── scheduler.service +├── install.sh / start.sh / stop.sh / restart.sh └── logs/ - ├── server.out - └── cli.out + ├── server.out # 主服务输出 + ├── cli.out # httplog 输出 + ├── scheduler.out # 调度器输出 + └── cron_*.log # 定时任务专用日志 ``` ## 内置功能清单 - Hertz HTTP 框架 + 路由分组 - Recovery / AccessLog / CORS 中间件 -- 统一 JSON 响应格式 -- 参数绑定与校验 +- 统一 JSON 响应格式 + 参数绑定校验 +- GORM 数据库(MySQL / PostgreSQL,自动迁移) +- 事件 Hook 体系(同步/异步、优先级、发布-订阅) +- 定时任务调度器(秒级 cron、独立进程、专用日志) - HTTP 访问日志采集 → Redis 缓冲 → ES 批量上报 - Redis 工具(单例连接池) - Nacos 服务注册、发现、配置管理 -- gRPC 客户端(含连接复用) -- 彩色日志输出 + 文件日志 +- gRPC 客户端(连接复用) +- 彩色日志 + 文件日志(主服务/定时任务分离) - 优雅关闭 (Graceful Shutdown) - systemd 服务管理(开机自启 + 崩溃自动重启) diff --git a/cmd/main_program/main.go b/cmd/main_program/main.go index 511d007..dc2412d 100644 --- a/cmd/main_program/main.go +++ b/cmd/main_program/main.go @@ -1,6 +1,7 @@ package main import ( + db "apiServer_service/models/database" "apiServer_service/utils/logger" "apiServer_service/utils/nacos" "fmt" @@ -25,6 +26,11 @@ func main() { logger.Fatal("Config", "HOST 和 PORT 环境变量必须设置") } + if os.Getenv("DB_TYPE") != "" { + db.ConnectDatabase() + db.Migrate() + } + h := server.Default( server.WithHostPorts(host+":"+port), server.WithExitWaitTime(0), diff --git a/cmd/scheduler/main.go b/cmd/scheduler/main.go new file mode 100644 index 0000000..36c9018 --- /dev/null +++ b/cmd/scheduler/main.go @@ -0,0 +1,79 @@ +package main + +import ( + "apiServer_service/crontab" + "apiServer_service/crontab/tasks" + db "apiServer_service/models/database" + "apiServer_service/utils/logger" + "fmt" + "os" + "os/signal" + "syscall" + + "github.com/joho/godotenv" +) + +func init() { + if err := godotenv.Load(".env"); err != nil { + fmt.Println("Warning: .env file not found, using system environment variables") + } +} + +func main() { + if os.Getenv("DB_TYPE") != "" { + db.ConnectDatabase() + } + + scheduler := crontab.DefaultScheduler() + tasks.RegisterTasks(scheduler) + + if len(os.Args) > 1 { + switch os.Args[1] { + case "run-once": + logger.CronInfo("Scheduler", "立即执行所有任务") + scheduler.RunOnce() + return + case "run": + if len(os.Args) > 2 { + name := os.Args[2] + if scheduler.RunTaskByName(name) { + logger.CronInfo("Scheduler", fmt.Sprintf("任务执行完成: %s", name)) + } else { + logger.CronError("Scheduler", fmt.Sprintf("未找到任务: %s", name)) + } + return + } + logger.CronError("Scheduler", "用法: scheduler run <任务名>") + return + case "list": + fmt.Println("已注册任务:") + for _, task := range scheduler.GetTasks() { + fmt.Printf(" - %-20s %s\n", task.Name(), task.Spec()) + } + return + default: + fmt.Printf("未知命令: %s\n", os.Args[1]) + printUsage() + return + } + } + + scheduler.Start() + logger.CronInfo("Scheduler", "调度器已启动") + + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + <-quit + + logger.CronInfo("Scheduler", "正在停止...") + scheduler.Stop() + logger.CronInfo("Scheduler", "已停止") +} + +func printUsage() { + fmt.Println("用法:") + fmt.Println(" scheduler 启动调度器(后台常驻)") + fmt.Println(" scheduler list 列出所有已注册任务") + fmt.Println(" scheduler run-once 立即执行所有任务一次") + fmt.Println(" scheduler run <名> 立即执行指定任务一次") +} diff --git a/crontab/scheduler.go b/crontab/scheduler.go new file mode 100644 index 0000000..51a6e0e --- /dev/null +++ b/crontab/scheduler.go @@ -0,0 +1,99 @@ +package crontab + +import ( + "apiServer_service/utils/logger" + "fmt" + "sync" + + "github.com/robfig/cron/v3" +) + +// Task 定时任务接口 +type Task interface { + Name() string + Spec() string + Run() +} + +type Scheduler struct { + cron *cron.Cron + tasks []Task + mu sync.RWMutex +} + +var ( + defaultScheduler *Scheduler + once sync.Once +) + +func DefaultScheduler() *Scheduler { + once.Do(func() { + defaultScheduler = NewScheduler() + }) + return defaultScheduler +} + +func NewScheduler() *Scheduler { + return &Scheduler{ + cron: cron.New(cron.WithSeconds()), + } +} + +func (s *Scheduler) Register(task Task) { + s.mu.Lock() + defer s.mu.Unlock() + + entryID, err := s.cron.AddFunc(task.Spec(), func() { + logger.CronInfo("Cron", fmt.Sprintf("执行任务: %s", task.Name())) + defer func() { + if r := recover(); r != nil { + logger.CronError("Cron", fmt.Sprintf("任务 %s panic: %v", task.Name(), r)) + } + }() + task.Run() + }) + if err != nil { + logger.CronError("Cron", fmt.Sprintf("注册任务失败: %s, %v", task.Name(), err)) + return + } + s.tasks = append(s.tasks, task) + logger.CronInfo("Cron", fmt.Sprintf("注册任务: %s [%s] (id=%d)", task.Name(), task.Spec(), entryID)) +} + +func (s *Scheduler) Start() { + logger.CronInfo("Cron", fmt.Sprintf("调度器启动,已注册 %d 个任务", len(s.tasks))) + s.cron.Start() +} + +func (s *Scheduler) Stop() { + logger.CronInfo("Cron", "调度器停止") + ctx := s.cron.Stop() + <-ctx.Done() +} + +func (s *Scheduler) RunOnce() { + s.mu.RLock() + defer s.mu.RUnlock() + for _, task := range s.tasks { + logger.CronInfo("Cron", fmt.Sprintf("立即执行: %s", task.Name())) + task.Run() + } +} + +func (s *Scheduler) RunTaskByName(name string) bool { + s.mu.RLock() + defer s.mu.RUnlock() + for _, task := range s.tasks { + if task.Name() == name { + task.Run() + return true + } + } + return false +} + +func (s *Scheduler) GetTasks() []Task { + s.mu.RLock() + defer s.mu.RUnlock() + return s.tasks +} diff --git a/crontab/tasks/health_check.go b/crontab/tasks/health_check.go new file mode 100644 index 0000000..86ecab7 --- /dev/null +++ b/crontab/tasks/health_check.go @@ -0,0 +1,27 @@ +package tasks + +import ( + "apiServer_service/utils/logger" + "fmt" + "runtime" +) + +// HealthCheckTask 健康检查示例任务 +type HealthCheckTask struct{} + +func (t *HealthCheckTask) Name() string { + return "健康检查" +} + +func (t *HealthCheckTask) Spec() string { + return "0 */5 * * * *" +} + +func (t *HealthCheckTask) Run() { + var m runtime.MemStats + runtime.ReadMemStats(&m) + logger.CronInfo("HealthCheck", fmt.Sprintf( + "goroutines=%d, heap=%dMB, sys=%dMB", + runtime.NumGoroutine(), m.HeapAlloc/1024/1024, m.Sys/1024/1024, + )) +} diff --git a/crontab/tasks/register.go b/crontab/tasks/register.go new file mode 100644 index 0000000..089a86d --- /dev/null +++ b/crontab/tasks/register.go @@ -0,0 +1,13 @@ +package tasks + +import ( + "apiServer_service/crontab" +) + +// RegisterTasks 注册所有定时任务,新增任务在此添加 +func RegisterTasks(scheduler *crontab.Scheduler) { + scheduler.Register(&HealthCheckTask{}) + + // 新增任务示例: + // scheduler.Register(NewXxxTask()) +} diff --git a/deploy/scheduler.service b/deploy/scheduler.service new file mode 100644 index 0000000..0ca56db --- /dev/null +++ b/deploy/scheduler.service @@ -0,0 +1,22 @@ +[Unit] +Description=API Server Scheduler (crontab) +After=network.target redis.service +Wants=redis.service + +[Service] +Type=simple +WorkingDirectory=/root +ExecStart=/root/scheduler +Restart=always +RestartSec=5 +LimitNOFILE=65536 +KillSignal=SIGTERM +TimeoutStopSec=15 + +EnvironmentFile=-/root/.env + +StandardOutput=append:/root/logs/scheduler.out +StandardError=append:/root/logs/scheduler.out + +[Install] +WantedBy=multi-user.target diff --git a/go.mod b/go.mod index 69ba443..78b3aad 100644 --- a/go.mod +++ b/go.mod @@ -7,9 +7,13 @@ require ( github.com/go-redis/redis/v8 v8.11.5 github.com/joho/godotenv v1.5.1 github.com/nacos-group/nacos-sdk-go/v2 v2.2.9 + github.com/robfig/cron/v3 v3.0.1 github.com/sirupsen/logrus v1.9.3 google.golang.org/grpc v1.71.0 google.golang.org/protobuf v1.36.5 + gorm.io/driver/mysql v1.5.7 + gorm.io/driver/postgres v1.5.11 + gorm.io/gorm v1.25.12 ) require ( diff --git a/hooks/event.go b/hooks/event.go new file mode 100644 index 0000000..ca7331d --- /dev/null +++ b/hooks/event.go @@ -0,0 +1,24 @@ +package hooks + +import "context" + +// EventType 业务事件类型,按项目需求在此扩展 +type EventType string + +// Handler 事件处理器接口 +type Handler interface { + Handle(ctx context.Context, event EventType, payload *EventPayload) error +} + +// HandlerFunc 函数适配器,方便用匿名函数注册 handler +type HandlerFunc func(ctx context.Context, event EventType, payload *EventPayload) error + +func (f HandlerFunc) Handle(ctx context.Context, event EventType, payload *EventPayload) error { + return f(ctx, event, payload) +} + +// EventPayload 事件携带的上下文信息 +type EventPayload struct { + UserID int + Extra map[string]any +} diff --git a/hooks/global.go b/hooks/global.go new file mode 100644 index 0000000..4702046 --- /dev/null +++ b/hooks/global.go @@ -0,0 +1,7 @@ +package hooks + +var defaultManager = NewManager() + +func DefaultManager() *Manager { + return defaultManager +} diff --git a/hooks/manager.go b/hooks/manager.go new file mode 100644 index 0000000..26c5d99 --- /dev/null +++ b/hooks/manager.go @@ -0,0 +1,83 @@ +package hooks + +import ( + "apiServer_service/utils/logger" + "context" + "fmt" + "sort" + "sync" +) + +type handlerWithMeta struct { + priority int + async bool + h Handler +} + +type Manager struct { + mu sync.RWMutex + handlers map[EventType][]handlerWithMeta +} + +func NewManager() *Manager { + return &Manager{ + handlers: make(map[EventType][]handlerWithMeta), + } +} + +// Register 同步注册,优先级 0 +func (m *Manager) Register(event EventType, h Handler) { + m.RegisterWithOptions(event, 0, false, h) +} + +// RegisterWithPriority 指定优先级(同步) +func (m *Manager) RegisterWithPriority(event EventType, priority int, h Handler) { + m.RegisterWithOptions(event, priority, false, h) +} + +// RegisterAsync 异步注册,优先级 0 +func (m *Manager) RegisterAsync(event EventType, h Handler) { + m.RegisterWithOptions(event, 0, true, h) +} + +// RegisterWithOptions 核心注册:优先级 + 同步/异步 +func (m *Manager) RegisterWithOptions(event EventType, priority int, async bool, h Handler) { + m.mu.Lock() + defer m.mu.Unlock() + m.handlers[event] = append(m.handlers[event], handlerWithMeta{ + priority: priority, + async: async, + h: h, + }) +} + +// Trigger 触发事件,按优先级从高到低执行 +func (m *Manager) Trigger(ctx context.Context, event EventType, payload *EventPayload) { + m.mu.RLock() + hs := m.handlers[event] + m.mu.RUnlock() + + if len(hs) == 0 { + return + } + + sorted := make([]handlerWithMeta, len(hs)) + copy(sorted, hs) + sort.SliceStable(sorted, func(i, j int) bool { + return sorted[i].priority > sorted[j].priority + }) + + for _, hm := range sorted { + if hm.async { + go func(hm handlerWithMeta) { + if err := hm.h.Handle(ctx, event, payload); err != nil { + logger.Error("Hook", fmt.Sprintf("async event=%s err=%v", event, err)) + } + }(hm) + } else { + if err := hm.h.Handle(ctx, event, payload); err != nil { + logger.Error("Hook", fmt.Sprintf("event=%s err=%v", event, err)) + } + } + } +} diff --git a/hooks/registers/register.go b/hooks/registers/register.go new file mode 100644 index 0000000..4b21e5e --- /dev/null +++ b/hooks/registers/register.go @@ -0,0 +1,12 @@ +package registers + +import ( + "apiServer_service/hooks" +) + +// RegisterHooks 注册所有事件 handler,新增 hook 在此添加 +func RegisterHooks(m *hooks.Manager) { + // 示例: + // m.Register(hooks.EventType("order_paid"), hooks.HandlerFunc(OnOrderPaid)) + // m.RegisterAsync(hooks.EventType("user_registered"), hooks.HandlerFunc(SendWelcomeEmail)) +} diff --git a/install.sh b/install.sh index aa8e844..6c087ad 100644 --- a/install.sh +++ b/install.sh @@ -5,6 +5,7 @@ APP_DIR="$(cd "$(dirname "$0")" && pwd)" DEPLOY_DIR="$APP_DIR/deploy" SERVICE_DIR="/etc/systemd/system" LOG_DIR="$APP_DIR/logs" +SERVICES="server cli scheduler" if [ "$(id -u)" -ne 0 ]; then echo "请使用 root 权限运行: sudo bash install.sh" @@ -15,14 +16,13 @@ mkdir -p "$LOG_DIR" echo "=== 安装 systemd 服务 ===" -for svc in server cli; do +for svc in $SERVICES; do src="$DEPLOY_DIR/${svc}.service" if [ ! -f "$src" ]; then echo "[${svc}] service 文件不存在: $src" continue fi - # 用实际路径替换模板中的 /root sed "s|WorkingDirectory=/root|WorkingDirectory=$APP_DIR|g; \ s|ExecStart=/root/|ExecStart=$APP_DIR/|g; \ s|EnvironmentFile=-/root/.env|EnvironmentFile=-$APP_DIR/.env|g; \ @@ -34,7 +34,7 @@ done systemctl daemon-reload -for svc in server cli; do +for svc in $SERVICES; do systemctl enable "$svc" echo "[${svc}] 已设置开机自启" done @@ -42,8 +42,9 @@ done echo "" echo "=== 安装完成 ===" echo "使用方式:" -echo " bash start.sh # 启动全部" -echo " bash stop.sh # 停止全部" -echo " bash restart.sh # 重启全部" -echo " systemctl status server # 查看主服务状态" -echo " journalctl -u server -f # 查看主服务实时日志" +echo " bash start.sh # 启动全部" +echo " bash stop.sh # 停止全部" +echo " bash restart.sh # 重启全部" +echo " systemctl status server # 查看主服务状态" +echo " systemctl status scheduler # 查看调度器状态" +echo " journalctl -u scheduler -f # 查看调度器实时日志" diff --git a/models/database/base_model.go b/models/database/base_model.go new file mode 100644 index 0000000..8cf4ed4 --- /dev/null +++ b/models/database/base_model.go @@ -0,0 +1,11 @@ +package db + +import "gorm.io/gorm" + +// HostGroup 主机组 +type HostGroup struct { + gorm.Model + Name string `json:"name" gorm:"not null;comment:主机组名称;index"` + Description string `json:"description" gorm:"null;comment:主机组描述;index"` + ParentID uint `json:"parent_id" gorm:"null;comment:父级ID;index"` +} diff --git a/models/database/init.go b/models/database/init.go new file mode 100644 index 0000000..949dac1 --- /dev/null +++ b/models/database/init.go @@ -0,0 +1,100 @@ +package db + +import ( + "apiServer_service/utils/logger" + "fmt" + "os" + "sync" + "time" + + "gorm.io/driver/mysql" + "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +var ( + DB *gorm.DB + once sync.Once +) + +func ConnectDatabase() { + once.Do(func() { + dbType := os.Getenv("DB_TYPE") + dbUser := os.Getenv("DB_USER") + dbPassword := os.Getenv("DB_PASSWORD") + dbName := os.Getenv("DB_NAME") + dbHost := os.Getenv("DB_HOST") + dbPort := os.Getenv("DB_PORT") + + if dbType == "" { + logger.Warn("Database", "DB_TYPE 未配置,跳过数据库连接") + return + } + + var dialector gorm.Dialector + + switch dbType { + case "mysql": + dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", + dbUser, dbPassword, dbHost, dbPort, dbName) + dialector = mysql.Open(dsn) + case "postgres": + dbSchema := os.Getenv("DB_SCHEMA") + if dbSchema == "" { + dbSchema = "public" + } + dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s sslmode=disable TimeZone=Asia/Shanghai search_path=%s", + dbHost, dbUser, dbPassword, dbName, dbPort, dbSchema) + dialector = postgres.Open(dsn) + default: + logger.Fatal("Database", fmt.Sprintf("不支持的 DB_TYPE: %s(仅支持 mysql / postgres)", dbType)) + return + } + + conn, err := gorm.Open(dialector, &gorm.Config{}) + if err != nil { + logger.Fatal("Database", "连接失败: ", err) + return + } + + sqlDB, err := conn.DB() + if err != nil { + logger.Fatal("Database", "获取底层连接池失败: ", err) + return + } + sqlDB.SetMaxOpenConns(50) + sqlDB.SetMaxIdleConns(10) + sqlDB.SetConnMaxLifetime(30 * time.Minute) + sqlDB.SetConnMaxIdleTime(10 * time.Minute) + + DB = conn + logger.Info("Database", fmt.Sprintf("连接成功 (%s)", dbType)) + }) +} + +func GetDB() *gorm.DB { + if DB == nil { + ConnectDatabase() + } + return DB +} + +// Migrate 自动迁移所有模型,新增模型在此注册 +func Migrate() { + if DB == nil { + logger.Warn("Database", "数据库未连接,跳过迁移") + return + } + + models := []interface{}{ + &HostGroup{}, + } + + for _, model := range models { + if err := DB.AutoMigrate(model); err != nil { + logger.Error("Database", fmt.Sprintf("迁移 %T 失败: %v", model, err)) + } else { + logger.Info("Database", fmt.Sprintf("迁移 %T 完成", model)) + } + } +} diff --git a/restart.sh b/restart.sh index 1cd4bc1..8016021 100644 --- a/restart.sh +++ b/restart.sh @@ -1,5 +1,5 @@ #!/bin/bash -SERVICES="server cli" +SERVICES="server cli scheduler" for svc in $SERVICES; do systemctl restart "$svc" diff --git a/start.sh b/start.sh index 665abe0..3cba17b 100644 --- a/start.sh +++ b/start.sh @@ -1,5 +1,5 @@ #!/bin/bash -SERVICES="server cli" +SERVICES="server cli scheduler" for svc in $SERVICES; do if systemctl is-active --quiet "$svc"; then diff --git a/stop.sh b/stop.sh index 96bc207..fb8c29c 100644 --- a/stop.sh +++ b/stop.sh @@ -1,5 +1,5 @@ #!/bin/bash -SERVICES="server cli" +SERVICES="server cli scheduler" for svc in $SERVICES; do if systemctl is-active --quiet "$svc"; then diff --git a/utils/logger/log.go b/utils/logger/log.go index bd0aded..497ab5e 100644 --- a/utils/logger/log.go +++ b/utils/logger/log.go @@ -70,8 +70,10 @@ func (f *colorFormatter) Format(entry *logrus.Entry) ([]byte, error) { } var ( - instance *logrus.Logger - once sync.Once + instance *logrus.Logger + once sync.Once + cronInstance *logrus.Logger + cronOnce sync.Once ) func GetLogger() *logrus.Logger { @@ -150,6 +152,63 @@ func joinToString(parts ...interface{}) string { return strings.Join(strs, " ") } +// GetCronLogger 返回定时任务专用 logger,写入独立日志文件 +func GetCronLogger() *logrus.Logger { + cronOnce.Do(func() { + cronInstance = logrus.New() + + switch strings.ToLower(os.Getenv("LOG_LEVEL")) { + case "debug": + cronInstance.SetLevel(logrus.DebugLevel) + case "info": + cronInstance.SetLevel(logrus.InfoLevel) + case "warn": + cronInstance.SetLevel(logrus.WarnLevel) + case "error": + cronInstance.SetLevel(logrus.ErrorLevel) + default: + cronInstance.SetLevel(logrus.InfoLevel) + } + + cronInstance.SetFormatter(&colorFormatter{}) + cronInstance.SetReportCaller(true) + + logDir := os.Getenv("LOG_SAVE_PATH") + if logDir == "" { + logDir = "./logs" + } + if err := os.MkdirAll(logDir, 0755); err != nil { + cronInstance.SetOutput(os.Stdout) + return + } + + logFile := logDir + "/cron_" + time.Now().Format("20060102") + ".log" + f, err := os.OpenFile(logFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + cronInstance.SetOutput(os.Stdout) + return + } + cronInstance.SetOutput(io.MultiWriter(os.Stdout, f)) + }) + return cronInstance +} + +func CronDebug(title string, content ...interface{}) { + GetCronLogger().WithField("title", title).Debug(joinToString(content...)) +} + +func CronInfo(title string, content ...interface{}) { + GetCronLogger().WithField("title", title).Info(joinToString(content...)) +} + +func CronWarn(title string, content ...interface{}) { + GetCronLogger().WithField("title", title).Warn(joinToString(content...)) +} + +func CronError(title string, content ...interface{}) { + GetCronLogger().WithField("title", title).Error(joinToString(content...)) +} + func Debug(title string, content ...interface{}) { GetLogger().WithField("title", title).Debug(joinToString(content...)) }