Pada aplikasi backend modern, tidak semua pekerjaan cocok dijalankan langsung di dalam request HTTP. Ada tugas yang lambat, rentan gagal, atau mengonsumsi resource besar, seperti mengirim email, membuat laporan PDF/CSV, sinkronisasi data, dan import file berukuran besar. Jika semua itu diproses sinkron di handler API, latensi naik, timeout mudah terjadi, dan pengalaman pengguna memburuk.

Di sinilah pola asynchronous job processing berguna. Dalam artikel ini, kita akan membangun alur kerja menggunakan Go Fiber v3 sebagai HTTP framework dan Redis sebagai antrian pekerjaan. Fokusnya bukan sekadar “bisa jalan”, tetapi bagaimana membuatnya andal: ada status job, timeout, retry, backoff, deduplikasi sederhana, dan observabilitas dasar.

Contoh implementasi akan memakai pendekatan praktis dengan Redis list dan hash agar konsepnya jelas. Di produksi, Anda juga bisa mempertimbangkan library khusus seperti Asynq atau sistem broker lain jika kebutuhan sudah lebih kompleks. Namun memahami fondasinya tetap penting.

Kapan Memakai Redis Queue dan Kapan Cukup Goroutine?

Banyak developer Go langsung berpikir: “kenapa tidak pakai goroutine saja?” Untuk beberapa kasus, itu memang cukup. Tetapi ada perbedaan mendasar antara pekerjaan di memori proses dan pekerjaan yang dimasukkan ke antrian terpisah.

Kapan goroutine biasa cukup

  • Pekerjaan sangat cepat dan tidak penting jika hilang saat proses restart.
  • Tidak butuh retry otomatis.
  • Tidak butuh pelacakan status job.
  • Aplikasi hanya berjalan pada satu instance dan beban rendah.

Contoh: membersihkan cache lokal, menulis log tambahan non-kritis, atau prefetch data yang tidak berdampak jika gagal.

Kapan Redis queue lebih tepat

  • Pekerjaan perlu bertahan walaupun server API restart.
  • Butuh retry, timeout, dan status tracking.
  • Worker ingin dipisah dari web server agar beban CPU/memori tidak mengganggu request HTTP.
  • Aplikasi berjalan di banyak instance.
  • Butuh kontrol terhadap laju konsumsi pekerjaan dan observabilitas.

Contoh nyata:

  • Pengiriman email: API cukup menerima request, lalu email dikirim worker di belakang layar.
  • Generate report: pengguna menerima job ID, lalu memeriksa status sampai file siap diunduh.
  • Import data besar: file diproses bertahap tanpa menahan koneksi HTTP lama.

Catatan: Redis queue bukan solusi untuk semua masalah. Jika Anda butuh urutan pesan yang kuat, retention lama, replay, atau throughput event streaming skala besar, Kafka atau broker lain mungkin lebih tepat. Redis unggul untuk job queue yang sederhana hingga menengah, cepat, dan mudah dioperasikan.

Arsitektur Sederhana yang Disarankan

Arsitektur minimal yang umum dipakai:

  1. API Fiber menerima request dari klien.
  2. API memvalidasi payload, membuat job ID, menyimpan metadata job ke Redis, lalu memasukkan job ke queue.
  3. Worker mengambil job dari Redis, memprosesnya, lalu memperbarui status job.
  4. Klien memanggil endpoint status untuk melihat progres atau hasil.

Komponen data di Redis yang praktis untuk tutorial ini:

  • List untuk queue, misalnya key: queue:jobs
  • Hash untuk metadata job, misalnya key: job:{id}
  • String/lock key untuk deduplikasi sederhana, misalnya key: dedupe:{fingerprint}

Status job yang berguna:

  • queued
  • processing
  • succeeded
  • failed

Tambahkan juga field seperti:

  • type
  • payload
  • attempt
  • max_attempts
  • created_at
  • started_at
  • finished_at
  • error
  • result
  • trace_id

Desain Payload Job yang Baik

Kesalahan umum adalah memasukkan seluruh objek domain mentah ke queue. Akibatnya payload membengkak, sulit diversioning, dan coupling antarlayanan meningkat. Lebih aman mendesain payload dengan prinsip berikut:

Simpan data secukupnya

Masukkan hanya informasi yang benar-benar diperlukan worker. Misalnya untuk email:

  • to
  • template
  • variables
  • request_id

Jika data bisa diambil ulang dari database menggunakan ID, sering kali lebih baik cukup kirim ID-nya.

Gunakan tipe job eksplisit

Daripada payload ambigu, gunakan field type seperti:

  • email.send
  • report.generate
  • import.customers

Ini memudahkan routing di worker.

Pikirkan idempoten

Worker bisa gagal setelah sebagian pekerjaan selesai. Jika job di-retry, apakah aman dijalankan lagi? Untuk email, idempoten lebih sulit karena pengiriman ganda berbahaya. Solusinya bisa dengan menyimpan message_key unik atau melakukan deduplikasi berdasarkan konteks bisnis.

Contoh struktur job

type Job struct {
    ID          string          `json:"id"`
    Type        string          `json:"type"`
    Payload     json.RawMessage `json:"payload"`
    Attempt     int             `json:"attempt"`
    MaxAttempts int             `json:"max_attempts"`
    TimeoutSec  int             `json:"timeout_sec"`
    TraceID     string          `json:"trace_id"`
    CreatedAt   time.Time       `json:"created_at"`
}

Contoh payload email:

type SendEmailPayload struct {
    To         string            `json:"to"`
    Subject    string            `json:"subject"`
    Template   string            `json:"template"`
    Variables  map[string]string `json:"variables"`
    MessageKey string            `json:"message_key"`
}

Implementasi API Enqueue dengan Go Fiber v3

Bagian ini menunjukkan implementasi dasar: endpoint untuk membuat job dan endpoint untuk membaca status job. Kode dibuat ringkas, tetapi tetap realistis.

Inisialisasi Fiber dan Redis client

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/gofiber/fiber/v3"
    "github.com/google/uuid"
    "github.com/redis/go-redis/v9"
)

var rdb = redis.NewClient(&redis.Options{
    Addr: "localhost:6379",
})

const queueKey = "queue:jobs"

type Job struct {
    ID          string          `json:"id"`
    Type        string          `json:"type"`
    Payload     json.RawMessage `json:"payload"`
    Attempt     int             `json:"attempt"`
    MaxAttempts int             `json:"max_attempts"`
    TimeoutSec  int             `json:"timeout_sec"`
    TraceID     string          `json:"trace_id"`
    CreatedAt   time.Time       `json:"created_at"`
}

type SendEmailPayload struct {
    To         string            `json:"to"`
    Subject    string            `json:"subject"`
    Template   string            `json:"template"`
    Variables  map[string]string `json:"variables"`
    MessageKey string            `json:"message_key"`
}

Endpoint enqueue email job

func enqueueEmail(c fiber.Ctx) error {
    ctx := context.Background()

    var req SendEmailPayload
    if err := c.Bind().Body(&req); err != nil {
        return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
            "error": "payload tidak valid",
        })
    }
    if req.To == "" || req.Template == "" {
        return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
            "error": "to dan template wajib diisi",
        })
    }

    payloadBytes, _ := json.Marshal(req)
    jobID := uuid.NewString()
    traceID := uuid.NewString()

    job := Job{
        ID:          jobID,
        Type:        "email.send",
        Payload:     payloadBytes,
        Attempt:     0,
        MaxAttempts: 5,
        TimeoutSec:  30,
        TraceID:     traceID,
        CreatedAt:   time.Now().UTC(),
    }

    // Deduplikasi sederhana berbasis message_key
    if req.MessageKey != "" {
        dedupeKey := "dedupe:" + req.MessageKey
        ok, err := rdb.SetNX(ctx, dedupeKey, jobID, 10*time.Minute).Result()
        if err != nil {
            return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": err.Error()})
        }
        if !ok {
            existingJobID, _ := rdb.Get(ctx, dedupeKey).Result()
            return c.Status(fiber.StatusConflict).JSON(fiber.Map{
                "error": "job duplikat",
                "job_id": existingJobID,
            })
        }
    }

    jobBytes, _ := json.Marshal(job)
    jobKey := "job:" + jobID

    pipe := rdb.TxPipeline()
    pipe.HSet(ctx, jobKey, map[string]any{
        "status":       "queued",
        "type":         job.Type,
        "payload":      string(job.Payload),
        "attempt":      job.Attempt,
        "max_attempts": job.MaxAttempts,
        "timeout_sec":  job.TimeoutSec,
        "trace_id":     job.TraceID,
        "created_at":   job.CreatedAt.Format(time.RFC3339),
    })
    pipe.Expire(ctx, jobKey, 24*time.Hour)
    pipe.RPush(ctx, queueKey, jobBytes)

    if _, err := pipe.Exec(ctx); err != nil {
        return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": err.Error()})
    }

    return c.Status(fiber.StatusAccepted).JSON(fiber.Map{
        "job_id":   jobID,
        "status":   "queued",
        "trace_id": traceID,
    })
}

Poin penting pada handler di atas:

  • Mengembalikan 202 Accepted karena pekerjaan belum selesai.
  • Menyimpan metadata job sebelum atau bersamaan dengan enqueue.
  • Menggunakan deduplikasi sederhana dengan SETNX.
  • Memberi TTL pada metadata agar Redis tidak dipenuhi job lama selamanya.

Endpoint cek status job

func getJobStatus(c fiber.Ctx) error {
    ctx := context.Background()
    jobID := c.Params("id")
    jobKey := "job:" + jobID

    result, err := rdb.HGetAll(ctx, jobKey).Result()
    if err != nil {
        return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": err.Error()})
    }
    if len(result) == 0 {
        return c.Status(fiber.StatusNotFound).JSON(fiber.Map{"error": "job tidak ditemukan"})
    }

    return c.JSON(result)
}

Registrasi route

func main() {
    app := fiber.New()

    app.Post("/jobs/email", enqueueEmail)
    app.Get("/jobs/:id", getJobStatus)

    log.Fatal(app.Listen(":3000"))
}

Membangun Worker: Timeout, Retry, dan Backoff

Worker bertugas mengambil job dari queue, menjalankan handler sesuai tipe job, lalu memperbarui status. Untuk implementasi dasar, kita bisa memakai BLPOP agar worker menunggu job tanpa polling agresif.

func workerLoop() {
    ctx := context.Background()

    for {
        res, err := rdb.BLPop(ctx, 5*time.Second, queueKey).Result()
        if err == redis.Nil {
            continue
        }
        if err != nil {
            log.Printf("BLPOP error: %v", err)
            time.Sleep(2 * time.Second)
            continue
        }

        var job Job
        if err := json.Unmarshal([]byte(res[1]), &job); err != nil {
            log.Printf("unmarshal job error: %v", err)
            continue
        }

        processJob(ctx, job)
    }
}

Handler utama job:

func processJob(ctx context.Context, job Job) {
    jobKey := "job:" + job.ID

    rdb.HSet(ctx, jobKey, map[string]any{
        "status":     "processing",
        "started_at": time.Now().UTC().Format(time.RFC3339),
        "attempt":    job.Attempt + 1,
    })

    timeout := time.Duration(job.TimeoutSec) * time.Second
    execCtx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    err := dispatchJob(execCtx, job)
    if err == nil {
        rdb.HSet(ctx, jobKey, map[string]any{
            "status":      "succeeded",
            "finished_at": time.Now().UTC().Format(time.RFC3339),
            "error":       "",
        })
        return
    }

    attempts := job.Attempt + 1
    if attempts >= job.MaxAttempts {
        rdb.HSet(ctx, jobKey, map[string]any{
            "status":      "failed",
            "finished_at": time.Now().UTC().Format(time.RFC3339),
            "error":       err.Error(),
        })
        return
    }

    backoff := time.Duration(attempts*attempts) * time.Second
    job.Attempt = attempts
    jobBytes, _ := json.Marshal(job)

    rdb.HSet(ctx, jobKey, map[string]any{
        "status": "queued",
        "error":  err.Error(),
    })

    go func() {
        time.Sleep(backoff)
        rdb.RPush(context.Background(), queueKey, jobBytes)
    }()
}

