How I Actually Build Event-Driven AI Pipelines (And the Queue That Saved Me)
The difference between a demo pipeline and a production pipeline is what happens when a marketing campaign sends 500 events in an hour. I learned this the hard way when my n8n workflow fell over during a product launch.
I see the same failure pattern in discussions on r/n8n and r/selfhosted. Someone builds a webhook → LLM workflow. It works for ten events a day. Then real traffic hits, the LLM API rate-limits, events duplicate or get lost, and nobody knows what failed. I was that someone.
This post is my actual mental model for event-driven AI pipelines. It is based on something I built, something that broke, and something I rebuilt.
The Pipeline That Broke
My first pipeline was simple:
- n8n receives a webhook from my blog’s comment system.
- n8n sends the comment to an LLM for sentiment analysis.
- n8n posts the result to Slack.
It worked for weeks. Then I ran a promotion. The blog got 500 comments in an hour. n8n processed them all simultaneously. The LLM API rate-limited me. Events started failing. Some got retried and created duplicates in Slack. Others got lost entirely. I spent my evening manually cleaning up Slack and apologizing to my team for the spam.
The fix was not a better LLM node. The fix was architecture.
The Shape of the Pipeline I Rebuilt
I split the pipeline into four layers:
- Ingestion: receive and validate events.
- Queue: buffer events and absorb spikes.
- Processing: run the durable workflow that calls the LLM and handles side effects.
- Observability: know what happened, what failed, and what is stuck.
Each layer has one job. Mixing them is how pipelines break. My original pipeline mixed ingestion and processing in one n8n workflow. That was the mistake.
Ingestion: Validate Everything
The ingestion layer is where trust enters the system. If you accept garbage here, everything downstream becomes harder.
My baseline:
- HTTPS only.
- HMAC signature verification.
- Timestamp validation to prevent replays.
- Payload size limits.
- Rate limiting per source.
I use n8n for this layer because n8n is fast to change and this layer does not need durability. The only thing that matters here is that bad events get rejected before they reach the queue.
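The signature, replay, and size checks can also be written as plain code. This is a minimal Python sketch, not how n8n does it. It assumes the sender signs `<timestamp>.<raw body>` with HMAC-SHA256 and sends the timestamp and hex digest with the request. The 64 KB size cap and 5-minute replay window are placeholder values.

```python
from __future__ import annotations

import hashlib
import hmac
import time

# Placeholder limits, not values from this post.
MAX_PAYLOAD_BYTES = 64 * 1024
MAX_SKEW_SECONDS = 300


def check_webhook(body: bytes, timestamp: str, signature: str,
                  secret: bytes, now: float | None = None) -> int:
    """Return the HTTP status an ingestion endpoint could send for this request."""
    # Payload size limit: reject before doing any other work.
    if len(body) > MAX_PAYLOAD_BYTES:
        return 413
    try:
        ts = int(timestamp)
    except ValueError:
        return 400
    # Timestamp validation: stale or future-dated requests are treated as replays.
    now = time.time() if now is None else now
    if abs(now - ts) > MAX_SKEW_SECONDS:
        return 401
    # HMAC over "<timestamp>.<body>" so an attacker cannot swap in a fresh timestamp.
    signed = timestamp.encode() + b"." + body
    expected = hmac.new(secret, signed, hashlib.sha256).hexdigest()
    # Constant-time comparison avoids leaking how many characters matched.
    if not hmac.compare_digest(expected, signature):
        return 401
    return 200
```

A timestamp window only narrows the replay window. A replay inside it still gets through, which is one more reason the deduplication step later in the pipeline matters.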
After the incident, I added a rate limiter in n8n: max 10 events per minute from the blog webhook. Excess events get rejected with a 429. The blog retries later. This alone would have prevented the flood.
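One way to express a "10 events per minute per source" rule is a sliding-window log. The limiter remembers the timestamps of recently accepted events and rejects new ones once the window is full. This is a minimal Python sketch, not n8n's implementation. It assumes one in-memory limiter per source and uses 202 as the status for accepted events, which is also an assumption.

```python
from __future__ import annotations

import time
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Allow at most `limit` events in any `window`-second span."""

    def __init__(self, limit: int = 10, window: float = 60.0) -> None:
        self.limit = limit
        self.window = window
        self._hits: deque[float] = deque()

    def allow(self, now: float | None = None) -> bool:
        now = time.monotonic() if now is None else now
        # Forget accepted events that have slid out of the window.
        while self._hits and now - self._hits[0] >= self.window:
            self._hits.popleft()
        if len(self._hits) >= self.limit:
            return False
        self._hits.append(now)
        return True


# One limiter per source, created with the defaults (10 per 60 s) on first use.
limiters: defaultdict[str, SlidingWindowLimiter] = defaultdict(SlidingWindowLimiter)


def admit(source: str, now: float | None = None) -> int:
    """202 if the event may go on to the queue, 429 if the sender should retry later."""
    return 202 if limiters[source].allow(now) else 429
```

Because the state lives in process memory, this only works for a single ingestion instance. Several instances would need a shared store such as Redis.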
Queue: The Layer That Saved Me
A queue is not optional. It is the pressure valve that keeps spikes from hitting the LLM API directly. Without it, a traffic burst becomes a rate-limit storm, which becomes duplicated or dropped events.
I use Redis Streams because my cluster already runs Redis. Consumer groups are enough for my workload. For very high throughput, Kafka is the next step. The important thing is not which queue you pick. It is that the queue lets you slow down consumption without losing events.
My Redis Streams setup:
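```python
import json

import redis

r = redis.Redis(host='redis', port=6379, decode_responses=True)

# Create the consumer group once; BUSYGROUP means it already exists.
try:
    r.xgroup_create('ai-pipeline', 'ai-workers', id='0', mkstream=True)
except redis.exceptions.ResponseError as err:
    if 'BUSYGROUP' not in str(err):
        raise

# Producer (n8n drops validated events here)
def enqueue_event(event):
    r.xadd('ai-pipeline', {'payload': json.dumps(event)})

# Consumer (Temporal worker reads from here); '>' = entries never delivered to this group
def read_events(count=10):
    return r.xreadgroup('ai-workers', 'worker-1', {'ai-pipeline': '>'}, count=count)

# Acknowledge after successful processing, or the entry stays pending
def ack_event(message_id):
    r.xack('ai-pipeline', 'ai-workers', message_id)
```

Key properties I care about: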
- Consumer groups for parallel processing. I run two Temporal workers.
- Persistence so events survive restarts. Redis is configured with AOF.
- A way to inspect pending messages. `XPENDING` shows me what is stuck.
- A dead-letter path for poison events. Events that fail 3 times go to a separate stream for manual review.
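One way to build that dead-letter path with redis-py is to scan `XPENDING`, copy any entry delivered three or more times to a DLQ stream, and then acknowledge it. This sketch assumes the `ai-pipeline` stream and `ai-workers` group used earlier, plus a hypothetical `ai-pipeline-dlq` stream. Redis only increments an entry's delivery counter when the entry is re-delivered, for example via `XCLAIM`/`XAUTOCLAIM` or by re-reading a consumer's pending history. The sketch therefore also assumes workers reclaim stuck entries rather than only ever reading with `>`.

```python
from __future__ import annotations

MAX_DELIVERIES = 3
DLQ_STREAM = "ai-pipeline-dlq"  # hypothetical stream name


def move_poison_events(r, stream: str = "ai-pipeline", group: str = "ai-workers",
                       batch: int = 100) -> list[str]:
    """Move entries delivered MAX_DELIVERIES times or more to the DLQ stream.

    `r` is a redis-py client created with decode_responses=True.
    """
    moved = []
    for entry in r.xpending_range(stream, group, min="-", max="+", count=batch):
        if entry["times_delivered"] < MAX_DELIVERIES:
            continue
        msg_id = entry["message_id"]
        # Copy the original fields so the event can be reviewed and replayed.
        for _, fields in r.xrange(stream, min=msg_id, max=msg_id):
            r.xadd(DLQ_STREAM, {**fields, "original_id": msg_id,
                                "deliveries": entry["times_delivered"]})
        # Acknowledge so the entry leaves the pending list and is not retried.
        r.xack(stream, group, msg_id)
        moved.append(msg_id)
    return moved
```

The copy and the acknowledgement are two separate commands. A crash between them can leave a duplicate in the DLQ, which is harmless for a stream that a human reviews.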
Processing: Make It Durable
This is where I use Temporal. An event-driven AI pipeline has side effects: writes to a database, calls to an LLM, calls to another API. If the worker crashes mid-processing, you need to resume where you left off, not start over.
Temporal gives you that through event history replay. More importantly, it forces you to separate workflow logic from activity logic. That separation makes retries, compensation, and idempotency explicit instead of accidental.
My Temporal workflow looks like this:
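```python
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

# The activities live in their own module (the name "activities" is assumed);
# pass-through stops the workflow sandbox from re-importing it.
with workflow.unsafe.imports_passed_through():
    from activities import analyze_sentiment, post_to_slack


@workflow.defn
class CommentAnalysisWorkflow:
    @workflow.run
    async def run(self, event: dict) -> dict:
        # Activity: call LLM
        sentiment = await workflow.execute_activity(
            analyze_sentiment,
            event['comment'],
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )

        # Activity: post to Slack (no retry_policy, so Temporal's default
        # policy applies: exponential backoff with no attempt limit)
        await workflow.execute_activity(
            post_to_slack,
            {'text': f"Comment sentiment: {sentiment}"},
            start_to_close_timeout=timedelta(seconds=10),
        )

        return {'sentiment': sentiment}
```

Each LLM call is an activity with its own retry policy. Side effects happen inside activities, not workflows. I write every activity to be idempotent. The Slack post, for example, uses a deduplication key based on the event ID.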
Idempotency Is the Real Feature
At-least-once delivery is easy. Exactly-once processing is hard. The practical middle ground is at-least-once delivery plus idempotent processing.
I use a deduplication key based on the event source and a stable identifier. That key gets stored in Redis with a TTL and used as the Temporal workflow ID. This prevents two bad outcomes: duplicate LLM calls and duplicate Slack posts.
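The TTL matters. If it is too short, a late retry looks like a new event and gets processed twice. If it is too long, a deliberate re-send of the same event (for example, after fixing a bug) gets rejected as a duplicate. I use 24 hours for comment events and 1 hour for real-time events.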
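The check-and-store step can be a single atomic Redis command, `SET key value NX EX ttl`, which succeeds only for the first caller within the TTL. This is a minimal redis-py sketch. The `dedup:<source>:<id>` key format and the `kind` labels are hypothetical, and the TTLs are the ones given above.

```python
from __future__ import annotations

# TTLs from this post; the "kind" labels are illustrative.
DEDUP_TTL_SECONDS = {"comment": 24 * 3600, "realtime": 3600}


def dedup_key(source: str, event_id: str) -> str:
    """Stable key built from the event source and its stable identifier."""
    return f"dedup:{source}:{event_id}"


def claim_event(r, source: str, event_id: str, kind: str) -> str | None:
    """Return the key if this event is new, or None if it is a duplicate.

    `r` is a redis-py client (or anything with the same `set` signature).
    """
    key = dedup_key(source, event_id)
    # SET ... NX EX is atomic: only the first caller within the TTL gets True.
    if r.set(key, "1", nx=True, ex=DEDUP_TTL_SECONDS[kind]):
        return key
    return None
```

The returned key would then be passed as the workflow ID (`id=` on Temporal's `Client.start_workflow`). Temporal refuses to start a second workflow with the same ID as one that is still running, which adds a second layer of protection. One trade-off: if the process dies after the key is set but before the workflow starts, the retry is treated as a duplicate. Deleting the key when the start fails avoids that.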
Error Handling: What I Actually Do
LLM APIs fail in predictable ways. Rate limits. Timeouts. Context window overflows. Ambiguous 500s. Your pipeline should expect all of them.
My error handling stack:
- Exponential backoff with jitter for transient failures. Temporal handles this automatically.
- A circuit breaker that pauses calls when the API is clearly struggling. I use a simple Redis counter: if 5 failures happen in 1 minute, stop for 5 minutes.
- A dead letter queue for events that fail all retries. I review these daily.
- Alerts on DLQ size, error rate, and consumer lag. I have a Grafana alert for DLQ size > 10.
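The Redis-counter circuit breaker could look roughly like this in redis-py. The key names are hypothetical. The window is fixed rather than sliding: the counter gets a 60-second TTL at the first failure, so "5 failures in 1 minute" is approximated as "5 failures within 60 seconds of the first one".

```python
FAILURE_THRESHOLD = 5
FAILURE_WINDOW_SECONDS = 60
PAUSE_SECONDS = 300

# Hypothetical key names.
FAILURES_KEY = "llm:failures"
PAUSE_KEY = "llm:paused"


def circuit_open(r) -> bool:
    """True while LLM calls should be skipped."""
    return r.exists(PAUSE_KEY) > 0


def record_failure(r) -> None:
    # Create the counter with a TTL only if it does not exist yet;
    # INCR keeps the existing TTL, so the count resets after the window.
    r.set(FAILURES_KEY, 0, nx=True, ex=FAILURE_WINDOW_SECONDS)
    if r.incr(FAILURES_KEY) >= FAILURE_THRESHOLD:
        # Trip the breaker: the pause key expires on its own after 5 minutes.
        r.set(PAUSE_KEY, "1", ex=PAUSE_SECONDS)
        r.delete(FAILURES_KEY)


def guarded_call(r, fn, *args):
    """Call fn(*args) unless the breaker is open; count failures."""
    if circuit_open(r):
        raise RuntimeError("circuit open: LLM calls paused")
    try:
        return fn(*args)
    except Exception:
        record_failure(r)
        raise
```

Inside a Temporal activity, the raised error counts as a failed attempt. The activity's retry settings therefore decide how long it keeps trying while the breaker is open.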
The DLQ is not a failure. It is a safety valve. The failure is not knowing the DLQ is growing. I missed this once. The DLQ grew to 200 events before I noticed. Now I have an alert.
Observability: The Metrics I Actually Watch
I track six signals in Grafana:
- Queue length and consumer lag. If lag grows, I need more workers.
- LLM API latency and error rate. If error rate spikes, the provider is struggling.
- Workflow success and failure rates. If failures grow, I check the DLQ.
- DLQ size. If this grows, something is systematically wrong.
- Token cost per event. I have a monthly budget. This keeps me honest.
- End-to-end processing latency. From webhook to Slack post. Should be under 10 seconds for comments.
These tell me whether the pipeline is healthy, whether I need to scale workers, and whether the LLM provider is having a bad day.
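The queue-side signals (consumer lag and DLQ size) can be read straight from Redis. This is a sketch with redis-py that assumes the `ai-pipeline` stream, the `ai-workers` group, and a hypothetical `ai-pipeline-dlq` stream. It also assumes `xinfo_groups` returns one dict per group with `name`, `pending`, and, on Redis 7 and later, `lag` keys. How the numbers reach Grafana (an exporter, a metrics client, a cron job) is left out.

```python
DLQ_STREAM = "ai-pipeline-dlq"  # hypothetical name
DLQ_ALERT_THRESHOLD = 10        # mirrors the "DLQ size > 10" alert


def queue_health(r, stream: str = "ai-pipeline", group: str = "ai-workers") -> dict:
    """Collect queue-side signals that could be exported to a dashboard."""
    groups = {g["name"]: g for g in r.xinfo_groups(stream)}
    info = groups.get(group, {})
    dlq_size = r.xlen(DLQ_STREAM)
    return {
        # XLEN counts every retained entry, including acknowledged ones.
        "stream_entries": r.xlen(stream),
        # Delivered to a consumer but not yet acknowledged.
        "pending": info.get("pending", 0),
        # Entries not yet delivered to the group (Redis 7+; may be None).
        "lag": info.get("lag"),
        "dlq_size": dlq_size,
        "dlq_alert": dlq_size > DLQ_ALERT_THRESHOLD,
    }
```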
My Deployment Heuristic (Updated)
| Workload | Stack | Why |
|---|---|---|
| < 100 events/day, low stakes | n8n only | Simple, fast to change |
| 100 to 10k events/day, some side effects | n8n + Redis Streams + Temporal | Queue absorbs spikes, Temporal handles durability |
| > 10k events/day or strict retention | Kafka + Temporal | Kafka is built for high throughput and configurable long-term retention |
| Multiple LLM providers or heavy routing | Add LiteLLM | Gateway handles routing and fallbacks |
I am in the second category. My blog gets 200 to 500 events per day on normal days and spikes to 2,000 during promotions. The queue + Temporal setup handles this comfortably.
Conclusion
A reliable event-driven AI pipeline is not about picking the right tools. It is about designing for the ways the system will fail. Queue your events. Make processing durable. Deduplicate aggressively. Handle errors explicitly. Watch the right metrics.
Start simple, but do not skip the queue. That is the part that saved me when the promotion hit. Without it, I would have spent another evening cleaning up Slack.
My pipeline now handles spikes without drama. The LLM API still rate-limits me sometimes. The difference is that now the events wait in the queue instead of failing or duplicating. I sleep better.