From f4476d3e89207dae1581d36f31ac6d0f1efc90a9 Mon Sep 17 00:00:00 2001 From: Zeni Kim Date: Mon, 18 May 2026 22:37:26 -0500 Subject: [PATCH] queue system settings expanded. Update debug to goffee log system --- config/queue.go | 10 +++++++ controllers/queuesample.go | 55 ++++++++++++++++++++++++++++++++++++++ register-queues.go | 3 ++- routes.go | 3 +++ workers/workers.go | 7 ++--- 5 files changed, 74 insertions(+), 4 deletions(-) create mode 100644 controllers/queuesample.go diff --git a/config/queue.go b/config/queue.go index 8060572..0c01453 100644 --- a/config/queue.go +++ b/config/queue.go @@ -16,5 +16,15 @@ func GetQueueConfig() core.QueueConfig { // For enabling and disabling the queue system // set to true to enable it, set to false to disable EnableQueue: false, + + // Number of concurrent workers processing tasks + Concurrency: 10, + + // Queue names with priority weights (higher number = higher priority) + Queues: map[string]int{ + "critical": 6, + "default": 3, + "low": 1, + }, } } diff --git a/controllers/queuesample.go b/controllers/queuesample.go new file mode 100644 index 0000000..0745a17 --- /dev/null +++ b/controllers/queuesample.go @@ -0,0 +1,55 @@ +// Copyright (c) 2025 Zeni Kim +// Use of this source code is governed by MIT-style +// license that can be found in the LICENSE file. + +package controllers + +import ( + "encoding/json" + "fmt" + "time" + + "git.smarteching.com/goffee/core" + "git.smarteching.com/goffee/cup/workers" + "github.com/hibiken/asynq" +) + +// Make samples queues +func Queuesample(c *core.Context) *core.Response { + + // Get client queue asynq + client := c.GetQueueClient() + + // Create a task with typename and payload. + payload, err := json.Marshal(workers.EmailTaskPayload{UserID: 42}) + if err != nil { + c.GetLogger().Error(err.Error()) + return c.Response.SetStatusCode(500).Json(`{"message": "internal error"}`) + } + + t1 := asynq.NewTask(workers.TypeWelcomeEmail, payload) + + t2 := asynq.NewTask(workers.TypeReminderEmail, payload) + + // Process the task immediately. + info, err := client.Enqueue(t1) + if err != nil { + c.GetLogger().Error(err.Error()) + return c.Response.SetStatusCode(500).Json(`{"message": "internal error"}`) + } + c.GetLogger().Info(fmt.Sprintf(" [*] Successfully enqueued task: %+v", info)) + + // Process 2 task 1 min later. + for i := 1; i < 3; i++ { + info, err = client.Enqueue(t2, asynq.ProcessIn(1*time.Minute)) + if err != nil { + c.GetLogger().Error(err.Error()) + return c.Response.SetStatusCode(500).Json(`{"message": "internal error"}`) + } + c.GetLogger().Info(fmt.Sprintf(" [*] Successfully enqueued task: %+v", info)) + } + + message := "{\"message\": \"Task queued\"}" + return c.Response.Json(message) + +} diff --git a/register-queues.go b/register-queues.go index a1cf059..169ad84 100644 --- a/register-queues.go +++ b/register-queues.go @@ -6,6 +6,7 @@ package main import ( "git.smarteching.com/goffee/core" + "git.smarteching.com/goffee/cup/config" "git.smarteching.com/goffee/cup/workers" ) @@ -26,5 +27,5 @@ func registerQueues() { //######################################## // Start queue server, DO NOT TOUCH //######################################## - go queque.RunQueueserver() + go queque.RunQueueserver(config.GetQueueConfig()) } diff --git a/routes.go b/routes.go index faaf06b..b6b1f3b 100644 --- a/routes.go +++ b/routes.go @@ -28,6 +28,9 @@ func registerRoutes() { controller.Post("/reset-password", controllers.ResetPasswordRequest) controller.Post("/reset-password/code/:code", controllers.SetNewPassword) + // queue sample route + controller.Get("/queuesample", controllers.Queuesample) + // Uncomment the lines below to enable user administration controller.Get("/admin/users", controllers.AdminUsersList) controller.Post("/admin/users", controllers.AdminUsersList) diff --git a/workers/workers.go b/workers/workers.go index e7a9cad..6edde6f 100644 --- a/workers/workers.go +++ b/workers/workers.go @@ -3,8 +3,9 @@ package workers import ( "context" "encoding/json" - "log" + "fmt" + "git.smarteching.com/goffee/core/logger" "github.com/hibiken/asynq" ) @@ -25,7 +26,7 @@ func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error { if err := json.Unmarshal(t.Payload(), &p); err != nil { return err } - log.Printf(" [*] Send Welcome Email to User %d", p.UserID) + logger.ResolveLogger().Info(fmt.Sprintf(" [*] Send Welcome Email to User %d", p.UserID)) return nil } @@ -34,6 +35,6 @@ func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error { if err := json.Unmarshal(t.Payload(), &p); err != nil { return err } - log.Printf(" [*] Send Reminder Email to User %d", p.UserID) + logger.ResolveLogger().Info(fmt.Sprintf(" [*] Send Reminder Email to User %d", p.UserID)) return nil }