queue system settings expanded. Update debug to goffee log system
This commit is contained in:
parent
fc57baff78
commit
f4476d3e89
5 changed files with 74 additions and 4 deletions
|
|
@ -16,5 +16,15 @@ func GetQueueConfig() core.QueueConfig {
|
|||
// For enabling and disabling the queue system
|
||||
// set to true to enable it, set to false to disable
|
||||
EnableQueue: false,
|
||||
|
||||
// Number of concurrent workers processing tasks
|
||||
Concurrency: 10,
|
||||
|
||||
// Queue names with priority weights (higher number = higher priority)
|
||||
Queues: map[string]int{
|
||||
"critical": 6,
|
||||
"default": 3,
|
||||
"low": 1,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
|||
55
controllers/queuesample.go
Normal file
55
controllers/queuesample.go
Normal file
|
|
@ -0,0 +1,55 @@
|
|||
// Copyright (c) 2025 Zeni Kim <zenik@smarteching.com>
|
||||
// Use of this source code is governed by MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.smarteching.com/goffee/core"
|
||||
"git.smarteching.com/goffee/cup/workers"
|
||||
"github.com/hibiken/asynq"
|
||||
)
|
||||
|
||||
// Make samples queues
|
||||
func Queuesample(c *core.Context) *core.Response {
|
||||
|
||||
// Get client queue asynq
|
||||
client := c.GetQueueClient()
|
||||
|
||||
// Create a task with typename and payload.
|
||||
payload, err := json.Marshal(workers.EmailTaskPayload{UserID: 42})
|
||||
if err != nil {
|
||||
c.GetLogger().Error(err.Error())
|
||||
return c.Response.SetStatusCode(500).Json(`{"message": "internal error"}`)
|
||||
}
|
||||
|
||||
t1 := asynq.NewTask(workers.TypeWelcomeEmail, payload)
|
||||
|
||||
t2 := asynq.NewTask(workers.TypeReminderEmail, payload)
|
||||
|
||||
// Process the task immediately.
|
||||
info, err := client.Enqueue(t1)
|
||||
if err != nil {
|
||||
c.GetLogger().Error(err.Error())
|
||||
return c.Response.SetStatusCode(500).Json(`{"message": "internal error"}`)
|
||||
}
|
||||
c.GetLogger().Info(fmt.Sprintf(" [*] Successfully enqueued task: %+v", info))
|
||||
|
||||
// Process 2 task 1 min later.
|
||||
for i := 1; i < 3; i++ {
|
||||
info, err = client.Enqueue(t2, asynq.ProcessIn(1*time.Minute))
|
||||
if err != nil {
|
||||
c.GetLogger().Error(err.Error())
|
||||
return c.Response.SetStatusCode(500).Json(`{"message": "internal error"}`)
|
||||
}
|
||||
c.GetLogger().Info(fmt.Sprintf(" [*] Successfully enqueued task: %+v", info))
|
||||
}
|
||||
|
||||
message := "{\"message\": \"Task queued\"}"
|
||||
return c.Response.Json(message)
|
||||
|
||||
}
|
||||
|
|
@ -6,6 +6,7 @@ package main
|
|||
|
||||
import (
|
||||
"git.smarteching.com/goffee/core"
|
||||
"git.smarteching.com/goffee/cup/config"
|
||||
"git.smarteching.com/goffee/cup/workers"
|
||||
)
|
||||
|
||||
|
|
@ -26,5 +27,5 @@ func registerQueues() {
|
|||
//########################################
|
||||
// Start queue server, DO NOT TOUCH
|
||||
//########################################
|
||||
go queque.RunQueueserver()
|
||||
go queque.RunQueueserver(config.GetQueueConfig())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,6 +28,9 @@ func registerRoutes() {
|
|||
controller.Post("/reset-password", controllers.ResetPasswordRequest)
|
||||
controller.Post("/reset-password/code/:code", controllers.SetNewPassword)
|
||||
|
||||
// queue sample route
|
||||
controller.Get("/queuesample", controllers.Queuesample)
|
||||
|
||||
// Uncomment the lines below to enable user administration
|
||||
controller.Get("/admin/users", controllers.AdminUsersList)
|
||||
controller.Post("/admin/users", controllers.AdminUsersList)
|
||||
|
|
|
|||
|
|
@ -3,8 +3,9 @@ package workers
|
|||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"fmt"
|
||||
|
||||
"git.smarteching.com/goffee/core/logger"
|
||||
"github.com/hibiken/asynq"
|
||||
)
|
||||
|
||||
|
|
@ -25,7 +26,7 @@ func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
|
|||
if err := json.Unmarshal(t.Payload(), &p); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Printf(" [*] Send Welcome Email to User %d", p.UserID)
|
||||
logger.ResolveLogger().Info(fmt.Sprintf(" [*] Send Welcome Email to User %d", p.UserID))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -34,6 +35,6 @@ func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
|
|||
if err := json.Unmarshal(t.Payload(), &p); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Printf(" [*] Send Reminder Email to User %d", p.UserID)
|
||||
logger.ResolveLogger().Info(fmt.Sprintf(" [*] Send Reminder Email to User %d", p.UserID))
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue