117 lines
2.4 KiB
Go
117 lines
2.4 KiB
Go
package consumer
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/rabbitmq/amqp091-go"
|
|
"hepa-calc-api/api/dao"
|
|
"hepa-calc-api/extend/rabbitMq"
|
|
"hepa-calc-api/global"
|
|
"hepa-calc-api/utils"
|
|
"strconv"
|
|
"time"
|
|
)
|
|
|
|
type userMemberExpireData struct {
|
|
UserId string `json:"user_id"`
|
|
}
|
|
|
|
// UserMemberExpire 用户会员过期
|
|
func UserMemberExpire(msg amqp091.Delivery) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
utils.LogJsonInfo("consumer.UserMemberExpire:", r)
|
|
_ = msg.Reject(false)
|
|
}
|
|
}()
|
|
|
|
// 记录日志
|
|
utils.LogJsonInfo("consumer.UserMemberExpire:", string(msg.Body))
|
|
if msg.Body == nil {
|
|
_ = msg.Ack(false)
|
|
return
|
|
}
|
|
|
|
// 解析JSON字符串到结构体
|
|
var data userMemberExpireData
|
|
err := json.Unmarshal(msg.Body, &data)
|
|
if err != nil {
|
|
_ = msg.Ack(false)
|
|
return
|
|
}
|
|
|
|
userId, err := strconv.ParseInt(data.UserId, 10, 64)
|
|
if err != nil {
|
|
_ = msg.Ack(false)
|
|
return
|
|
}
|
|
|
|
// 获取用户数据
|
|
userDao := dao.UserDao{}
|
|
user, err := userDao.GetUserById(userId)
|
|
if err != nil || user == nil {
|
|
utils.LogJsonInfo("consumer.UserMemberExpire:", "无用户数据")
|
|
_ = msg.Reject(false)
|
|
return
|
|
}
|
|
|
|
// 检测用户是否为会员
|
|
if user.IsMember == 0 {
|
|
_ = msg.Reject(false)
|
|
return
|
|
}
|
|
|
|
// 检测会员过期时间
|
|
now := time.Now()
|
|
memberExpireDate := time.Time(*user.MemberExpireDate)
|
|
|
|
diffTime := memberExpireDate.Sub(now)
|
|
if diffTime >= 60*time.Second {
|
|
// 重新添加入队列
|
|
data := make(map[string]interface{})
|
|
data["user_id"] = fmt.Sprintf("%d", user.UserId)
|
|
p := rabbitMq.PublishS{
|
|
QueueName: "user.member.expired.delay.queue",
|
|
ExchangeName: "amqp.delay.direct",
|
|
RoutingKey: "UserMemberExpired",
|
|
Message: data,
|
|
Delay: diffTime,
|
|
}
|
|
err = p.PublishWithDelay()
|
|
if err != nil {
|
|
utils.LogJsonErr("consumer.UserMemberExpire:", err.Error())
|
|
// 重回队列
|
|
_ = msg.Reject(true)
|
|
return
|
|
}
|
|
|
|
_ = msg.Ack(false)
|
|
return
|
|
}
|
|
|
|
// 开始事务
|
|
tx := global.Db.Begin()
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
tx.Rollback()
|
|
utils.LogJsonErr("consumer.UserMemberExpire:", r)
|
|
_ = msg.Reject(false)
|
|
}
|
|
}()
|
|
|
|
// 修改用户表
|
|
userData := make(map[string]interface{})
|
|
userData["is_member"] = 0
|
|
err = userDao.EditUserById(tx, userId, userData)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
utils.LogJsonErr("consumer.UserMemberExpire:", err.Error())
|
|
_ = msg.Reject(false)
|
|
return
|
|
}
|
|
|
|
tx.Commit()
|
|
|
|
_ = msg.Ack(false)
|
|
}
|