From 8094cd3c513bd973bd4af5067bb192ff50f33f99 Mon Sep 17 00:00:00 2001 From: wucongxing8150 <815046773@qq.com> Date: Thu, 1 Aug 2024 14:16:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=BA=86=20=E7=94=A8?= =?UTF-8?q?=E6=88=B7=E4=BC=9A=E5=91=98=E8=BF=87=E6=9C=9F=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/amqp/consumer/UserMemberExpire.go | 114 ++++++++++++++++++++++++++ api/crontab/UserMemberExpire.go | 71 ++++++++++++++++ api/dao/User.go | 9 ++ core/cron.go | 6 ++ core/rabbitMq.go | 9 ++ core/redis.go | 4 +- 6 files changed, 211 insertions(+), 2 deletions(-) create mode 100644 api/amqp/consumer/UserMemberExpire.go create mode 100644 api/crontab/UserMemberExpire.go diff --git a/api/amqp/consumer/UserMemberExpire.go b/api/amqp/consumer/UserMemberExpire.go new file mode 100644 index 0000000..a096293 --- /dev/null +++ b/api/amqp/consumer/UserMemberExpire.go @@ -0,0 +1,114 @@ +package consumer + +import ( + "encoding/json" + "fmt" + "github.com/rabbitmq/amqp091-go" + "hepa-calc-api/api/dao" + "hepa-calc-api/extend/rabbitMq" + "hepa-calc-api/global" + "hepa-calc-api/utils" + "strconv" + "time" +) + +type userMemberExpireData struct { + UserId string `json:"user_id"` +} + +// UserMemberExpire 用户会员过期 +func UserMemberExpire(msg amqp091.Delivery) { + defer func() { + if r := recover(); r != nil { + utils.LogJsonInfo("consumer.UserMemberExpire:", r) + _ = msg.Reject(false) + } + }() + + // 记录日志 + utils.LogJsonInfo("consumer.UserMemberExpire:", string(msg.Body)) + if msg.Body == nil { + _ = msg.Ack(false) + return + } + + // 解析JSON字符串到结构体 + var data userMemberExpireData + err := json.Unmarshal(msg.Body, &data) + if err != nil { + _ = msg.Ack(false) + return + } + + userId, err := strconv.ParseInt(data.UserId, 10, 64) + if err != nil { + _ = msg.Ack(false) + return + } + + // 获取用户数据 + userDao := dao.UserDao{} + user, err := userDao.GetUserById(userId) + if err != nil || user == nil { + utils.LogJsonInfo("consumer.UserMemberExpire:", "无用户数据") + _ = msg.Reject(false) + return + } + + // 检测用户是否为会员 + if user.IsMember == 0 { + _ = msg.Reject(false) + return + } + + // 检测会员过期时间 + now := time.Now() + diffTime := user.MemberExpireDate.Sub(now) + if diffTime >= 60*time.Second { + // 重新添加入队列 + data := make(map[string]interface{}) + data["user_id"] = fmt.Sprintf("%d", user.UserId) + p := rabbitMq.PublishS{ + QueueName: "user.member.expired.delay.queue", + ExchangeName: "amqp.delay.direct", + RoutingKey: "UserMemberExpired", + Message: data, + Delay: diffTime, + } + err = p.PublishWithDelay() + if err != nil { + utils.LogJsonErr("consumer.UserMemberExpire:", err.Error()) + // 重回队列 + _ = msg.Reject(true) + return + } + + _ = msg.Ack(false) + return + } + + // 开始事务 + tx := global.Db.Begin() + defer func() { + if r := recover(); r != nil { + tx.Rollback() + utils.LogJsonErr("consumer.UserMemberExpire:", r) + _ = msg.Reject(false) + } + }() + + // 修改用户表 + userData := make(map[string]interface{}) + userData["is_member"] = 0 + err = userDao.EditUserById(tx, userId, userData) + if err != nil { + tx.Rollback() + utils.LogJsonErr("consumer.UserMemberExpire:", err.Error()) + _ = msg.Reject(false) + return + } + + tx.Commit() + + _ = msg.Ack(false) +} diff --git a/api/crontab/UserMemberExpire.go b/api/crontab/UserMemberExpire.go new file mode 100644 index 0000000..a84927a --- /dev/null +++ b/api/crontab/UserMemberExpire.go @@ -0,0 +1,71 @@ +package crontab + +import ( + "fmt" + "hepa-calc-api/api/dao" + "hepa-calc-api/api/model" + "hepa-calc-api/extend/rabbitMq" + "hepa-calc-api/utils" + "time" +) + +// UserMemberExpire 用户会员过期 +func UserMemberExpire() { + // 获取今日过期用户 + users := getExecUserMember() + if len(users) == 0 { + return + } + + for _, user := range users { + // 计算过期时间 + validEndTime := user.MemberExpireDate + + delay := validEndTime.Sub(time.Now()) + if delay < 5*time.Second { + delay = 5 * time.Second + } + + // 添加处理用户会员过期队列 + data := make(map[string]interface{}) + data["user_id"] = fmt.Sprintf("%d", user.UserId) + + p := rabbitMq.PublishS{ + QueueName: "user.member.expired.delay.queue", + ExchangeName: "amqp.delay.direct", + RoutingKey: "UserMemberExpired", + Message: data, + Delay: delay, + } + err := p.PublishWithDelay() + if err != nil { + utils.LogJsonErr("添加处理用户会员过期队列失败:", err.Error()) + return + } + } +} + +// 获取可执行数据 +func getExecUserMember() (users []*model.User) { + now := time.Now() + + // 今天开始时间 + year, month, day := now.Date() + location := now.Location() + startTime := time.Date(year, month, day, 00, 00, 00, 0, location).Format("2006-01-02 15:04:05") + + // 今天结束时间 + endTime := time.Date(year, month, day, 23, 59, 59, 0, location).Format("2006-01-02 15:04:05") + + maps := make(map[string]interface{}) + maps["is_member"] = 1 + + userDao := dao.UserDao{} + users, err := userDao.GetUserListByMemberValidTime(maps, startTime, endTime) + if err != nil { + utils.LogJsonErr("系统优惠卷过期:", err.Error()) + return nil + } + + return users +} diff --git a/api/dao/User.go b/api/dao/User.go index 3f55f33..3457457 100644 --- a/api/dao/User.go +++ b/api/dao/User.go @@ -106,3 +106,12 @@ func (r *UserDao) GetUser(maps interface{}) (m *model.User, err error) { } return m, nil } + +// GetUserListByMemberValidTime 获取列表-今天开始时间/过期时间 +func (r *UserDao) GetUserListByMemberValidTime(maps interface{}, startTime, endTime string) (m []*model.User, err error) { + err = global.Db.Where(maps).Where("member_expire_date BETWEEN ? AND ?", startTime, endTime).Find(&m).Error + if err != nil { + return nil, err + } + return m, nil +} diff --git a/core/cron.go b/core/cron.go index 4fa4366..878373e 100644 --- a/core/cron.go +++ b/core/cron.go @@ -21,6 +21,12 @@ func StartCron() { panic("定时器启动失败:" + err.Error()) } + // 用户会员过期 + _, err = c.AddFunc("0 0 0 * * *", crontab.UserMemberExpire) + if err != nil { + panic("定时器启动失败:" + err.Error()) + } + // 启动定时任务调度器 c.Start() diff --git a/core/rabbitMq.go b/core/rabbitMq.go index 72a35dd..e257ec6 100644 --- a/core/rabbitMq.go +++ b/core/rabbitMq.go @@ -49,6 +49,15 @@ func StartRabbitMqConsume() { Handler: consumer.CancelUnPayOrder, } go startConsumer(s) + + // 用户会员过期 + s = rabbitMq.ConsumeS{ + QueueName: "user.member.expired.delay.queue", + ExchangeName: "amqp.delay.direct", + RoutingKey: "UserMemberExpired", + Handler: consumer.UserMemberExpire, + } + go startConsumer(s) } // startConsumer 启动指定的消费者协程 diff --git a/core/redis.go b/core/redis.go index 2c9d55a..0b7bfa8 100644 --- a/core/redis.go +++ b/core/redis.go @@ -13,8 +13,8 @@ import ( func Redis() { global.Redis = redis.NewClient(&redis.Options{ Addr: config.C.Redis.Host + ":" + strconv.Itoa(config.C.Redis.Port), - Password: config.C.Redis.Password, // no password set - DB: config.C.Redis.Db, // use default DB + Password: config.C.Redis.Password, + DB: config.C.Redis.Db, PoolSize: config.C.Redis.PoolSize, }) _, err := global.Redis.Ping(context.Background()).Result()