
Queue

The queue package provides asynchronous task processing backed by Redis, built on hibiken/asynq. It supports enqueuing tasks for background processing, registering handlers, configurable concurrency, and named queues with priorities.

Import

import "github.com/gofastadev/gofasta/pkg/queue"

Key Types

QueueService

type QueueService interface {
    Enqueue(ctx context.Context, taskName string, payload []byte, opts ...asynq.Option) (*asynq.TaskInfo, error)
    RegisterHandler(pattern string, handler asynq.Handler)
    Start() error
    Shutdown()
}

AsynqQueue

type AsynqQueue struct {
    client *asynq.Client
    server *asynq.Server
    mux    *asynq.ServeMux
}

Key Functions

| Function | Signature | Description |
| --- | --- | --- |
| NewQueueService | func NewQueueService(cfg *config.QueueConfig, logger *slog.Logger) (QueueService, error) | Creates a queue service from config; returns nil if disabled |
| NewAsynqQueue | func NewAsynqQueue(cfg *config.QueueConfig) (*AsynqQueue, error) | Creates an asynq-based queue with Redis connection |
| Enqueue | func (q *AsynqQueue) Enqueue(ctx context.Context, taskName string, payload []byte, opts ...asynq.Option) (*asynq.TaskInfo, error) | Enqueues a task for background processing |
| RegisterHandler | func (q *AsynqQueue) RegisterHandler(pattern string, handler asynq.Handler) | Registers a handler for a task pattern |
| Start | func (q *AsynqQueue) Start() error | Starts processing tasks |
| Shutdown | func (q *AsynqQueue) Shutdown() | Gracefully shuts down the queue server and client |

Task Handlers

Task handlers implement the asynq.Handler interface, whose single method is ProcessTask(ctx context.Context, task *asynq.Task) error. For simple cases you can wrap a plain function with asynq.HandlerFunc instead.

// As a struct implementing asynq.Handler
type SendEmailHandler struct {
    mailClient *mailer.Client
}

// ProcessTask decodes the JSON payload and sends the email.
// Returning a non-nil error marks the task as failed so asynq can retry it.
func (h *SendEmailHandler) ProcessTask(ctx context.Context, task *asynq.Task) error {
    var payload EmailPayload
    if err := json.Unmarshal(task.Payload(), &payload); err != nil {
        return err
    }
    return h.mailClient.Send(ctx, payload.To, payload.Subject, payload.Body)
}

Each task type typically has an Enqueue helper:

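const TaskSendEmail = "email:send"

// EnqueueSendEmail serializes the email fields and enqueues them as a task.
// ctx comes first, following Go convention.
func EnqueueSendEmail(ctx context.Context, q queue.QueueService, to, subject, body string) error {
    payload, err := json.Marshal(EmailPayload{To: to, Subject: subject, Body: body})
    if err != nil {
        return err
    }
    _, err = q.Enqueue(ctx, TaskSendEmail, payload)
    return err
}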
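The snippets above reference an EmailPayload type without defining it. A minimal sketch of what it could look like follows. The struct, its JSON field names (chosen to match the "to", "subject", and "body" keys used in the enqueue example), and both helper functions are assumptions for illustration, not part of the queue package.

```go
package main

import (
    "encoding/json"
    "errors"
    "fmt"
)

// EmailPayload is a hypothetical payload type; it is not defined by the
// queue package. The JSON tags mirror the keys "to", "subject", "body".
type EmailPayload struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
    Body    string `json:"body"`
}

// EncodeEmailPayload builds the []byte that would be passed to Enqueue.
// It rejects an empty recipient so bad tasks never reach the queue.
func EncodeEmailPayload(to, subject, body string) ([]byte, error) {
    if to == "" {
        return nil, errors.New("email payload: recipient is empty")
    }
    return json.Marshal(EmailPayload{To: to, Subject: subject, Body: body})
}

// DecodeEmailPayload parses the bytes a handler would read from task.Payload().
func DecodeEmailPayload(data []byte) (EmailPayload, error) {
    var p EmailPayload
    if err := json.Unmarshal(data, &p); err != nil {
        return EmailPayload{}, fmt.Errorf("email payload: %w", err)
    }
    return p, nil
}
```

Keeping encoding and decoding next to the type means the enqueue helper and the handler cannot drift apart on field names.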

Usage

Setting Up the Queue

queueService, err := queue.NewQueueService(&cfg.Queue, logger)
if err != nil {
    log.Fatalf("failed to create queue: %v", err)
}
if queueService == nil {
    logger.Info("queue is disabled")
}

Registering Handlers

queueService.RegisterHandler("email:send", &SendEmailHandler{
    mailClient: mailClient,
})

queueService.RegisterHandler("image:process", asynq.HandlerFunc(
    func(ctx context.Context, task *asynq.Task) error {
        // process the image...
        return nil
    },
))

Enqueuing Tasks

// Immediate execution
payload, err := json.Marshal(map[string]string{
    "to":      "user@example.com",
    "subject": "Welcome!",
    "body":    "Thanks for signing up.",
})
if err != nil {
    return err
}
if _, err := queueService.Enqueue(ctx, "email:send", payload); err != nil {
    return err
}

// With options (delay, max retries, queue name)
_, err = queueService.Enqueue(ctx, "email:send_reminder", payload,
    asynq.ProcessIn(24*time.Hour),
    asynq.MaxRetry(5),
    asynq.Queue("critical"),
)
if err != nil {
    return err
}

Starting and Stopping Workers

// Start processing tasks (blocks)
go func() {
    if err := queueService.Start(); err != nil {
        log.Fatalf("queue error: %v", err)
    }
}()

// Graceful shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
queueService.Shutdown()
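In the snippet above, log.Fatalf inside the goroutine exits the process without calling Shutdown. One possible alternative is to pass the Start error back to the caller over a channel. The sketch below is an illustration only: runUntilStopped is a hypothetical helper, and it takes plain functions so it does not depend on the queue package. Whether it is safe to call Shutdown after a failed Start is an assumption to verify against the implementation.

```go
package main

// runUntilStopped runs start in the background and blocks until stop is
// closed or start returns an error. shutdown is called exactly once before
// returning in either case. (Hypothetical helper, not part of the package.)
func runUntilStopped(start func() error, shutdown func(), stop <-chan struct{}) error {
    // Buffered so the goroutine can always deliver its result and exit.
    errCh := make(chan error, 1)
    go func() { errCh <- start() }()

    select {
    case err := <-errCh:
        if err != nil {
            // start failed: release resources and report the error.
            shutdown()
            return err
        }
        // start returned without error (it may be non-blocking);
        // keep running until a stop is requested.
        <-stop
    case <-stop:
        // Stop requested while start is still running.
    }
    shutdown()
    return nil
}
```

With the queue service this might be called as runUntilStopped(queueService.Start, queueService.Shutdown, stop), where stop is closed once SIGINT or SIGTERM arrives.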

Configuration via config.yaml

queue:
  enabled: true
  concurrency: 10
  queues:
    critical: 6
    default: 3
    low: 1
  redis:
    host: localhost
    port: "6379"
    password: ""
    db: 0
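The numbers under queues are relative weights. Assuming the package passes them to asynq as non-strict weighted priorities (an assumption, not confirmed here), asynq's documentation describes each queue as being processed roughly in proportion to its weight. The hypothetical helper below only does that arithmetic, to make the 6/3/1 split concrete.

```go
package main

// queueShares converts queue weights into the approximate fraction of
// processing each queue receives under weighted (non-strict) priority.
// Non-positive weights are ignored. (Illustrative helper only.)
func queueShares(weights map[string]int) map[string]float64 {
    total := 0
    for _, w := range weights {
        if w > 0 {
            total += w
        }
    }
    shares := make(map[string]float64, len(weights))
    if total == 0 {
        return shares
    }
    for name, w := range weights {
        if w > 0 {
            shares[name] = float64(w) / float64(total)
        }
    }
    return shares
}
```

For the configuration above, queueShares(map[string]int{"critical": 6, "default": 3, "low": 1}) gives about 0.6, 0.3, and 0.1, so low-priority tasks still make progress while critical tasks get most of the attention.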

Environment variables use the GOFASTA_ prefix (e.g., GOFASTA_QUEUE_CONCURRENCY, GOFASTA_QUEUE_REDIS_HOST).
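Both examples follow the same pattern: the YAML key path is uppercased, dots become underscores, and GOFASTA_ is prepended. The sketch below encodes that inferred rule. envVarName is a hypothetical helper, and the rule is an assumption drawn from the two examples only, so check other keys against the config package.

```go
package main

import "strings"

// envVarName maps a dotted config key such as "queue.redis.host" to the
// environment variable name implied by the examples above.
// (Inferred naming rule; not an API of the package.)
func envVarName(key string) string {
    return "GOFASTA_" + strings.ToUpper(strings.ReplaceAll(key, ".", "_"))
}
```

Under this rule, envVarName("queue.redis.host") yields GOFASTA_QUEUE_REDIS_HOST.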

  • Scheduler — Cron-based scheduling pairs well with queued jobs
  • Mailer — Offload email sending to the queue
  • Resilience — Retry and circuit breaker for job handlers