Job Processing
Queue-based workflow and node processing diagrams
Workflow Job Processing
This page covers the queue-based job processing system for workflow execution.
Job Processing Architecture
flowchart TB
subgraph "Job Sources"
API[API Calls]
WEBHOOK[Webhooks]
SCHEDULER[Scheduler]
EVENTS[Application Events]
end
subgraph "Queue Layer"
WF_QUEUE[workflow-execution queue]
NODE_QUEUE[workflow-node queue]
SCHED_QUEUE[workflow-scheduled queue]
HOOK_QUEUE[workflow-webhook queue]
end
subgraph "Worker Pool"
WF_WORKER[Execution Workers]
NODE_WORKER[Node Workers]
SCHED_WORKER[Schedule Workers]
HOOK_WORKER[Webhook Workers]
end
subgraph "Processing"
ENGINE[Workflow Engine]
EXECUTORS[Node Executors]
STORAGE[State Storage]
end
API --> WF_QUEUE
WEBHOOK --> HOOK_QUEUE
SCHEDULER --> SCHED_QUEUE
EVENTS --> WF_QUEUE
WF_QUEUE --> WF_WORKER
NODE_QUEUE --> NODE_WORKER
SCHED_QUEUE --> SCHED_WORKER
HOOK_QUEUE --> HOOK_WORKER
WF_WORKER --> ENGINE
NODE_WORKER --> EXECUTORS
ENGINE --> NODE_QUEUE
SCHED_WORKER --> WF_QUEUE
ENGINE --> STORAGE
EXECUTORS --> STORAGE
style WF_QUEUE fill:#fff3e0
style ENGINE fill:#e1f5fe
Job Types
| Queue | Job Type | Description |
|---|---|---|
| workflow-execution | Execute full workflow | Runs complete workflow from start |
| workflow-node | Execute single node | Distributed node execution |
| workflow-scheduled | Process scheduled trigger | Handles cron-based workflows |
| workflow-webhook | Process webhook trigger | Handles incoming webhooks |
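The queue names in the table, together with the arrows in the architecture diagram, can be captured in a small lookup. This is a minimal sketch, not the system's actual API: the `QueueName` enum, the `route_source` function, and the source labels (`"api"`, `"event"`, `"scheduler"`, `"webhook"`) are hypothetical names chosen for illustration.

```python
from enum import Enum


class QueueName(str, Enum):
    """Queue names as listed in the Job Types table."""

    EXECUTION = "workflow-execution"
    NODE = "workflow-node"
    SCHEDULED = "workflow-scheduled"
    WEBHOOK = "workflow-webhook"


# Hypothetical source labels; the routing mirrors the architecture diagram.
SOURCE_TO_QUEUE = {
    "api": QueueName.EXECUTION,
    "event": QueueName.EXECUTION,
    "scheduler": QueueName.SCHEDULED,
    "webhook": QueueName.WEBHOOK,
}


def route_source(source: str) -> QueueName:
    """Return the queue that a job from the given source is enqueued on."""
    try:
        return SOURCE_TO_QUEUE[source]
    except KeyError:
        raise ValueError(f"unknown job source: {source!r}") from None
```

The `workflow-node` queue has no entry in the lookup because, per the diagrams, node jobs are created during execution rather than by an external source.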
Workflow Execution Job Flow
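As a companion to this section's sequence diagram, the worker's steps can be sketched in a few lines. Everything here is an assumption made for illustration: `process_execution_job` is a hypothetical name, the `workflows` and `executions` dicts stand in for the database, and `run_workflow` stands in for the workflow engine. State storage initialization is left out for brevity.

```python
import itertools
from typing import Any, Callable

# In-memory stand-in for database-generated execution IDs (assumption).
_execution_ids = itertools.count(1)


def process_execution_job(
    job: dict[str, Any],
    workflows: dict[str, Any],
    executions: dict[int, dict[str, Any]],
    run_workflow: Callable[[Any, Any], Any],
) -> int:
    """Handle one workflow-execution job and return the execution ID."""
    # Load the workflow definition named in the job payload.
    workflow = workflows[job["workflowId"]]

    # Create the execution record in the PENDING state.
    execution_id = next(_execution_ids)
    executions[execution_id] = {"status": "PENDING", "userId": job["userId"]}

    try:
        # The engine processes the workflow's nodes.
        result = run_workflow(workflow, job["input"])
    except Exception as exc:
        executions[execution_id].update(status="FAILED", error=str(exc))
        raise  # re-raise so the queue can apply its retry policy

    executions[execution_id].update(status="COMPLETED", result=result)
    return execution_id
```

Returning normally corresponds to acknowledging the job; raising hands the decision to the retry policy.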
sequenceDiagram
participant Source as Trigger Source
participant Queue as Job Queue
participant Worker as Execution Worker
participant Engine as Workflow Engine
participant DB as Database
participant State as State Storage
Source->>Queue: Enqueue workflow-execution job
Note over Queue: {workflowId, input, userId}
Worker->>Queue: Poll for jobs
Queue-->>Worker: Job data
Worker->>DB: Load workflow definition
DB-->>Worker: Workflow
Worker->>DB: Create execution record (PENDING)
DB-->>Worker: Execution ID
Worker->>Engine: Initialize engine
Worker->>State: Initialize state
Worker->>Engine: Run workflow
Engine->>Engine: Process nodes
alt All nodes complete
Worker->>DB: Update execution (COMPLETED)
Worker->>Queue: Acknowledge job
else Error occurred
Worker->>DB: Update execution (FAILED)
Worker->>Queue: Handle based on retry policy
end
Distributed Node Processing
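The fan-out and collect pattern in this section's diagram can be approximated locally with a thread pool. This is only a sketch under stated assumptions: `run_parallel_nodes` is a hypothetical helper, and the pool stands in for the node workers, whereas the described system enqueues `workflow-node` jobs instead of submitting local callables.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


def run_parallel_nodes(
    nodes: dict[str, Callable[[], Any]],
    max_workers: int = 3,
) -> dict[str, Any]:
    """Run independent nodes concurrently and collect results by node name."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Fan out: one submitted task per parallelizable node.
        futures = {name: pool.submit(fn) for name, fn in nodes.items()}
        # Collect: result() blocks until the node finishes and re-raises
        # any exception the node raised.
        return {name: future.result() for name, future in futures.items()}
```

The workflow continues only after every node result has been collected, which matches the "Collect Results" step that precedes "Continue Workflow".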
flowchart TD
subgraph "Main Execution"
START[Start Workflow] --> IDENTIFY[Identify Parallelizable Nodes]
IDENTIFY --> QUEUE_NODES[Queue Individual Node Jobs]
end
subgraph "Node Job Queue"
QUEUE_NODES --> JOB1[Node A Job]
QUEUE_NODES --> JOB2[Node B Job]
QUEUE_NODES --> JOB3[Node C Job]
end
subgraph "Worker Pool"
JOB1 --> WORKER1[Worker 1]
JOB2 --> WORKER2[Worker 2]
JOB3 --> WORKER3[Worker 3]
end
subgraph "Results"
WORKER1 --> COLLECT[Collect Results]
WORKER2 --> COLLECT
WORKER3 --> COLLECT
COLLECT --> CONTINUE[Continue Workflow]
end
Scheduled Workflow Processing
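The per-minute check in this section's diagram amounts to a filter on `nextRunAt` followed by an enqueue and a schedule update. The sketch below assumes in-memory lists in place of the database and the execution queue, and uses a hypothetical positive `interval` field (a `timedelta`) instead of cron parsing. Creating the execution record is omitted.

```python
from datetime import datetime, timedelta
from typing import Any


def process_scheduled_check(
    schedules: list[dict[str, Any]],
    execution_queue: list[dict[str, Any]],
    now: datetime,
) -> int:
    """Enqueue a workflow-execution job for every due schedule.

    Returns the number of jobs enqueued. Each schedule dict carries
    `workflowId`, `nextRunAt`, `runCount`, and a positive `interval`.
    """
    enqueued = 0
    for schedule in schedules:
        if schedule["nextRunAt"] > now:  # equivalent to WHERE nextRunAt <= NOW()
            continue
        execution_queue.append({"workflowId": schedule["workflowId"], "input": None})
        # Advance past `now` so a long outage does not cause a burst of runs.
        while schedule["nextRunAt"] <= now:
            schedule["nextRunAt"] += schedule["interval"]
        schedule["runCount"] += 1
        enqueued += 1
    return enqueued
```

Skipping missed runs instead of replaying them is a design choice made for this sketch; the source does not say which behavior the system uses.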
sequenceDiagram
participant Cron as Cron Scheduler
participant Queue as Scheduled Queue
participant Worker as Schedule Worker
participant DB as Database
participant Exec as Execution Queue
Note over Cron: Every minute
Cron->>Queue: Enqueue check job
Queue->>Worker: Process scheduled check
Worker->>DB: Find due scheduled workflows
Note over DB: WHERE nextRunAt <= NOW()
DB-->>Worker: Due workflows list
loop For each due workflow
Worker->>DB: Create execution record
Worker->>Exec: Queue workflow-execution job
Worker->>DB: Update schedule (nextRunAt, runCount)
end
Worker->>Queue: Acknowledge job
Webhook Processing Flow
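The validate, parse, match, and enqueue steps of this section's flowchart can be sketched as follows. The source does not name a signature scheme, so HMAC-SHA256 over the raw body is an assumption, as are the function names, the `event` payload field, the `triggers` mapping, and the 401 response for a bad signature. Creating execution records is omitted.

```python
import hashlib
import hmac
import json
import logging
from typing import Any

logger = logging.getLogger(__name__)


def verify_signature(secret: bytes, body: bytes, signature_hex: str) -> bool:
    """Check an HMAC-SHA256 hex signature with a constant-time comparison."""
    expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected.encode(), signature_hex.encode())


def handle_webhook(
    secret: bytes,
    body: bytes,
    signature_hex: str,
    triggers: dict[str, list[str]],
    execution_queue: list[dict[str, Any]],
) -> dict[str, int]:
    """Validate, parse, match, and enqueue; return a response summary."""
    if not verify_signature(secret, body, signature_hex):
        return {"status": 401, "queued": 0}  # assumption: reject bad signatures

    payload = json.loads(body)
    matches = triggers.get(payload.get("event"), [])
    if not matches:
        logger.info("no workflow matches webhook event %r", payload.get("event"))

    # One execution job per matching workflow.
    for workflow_id in matches:
        execution_queue.append({"workflowId": workflow_id, "input": payload})
    return {"status": 200, "queued": len(matches)}
```

Both the match and no-match branches end in a normal response, as in the flowchart; only the signature failure path is an addition.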
flowchart TD
WEBHOOK[Webhook Received] --> VALIDATE[Validate Signature]
VALIDATE --> PARSE[Parse Payload]
PARSE --> FIND[Find Matching Workflows]
FIND --> MATCH{Matches Found?}
MATCH -->|Yes| QUEUE[Queue Execution Jobs]
MATCH -->|No| LOG[Log No Match]
QUEUE --> LOOP{More Workflows?}
LOOP -->|Yes| CREATE[Create Execution]
CREATE --> ENQUEUE[Enqueue Job]
ENQUEUE --> LOOP
LOOP -->|No| RESPOND[Return Response]
LOG --> RESPOND
style VALIDATE fill:#fff3e0
Job Retry Strategy
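The backoff sequence and the branch out of `FAILED` in this section's state diagram reduce to two small functions. This is a sketch: the names `backoff_delay` and `next_state` and the one-second base are assumptions consistent with the "1s, 2s, 4s, 8s..." note.

```python
def backoff_delay(retry_count: int, base_seconds: float = 1.0) -> float:
    """Seconds to wait in RETRY_WAIT after `retry_count` earlier retries."""
    return base_seconds * (2 ** retry_count)


def next_state(retry_count: int, max_retries: int) -> str:
    """State a FAILED job moves to, following the state diagram."""
    return "RETRY_WAIT" if retry_count < max_retries else "DEAD_LETTER"
```

For example, `backoff_delay(0)` through `backoff_delay(3)` give 1, 2, 4, and 8 seconds. Production systems often also cap the delay and add random jitter; the source does not say whether this one does.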
stateDiagram-v2
[*] --> QUEUED: Job Created
QUEUED --> PROCESSING: Worker Picks Up
PROCESSING --> COMPLETED: Success
PROCESSING --> FAILED: Error
FAILED --> RETRY_WAIT: Retries Remaining
FAILED --> DEAD_LETTER: Max Retries Reached
RETRY_WAIT --> QUEUED: Backoff Complete
COMPLETED --> [*]
DEAD_LETTER --> [*]
note right of RETRY_WAIT
Exponential backoff:
1s, 2s, 4s, 8s...
end note
Job Data Structures
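The two entities in this section's ER diagram map directly onto dataclasses. This is a sketch under several assumptions: field names are converted to snake_case, the `JobStatus` members are borrowed from the retry state diagram (the ER diagram only says `enum`), the default values are illustrative, and the unit of `duration` is not given in the source.

```python
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Optional


class JobStatus(Enum):
    """Assumed status values, taken from the retry state diagram."""

    QUEUED = "QUEUED"
    PROCESSING = "PROCESSING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    RETRY_WAIT = "RETRY_WAIT"
    DEAD_LETTER = "DEAD_LETTER"


@dataclass
class JobAttempt:
    id: str
    job_id: str  # foreign key to Job.id
    attempt_number: int
    status: JobStatus
    started_at: datetime
    finished_at: Optional[datetime] = None
    error: Optional[str] = None
    duration: Optional[int] = None  # unit unspecified in the source


@dataclass
class Job:
    id: str
    queue: str
    data: dict[str, Any]
    status: JobStatus = JobStatus.QUEUED
    priority: int = 0  # illustrative default
    max_retries: int = 3  # illustrative default
    retry_count: int = 0
    created_at: datetime = field(default_factory=datetime.now)
    processed_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    # One job has zero or more attempts (JOB ||--o{ JOB_ATTEMPT).
    attempts: list[JobAttempt] = field(default_factory=list)
```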
erDiagram
JOB ||--o{ JOB_ATTEMPT : has
JOB {
string id PK
string queue
json data
enum status
int priority
int maxRetries
int retryCount
timestamp createdAt
timestamp processedAt
timestamp completedAt
}
JOB_ATTEMPT {
string id PK
string jobId FK
int attemptNumber
enum status
string error
timestamp startedAt
timestamp finishedAt
int duration
}
Concurrency Control
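The three nested checks in this section's flowchart can be sketched as a gate that counts running jobs at each level. The limits come from the diagram; the `ConcurrencyGate` class and its method names are hypothetical, and the sketch is not thread-safe, so a real worker pool would need a lock or an atomic store around these counters.

```python
from collections import Counter
from typing import Optional

# Limits as shown in the diagram.
LIMITS = {"global": 100, "tenant": 10, "workflow": 5}


class ConcurrencyGate:
    """Tracks running jobs globally, per tenant, and per workflow."""

    def __init__(self, limits: dict[str, int] = LIMITS) -> None:
        self.limits = dict(limits)
        self.running = 0
        self.by_tenant: Counter = Counter()
        self.by_workflow: Counter = Counter()

    def blocked_by(self, tenant_id: str, workflow_id: str) -> Optional[str]:
        """Return the first exceeded limit, or None if the job may start."""
        if self.running >= self.limits["global"]:
            return "global"
        if self.by_tenant[tenant_id] >= self.limits["tenant"]:
            return "tenant"
        if self.by_workflow[workflow_id] >= self.limits["workflow"]:
            return "workflow"
        return None

    def try_acquire(self, tenant_id: str, workflow_id: str) -> bool:
        """Reserve a slot at all three levels; False means wait in queue."""
        if self.blocked_by(tenant_id, workflow_id) is not None:
            return False
        self.running += 1
        self.by_tenant[tenant_id] += 1
        self.by_workflow[workflow_id] += 1
        return True

    def release(self, tenant_id: str, workflow_id: str) -> None:
        """Free the slot when the job finishes."""
        self.running -= 1
        self.by_tenant[tenant_id] -= 1
        self.by_workflow[workflow_id] -= 1
```

A job for which `try_acquire` returns `False` stays queued and is rechecked later, which corresponds to the "Wait in Queue" loops.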
flowchart TD
subgraph "Concurrency Limits"
GLOBAL[Global Limit: 100 jobs]
TENANT[Per-Tenant Limit: 10 jobs]
WORKFLOW[Per-Workflow Limit: 5 jobs]
end
JOB[New Job] --> CHECK_GLOBAL{Global Limit?}
CHECK_GLOBAL -->|OK| CHECK_TENANT{Tenant Limit?}
CHECK_GLOBAL -->|Exceeded| WAIT_GLOBAL[Wait in Queue]
CHECK_TENANT -->|OK| CHECK_WF{Workflow Limit?}
CHECK_TENANT -->|Exceeded| WAIT_TENANT[Wait in Queue]
CHECK_WF -->|OK| PROCESS[Process Job]
CHECK_WF -->|Exceeded| WAIT_WF[Wait in Queue]
WAIT_GLOBAL --> CHECK_GLOBAL
WAIT_TENANT --> CHECK_TENANT
WAIT_WF --> CHECK_WF
Job Priority Levels
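The processing order in this section's diagram is a strict-priority scan: high first, then normal, then low. A minimal sketch, assuming one in-memory FIFO per level and a hypothetical `PriorityQueues` class:

```python
from collections import deque
from typing import Any, Optional

# Levels in the order a worker checks them.
PRIORITY_ORDER = ("high", "normal", "low")


class PriorityQueues:
    """One FIFO queue per priority level."""

    def __init__(self) -> None:
        self.queues: dict[str, deque] = {name: deque() for name in PRIORITY_ORDER}

    def enqueue(self, priority: str, job: Any) -> None:
        self.queues[priority].append(job)

    def next_job(self) -> Optional[Any]:
        """Return the oldest job from the highest non-empty level, or None."""
        for name in PRIORITY_ORDER:
            if self.queues[name]:
                return self.queues[name].popleft()
        return None  # all queues empty: the worker waits for jobs
```

With strict ordering, a steady stream of high-priority jobs can starve the low queue; whether the system mitigates this is not stated in the source.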
flowchart TD
subgraph "Priority Queues"
HIGH[High Priority]
NORMAL[Normal Priority]
LOW[Low Priority]
end
subgraph "Job Types by Priority"
HIGH --> H1[User-triggered workflows]
HIGH --> H2[Webhook responses]
NORMAL --> N1[Scheduled workflows]
NORMAL --> N2[Standard automation]
LOW --> L1[Background tasks]
LOW --> L2[Cleanup jobs]
end
subgraph "Processing Order"
WORKER[Worker] --> CHECK_HIGH{High queue?}
CHECK_HIGH -->|Has jobs| PROCESS_HIGH[Process High]
CHECK_HIGH -->|Empty| CHECK_NORMAL{Normal queue?}
CHECK_NORMAL -->|Has jobs| PROCESS_NORMAL[Process Normal]
CHECK_NORMAL -->|Empty| CHECK_LOW{Low queue?}
CHECK_LOW -->|Has jobs| PROCESS_LOW[Process Low]
CHECK_LOW -->|Empty| WAIT[Wait for jobs]
end
Queue Monitoring
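The alert checks in this section's diagram compare each metric against a threshold. The sketch below assumes a hypothetical `breached_metrics` helper and hypothetical metric names; the source gives no threshold values, so the numbers in the usage example are placeholders only.

```python
def breached_metrics(
    metrics: dict[str, float],
    thresholds: dict[str, float],
) -> list[str]:
    """Return the sorted names of metrics strictly above their threshold.

    Metrics without a configured threshold never alert, which matches the
    diagram, where throughput is collected but has no alert branch.
    """
    return sorted(
        name
        for name, limit in thresholds.items()
        if metrics.get(name, 0.0) > limit
    )


# Placeholder values for illustration only.
example_thresholds = {"depth": 1000, "latency_seconds": 30, "error_rate": 0.05}
example_metrics = {"depth": 1500, "latency_seconds": 12, "error_rate": 0.08, "throughput": 40}
alerts = breached_metrics(example_metrics, example_thresholds)
```

A non-empty result would trigger the "Alert Team" step, after which the team scales workers or investigates.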
flowchart LR
subgraph "Metrics"
DEPTH[Queue Depth]
LATENCY[Processing Latency]
THROUGHPUT[Throughput]
ERRORS[Error Rate]
end
subgraph "Alerts"
DEPTH --> ALERT_DEPTH{"> Threshold?"}
LATENCY --> ALERT_LAT{"> Threshold?"}
ERRORS --> ALERT_ERR{"> Threshold?"}
ALERT_DEPTH -->|Yes| NOTIFY[Alert Team]
ALERT_LAT -->|Yes| NOTIFY
ALERT_ERR -->|Yes| NOTIFY
end
subgraph "Actions"
NOTIFY --> SCALE[Scale Workers]
NOTIFY --> INVESTIGATE[Investigate Issues]
end
Dead Letter Queue Processing
flowchart TD
FAILED_JOB[Failed Job] --> MAX_RETRIES{Max Retries Reached?}
MAX_RETRIES -->|Yes| DLQ[Move to Dead Letter Queue]
MAX_RETRIES -->|No| RETRY[Retry with Backoff]
DLQ --> ALERT[Alert Operations]
DLQ --> LOG[Log Failure Details]
ALERT --> REVIEW[Manual Review]
REVIEW --> DECISION{Decision}
DECISION -->|Fix & Retry| REQUEUE[Requeue Job]
DECISION -->|Discard| DELETE[Delete Job]
DECISION -->|Investigate| DEBUG[Debug Issue]
REQUEUE --> MAIN_QUEUE[Main Queue]
DELETE --> ARCHIVE[Archive for Audit]
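The three manual review outcomes in the flowchart can be sketched as a single function over in-memory lists. The function name, the decision strings, and the choice to reset `retryCount` on requeue are assumptions made for illustration.

```python
from typing import Any


def apply_review_decision(
    job: dict[str, Any],
    decision: str,
    dead_letter_queue: list[dict[str, Any]],
    main_queue: list[dict[str, Any]],
    archive: list[dict[str, Any]],
) -> None:
    """Apply a manual review decision to a job in the dead letter queue."""
    if decision == "investigate":
        return  # the job stays in the DLQ while the issue is debugged
    if decision not in ("requeue", "discard"):
        raise ValueError(f"unknown decision: {decision!r}")

    dead_letter_queue.remove(job)
    if decision == "requeue":
        job["retryCount"] = 0  # assumption: a fixed job gets a fresh retry budget
        main_queue.append(job)
    else:
        archive.append(job)  # discarded jobs are archived for audit
```

Keeping discarded jobs in an archive rather than dropping them follows the "Archive for Audit" step.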