Queue
The queue package provides async task processing backed by Redis using hibiken/asynq. It supports enqueuing tasks for background processing, handler registration, configurable concurrency, and named queue priorities.
Import
import "github.com/gofastadev/gofasta/pkg/queue"
Key Types
QueueService
type QueueService interface {
	Enqueue(ctx context.Context, taskName string, payload []byte, opts ...asynq.Option) (*asynq.TaskInfo, error)
	RegisterHandler(pattern string, handler asynq.Handler)
	Start() error
	Shutdown()
}
AsynqQueue
type AsynqQueue struct {
	client *asynq.Client
	server *asynq.Server
	mux    *asynq.ServeMux
}
Key Functions
| Function | Signature | Description |
|---|---|---|
| NewQueueService | func NewQueueService(cfg *config.QueueConfig, logger *slog.Logger) (QueueService, error) | Creates a queue service from config; returns nil if disabled |
| NewAsynqQueue | func NewAsynqQueue(cfg *config.QueueConfig) (*AsynqQueue, error) | Creates an asynq-based queue with Redis connection |
| Enqueue | func (q *AsynqQueue) Enqueue(ctx context.Context, taskName string, payload []byte, opts ...asynq.Option) (*asynq.TaskInfo, error) | Enqueues a task for background processing |
| RegisterHandler | func (q *AsynqQueue) RegisterHandler(pattern string, handler asynq.Handler) | Registers a handler for a task pattern |
| Start | func (q *AsynqQueue) Start() error | Starts processing tasks |
| Shutdown | func (q *AsynqQueue) Shutdown() | Gracefully shuts down the queue server and client |
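Because NewQueueService returns nil when the queue is disabled, every call site has to guard against a nil service before using it. One possible way to centralize that guard is a small fallback value, sketched below with the standard library only. Enqueuer is a trimmed-down stand-in for queue.QueueService, and ErrQueueDisabled, disabledQueue, orDisabled, and tryEnqueue are hypothetical names that are not part of the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Enqueuer is a hypothetical, trimmed-down stand-in for queue.QueueService.
// The real interface also accepts asynq options and returns *asynq.TaskInfo.
type Enqueuer interface {
	Enqueue(ctx context.Context, taskName string, payload []byte) error
}

// ErrQueueDisabled is a hypothetical sentinel error for a disabled queue.
var ErrQueueDisabled = errors.New("queue is disabled")

// disabledQueue rejects every task instead of letting callers hit a nil value.
type disabledQueue struct{}

func (disabledQueue) Enqueue(context.Context, string, []byte) error {
	return ErrQueueDisabled
}

// orDisabled returns q unchanged when it is non-nil, and a disabledQueue
// when the constructor handed back nil.
func orDisabled(q Enqueuer) Enqueuer {
	if q == nil {
		return disabledQueue{}
	}
	return q
}

// tryEnqueue reports what happened to the task as a short status string.
func tryEnqueue(q Enqueuer, taskName string) string {
	err := orDisabled(q).Enqueue(context.Background(), taskName, nil)
	switch {
	case errors.Is(err, ErrQueueDisabled):
		return "skipped"
	case err != nil:
		return "failed"
	default:
		return "queued"
	}
}

func main() {
	fmt.Println(tryEnqueue(nil, "email:send")) // prints: skipped
}
```

Whether a disabled queue should drop tasks silently, return an error as here, or run the work inline is an application decision; the sketch only shows where the nil check can live.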
Task Handlers
Task handlers implement the asynq.Handler interface, whose single method is ProcessTask(ctx context.Context, task *asynq.Task) error. For simple cases, asynq.HandlerFunc adapts a plain function with the same signature.
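To make the relationship between the handler interface and the function adapter concrete, here is a minimal sketch that runs without Redis or asynq. Task, Handler, and HandlerFunc are local stand-ins that mirror the shape of the asynq types of the same name (asynq's Handler method is ProcessTask). EmailPayload, validateEmail, and dispatch are hypothetical names used only for illustration.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// Task is a local stand-in for asynq.Task: a type name plus raw payload bytes.
type Task struct {
	typ     string
	payload []byte
}

func (t *Task) Type() string    { return t.typ }
func (t *Task) Payload() []byte { return t.payload }

// Handler is a local stand-in for asynq.Handler.
type Handler interface {
	ProcessTask(ctx context.Context, task *Task) error
}

// HandlerFunc adapts a plain function to the Handler interface,
// in the same way asynq.HandlerFunc does.
type HandlerFunc func(ctx context.Context, task *Task) error

func (f HandlerFunc) ProcessTask(ctx context.Context, task *Task) error {
	return f(ctx, task)
}

// EmailPayload is a hypothetical payload shape.
type EmailPayload struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// validateEmail is a function-style handler: decode, validate, report errors.
func validateEmail(ctx context.Context, task *Task) error {
	var p EmailPayload
	if err := json.Unmarshal(task.Payload(), &p); err != nil {
		return fmt.Errorf("decode %s payload: %w", task.Type(), err)
	}
	if p.To == "" {
		return errors.New("missing recipient")
	}
	return nil
}

// dispatch stands in for the worker delivering a task to its handler.
func dispatch(h Handler, task *Task) error {
	return h.ProcessTask(context.Background(), task)
}

func main() {
	h := HandlerFunc(validateEmail)
	task := &Task{typ: "email:send", payload: []byte(`{"to":"user@example.com","subject":"Welcome!"}`)}
	fmt.Println(dispatch(h, task)) // prints: <nil>
}
```

A struct-based handler is the better fit when the handler needs dependencies such as a mail client; the function adapter suits small, stateless handlers. In asynq, a non-nil error from the handler marks the task as failed, and it is retried until its retry limit is reached.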
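// EmailPayload is the payload shape assumed by the examples on this page.
type EmailPayload struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
	Body    string `json:"body"`
}

// As a struct implementing asynq.Handler
type SendEmailHandler struct {
	mailClient *mailer.Client
}

// ProcessTask decodes the task payload and sends the email.
func (h *SendEmailHandler) ProcessTask(ctx context.Context, task *asynq.Task) error {
	var payload EmailPayload
	if err := json.Unmarshal(task.Payload(), &payload); err != nil {
		return fmt.Errorf("decode email payload: %w", err)
	}
	return h.mailClient.Send(ctx, payload.To, payload.Subject, payload.Body)
}
Each task type typically has an Enqueue helper: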
const TaskSendEmail = "email:send"

// EnqueueSendEmail serializes the email payload and enqueues it as a TaskSendEmail task.
func EnqueueSendEmail(ctx context.Context, q queue.QueueService, to, subject, body string) error {
	payload, err := json.Marshal(EmailPayload{To: to, Subject: subject, Body: body})
	if err != nil {
		return err
	}
	_, err = q.Enqueue(ctx, TaskSendEmail, payload)
	return err
}
Usage
Setting Up the Queue
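queueService, err := queue.NewQueueService(&cfg.Queue, logger)
if err != nil {
	log.Fatalf("failed to create queue: %v", err)
}
if queueService == nil {
	// NewQueueService returns nil when the queue is disabled in config,
	// so skip handler registration and worker startup in that case.
	logger.Info("queue is disabled")
}
Registering Handlers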
queueService.RegisterHandler("email:send", &SendEmailHandler{
	mailClient: mailClient,
})
queueService.RegisterHandler("image:process", asynq.HandlerFunc(
	func(ctx context.Context, task *asynq.Task) error {
		// process the image...
		return nil
	},
))
Enqueuing Tasks
// Immediate execution
payload, err := json.Marshal(map[string]string{
	"to":      "user@example.com",
	"subject": "Welcome!",
	"body":    "Thanks for signing up.",
})
if err != nil {
	return err
}
if _, err := queueService.Enqueue(ctx, "email:send", payload); err != nil {
	return err
}
// With options (delay, max retries, queue name)
_, err = queueService.Enqueue(ctx, "email:send_reminder", payload,
	asynq.ProcessIn(24*time.Hour),
	asynq.MaxRetry(5),
	asynq.Queue("critical"),
)
Starting and Stopping Workers
// Start blocks while processing tasks, so run it in a goroutine
go func() {
	if err := queueService.Start(); err != nil {
		log.Fatalf("queue error: %v", err)
	}
}()
// Wait for SIGINT or SIGTERM, then shut down gracefully
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
queueService.Shutdown()
Configuration via config.yaml
queue:
  enabled: true
  concurrency: 10
  queues:
    critical: 6
    default: 3
    low: 1
  redis:
    host: localhost
    port: "6379"
    password: ""
    db: 0
Environment variables use the GOFASTA_ prefix (e.g., GOFASTA_QUEUE_CONCURRENCY, GOFASTA_QUEUE_REDIS_HOST).
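The queues map in the configuration assigns a weight to each named queue. Assuming these weights are passed to asynq unchanged and strict priority is not enabled, asynq treats them as relative shares of processing time rather than as absolute ordering. The sketch below uses a hypothetical helper, queueShares, to show what the example weights work out to; it is an illustration of the arithmetic, not code from the package.

```go
package main

import (
	"fmt"
	"sort"
)

// queueShares converts queue weights into the approximate fraction of
// processing time each queue receives under weighted (non-strict) priority.
// Queues with a non-positive weight are ignored.
func queueShares(weights map[string]int) map[string]float64 {
	total := 0
	for _, w := range weights {
		if w > 0 {
			total += w
		}
	}
	shares := make(map[string]float64, len(weights))
	if total == 0 {
		return shares
	}
	for name, w := range weights {
		if w > 0 {
			shares[name] = float64(w) / float64(total)
		}
	}
	return shares
}

func main() {
	// Weights taken from the example config.yaml.
	shares := queueShares(map[string]int{"critical": 6, "default": 3, "low": 1})

	// Sort names so the output order is stable.
	names := make([]string, 0, len(shares))
	for name := range shares {
		names = append(names, name)
	}
	sort.Strings(names)

	for _, name := range names {
		fmt.Printf("%-8s %.0f%%\n", name, shares[name]*100)
	}
}
```

With the example weights, critical receives roughly 60% of processing time, default 30%, and low 10%. The concurrency setting caps how many tasks run at once across all queues; the weights only influence which queue is polled next.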
Related Pages
- Scheduler — Cron-based scheduling pairs well with queued jobs
- Mailer — Offload email sending to the queue
- Resilience — Retry and circuit breaker for job handlers