Go Fiber worker queue dengan Redis cocok ketika aplikasi HTTP perlu memindahkan pekerjaan berat atau lambat ke proses background, tetapi tetap butuh kontrol operasional yang jelas. Tantangan utamanya bukan sekadar memasukkan job ke antrean, melainkan memastikan job tidak hilang, tidak diproses ganda tanpa kontrol, bisa di-retry dengan aman, dan tetap konsisten saat worker crash di tengah proses.

Di artikel ini, kita akan membangun pendekatan pragmatis: producer-consumer dengan Go Fiber sebagai API enqueue, Redis sebagai broker dan penyimpan state kerja, retry dengan backoff, dead-letter queue sederhana, idempotensi, distributed lock untuk mencegah job ganda, serta pola manual ack dan visibility timeout agar job yang macet bisa diambil ulang.

Arsitektur worker queue yang andal

Desain minimal yang cukup aman untuk banyak kasus produksi terdiri dari komponen berikut:

  • Producer: endpoint Fiber yang menerima request lalu membuat job.
  • Queue utama: Redis List untuk antrean job siap proses.
  • Processing set: Redis Sorted Set untuk job yang sedang dikerjakan beserta batas waktu visibilitasnya.
  • Job store: Redis Hash/String untuk metadata job, payload, status, percobaan retry, dan hasil error terakhir.
  • Worker: proses Go yang mengambil job, memasang lock, memproses, lalu ack atau requeue.
  • Dead-letter queue: antrean untuk job yang gagal permanen setelah retry habis.
  • Reaper/reconciler: proses berkala yang memindahkan job stuck dari processing kembali ke queue bila visibility timeout terlewati.

Kenapa tidak cukup memakai satu Redis List saja? Karena operasi dequeue biasa membuat job langsung hilang dari antrean saat diambil worker. Jika worker crash sebelum menyelesaikan job, Anda kehilangan jejak tanpa mekanisme pemulihan. Karena itu kita butuh state tambahan untuk job yang sedang diproses.

Alur data

  1. Client memanggil endpoint Fiber untuk membuat job.
  2. API menyimpan metadata job ke Redis dan memasukkan job_id ke queue utama.
  3. Worker mengambil job_id, menandai job sebagai processing, dan memasukkannya ke Sorted Set processing dengan timestamp deadline.
  4. Worker memasang distributed lock berdasarkan identitas job atau resource target.
  5. Jika sukses, worker menjalankan handler bisnis.
  6. Jika berhasil, worker menghapus job dari processing dan menandainya selesai.
  7. Jika gagal, worker menambah hitungan retry dan menjadwalkan ulang dengan backoff, atau memindahkan ke dead-letter queue bila batas retry terlampaui.
  8. Jika worker crash, reaper akan mendeteksi job yang melewati visibility timeout dan memasukkannya kembali ke queue.

Struktur key Redis yang disarankan

Gunakan penamaan key yang konsisten agar mudah diinspeksi di produksi.

queue:jobs                    # List, antrean utama berisi job_id
queue:jobs:processing         # ZSET, job_id -> visibility deadline (unix ts)
queue:jobs:retry             # ZSET, job_id -> next retry time
queue:jobs:dead              # List, job gagal permanen
job:{id}                     # Hash atau String JSON, metadata lengkap job
lock:job:{id}                # Lock per job untuk cegah duplikasi eksekusi
lock:resource:{key}          # Lock per resource, opsional untuk kasus spesifik
idempotency:{key}            # Penanda request/job yang sudah pernah diproses

Metadata yang umumnya perlu disimpan di job:{id}:

  • type: tipe job, misalnya send_email atau refresh_report.
  • status: queued, processing, done, retrying, dead.
  • payload: data JSON.
  • attempt: jumlah percobaan saat ini.
  • max_attempts: batas retry.
  • created_at, updated_at.
  • last_error: error terakhir untuk debugging.
  • idempotency_key: untuk mencegah pemrosesan ganda.
  • resource_key: jika job menyentuh resource tertentu yang perlu lock khusus.

Format payload job: sederhana, eksplisit, dan aman di-evolusi

Jangan langsung memasukkan objek Go internal yang rapuh ke antrean. Simpan payload yang stabil dan eksplisit. Contoh struktur job:

type Job struct {
    ID             string          `json:"id"`
    Type           string          `json:"type"`
    Payload        json.RawMessage `json:"payload"`
    Attempt        int             `json:"attempt"`
    MaxAttempts    int             `json:"max_attempts"`
    IdempotencyKey string          `json:"idempotency_key"`
    ResourceKey    string          `json:"resource_key,omitempty"`
    CreatedAt      time.Time       `json:"created_at"`
}

type SendEmailPayload struct {
    UserID   string `json:"user_id"`
    Email    string `json:"email"`
    Template string `json:"template"`
}

Prinsipnya:

  • Payload harus cukup untuk diproses ulang tanpa bergantung pada state memori worker.
  • Jangan menyimpan data terlalu besar jika cukup menyimpan referensi ID lalu membaca detail dari database saat proses berjalan.
  • Tambahkan versioning bila perlu, misalnya field schema_version, jika format payload mungkin berubah.

Endpoint Fiber untuk enqueue job

Contoh berikut menunjukkan endpoint Fiber yang menerima request, membuat job, menyimpan metadata ke Redis, lalu mendorong job_id ke queue. Contoh ini sengaja fokus pada alur inti, bukan seluruh bootstrap aplikasi.

package main

import (
    "context"
    "encoding/json"
    "time"

    "github.com/gofiber/fiber/v2"
    "github.com/google/uuid"
    "github.com/redis/go-redis/v9"
)

type EnqueueEmailRequest struct {
    UserID   string `json:"user_id"`
    Email    string `json:"email"`
    Template string `json:"template"`
}

type Job struct {
    ID             string          `json:"id"`
    Type           string          `json:"type"`
    Payload        json.RawMessage `json:"payload"`
    Attempt        int             `json:"attempt"`
    MaxAttempts    int             `json:"max_attempts"`
    IdempotencyKey string          `json:"idempotency_key"`
    ResourceKey    string          `json:"resource_key,omitempty"`
    CreatedAt      time.Time       `json:"created_at"`
}

var rdb *redis.Client
var ctx = context.Background()

func enqueueEmail(c *fiber.Ctx) error {
    var req EnqueueEmailRequest
    if err := c.BodyParser(&req); err != nil {
        return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "invalid payload"})
    }

    payload, _ := json.Marshal(req)
    job := Job{
        ID:             uuid.NewString(),
        Type:           "send_email",
        Payload:        payload,
        Attempt:        0,
        MaxAttempts:    5,
        IdempotencyKey: "send_email:" + req.UserID + ":" + req.Template,
        ResourceKey:    "user:" + req.UserID,
        CreatedAt:      time.Now().UTC(),
    }

    raw, _ := json.Marshal(job)
    pipe := rdb.TxPipeline()
    pipe.Set(ctx, "job:"+job.ID, raw, 24*time.Hour)
    pipe.RPush(ctx, "queue:jobs", job.ID)
    _, err := pipe.Exec(ctx)
    if err != nil {
        return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "failed to enqueue job"})
    }

    return c.Status(fiber.StatusAccepted).JSON(fiber.Map{
        "job_id": job.ID,
        "status": "queued",
    })
}

Catatan penting: penyimpanan metadata dan push ke queue dilakukan dalam pipeline agar lebih efisien. Jika Anda membutuhkan jaminan atomis yang lebih kuat pada beberapa langkah yang saling bergantung, pertimbangkan transaksi Redis atau Lua script.

Worker loop Go: dequeue, visibility timeout, lock, dan manual ack

Pola pragmatis yang aman adalah:

  1. Worker mengambil job dari queue.
  2. Sebelum memproses, job didaftarkan ke queue:jobs:processing dengan deadline visibility timeout.
  3. Worker memasang lock.
  4. Worker memproses job.
  5. Jika berhasil, lakukan manual ack: hapus dari processing dan perbarui status.
  6. Jika gagal, requeue dengan backoff atau kirim ke dead-letter.

Contoh implementasi sederhana:

package main

import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "log"
    "math"
    "time"

    "github.com/redis/go-redis/v9"
)

var workerCtx = context.Background()

func workerLoop(rdb *redis.Client) {
    for {
        res, err := rdb.BLPop(workerCtx, 5*time.Second, "queue:jobs").Result()
        if err != nil {
            if err == redis.Nil {
                continue
            }
            log.Printf("blpop error: %v", err)
            time.Sleep(time.Second)
            continue
        }
        if len(res) != 2 {
            continue
        }

        jobID := res[1]
        if err := markProcessing(rdb, jobID, 30*time.Second); err != nil {
            log.Printf("markProcessing error job=%s err=%v", jobID, err)
            _ = rdb.LPush(workerCtx, "queue:jobs", jobID).Err()
            continue
        }

        if err := processOne(rdb, jobID); err != nil {
            log.Printf("processOne error job=%s err=%v", jobID, err)
        }
    }
}

func markProcessing(rdb *redis.Client, jobID string, visibilityTimeout time.Duration) error {
    deadline := time.Now().Add(visibilityTimeout).Unix()
    pipe := rdb.TxPipeline()
    pipe.ZAdd(workerCtx, "queue:jobs:processing", redis.Z{Score: float64(deadline), Member: jobID})
    pipe.Set(workerCtx, "job:"+jobID+":heartbeat", fmt.Sprintf("%d", time.Now().Unix()), visibilityTimeout*2)
    _, err := pipe.Exec(workerCtx)
    return err
}

func processOne(rdb *redis.Client, jobID string) error {
    raw, err := rdb.Get(workerCtx, "job:"+jobID).Bytes()
    if err != nil {
        return err
    }

    var job Job
    if err := json.Unmarshal(raw, &job); err != nil {
        return moveToDead(rdb, jobID, "invalid job payload")
    }

    unlock, err := acquireLock(rdb, "lock:job:"+job.ID, 45*time.Second)
    if err != nil {
        return requeueWithBackoff(rdb, job, "lock not acquired")
    }
    defer unlock()

    if job.ResourceKey != "" {
        resourceUnlock, err := acquireLock(rdb, "lock:resource:"+job.ResourceKey, 45*time.Second)
        if err != nil {
            return requeueWithBackoff(rdb, job, "resource busy")
        }
        defer resourceUnlock()
    }

    done, err := checkIdempotency(rdb, job.IdempotencyKey)
    if err != nil {
        return err
    }
    if done {
        return ackSuccess(rdb, jobID)
    }

    if err := handleJob(job); err != nil {
        return requeueWithBackoff(rdb, job, err.Error())
    }

    if err := markIdempotentDone(rdb, job.IdempotencyKey, 24*time.Hour); err != nil {
        return err
    }

    return ackSuccess(rdb, jobID)
}

func handleJob(job Job) error {
    switch job.Type {
    case "send_email":
        var p SendEmailPayload
        if err := json.Unmarshal(job.Payload, &p); err != nil {
            return err
        }
        // Panggil provider email / service bisnis di sini.
        // Pastikan operasi downstream juga idempotent bila memungkinkan.
        return nil
    default:
        return errors.New("unknown job type")
    }
}

func ackSuccess(rdb *redis.Client, jobID string) error {
    raw, err := rdb.Get(workerCtx, "job:"+jobID).Bytes()
    if err != nil {
        return err
    }
    var job Job
    _ = json.Unmarshal(raw, &job)

    updated, _ := json.Marshal(map[string]any{
        "id": job.ID,
        "type": job.Type,
        "payload": json.RawMessage(job.Payload),
        "attempt": job.Attempt,
        "max_attempts": job.MaxAttempts,
        "idempotency_key": job.IdempotencyKey,
        "resource_key": job.ResourceKey,
        "created_at": job.CreatedAt,
        "status": "done",
        "updated_at": time.Now().UTC(),
    })

    pipe := rdb.TxPipeline()
    pipe.ZRem(workerCtx, "queue:jobs:processing", jobID)
    pipe.Set(workerCtx, "job:"+jobID, updated, 24*time.Hour)
    pipe.Del(workerCtx, "job:"+jobID+":heartbeat")
    _, err = pipe.Exec(workerCtx)
    return err
}

