62 lines
1.3 KiB
Go
62 lines
1.3 KiB
Go
// Copyright (c) 2025 Zeni Kim <zenik@smarteching.com>
|
|
// Use of this source code is governed by MIT-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package core
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
|
|
"github.com/hibiken/asynq"
|
|
)
|
|
|
|
// Asynqtask is the function signature of a queue task handler: it receives a
// context and the *asynq.Task to process and returns an error. Values of this
// type are what AddWork registers on the mux.
type Asynqtask func(context.Context, *asynq.Task) error
|
|
|
|
type Queuemux struct {
|
|
themux *asynq.ServeMux
|
|
}
|
|
|
|
func (q *Queuemux) QueueInit() {
|
|
q.themux = asynq.NewServeMux()
|
|
}
|
|
|
|
func (q *Queuemux) AddWork(pattern string, work Asynqtask) {
|
|
q.themux.HandleFunc(pattern, work)
|
|
}
|
|
|
|
// RunQueueserver starts the queue server with the given configuration and handles tasks using the assigned ServeMux.
|
|
// Configures the queue server with concurrency limits and priority-based queue management.
|
|
// Logs and terminates the program if the server fails to run.
|
|
func (q *Queuemux) RunQueueserver(config QueueConfig) {
|
|
|
|
redisAddr := fmt.Sprintf("%v:%v", os.Getenv("REDIS_HOST"), os.Getenv("REDIS_PORT"))
|
|
|
|
queues := config.Queues
|
|
if queues == nil {
|
|
queues = map[string]int{
|
|
"critical": 6,
|
|
"default": 3,
|
|
"low": 1,
|
|
}
|
|
}
|
|
|
|
concurrency := config.Concurrency
|
|
if concurrency <= 0 {
|
|
concurrency = 10
|
|
}
|
|
|
|
srv := asynq.NewServer(
|
|
asynq.RedisClientOpt{Addr: redisAddr},
|
|
asynq.Config{
|
|
Concurrency: concurrency,
|
|
Queues: queues,
|
|
},
|
|
)
|
|
|
|
if err := srv.Run(q.themux); err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
}
|