From e425546627b136e9f4aa5c7c3680a936c5a31568 Mon Sep 17 00:00:00 2001 From: wucongxing8150 <815046773@qq.com> Date: Wed, 29 May 2024 14:49:24 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BA=86amqp=E7=9A=84?= =?UTF-8?q?=E4=BD=BF=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/service/MessagePush.go | 1 + api/service/v1/Coupon.go | 25 +++++- core/amqp.go | 42 ---------- extend/rabbitMq/rabbitMq.go | 156 ++++++++++++++++++++++++++++++++++++ global/global.go | 15 ++-- main.go | 3 - 6 files changed, 185 insertions(+), 57 deletions(-) create mode 100644 api/service/MessagePush.go delete mode 100644 core/amqp.go create mode 100644 extend/rabbitMq/rabbitMq.go diff --git a/api/service/MessagePush.go b/api/service/MessagePush.go new file mode 100644 index 0000000..6d43c33 --- /dev/null +++ b/api/service/MessagePush.go @@ -0,0 +1 @@ +package service diff --git a/api/service/v1/Coupon.go b/api/service/v1/Coupon.go index eaf5141..0b809b3 100644 --- a/api/service/v1/Coupon.go +++ b/api/service/v1/Coupon.go @@ -8,6 +8,7 @@ import ( dtoV1 "hospital-open-api/api/dto/v1" "hospital-open-api/api/model" requestsV1 "hospital-open-api/api/requests/v1" + "hospital-open-api/extend/rabbitMq" "hospital-open-api/global" "strconv" "time" @@ -55,6 +56,10 @@ func (r *CouponService) ReceiveCoupon(req requestsV1.ReceiveCoupon) (g *dtoV1.Re } }() + // 建立队列连接 + rabbitMQ, err := rabbitMq.NewRabbitMQClient() + defer rabbitMQ.Close() + // 获取优惠卷数据 couponDao := dao.CouponDao{} userCouponDao := dao.UserCouponDao{} @@ -164,12 +169,12 @@ func (r *CouponService) ReceiveCoupon(req requestsV1.ReceiveCoupon) (g *dtoV1.Re } // 有效类型(1:绝对时效,xxx-xxx时间段有效 2:相对时效 n天内有效) - if coupon.CouponType == 1 { + if coupon.ValidType == 1 { UserCouponModel.ValidStartTime = time.Time(coupon.ValidStartTime) UserCouponModel.ValidEndTime = time.Time(coupon.ValidEndTime) } - if coupon.CouponType == 2 { + if coupon.ValidType == 2 { UserCouponModel.ValidStartTime = now UserCouponModel.ValidEndTime = now.AddDate(0, 0, coupon.ValidDays) } @@ -228,8 +233,22 @@ func (r *CouponService) ReceiveCoupon(req requestsV1.ReceiveCoupon) (g *dtoV1.Re year, month, day := now.Date() location := now.Location() endOfDay := time.Date(year, month, day, 23, 59, 59, 0, location) - if userCoupon.ValidEndTime.After(endOfDay) { + if userCoupon.ValidEndTime.Before(endOfDay) { // 需添加队列 + data := make(map[string]interface{}) + data["user_coupon_id"] = fmt.Sprintf("%d", userCoupon.UserCouponId) + + delay := userCoupon.ValidEndTime.Sub(time.Now()) + + if delay < 10 { + delay = 10 * time.Second + } + + err = rabbitMQ.PublishWithDelay("amqp.delay.direct", "UserCouponExpired", data, delay) + if err != nil { + tx.Rollback() + return nil, err + } } } diff --git a/core/amqp.go b/core/amqp.go deleted file mode 100644 index 156318e..0000000 --- a/core/amqp.go +++ /dev/null @@ -1,42 +0,0 @@ -package core - -import ( - "fmt" - "github.com/streadway/amqp" - "hospital-open-api/config" - "hospital-open-api/global" - "net/url" -) - -func Amqp() { - m := config.C.Amqp - - user := url.QueryEscape(m.User) - password := url.QueryEscape(m.Password) - - dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/%s", user, - password, m.Host, m.Port, m.Vhost) - - conn, err := amqp.Dial(dsn) - if err != nil { - panic("rabbitMq初始化失败! " + err.Error()) - } - - defer func(conn *amqp.Connection) { - _ = conn.Close() - }(conn) - - ch, err := conn.Channel() - if err != nil { - panic("rabbitMq初始化失败! " + err.Error()) - } - - defer func(ch *amqp.Channel) { - _ = ch.Close() - }(ch) - - global.AmqpConn = conn - global.AmqpChannel = ch - - fmt.Println("初始化rabbitMq成功......") -} diff --git a/extend/rabbitMq/rabbitMq.go b/extend/rabbitMq/rabbitMq.go new file mode 100644 index 0000000..0793c75 --- /dev/null +++ b/extend/rabbitMq/rabbitMq.go @@ -0,0 +1,156 @@ +package rabbitMq + +import ( + "encoding/json" + "fmt" + "github.com/streadway/amqp" + "hospital-open-api/config" + "net/url" + "time" +) + +type RabbitMQ struct { + conn *amqp.Connection + channel *amqp.Channel +} + +// NewRabbitMQClient 初始化客户端 +func NewRabbitMQClient() (*RabbitMQ, error) { + m := config.C.Amqp + + user := url.QueryEscape(m.User) + password := url.QueryEscape(m.Password) + + dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/%s", user, password, m.Host, m.Port, m.Vhost) + conn, err := amqp.Dial(dsn) + if err != nil { + return nil, err + } + + ch, err := conn.Channel() + if err != nil { + return nil, err + } + + return &RabbitMQ{ + conn: conn, + channel: ch, + }, nil +} + +// Publish 生产 +func (r *RabbitMQ) Publish(queueName, exchangeName string, message interface{}) error { + q, err := r.channel.QueueDeclare( + queueName, // 队列名字 + false, // 消息是否持久化 + false, // 不使用的时候删除队列 + false, // 排他 + false, // 是否等待服务器确认 + nil, // arguments + ) + if err != nil { + return err + } + + // Serialize the message + body, err := json.Marshal(message) + if err != nil { + return err + } + + err = r.channel.Publish( + exchangeName, // exchange(交换机名字) + q.Name, // + false, // 是否为无法路由的消息进行返回处理 + false, // 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃 + amqp.Publishing{ + ContentType: "text/plain", + Body: body, // 消息内容 + }) + if err != nil { + return err + } + + return nil +} + +func (r *RabbitMQ) PublishWithDelay(exchangeName, routingKey string, message interface{}, delay time.Duration) error { + err := r.channel.ExchangeDeclare( + exchangeName, // name + "x-delayed-message", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + amqp.Table{"x-delayed-type": "direct"}, // arguments + ) + if err != nil { + return err + } + + body, err := json.Marshal(message) + if err != nil { + return err + } + + err = r.channel.Publish( + exchangeName, // exchange + routingKey, // routing key + false, // 是否为无法路由的消息进行返回处理 + false, // 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃 + amqp.Publishing{ + ContentType: "text/plain", + Body: body, + Headers: amqp.Table{"x-delay": int32(delay / time.Millisecond)}, + }) + if err != nil { + return err + } + + return nil +} + +// Consume 消费 +func (r *RabbitMQ) Consume(queueName, exchangeName string) (<-chan amqp.Delivery, error) { + q, err := r.channel.QueueDeclare( + queueName, // 队列名称 + false, // 消息者名称 + false, // 是否确认消费 + false, // 排他 + false, // no-wait + nil, // arguments + ) + if err != nil { + return nil, err + } + + msgs, err := r.channel.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + return nil, err + } + + return msgs, nil +} + +func (r *RabbitMQ) Close() { + if r.channel != nil { + err := r.channel.Close() + if err != nil { + return + } + } + if r.conn != nil { + err := r.conn.Close() + if err != nil { + return + } + } +} diff --git a/global/global.go b/global/global.go index 0b6f721..662c098 100644 --- a/global/global.go +++ b/global/global.go @@ -6,18 +6,15 @@ import ( "github.com/go-playground/validator/v10" "github.com/go-redis/redis/v8" "github.com/sirupsen/logrus" - "github.com/streadway/amqp" "gorm.io/gorm" ) // 全局变量 var ( - Db *gorm.DB // 数据库 - Logger *logrus.Logger // 日志 - Redis *redis.Client // redis - Validate *validator.Validate // 验证器 - Trans ut.Translator // Validate/v10 全局验证器 - Snowflake *snowflake.Node // 雪花算法 - AmqpConn *amqp.Connection // rabbitMq队列 - AmqpChannel *amqp.Channel // rabbitMq队列 + Db *gorm.DB // 数据库 + Logger *logrus.Logger // 日志 + Redis *redis.Client // redis + Validate *validator.Validate // 验证器 + Trans ut.Translator // Validate/v10 全局验证器 + Snowflake *snowflake.Node // 雪花算法 ) diff --git a/main.go b/main.go index 0879514..e4f0154 100644 --- a/main.go +++ b/main.go @@ -30,9 +30,6 @@ func main() { // 加载雪花算法 core.Snowflake() - // 加载队列 - core.Amqp() - // 初始化路由-加载中间件 r := router.Init()