forked from goffee/core
add base queue system
This commit is contained in:
parent
4968da25f3
commit
b274d3268f
4 changed files with 82 additions and 2 deletions
49
queue.go
Normal file
49
queue.go
Normal file
|
|
@ -0,0 +1,49 @@
|
|||
// Copyright (c) 2024 Zeni Kim <zenik@smarteching.com>
|
||||
// Use of this source code is governed by MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package core
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
)
|
||||
|
||||
type Asynqtask func(context.Context, *asynq.Task) error
|
||||
|
||||
type Queuemux struct {
|
||||
themux *asynq.ServeMux
|
||||
}
|
||||
|
||||
func (q *Queuemux) QueueInit() {
|
||||
q.themux = asynq.NewServeMux()
|
||||
}
|
||||
|
||||
func (q *Queuemux) AddWork(pattern string, work Asynqtask) {
|
||||
q.themux.HandleFunc(pattern, work)
|
||||
}
|
||||
|
||||
func (q *Queuemux) RunQueueserver() {
|
||||
|
||||
redisAddr := fmt.Sprintf("%v:%v", os.Getenv("REDIS_HOST"), os.Getenv("REDIS_PORT"))
|
||||
|
||||
srv := asynq.NewServer(
|
||||
asynq.RedisClientOpt{Addr: redisAddr},
|
||||
asynq.Config{Concurrency: 10,
|
||||
// Optionally specify multiple queues with different priority.
|
||||
Queues: map[string]int{
|
||||
"critical": 6,
|
||||
"default": 3,
|
||||
"low": 1,
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
if err := srv.Run(q.themux); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue