From 032730ba3eac17a6daf188c3418aaf7b7c0ac7dd Mon Sep 17 00:00:00 2001 From: wucongxing8150 <815046773@qq.com> Date: Wed, 29 May 2024 15:37:06 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=91=E6=94=BE=E4=BC=98=E6=83=A0=E5=8D=B7?= =?UTF-8?q?=E6=97=B6=E5=A2=9E=E5=8A=A0=E4=BA=86=E9=80=9A=E7=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/service/MessagePush.go | 35 +++++++++++++++++++++++++++++++++++ api/service/v1/Coupon.go | 18 ++++++++++++++++-- extend/rabbitMq/rabbitMq.go | 17 ++++++++--------- 3 files changed, 59 insertions(+), 11 deletions(-) diff --git a/api/service/MessagePush.go b/api/service/MessagePush.go index 6d43c33..69d9d6d 100644 --- a/api/service/MessagePush.go +++ b/api/service/MessagePush.go @@ -1 +1,36 @@ package service + +import ( + "errors" + "fmt" + "hospital-open-api/extend/rabbitMq" +) + +// PatientDistributeCoupon 患者-优惠劵发放-站内 +func PatientDistributeCoupon(couponName string, userId int64) (bool, error) { + // 建立队列连接 + rabbitMQ, err := rabbitMq.NewRabbitMQClient() + if err != nil { + return false, errors.New("内部错误") + } + + defer rabbitMQ.Close() + + data := make(map[string]interface{}) + data["user_id"] = fmt.Sprintf("%d", userId) + data["notice_type"] = 3 + data["notice_system_type"] = 2 + data["from_name"] = "肝胆小秘书" + data["notice_brief_title"] = "有新的优惠券已下发至您的账户,点击查看详情。" + data["notice_title"] = fmt.Sprintf("【%s】已到账", couponName) + data["notice_content"] = "有新的优惠劵已下发至您的账户中,点击查看详情!" + data["link_type"] = 7 + + err = rabbitMQ.Publish("send.station.message.queue", "amqp.direct", "SendStationMessage", data) + if err != nil { + return false, err + } + + return true, nil + +} diff --git a/api/service/v1/Coupon.go b/api/service/v1/Coupon.go index 0b809b3..0379e72 100644 --- a/api/service/v1/Coupon.go +++ b/api/service/v1/Coupon.go @@ -8,8 +8,10 @@ import ( dtoV1 "hospital-open-api/api/dto/v1" "hospital-open-api/api/model" requestsV1 "hospital-open-api/api/requests/v1" + "hospital-open-api/api/service" "hospital-open-api/extend/rabbitMq" "hospital-open-api/global" + "hospital-open-api/utils" "strconv" "time" ) @@ -58,6 +60,11 @@ func (r *CouponService) ReceiveCoupon(req requestsV1.ReceiveCoupon) (g *dtoV1.Re // 建立队列连接 rabbitMQ, err := rabbitMq.NewRabbitMQClient() + if err != nil { + tx.Rollback() + return g, errors.New("内部错误") + } + defer rabbitMQ.Close() // 获取优惠卷数据 @@ -124,7 +131,7 @@ func (r *CouponService) ReceiveCoupon(req requestsV1.ReceiveCoupon) (g *dtoV1.Re now := time.Now() // 检测优惠卷过期时间 - if coupon.CouponType == 1 { + if coupon.ValidType == 1 { // 1:绝对时效 validStartTime := time.Time(coupon.ValidStartTime) if now.Before(validStartTime) { @@ -244,11 +251,18 @@ func (r *CouponService) ReceiveCoupon(req requestsV1.ReceiveCoupon) (g *dtoV1.Re delay = 10 * time.Second } - err = rabbitMQ.PublishWithDelay("amqp.delay.direct", "UserCouponExpired", data, delay) + err = rabbitMQ.PublishWithDelay("user.coupon.expired.delay.queue", "amqp.delay.direct", "UserCouponExpired", data, delay) if err != nil { tx.Rollback() return nil, err } + + // 发送通知 + res, _ := service.PatientDistributeCoupon(coupon.CouponName, user.UserId) + if !res { + utils.LogJsonInfo("优惠卷通知发送失败") + } + } } diff --git a/extend/rabbitMq/rabbitMq.go b/extend/rabbitMq/rabbitMq.go index bc5a8a3..9cc9557 100644 --- a/extend/rabbitMq/rabbitMq.go +++ b/extend/rabbitMq/rabbitMq.go @@ -39,20 +39,19 @@ func NewRabbitMQClient() (*RabbitMQ, error) { } // Publish 生产 -func (r *RabbitMQ) Publish(queueName, exchangeName string, message interface{}) error { - q, err := r.channel.QueueDeclare( +func (r *RabbitMQ) Publish(queueName, exchangeName, routingKey string, message interface{}) error { + _, err := r.channel.QueueDeclare( queueName, // 队列名字 - false, // 消息是否持久化 + true, // 消息是否持久化 false, // 不使用的时候删除队列 - false, // 排他 - false, // 是否等待服务器确认 + false, // 是否排他 + false, // 是否阻塞 nil, // arguments ) if err != nil { return err } - // Serialize the message body, err := json.Marshal(message) if err != nil { return err @@ -60,7 +59,7 @@ func (r *RabbitMQ) Publish(queueName, exchangeName string, message interface{}) err = r.channel.Publish( exchangeName, // exchange(交换机名字) - q.Name, // + routingKey, // false, // 是否为无法路由的消息进行返回处理 false, // 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃 amqp.Publishing{ @@ -74,9 +73,9 @@ func (r *RabbitMQ) Publish(queueName, exchangeName string, message interface{}) return nil } -func (r *RabbitMQ) PublishWithDelay(exchangeName, routingKey string, message interface{}, delay time.Duration) error { +func (r *RabbitMQ) PublishWithDelay(queueName, exchangeName, routingKey string, message interface{}, delay time.Duration) error { err := r.channel.ExchangeDeclare( - exchangeName, // name + queueName, // name "x-delayed-message", // type true, // durable false, // auto-deleted