func requeueWithBackoff(rdb *redis.Client, job Job, reason string) error {
    job.Attempt++
    if job.Attempt >= job.MaxAttempts {
        return moveToDead(rdb, job.ID, reason)
    }

    delay := backoff(job.Attempt)
    updated, _ := json.Marshal(map[string]any{
        "id": job.ID,
        "type": job.Type,
        "payload": json.RawMessage(job.Payload),
        "attempt": job.Attempt,
        "max_attempts": job.MaxAttempts,
        "idempotency_key": job.IdempotencyKey,
        "resource_key": job.ResourceKey,
        "created_at": job.CreatedAt,
        "status": "retrying",
        "last_error": reason,
        "updated_at": time.Now().UTC(),
    })

    nextTime := time.Now().Add(delay).Unix()
    pipe := rdb.TxPipeline()
    pipe.Set(workerCtx, "job:"+job.ID, updated, 24*time.Hour)
    pipe.ZRem(workerCtx, "queue:jobs:processing", job.ID)
    pipe.ZAdd(workerCtx, "queue:jobs:retry", redis.Z{Score: float64(nextTime), Member: job.ID})
    pipe.Del(workerCtx, "job:"+job.ID+":heartbeat")
    _, err := pipe.Exec(workerCtx)
    return err
}

func moveToDead(rdb *redis.Client, jobID, reason string) error {
    raw, _ := rdb.Get(workerCtx, "job:"+jobID).Bytes()
    var obj map[string]any
    _ = json.Unmarshal(raw, &obj)
    obj["status"] = "dead"
    obj["last_error"] = reason
    obj["updated_at"] = time.Now().UTC()
    updated, _ := json.Marshal(obj)

    pipe := rdb.TxPipeline()
    pipe.Set(workerCtx, "job:"+jobID, updated, 7*24*time.Hour)
    pipe.ZRem(workerCtx, "queue:jobs:processing", jobID)
    pipe.RPush(workerCtx, "queue:jobs:dead", jobID)
    pipe.Del(workerCtx, "job:"+jobID+":heartbeat")
    _, err := pipe.Exec(workerCtx)
    return err
}

func backoff(attempt int) time.Duration {
    sec := math.Min(math.Pow(2, float64(attempt)), 300)
    return time.Duration(sec) * time.Second
}

Poin penting dari contoh di atas:

  • Manual ack terjadi hanya setelah proses benar-benar selesai.
  • Visibility timeout dipakai agar job yang diambil worker tetapi tidak selesai tetap bisa dipulihkan.
  • Lock per job mencegah dua worker mengeksekusi job yang sama pada kondisi race.
  • Lock per resource berguna bila dua job berbeda menyentuh entitas yang sama, misalnya user, invoice, atau stok produk.

Distributed lock: kapan perlu dan apa batasannya

Lock Redis lazim dipakai dengan pola SET key value NX EX ttl. Tujuannya bukan membuat sistem jadi exactly-once, karena itu sulit dicapai di sistem terdistribusi, melainkan mengurangi eksekusi ganda yang berbahaya.

func acquireLock(rdb *redis.Client, key string, ttl time.Duration) (func(), error) {
    token := fmt.Sprintf("%d", time.Now().UnixNano())
    ok, err := rdb.SetNX(workerCtx, key, token, ttl).Result()
    if err != nil {
        return nil, err
    }
    if !ok {
        return nil, errors.New("lock busy")
    }

    unlock := func() {
        current, err := rdb.Get(workerCtx, key).Result()
        if err == nil && current == token {
            _ = rdb.Del(workerCtx, key).Err()
        }
    }
    return unlock, nil
}

