
队列
大约 4 分钟
1. 核心概念与数据结构
任务(Job)
定义在src/worker/common/job.rs
,是可执行任务的最小单元,包含以下关键属性:
queue
:任务所属队列名称class
:任务处理器类名(关联执行逻辑)args
:任务参数(JSON格式)retry
:是否允许重试- 生命周期字段:
created_at
(创建时间)、enqueued_at
(入队时间)、failed_at
(失败时间)等 - 重试相关:
retry_count
(重试次数)、error_message
(错误信息)
// 任务结构体核心定义
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Job {
pub queue: String,
pub args: JsonValue,
pub retry: bool,
pub class: String,
pub jid: i64, // 任务唯一ID
pub created_at: f64,
// 其他生命周期字段...
}
队列(Queue)
用于分类管理任务,支持自定义队列名称,默认队列包括:
default
:默认任务队列mailer
:邮件相关任务队列logininfo
:登录信息相关任务队列
队列配置通过src/config/appconfig.rs
的Workers
结构体管理,支持通过配置文件扩展自定义队列。
2. 任务队列核心机制
任务入队与存储
- 即时任务:直接存入Redis的列表(
queue:{队列名}
),例如queue:mailer
存储邮件任务。 - 延时任务:存入Redis的有序集合(
schedule
),以“执行时间戳”为分数,到期后由调度器迁移至对应队列。 - 重试任务:失败后存入Redis的有序集合(
retry
),根据重试策略计算下次执行时间。
入队逻辑通过UnitOfWork
(src/worker/common/unit_of_work.rs
)实现,核心方法包括:
enqueue()
:将任务加入即时队列schedule(duration)
:将任务加入延时队列(指定延迟时间)reenqueue()
:将失败任务加入重试队列
任务调度与执行
处理器(Processor):
- 定义在
src/worker/common/processor.rs
,是任务队列的核心调度组件。 - 启动时注册所有任务处理器(如
MailerWorker
、RequestUrlWorker
等),并关联对应的队列。 - 根据配置的
num_workers
启动多个工作线程(Worker)并行处理任务。
- 定义在
任务拉取与执行流程:
- 工作线程通过
fetch()
方法从Redis队列中拉取任务(使用brpop
阻塞命令,避免空轮询)。 - 任务拉取后,通过
process_one()
方法调用对应处理器的perform()
方法执行任务逻辑。 - 执行完成后更新任务状态,失败时根据
retry
配置决定是否加入重试队列。
- 工作线程通过
定时检查机制:
- 每5秒检查
schedule
和retry
有序集合,将到期任务(当前时间 ≥ 执行时间)迁移至对应队列。 - 代码实现见
Processor::run()
中的调度任务:// 定期检查延时任务和重试任务 loop { select! { _ = tokio::time::sleep(Duration::from_secs(5)) => {} _ = cancellation_token.cancelled() => break, } sched.enqueue_jobs(chrono::Utc::now(), &["retry".to_string(), "schedule".to_string()]).await?; }
- 每5秒检查
3. 任务处理器与注册
处理器类型
系统通过Worker
trait(src/worker/common/worker.rs
)定义任务处理器接口,每个处理器对应一种任务类型,核心实现包括:
MailerWorker
:处理邮件发送任务(队列mailer
)RequestUrlWorker
:处理URL请求任务(队列default
)InvokeFunctionWorker
:处理函数调用任务(队列default
)LoginInfoWorker
:处理登录信息相关任务(队列default
)JobWorker
:处理定时任务配置更新(队列default
)
注册逻辑
处理器通过Processor::register()
方法注册,关联任务类名与执行逻辑:
// 注册所有处理器(src/worker/processor_manager.rs)
pub async fn processor_job() -> Processor {
let mut p = init_process().await;
p.register(MailerWorker::new());
p.register(LoginInfoWorker::new());
p.register(JobWorker::new());
p.register(InvokeFunctionWorker::new());
p.register(RequestUrlWorker::new());
p
}
4. 关键特性
重试机制:
- 任务失败后,根据
retry
配置自动加入重试队列。 - 重试延迟时间按指数退避策略计算:
count^4 + 15 + 随机值
(count
为重试次数)。
- 任务失败后,根据
任务去重:
- 支持通过
unique_for
配置任务唯一执行窗口(如10秒内同一任务仅执行一次)。 - 通过SHA-256哈希参数生成唯一键,结合Redis的
setnx
实现去重。
- 支持通过
动态队列配置:
- 默认队列与配置文件中的自定义队列合并,避免重复(
get_queues
函数)。 - 可通过
WorkerOpts
为不同任务指定专属队列(如邮件任务使用mailer
队列)。
- 默认队列与配置文件中的自定义队列合并,避免重复(
5. 核心代码模块
模块路径 | 功能 |
---|---|
src/worker/common/job.rs | 定义Job 结构体及序列化逻辑。 |
src/worker/common/processor.rs | 实现任务调度核心逻辑,管理工作线程与队列。 |
src/worker/common/unit_of_work.rs | 处理任务入队、调度、重试等操作。 |
src/worker/common/worker.rs | 定义Worker trait及任务执行接口。 |
src/worker/processor_manager.rs | 初始化处理器并注册所有任务处理器。 |
综上,该任务队列系统基于Redis实现高效的分布式任务调度,支持多队列并行处理、延时任务、自动重试和任务去重,通过模块化的处理器设计可灵活扩展新任务类型,适用于高并发场景下的异步任务处理。