From 100M Stuck Messages to Zero OOM: A 7-Month War with a Go Event Pipeline

Three phases of debugging unbounded concurrency, memory leaks, and backpressure — processing 100K+ events/hour for Shopify merchants

Introduction

At TrueProfit, our TruePixel service processes real-time e-commerce events for Shopify merchants — ad tracking, attribution, visitor analytics, product stats. At peak, we handle 100K+ data points per hour across hundreds of thousands of unique users.

Over 7 months — August 2025 to March 2026 — I fought three distinct battles with this pipeline. Each fix revealed the next problem. Phase 1 cleared a 100-million-message backlog. Phase 2 slowed but didn't stop an OOM cycle. Phase 3 eliminated it entirely. This is the full story.


The Architecture

The Event Processor sits at the center: it consumes from Kafka, routes each event through 5 stages (index → ad_lead → attribution → customer → product_stats), and writes to Elasticsearch, MongoDB, and Redis. 30 pods in production.

graph LR
    EC["🌐 Event Collector (Shopify webhooks, pixel)"]
    K["📨 Kafka (3+ partitions, millions msgs)"]
    EP["⚙️ Event Processor (Go, 30 pods)"]
    ES["🔍 Elasticsearch (events, attribution)"]
    MG["🍃 MongoDB (customers, ad leads)"]
    RD["⚡ Redis (product stats, cache)"]
    EC -->|HTTPS| K
    K -->|Sarama| EP
    EP -->|index, attribution| ES
    EP -->|customer, ad_lead| MG
    EP -->|product stats| RD
    RD -->|batch flush| ES
TruePixel Event Processor — simplified data flow

Simple enough in theory. The pain came from scale: sustained high throughput, 50K+ unique shopID:clientID keys in flight, and incremental features shipped without considering memory budgets.


Phase 1: The 100M Message Meltdown

Phase 1
100M messages stuck. Pipeline blocked. 3-day war room.
August 25–28, 2025

The Crisis

Monday morning. The Kafka consumer lag dashboard turns red. 100 million+ messages are queued and growing. Every 7 days, the backlog accumulates to 100M+, pods restart under memory pressure, and the cycle repeats. I spent 3 days in a war room fixing this.

Root Cause: Unbounded Worker Concurrency

The event processor was configured with 150 concurrent workers and unlimited per-user goroutines:

processor/config.go — BEFORE (Aug 25, 2025)
// Production config — this will hurt you
workerPool: pond.NewPool(150, pond.WithQueueSize(300))
// No limit on concurrent users
// Cleanup: every 5 minutes (too slow)
// Result: 150 workers × goroutines-per-event × 50K+ users = CPU overload

150 workers times multiple goroutines per event times 50K+ unique users: the Go scheduler couldn't cope. GC pauses cascaded. Kafka consumer lag grew exponentially.

Worse: there was a race condition in event ordering. Events for the same user were processed out of order, corrupting attribution data.

The Fix: 48 Hours, 12 Hotfix Commits

August 27: Fix the race condition first — route events through per-user queues keyed by shopID:clientID. Each user's events are now processed sequentially.

August 28, 09:51: Emergency CPU reduction — slash the worker pool:

processor/config.go — commit fe00ed0
// AFTER: Bounded and explicit
workerPool: pond.NewPool(50, pond.WithQueueSize(100))  // 150 → 50
maxUsers:   300                                         // was unlimited
cleanupTicker: 30 * time.Second                        // was 5 minutes

August 28, 13:06: Final hardening — a bounded semaphore on the Kafka message handler itself:

processor/consume.go — commit 75fd4c4
// Bounded concurrency: at most 10 messages in flight
workerCount := 10
sem := make(chan struct{}, workerCount)

for msg := range partition.Messages() {
    sem <- struct{}{}  // Block if 10 in-flight — natural backpressure
    go func(m *sarama.ConsumerMessage) {
        defer func() { <-sem }()
        processEvent(m)
    }(msg)
}

Results

| Metric | Before | After |
| --- | --- | --- |
| Workers | 150 (unbounded) | 50 (bounded) |
| Max concurrent users | Unlimited | 300 |
| CPU utilization | Overloaded (100%) | Stable (60–70%) |
| Kafka lag | 100M+ (growing) | Cleared within hours |
| Event ordering | Race condition | FIFO per user |

The backlog cleared. Pipeline stabilized. I thought the war was over. It wasn't.


Phase 2: The OOM That Wouldn't Die

Phase 2
New feature ships. OOM kills begin. 30 pods on a sawtooth death cycle.
October – December 2025

New Feature, New Problem

With the pipeline stable, a team member (Dat) shipped a new feature: visitor tracking. Track unique visitors per week per shop — straightforward business requirement. The implementation added three storage layers per event:

  1. Ristretto in-memory cache — check if visitor seen recently
  2. Elasticsearch — check weekly visitor index
  3. MongoDB — increment weekly unique visitor counter

Sounds reasonable. Except the cache was configured like this:

visitor/cache.go — commit aaa55a5 (Oct 24, 2025)
// 1 GB cache budget in a pod with 512 MiB memory limit.
cache, _ := ristretto.NewCache(&ristretto.Config[string, string]{
    NumCounters: 1e7,      // 10 MILLION key slots
    MaxCost:     1 << 30,  // 1 GB memory budget  ← the bomb
    BufferItems: 64,
})

1 GB cache budget in a pod with a 512 MiB memory limit. The math was always going to end one way.

The Slow Bleed

The OOM didn't happen immediately. Memory crept up over 3 days, then the pod was killed and the cycle restarted:

| Time | Memory | Event |
| --- | --- | --- |
| Day 0 | 32 MiB | Pod starts fresh |
| Day 1 | ~200 MiB | Cache filling with visitor keys |
| Day 2 | ~350 MiB | Approaching limit |
| Day 3 | 480–512 MiB | OOM kill → restart → repeat |

30 pods, all on the same death cycle. Grafana showed a perfect sawtooth pattern — memory climbs linearly, hits 512 MiB, drops to zero, climbs again. Two pod families (595d8d6476-* and 78cdb9f9dd-*) showed the same slope even after a deploy, confirming this was a code-level leak, not infrastructure.

The visitor tracking code also had compounding issues:

visitor/handler.go — original (problematic)
// One timeout wrapping everything — if ES is slow, MongoDB waits too
ctx, cancel := context.WithTimeout(baseCtx, 60*time.Second)
defer cancel()

checkCache(ctx)          // sub-millisecond, but context held for 60s
queryES(ctx)             // could wait up to 60s — holds ctx open
writeMongoDB(ctx)        // could wait up to 60s — holds ctx open

// Info-level logs in the hot path
// 1,000+ visitors/sec = 1,000+ log allocations/sec
logger.Info("visitor cache hit", "shopID", shopID)

The Fix (December 4, 2025)

Hung (with my review) landed the OOM triage in commit 0aae4ed:

visitor/cache.go — commit 0aae4ed (Dec 4, 2025)
// FIX 1: Halve the cache budget (stops the immediate bleeding)
cache, _ := ristretto.NewCache(&ristretto.Config[string, string]{
    NumCounters: 5e6,       // 5M (was 10M)
    MaxCost:     500 << 20, // 500 MB (was 1 GB)
})

// FIX 2: Auto-calculate cost — prevents over-admission
cache.Set(key, "exists", -1)  // -1 = auto-calculate, not manual "1"

// FIX 3: Per-operation timeouts — no more shared deadline
esCtx, esCancel := context.WithTimeout(baseCtx, 30*time.Second)
existsInES, _ := repo.VisitorExistsInES(esCtx, shopID, clientID)
esCancel()  // Release ES context immediately after use

mongoCtx, mongoCancel := context.WithTimeout(baseCtx, 15*time.Second)
repo.IncrementWeeklyVisitors(mongoCtx, shopID, clientID)
mongoCancel()

// FIX 4: Debug logs — no allocations on production hot path
logger.Debug("visitor cache hit", "shopID", shopID)  // was Info

December 12: Hung migrated visitor checks from MongoDB to Redis — eliminating the 190ms MongoDB upsert per event. January 6, 2026: I removed the MongoDB dependency from the product stats path entirely (b5935ba).

OOM frequency dropped. But the sawtooth pattern persisted at a slower rate. The real root cause was still lurking.


Phase 3: The Final Fix

Phase 3
1 week profiling. 6 root causes. Nuclear rewrite. Zero OOM.
March 2026

Still Leaking

Even after the Phase 2 fixes, Grafana showed the same sawtooth: 32 MiB → slow climb → 480 MiB → OOM kill. Just took longer. I spent a week profiling with pprof. The architecture comparison tells the story:

graph TB
    subgraph before["❌ OrderedEventProcessor (1,291 lines)"]
        K1["Kafka Consumer (Watermill)"]
        SM["sync.Map: 1 goroutine per user, 50,000+ goroutines"]
        PS["Product Stats Flush: load ALL keys into memory, 480 MiB spike"]
        LUA["SCRIPT LOAD: 500 calls/sec"]
        K1 --> SM
        SM --> PS
        PS --> LUA
    end
    subgraph after["✅ Bounded Keyed Dispatcher (309 lines)"]
        K2["Kafka Consumer (Sarama direct)"]
        UD["userDispatcher: 8 workers per partition, ~128 goroutines"]
        BF["Batch Flush: 100 keys at a time, ~10 MiB"]
        SHA["EVALSHA: 0 SCRIPT LOAD"]
        K2 --> UD
        UD --> BF
        BF --> SHA
    end
Before vs After: architecture comparison. 1,291 LOC → 309 LOC, 50K goroutines → 128.

Six root causes I finally identified through profiling:

| # | Root Cause | Impact |
| --- | --- | --- |
| 1 | Duplicate events still spawned goroutines | Wasted memory per duplicate |
| 2 | Customer stage (~190ms) blocked Kafka ack path | Attribution latency ~250ms |
| 3 | Attribution read stale ad data (ad stage async) | 1–5% accuracy loss |
| 4 | sync.Map: unbounded per-user goroutines | 50K+ goroutines at peak |
| 5 | ALL Redis keys loaded into memory before ES bulk write | Peak 480–512 MiB |
| 6 | SCRIPT LOAD per key per flush | 500 Redis roundtrips/sec |

The Nuclear Option: Rewrite

I deleted the entire OrderedEventProcessor (1,291 lines) and kafka_clean.go (294 lines) and replaced them with a bounded keyed dispatcher. The new processing pipeline:

graph LR
    KM["Kafka Message (key: shopID:clientID)"]
    subgraph dispatcher["userDispatcher (per partition)"]
        direction TB
        BP["Backpressure Gate (max 512 msgs / 32 MiB)"]
        W["Worker Pool (8 bounded workers)"]
        BP --> W
    end
    subgraph stages["Event Processing Stages"]
        direction TB
        S1["1️⃣ Index Event · SYNC (dedup: early return if seen)"]
        S2["2️⃣ Ad Lead · ASYNC (must complete before attribution)"]
        S3["3️⃣ Attribution · ASYNC (reads ad data from payload)"]
        S4["4️⃣ Customer · FIRE & FORGET (bounded async pool)"]
        S5["5️⃣ Product Stats · FIRE & FORGET (Redis buffer → batch ES flush)"]
        S1 --> S2 --> S3
        S3 --> S4
        S3 --> S5
    end
    KM --> dispatcher --> stages
New pipeline: bounded workers, explicit stage sequencing, dedup at entry

The key innovation — the userDispatcher with dual-budget backpressure:

processor/dispatcher.go — SHTP-6414 (Mar 2026)
// The dispatcher: bounded workers + backpressure + per-user ordering
type userDispatcher struct {
    queues          map[string][]*sarama.ConsumerMessage
    pendingMessages int
    pendingBytes    int64
    readyKeys       chan string
    budgetAvailable chan struct{}
    mu              sync.Mutex
}

func (d *userDispatcher) dispatch(msg *sarama.ConsumerMessage) {
    userKey := string(msg.Key)
    for {
        d.mu.Lock()
        // BACKPRESSURE: Block the Kafka consume loop when budget exhausted.
        // This tells Kafka "slow down" — backpressure, not buffering.
        if d.pendingMessages >= 512 || d.pendingBytes > 32<<20 {
            d.mu.Unlock()
            <-d.budgetAvailable  // Wait for a worker to drain
            continue
        }
        d.queues[userKey] = append(d.queues[userKey], msg)
        d.pendingMessages++
        d.pendingBytes += int64(len(msg.Value))
        // Only signal readyKeys if this user isn't already in-flight.
        // Preserves per-user ordering without goroutine-per-user.
        if len(d.queues[userKey]) == 1 {
            d.readyKeys <- userKey
        }
        d.mu.Unlock()
        break
    }
}

Product stats: batch flush instead of load-all:

product_stats/flush.go
// BEFORE: Load ALL 50K Redis keys at once → 480 MiB spike
keys := getAllKeysFromRedis()  // 50,000 keys in one []string
bulkWriteToES(ctx, keys)      // one enormous allocation

// AFTER: Scan in batches of 100 → ~10 MiB peak
for batch := range scanKeysInBatches(ctx, 100) {
    bulkWriteToES(ctx, batch)  // process and release; GC can reclaim
}

Lua script SHA caching — eliminate 500 SCRIPT LOAD calls per second:

redis/scripts.go
// BEFORE: SCRIPT LOAD on every flush — 500 extra roundtrips/sec
redis.Call("EVAL", luaScript, keys, args...)

// AFTER: Compute SHA once at startup, reuse forever
var atomicGetDeleteSHA = func() string {
    sha, err := rdb.ScriptLoad(context.Background(), luaScript).Result()
    if err != nil {
        panic(fmt.Errorf("failed to load redis script: %w", err))
    }
    return sha
}()

The full Kafka claim scheduler flow — showing how backpressure propagates from dispatcher back to the consumer loop:

flowchart TD
    C["ConsumeClaim goroutine (per partition)"] --> D["dispatch(msg)"]
    D --> B{"Budget available? msgs < 512 AND bytes < 32 MiB"}
    B -->|No| W["⏸ Block: wait for budgetAvailable (Kafka consumer pauses naturally)"]
    W --> D
    B -->|Yes| Q["Append to queues[userKey]"]
    Q --> R{"Key already in-flight?"}
    R -->|No| READY["Push userKey → readyKeys channel"]
    R -->|Yes| HOLD["Stays queued (preserves ordering)"]
    READY --> K["Worker picks up from readyKeys"]
    K --> P["ProcessEvent (5 stages)"]
    P --> M["Mark contiguous offsets for commit"]
    M --> F["Release budget; reschedule key if queue not empty"]
    HOLD -.->|"Worker finishes → reschedules key"| K
Kafka claim → dispatch → worker → commit. Backpressure propagates upward automatically.

The Final Results

| Metric | Phase 1 (Aug '25) | Phase 2 (Dec '25) | Phase 3 (Mar '26) |
| --- | --- | --- | --- |
| Goroutines | 50K+ (unbounded) | 50K+ (same arch) | ~128 (bounded) |
| Peak memory | 480–512 MiB | 480–512 MiB | < 200 MiB |
| Memory trend | Sawtooth (OOM cycle) | Slower sawtooth | Flat (stable) |
| OOM kills / week | 3–5 per pod | 1–2 per pod | 0 |
| Attribution latency | ~250ms | ~250ms | < 100ms |
| Redis SCRIPT LOAD/sec | 500 | 500 | 0 |
| Flush peak memory | 480 MiB | 480 MiB | ~10 MiB |
| Code complexity | 1,291 LOC | 1,291 LOC | 309 LOC |
| Throughput | 10K msg/min | 10K msg/min | 10K msg/min (maintained) |
| Framework overhead | 15% CPU (Watermill) | 15% CPU (Watermill) | 0% (Sarama direct) |

What I Learned: 7 Key Takeaways

1
Unbounded concurrency is a time bomb

sync.Map + goroutine-per-user "looks safe" but creates a deterministic memory leak. It only manifests under sustained production load — never in staging. Always set explicit bounds with semaphores or bounded worker pools before shipping.

2
Backpressure beats buffering

Without backpressure, Kafka keeps pushing into an overwhelmed consumer. The dispatcher's blocking <-d.budgetAvailable is the critical valve — it tells Kafka "slow down" instead of buffering into OOM. If your consumer doesn't push back, something else absorbs the pressure.

3
Memory budgets need dual limits

"Max 512 messages" isn't enough if one message is 1 MB. Always track both count AND byte size. Our dispatcher enforces both: 512 messages AND 32 MiB. One dimension alone will be gamed by the data distribution.

4
Timeouts must be per-operation

One global 60-second timeout wrapping (cache + ES + MongoDB) wastes latency budget and leaks contexts when upstream is slow. Each downstream call deserves its own timeout: 30s for ES, 15s for MongoDB, nothing for cache. Shared deadlines punish the fast for the slow.

5
Batch everything that touches memory

Loading 50K Redis keys into a Go map: 480 MiB spike. Processing 100 at a time: ~10 MiB. A 98% reduction from a trivial change. If you're loading "all of X" into memory, you're probably wrong. The answer is almost always "scan in batches and release."

6
Cache your expensive setup operations

500 SCRIPT LOAD calls/sec exhausted our Redis connection pool. Pre-computing the SHA at startup: zero extra roundtrips per flush. Same principle applies to compiled regexes, prepared SQL statements, and TLS session resumption.
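The same principle in miniature with the standard library: compile once at package init, not in the hot path (the domain pattern here is illustrative):

```go
package main

import (
	"fmt"
	"regexp"
)

// Compiled once at package init and reused on every call, the same way
// the Lua SHA is computed once at startup. Recompiling inside the hot
// path would be the regex equivalent of SCRIPT LOAD per flush.
var shopDomain = regexp.MustCompile(`^[a-z0-9][a-z0-9-]*\.myshopify\.com$`)

func isShopDomain(host string) bool {
	return shopDomain.MatchString(host)
}

func main() {
	fmt.Println(isShopDomain("demo-store.myshopify.com")) // true
	fmt.Println(isShopDomain("evil.example.com"))         // false
}
```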

7
Sometimes you have to delete and rewrite

I tried patching the OrderedEventProcessor for 5 months. Each fix helped but the architecture was fundamentally wrong: goroutine-per-user will always leak, and load-all will always spike. The rewrite (1,291 LOC → 309 LOC) solved all six root causes simultaneously. Know when incremental fixes are just postponing the inevitable.


The War Timeline

August 25, 2025
100M messages stuck. Pipeline blocked.
Kafka lag growing unbounded. War room declared.
August 27–28, 2025
12 hotfix commits in 48 hours.
Worker pool 150→50. Race condition fixed. Bounded semaphore added. Pipeline restored.
October 24, 2025
Visitor tracking feature ships.
1 GB Ristretto cache in a 512 MiB pod. The clock starts ticking.
November – December 2025
OOM kills begin. 30 pods on sawtooth cycle.
Memory: 32 → 512 MiB → kill → repeat. Grafana shows perfect sawtooth on all pods.
December 4, 2025
Cache reduced 1 GB → 500 MB. Per-op timeouts. Debug logs.
OOM frequency reduced. Sawtooth slows but persists.
December 12, 2025
Visitor checks migrated MongoDB → Redis.
190ms MongoDB upsert per event eliminated from hot path.
January 6, 2026
Profit optimizer MongoDB dependency removed.
Simpler architecture. Fewer moving parts per event.
March 12–18, 2026
1 week of pprof profiling. Still sawtoothing.
6 root causes identified: goroutine-per-user, load-all flush, SCRIPT LOAD, stale attribution, duplicate goroutines, shared timeout.
March 26, 2026
Nuclear option: delete 1,585 LOC, rewrite as 309 LOC bounded dispatcher.
All 6 root causes resolved in one rewrite. Memory drops to flat < 200 MiB.
March 30, 2026
SHTP-6414 hardened: async stages, dedup, dual-budget backpressure.
Production stable 30+ days. Zero OOM kills.