Keterbatasan penting: lock dengan TTL bisa kedaluwarsa saat job masih berjalan lama. Jika itu mungkin terjadi, worker perlu memperpanjang lock secara berkala lewat heartbeat atau memilih TTL yang realistis. Kalau tidak, worker lain bisa mengambil alih sementara proses lama belum selesai.

Lock bukan pengganti idempotensi. Walaupun lock mengurangi race condition, sistem tetap harus tahan terhadap duplikasi karena crash, timeout, retry, atau reconnect jaringan tetap dapat memicu pemrosesan ulang.

Idempotensi: pertahanan utama terhadap duplicate processing

Masalah duplicate processing nyata sering muncul saat:

  • Producer mengirim request dua kali karena timeout dari client.
  • Worker crash setelah efek samping terjadi, tetapi sebelum ack tersimpan.
  • Reaper mengembalikan job ke queue saat worker lama sebenarnya masih hidup namun terisolasi jaringan.

Karena itu, handler bisnis harus didesain idempotent. Misalnya, pengiriman email promosi untuk kombinasi user dan template tertentu bisa diberi idempotency_key. Sebelum menjalankan aksi, worker mengecek apakah key itu sudah pernah diselesaikan.

func checkIdempotency(rdb *redis.Client, key string) (bool, error) {
    if key == "" {
        return false, nil
    }
    n, err := rdb.Exists(workerCtx, "idempotency:"+key).Result()
    return n > 0, err
}

func markIdempotentDone(rdb *redis.Client, key string, ttl time.Duration) error {
    if key == "" {
        return nil
    }
    return rdb.Set(workerCtx, "idempotency:"+key, "1", ttl).Err()
}

Jika efek samping terjadi di database SQL, idempotensi yang lebih kuat biasanya dibuat di level database: unique constraint, tabel outbox/inbox, atau status transisi yang tervalidasi. Redis bisa membantu, tetapi sumber kebenaran bisnis sering kali tetap ada di database utama.

Visibility timeout dan pemulihan saat worker crash

Visibility timeout berarti job yang sudah diambil worker dianggap sementara tidak terlihat oleh worker lain, tetapi belum dianggap selesai. Jika worker gagal menyelesaikan job sebelum deadline, job dapat diambil ulang.

Kenapa perlu mekanisme ini

Tanpa visibility timeout, ada dua pilihan buruk:

  • Job dihapus dari queue terlalu awal lalu hilang saat worker crash.
  • Job tetap ada di queue dan bisa diambil worker lain, memicu duplikasi instan.

Pola processing set memberi kompromi yang masuk akal: job keluar dari queue utama, tetapi masih tercatat sebagai in-flight dan bisa dipulihkan.

Reaper untuk job stuck

Butuh proses berkala untuk memindahkan job yang sudah melewati deadline kembali ke queue.

func retryDueJobs(rdb *redis.Client) error {
    now := time.Now().Unix()
    ids, err := rdb.ZRangeByScore(workerCtx, "queue:jobs:retry", &redis.ZRangeBy{
        Min: "-inf",
        Max: fmt.Sprintf("%d", now),
    }).Result()
    if err != nil {
        return err
    }
    for _, id := range ids {
        pipe := rdb.TxPipeline()
        pipe.ZRem(workerCtx, "queue:jobs:retry", id)
        pipe.RPush(workerCtx, "queue:jobs", id)
        _, _ = pipe.Exec(workerCtx)
    }
    return nil
}

func requeueExpiredProcessing(rdb *redis.Client) error {
    now := time.Now().Unix()
    ids, err := rdb.ZRangeByScore(workerCtx, "queue:jobs:processing", &redis.ZRangeBy{
        Min: "-inf",
        Max: fmt.Sprintf("%d", now),
    }).Result()
    if err != nil {
        return err
    }
    for _, id := range ids {
        pipe := rdb.TxPipeline()
        pipe.ZRem(workerCtx, "queue:jobs:processing", id)
        pipe.RPush(workerCtx, "queue:jobs", id)
        pipe.Del(workerCtx, "job:"+id+":heartbeat")
        _, _ = pipe.Exec(workerCtx)
    }
    return nil
}

