flashQ Documentation
flashQ is a high-performance job queue built with Rust. It provides a BullMQ-compatible API without requiring Redis, making it perfect for AI workloads, LLM pipelines, and high-throughput applications.
🚀 Quick Start
Get up and running in under 5 minutes with Docker and TypeScript.
🤖 AI Workloads
Learn how to build LLM pipelines, RAG workflows, and batch inference.
📦 Migration
Already using BullMQ? Migrate in minutes with the same API.
📚 API Reference
Complete reference for Queue, Worker, and all job options.
Why flashQ?
| Feature | flashQ | BullMQ + Redis |
|---|---|---|
| External dependencies | None | Redis server required |
| Throughput | 300K jobs/sec | ~30K jobs/sec |
| Max payload | 10 MB | ~5 MB recommended |
| API compatibility | Same BullMQ-style API | Native |
Installation
Server
Start the flashQ server using Docker (recommended) or download the binary.
Docker (Recommended)
# Pull multi-arch image (amd64 + arm64)
docker pull ghcr.io/egeominotti/flashq:latest
# Run with dashboard enabled
docker run -d --name flashq \
-p 6789:6789 \
-p 6790:6790 \
-e HTTP=1 \
ghcr.io/egeominotti/flashq:latest
Binary
# Linux x86_64
curl -L https://github.com/egeominotti/flashq/releases/latest/download/flashq-linux-x86_64.tar.gz | tar xz
./flashq-server
# macOS Apple Silicon
curl -L https://github.com/egeominotti/flashq/releases/latest/download/flashq-macos-arm64.tar.gz | tar xz
./flashq-server
SDK
Install the TypeScript SDK in your project:
# Using bun (recommended)
bun add flashq
# Using npm
npm install flashq
# Using yarn
yarn add flashq
flashQ includes built-in TypeScript definitions. No additional @types package needed.
Quick Start
Get your first job queue running in under 5 minutes.
Start the server
docker run -d -p 6789:6789 ghcr.io/egeominotti/flashq:latest
Install the SDK
bun add flashq
Create a queue and add jobs
import { Queue } from 'flashq';
const queue = new Queue('emails');
// Add a job
await queue.add('send-welcome', {
to: 'user@example.com',
subject: 'Welcome!'
});
console.log('Job added!');
Process jobs with a worker
import { Worker } from 'flashq';
const worker = new Worker('emails', async (job) => {
console.log(`Sending email to ${job.data.to}`);
// Your email sending logic here
await sendEmail(job.data);
return { sent: true };
});
worker.on('completed', (job, result) => {
console.log(`Job ${job.id} completed`);
});
worker.on('failed', (job, error) => {
console.error(`Job ${job.id} failed: ${error.message}`);
});
Your job queue is now running. The worker will automatically process jobs as they're added to the queue.
Core Concepts
Queues
A Queue is a named container for jobs. Jobs in a queue are processed in priority order (highest first), with FIFO ordering for jobs of the same priority.
const emailQueue = new Queue('emails');
const reportQueue = new Queue('reports');
Jobs
A Job is a unit of work with a name, data payload, and optional configuration. Jobs progress through states: waiting → active → completed or failed.
const job = await queue.add('process-image', {
imageUrl: 'https://example.com/image.jpg',
filters: ['resize', 'compress']
}, {
priority: 10,
attempts: 3
});
Workers
A Worker processes jobs from a queue. Workers can run concurrently and automatically handle job acknowledgment, retries, and errors.
const worker = new Worker('emails', processor, {
concurrency: 10 // Process 10 jobs in parallel
});
Job States
| State | Description |
|---|---|
| waiting | Job is queued and ready to be processed |
| delayed | Job is scheduled to run at a future time |
| active | Job is currently being processed by a worker |
| completed | Job finished successfully |
| failed | Job failed and exhausted all retry attempts (in DLQ) |
| waiting-children | Job is waiting for dependent jobs to complete |
Queue API
The Queue class provides methods for adding jobs and managing queue state.
Constructor
const queue = new Queue(name, options?);
| Option | Type | Default | Description |
|---|---|---|---|
| host | string | 'localhost' | Server hostname |
| port | number | 6789 | Server port |
| token | string | - | Authentication token |
Methods
add(name, data, opts?)
Add a single job to the queue.
const job = await queue.add('send-email', {
to: 'user@example.com'
}, {
priority: 10,
delay: 5000,
attempts: 3
});
console.log(job.id); // Unique job ID
addBulk(jobs)
Add multiple jobs in a single batch operation. More efficient than calling add() multiple times.
const jobs = await queue.addBulk([
{ name: 'send', data: { to: 'a@test.com' } },
{ name: 'send', data: { to: 'b@test.com' }, opts: { priority: 10 } },
{ name: 'send', data: { to: 'c@test.com' }, opts: { delay: 5000 } },
]);
getJob(jobId)
Get a job by its ID, including current state and data.
const job = await queue.getJob(123);
console.log(job.state); // 'completed'
console.log(job.result); // { sent: true }
finished(jobId, timeout?)
Wait for a job to complete and return its result. Perfect for synchronous workflows.
const job = await queue.add('generate', { prompt });
const result = await queue.finished(job.id, 30000); // 30s timeout
console.log(result); // Worker's return value
getJobCounts()
Get counts of jobs in each state.
const counts = await queue.getJobCounts();
// { waiting: 10, active: 5, completed: 100, failed: 2, delayed: 3 }
pause() / resume()
Pause or resume job processing on the queue.
await queue.pause(); // Workers stop pulling jobs
await queue.resume(); // Workers resume pulling jobs
drain()
Remove all waiting jobs from the queue.
await queue.drain(); // Clear waiting jobs only
obliterate()
Remove all data associated with the queue (jobs, DLQ, settings).
await queue.obliterate(); // Nuclear option - removes everything
obliterate() is irreversible. All jobs and queue data will be permanently deleted.
Job Options
Configure job behavior with these options when calling queue.add().
| Option | Type | Default | Description |
|---|---|---|---|
| priority | number | 0 | Higher priority jobs are processed first |
| delay | number | 0 | Delay in milliseconds before job becomes available |
| attempts | number | 1 | Number of retry attempts on failure |
| backoff | number \| object | - | Backoff strategy for retries |
| timeout | number | - | Job processing timeout in milliseconds |
| jobId | string | - | Custom job ID for idempotency |
| depends_on | number[] | - | Job IDs that must complete before this job runs |
| ttl | number | - | Time-to-live: auto-fail if not processed in time |
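Most of these options are covered in the sections below. ttl is not, so here is a minimal sketch (assuming ttl, like delay, is given in milliseconds):
// Auto-fail this job if no worker processes it within 60 seconds
await queue.add('time-sensitive-alert', data, {
  ttl: 60000
});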
Priority
Jobs with higher priority values are processed first.
await queue.add('low', data, { priority: 1 });
await queue.add('high', data, { priority: 100 }); // Processed first
await queue.add('urgent', data, { priority: 1000 }); // Processed before 'high'
Delay
Schedule a job to run after a specified delay.
// Run after 5 seconds
await queue.add('reminder', data, { delay: 5000 });
// Run after 1 hour
await queue.add('daily-report', data, { delay: 60 * 60 * 1000 });
Backoff
Configure retry delay strategy.
// Fixed delay: retry after 5s each time
await queue.add('job', data, {
attempts: 3,
backoff: 5000
});
// Exponential backoff: 1s, 2s, 4s, 8s...
await queue.add('job', data, {
attempts: 5,
backoff: {
type: 'exponential',
delay: 1000
}
});
Custom Job ID (Idempotency)
Use jobId to prevent duplicate jobs. If a job with the same ID already exists, the existing job is returned.
// Only one job per order
await queue.add('process-order', orderData, {
jobId: `order-${orderId}`
});
// Second call with same jobId returns existing job
await queue.add('process-order', orderData, {
jobId: `order-${orderId}`
}); // No duplicate created
Job Dependencies
Create workflows where jobs wait for other jobs to complete before running.
// Step 1: Fetch data
const fetchJob = await queue.add('fetch', { url });
// Step 2: Process (waits for fetch to complete)
const processJob = await queue.add('process', { data }, {
depends_on: [fetchJob.id]
});
// Step 3: Save (waits for process to complete)
const saveJob = await queue.add('save', { destination }, {
depends_on: [processJob.id]
});
// Wait for the final result
const result = await queue.finished(saveJob.id);
Multiple Dependencies
A job can depend on multiple jobs. It will only run when ALL dependencies have completed.
// Fan-out: multiple parallel jobs
const job1 = await queue.add('task1', data1);
const job2 = await queue.add('task2', data2);
const job3 = await queue.add('task3', data3);
// Fan-in: aggregate results (waits for all 3)
const aggregateJob = await queue.add('aggregate', {}, {
depends_on: [job1.id, job2.id, job3.id]
});
Dependencies are perfect for RAG pipelines: embed → search → generate. See RAG Workflows for a complete example.
Worker API
Workers process jobs from a queue. They automatically handle job acknowledgment, retries, and errors.
Constructor
const worker = new Worker(queueName, processor, options?);
Options
| Option | Type | Default | Description |
|---|---|---|---|
| concurrency | number | 1 | Number of jobs to process in parallel |
| autorun | boolean | true | Start processing immediately |
| host | string | 'localhost' | Server hostname |
| port | number | 6789 | Server port |
Processor Function
The processor function receives a job object and should return a result (or throw an error).
const worker = new Worker('emails', async (job) => {
// Access job properties
console.log(job.id); // Unique job ID
console.log(job.name); // Job name
console.log(job.data); // Job payload
console.log(job.attempts); // Current attempt number
// Do work...
const result = await processJob(job.data);
// Return result (stored with the job)
return result;
});
Graceful Shutdown
// Close worker and wait for active jobs to finish
await worker.close();
// Handle process termination
process.on('SIGTERM', async () => {
await worker.close();
process.exit(0);
});
Events
Workers emit events for job lifecycle changes.
| Event | Arguments | Description |
|---|---|---|
| completed | (job, result) | Job completed successfully |
| failed | (job, error) | Job failed (after all retries) |
| error | (error) | Worker-level error |
| active | (job) | Job started processing |
| progress | (job, progress) | Job progress updated |
worker.on('completed', (job, result) => {
console.log(`✅ Job ${job.id} completed`, result);
});
worker.on('failed', (job, error) => {
console.error(`❌ Job ${job.id} failed: ${error.message}`);
});
worker.on('active', (job) => {
console.log(`🔄 Job ${job.id} started`);
});
Rate Limiting
Control job throughput to avoid overwhelming external APIs or services.
import { FlashQ } from 'flashq';
const client = new FlashQ();
// Limit to 100 jobs per second
await client.setRateLimit('openai-calls', 100);
// Limit to 10 jobs per second (for expensive API)
await client.setRateLimit('anthropic-calls', 10);
// Remove rate limit
await client.clearRateLimit('openai-calls');
Concurrency Limiting
Limit how many jobs can be processed simultaneously across all workers.
// Max 5 concurrent jobs processing
await client.setConcurrency('heavy-tasks', 5);
// Remove concurrency limit
await client.clearConcurrency('heavy-tasks');
Rate limiting is essential for controlling LLM API costs. Set limits based on your API tier and budget.
Retries & Backoff
Configure automatic retries with exponential backoff for failed jobs.
Basic Retries
// Retry up to 3 times on failure
await queue.add('send-email', emailData, {
attempts: 3
});
Fixed Delay Backoff
Wait a fixed time between retry attempts.
// Retry after 5 seconds each time
await queue.add('api-call', data, {
attempts: 5,
backoff: 5000 // 5 seconds
});
Exponential Backoff
Increase delay exponentially: 1s, 2s, 4s, 8s... Perfect for rate-limited APIs.
// Exponential: delay = base * 2^(attempt - 1)
await queue.add('openai-request', data, {
attempts: 5,
backoff: {
type: 'exponential',
delay: 1000 // base delay
}
});
// Delays: 1s, 2s, 4s, 8s, 16s
Retry Flow
| Attempt | Exponential (1s base) | Fixed (5s) |
|---|---|---|
| 1 | 1 second | 5 seconds |
| 2 | 2 seconds | 5 seconds |
| 3 | 4 seconds | 5 seconds |
| 4 | 8 seconds | 5 seconds |
| 5 | 16 seconds | 5 seconds |
After exhausting all attempts, failed jobs are moved to the Dead Letter Queue (DLQ). See Dead Letter Queue for handling failed jobs.
AI Workloads
flashQ is optimized for AI/ML workloads with features designed for LLM pipelines, RAG systems, and batch inference.
🔗 Job Dependencies
Chain jobs for multi-step AI workflows. Perfect for RAG: embed → search → generate.
⏱️ Rate Limiting
Control API costs with per-queue rate limits. Never exceed your OpenAI/Anthropic quota.
📦 Large Payloads
10MB payload support for embeddings, images, and large context windows.
🔄 Smart Retries
Automatic retries with exponential backoff. Handle API rate limits gracefully.
RAG Workflows
Build Retrieval-Augmented Generation pipelines with job dependencies.
import { Queue, Worker } from 'flashq';
const rag = new Queue('rag-pipeline');
// Create RAG pipeline
async function askQuestion(question: string) {
// Step 1: Embed the question
const embedJob = await rag.add('embed', { text: question });
// Step 2: Search vector DB (waits for embedding)
const searchJob = await rag.add('search', { query: question }, {
depends_on: [embedJob.id]
});
// Step 3: Generate answer (waits for search)
const generateJob = await rag.add('generate', { question }, {
depends_on: [searchJob.id],
priority: 10
});
// Wait for result
return rag.finished(generateJob.id);
}
// Workers for each step
new Worker('rag-pipeline', async (job) => {
switch (job.name) {
case 'embed':
return await openai.embeddings.create({
model: 'text-embedding-3-small',
input: job.data.text
});
case 'search':
// (a real pipeline would search with the embedding produced by the 'embed' step)
return await vectorDb.search(job.data.query, { limit: 5 });
case 'generate':
return await openai.chat.completions.create({
model: 'gpt-4',
messages: [{ role: 'user', content: job.data.question }]
});
}
}, { concurrency: 10 });
// Usage
const answer = await askQuestion('What is flashQ?');
console.log(answer);
LLM Pipelines
Build multi-step LLM workflows with job dependencies, progress tracking, and automatic retries.
import { Queue, Worker } from 'flashq';
const llm = new Queue('llm-pipeline');
// Multi-step summarization pipeline
async function summarizeDocument(doc: string) {
// Step 1: Chunk the document
const chunkJob = await llm.add('chunk', { text: doc });
// Step 2: Summarize each chunk (parallel)
const summarizeJob = await llm.add('summarize-chunks', {}, {
depends_on: [chunkJob.id]
});
// Step 3: Combine summaries
const combineJob = await llm.add('combine', {}, {
depends_on: [summarizeJob.id],
priority: 10
});
return llm.finished(combineJob.id, 60000);
}
// Worker with progress tracking
new Worker('llm-pipeline', async (job) => {
if (job.name === 'summarize-chunks') {
const chunks = job.data.chunks; // assumes the chunk list is available here, e.g. read from the 'chunk' job's result
const summaries = [];
for (let i = 0; i < chunks.length; i++) {
const summary = await openai.chat.completions.create({
model: 'gpt-4',
messages: [{ role: 'user', content: `Summarize: ${chunks[i]}` }]
});
summaries.push(summary);
// Update progress
await job.updateProgress(((i + 1) / chunks.length) * 100);
}
return { summaries };
}
}, {
concurrency: 5
});
Pipeline Patterns
Sequential Chain
A → B → C. Each step waits for the previous to complete. Perfect for multi-turn conversations.
Fan-Out / Fan-In
Split work into parallel chunks, then aggregate. Ideal for document processing.
Conditional Flow
Branch based on intermediate results. Use for AI classification and routing.
Retry with Fallback
Try GPT-4, fall back to GPT-3.5 on failure. Automatic with retry configuration (see the sketch below).
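A sketch of the retry-with-fallback pattern, assuming job.attempts starts at 1 on the first run (the model names and openai client are illustrative):
// Worker downgrades the model on retry attempts
new Worker('llm-pipeline', async (job) => {
  const model = job.attempts > 1 ? 'gpt-3.5-turbo' : 'gpt-4';
  return openai.chat.completions.create({
    model,
    messages: [{ role: 'user', content: job.data.prompt }]
  });
});
// Enable retries so the fallback can kick in
await llm.add('generate', { prompt }, { attempts: 3, backoff: 2000 });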
Batch Inference
Process large datasets efficiently with batch operations and progress tracking.
import { Queue, Worker } from 'flashq';
const batch = new Queue('batch-inference');
// Submit batch job for embeddings
async function embedDocuments(documents: string[]) {
// Split into chunks of 100
const chunkSize = 100;
const jobs = [];
for (let i = 0; i < documents.length; i += chunkSize) {
const chunk = documents.slice(i, i + chunkSize);
const job = await batch.add('embed', {
documents: chunk,
batchIndex: i / chunkSize
}, {
priority: 5,
attempts: 3,
backoff: { type: 'exponential', delay: 2000 }
});
jobs.push(job);
}
// Wait for all jobs
return Promise.all(jobs.map(j => batch.finished(j.id)));
}
// Worker optimized for batch processing
new Worker('batch-inference', async (job) => {
const { documents } = job.data;
// Use OpenAI batch embedding API
const response = await openai.embeddings.create({
model: 'text-embedding-3-small',
input: documents
});
return {
embeddings: response.data.map(d => d.embedding),
model: response.model,
usage: response.usage
};
}, {
concurrency: 10 // Process 10 batches in parallel
});
// Usage: Process 10,000 documents
const results = await embedDocuments(myDocuments);
console.log(`Processed ${results.length} batches`);
flashQ supports payloads up to 10MB, making it perfect for passing embeddings (1536-3072 dimensions) between pipeline stages.
Cron Jobs
Schedule recurring jobs with standard cron expressions. flashQ supports 6-field cron (including seconds).
import { FlashQ } from 'flashq';
const client = new FlashQ();
// Every minute
await client.addCron('health-check', {
queue: 'monitoring',
schedule: '0 * * * * *', // sec min hour day month weekday
data: { type: 'health' }
});
// Every hour at minute 0
await client.addCron('hourly-sync', {
queue: 'sync',
schedule: '0 0 * * * *',
data: { source: 'external-api' }
});
// Every day at 2 AM
await client.addCron('daily-report', {
queue: 'reports',
schedule: '0 0 2 * * *',
data: { reportType: 'daily' }
});
// Every Monday at 9 AM
await client.addCron('weekly-digest', {
queue: 'emails',
schedule: '0 0 9 * * 1',
data: { template: 'weekly-digest' }
});
// List all cron jobs
const crons = await client.listCrons();
console.log(crons);
// Delete a cron job
await client.deleteCron('health-check');
Cron Expression Format
| Field | Values | Description |
|---|---|---|
| Second | 0-59 | Optional, defaults to 0 |
| Minute | 0-59 | Required |
| Hour | 0-23 | Required |
| Day | 1-31 | Required |
| Month | 1-12 | Required |
| Weekday | 0-6 | 0 = Sunday |
Common Patterns
| Expression | Description |
|---|---|
| 0 * * * * * | Every minute |
| 0 0 * * * * | Every hour |
| 0 0 0 * * * | Every day at midnight |
| 0 30 9 * * 1-5 | Weekdays at 9:30 AM |
| 0 0 */2 * * * | Every 2 hours |
Dead Letter Queue
Jobs that fail all retry attempts are moved to the Dead Letter Queue (DLQ) for inspection and manual retry.
import { FlashQ } from 'flashq';
const client = new FlashQ();
// Get failed jobs from DLQ
const failedJobs = await client.getDlq('emails', 10);
for (const job of failedJobs) {
console.log(`Job ${job.id} failed: ${job.error}`);
console.log(`Data: ${JSON.stringify(job.data)}`);
console.log(`Attempts: ${job.attempts}`);
}
// Retry a specific failed job
await client.retryDlq('emails', 123);
// Retry ALL failed jobs in queue
await client.retryDlq('emails');
// Discard a job directly to DLQ
await client.discard(456);
DLQ Best Practices
Monitor Regularly
Set up alerts for DLQ growth. A growing DLQ indicates systemic issues.
Log Failures
Include detailed error messages in job failures for easier debugging.
Automate Retries
For transient errors, set up automatic retry schedules during off-peak hours.
Clean Periodically
Use clean() to remove old DLQ entries after investigation.
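One way to automate the off-peak retries suggested above, sketched with the cron and DLQ APIs from these docs (the 'maintenance' queue name is illustrative):
import { FlashQ, Worker } from 'flashq';
const client = new FlashQ();
// Enqueue a maintenance job every night at 3 AM
await client.addCron('dlq-retry', {
  queue: 'maintenance',
  schedule: '0 0 3 * * *',
  data: { target: 'emails' }
});
// The maintenance worker retries everything in the target queue's DLQ
new Worker('maintenance', async (job) => {
  await client.retryDlq(job.data.target);
  return { retried: true };
});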
Progress Tracking
Track job progress in real-time for long-running tasks like file processing or AI inference.
import { Queue, Worker, FlashQ } from 'flashq';
const queue = new Queue('processing');
const client = new FlashQ();
// Worker that reports progress
const worker = new Worker('processing', async (job) => {
const items = job.data.items;
const results = [];
for (let i = 0; i < items.length; i++) {
// Process item
const result = await processItem(items[i]);
results.push(result);
// Update progress (0-100)
const progress = Math.round(((i + 1) / items.length) * 100);
await job.updateProgress(progress, `Processed ${i + 1}/${items.length}`);
}
return results;
});
// Monitor progress from producer
const job = await queue.add('batch', { items: largeArray });
// Poll progress
const interval = setInterval(async () => {
const { progress, message } = await client.getProgress(job.id);
console.log(`Progress: ${progress}% - ${message}`);
if (progress === 100) {
clearInterval(interval);
}
}, 1000);
// Or listen to progress events
worker.on('progress', (job, progress) => {
console.log(`Job ${job.id}: ${progress}%`);
});
For jobs that run longer than the stall timeout, call job.heartbeat() periodically to prevent stall detection from failing the job.
Self-Hosting
flashQ is designed for easy self-hosting. Run it on any Linux, macOS, or container environment.
System Requirements
| Component | Minimum | Recommended |
|---|---|---|
| CPU | 1 core | 2+ cores |
| Memory | 512 MB | 1-2 GB |
| Disk | 100 MB | 1 GB (for persistence) |
| OS | Linux (x86_64, arm64), macOS | |
Binary Installation
# Linux x86_64
curl -L https://github.com/egeominotti/flashq/releases/latest/download/flashq-linux-x86_64.tar.gz | tar xz
sudo mv flashq-server /usr/local/bin/
# Linux ARM64 (Raspberry Pi, AWS Graviton)
curl -L https://github.com/egeominotti/flashq/releases/latest/download/flashq-linux-arm64.tar.gz | tar xz
sudo mv flashq-server /usr/local/bin/
# macOS Apple Silicon
curl -L https://github.com/egeominotti/flashq/releases/latest/download/flashq-macos-arm64.tar.gz | tar xz
sudo mv flashq-server /usr/local/bin/
Systemd Service
[Unit]
Description=flashQ Job Queue Server
After=network.target postgresql.service
[Service]
Type=simple
User=flashq
Environment=HTTP=1
Environment=DATABASE_URL=postgres://flashq:password@localhost/flashq
ExecStart=/usr/local/bin/flashq-server
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
# Enable and start
sudo systemctl enable flashq
sudo systemctl start flashq
# Check status
sudo systemctl status flashq
# View logs
sudo journalctl -u flashq -f
Docker
The recommended way to run flashQ in production.
Quick Start
# Run with dashboard enabled
docker run -d --name flashq \
-p 6789:6789 \
-p 6790:6790 \
-e HTTP=1 \
ghcr.io/egeominotti/flashq:latest
Docker Compose (Production)
version: '3.8'
services:
  flashq:
    image: ghcr.io/egeominotti/flashq:latest
    ports:
      - "6789:6789"   # TCP
      - "6790:6790"   # HTTP/Dashboard
    environment:
      - HTTP=1
      - DATABASE_URL=postgres://flashq:secret@postgres:5432/flashq
      - AUTH_TOKENS=your-secret-token
    depends_on:
      postgres:
        condition: service_healthy
    restart: unless-stopped
    deploy:
      resources:
        limits:
          memory: 1G
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: flashq
      POSTGRES_PASSWORD: secret
      POSTGRES_DB: flashq
    volumes:
      - flashq_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U flashq"]
      interval: 5s
      timeout: 5s
      retries: 5
volumes:
  flashq_data:
Environment Variables
| Variable | Default | Description |
|---|---|---|
| PORT | 6789 | TCP port for client connections |
| HTTP | 0 | Enable HTTP API (1 = enabled) |
| HTTP_PORT | 6790 | HTTP API and dashboard port |
| GRPC | 0 | Enable gRPC API (1 = enabled) |
| GRPC_PORT | 6791 | gRPC API port |
| DATABASE_URL | - | PostgreSQL connection string |
| AUTH_TOKENS | - | Comma-separated auth tokens |
| CLUSTER_MODE | 0 | Enable clustering (1 = enabled) |
| NODE_ID | auto | Unique node ID for clustering |
Clustering (High Availability)
Run multiple flashQ nodes for high availability. PostgreSQL is used for coordination and leader election.
Architecture
┌──────────┐     ┌──────────┐     ┌──────────┐
│  Node 1  │     │  Node 2  │     │  Node 3  │
│ (Leader) │     │(Follower)│     │(Follower)│
└────┬─────┘     └────┬─────┘     └────┬─────┘
     │                │                │
     └────────────────┼────────────────┘
                      │
               ┌──────▼──────┐
               │ PostgreSQL  │
               │  (Shared)   │
               └─────────────┘
How It Works
- Leader election: Uses PostgreSQL advisory locks (pg_try_advisory_lock)
- Leader responsibilities: Runs background tasks (cron, cleanup, timeouts)
- All nodes: Handle client requests (push/pull/ack)
- Automatic failover: Within 5 seconds when leader crashes
- Health checks: Stale nodes cleaned after 30s of no heartbeat
Multi-Node Setup
version: '3.8'
services:
  flashq-node1:
    image: ghcr.io/egeominotti/flashq:latest
    ports:
      - "6789:6789"
      - "6790:6790"
    environment:
      - CLUSTER_MODE=1
      - NODE_ID=node-1
      - HTTP=1
      - DATABASE_URL=postgres://flashq:secret@postgres:5432/flashq
    depends_on:
      - postgres
  flashq-node2:
    image: ghcr.io/egeominotti/flashq:latest
    ports:
      - "6793:6789"
      - "6794:6790"
    environment:
      - CLUSTER_MODE=1
      - NODE_ID=node-2
      - HTTP=1
      - DATABASE_URL=postgres://flashq:secret@postgres:5432/flashq
    depends_on:
      - postgres
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: flashq
      POSTGRES_PASSWORD: secret
      POSTGRES_DB: flashq
    volumes:
      - ha_data:/var/lib/postgresql/data
volumes:
  ha_data:
Cluster Endpoints
| Endpoint | Description |
|---|---|
| GET /health | Node health with leader/follower status |
| GET /cluster/nodes | List all nodes in cluster |
# Check cluster status
curl http://localhost:6790/cluster/nodes
# Response
{
"nodes": [
{ "id": "node-1", "host": "flashq-node1", "is_leader": true },
{ "id": "node-2", "host": "flashq-node2", "is_leader": false }
]
}
Configuration
Configure the flashQ server with environment variables.
| Variable | Default | Description |
|---|---|---|
| PORT | 6789 | TCP port for client connections |
| HTTP | 0 | Enable HTTP API and dashboard (set to 1) |
| HTTP_PORT | 6790 | HTTP API port |
| DATABASE_URL | - | PostgreSQL connection URL for persistence |
| AUTH_TOKENS | - | Comma-separated list of valid auth tokens |
| CLUSTER_MODE | 0 | Enable clustering (set to 1) |
Example: Production Docker
docker run -d --name flashq \
-p 6789:6789 \
-p 6790:6790 \
-e HTTP=1 \
-e DATABASE_URL=postgres://user:pass@db:5432/flashq \
-e AUTH_TOKENS=secret1,secret2 \
ghcr.io/egeominotti/flashq:latest
Migration from BullMQ
flashQ uses a BullMQ-compatible API, making migration straightforward.
Install flashQ SDK
bun add flashq
Start the flashQ server
docker run -d -p 6789:6789 ghcr.io/egeominotti/flashq:latest
Update your imports
// Before (BullMQ)
import { Queue, Worker } from 'bullmq';
// After (flashQ)
import { Queue, Worker } from 'flashq';
Remove Redis configuration
// Before (BullMQ)
const queue = new Queue('emails', {
connection: { host: 'localhost', port: 6379 }
});
// After (flashQ) - no Redis needed!
const queue = new Queue('emails');
Your code should work without any other changes. The Queue and Worker APIs are compatible.
API Reference
Complete reference for all SDK methods and server commands.
Queue Class
| Method | Description |
|---|---|
| add(name, data, opts?) | Add a single job to the queue |
| addBulk(jobs) | Add multiple jobs in batch |
| getJob(jobId) | Get job by ID with current state |
| getJobs(state?, limit?, offset?) | List jobs with filtering and pagination |
| getJobCounts() | Get counts by state |
| count() | Count waiting + delayed jobs |
| finished(jobId, timeout?) | Wait for job completion |
| pause() | Pause the queue |
| resume() | Resume the queue |
| isPaused() | Check if queue is paused |
| drain() | Remove all waiting jobs |
| obliterate() | Remove ALL queue data |
| clean(grace, state, limit?) | Clean up jobs by age and state |
| close() | Close connection |
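getJobs is only listed here; a minimal usage sketch (assuming state names match the Job States table and offset-based pagination):
// List the 10 most recent failed jobs, then the next page of 10
const failed = await queue.getJobs('failed', 10);
const nextPage = await queue.getJobs('failed', 10, 10);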
Worker Class
| Method / Event | Description |
|---|---|
| run() | Start processing (if autorun=false) |
| pause() | Pause the worker |
| resume() | Resume the worker |
| close() | Stop and close connections |
| on('completed') | Job completed event |
| on('failed') | Job failed event |
| on('active') | Job started event |
| on('progress') | Progress update event |
| on('error') | Worker error event |
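With autorun: false, processing starts only when you call run(), which is useful when event handlers must be attached first. A minimal sketch:
const worker = new Worker('emails', async (job) => ({ ok: true }), { autorun: false });
worker.on('completed', (job) => console.log(`done: ${job.id}`));
worker.run(); // start pulling jobs now that handlers are attached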
FlashQ Client (Low-Level)
| Method | Description |
|---|---|
| connect() | Connect to server |
| close() | Close connection |
| auth(token) | Authenticate with token |
| push(queue, data, opts?) | Push a job |
| pushBatch(queue, jobs) | Push multiple jobs |
| pull(queue) | Pull a job (blocking) |
| pullBatch(queue, count) | Pull multiple jobs |
| ack(jobId, result?) | Acknowledge completion |
| ackBatch(jobIds) | Batch acknowledge |
| fail(jobId, error?) | Fail a job |
| cancel(jobId) | Cancel pending job |
| getJob(jobId) | Get job details |
| getState(jobId) | Get job state only |
| getResult(jobId) | Get job result |
| getJobByCustomId(customId) | Lookup by custom ID |
| progress(jobId, pct, msg?) | Update progress |
| getProgress(jobId) | Get job progress |
| update(jobId, data) | Update job data |
| changePriority(jobId, pri) | Change priority |
| moveToDelayed(jobId, delay) | Move to delayed |
| promote(jobId) | Move delayed to waiting |
| discard(jobId) | Move to DLQ |
| heartbeat(jobId) | Send heartbeat |
| log(jobId, msg, level?) | Add log entry |
| getLogs(jobId) | Get job logs |
| getDlq(queue, count?) | Get DLQ jobs |
| retryDlq(queue, jobId?) | Retry DLQ jobs |
| setRateLimit(queue, limit) | Set rate limit |
| clearRateLimit(queue) | Clear rate limit |
| setConcurrency(queue, limit) | Set concurrency |
| clearConcurrency(queue) | Clear concurrency |
| addCron(name, options) | Add cron job |
| deleteCron(name) | Delete cron job |
| listCrons() | List cron jobs |
| stats() | Get queue statistics |
| metrics() | Get detailed metrics |
| listQueues() | List all queues |
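A minimal produce/consume round trip with the low-level client (a sketch using only the methods listed above; the exact return shapes are assumptions, and handleEmail stands in for your own logic):
import { FlashQ } from 'flashq';
const client = new FlashQ();
await client.connect();
// Produce a job
const job = await client.push('emails', { to: 'user@example.com' }, { priority: 10 });
// Consume: blocking pull, then ack on success or fail on error
const pulled = await client.pull('emails');
try {
  const result = await handleEmail(pulled.data); // hypothetical handler
  await client.ack(pulled.id, result);
} catch (err) {
  await client.fail(pulled.id, err.message);
}
await client.close();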
Job Options (Complete)
| Option | Type | Description |
|---|---|---|
| priority | number | Higher = processed first |
| delay | number | Delay in milliseconds |
| attempts | number | Max retry attempts |
| backoff | number \| object | Retry backoff strategy |
| timeout | number | Processing timeout (ms) |
| ttl | number | Time-to-live (ms) |
| jobId | string | Custom ID for idempotency |
| depends_on | number[] | Job dependencies |
| unique_key | string | Deduplication key |
| tags | string[] | Job tags for filtering |
| lifo | boolean | Last-in-first-out mode |
| stall_timeout | number | Stall detection (ms) |
| debounce_id | string | Debounce identifier |
| debounce_ttl | number | Debounce window (ms) |
| keepCompletedAge | number | Keep result for duration (ms) |
| keepCompletedCount | number | Keep in last N completed |
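debounce_id and debounce_ttl have no dedicated section above; a sketch under the assumption that jobs sharing a debounce_id within the ttl window are collapsed into one:
// Many rapid 'file changed' events -> at most one reindex job per 10-second window
await queue.add('reindex', { docId }, {
  debounce_id: `reindex-${docId}`,
  debounce_ttl: 10000
});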
HTTP API Endpoints
| Method | Endpoint | Description |
|---|---|---|
| GET | /health | Health check |
| GET | /metrics/prometheus | Prometheus metrics |
| GET | /cluster/nodes | List cluster nodes |
| POST | /api/push | Push job via HTTP |
| POST | /api/pull | Pull job via HTTP |
| POST | /api/ack | Acknowledge job |
| GET | /api/job/:id | Get job details |
| GET | /api/stats | Queue statistics |
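The HTTP API is enabled with HTTP=1 (see Configuration). The request body schema is not documented here, so this curl sketch assumes a JSON body with queue and data fields:
# Push a job over HTTP (body shape is an assumption - verify against your server)
curl -X POST http://localhost:6790/api/push \
  -H 'Content-Type: application/json' \
  -d '{"queue": "emails", "data": {"to": "user@example.com"}}'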
Troubleshooting
Connection refused
Error: ECONNREFUSED 127.0.0.1:6789
Solution: Make sure the flashQ server is running:
docker ps | grep flashq
Job timeout
Error: Job exceeds processing timeout
Solution: Increase the timeout in job options:
await queue.add('long-job', data, {
timeout: 300000 // 5 minutes
});
Authentication failed
Error: AUTH_FAILED
Solution: Set the token in your client configuration:
const queue = new Queue('emails', {
token: 'your-auth-token'
});
Job stuck in active state
Error: Job remains active after worker crash
Solution: Jobs are automatically recovered after the stall timeout. For long jobs, use heartbeat:
new Worker('queue', async (job) => {
for (const item of items) {
await processItem(item);
await job.heartbeat(); // Prevent stall detection
}
});
Memory usage growing
Cause: Completed jobs and results accumulating
Solution: Configure retention or use clean():
// During job creation
await queue.add('job', data, {
keepCompletedAge: 3600000, // Keep for 1 hour
keepCompletedCount: 1000 // Keep last 1000
});
// Manual cleanup
await queue.clean(3600000, 'completed');