diff --git a/config/queue.go b/config/queue.go new file mode 100644 index 0000000..8060572 --- /dev/null +++ b/config/queue.go @@ -0,0 +1,20 @@ +// Copyright (c) 2025 Zeni Kim +// Use of this source code is governed by MIT-style +// license that can be found in the LICENSE file. + +package config + +import "git.smarteching.com/goffee/core" + +// Retrieve the main config for the Queue +func GetQueueConfig() core.QueueConfig { + //##################################### + //# Main configuration for Queue ##### + //##################################### + + return core.QueueConfig{ + // For enabling and disabling the queue system + // set to true to enable it, set to false to disable + EnableQueue: false, + } +} diff --git a/controllers/queuesample.go b/controllers/queuesample.go new file mode 100644 index 0000000..d289aee --- /dev/null +++ b/controllers/queuesample.go @@ -0,0 +1,52 @@ +// Copyright (c) 2025 Zeni Kim +// Use of this source code is governed by MIT-style +// license that can be found in the LICENSE file. + +package controllers + +import ( + "encoding/json" + "log" + "time" + + "git.smarteching.com/goffee/core" + "git.smarteching.com/goffee/cup/workers" + "github.com/hibiken/asynq" +) + +// Make samples queues +func Queuesample(c *core.Context) *core.Response { + + // Get client queue asynq + client := c.GetQueueClient() + + // Create a task with typename and payload. + payload, err := json.Marshal(workers.EmailTaskPayload{UserID: 42}) + if err != nil { + log.Fatal(err) + } + + t1 := asynq.NewTask(workers.TypeWelcomeEmail, payload) + + t2 := asynq.NewTask(workers.TypeReminderEmail, payload) + + // Process the task immediately. + info, err := client.Enqueue(t1) + if err != nil { + log.Fatal(err) + } + log.Printf(" [*] Successfully enqueued task: %+v", info) + + // Process 2 task 1 min later. + for i := 1; i < 3; i++ { + info, err = client.Enqueue(t2, asynq.ProcessIn(1*time.Minute)) + if err != nil { + log.Fatal(err) + } + log.Printf(" [*] Successfully enqueued task: %+v", info) + } + + message := "{\"message\": \"Task queued\"}" + return c.Response.Json(message) + +} diff --git a/go.mod b/go.mod index e70b070..2e7078c 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ go 1.23.1 require ( git.smarteching.com/goffee/core v1.8.4 github.com/google/uuid v1.6.0 + github.com/hibiken/asynq v0.25.1 github.com/joho/godotenv v1.5.1 github.com/julienschmidt/httprouter v1.3.0 gorm.io/gorm v1.25.12 @@ -44,14 +45,19 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/redis/go-redis/v9 v9.7.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/sendgrid/rest v2.6.9+incompatible // indirect github.com/sendgrid/sendgrid-go v3.16.0+incompatible // indirect github.com/sirupsen/logrus v1.9.3 // indirect + github.com/spf13/cast v1.7.0 // indirect golang.org/x/crypto v0.28.0 // indirect golang.org/x/image v0.21.0 // indirect golang.org/x/net v0.30.0 // indirect golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.27.0 // indirect golang.org/x/text v0.19.0 // indirect + golang.org/x/time v0.8.0 // indirect + google.golang.org/protobuf v1.35.2 // indirect gorm.io/driver/mysql v1.5.7 // indirect gorm.io/driver/postgres v1.5.9 // indirect gorm.io/driver/sqlite v1.5.6 // indirect diff --git a/go.sum b/go.sum index be301b7..ca95d88 100644 --- a/go.sum +++ b/go.sum @@ -47,6 +47,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/harranali/mailing v1.2.0 h1:ihIyJwB8hyRVcdk+v465wk1PHMrSrgJqo/kMd+gZClY= github.com/harranali/mailing v1.2.0/go.mod h1:4a5N3yG98pZKluMpmcYlTtll7bisvOfGQEMIng3VQk4= +github.com/hibiken/asynq v0.25.1 h1:phj028N0nm15n8O2ims+IvJ2gz4k2auvermngh9JhTw= +github.com/hibiken/asynq v0.25.1/go.mod h1:pazWNOLBu0FEynQRBvHA26qdIKRSmfdIfUm4HdsLmXg= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= @@ -89,6 +91,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/saintfish/chardet v0.0.0-20120816061221-3af4cd4741ca/go.mod h1:uugorj2VCxiV1x+LzaIdVa9b4S4qGAcH6cbhh4qVxOU= github.com/sendgrid/rest v2.6.9+incompatible h1:1EyIcsNdn9KIisLW50MKwmSRSK+ekueiEMJ7NEoxJo0= github.com/sendgrid/rest v2.6.9+incompatible/go.mod h1:kXX7q3jZtJXK5c5qK83bSGMdV6tsOE70KbHoqJls4lE= @@ -96,6 +100,8 @@ github.com/sendgrid/sendgrid-go v3.16.0+incompatible h1:i8eE6IMkiCy7vusSdacHHSBU github.com/sendgrid/sendgrid-go v3.16.0+incompatible/go.mod h1:QRQt+LX/NmgVEvmdRw0VT/QgUn499+iza2FnDca9fg8= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w= +github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf/go.mod h1:RJID2RhlZKId02nZ62WenDCkgHFerpIOmW0iT7GKmXM= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -116,11 +122,17 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= +golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/main.go b/main.go index 4b9c956..5fac06e 100644 --- a/main.go +++ b/main.go @@ -62,6 +62,9 @@ func main() { registerGlobalHooks() registerRoutes() registerEvents() + if config.GetQueueConfig().EnableQueue == true { + registerQueues() + } if config.GetGormConfig().EnableGorm == true { RunAutoMigrations() } diff --git a/register-queues.go b/register-queues.go new file mode 100644 index 0000000..a1cf059 --- /dev/null +++ b/register-queues.go @@ -0,0 +1,30 @@ +// Copyright (c) 2025 Zeni Kim +// Use of this source code is governed by MIT-style +// license that can be found in the LICENSE file. + +package main + +import ( + "git.smarteching.com/goffee/core" + "git.smarteching.com/goffee/cup/workers" +) + +// Register queues +func registerQueues() { + + var queque = new(core.Queuemux) + queque.QueueInit() + //######################################## + //# quques registration ##### + //######################################## + + // register your queues here ... + + queque.AddWork(workers.TypeWelcomeEmail, workers.HandleWelcomeEmailTask) + queque.AddWork(workers.TypeReminderEmail, workers.HandleReminderEmailTask) + + //######################################## + // Start queue server, DO NOT TOUCH + //######################################## + go queque.RunQueueserver() +} diff --git a/routes.go b/routes.go index 785c59e..f8d7fec 100644 --- a/routes.go +++ b/routes.go @@ -26,6 +26,7 @@ func registerRoutes() { controller.Get("/themecontent", controllers.Themecontent) controller.Get("/themepanel", controllers.Themedemo) controller.Get("/themeelements", controllers.ThemeElements) + controller.Get("/queuesample", controllers.Queuesample) // Uncomment the lines below to enable authentication controller.Post("/signup", controllers.Signup) diff --git a/workers/workers.go b/workers/workers.go new file mode 100644 index 0000000..e7a9cad --- /dev/null +++ b/workers/workers.go @@ -0,0 +1,39 @@ +package workers + +import ( + "context" + "encoding/json" + "log" + + "github.com/hibiken/asynq" +) + +// A list of task types. +const ( + TypeWelcomeEmail = "email:welcome" + TypeReminderEmail = "email:reminder" +) + +// Task payload for any email related tasks. +type EmailTaskPayload struct { + // ID for the email recipient. + UserID int +} + +func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error { + var p EmailTaskPayload + if err := json.Unmarshal(t.Payload(), &p); err != nil { + return err + } + log.Printf(" [*] Send Welcome Email to User %d", p.UserID) + return nil +} + +func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error { + var p EmailTaskPayload + if err := json.Unmarshal(t.Payload(), &p); err != nil { + return err + } + log.Printf(" [*] Send Reminder Email to User %d", p.UserID) + return nil +}