Ini sederhana, tetapi ada trade-off. Jika worker masih benar-benar bekerja namun lambat, reaper bisa memicu requeue terlalu dini. Karena itu:

  • Set visibility timeout lebih besar dari durasi normal job.
  • Tambahkan heartbeat atau perpanjangan deadline untuk job yang lama.
  • Pisahkan job cepat dan job lambat ke queue berbeda bila karakteristiknya sangat berbeda.

Retry dengan backoff dan dead-letter queue

Tidak semua kegagalan layak di-retry. Bedakan setidaknya dua kategori:

  • Transient error: timeout jaringan, koneksi Redis/DB terputus singkat, rate limit sementara.
  • Permanent error: payload invalid, resource tidak ada, bug logika tertentu yang tidak akan membaik dengan retry.

Backoff membantu mengurangi tekanan pada sistem downstream. Strategi paling umum adalah exponential backoff, idealnya dengan jitter agar banyak worker tidak retry serentak. Pada contoh sebelumnya kita memakai backoff eksponensial sederhana. Untuk produksi, menambahkan jitter acak biasanya lebih aman.

Dead-letter queue berguna agar job yang gagal permanen tidak memutar tanpa akhir. Tim operasional bisa memeriksa isi queue:jobs:dead, melihat last_error, memperbaiki data, lalu memutuskan apakah job perlu dijalankan ulang secara manual.

Menjaga konsistensi saat worker crash di tengah proses

Ini bagian paling penting secara operasional. Pertanyaannya: apa yang terjadi jika worker crash setelah menulis ke database, tetapi sebelum ack ke Redis?

Jawaban praktisnya: anggap itu akan terjadi, lalu desain sistem agar aman.

Pola yang membantu

  • Idempotent handler: saat job diproses ulang, efek samping tidak digandakan.
  • Status transisi di database: misalnya update hanya jika status lama tertentu masih cocok.
  • Unique constraint: mencegah insert duplikat.
  • Outbox pattern: jika perubahan database dan event lanjutan harus konsisten.
  • Checkpoint state: untuk job panjang, simpan progres agar re-run bisa melanjutkan atau menghindari langkah yang sudah selesai.

Contoh kasus nyata:

  • Kirim email: simpan record bahwa email untuk idempotency_key tertentu sudah terkirim sebelum atau sesudah memanggil provider, tergantung desain kompensasi Anda.
  • Refresh cache: tulis data baru ke key versi sementara, lalu swap atomik untuk mencegah cache stale setengah jadi.
  • Sinkronisasi invoice: gunakan status seperti syncing, synced, failed dan transisi yang tervalidasi.

Masalah operasional nyata dan cara menghindarinya

1. Duplicate processing

Penyebab umum:

  • Retry setelah timeout.
  • Crash sebelum ack.
  • Reaper yang terlalu agresif.

Mitigasi:

  • Idempotency key.
  • Lock per job atau per resource.
  • Visibility timeout yang tepat.
  • Handler yang aman dijalankan ulang.

2. Stuck job

Penyebab umum:

  • Worker mati tanpa cleanup.
  • Lock tidak dilepas.
  • Job masuk processing tetapi tidak pernah ack atau requeue.

Mitigasi:

  • TTL pada lock.
  • Processing set + reaper.
  • Heartbeat untuk mendeteksi worker hidup.
  • Alert jika ukuran processing terus naik.

3. Race condition

Contoh: dua job berbeda mengubah resource yang sama hampir bersamaan. Lock per job tidak cukup karena ID job berbeda. Solusinya adalah lock per resource atau serialisasi berdasarkan key bisnis yang sama.

4. Cache stale

Jika job bertugas memperbarui cache, jangan hapus cache lebih dulu lalu mengisi ulang dengan proses panjang tanpa pengaman. Gunakan salah satu pendekatan berikut:

  • Tulis ke key sementara lalu rename/swap atomik.
  • Simpan versi data dan hanya tampilkan versi terakhir yang lengkap.
  • Tambahkan timestamp pembaruan dan fallback ke database bila cache terlalu tua.

Observability dasar yang sebaiknya ada sejak awal

Worker queue yang tampak berjalan baik bisa menyimpan masalah tersembunyi jika tidak dipantau. Minimal, kumpulkan metrik dan log berikut:

  • Queue depth: panjang queue:jobs.
  • Processing count: jumlah item di queue:jobs:processing.
  • Retry scheduled: jumlah item di queue:jobs:retry.
  • Dead-letter count.
  • Job latency: waktu dari enqueue sampai selesai.
  • Processing duration: lama handler berjalan.
  • Retry rate dan error rate per tipe job.

Pada level log, sertakan:

  • job_id
  • job_type
  • attempt
  • idempotency_key
  • resource_key
  • error ringkas dan konteks downstream

Kalau memakai tracing, buat span terpisah untuk fase enqueue, dequeue, acquire lock, business handler, dan ack. Ini sangat membantu saat menganalisis latensi dan bottleneck.

Langkah debugging di produksi

Saat ada laporan job tidak jalan, jalan dua kali, atau antrean menumpuk, urutkan investigasi seperti ini:

  1. Cek panjang queue: apakah queue:jobs terus naik? Jika ya, worker mungkin mati atau lambat.
  2. Cek processing set: apakah banyak job kedaluwarsa tetapi tidak kembali ke queue? Reaper mungkin gagal.
  3. Cek retry set: apakah job tertahan di jadwal retry karena scheduler tidak berjalan?
  4. Cek dead-letter: apakah banyak job berakhir dead dengan error yang sama? Biasanya ada bug payload atau dependency downstream rusak.
  5. Cek lock key: apakah lock tersisa terlalu lama? TTL mungkin terlalu panjang atau unlock gagal.
  6. Cek log per job_id: telusuri siklus hidup job dari enqueue sampai ack.
  7. Cek dependency eksternal: database, API pihak ketiga, SMTP, storage, atau rate limit.

Beberapa perintah Redis yang sering dipakai saat inspeksi:

LLEN queue:jobs
ZRANGE queue:jobs:processing 0 -1 WITHSCORES
ZRANGE queue:jobs:retry 0 -1 WITHSCORES
LRANGE queue:jobs:dead 0 50
GET job:{id}
TTL lock:job:{id}
TTL lock:resource:{key}

Hindari menghapus key secara manual tanpa memahami efek sampingnya. Misalnya, menghapus lock sembarangan bisa memicu dua worker memproses resource yang sama. Jika perlu intervensi manual, dokumentasikan prosedurnya.

Kapan pendekatan ini cukup, dan kapan perlu tool queue yang lebih khusus

Pendekatan Redis + Go Fiber seperti di atas cukup baik jika:

  • Anda ingin kontrol penuh atas alur job.
  • Volume masih masuk akal untuk Redis.
  • Kebutuhan utama adalah keandalan pragmatis, bukan fitur orkestrasi kompleks.

Namun, jika Anda butuh penjadwalan kompleks, workflow panjang multi-langkah, audit state yang sangat rinci, atau jaminan delivery yang lebih formal, pertimbangkan tool yang memang dibuat untuk itu. Redis queue kustom memberi fleksibilitas tinggi, tetapi tanggung jawab operasional juga pindah ke aplikasi Anda.

Penutup

Membangun Go Fiber worker queue andal dengan Redis bukan soal menambahkan list dan worker loop saja. Fondasi yang benar justru ada pada state in-flight, manual ack, visibility timeout, retry dengan backoff, dead-letter, idempotensi, dan lock untuk mencegah race yang merusak data.

Jika Anda hanya mengambil satu prinsip dari artikel ini, ambillah ini: anggap job bisa diproses ulang kapan saja. Dari prinsip itu, keputusan desain lain menjadi lebih jelas: payload harus cukup untuk replay, handler harus idempotent, lock hanya pelengkap, dan observability wajib ada sejak awal.