后台 Worker 与定时任务系统
大约 4 分钟
架构与流程概览
flowchart LR
A[DB: SysJobModel(任务定义)] -->|update_job| B[注册周期任务 periodic_worker]
B -->|到点触发| C[Worker.perform 执行]
C --> D[SysJobLogModel 记录日志]
E[手动执行 execute_job] --> C
F[enqueue_async 入队] --> G[Processor 并发消费]
G --> C
- 存储:SysJobModel(任务)、SysJobLogModel(日志)
- 调度:update_job → 清空已有周期任务 → 扫库 → 按类型注册到 periodic_worker
- 执行:Worker.perform 处理消息,写入日志与运行次数
- 即时触发:execute_job 或 XxxWorker::enqueue_async / execute_async
快速上手
- 启动消费程序并注册 Worker(默认注册:MailerWorker、LoginInfoWorker、JobWorker、InvokeFunctionWorker、RequestUrlWorker)
- 首次启动或任务变更后调用 update_job() 刷新调度
pub async fn worker_init(self) -> Self {
let p = processor_job().await;
tokio::spawn(async move {
p.run().await;
});
s_sys_job::update_job().await;
self
}
任务管理 API(参考)
- 列表:GET /job/list → SysJobModel::list
- 新增:POST /job/add → SysJobModel::add
- 编辑:POST /job/edit → SysJobModel::edit
- 删除:POST /job/delete → SysJobModel::del
- 手动执行:POST /job/execute → hand_execute_job → execute_job
- Cron 校验:POST /job/validate-cron → 返回 validate 与未来 12 次触发时间(本地时区)
周期调度与手动执行
- 更新调度
pub async fn update_job() {
clear_periodic_worker().await;
let jobs = SysJobModel::all_job().await.unwrap_or_default();
for job in jobs {
worker_execute_job(job).await; // 按 task_type 注册到 periodic_worker
}
}
- 注册周期任务(按类型分发)
pub async fn worker_execute_job(job: JobRes) {
if job.task_type == "geturl" {
let reqarg = RequestUrlMsg { /* url, job_id, task_type, job_name, job_group */ };
periodic_worker(&job.cron_expression, &job.job_name, &job.job_group, reqarg, RequestUrlWorker::class_name()).await;
} else if job.task_type == "invokefunction" {
// 解析 job_params 的 JSON,构造 InvokeFunctionMsg
periodic_worker(&job.cron_expression, &job.job_name, &job.job_group, invokemsg, InvokeFunctionWorker::class_name()).await;
}
}
- 手动执行一次(不等到 Cron 到点)
pub async fn execute_job(job: JobRes) {
if job.task_type == "geturl" {
let _ = RequestUrlWorker::enqueue_async(reqarg).await;
} else if job.task_type == "invokefunction" {
let _ = InvokeFunctionWorker::enqueue_async(invokemsg).await;
}
}
内置任务类型与 Worker
| Worker | 队列 | 用途 | 关键参数 | 日志 |
|---|---|---|---|---|
| RequestUrlWorker | default | 定时发起 HTTP GET(Webhook/心跳) | RequestUrlMsg { url, job_id, ... };job_params 为完整 URL | 记录响应或错误,最长 2048 字符,状态 Sucess/Failed;run_count 自增 |
| InvokeFunctionWorker | default | 定时调用内部函数映射 | InvokeFunctionMsg { callfun, parmets };job_params 为 JSON | 保存函数返回文本/错误,状态 Sucess/Failed;run_count 自增 |
| MailerWorker | mailer | 发送邮件(文本/HTML/模板) | Email { from,to,subject,text,html };mail_template(dir, Args) | 发送结果(通常无需记录到任务日志表) |
| LoginInfoWorker | logininfo | 登录日志后置处理 | LoginInfoMsg { ipaddr, info_id } | 视实现需求(地理解析/风控/审计) |
| JobWorker | default | 任务系统自身辅助 | - | - |
典型用法示例
- 新增 geturl 任务:每分钟请求 URL
{
"job_name": "PingGoogle",
"job_group": "monitor",
"task_type": "geturl",
"cron_expression": "0 * * * * *",
"job_params": "https://www.google.com"
}
- 新增 invokefunction 任务:每 5 分钟调用内部函数
{
"job_name": "RefreshApiCache",
"job_group": "ops",
"task_type": "invokefunction",
"cron_expression": "0 */5 * * * *",
"job_params": "{\"callfun\":\"updateapi\",\"parmets\":\"force\"}"
}
- 登录日志直接异步执行(不入队)
let rlogin_info_add = SysLoginInfoModel::add(login_add).await;
if let Ok(login_info_add) = rlogin_info_add {
let info_msg = LoginInfoMsg { ipaddr, info_id: login_info_add.info_id };
let _ = LoginInfoWorker::execute_async(info_msg).await; // 立即处理
}
- 发送邮件(入队)
use crate::worker::mailer::{Email, MailerWorker};
let email = Email {
from: None,
to: "user@example.com".into(),
reply_to: None,
subject: "Hello".into(),
text: "纯文本内容".into(),
html: "<b>HTML 内容</b>".into(),
};
let _ = MailerWorker::enqueue_async(email).await;
- 使用模板发送邮件
use crate::worker::mailer::{Args, mail_template};
let _ = mail_template(
"welcome".into(),
Args {
from: None,
to: "user@example.com".into(),
reply_to: None,
locals: serde_json::json!({ "username": "Alice", "link": "https://example.com" }),
}
).await;
队列与 Processor
- 默认队列常量:
pub const DEFAULT_QUEUES: &[&str] = &["default", "mailer", "logininfo"];
- Processor 注册与扩展:
pub async fn processor_job() -> Processor {
let mut p = init_process().await; // 合并 APPCOFIG.workers.queues 并设置并发
p.register(MailerWorker::new());
p.register(LoginInfoWorker::new());
p.register(JobWorker::new());
p.register(InvokeFunctionWorker::new());
p.register(RequestUrlWorker::new());
p
}
如需新增自定义 Worker,实现 AppWorker<T> + Worker<T>,在 processor_job 中注册并指定队列即可。
日志与统计
- 每次执行前:SysJobModel::updata_run_count(job_id) 更新运行次数
- 统一写入日志:SysJobLogModel::add(JobLogAdd)
- 字段:job_id、run_count、status(Success/Failed)、job_message(最多 2048 字符)
- 超长截断策略:保留前 2000 字符并追加 “...数据太长,不记录完整内容。”
建议补充可观测性字段:耗时、HTTP 状态码、错误码、重试次数等。
安全与稳定性建议
- invokefunction:使用明确的函数映射白名单(已实现),拒绝任意反射/动态执行
- 参数校验:严格校验 job_params 长度与格式,JSON 解析失败要记录并告警
- HTTP 调用:为 RequestUrlWorker 增加超时与重试策略,限制最大响应体大小
- 时区一致性:当前使用 Local,如需统一 UTC,请在 validate_cron 与 periodic_worker 内部统一处理
- 幂等性:内部函数应尽量幂等,防止重试/并发导致副作用
- 错误处理:谨慎使用 unwrap_or_default,建议显式上报异常并打点
常见问题
- Cron 到点未触发?
- 是否已执行 update_job() 刷新调度
- 表达式是否为 6 段(含秒位)
- periodic_worker 注册是否成功(查看日志)
- 执行报错 job_params error?
- 该类型任务未填写必需参数
- JSON 解析失败(serde_json::from_str)?
- invokefunction 的 job_params 必须是形如 {"callfun":"...","parmets":"..."} 的 JSON 字符串