
Job Processing

Queue-based workflow and node processing diagrams

Workflow Job Processing

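This page documents the queue-based job processing system that drives workflow execution: the queues, the workers that consume them, and the retry, concurrency, priority, and monitoring behavior around them.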

Job Processing Architecture

flowchart TB
    subgraph "Job Sources"
        API[API Calls]
        WEBHOOK[Webhooks]
        SCHEDULER[Scheduler]
        EVENTS[Application Events]
    end

    subgraph "Queue Layer"
        WF_QUEUE[workflow-execution queue]
        NODE_QUEUE[workflow-node queue]
        SCHED_QUEUE[workflow-scheduled queue]
        HOOK_QUEUE[workflow-webhook queue]
    end

    subgraph "Worker Pool"
        WF_WORKER[Execution Workers]
        NODE_WORKER[Node Workers]
        SCHED_WORKER[Schedule Workers]
        HOOK_WORKER[Webhook Workers]
    end

    subgraph "Processing"
        ENGINE[Workflow Engine]
        EXECUTORS[Node Executors]
        STORAGE[State Storage]
    end

    API --> WF_QUEUE
    WEBHOOK --> HOOK_QUEUE
    SCHEDULER --> SCHED_QUEUE
    EVENTS --> WF_QUEUE

    WF_QUEUE --> WF_WORKER
    NODE_QUEUE --> NODE_WORKER
    SCHED_QUEUE --> SCHED_WORKER
    HOOK_QUEUE --> HOOK_WORKER

    WF_WORKER --> ENGINE
    NODE_WORKER --> EXECUTORS
    ENGINE --> STORAGE
    EXECUTORS --> STORAGE

    style WF_QUEUE fill:#fff3e0
    style ENGINE fill:#e1f5fe

Job Types

| Queue | Job Type | Description |
|-------|----------|-------------|
| workflow-execution | Execute full workflow | Runs complete workflow from start |
| workflow-node | Execute single node | Distributed node execution |
| workflow-scheduled | Process scheduled trigger | Handles cron-based workflows |
| workflow-webhook | Process webhook trigger | Handles incoming webhooks |
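
As a rough illustration of the routing shown in the architecture diagram, the sketch below maps each job source to the queue it feeds. The queue names come from this page; the `SOURCE_TO_QUEUE` mapping and the `queue_for` helper are hypothetical names introduced for the example, not part of the documented system.

```python
# Hypothetical routing table: job source -> queue name.
# Queue names are taken from this page; the mapping is an assumption
# read off the edges of the architecture diagram.
SOURCE_TO_QUEUE = {
    "api": "workflow-execution",
    "events": "workflow-execution",
    "webhook": "workflow-webhook",
    "scheduler": "workflow-scheduled",
}


def queue_for(source: str) -> str:
    """Return the name of the queue that jobs from `source` are enqueued on."""
    try:
        return SOURCE_TO_QUEUE[source]
    except KeyError:
        raise ValueError(f"unknown job source: {source!r}") from None
```

The `workflow-node` queue has no entry because the diagram shows no external source feeding it; the Distributed Node Processing section suggests the main execution enqueues those jobs itself.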

Workflow Execution Job Flow

sequenceDiagram
    participant Source as Trigger Source
    participant Queue as Job Queue
    participant Worker as Execution Worker
    participant Engine as Workflow Engine
    participant DB as Database
    participant State as State Storage

    Source->>Queue: Enqueue workflow-execution job
    Note over Queue: {workflowId, input, userId}

    Worker->>Queue: Poll for jobs
    Queue-->>Worker: Job data

    Worker->>DB: Load workflow definition
    DB-->>Worker: Workflow

    Worker->>DB: Create execution record (PENDING)
    DB-->>Worker: Execution ID

    Worker->>Engine: Initialize engine
    Worker->>State: Initialize state

    Worker->>Engine: Run workflow
    Engine->>Engine: Process nodes

    alt All nodes complete
        Worker->>DB: Update execution (COMPLETED)
        Worker->>Queue: Acknowledge job
    else Error occurred
        Worker->>DB: Update execution (FAILED)
        Worker->>Queue: Handle based on retry policy
    end
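
The sequence above can be sketched as a single handler function. This is a minimal in-memory sketch, not the actual worker: plain dictionaries stand in for the database, `run_workflow` stands in for the workflow engine, and the `exec-N` ID scheme is invented for the example. Only the payload keys (`workflowId`, `input`, `userId`) and the status names come from the diagram.

```python
from typing import Any, Callable, Dict


def process_execution_job(
    job: Dict[str, Any],
    workflows: Dict[str, Any],
    executions: Dict[str, Dict[str, Any]],
    run_workflow: Callable[[Any, Any], Any],
) -> str:
    """Handle one workflow-execution job and return the final status."""
    # Load the workflow definition (a dict lookup stands in for the database).
    workflow = workflows[job["workflowId"]]

    # Create the execution record in PENDING state (hypothetical ID scheme).
    execution_id = f"exec-{len(executions) + 1}"
    executions[execution_id] = {"status": "PENDING", "userId": job["userId"]}

    try:
        # The engine processes the workflow's nodes.
        result = run_workflow(workflow, job["input"])
    except Exception as exc:
        executions[execution_id].update(status="FAILED", error=str(exc))
        return "FAILED"  # the caller applies the retry policy

    executions[execution_id].update(status="COMPLETED", output=result)
    return "COMPLETED"  # the caller acknowledges the job
```

Returning the status rather than acknowledging inside the handler keeps queue-specific concerns (acknowledgement, retry scheduling) in the caller, which is one possible split of responsibilities.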

Distributed Node Processing

flowchart TD
    subgraph "Main Execution"
        START[Start Workflow] --> IDENTIFY[Identify Parallelizable Nodes]
        IDENTIFY --> QUEUE_NODES[Queue Individual Node Jobs]
    end

    subgraph "Node Job Queue"
        QUEUE_NODES --> JOB1[Node A Job]
        QUEUE_NODES --> JOB2[Node B Job]
        QUEUE_NODES --> JOB3[Node C Job]
    end

    subgraph "Worker Pool"
        JOB1 --> WORKER1[Worker 1]
        JOB2 --> WORKER2[Worker 2]
        JOB3 --> WORKER3[Worker 3]
    end

    subgraph "Results"
        WORKER1 --> COLLECT[Collect Results]
        WORKER2 --> COLLECT
        WORKER3 --> COLLECT
        COLLECT --> CONTINUE[Continue Workflow]
    end
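
The fan-out and collect steps in this diagram can be approximated in a single process. In the sketch below, threads from Python's standard `concurrent.futures` module stand in for separate workers consuming the `workflow-node` queue; that substitution, and the `run_parallel_nodes` name, are assumptions made for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict


def run_parallel_nodes(
    nodes: Dict[str, Callable[[], Any]], max_workers: int = 3
) -> Dict[str, Any]:
    """Run independent node callables concurrently and collect their results."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Fan out: one submitted task per parallelizable node.
        futures = {name: pool.submit(fn) for name, fn in nodes.items()}
        # Fan in: result() blocks until the node finishes and re-raises
        # any exception the node raised.
        return {name: future.result() for name, future in futures.items()}
```

The workflow continues only after every node result has been collected, which mirrors the "Collect Results" then "Continue Workflow" steps.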

Scheduled Workflow Processing

sequenceDiagram
    participant Cron as Cron Scheduler
    participant Queue as Scheduled Queue
    participant Worker as Schedule Worker
    participant DB as Database
    participant Exec as Execution Queue

    Note over Cron: Every minute

    Cron->>Queue: Enqueue check job
    Queue->>Worker: Process scheduled check

    Worker->>DB: Find due scheduled workflows
    Note over DB: WHERE nextRunAt <= NOW()

    DB-->>Worker: Due workflows list

    loop For each due workflow
        Worker->>DB: Create execution record
        Worker->>Exec: Queue workflow-execution job
        Worker->>DB: Update schedule (nextRunAt, runCount)
    end

    Worker->>Queue: Acknowledge job
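
A minimal sketch of the scheduled check follows. It assumes each schedule is a dictionary with `workflowId`, `nextRunAt`, and `runCount` (names from the diagram) plus a hypothetical `intervalSeconds` field that stands in for real cron evaluation. Creating the execution record is omitted for brevity.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List


def process_scheduled_check(
    schedules: List[Dict[str, Any]],
    execution_queue: List[Dict[str, Any]],
    now: datetime,
) -> int:
    """Enqueue a workflow-execution job for every due schedule.

    Returns the number of schedules that were due.
    """
    # Equivalent of: WHERE nextRunAt <= NOW()
    due = [s for s in schedules if s["nextRunAt"] <= now]
    for schedule in due:
        execution_queue.append({"workflowId": schedule["workflowId"], "input": {}})
        # Assumption: a fixed interval replaces real cron expression handling.
        schedule["nextRunAt"] = now + timedelta(seconds=schedule["intervalSeconds"])
        schedule["runCount"] += 1
    return len(due)
```

Advancing `nextRunAt` in the same pass that enqueues the job is what prevents the next minute's check from picking up the same schedule again.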

Webhook Processing Flow

flowchart TD
    WEBHOOK[Webhook Received] --> VALIDATE[Validate Signature]
    VALIDATE --> PARSE[Parse Payload]

    PARSE --> FIND[Find Matching Workflows]
    FIND --> MATCH{Matches Found?}

    MATCH -->|Yes| QUEUE[Queue Execution Jobs]
    MATCH -->|No| LOG[Log No Match]

    QUEUE --> LOOP{More Workflows?}
    LOOP -->|Yes| CREATE[Create Execution]
    CREATE --> ENQUEUE[Enqueue Job]
    ENQUEUE --> LOOP

    LOOP -->|No| RESPOND[Return Response]

    LOG --> RESPOND

    style VALIDATE fill:#fff3e0
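
The page does not say which signature scheme the "Validate Signature" step uses. The sketch below assumes a common one, a hex-encoded HMAC-SHA256 over the raw request body with a shared secret, and uses only Python's standard `hmac` and `hashlib` modules. Treat the scheme and the `verify_signature` name as assumptions.

```python
import hashlib
import hmac


def verify_signature(secret: bytes, payload: bytes, signature_hex: str) -> bool:
    """Check an HMAC-SHA256 hex signature using a constant-time comparison."""
    expected = hmac.new(secret, payload, hashlib.sha256).hexdigest()
    # compare_digest avoids leaking how many leading characters matched.
    return hmac.compare_digest(expected, signature_hex)
```

Validation runs before parsing so that unauthenticated payloads are rejected without being interpreted.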

Job Retry Strategy

stateDiagram-v2
    [*] --> QUEUED: Job Created

    QUEUED --> PROCESSING: Worker Picks Up
    PROCESSING --> COMPLETED: Success
    PROCESSING --> FAILED: Error

    FAILED --> RETRY_WAIT: Retries Remaining
    FAILED --> DEAD_LETTER: Max Retries Reached

    RETRY_WAIT --> QUEUED: Backoff Complete

    COMPLETED --> [*]
    DEAD_LETTER --> [*]

    note right of RETRY_WAIT
        Exponential backoff:
        1s, 2s, 4s, 8s...
    end note
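
The backoff sequence in the note (1s, 2s, 4s, 8s...) corresponds to a delay of `base * 2^n` for the n-th retry, counting from zero. The sketch below shows that formula and the transition out of `FAILED`. The function names and the 1-second base default are illustrative, and whether the real system caps the delay or adds jitter is not stated on this page.

```python
def backoff_delay(retry_count: int, base_seconds: float = 1.0) -> float:
    """Delay before retry number `retry_count` (0-based): base * 2**retry_count."""
    if retry_count < 0:
        raise ValueError("retry_count must be non-negative")
    return base_seconds * (2 ** retry_count)


def next_state(retry_count: int, max_retries: int) -> str:
    """State a FAILED job moves to, following the state diagram."""
    return "RETRY_WAIT" if retry_count < max_retries else "DEAD_LETTER"
```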

Job Data Structures

erDiagram
    JOB ||--o{ JOB_ATTEMPT : has

    JOB {
        string id PK
        string queue
        json data
        enum status
        int priority
        int maxRetries
        int retryCount
        timestamp createdAt
        timestamp processedAt
        timestamp completedAt
    }

    JOB_ATTEMPT {
        string id PK
        string jobId FK
        int attemptNumber
        enum status
        string error
        timestamp startedAt
        timestamp finishedAt
        int duration
    }
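
For readers who prefer code to entity diagrams, the two entities could be modeled roughly as follows. Field names follow the diagram; the default values, the use of plain strings for the `status` enums, and the `attempts` list that represents the one-to-many relationship are assumptions.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional


@dataclass
class JobAttempt:
    id: str
    jobId: str  # foreign key to Job.id
    attemptNumber: int
    status: str  # enum in the diagram; its values are not listed there
    startedAt: datetime
    finishedAt: Optional[datetime] = None
    error: Optional[str] = None
    duration: Optional[int] = None  # unit is not stated in the diagram


@dataclass
class Job:
    id: str
    queue: str
    data: Dict[str, Any]
    status: str = "QUEUED"  # assumed initial state, per the retry state diagram
    priority: int = 0  # assumed default
    maxRetries: int = 3  # assumed default; the page gives none
    retryCount: int = 0
    createdAt: datetime = field(default_factory=datetime.now)
    processedAt: Optional[datetime] = None
    completedAt: Optional[datetime] = None
    # A job has zero or more attempts.
    attempts: List[JobAttempt] = field(default_factory=list)
```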

Concurrency Control

flowchart TD
    subgraph "Concurrency Limits"
        GLOBAL[Global Limit: 100 jobs]
        TENANT[Per-Tenant Limit: 10 jobs]
        WORKFLOW[Per-Workflow Limit: 5 jobs]
    end

    JOB[New Job] --> CHECK_GLOBAL{Global Limit?}
    CHECK_GLOBAL -->|OK| CHECK_TENANT{Tenant Limit?}
    CHECK_GLOBAL -->|Exceeded| WAIT_GLOBAL[Wait in Queue]

    CHECK_TENANT -->|OK| CHECK_WF{Workflow Limit?}
    CHECK_TENANT -->|Exceeded| WAIT_TENANT[Wait in Queue]

    CHECK_WF -->|OK| PROCESS[Process Job]
    CHECK_WF -->|Exceeded| WAIT_WF[Wait in Queue]

    WAIT_GLOBAL --> CHECK_GLOBAL
    WAIT_TENANT --> CHECK_TENANT
    WAIT_WF --> CHECK_WF
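
The three checks can be sketched as a small admission gate. The limits (100, 10, 5) are the ones shown in the diagram; the `ConcurrencyGate` class is a hypothetical single-process illustration. A real deployment with several workers would need the counters in a shared store with atomic updates, which this sketch does not attempt.

```python
from collections import Counter
from typing import Optional

GLOBAL_LIMIT = 100
TENANT_LIMIT = 10
WORKFLOW_LIMIT = 5


class ConcurrencyGate:
    """Tracks running jobs and applies the three limits in diagram order."""

    def __init__(self) -> None:
        self.running = 0
        self.by_tenant: Counter = Counter()
        self.by_workflow: Counter = Counter()

    def blocked_by(self, tenant_id: str, workflow_id: str) -> Optional[str]:
        """Return the first limit that is reached, or None if the job may run."""
        if self.running >= GLOBAL_LIMIT:
            return "global"
        if self.by_tenant[tenant_id] >= TENANT_LIMIT:
            return "tenant"
        if self.by_workflow[workflow_id] >= WORKFLOW_LIMIT:
            return "workflow"
        return None

    def start(self, tenant_id: str, workflow_id: str) -> bool:
        """Reserve a slot; return False if the job must keep waiting."""
        if self.blocked_by(tenant_id, workflow_id) is not None:
            return False
        self.running += 1
        self.by_tenant[tenant_id] += 1
        self.by_workflow[workflow_id] += 1
        return True

    def finish(self, tenant_id: str, workflow_id: str) -> None:
        """Release the slot held by a finished job."""
        self.running -= 1
        self.by_tenant[tenant_id] -= 1
        self.by_workflow[workflow_id] -= 1
```

A job that is refused stays in the queue and is checked again later, matching the "Wait in Queue" loops in the diagram.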

Job Priority Levels

flowchart TD
    subgraph "Priority Queues"
        HIGH[High Priority]
        NORMAL[Normal Priority]
        LOW[Low Priority]
    end

    subgraph "Job Types by Priority"
        HIGH --> H1[User-triggered workflows]
        HIGH --> H2[Webhook responses]

        NORMAL --> N1[Scheduled workflows]
        NORMAL --> N2[Standard automation]

        LOW --> L1[Background tasks]
        LOW --> L2[Cleanup jobs]
    end

    subgraph "Processing Order"
        WORKER[Worker] --> CHECK_HIGH{High queue?}
        CHECK_HIGH -->|Has jobs| PROCESS_HIGH[Process High]
        CHECK_HIGH -->|Empty| CHECK_NORMAL{Normal queue?}
        CHECK_NORMAL -->|Has jobs| PROCESS_NORMAL[Process Normal]
        CHECK_NORMAL -->|Empty| CHECK_LOW{Low queue?}
        CHECK_LOW -->|Has jobs| PROCESS_LOW[Process Low]
        CHECK_LOW -->|Empty| WAIT[Wait for jobs]
    end
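
The processing order in the diagram is a strict priority scan: a worker takes from the normal queue only when the high queue is empty, and from the low queue only when both others are empty. A minimal sketch, with in-memory deques standing in for the real queues and `next_job` as a hypothetical helper name:

```python
from collections import deque
from typing import Any, Deque, Dict, Optional

PRIORITY_ORDER = ("high", "normal", "low")


def next_job(queues: Dict[str, Deque[Any]]) -> Optional[Any]:
    """Pop from the highest-priority non-empty queue; None means wait for jobs."""
    for level in PRIORITY_ORDER:
        queue = queues.get(level)
        if queue:  # skips missing and empty queues
            return queue.popleft()
    return None
```

One consequence of strict ordering is that a steady stream of high-priority jobs can starve the low queue. The page does not say whether the system mitigates this.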

Queue Monitoring

flowchart LR
    subgraph "Metrics"
        DEPTH[Queue Depth]
        LATENCY[Processing Latency]
        THROUGHPUT[Throughput]
        ERRORS[Error Rate]
    end

    subgraph "Alerts"
        DEPTH --> ALERT_DEPTH{"> Threshold?"}
        LATENCY --> ALERT_LAT{"> Threshold?"}
        ERRORS --> ALERT_ERR{"> Threshold?"}

        ALERT_DEPTH -->|Yes| NOTIFY[Alert Team]
        ALERT_LAT -->|Yes| NOTIFY
        ALERT_ERR -->|Yes| NOTIFY
    end

    subgraph "Actions"
        NOTIFY --> SCALE[Scale Workers]
        NOTIFY --> INVESTIGATE[Investigate Issues]
    end
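
The alerting step reduces to comparing each metric against its threshold. The sketch below shows that comparison. The metric keys are hypothetical, and the page gives no threshold values, so callers supply their own.

```python
from typing import Dict, List


def breached_metrics(
    metrics: Dict[str, float], thresholds: Dict[str, float]
) -> List[str]:
    """Return the names of metrics that exceed their threshold, sorted by name."""
    return sorted(
        name
        for name, limit in thresholds.items()
        # A metric with no reading is treated as 0 and never alerts.
        if metrics.get(name, 0.0) > limit
    )
```

Only metrics that have a threshold are checked, so throughput, which has no alert edge in the diagram, can be reported without ever triggering a notification.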

Dead Letter Queue Processing

flowchart TD
    FAILED_JOB[Failed Job] --> MAX_RETRIES{Max Retries Reached?}

    MAX_RETRIES -->|Yes| DLQ[Move to Dead Letter Queue]
    MAX_RETRIES -->|No| RETRY[Retry with Backoff]

    DLQ --> ALERT[Alert Operations]
    DLQ --> LOG[Log Failure Details]

    ALERT --> REVIEW[Manual Review]
    REVIEW --> DECISION{Decision}

    DECISION -->|Fix & Retry| REQUEUE[Requeue Job]
    DECISION -->|Discard| DELETE[Delete Job]
    DECISION -->|Investigate| DEBUG[Debug Issue]

    REQUEUE --> MAIN_QUEUE[Main Queue]
    DELETE --> ARCHIVE[Archive for Audit]
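
The three outcomes of manual review can be sketched as one function over in-memory lists. The decision strings, the `apply_review_decision` name, and the choice to reset `retryCount` on requeue are assumptions; the diagram only states that a requeued job returns to the main queue and a discarded job is archived for audit.

```python
from typing import Any, Dict, List


def apply_review_decision(
    job: Dict[str, Any],
    decision: str,
    dead_letter_queue: List[Dict[str, Any]],
    main_queue: List[Dict[str, Any]],
    archive: List[Dict[str, Any]],
) -> None:
    """Apply a manual review decision to a job sitting in the dead letter queue."""
    if decision == "investigate":
        return  # the job stays in the DLQ while the issue is debugged
    if decision not in ("requeue", "discard"):
        raise ValueError(f"unknown decision: {decision!r}")

    dead_letter_queue.remove(job)
    if decision == "requeue":
        job["retryCount"] = 0  # assumption: a manual requeue resets the retry budget
        job["status"] = "QUEUED"
        main_queue.append(job)
    else:
        archive.append(job)  # discarded jobs are kept for audit
```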