Stages
Workflows let you reshape the Hub pipeline without code — wire a device through a few filters into a model on a visual canvas. Custom stages are the developer side of that same system: instead of choosing from the built-in nodes, you bring your own microservice in as a workflow stage. The engine is open — every built-in stage (classification, thumbnails, sprites) is just a service that consumes a message off a queue, does one job, and hands the result back — and your service plugs in the same way.
A workflow stage is a step in a workflow, implemented as a microservice the workflows engine triggers automatically for every recording: it receives a run from a queue, does the work, and returns the result — in whatever language suits the job, deployed and scaled on its own. Stages are asynchronous: they run alongside the built-in analysis and never block it.
This page is the contract your microservice codes against — the queue it listens on, the message it receives, how it returns a result, and how the engine tracks it to completion. It is capability-agnostic: it never assumes what your stage does, so the same mechanism serves a speech-to-text service, a custom detector, or any other step. For a concrete capability built on it, see the block types a stage can produce.
Status: rolling out. The queue, the WorkflowRun it dispatches and the completion mechanics described here are already how the pipeline works internally. What is new is config-driven stage registration (the kerberoshub.workflows.stages values section; see Registering a stage), which lets a custom stage join without changing engine code. Registered stages are dispatched by the standalone workflows engine (hub-workflows), which runs alongside the analysis service and consumes the classify results that service tees over to it. It is landing now for self-hosted deployments.
This page covers how a microservice delivers a result. For the complementary receiving side (one shared core that takes a result from either the API or the queue and routes each block to the right actions by its type), see Ingest.
When to add a stage
A stage is one of two transports for getting your data into the Hub. The other is an authenticated ingest API push. They deliver the same data to the same place; they differ in who triggers the work and where your code runs.
- Ingest API push: your service POSTs whenever it has data. Works on every deployment and needs no cluster access. The right starting point for most integrators. See Ingest → Over the API.
- Workflow stage (this page): the pipeline triggers your service automatically on every ingest / re-analysis, with queue-level delivery guarantees. Available on self-hosted deployments that can run custom stages.
Reach for a stage only when you control the deployment and want the capability to run automatically as a built-in step of every recording’s analysis.
Anatomy of a stage
A stage has exactly two runtime dependencies: the message broker (to receive events and hand results back) and the database (to read and write event metadata). There is no service-to-service HTTP and no shared in-process state — every hand-off goes through the broker. That loose coupling is what lets any stage scale, restart or be replaced without touching the rest of the pipeline.
Registering a stage
You add a stage entirely in the chart’s values.yaml — no engine code changes. A stage is two halves that share one name under kerberoshub, and both only take effect when the workflows engine is on (kerberoshub.workflows.enabled: true):
- the workflow stage object (kerberoshub.workflows.stages.<name>), which declares the stage and how the engine routes to it;
- the service deployment (kerberoshub.services.<name>), which deploys your microservice.
Each half has its own enabled, so turn both on (plus the engine): routing with no microservice queues messages nobody reads, and a microservice with no routing never receives any.
The two sections divide by concern. kerberoshub.workflows is the engine’s behaviour — its enabled switch and the stages routing registry. kerberoshub.services holds the deployments of the whole workflows subsystem in one uniform shape: the engine itself (services.workflows, a chart default you don’t normally touch) and one microservice per stage (services.<name>). So adding a stage is always the same two edits — a routing entry under workflows.stages, and your microservice under services.
```yaml
# values.yaml
kerberoshub:
  workflows:
    enabled: true                  # master switch: the workflows engine
    # ── the workflow stage object: declare + route ──────────────
    stages:
      speed:                       # stage name (and default operation id)
        enabled: true              # route to this stage
        dispatch: conditional      # always | conditional
        # conditional only; see Conditional routing:
        needsMode: any             # any | all
        needs:
          - operation: classify
            condition: { path: inputs.classify.properties, op: contains, value: car }
  # ── the deployments: the engine (chart default) + your microservice ─
  services:
    # workflows: …                 # the engine itself, a chart default; you don't set this here
    speed:                         # same name as the stage object
      enabled: true                # deploy the microservice
      repository: ghcr.io/acme/speed
      tag: "v1.0.0"
      queue: "hub-workflows-speed" # the queue your microservice consumes
      replicas: 1
      pullPolicy: IfNotPresent
      logLevel: info               # trace | debug | info | warn | error
      resources: {}
```
Workflow stage object — kerberoshub.workflows.stages.<name>
| Field | Required | Value | What you use it for |
|---|---|---|---|
| enabled | yes | bool | Route to the stage. Off = the engine doesn’t know it exists. |
| operation | no | string | The stage’s operation id: the key your result is filed under (results.<operation>) and the id other stages depend on. Defaults to the stage name; set it only when the id should differ from that name. |
| dispatch | no | always / conditional | always (default) runs on every recording; conditional runs only when the recording matches (see Conditional routing). |
| needs | conditional only | list | The match rule (see Conditional routing). |
| needsMode | no | any / all | How multiple needs combine: any (default) or all. |
Stage vs. operation. On this page, stage is the step and operation is just its id: the operation field above, which defaults to the stage name. See the glossary for the full vocabulary.
Service deployment — kerberoshub.services.<name>
A normal microservice Deployment, keyed to the same name as the stage object. (The workflows engine itself is deployed from this same section as services.workflows — the one services entry with no matching stage, and a chart default you don’t normally touch.)
| Field | Required | Value | What you use it for |
|---|---|---|---|
| enabled | yes | bool | Deploy the microservice pod. Off = nothing runs. |
| repository | yes | string | Your microservice’s container image. |
| tag | yes | string | Image tag. |
| queue | yes | string | The queue your microservice consumes; the engine dispatches to this exact name. See Queue naming. |
| replicas | no | int | Pod count. |
| pullPolicy | no | IfNotPresent / Always / … | Image pull policy. |
| logLevel | no | trace…error | Microservice log verbosity. |
| resources | no | object | Standard pod requests/limits. |
| topologySpreadConstraints, volumes, volumeMounts | no | list | Standard optional Deployment extras. |
It’s the engine switch, not the front-end one.
kerberoshub.workflows.enabled toggles the workflows engine that dispatches stages. Don’t confuse it with the unrelated kerberoshub.…features.workflows.enabled front-end feature flag.
Minimal stage. The smallest live stage is enabled: true on both halves, the image + queue on the deployment, and dispatch: always. Everything else is additive — you can switch to conditional routing later without touching the rest.
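Because the engine and the two halves each have their own switch, a pre-flight check on the parsed values can catch a half-enabled stage before you deploy. The sketch below is illustrative only. check_stage is a hypothetical helper, not part of the chart or the engine, and it assumes values.yaml has already been loaded into a plain dict (for example with a YAML library). It targets a chart-deployed stage, so it flags services.<name>.enabled being off even though that is intentional when you deploy the microservice yourself.

```python
def check_stage(values: dict, name: str) -> list:
    """Return the problems that would keep stage `name` from running.

    Hypothetical pre-flight helper: it mirrors the rules on this page and is
    not part of the chart or the workflows engine.
    """
    hub = values.get("kerberoshub", {})
    workflows = hub.get("workflows", {})
    stage = workflows.get("stages", {}).get(name, {})
    service = hub.get("services", {}).get(name, {})
    problems = []

    # The engine and both halves each have their own switch.
    if not workflows.get("enabled", False):
        problems.append("kerberoshub.workflows.enabled is off: no engine dispatches stages")
    if not stage.get("enabled", False):
        problems.append(f"workflows.stages.{name}.enabled is off: the engine will not route to it")
    if not service.get("enabled", False):
        problems.append(f"services.{name}.enabled is off: nothing consumes the queue")

    # A chart-deployed microservice needs an image and a queue.
    for field in ("repository", "tag", "queue"):
        if not service.get(field):
            problems.append(f"services.{name}.{field} is missing")

    # Routing settings: dispatch defaults to always, needsMode to any.
    dispatch = stage.get("dispatch", "always")
    if dispatch not in ("always", "conditional"):
        problems.append(f"dispatch must be always or conditional, got {dispatch!r}")
    elif dispatch == "conditional" and not stage.get("needs"):
        problems.append("dispatch: conditional needs a needs list")
    if stage.get("needsMode", "any") not in ("any", "all"):
        problems.append("needsMode must be any or all")
    return problems
```

For the minimal stage described above the function returns an empty list. Drop the services.speed half and it reports the missing microservice, image and queue.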
Conditional routing
By default (dispatch: always) a stage runs on every recording. Set dispatch: conditional to run it only on recordings that match a rule you declare in the stage object — handy when the deciding signal (e.g. the classifier saw a car) isn’t known until the run is already underway. A conditional stage is never queued up front; the engine re-checks it as the run progresses and fires it the moment the rule holds. Recordings that never match never touch the stage.
The rule is a list of needs combined by needsMode:
```yaml
stages:
  speed:
    enabled: true
    dispatch: conditional
    needsMode: any              # any (default) | all
    needs:
      - operation: classify     # GATE: wait until this op is on the run
        condition:              # PREDICATE: tested once the gate is ready
          path: inputs.classify.properties
          op: contains
          value: car
```
- operation (the gate): an upstream operation id whose result must be present on the run before this need is checked (e.g. classify, or another stage’s id). Leave it empty to check the run with no gate, for device / user / identity rules that are known the moment the run opens.
- condition: the predicate tested once the gate is ready. Omit it to fire on the gate alone (i.e. “as soon as classify is present”).
- needsMode: with more than one need, any (default) fires on the first satisfied need (OR); all fires only when every need is satisfied (AND, a join).
```yaml
# only on one camera, and only when a car was seen: both must hold
needsMode: all
needs:
  - operation: classify
    condition: { path: inputs.classify.properties, op: contains, value: car }
  - operation:          # no gate: checked the moment the run opens
    condition: { path: device.deviceKey, op: eq, value: device02 }
```
The condition object
A condition is { path, op, value }.
path — a dot-path relative to the WorkflowRun your stage receives (see The workflow run). It walks objects, and a * segment fans out across the elements of an array (see Matching inside arrays). Valid roots:
| Root | Example | Notes |
|---|---|---|
| inputs.<op>.<field> | inputs.classify.properties | An upstream result. classify is always present (the trigger); its fields are properties, objectCount and details (details is an array whose elements a path cannot reach). |
| results.<op>.<field> | results.speed.kmh | A finished stage’s output. Fields of your own custom operations are accepted as-is; array fields can be matched element-wise with * (see Matching inside arrays). |
| device.<field> | device.deviceKey | One of deviceKey, deviceName, provider, storageSolution. |
| user.<field> | user.organisationId | One of id, organisationId. |
| scalar | key, operation, runId, traceId | Top-level identity values (not traversable). |
Credentials (storage, user.storage) are deliberately not matchable.
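The root table above is small enough to check mechanically. That kind of check is what turns a path typo into an immediate failure. The following is a hypothetical sketch of such a validator, not the engine's code. validate_path and the field sets are assumptions transcribed from the table, and it accepts only inputs.classify under inputs because that is the only input documented on this page.

```python
CLASSIFY_FIELDS = {"properties", "objectCount", "details"}
DEVICE_FIELDS = {"deviceKey", "deviceName", "provider", "storageSolution"}
USER_FIELDS = {"id", "organisationId"}
SCALARS = {"key", "operation", "runId", "traceId"}


def validate_path(path: str) -> None:
    """Raise ValueError if `path` is not one of the documented condition roots."""
    root, *rest = path.split(".")
    if root in SCALARS:
        if rest:
            raise ValueError(f"{path}: {root} is a scalar and not traversable")
    elif root == "device":
        if len(rest) != 1 or rest[0] not in DEVICE_FIELDS:
            raise ValueError(f"{path}: expected device.<one of {sorted(DEVICE_FIELDS)}>")
    elif root == "user":
        # user.storage (credentials) is deliberately not matchable
        if len(rest) != 1 or rest[0] not in USER_FIELDS:
            raise ValueError(f"{path}: expected user.<one of {sorted(USER_FIELDS)}>")
    elif root == "inputs":
        # classify fields are leaves: details cannot be traversed with *
        if len(rest) != 2 or rest[0] != "classify" or rest[1] not in CLASSIFY_FIELDS:
            raise ValueError(f"{path}: expected inputs.classify.<field> as a leaf")
    elif root == "results":
        # custom operation fields are accepted as-is, including * segments
        if len(rest) < 2 or not all(rest):
            raise ValueError(f"{path}: expected results.<operation>.<field>")
    else:
        # storage, and any other root, is rejected
        raise ValueError(f"{path}: unknown root {root!r}")
```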
op and value — the comparison and its operand:
op | value | Matches when |
|---|---|---|
| exists | (none) | the path resolves to anything. |
| eq | scalar | the resolved value equals value (numbers are compared numerically). |
| ne | scalar | the resolved value differs from value, or the path is absent. |
| contains | scalar | the resolved value is an array containing value, or a string containing value. |
| in | list | the resolved value is one of the entries in value. |
| gt / gte / lt / lte | number | the resolved value is numerically > / >= / < / <= value. |
contains and in are mirror images. Use contains when the run holds a list and you test for a member (inputs.classify.properties contains car). Use in when the run holds a single value and you test it against a set (device.deviceKey in [device01, device02]).
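Together, the operator table and the note above amount to a small comparison function. Here is one way to express those rules in Python. It is a sketch under assumptions, not the engine's implementation. op_matches and the MISSING sentinel are hypothetical names. The value passed in is assumed to be already resolved from the path, so fan-out is not handled here. A path that resolves to JSON null is assumed to count as existing.

```python
import operator
from numbers import Real

MISSING = object()  # sentinel: the path did not resolve

_ORDER = {"gt": operator.gt, "gte": operator.ge, "lt": operator.lt, "lte": operator.le}


def _is_number(x) -> bool:
    # bool is a subclass of int in Python; do not treat true/false as numbers
    return isinstance(x, Real) and not isinstance(x, bool)


def op_matches(value, op: str, operand=None) -> bool:
    """Apply one condition operator to a value already resolved from the run."""
    if op == "exists":
        return value is not MISSING
    if op == "ne":
        # holds when the value differs, or when the path is absent
        return value is MISSING or not op_matches(value, "eq", operand)
    if value is MISSING:
        return False  # every other operator needs something to compare
    if op == "eq":
        if _is_number(value) and _is_number(operand):
            return float(value) == float(operand)  # so 2 eq 2.0
        return value == operand
    if op == "contains":
        # a list holding the member, or a string holding the substring
        if isinstance(value, list):
            return operand in value
        return isinstance(value, str) and isinstance(operand, str) and operand in value
    if op == "in":
        # the single value is one of the listed entries
        return isinstance(operand, list) and value in operand
    if op in _ORDER:
        return _is_number(value) and _is_number(operand) and _ORDER[op](value, operand)
    raise ValueError(f"unknown op {op!r}")
```

The two mirror-image gates from the note read as op_matches(properties, "contains", "car") and op_matches(deviceKey, "in", ["device01", "device02"]).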
Matching inside arrays
A path segment of * fans out across the elements of the array at that position and continues resolving from each element, so it reaches a field on every object in a list. This is how you branch on the result a delegated stage hands back: the engine mirrors your block envelope into results.<operation> grouped by block type, so a stage that emits detection blocks exposes results.<operation>.detections as an array.
Say your detector stage returns a list of boxes:
```json
"results": { "detector": { "detections": [
  { "label": "car", "score": 0.92 },
  { "label": "car", "score": 0.41 }
] } }
```
A * matches across the list. The predicate holds when any element satisfies a positive operator (exists / eq / contains / in / gt / gte / lt / lte):
```yaml
# fire when the stage found ANY box scoring above 0.8
condition: { path: results.detector.detections.*.score, op: gt, value: 0.8 }
```
That is what makes per-element numbers work: each fanned-out value is a single score, so gt / lt compare it directly. ne is the inverse. It holds only when no element equals value (…detections.*.label, op: ne, value: car ⇒ not one box is a car). Nested lists compose: results.detector.detections.*.boxes.*.x reaches every box of every detection.
Two limits to keep in mind. * is the only index — there is no numeric [0], and a plain step onto an array (without *) matches nothing, so gate on the array itself with contains / exists or fan out with *. And separate needs don’t correlate: …*.label eq car and …*.score gt 0.8 can be satisfied by different elements — there is no “the same object has both” across two conditions. Built-in classify fields stay non-traversable (inputs.classify.details is validated as a leaf); * applies to your own operation’s result arrays.
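The fan-out rule is easiest to see as a resolver that carries a list of current values through each path segment. The sketch below illustrates the idea under assumptions and is not the engine's code. resolve and condition_holds are hypothetical names, and only a subset of operators is implemented: exists, eq, contains (lists only), gt, lt and ne. That subset is enough to reproduce the examples in this section.

```python
import operator

_ORDER = {"gt": operator.gt, "lt": operator.lt}


def resolve(node, path: str) -> list:
    """Walk a dot-path, fanning out on '*'; return every value reached.

    An empty list means nothing resolved. A named step onto a list (without
    '*') reaches nothing, and there is no numeric index.
    """
    current = [node]
    for part in path.split("."):
        following = []
        for item in current:
            if part == "*":
                if isinstance(item, list):
                    following.extend(item)
            elif isinstance(item, dict) and part in item:
                following.append(item[part])
        current = following
    return current


def _num(x) -> bool:
    return isinstance(x, (int, float)) and not isinstance(x, bool)


def condition_holds(run: dict, condition: dict) -> bool:
    """Positive operators hold if ANY reached value matches; ne holds if NONE equals."""
    values = resolve(run, condition["path"])
    op, operand = condition["op"], condition.get("value")
    if op == "ne":
        return not any(v == operand for v in values)
    if op == "exists":
        return bool(values)
    if op == "eq":
        return any(v == operand for v in values)
    if op == "contains":
        return any(isinstance(v, list) and operand in v for v in values)
    if op in _ORDER:
        return _num(operand) and any(_num(v) and _ORDER[op](v, operand) for v in values)
    raise ValueError(f"operator {op!r} is not covered by this sketch")
```

Each condition_holds call fans out on its own. That is why two separate needs cannot insist on the same element.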
The engine validates every condition path at boot and refuses to start on an unknown one — a typo fails fast instead of silently never firing.
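Putting the pieces of Conditional routing together, a stage's decision has three steps: check each need's gate, then its predicate, then combine the needs with needsMode. The sketch below models only that combination logic, under assumptions. gate_ready, need_satisfied, should_dispatch and toy_predicate are hypothetical names. The predicate is passed in as a function, and toy_predicate handles only plain dot-paths with eq and contains (lists only). That is enough for the needsMode: all example earlier in this section. The needs are written as the dicts a YAML loader would produce, where an empty operation: becomes None.

```python
from typing import Callable, Optional

Predicate = Callable[[dict, dict], bool]  # (run, condition) -> bool


def gate_ready(run: dict, operation: Optional[str]) -> bool:
    """No operation means no gate; otherwise its result must already be on the run."""
    if not operation:
        return True
    return operation in run.get("inputs", {}) or operation in run.get("results", {})


def need_satisfied(run: dict, need: dict, predicate: Predicate) -> bool:
    """Gate first, then the condition if any (no condition = fire on the gate alone)."""
    if not gate_ready(run, need.get("operation")):
        return False
    condition = need.get("condition")
    return condition is None or predicate(run, condition)


def should_dispatch(run: dict, needs: list, needs_mode: str, predicate: Predicate) -> bool:
    """any = OR over the needs, all = AND (a join)."""
    checks = (need_satisfied(run, need, predicate) for need in needs)
    return any(checks) if needs_mode == "any" else all(checks)


def toy_predicate(run: dict, condition: dict) -> bool:
    """Illustration only: plain dot-paths with eq and contains, no '*' fan-out."""
    value = run
    for part in condition["path"].split("."):
        if not isinstance(value, dict) or part not in value:
            return False
        value = value[part]
    if condition["op"] == "eq":
        return value == condition["value"]
    if condition["op"] == "contains":
        return isinstance(value, list) and condition["value"] in value
    raise ValueError(f"toy_predicate does not handle {condition['op']!r}")
```

A conditional stage is re-checked as the run progresses. A need whose gate is not ready yet therefore just evaluates to false for now; it does not rule the stage out.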
How your microservice connects
The chart deploys your microservice from kerberoshub.services.<name> and injects a fixed set of environment variables — the connection contract. Whatever language your microservice is in, it reads these to reach the broker, find its queues, and fetch media; nothing else is wired for it.
| Variable | Example | What it is |
|---|---|---|
| QUEUE_SYSTEM | RABBITMQ | The broker driver to connect with (the deployment’s queueProvider). |
| RABBITMQ_HOST | rabbitmq.rabbitmq:5672 | Broker address; RABBITMQ_EXCHANGE, RABBITMQ_USERNAME and RABBITMQ_PASSWORD complete the connection. |
| <NAME>_QUEUE | SPEED_QUEUE | The queue you consume dispatched runs from. The variable name is your stage’s key upper-cased, with hyphens turned into underscores (my-stage → MY_STAGE_QUEUE); its value is services.<name>.queue. See Queue naming. |
| WORKFLOWS_QUEUE | hub-workflows-queue | The engine queue you publish the finished run back to. See Sending a result back. |
| KERBEROS_STORAGE_URI | https://api.vault.example.com | Global media-storage endpoint, used with KERBEROS_STORAGE_ACCESS_KEY and KERBEROS_STORAGE_SECRET. Per-recording overrides also travel on each run’s storage; prefer those when present. |
| LOG_LEVEL | info | Microservice log verbosity (services.<name>.logLevel). |
Two things to note:
- No datastore by default. The microservice contract is broker + queues + media storage; the chart injects no database connection. A delegated stage hands its result back over WORKFLOWS_QUEUE for the platform to persist, while a stage that writes its own collection brings its own datastore access.
- Deploying outside the chart. To run the microservice yourself, leave services.<name>.enabled off (so the chart deploys no pod) but keep the stage under workflows.stages so the engine still routes to it. Then wire these same variables into your own deployment. The consume and return queue names are the only hard requirement.
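In code, the connection contract comes down to reading a handful of variables. Here is a minimal Python sketch that assumes the variable names in the table above. queue_env_var, Connection and load_connection are hypothetical. Which variables are required and which are defaulted is this sketch's own choice. Only the consume queue is required, in line with the note that the queue names are the only hard requirement. WORKFLOWS_QUEUE falls back to hub-workflows-queue, and QUEUE_SYSTEM falls back to RABBITMQ, the default deployment's broker.

```python
import os
from dataclasses import dataclass
from typing import Mapping


def queue_env_var(stage_key: str) -> str:
    """<NAME>_QUEUE: the stage key upper-cased, hyphens turned into underscores."""
    return stage_key.upper().replace("-", "_") + "_QUEUE"


@dataclass(frozen=True)
class Connection:
    queue_system: str
    rabbitmq_host: str
    rabbitmq_exchange: str
    rabbitmq_username: str
    rabbitmq_password: str
    consume_queue: str
    workflows_queue: str
    storage_uri: str
    storage_access_key: str
    storage_secret: str
    log_level: str


def load_connection(stage_key: str, env: Mapping[str, str] = os.environ) -> Connection:
    """Read the injected variables; a missing consume queue raises KeyError at startup."""
    return Connection(
        queue_system=env.get("QUEUE_SYSTEM", "RABBITMQ"),
        rabbitmq_host=env.get("RABBITMQ_HOST", ""),
        rabbitmq_exchange=env.get("RABBITMQ_EXCHANGE", ""),
        rabbitmq_username=env.get("RABBITMQ_USERNAME", ""),
        rabbitmq_password=env.get("RABBITMQ_PASSWORD", ""),
        consume_queue=env[queue_env_var(stage_key)],  # required: where runs arrive
        workflows_queue=env.get("WORKFLOWS_QUEUE", "hub-workflows-queue"),
        storage_uri=env.get("KERBEROS_STORAGE_URI", ""),
        storage_access_key=env.get("KERBEROS_STORAGE_ACCESS_KEY", ""),
        storage_secret=env.get("KERBEROS_STORAGE_SECRET", ""),
        log_level=env.get("LOG_LEVEL", "info"),
    )
```

Failing on a missing queue variable at startup is deliberate here. A stage that cannot find its queue should crash loudly, not sit idle.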
The message you receive
Queue naming
Your microservice consumes from one queue, and you choose its name. The source of truth is the queue value on your stage’s deployment in the Helm chart — kerberoshub.services.<name>.queue:
```yaml
kerberoshub:
  services:
    speed:
      queue: "hub-workflows-speed"   # ← anything you want; your microservice consumes this exact name
```
The engine reads that same value from the stage registry and dispatches there, so the only rule is that the two agree: the queue is the one thing that binds the engine to your microservice. These examples use a consistent hub-workflows-<stage> form (hub-workflows-speed, hub-workflows-llm), but the name can be any string your broker accepts (vision.requests and team7-detector work just as well). It does not have to follow the platform’s kcloud-… convention; it only has to match on both sides.
If you omit queue, the engine falls back to a derived default, kcloud-<operation>-queue.fifo — so the convention is just that fallback, not the source of truth. Queue names are literal strings: the default deployment runs RabbitMQ, so a .fifo suffix is only part of a name, not an SQS feature.
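The precedence can be stated as a short function. This is a sketch only. It assumes the stage object and the service deployment have already been read from values.yaml as dicts, and resolve_queue is a hypothetical name, not an engine function.

```python
def resolve_queue(stage_name: str, stage: dict, service: dict) -> str:
    """Queue the engine dispatches to: the configured name, else the derived default."""
    operation = stage.get("operation") or stage_name  # operation id defaults to the stage name
    configured = service.get("queue")
    if configured:
        return configured  # the source of truth: services.<name>.queue, used verbatim
    return f"kcloud-{operation}-queue.fifo"  # fallback only; .fifo is just part of the name
```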
The workflow run
Your microservice does not receive the pipeline’s internal PipelineEvent. The engine dispatches a single, self-contained models.WorkflowRun as JSON: the run’s identity, the read-only context your microservice needs, and the credentials to fetch the media. Model your microservice’s input type on this — every field below is present on the inbound dispatch, and nothing else is:
```json
{
  "operation": "speed",
  "runId": "665f1b2c3d4e5f6071829304",
  "key": "front-gate/2026/06/12/08-30-00.mp4",
  "traceId": "8f3a1c2b4d5e6f70",
  "user": {
    "organisationId": "64f0a1b2c3d4e5f600112233",
    "storage": { "uri": "s3://kerberos-vault", "access_key": "AKIA…", "provider": "kerberos-vault", "secret_key": "…" }
  },
  "device": {
    "deviceKey": "front-gate",
    "deviceName": "Front Gate",
    "provider": "kerberos-vault",
    "storageSolution": "vault"
  },
  "inputs": {
    "classify": {
      "properties": ["car", "person"],
      "objectCount": 2,
      "details": [{ "classified": "car", "distance": 142.6, "isStatic": false }]
    }
  },
  "results": {},
  "storage": {
    "uri": "s3://kerberos-storage",
    "accessKey": "AKIA…",
    "secret": "…",
    "vaultOverrideUri": "s3://tenant-bucket",
    "vaultOverrideAccessKey": "AKIA…",
    "vaultOverrideSecret": "…",
    "vaultOverrideProvider": "kerberos-vault"
  }
}
```
Top-level fields
| Field | Type | What it is |
|---|---|---|
| operation | string | Your stage’s operation id (the name you registered, e.g. speed). It is also the key you file your result under on the way back (results.<operation>). |
| runId | string | The run’s unique id. Use it as your idempotency key: a redelivery carries the same runId. |
| key | string | The recording reference (media key) the run is about. Resolve which recording to fetch from this. |
| traceId | string | Distributed-trace id; propagate it on your logs/spans so the run stays traceable end-to-end. |
| user | object | Curated, secret-free account context (see below). |
| device | object | The recording’s device context (see below). |
| inputs | object | The run’s immutable start context, keyed by the upstream operation that produced it (see below). Read-only. |
| results | object | Accumulated upstream stage outputs, keyed by operation (e.g. results.detection). Empty if your stage runs first. Read-only inbound; on return, results.<operation> carries your result, filled by the engine from your payload (delegated) or set by you (own collection). |
| storage | object | The credentials to fetch the media (see below). Present only on the inbound dispatch; clear it before returning the run. |
payload, workflowId and workflowName are not sent inbound. payload is the channel you fill on the way back (delegated-ingest stages only); workflowId / workflowName are engine-internal.
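If your microservice is in Python, one way to model the input type on the inbound JSON is a dataclass whose fields mirror the top-level table. This is a hedged sketch and the class is hypothetical. Field names keep the wire's camelCase so the mapping stays one-to-one. Unknown keys are ignored rather than rejected; that tolerant-reader choice belongs to this sketch and is not something the engine requires.

```python
import json
from dataclasses import dataclass, field, fields


@dataclass
class WorkflowRun:
    # camelCase on purpose: attribute names match the JSON keys one-to-one
    operation: str
    runId: str
    key: str
    traceId: str
    user: dict = field(default_factory=dict)
    device: dict = field(default_factory=dict)
    inputs: dict = field(default_factory=dict)    # read-only upstream context
    results: dict = field(default_factory=dict)   # read-only inbound
    storage: dict = field(default_factory=dict)   # inbound only; clear before returning

    @classmethod
    def from_json(cls, body) -> "WorkflowRun":
        """Parse an inbound dispatch; a missing identity field raises TypeError."""
        data = json.loads(body)
        known = {f.name for f in fields(cls)}
        # Tolerant reader: ignore keys this model does not know about.
        return cls(**{k: v for k, v in data.items() if k in known})
```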
user — account context
| Field | Type | What it is |
|---|---|---|
| user.organisationId | string | The organisation that owns the recording. The run, and everything derived from it, is scoped to this id; scope your own writes to it too. |
| user.storage | object | The account’s storage object, carried so the engine can resolve a per-recording vault override. You normally don’t need it; fetch media with the top-level storage. |
| user.storage.uri | string | Account storage endpoint. |
| user.storage.access_key | string | Account storage access key. |
| user.storage.provider | string | Account storage provider. |
| user.storage.secret_key | string | Account storage secret. |
(user.storage uses snake_case keys — it is the account Storage object. The media-fetch storage below uses camelCase.)
device — recording context
| Field | Type | What it is |
|---|---|---|
| device.deviceKey | string | Stable id of the camera/device the recording came from. |
| device.deviceName | string | Human-readable device name (for logs/labels). |
| device.provider | string | Where the media is served from (the media VideoProvider). |
| device.storageSolution | string | Where the media is stored (the media StorageSolution). |
inputs.classify — the trigger result
inputs is keyed by upstream operation. Every run opens from the classifier, so inputs.classify is always present — it is the classification result that triggered the run:
| Field | Type | What it is |
|---|---|---|
| inputs.classify.properties | string[] | Flat list of the detected class strings, e.g. ["car","person"]. Gate on it with contains / in / exists. |
| inputs.classify.objectCount | int | Number of detected objects. Gate on it numerically (gt / gte / lt / lte / eq). |
| inputs.classify.details | object[] | Per-object detail: each entry carries classified (the class), distance, isStatic and trajectory/frame geometry. It is an array, so a condition path can’t index into it. Read it in microservice code, but gate on properties / objectCount. |
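Since a condition cannot match details, any per-object logic belongs in your microservice. Here is a small illustrative sketch that assumes only the classified and isStatic keys described above. moving_objects is a hypothetical helper, and treating isStatic: false as "moving" is this example's own interpretation.

```python
def moving_objects(run: dict, label: str) -> list:
    """Entries of inputs.classify.details with class `label` that are not static."""
    details = run.get("inputs", {}).get("classify", {}).get("details", [])
    return [
        d for d in details
        if d.get("classified") == label and not d.get("isStatic", False)
    ]
```

If the stage only cares whether such objects exist, gate on properties first (contains car). Recordings that cannot qualify are then never dispatched to the microservice.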
storage — media-fetch credentials
The credentials your microservice uses to fetch the recording. The base trio is always set; the vaultOverride* quartet appears when the recording lives on its own (per-tenant) backend — prefer the override when present, otherwise use the base.
| Field | Type | What it is |
|---|---|---|
| storage.uri | string | Base storage endpoint. |
| storage.accessKey | string | Base storage access key. |
| storage.secret | string | Base storage secret. |
| storage.vaultOverrideUri | string | Per-recording override endpoint (when set). |
| storage.vaultOverrideAccessKey | string | Override access key. |
| storage.vaultOverrideSecret | string | Override secret. |
| storage.vaultOverrideProvider | string | Override provider. |
inputs and results are your read-only upstream context — the same bags the condition matcher evaluates needs against. The engine routes purely by operation and the registry; it never inspects your output to decide where the run goes.
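The override rule for storage can be written as a small selection function. This is a sketch under assumptions. media_credentials is a hypothetical helper. An override counts as present when vaultOverrideUri is non-empty, and the returned dict shape is this example's own. The base trio carries no provider field, so provider is None in that case.

```python
def media_credentials(storage: dict) -> dict:
    """Pick the credentials to fetch media with: the per-recording override if set, else the base."""
    if storage.get("vaultOverrideUri"):
        return {
            "uri": storage["vaultOverrideUri"],
            "access_key": storage.get("vaultOverrideAccessKey", ""),
            "secret": storage.get("vaultOverrideSecret", ""),
            "provider": storage.get("vaultOverrideProvider"),
        }
    return {
        "uri": storage["uri"],
        "access_key": storage["accessKey"],
        "secret": storage["secret"],
        "provider": None,  # the base trio carries no provider
    }
```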
Acknowledgement
The broker delivers at least once. Acknowledge a message only after the work is durably done (result written or routed back); on failure, let it nack so the broker can redeliver. Because redelivery is possible, make your stage idempotent — key your output by the recording (key) and the run (runId) so a replay replaces rather than duplicates.
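The idempotency rule comes down to writing on a key that a redelivery reproduces exactly. The in-memory store below is a stand-in that shows the idea; it is not a real datastore client, and ResultStore is hypothetical. In a real database the same idea is an upsert (insert-or-replace) filtered on both key and runId.

```python
class ResultStore:
    """In-memory stand-in for a datastore; every write is an upsert on (key, runId)."""

    def __init__(self) -> None:
        self._rows: dict = {}

    def upsert(self, key: str, run_id: str, result: dict) -> None:
        # Same recording + same run => same slot, so a replay replaces the row.
        self._rows[(key, run_id)] = result

    def get(self, key: str, run_id: str):
        return self._rows.get((key, run_id))

    def __len__(self) -> int:
        return len(self._rows)
```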
Doing the work
Your microservice is a stateless consumer: pull a run, fetch the media with the credentials in storage, compute, route the result back. It can be written in any language that can speak the broker and the WorkflowRun JSON — the only contract is the queue it reads and the run it returns. Reuse the context already on the run (inputs / results) rather than re-fetching it. Keep it single-purpose; if you need a second capability, add a second stage.
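The consume, compute and return loop, together with the acknowledgement rule, can be sketched without tying it to any broker library. In this hypothetical handle_delivery, the broker-specific parts (ack, nack, publishing to WORKFLOWS_QUEUE) and the stage's own work are passed in as plain callables. With a real client you would wire them to that library's acknowledge, reject and publish calls.

```python
import json
from typing import Callable


def handle_delivery(body: bytes,
                    process: Callable[[dict], dict],
                    publish_reply: Callable[[dict], None],
                    ack: Callable[[], None],
                    nack: Callable[[], None]) -> None:
    """Handle one delivered run: ack only once the reply is published, nack on any failure."""
    try:
        run = json.loads(body)        # the dispatched WorkflowRun
        reply = process(run)          # fetch media, compute, build the returned run
        publish_reply(reply)          # route the run back to the engine
    except Exception:
        nack()                        # let the broker redeliver
        return
    ack()                             # the work is durably done
```

Acknowledging after the publish means a crash in between causes a redelivery rather than a lost result, which is why idempotent writes matter. A message that fails every time will keep being redelivered, so a production consumer typically caps retries, for example with a dead-letter queue.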
Sending a result back
You return the same WorkflowRun you received — echo runId, key, traceId and user so the engine can locate and scope the run — with storage cleared and your result in exactly one channel. Publish it back to the engine’s queue (WORKFLOWS_QUEUE, default hub-workflows-queue); the engine marks the stage resolved and fires any conditional stage that was waiting on it.
There are two sinks. Default to letting the platform persist your result — hand it back and the ingest core stores it, so your microservice needs no datastore of its own. A stage that produces genuinely new data can instead own its storage and write its own collection.
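The return rules (echo the identity, clear storage, set one result channel) fit in a short helper. This is a sketch that assumes the run is handled as a parsed dict. build_reply is a hypothetical name, and "cleared" is interpreted here as removing the storage key entirely.

```python
import copy
from typing import Optional


def build_reply(run: dict, payload: Optional[list] = None,
                own_results: Optional[dict] = None) -> dict:
    """Return the received run with storage cleared and exactly one result channel set."""
    if (payload is None) == (own_results is None):
        raise ValueError("set exactly one of payload or results[operation]")
    reply = copy.deepcopy(run)   # echoes runId, key, traceId, user, ... unchanged
    reply.pop("storage", None)   # media credentials never travel back
    if payload is not None:
        # enrich in place: the platform persists the typed blocks
        reply["payload"] = payload
    else:
        # own collection: only routing values, filed under results.<operation>
        reply.setdefault("results", {})[run["operation"]] = own_results
    return reply
```

Deep-copying keeps the inbound run untouched, so a retry inside the same process still sees the original storage credentials.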
Enrich in place
The default sink: hand the platform a block envelope in payload and it stores the result for you — your microservice needs no database. The envelope is an ordered list of typed blocks — { "type": "detection", "data": … }, { "type": "marker", "data": … } — and the engine routes each block by its type through the shared ingest core, persists it against the run’s own recording, and mirrors the blocks into results.<operation> grouped by type (results.<operation>.detections, …markers) so a downstream condition can branch on what the stage produced — element-wise with *. A stage no longer declares a result kind — each block self-describes via its type; see Blocks for each type’s contract. Set payload or results[operation], never both.
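It can help to model the grouping described above, to see what a downstream condition will find under results.<operation>. The following is an approximation for illustration only, not the engine's code. mirror_blocks is hypothetical. It assumes the group key is the block type plus "s" and that each block's data becomes one array element, which matches the detections and markers examples on this page. Check Blocks for each type's actual contract.

```python
from collections import defaultdict


def mirror_blocks(blocks: list) -> dict:
    """Group an ordered block envelope by type, keeping order within each group.

    Assumption: group key = type + "s" (detection -> detections, marker -> markers).
    """
    grouped = defaultdict(list)
    for block in blocks:
        grouped[block["type"] + "s"].append(block["data"])
    return dict(grouped)
```

With a detector that emitted two detection blocks, results.<operation>.detections.*.score would then reach both scores.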
Own collection
For genuinely new, standalone data with no built-in block type — embeddings, descriptions, a custom search index — a stage can write its own collection, keyed by the recording, and set only its routing values under results.<operation> on the returned run (leave payload empty). The platform just records the resolution; your microservice owns the write, and so brings its own datastore access.
The difference between the sinks is only who writes the result — the platform, from the block envelope you hand back, or your microservice into its own collection. Either way the engine marks the stage resolved when your run comes back.
Completion and acknowledgement
Every custom stage is asynchronous: nothing blocks on it. The analysis service’s built-in pipeline continues independently, the workflows engine tracks the stage’s run on its own, and your stage’s result lands whenever the microservice finishes. (Blocking, “required” stages are intentionally out of scope in this design — there is no way for a custom stage to stall a run.)
Whichever sink you use, your microservice routes the run back to the workflows engine — its WORKFLOWS_QUEUE — once the work is durably done. The engine records the stage as resolved on the run ($addToSet resolvedoperations), which keeps the run’s provenance complete and stops a re-run from redoing the work. An own-collection stage’s returned run carries just its routing values under results.<operation>; a delegated stage carries the typed payload. A run that never hears back from a stage still completes on the engine’s own rules (with a safety timeout as a backstop), so a crashed microservice can’t wedge the pipeline.
Failure modes & gotchas
- Routing without a microservice (or vice-versa). The two enabled flags are independent. Routing (workflows.stages.<name>.enabled) with no microservice queues messages no one consumes, and a microservice (services.<name>.enabled) with no routing never receives any. Keep them enabled together; they share the stage name, so they always address the same queue.
- No completion ack. A microservice that writes its result but never echoes the run back to the workflows engine (WORKFLOWS_QUEUE) leaves the stage absent from resolvedoperations. That is harmless to the run (stages are async), but it breaks provenance and lets a re-run repeat the work. Always ack.
- Re-decode cost. A stage that re-fetches and re-decodes the video pays that cost per recording; where you can, reuse data already on the run (inputs / results) or in the database.
- Non-idempotent writes. Redelivery will duplicate output unless you upsert on a stable key.
Checklist
- Pick a unique operation id: it is the routing key, the result key (results.<id>) and the completion key (the queue is whatever you set in services.<id>.queue)
- Add routing under kerberoshub.workflows.stages.<id> (enabled: true, dispatch: always) and the deployment under kerberoshub.services.<id> (enabled: true, image + queue)
- Make sure the workflows engine is enabled (kerberoshub.workflows.enabled)
- Consume the dispatched WorkflowRun, resolve the recording from key, and fetch media with the credentials in storage
- Read upstream context from inputs / results instead of re-fetching it
- Pick a sink: enrich in place (set payload, the default) or your own collection (set results.<id>)
- Make writes idempotent (upsert by key + runId)
- Route the run back to the workflows engine (WORKFLOWS_QUEUE) with storage cleared and your result in one channel
- (Optional) Gate per-recording with dispatch: conditional + needs + needsMode