RustQueue

An async job processing engine built in Rust. Submit jobs via HTTP; a concurrent worker picks them up, processes them, retries on failure, and streams live status updates to any connected client over Server-Sent Events.

Built to understand how job queues like BullMQ and Sidekiq actually work under the hood -- not just use them as a black box.

Stack: Axum, Tokio, sqlx, PostgreSQL, lettre (AWS SES)

Status: Active development. Core is working and stable. Open to contributions -- see CONTRIBUTING.md.


What it does

  • Accepts jobs via a REST API (currently supports email job type)
  • Persists jobs to PostgreSQL with full status tracking
  • Claims jobs safely using SELECT FOR UPDATE SKIP LOCKED -- no double-processing, even with multiple concurrent workers
  • Processes up to 10 jobs concurrently via a Semaphore-capped worker pool
  • Retries failed jobs with exponential backoff (1s, 5s, 25s...) up to max_attempts
  • Recovers jobs orphaned by crashed workers via a stale-job reaper (runs every 5 minutes)
  • Wakes the worker instantly on new jobs via Postgres LISTEN/NOTIFY -- no polling delay
  • Streams live status updates to connected clients via Server-Sent Events
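
The retry schedule above can be sketched as a small pure function. This is a minimal illustration, not the project's actual code: the `5^(attempt - 1)` formula is inferred from the "1s, 5s, 25s..." sequence, and the names `backoff_delay`, `after_failure`, and `AfterFailure` are hypothetical.

```rust
use std::time::Duration;

/// Delay before the retry that follows failed attempt number `attempt` (1-based).
/// Assumption: the schedule is 5^(attempt - 1) seconds, inferred from the
/// documented "1s, 5s, 25s..." sequence; the real formula may differ.
fn backoff_delay(attempt: u32) -> Duration {
    let exponent = attempt.saturating_sub(1);
    // saturating_pow avoids an overflow panic for very large attempt counts.
    Duration::from_secs(5u64.saturating_pow(exponent))
}

/// What happens to a job after an attempt fails (hypothetical type).
#[derive(Debug, PartialEq)]
enum AfterFailure {
    /// Retry once the given delay has passed.
    RetryIn(Duration),
    /// max_attempts reached: the job stays failed.
    GiveUp,
}

/// `attempts` counts every attempt made so far, including the one that just failed.
fn after_failure(attempts: u32, max_attempts: u32) -> AfterFailure {
    if attempts >= max_attempts {
        AfterFailure::GiveUp
    } else {
        AfterFailure::RetryIn(backoff_delay(attempts))
    }
}
```

With the default `max_attempts` of 3, this sketch retries after 1s and 5s, then gives up on the third failure.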

Architecture

HTTP clients
    |
    |  POST /jobs      GET /jobs      GET /jobs/:id      GET /events
    v
  Axum router  ---- Arc<AppState> (PgPool + broadcast::Sender) ----+
                                                                    |
              +-----------------------------------------------------+
              |  Worker loop (tokio::spawn)                         |
              |    wakes on LISTEN/NOTIFY or 2s fallback            |
              |    SELECT FOR UPDATE SKIP LOCKED                     |
              |    Semaphore-capped concurrency (max 10)             |
              |    executor dispatch via HandlerRegistry             |
              |    exponential backoff retries                       |
              |                                                      |
              |  Stale-job reaper (tokio::spawn)                    |
              |    runs every 5 minutes                             |
              |    reclaims jobs stuck in "processing" > 5 min      |
              +-----------------------------------------------------+
              |
         broadcast::Sender<String>  ->  SSE /events stream
              |
         PostgreSQL  (jobs table)
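
The "wakes on LISTEN/NOTIFY or 2s fallback" step in the diagram boils down to waiting on a signal with a timeout. The sketch below shows that pattern with a standard-library channel standing in for the Postgres notification stream. It is a synchronous analogue only, assuming nothing about the real worker, which runs on Tokio; `Wake` and `wait_for_work` are hypothetical names.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::time::Duration;

/// Why the worker woke up (hypothetical type).
#[derive(Debug, PartialEq)]
enum Wake {
    /// A notification arrived: a new job was probably inserted.
    Notified,
    /// No notification within the fallback interval: poll anyway.
    Fallback,
    /// Every sender was dropped: nothing will ever notify again.
    Shutdown,
}

/// Blocks until a signal arrives or `fallback` elapses, whichever comes first.
fn wait_for_work(signals: &Receiver<()>, fallback: Duration) -> Wake {
    match signals.recv_timeout(fallback) {
        Ok(()) => Wake::Notified,
        Err(RecvTimeoutError::Timeout) => Wake::Fallback,
        Err(RecvTimeoutError::Disconnected) => Wake::Shutdown,
    }
}
```

A worker loop would call `wait_for_work(&rx, Duration::from_secs(2))` and then try to claim jobs on both `Notified` and `Fallback`, so a missed notification costs at most one fallback interval.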

Jobs table schema

| column | type | notes |
| --- | --- | --- |
| id | UUID | primary key, auto-generated |
| job_type | VARCHAR | "email" -- extensible via HandlerRegistry |
| payload | JSONB | job-specific data |
| status | VARCHAR | pending -> processing -> done / failed |
| attempts | INT | how many times tried (including successes) |
| max_attempts | INT | default 3 |
| last_error | TEXT | nullable, last failure message |
| scheduled_at | TIMESTAMPTZ | run not before this time |
| processing_started_at | TIMESTAMPTZ | set on claim, cleared on exit -- used by stale-job reaper |
| created_at | TIMESTAMPTZ | |
| updated_at | TIMESTAMPTZ | |
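
To show how `status` and `processing_started_at` work together for the stale-job reaper, here is a minimal sketch of the staleness check. The 5-minute threshold comes from this README; the `JobRow` struct, the `is_stale` function, and the use of `SystemTime` in place of a database timestamp are assumptions made for illustration. What the reaper does with a stale job afterwards is not shown.

```rust
use std::time::{Duration, SystemTime};

/// Jobs stuck in "processing" longer than this count as orphaned.
const STALE_AFTER: Duration = Duration::from_secs(5 * 60);

/// Hypothetical in-memory view of the two columns the reaper cares about.
struct JobRow {
    status: &'static str,
    processing_started_at: Option<SystemTime>,
}

/// Returns true when a job was claimed more than STALE_AFTER ago and never finished.
fn is_stale(job: &JobRow, now: SystemTime) -> bool {
    if job.status != "processing" {
        return false;
    }
    match job.processing_started_at {
        // duration_since fails if `started` is in the future (clock skew);
        // such a job is treated as not stale.
        Some(started) => now
            .duration_since(started)
            .map(|age| age > STALE_AFTER)
            .unwrap_or(false),
        None => false,
    }
}
```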

API

| method | route | purpose |
| --- | --- | --- |
| POST | /jobs | Submit a new job |
| GET | /jobs | List jobs (supports ?limit=N and ?status=pending/done/failed/processing) |
| GET | /jobs/:id | Get a specific job by ID |
| GET | /events | SSE stream of live job status updates |

Submit a job

POST /jobs
Content-Type: application/json

{
  "job_type": "email",
  "payload": {
    "to": "someone@example.com"
  }
}

Filter jobs

GET /jobs?status=failed&limit=10
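
As an illustration of how the `status` and `limit` parameters could be validated, the sketch below parses a raw query string into typed values. It uses only the standard library and is not the project's handler: an Axum route would more likely rely on a query extractor, and the `JobStatus` enum here is an assumed shape for the typed status listed on the roadmap. `parse_filter` and its error messages are hypothetical.

```rust
use std::str::FromStr;

/// The four status values accepted by ?status= (assumed enum shape).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JobStatus {
    Pending,
    Processing,
    Done,
    Failed,
}

impl FromStr for JobStatus {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "pending" => Ok(JobStatus::Pending),
            "processing" => Ok(JobStatus::Processing),
            "done" => Ok(JobStatus::Done),
            "failed" => Ok(JobStatus::Failed),
            other => Err(format!("unknown status: {other}")),
        }
    }
}

/// Parses a query string such as "status=failed&limit=10".
/// Unknown parameters are ignored in this sketch.
fn parse_filter(query: &str) -> Result<(Option<JobStatus>, Option<u32>), String> {
    let mut status = None;
    let mut limit = None;
    for pair in query.split('&').filter(|p| !p.is_empty()) {
        let (key, value) = pair.split_once('=').unwrap_or((pair, ""));
        match key {
            "status" => status = Some(value.parse::<JobStatus>()?),
            "limit" => {
                let n = value
                    .parse::<u32>()
                    .map_err(|e| format!("invalid limit: {e}"))?;
                limit = Some(n);
            }
            _ => {}
        }
    }
    Ok((status, limit))
}
```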

Getting started

Prerequisites

  • Rust (stable)
  • PostgreSQL
  • An AWS SES account (with at least one verified email address)
  • sqlx-cli: cargo install sqlx-cli --no-default-features --features postgres

Setup

git clone https://github.com/Ekojoecovenant/rustqueue.git
cd rustqueue

cp .env.example .env
# fill in your credentials

sqlx migrate run

cargo run

The server starts on the port defined in your .env (default 3000).

Environment variables

DATABASE_URL=postgresql://user:password@localhost:5432/rustqueue
PORT=3000
SMTP_USERNAME=your_ses_smtp_username
SMTP_PASSWORD=your_ses_smtp_password
SMTP_HOST=email-smtp.eu-north-1.amazonaws.com
SMTP_PORT=587
FROM_EMAIL=you@yourdomain.com

Note: AWS SES sandbox accounts can only send to verified email addresses. Either verify the recipient address (the to field in the job payload) in your SES console, or request production access.


Adding a new job type

  1. Create src/executor/your_handler.rs implementing the JobHandler trait:

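    use async_trait::async_trait;
    use serde_json::Value;

    use super::JobHandler;
    // Result is assumed to be the alias already used in src/executor/mod.rs.

    pub struct YourHandler { /* config fields */ }

    #[async_trait]
    impl JobHandler for YourHandler {
        async fn execute(&self, payload: &Value) -> Result<()> {
            // your logic here
            Ok(())
        }
    }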
  2. Register it in HandlerRegistry::new() in src/executor/mod.rs:

    handlers.insert("your_type".to_string(), Arc::new(YourHandler::new(...)?));
  3. Submit jobs with "job_type": "your_type" and a matching payload.

The worker loop, retry logic, SSE updates, and stale-job recovery all work automatically for any registered job type.
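
The dispatch step that makes this possible can be sketched as a map from job type to handler. The version below is a simplified, synchronous stand-in: the real `JobHandler` trait is async and takes a JSON `Value`, whereas this sketch uses a plain `&str` payload and a `String` error so it compiles with the standard library alone. `EchoHandler` and the `dispatch` method are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Simplified, synchronous stand-in for the project's async JobHandler trait.
trait JobHandler: Send + Sync {
    fn execute(&self, payload: &str) -> Result<(), String>;
}

/// Hypothetical handler that rejects empty payloads and accepts everything else.
struct EchoHandler;

impl JobHandler for EchoHandler {
    fn execute(&self, payload: &str) -> Result<(), String> {
        if payload.is_empty() {
            Err("empty payload".to_string())
        } else {
            Ok(())
        }
    }
}

struct HandlerRegistry {
    handlers: HashMap<String, Arc<dyn JobHandler>>,
}

impl HandlerRegistry {
    fn new() -> Self {
        let mut handlers: HashMap<String, Arc<dyn JobHandler>> = HashMap::new();
        // Each job type is registered once, keyed by its job_type string.
        handlers.insert("echo".to_string(), Arc::new(EchoHandler));
        Self { handlers }
    }

    /// Looks up the handler for `job_type` and runs it.
    /// Unregistered job types fail with an error instead of panicking.
    fn dispatch(&self, job_type: &str, payload: &str) -> Result<(), String> {
        let handler = self
            .handlers
            .get(job_type)
            .ok_or_else(|| format!("no handler registered for job type {job_type:?}"))?;
        handler.execute(payload)
    }
}
```

Because the worker only talks to the registry, retries and status updates can wrap the `dispatch` call without knowing anything about individual job types.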


Project structure

rustqueue/
+-- src/
|   +-- main.rs              # startup, router, shared state wiring
|   +-- config.rs            # env config
|   +-- db.rs                # sqlx pool setup
|   +-- models/
|   |   +-- job.rs           # Job struct, CreateJobRequest
|   +-- routes/
|   |   +-- jobs.rs          # POST /jobs, GET /jobs, GET /jobs/:id
|   |   +-- events.rs        # GET /events (SSE)
|   +-- worker/
|   |   +-- mod.rs           # worker loop, stale-job reaper
|   +-- executor/
|       +-- mod.rs           # JobHandler trait, HandlerRegistry
|       +-- email.rs         # email job handler (AWS SES via lettre)
+-- migrations/              # sqlx migration files
+-- .env.example

Roadmap

  • Telegram job handler (second executor, proving extensibility)
  • Scheduled jobs via API (run_at field on job submission)
  • Typed JobStatus enum replacing raw strings
  • Bulk email sends (one job, multiple recipients)
  • Cursor-based pagination on GET /jobs
  • Graceful shutdown (let in-flight jobs finish before exit)
  • Optional SMTP config (fail at dispatch time, not startup)
  • Explore Cloudflare Email Service as alternative executor

Contributing

See CONTRIBUTING.md.


Author

Ekojoe Covenant Lemom -- Backend Engineer https://ekojoe.name.ng | https://github.com/Ekojoecovenant


License

MIT