
Queue
1. Core Concepts and Data Structures
Job
Defined in src/worker/common/job.rs, the Job is the smallest executable unit of work. Its key attributes:
- queue: the name of the queue the task belongs to
- class: the task processor class name (ties the job to its execution logic)
- args: task parameters (JSON format)
- retry: whether retries are allowed
- Lifecycle fields: created_at (creation time), enqueued_at (enqueue time), failed_at (failure time), etc.
- Retry-related fields: retry_count (number of retries so far), error_message (error details)
// Core definition of the Job structure
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Job {
    pub queue: String,
    pub args: JsonValue,
    pub retry: bool,
    pub class: String,
    pub jid: i64, // Unique task ID
    pub created_at: f64,
    // Other lifecycle fields...
}
Queue
Queues categorize and manage tasks; custom queue names are supported. The default queues are:
- default: general-purpose task queue
- mailer: email-related task queue
- logininfo: login information-related task queue
Queue configuration is managed through the Workers structure in src/config/appconfig.rs, and additional custom queues can be defined in the configuration file.
2. Task Queue Core Mechanisms
Task Enqueuing and Storage
- Immediate tasks: stored directly in Redis lists (queue:{queue_name}); for example, queue:mailer holds email tasks.
- Delayed tasks: stored in the Redis sorted set schedule, with the execution timestamp as the score; the scheduler migrates them to the corresponding queue when they come due.
- Retry tasks: after a failure, stored in the Redis sorted set retry, with the next execution time calculated from the retry strategy.
The enqueuing logic is implemented by UnitOfWork (src/worker/common/unit_of_work.rs); its core methods are:
- enqueue(): add a task to the immediate queue
- schedule(duration): add a task to the delayed queue with the specified delay
- reenqueue(): add a failed task to the retry queue
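The three storage cases above can be sketched as a small routing function. This is a hypothetical simplification: the names `Destination` and `route` are illustrative, and the real UnitOfWork methods issue LPUSH/ZADD commands against a Redis connection rather than returning a value.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Which Redis structure a job lands in, mirroring the cases above.
#[derive(Debug, PartialEq)]
enum Destination {
    /// LPUSH onto the list queue:{name}
    List(String),
    /// ZADD into a sorted set, scored by the execution timestamp
    SortedSet(&'static str, f64),
}

/// Hypothetical router: immediate jobs go to queue:{name}; delayed jobs
/// go into the "schedule" sorted set scored by their due time.
fn route(queue_name: &str, delay_secs: Option<f64>) -> Destination {
    match delay_secs {
        None => Destination::List(format!("queue:{}", queue_name)),
        Some(d) => {
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs_f64();
            Destination::SortedSet("schedule", now + d)
        }
    }
}

fn main() {
    // An immediate mailer job is pushed onto the list "queue:mailer".
    assert_eq!(route("mailer", None), Destination::List("queue:mailer".to_string()));
    // A delayed job is scored into the "schedule" sorted set.
    assert!(matches!(route("default", Some(60.0)), Destination::SortedSet("schedule", _)));
    println!("routing ok");
}
```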
Task Scheduling and Execution
Processor:
- Defined in src/worker/common/processor.rs, it is the core scheduling component of the task queue.
- Registers all task processors (such as MailerWorker, RequestUrlWorker, etc.) at startup and associates them with their queues.
- Spawns worker threads in parallel, based on the configured num_workers, to process tasks.
Task Fetching and Execution Process:
- Worker threads fetch tasks from Redis queues via the fetch() method, which uses the blocking brpop command to avoid busy polling on empty queues.
- Once a task is fetched, process_one() calls the perform() method of the matching processor to run the task logic.
- After execution the task status is updated; on failure, the task is added to the retry queue according to its retry configuration.
Periodic Check Mechanism:
- Every 5 seconds, the schedule and retry sorted sets are checked, and due tasks (current time ≥ execution time) are migrated to their corresponding queues.
- The implementation can be seen in the scheduling loop of Processor::run():
// Periodically check delayed tasks and retry tasks
loop {
    select! {
        _ = tokio::time::sleep(Duration::from_secs(5)) => {}
        _ = cancellation_token.cancelled() => break,
    }
    sched.enqueue_jobs(chrono::Utc::now(), &["retry".to_string(), "schedule".to_string()]).await?;
}
3. Task Processors and Registration
Processor Types
The system defines the task processor interface through the Worker trait (src/worker/common/worker.rs), with each processor handling one category of task. Core implementations include:
- MailerWorker: processes email sending tasks (queue mailer)
- RequestUrlWorker: processes URL request tasks (queue default)
- InvokeFunctionWorker: processes function call tasks (queue default)
- LoginInfoWorker: processes login information-related tasks (queue default)
- JobWorker: processes scheduled task configuration updates (queue default)
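A minimal sketch of what the Worker trait might look like. The method names and signatures here are assumptions for illustration; the real trait in src/worker/common/worker.rs is async and receives JSON-typed arguments.

```rust
/// Hypothetical shape of the Worker trait: each processor exposes its
/// class name, target queue, and a perform() entry point.
trait Worker {
    fn class_name(&self) -> &'static str;
    fn queue(&self) -> &'static str;
    fn perform(&self, args: &str) -> Result<(), String>;
}

struct MailerWorker;

impl Worker for MailerWorker {
    fn class_name(&self) -> &'static str { "MailerWorker" }
    fn queue(&self) -> &'static str { "mailer" }
    fn perform(&self, args: &str) -> Result<(), String> {
        // Real logic would send the email described by args.
        if args.is_empty() { Err("missing args".to_string()) } else { Ok(()) }
    }
}

fn main() {
    let w = MailerWorker;
    assert_eq!(w.class_name(), "MailerWorker");
    assert_eq!(w.queue(), "mailer");
    assert!(w.perform("{\"to\":\"a@b.c\"}").is_ok());
    assert!(w.perform("").is_err());
    println!("worker trait ok");
}
```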
Registration Logic
Processors are registered through the Processor::register() method, which associates task class names with their execution logic:
// Register all processors (src/worker/processor_manager.rs)
pub async fn processor_job() -> Processor {
    let mut p = init_process().await;
    p.register(MailerWorker::new());
    p.register(LoginInfoWorker::new());
    p.register(JobWorker::new());
    p.register(InvokeFunctionWorker::new());
    p.register(RequestUrlWorker::new());
    p
}
4. Key Features
Retry Mechanism:
- After a task fails, it is automatically added to the retry queue based on its retry configuration.
- The retry delay follows an exponential backoff strategy: count^4 + 15 + random value, where count is the number of retries so far.
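The backoff formula can be expressed directly. Jitter is passed as a parameter here so the deterministic part stays testable; the real implementation draws the random value itself.

```rust
/// Retry delay per the documented strategy: count^4 + 15 + random jitter.
fn retry_delay_secs(count: u32, jitter: u64) -> u64 {
    (count as u64).pow(4) + 15 + jitter
}

fn main() {
    // Delays grow steeply with the retry count (shown with zero jitter):
    assert_eq!(retry_delay_secs(1, 0), 16);  // 1 + 15
    assert_eq!(retry_delay_secs(2, 0), 31);  // 16 + 15
    assert_eq!(retry_delay_secs(3, 0), 96);  // 81 + 15
    assert_eq!(retry_delay_secs(4, 0), 271); // 256 + 15
    println!("backoff ok");
}
```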
Task Deduplication:
- A unique execution window can be configured per task through unique_for (e.g., execute the same task at most once within 10 seconds).
- A unique key is generated by SHA-256 hashing the task parameters, and deduplication is enforced with Redis's SETNX.
Dynamic Queue Configuration:
- Default queues are merged with custom queues from the configuration file, with duplicates removed (the get_queues function).
- Different tasks can be assigned dedicated queues through WorkerOpts (e.g., email tasks use the mailer queue).
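A get_queues-style merge might look like this; the function name and signature are assumptions, but the behavior matches the description above: defaults first, then custom queues, duplicates dropped while preserving order.

```rust
use std::collections::HashSet;

/// Sketch of get_queues-style merging: defaults first, then custom queues
/// from configuration, with duplicates dropped and order preserved.
fn merge_queues(defaults: &[&str], custom: &[&str]) -> Vec<String> {
    let mut seen = HashSet::new();
    defaults
        .iter()
        .chain(custom.iter())
        .filter(|q| seen.insert(**q)) // insert() is false for repeats
        .map(|q| q.to_string())
        .collect()
}

fn main() {
    let merged = merge_queues(&["default", "mailer", "logininfo"], &["mailer", "reports"]);
    // "mailer" appears once even though both lists contain it.
    assert_eq!(merged, vec!["default", "mailer", "logininfo", "reports"]);
    println!("merge ok");
}
```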
5. Core Code Modules
| Module Path | Functionality |
|---|---|
| src/worker/common/job.rs | Defines the Job structure and its serialization logic. |
| src/worker/common/processor.rs | Implements core scheduling logic; manages worker threads and queues. |
| src/worker/common/unit_of_work.rs | Handles task enqueuing, scheduling, retrying, and related operations. |
| src/worker/common/worker.rs | Defines the Worker trait and the task execution interface. |
| src/worker/processor_manager.rs | Initializes the Processor and registers all task processors. |
In summary, this task queue system implements efficient distributed task scheduling based on Redis, supporting multi-queue parallel processing, delayed tasks, automatic retries, and task deduplication. Through modular processor design, it can flexibly extend new task types, suitable for asynchronous task processing in high-concurrency scenarios.