增加了 用户会员过期队列
This commit is contained in:
parent
9d17c26ef5
commit
8094cd3c51
114
api/amqp/consumer/UserMemberExpire.go
Normal file
114
api/amqp/consumer/UserMemberExpire.go
Normal file
@ -0,0 +1,114 @@
|
||||
package consumer
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/rabbitmq/amqp091-go"
|
||||
"hepa-calc-api/api/dao"
|
||||
"hepa-calc-api/extend/rabbitMq"
|
||||
"hepa-calc-api/global"
|
||||
"hepa-calc-api/utils"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
type userMemberExpireData struct {
|
||||
UserId string `json:"user_id"`
|
||||
}
|
||||
|
||||
// UserMemberExpire 用户会员过期
|
||||
func UserMemberExpire(msg amqp091.Delivery) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
utils.LogJsonInfo("consumer.UserMemberExpire:", r)
|
||||
_ = msg.Reject(false)
|
||||
}
|
||||
}()
|
||||
|
||||
// 记录日志
|
||||
utils.LogJsonInfo("consumer.UserMemberExpire:", string(msg.Body))
|
||||
if msg.Body == nil {
|
||||
_ = msg.Ack(false)
|
||||
return
|
||||
}
|
||||
|
||||
// 解析JSON字符串到结构体
|
||||
var data userMemberExpireData
|
||||
err := json.Unmarshal(msg.Body, &data)
|
||||
if err != nil {
|
||||
_ = msg.Ack(false)
|
||||
return
|
||||
}
|
||||
|
||||
userId, err := strconv.ParseInt(data.UserId, 10, 64)
|
||||
if err != nil {
|
||||
_ = msg.Ack(false)
|
||||
return
|
||||
}
|
||||
|
||||
// 获取用户数据
|
||||
userDao := dao.UserDao{}
|
||||
user, err := userDao.GetUserById(userId)
|
||||
if err != nil || user == nil {
|
||||
utils.LogJsonInfo("consumer.UserMemberExpire:", "无用户数据")
|
||||
_ = msg.Reject(false)
|
||||
return
|
||||
}
|
||||
|
||||
// 检测用户是否为会员
|
||||
if user.IsMember == 0 {
|
||||
_ = msg.Reject(false)
|
||||
return
|
||||
}
|
||||
|
||||
// 检测会员过期时间
|
||||
now := time.Now()
|
||||
diffTime := user.MemberExpireDate.Sub(now)
|
||||
if diffTime >= 60*time.Second {
|
||||
// 重新添加入队列
|
||||
data := make(map[string]interface{})
|
||||
data["user_id"] = fmt.Sprintf("%d", user.UserId)
|
||||
p := rabbitMq.PublishS{
|
||||
QueueName: "user.member.expired.delay.queue",
|
||||
ExchangeName: "amqp.delay.direct",
|
||||
RoutingKey: "UserMemberExpired",
|
||||
Message: data,
|
||||
Delay: diffTime,
|
||||
}
|
||||
err = p.PublishWithDelay()
|
||||
if err != nil {
|
||||
utils.LogJsonErr("consumer.UserMemberExpire:", err.Error())
|
||||
// 重回队列
|
||||
_ = msg.Reject(true)
|
||||
return
|
||||
}
|
||||
|
||||
_ = msg.Ack(false)
|
||||
return
|
||||
}
|
||||
|
||||
// 开始事务
|
||||
tx := global.Db.Begin()
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
tx.Rollback()
|
||||
utils.LogJsonErr("consumer.UserMemberExpire:", r)
|
||||
_ = msg.Reject(false)
|
||||
}
|
||||
}()
|
||||
|
||||
// 修改用户表
|
||||
userData := make(map[string]interface{})
|
||||
userData["is_member"] = 0
|
||||
err = userDao.EditUserById(tx, userId, userData)
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
utils.LogJsonErr("consumer.UserMemberExpire:", err.Error())
|
||||
_ = msg.Reject(false)
|
||||
return
|
||||
}
|
||||
|
||||
tx.Commit()
|
||||
|
||||
_ = msg.Ack(false)
|
||||
}
|
||||
71
api/crontab/UserMemberExpire.go
Normal file
71
api/crontab/UserMemberExpire.go
Normal file
@ -0,0 +1,71 @@
|
||||
package crontab
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"hepa-calc-api/api/dao"
|
||||
"hepa-calc-api/api/model"
|
||||
"hepa-calc-api/extend/rabbitMq"
|
||||
"hepa-calc-api/utils"
|
||||
"time"
|
||||
)
|
||||
|
||||
// UserMemberExpire 用户会员过期
|
||||
func UserMemberExpire() {
|
||||
// 获取今日过期用户
|
||||
users := getExecUserMember()
|
||||
if len(users) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for _, user := range users {
|
||||
// 计算过期时间
|
||||
validEndTime := user.MemberExpireDate
|
||||
|
||||
delay := validEndTime.Sub(time.Now())
|
||||
if delay < 5*time.Second {
|
||||
delay = 5 * time.Second
|
||||
}
|
||||
|
||||
// 添加处理用户会员过期队列
|
||||
data := make(map[string]interface{})
|
||||
data["user_id"] = fmt.Sprintf("%d", user.UserId)
|
||||
|
||||
p := rabbitMq.PublishS{
|
||||
QueueName: "user.member.expired.delay.queue",
|
||||
ExchangeName: "amqp.delay.direct",
|
||||
RoutingKey: "UserMemberExpired",
|
||||
Message: data,
|
||||
Delay: delay,
|
||||
}
|
||||
err := p.PublishWithDelay()
|
||||
if err != nil {
|
||||
utils.LogJsonErr("添加处理用户会员过期队列失败:", err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 获取可执行数据
|
||||
func getExecUserMember() (users []*model.User) {
|
||||
now := time.Now()
|
||||
|
||||
// 今天开始时间
|
||||
year, month, day := now.Date()
|
||||
location := now.Location()
|
||||
startTime := time.Date(year, month, day, 00, 00, 00, 0, location).Format("2006-01-02 15:04:05")
|
||||
|
||||
// 今天结束时间
|
||||
endTime := time.Date(year, month, day, 23, 59, 59, 0, location).Format("2006-01-02 15:04:05")
|
||||
|
||||
maps := make(map[string]interface{})
|
||||
maps["is_member"] = 1
|
||||
|
||||
userDao := dao.UserDao{}
|
||||
users, err := userDao.GetUserListByMemberValidTime(maps, startTime, endTime)
|
||||
if err != nil {
|
||||
utils.LogJsonErr("系统优惠卷过期:", err.Error())
|
||||
return nil
|
||||
}
|
||||
|
||||
return users
|
||||
}
|
||||
@ -106,3 +106,12 @@ func (r *UserDao) GetUser(maps interface{}) (m *model.User, err error) {
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// GetUserListByMemberValidTime 获取列表-今天开始时间/过期时间
|
||||
func (r *UserDao) GetUserListByMemberValidTime(maps interface{}, startTime, endTime string) (m []*model.User, err error) {
|
||||
err = global.Db.Where(maps).Where("member_expire_date BETWEEN ? AND ?", startTime, endTime).Find(&m).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
@ -21,6 +21,12 @@ func StartCron() {
|
||||
panic("定时器启动失败:" + err.Error())
|
||||
}
|
||||
|
||||
// 用户会员过期
|
||||
_, err = c.AddFunc("0 0 0 * * *", crontab.UserMemberExpire)
|
||||
if err != nil {
|
||||
panic("定时器启动失败:" + err.Error())
|
||||
}
|
||||
|
||||
// 启动定时任务调度器
|
||||
c.Start()
|
||||
|
||||
|
||||
@ -49,6 +49,15 @@ func StartRabbitMqConsume() {
|
||||
Handler: consumer.CancelUnPayOrder,
|
||||
}
|
||||
go startConsumer(s)
|
||||
|
||||
// 用户会员过期
|
||||
s = rabbitMq.ConsumeS{
|
||||
QueueName: "user.member.expired.delay.queue",
|
||||
ExchangeName: "amqp.delay.direct",
|
||||
RoutingKey: "UserMemberExpired",
|
||||
Handler: consumer.UserMemberExpire,
|
||||
}
|
||||
go startConsumer(s)
|
||||
}
|
||||
|
||||
// startConsumer 启动指定的消费者协程
|
||||
|
||||
@ -13,8 +13,8 @@ import (
|
||||
func Redis() {
|
||||
global.Redis = redis.NewClient(&redis.Options{
|
||||
Addr: config.C.Redis.Host + ":" + strconv.Itoa(config.C.Redis.Port),
|
||||
Password: config.C.Redis.Password, // no password set
|
||||
DB: config.C.Redis.Db, // use default DB
|
||||
Password: config.C.Redis.Password,
|
||||
DB: config.C.Redis.Db,
|
||||
PoolSize: config.C.Redis.PoolSize,
|
||||
})
|
||||
_, err := global.Redis.Ping(context.Background()).Result()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user