Fungsi dispatcher:

func dispatchJob(ctx context.Context, job Job) error {
    switch job.Type {
    case "email.send":
        var p SendEmailPayload
        if err := json.Unmarshal(job.Payload, &p); err != nil {
            return err
        }
        return sendEmail(ctx, p)
    default:
        return fmt.Errorf("job type tidak dikenal: %s", job.Type)
    }
}

func sendEmail(ctx context.Context, p SendEmailPayload) error {
    // Ganti dengan provider email sesungguhnya.
    // Pastikan operasi mematuhi ctx untuk timeout/cancel.
    select {
    case <-time.After(2 * time.Second):
        log.Printf("email sent to=%s template=%s", p.To, p.Template)
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

Mengapa timeout penting?

Tanpa timeout, worker bisa menggantung pada panggilan jaringan atau operasi I/O yang tidak selesai. Akibatnya throughput turun dan queue macet. Dengan context.WithTimeout, kita memastikan satu job tidak menahan worker selamanya.

Mengapa retry perlu backoff?

Retry tanpa jeda dapat memperparah gangguan, misalnya saat SMTP server atau storage sedang bermasalah. Backoff memberi waktu bagi dependensi untuk pulih dan mencegah retry storm. Pola yang umum adalah exponential backoff ditambah jitter. Contoh di atas memakai kuadrat sederhana agar mudah dibaca; untuk produksi, tambahkan jitter acak.

Deduplikasi Sederhana dan Idempoten

Deduplikasi sering disalahpahami. Deduplikasi tidak sama dengan idempoten. Deduplikasi mencegah job yang sama dimasukkan terlalu sering dalam jangka tertentu. Idempoten memastikan jika job yang sama tetap terproses dua kali, hasil akhirnya tidak merusak sistem.

Deduplikasi sederhana dengan Redis

Pada endpoint enqueue, kita menggunakan SETNX untuk key seperti dedupe:{message_key}. Jika key sudah ada, artinya job serupa baru saja dibuat. Teknik ini sederhana dan efektif untuk mencegah klik ganda atau request berulang dari client.

Keterbatasannya

  • Bergantung pada kualitas message_key.
  • Hanya berlaku dalam jendela TTL tertentu.
  • Tidak otomatis membuat proses bisnis menjadi idempoten.

Untuk email, Anda bisa menyimpan message_key ke database log pengiriman dan menolak pengiriman ulang dengan kunci yang sama. Untuk report, gunakan kombinasi parameter report + user + rentang waktu sebagai fingerprint.

Observabilitas Job: Apa yang Harus Dicatat?

Sistem asinkron sulit dioperasikan tanpa observabilitas. Minimal, sediakan:

Status dan metadata job

API status harus bisa menampilkan:

  • status sekarang
  • jumlah attempt
  • waktu dibuat, mulai, selesai
  • pesan error terakhir
  • hasil jika ada, misalnya URL report

Structured logging

Sertakan job_id, trace_id, dan job_type dalam log. Ini memudahkan korelasi antara request API dan eksekusi worker.

Metric dasar

Jika menggunakan Prometheus atau sistem serupa, pantau:

  • jumlah job masuk per tipe
  • jumlah job sukses/gagal
  • durasi eksekusi
  • retry count
  • queue length

Queue length penting untuk mendeteksi backlog. Jika antrean terus bertambah, worker mungkin kurang banyak, ada bottleneck dependensi, atau ada job yang terlalu berat.

Tracing

Jika stack Anda mendukung OpenTelemetry, teruskan trace_id dari request API ke metadata job lalu gunakan di worker. Walaupun tidak selalu membentuk trace distributed penuh, ini tetap membantu investigasi.

Use Case Praktis: Email, Report, dan Import Data

1. Pengiriman email

Cocok untuk asynchronous queue karena pengguna tidak perlu menunggu SMTP/provider email merespons. Pastikan ada timeout, retry terbatas, dan deduplikasi jika email tidak boleh terkirim ganda.

2. Generate report

Request API bisa mengembalikan job_id. Worker membaca parameter report, mengambil data dari database, membuat file CSV/PDF, mengunggah hasil ke object storage, lalu menyimpan URL file pada field result. Endpoint status dapat mengembalikan URL saat status succeeded.

3. Import data besar

Alih-alih parsing seluruh file di request handler, simpan file lebih dulu ke storage, lalu kirim job berisi lokasi file dan konteks user. Worker memproses bertahap, mencatat progres, dan menghindari timeout HTTP yang panjang.

Trade-off, Kesalahan Umum, dan Tips Debugging

Trade-off memakai Redis queue

  • Kelebihan: cepat, sederhana, mudah dioperasikan, cocok untuk job queue umum.
  • Kekurangan: implementasi keandalan lanjutan perlu perhatian ekstra, terutama untuk visibility timeout, dead-letter queue, dan recovery worker crash.

Pada implementasi tutorial ini, ada keterbatasan penting: jika worker mengambil job lalu proses mati sebelum status diperbarui atau job di-requeue, job bisa hilang dari list. Untuk produksi, pertimbangkan pola processing queue, Redis Streams, atau library queue yang sudah menangani ack/retry semantics dengan lebih matang.

Kesalahan umum

  • Menjalankan job berat langsung di handler HTTP.
  • Tidak menetapkan timeout untuk panggilan eksternal.
  • Retry tanpa batas.
  • Tidak menyimpan status job sehingga klien buta terhadap progres.
  • Payload terlalu besar dan sensitif.
  • Tidak memikirkan idempoten.

Tips debugging

  • Cek isi Redis dengan LLEN queue:jobs, HGETALL job:{id}.
  • Log setiap transisi status: queued → processing → succeeded/failed.
  • Tambahkan field error terakhir dan waktu attempt terakhir.
  • Pastikan worker memakai context timeout untuk panggilan HTTP/SMTP/database.
  • Jika backlog tinggi, ukur durasi tiap tipe job dan pisahkan worker berdasarkan jenis pekerjaan.

Penutup

Menggabungkan Go Fiber v3 dengan Redis queue adalah pendekatan praktis untuk memindahkan pekerjaan berat atau lambat dari jalur request HTTP ke proses asinkron yang lebih terkendali. Dibanding goroutine biasa, Redis queue memberi keuntungan berupa persistensi, retry, pemantauan status, dan pemisahan beban antara API dan worker.

Kunci desain yang baik ada pada beberapa hal: payload yang ramping dan eksplisit, timeout yang tegas, retry dengan backoff, deduplikasi sederhana untuk mencegah duplikasi jelas, serta observabilitas yang memadai. Untuk use case seperti pengiriman email, generate report, dan import data besar, pola ini sangat relevan.

Jika kebutuhan Anda berkembang ke skenario yang lebih kompleks, seperti dead-letter queue, scheduling, concurrency control per tipe job, atau recovery yang lebih kuat saat worker crash, pertimbangkan abstractions yang lebih matang. Namun sebagai fondasi, pola pada artikel ini sudah cukup untuk membangun sistem asinkron yang rapi, praktis, dan jauh lebih andal daripada sekadar menembakkan goroutine dari handler HTTP.