Introduction
At TrueProfit, our TruePixel service processes real-time e-commerce events for Shopify merchants — ad tracking, attribution, visitor analytics, product stats. At peak, we handle 100K+ data points per hour across hundreds of thousands of unique users.
Over 7 months — August 2025 to March 2026 — I fought three distinct battles with this pipeline. Each fix revealed the next problem. Phase 1 cleared a 100-million-message backlog. Phase 2 slowed but didn't stop an OOM cycle. Phase 3 eliminated it entirely. This is the full story.
The Architecture
The Event Processor sits at the center: it consumes from Kafka, routes each event through 5 stages (index → ad_lead → attribution → customer → product_stats), and writes to Elasticsearch, MongoDB, and Redis. 30 pods in production.
```mermaid
flowchart LR
    EC["E-commerce events<br/>(Shopify webhooks, pixel)"]
    K["📨 Kafka<br/>(3+ partitions, millions msgs)"]
    EP["⚙️ Event Processor<br/>(Go, 30 pods)"]
    ES["🔍 Elasticsearch<br/>(events, attribution)"]
    MG["🍃 MongoDB<br/>(customers, ad leads)"]
    RD["⚡ Redis<br/>(product stats, cache)"]
    EC -->|HTTPS| K
    K -->|Sarama| EP
    EP -->|index, attribution| ES
    EP -->|customer, ad_lead| MG
    EP -->|product stats| RD
    RD -->|batch flush| ES
```
Simple enough in theory. The pain came from scale: sustained high throughput, 50K+ unique shopID:clientID keys in flight, and incremental features shipped without considering memory budgets.
Phase 1: The 100M Message Meltdown
The Crisis
Monday morning. The Kafka consumer lag dashboard turns red: 100 million+ messages queued and growing. The cycle had become weekly: every 7 days the backlog climbed past 100M, pods restarted under memory pressure, and it all started over. I spent 3 days in a war room fixing it.
Root Cause: Unbounded Worker Concurrency
The event processor was configured with 150 concurrent workers and unlimited per-user goroutines:
```go
// Production config — this will hurt you
workerPool: pond.NewPool(150, pond.WithQueueSize(300))
// No limit on concurrent users
// Cleanup: every 5 minutes (too slow)
// Result: 150 workers × goroutines-per-event × 50K+ users = CPU overload
```
150 workers times multiple goroutines per event times 50K+ unique users: the Go scheduler couldn't cope. GC pauses cascaded. Kafka consumer lag grew exponentially.
Worse: there was a race condition in event ordering. Events for the same user were processed out of order, corrupting attribution data.
The Fix: 48 Hours, 12 Hotfix Commits
August 27: Fix the race condition first — route events through per-user queues keyed by shopID:clientID. Each user's events are now processed sequentially.
August 28, 09:51: Emergency CPU reduction — slash the worker pool:
```go
// AFTER: Bounded and explicit
workerPool: pond.NewPool(50, pond.WithQueueSize(100)) // 150 → 50
maxUsers: 300                                          // was unlimited
cleanupTicker: 30 * time.Second                        // was 5 minutes
```
August 28, 13:06: Final hardening — a bounded semaphore on the Kafka message handler itself:
```go
// Bounded concurrency: at most 10 messages in flight
workerCount := 10
sem := make(chan struct{}, workerCount)
for msg := range partition.Messages() {
	sem <- struct{}{} // Block if 10 in-flight — natural backpressure
	go func(m *sarama.ConsumerMessage) {
		defer func() { <-sem }()
		processEvent(m)
	}(msg)
}
```
Results
| Metric | Before | After |
|---|---|---|
| Workers | 150 (unbounded) | 50 (bounded) |
| Max concurrent users | Unlimited | 300 |
| CPU utilization | Overloaded (100%) | Stable (60–70%) |
| Kafka lag | 100M+ (growing) | Cleared within hours |
| Event ordering | Race condition | FIFO per user |
The backlog cleared. Pipeline stabilized. I thought the war was over. It wasn't.
Phase 2: The OOM That Wouldn't Die
New Feature, New Problem
With the pipeline stable, a team member (Dat) shipped a new feature: visitor tracking. Track unique visitors per week per shop — straightforward business requirement. The implementation added three storage layers per event:
- Ristretto in-memory cache — check if visitor seen recently
- Elasticsearch — check weekly visitor index
- MongoDB — increment weekly unique visitor counter
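A minimal sketch of that three-layer check, with hypothetical interface names and in-memory stand-ins for Ristretto, Elasticsearch, and MongoDB (the real code differs):

```go
package main

import "fmt"

// Hypothetical interfaces standing in for Ristretto, ES, and MongoDB.
type cache interface {
	Get(key string) bool
	Set(key string)
}
type weeklyIndex interface{ Exists(key string) bool }
type counter interface{ Increment(shopID string) }

// trackVisitor runs the three-layer check: in-memory cache first,
// then the weekly ES index, and only for genuinely new visitors an
// increment in the weekly counter. Returns true for a new visitor.
func trackVisitor(c cache, es weeklyIndex, db counter, shopID, clientID string) bool {
	key := shopID + ":" + clientID
	if c.Get(key) { // layer 1: seen recently → done
		return false
	}
	if es.Exists(key) { // layer 2: already in this week's index
		c.Set(key)
		return false
	}
	c.Set(key) // layer 3: new unique visitor this week
	db.Increment(shopID)
	return true
}

// In-memory fakes for the sketch.
type memCache map[string]bool

func (m memCache) Get(k string) bool { return m[k] }
func (m memCache) Set(k string)      { m[k] = true }

type memIndex map[string]bool

func (m memIndex) Exists(k string) bool { return m[k] }

type memCounter map[string]int

func (m memCounter) Increment(shop string) { m[shop]++ }

func main() {
	c, es, db := memCache{}, memIndex{}, memCounter{}
	fmt.Println(trackVisitor(c, es, db, "shop1", "v1")) // true: new visitor
	fmt.Println(trackVisitor(c, es, db, "shop1", "v1")) // false: cache hit
	fmt.Println(db["shop1"])                            // 1
}
```

Every cache miss falls through to the slower layers, which is why the cache sizing carried so much weight.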
Sounds reasonable. Except the cache was configured like this:
```go
// 1 GB cache budget in a pod with 512 MiB memory limit.
cache, _ := ristretto.NewCache(&ristretto.Config[string, string]{
	NumCounters: 1e7,     // 10 MILLION key slots
	MaxCost:     1 << 30, // 1 GB memory budget ← the bomb
	BufferItems: 64,
})
```
1 GB cache budget in a pod with a 512 MiB memory limit. The math was always going to end one way.
The Slow Bleed
The OOM didn't happen immediately. Memory crept up over 3 days, then the pod was killed and the cycle restarted:
| Time | Memory | Event |
|---|---|---|
| Day 0 | 32 MiB | Pod starts fresh |
| Day 1 | ~200 MiB | Cache filling with visitor keys |
| Day 2 | ~350 MiB | Approaching limit |
| Day 3 | 480–512 MiB | OOM kill → restart → repeat |
30 pods, all on the same death cycle. Grafana showed a perfect sawtooth pattern — memory climbs linearly, hits 512 MiB, drops to zero, climbs again. Two pod families (595d8d6476-* and 78cdb9f9dd-*) showed the same slope even after a deploy, confirming this was a code-level leak, not infrastructure.
The visitor tracking code also had compounding issues:
```go
// One timeout wrapping everything — if ES is slow, MongoDB waits too
ctx, cancel := context.WithTimeout(baseCtx, 60*time.Second)
defer cancel()
checkCache(ctx)   // sub-millisecond, but context held for 60s
queryES(ctx)      // could wait up to 60s — holds ctx open
writeMongoDB(ctx) // could wait up to 60s — holds ctx open

// Info-level logs in the hot path
// 1,000+ visitors/sec = 1,000+ log allocations/sec
logger.Info("visitor cache hit", "shopID", shopID)
```
The Fix (December 4, 2025)
Hung (with my review) landed the OOM triage in commit 0aae4ed:
```go
// FIX 1: Halve the cache budget (stops the immediate bleeding)
cache, _ := ristretto.NewCache(&ristretto.Config[string, string]{
	NumCounters: 5e6,       // 5M (was 10M)
	MaxCost:     500 << 20, // 500 MB (was 1 GB)
})

// FIX 2: Auto-calculate cost — prevents over-admission
cache.Set(key, "exists", -1) // -1 = auto-calculate, not manual "1"

// FIX 3: Per-operation timeouts — no more shared deadline
esCtx, esCancel := context.WithTimeout(baseCtx, 30*time.Second)
existsInES, _ := repo.VisitorExistsInES(esCtx, shopID, clientID)
esCancel() // Release ES context immediately after use

mongoCtx, mongoCancel := context.WithTimeout(baseCtx, 15*time.Second)
repo.IncrementWeeklyVisitors(mongoCtx, shopID, clientID)
mongoCancel()

// FIX 4: Debug logs — no allocations on production hot path
logger.Debug("visitor cache hit", "shopID", shopID) // was Info
```
December 12: Hung migrated visitor checks from MongoDB to Redis — eliminating the 190ms MongoDB upsert per event. January 6, 2026: I removed the MongoDB dependency from the product stats path entirely (b5935ba).
OOM frequency dropped. But the sawtooth pattern persisted at a slower rate. The real root cause was still lurking.
Phase 3: The Final Fix
Still Leaking
Even after the Phase 2 fixes, Grafana showed the same sawtooth: 32 MiB → slow climb → 480 MiB → OOM kill. Just took longer. I spent a week profiling with pprof. The architecture comparison tells the story:
```mermaid
flowchart LR
    subgraph before["❌ OrderedEventProcessor (1,291 lines)"]
        K1["Kafka Consumer<br/>(Watermill)"]
        SM["sync.Map<br/>1 goroutine per user<br/>50,000+ goroutines"]
        PS["Product Stats Flush<br/>Load ALL keys → memory<br/>480 MiB spike"]
        LUA["SCRIPT LOAD<br/>500 calls/sec"]
        K1 --> SM
        SM --> PS
        PS --> LUA
    end
    subgraph after["✅ Bounded Keyed Dispatcher (309 lines)"]
        K2["Kafka Consumer<br/>(Sarama direct)"]
        UD["userDispatcher<br/>8 workers per partition<br/>~128 goroutines"]
        BF["Batch Flush<br/>100 keys at a time<br/>~10 MiB"]
        SHA["EVALSHA<br/>0 SCRIPT LOAD"]
        K2 --> UD
        UD --> BF
        BF --> SHA
    end
```
Six root causes I finally identified through profiling:
| # | Root Cause | Impact |
|---|---|---|
| 1 | Duplicate events still spawned goroutines | Wasted memory per duplicate |
| 2 | Customer stage (~190ms) blocked Kafka ack path | Attribution latency ~250ms |
| 3 | Attribution read stale ad data (ad stage async) | 1–5% accuracy loss |
| 4 | sync.Map: unbounded per-user goroutines | 50K+ goroutines at peak |
| 5 | ALL Redis keys loaded into memory before ES bulk write | Peak 480–512 MiB |
| 6 | SCRIPT LOAD per key per flush | 500 Redis roundtrips/sec |
The Nuclear Option: Rewrite
I deleted the entire OrderedEventProcessor (1,291 lines) and kafka_clean.go (294 lines) and replaced them with a bounded keyed dispatcher. The new processing pipeline:
```mermaid
flowchart TB
    KM["Kafka message<br/>key: shopID:clientID"]
    subgraph dispatcher["userDispatcher (per partition)"]
        direction TB
        BP["Backpressure Gate<br/>max 512 msgs / 32 MiB"]
        W["Worker Pool<br/>8 bounded workers"]
        BP --> W
    end
    subgraph stages["Event Processing Stages"]
        direction TB
        S1["1️⃣ Index Event · SYNC<br/>Dedup: early return if seen"]
        S2["2️⃣ Ad Lead · ASYNC<br/>Must complete before attribution"]
        S3["3️⃣ Attribution · ASYNC<br/>Reads ad data from payload"]
        S4["4️⃣ Customer · FIRE & FORGET<br/>Bounded async pool"]
        S5["5️⃣ Product Stats · FIRE & FORGET<br/>Redis buffer → batch ES flush"]
        S1 --> S2 --> S3
        S3 --> S4
        S3 --> S5
    end
    KM --> dispatcher --> stages
```
The key innovation — the userDispatcher with dual-budget backpressure:
```go
// The dispatcher: bounded workers + backpressure + per-user ordering
type userDispatcher struct {
	queues          map[string][]*sarama.ConsumerMessage
	pendingMessages int
	pendingBytes    int64
	readyKeys       chan string
	budgetAvailable chan struct{}
	mu              sync.Mutex
}

func (d *userDispatcher) dispatch(msg *sarama.ConsumerMessage) {
	userKey := string(msg.Key)
	for {
		d.mu.Lock()
		// BACKPRESSURE: Block the Kafka consume loop when budget exhausted.
		// This tells Kafka "slow down" — backpressure, not buffering.
		if d.pendingMessages >= 512 || d.pendingBytes > 32<<20 {
			d.mu.Unlock()
			<-d.budgetAvailable // Wait for a worker to drain
			continue
		}
		d.queues[userKey] = append(d.queues[userKey], msg)
		d.pendingMessages++
		d.pendingBytes += int64(len(msg.Value))
		// Only signal readyKeys if this user isn't already in-flight.
		// Preserves per-user ordering without goroutine-per-user.
		signal := len(d.queues[userKey]) == 1
		d.mu.Unlock()
		if signal {
			// Send outside the lock: workers need d.mu to drain, so a
			// send while holding it could deadlock if readyKeys fills.
			d.readyKeys <- userKey
		}
		break
	}
}
```
Product stats: batch flush instead of load-all:
```go
// BEFORE: Load ALL 50K Redis keys at once → 480 MiB spike
keys := getAllKeysFromRedis() // 50,000 keys in one []string
bulkWriteToES(ctx, keys)      // one enormous allocation

// AFTER: Scan in batches of 100 → ~10 MiB peak
for batch := range scanKeysInBatches(ctx, 100) {
	bulkWriteToES(ctx, batch) // process and release; GC can reclaim
}
```
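scanKeysInBatches itself is Redis SCAN-based and not shown; the principle, though, is just bounded chunking. A toy illustration:

```go
package main

import "fmt"

// chunk splits keys into batches of at most n, so each batch can be
// processed and released before the next one is materialized.
func chunk(keys []string, n int) [][]string {
	var batches [][]string
	for len(keys) > 0 {
		end := n
		if len(keys) < n {
			end = len(keys)
		}
		batches = append(batches, keys[:end])
		keys = keys[end:]
	}
	return batches
}

func main() {
	keys := make([]string, 250)
	for i := range keys {
		keys[i] = fmt.Sprintf("shop:%d", i)
	}
	for _, batch := range chunk(keys, 100) {
		fmt.Println(len(batch)) // 100, 100, 50 — never all 250 at once
	}
}
```

With a real SCAN cursor each batch is an independent allocation, so the previous batch becomes garbage as soon as the loop advances; this slice-based toy merely shows the bounding arithmetic.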
Lua script SHA caching — eliminate 500 SCRIPT LOAD calls per second:
```go
// BEFORE: SCRIPT LOAD on every flush — 500 extra roundtrips/sec
redis.Call("EVAL", luaScript, keys, args...)

// AFTER: Compute SHA once at startup, reuse forever
var atomicGetDeleteSHA = func() string {
	sha, err := rdb.ScriptLoad(ctx, redisScript).Result()
	if err != nil {
		panic(fmt.Errorf("failed to load redis script: %w", err))
	}
	return sha
}()

redis.Call("EVALSHA", atomicGetDeleteSHA, keys, args...) // 0 extra roundtrips
```
The full Kafka claim scheduler flow — showing how backpressure propagates from dispatcher back to the consumer loop:
```mermaid
flowchart TB
    C["Kafka consume loop<br/>(per partition)"] --> D["dispatch(msg)"]
    D --> B{"Budget available?<br/>msgs < 512 AND bytes < 32 MiB"}
    B -->|No| W["⏸ Block — wait for budgetAvailable<br/>(Kafka consumer pauses naturally)"]
    W --> D
    B -->|Yes| Q["Append to queues[userKey]"]
    Q --> R{"Key already in-flight?"}
    R -->|No| READY["Push userKey → readyKeys channel"]
    R -->|Yes| HOLD["Stays queued<br/>(preserves ordering)"]
    READY --> K["Worker picks up from readyKeys"]
    K --> P["ProcessEvent<br/>(5 stages)"]
    P --> M["Mark contiguous offsets<br/>for commit"]
    M --> F["Release budget<br/>Reschedule key if queue not empty"]
    HOLD -.->|"Worker finishes → reschedules key"| K
```
The Final Results
| Metric | Phase 1 (Aug '25) | Phase 2 (Dec '25) | Phase 3 (Mar '26) |
|---|---|---|---|
| Goroutines | 50K+ (unbounded) | 50K+ (same arch) | ~128 (bounded) |
| Peak memory | 480–512 MiB | 480–512 MiB | < 200 MiB |
| Memory trend | Sawtooth (OOM cycle) | Slower sawtooth | Flat (stable) |
| OOM kills / week | 3–5 per pod | 1–2 per pod | 0 |
| Attribution latency | ~250ms | ~250ms | < 100ms |
| Redis SCRIPT LOAD/sec | 500 | 500 | 0 |
| Flush peak memory | 480 MiB | 480 MiB | ~10 MiB |
| Code complexity | 1,291 LOC | 1,291 LOC | 309 LOC |
| Throughput | 10K msg/min | 10K msg/min | 10K msg/min (maintained) |
| Framework overhead | 15% CPU (Watermill) | 15% CPU (Watermill) | 0% (Sarama direct) |
What I Learned: 7 Key Takeaways
sync.Map plus a goroutine per user "looks safe" but grows without bound: a memory leak in all but name. It only manifests under sustained production load, never in staging. Always set explicit bounds with semaphores or bounded worker pools before shipping.
Without backpressure, Kafka keeps pushing into an overwhelmed consumer. The dispatcher's blocking <-d.budgetAvailable is the critical valve — it tells Kafka "slow down" instead of buffering into OOM. If your consumer doesn't push back, something else absorbs the pressure.
"Max 512 messages" isn't enough if one message is 1 MB. Always track both count AND byte size. Our dispatcher enforces both: 512 messages AND 32 MiB. One dimension alone will be gamed by the data distribution.
One global 60-second timeout wrapping (cache + ES + MongoDB) wastes latency budget and leaks contexts when upstream is slow. Each downstream call deserves its own timeout: 30s for ES, 15s for MongoDB, nothing for cache. Shared deadlines punish the fast for the slow.
Loading 50K Redis keys into a Go map: 480 MiB spike. Processing 100 at a time: ~10 MiB. A 98% reduction from a trivial change. If you're loading "all of X" into memory, you're probably wrong. The answer is almost always "scan in batches and release."
500 SCRIPT LOAD calls/sec exhausted our Redis connection pool. Pre-computing the SHA at startup: zero extra roundtrips per flush. Same principle applies to compiled regexes, prepared SQL statements, and TLS session resumption.
I tried patching the OrderedEventProcessor for 5 months. Each fix helped but the architecture was fundamentally wrong: goroutine-per-user will always leak, and load-all will always spike. The rewrite (1,291 LOC → 309 LOC) solved all six root causes simultaneously. Know when incremental fixes are just postponing the inevitable.