package consumer import ( "encoding/json" "fmt" "github.com/rabbitmq/amqp091-go" "hepa-calc-api/api/dao" "hepa-calc-api/extend/rabbitMq" "hepa-calc-api/global" "hepa-calc-api/utils" "strconv" "time" ) type userCouponExpireData struct { UserCouponId string `json:"user_coupon_id"` } // UserCouponExpire 用户优惠卷过期 func UserCouponExpire(msg amqp091.Delivery) { defer func() { if r := recover(); r != nil { utils.LogJsonErr("consumer.UserCouponExpire:", r) _ = msg.Reject(false) } }() // 记录日志 utils.LogJsonInfo("consumer.UserCouponExpire:", string(msg.Body)) if msg.Body == nil { _ = msg.Ack(false) return } // 解析JSON字符串到结构体 var data userCouponExpireData err := json.Unmarshal(msg.Body, &data) if err != nil { _ = msg.Ack(false) return } userCouponId, err := strconv.ParseInt(data.UserCouponId, 10, 64) if err != nil { _ = msg.Ack(false) return } // 获取优惠卷数据 userCouponDao := dao.UserCouponDao{} userCoupon, err := userCouponDao.GetUserCouponById(userCouponId) if err != nil || userCoupon == nil { utils.LogJsonInfo("consumer.UserCouponExpire:", "无优惠卷数据") _ = msg.Reject(false) return } // 检测优惠卷是否被使用 if userCoupon.UserCouponStatus == 1 { _ = msg.Reject(false) return } // 检测优惠卷是否已执行过期处理 if userCoupon.UserCouponStatus == 3 { _ = msg.Reject(false) return } // 检测优惠卷过期时间 now := time.Now() validEndTime := time.Time(*userCoupon.ValidEndTime) diffTime := validEndTime.Sub(now) if diffTime >= 60*time.Second { // 重新添加入队列 data := make(map[string]interface{}) data["user_coupon_id"] = fmt.Sprintf("%d", userCoupon.UserCouponId) p := rabbitMq.PublishS{ QueueName: "user.coupon.expired.delay.queue", ExchangeName: "amqp.delay.direct", RoutingKey: "UserCouponExpired", Message: data, Delay: diffTime, } err = p.PublishWithDelay() if err != nil { utils.LogJsonErr("consumer.UserCouponExpire:", err.Error()) // 重回队列 _ = msg.Reject(true) return } _ = msg.Ack(false) return } // 开始事务 tx := global.Db.Begin() defer func() { if r := recover(); r != nil { tx.Rollback() utils.LogJsonErr("consumer.UserCouponExpire:", r) _ = msg.Reject(false) } }() // 修改用户优惠卷表 userCouponData := make(map[string]interface{}) userCouponData["user_coupon_status"] = 3 err = userCouponDao.EditUserCouponById(tx, userCouponId, userCouponData) if err != nil { tx.Rollback() utils.LogJsonErr("consumer.UserCouponExpire:", err.Error()) _ = msg.Reject(false) return } tx.Commit() _ = msg.Ack(false) }