hepa-calc-api/api/amqp/consumer/CouponExpire.go

124 lines
2.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package consumer
import (
"encoding/json"
"fmt"
"github.com/rabbitmq/amqp091-go"
"hepa-calc-api/api/dao"
"hepa-calc-api/extend/rabbitMq"
"hepa-calc-api/global"
"hepa-calc-api/utils"
"strconv"
"time"
)
type couponExpireData struct {
CouponId string `json:"coupon_id"`
}
// CouponExpire 优惠卷过期
func CouponExpire(msg amqp091.Delivery) {
defer func() {
if r := recover(); r != nil {
utils.LogJsonErr("consumer.CouponExpire:", r)
_ = msg.Reject(false)
}
}()
// 记录日志
utils.LogJsonInfo("consumer.CouponExpire:", string(msg.Body))
if msg.Body == nil {
_ = msg.Ack(false)
return
}
// 解析JSON字符串到结构体
var data couponExpireData
err := json.Unmarshal(msg.Body, &data)
if err != nil {
_ = msg.Ack(false)
return
}
couponId, err := strconv.ParseInt(data.CouponId, 10, 64)
if err != nil {
_ = msg.Ack(false)
return
}
// 获取优惠卷数据
couponDao := dao.CouponDao{}
coupon, err := couponDao.GetCouponById(couponId)
if err != nil || coupon == nil {
utils.LogJsonInfo("consumer.CouponExpire:", "无优惠卷数据")
_ = msg.Reject(false)
return
}
// 检测优惠卷状态-状态1:正常 2:强制失效 3:结束 4:删除)
if coupon.CouponStatus != 1 {
// 正常,无需处理
_ = msg.Ack(false)
return
}
// 检测优惠卷过期类型
if coupon.ValidType == 2 {
// 正常,无需处理
_ = msg.Ack(false)
return
}
// 检测优惠卷过期时间
now := time.Now()
validEndTime := time.Time(*coupon.ValidEndTime)
diffTime := validEndTime.Sub(now)
if diffTime >= 60*time.Second {
// 重新添加入队列
data := make(map[string]interface{})
data["coupon_id"] = fmt.Sprintf("%d", coupon.CouponId)
p := rabbitMq.PublishS{
QueueName: "coupon.expired.delay.queue",
ExchangeName: "amqp.delay.direct",
RoutingKey: "CouponExpired",
Message: data,
Delay: diffTime,
}
err = p.PublishWithDelay()
if err != nil {
utils.LogJsonErr("consumer.CouponExpire:", err.Error())
// 重回队列
_ = msg.Reject(true)
return
}
_ = msg.Ack(false)
return
}
// 开始事务
tx := global.Db.Begin()
defer func() {
if r := recover(); r != nil {
tx.Rollback()
utils.LogJsonErr("consumer.CouponExpire:", r)
_ = msg.Reject(false)
}
}()
// 修改用户优惠卷表
couponData := make(map[string]interface{})
couponData["coupon_status"] = 3
err = couponDao.EditCouponById(tx, couponId, couponData)
if err != nil {
tx.Rollback()
utils.LogJsonErr("consumer.CouponExpire:", err.Error())
_ = msg.Reject(false)
return
}
tx.Commit()
_ = msg.Ack(false)
}