diff --git a/api/amqp/consumer/UserCouponExpire.go b/api/amqp/consumer/UserCouponExpire.go new file mode 100644 index 0000000..ab9df72 --- /dev/null +++ b/api/amqp/consumer/UserCouponExpire.go @@ -0,0 +1,121 @@ +package consumer + +import ( + "encoding/json" + "fmt" + "github.com/rabbitmq/amqp091-go" + "hepa-calc-api/api/dao" + "hepa-calc-api/extend/rabbitMq" + "hepa-calc-api/global" + "hepa-calc-api/utils" + "strconv" + "time" +) + +type UserCouponExpireData struct { + UserCouponId string `json:"user_coupon_id"` +} + +// UserCouponExpire 用户优惠卷过期 +func UserCouponExpire(msg amqp091.Delivery) { + defer func() { + if r := recover(); r != nil { + utils.LogJsonInfo("consumer.UserCouponExpire:", r) + _ = msg.Reject(false) + } + }() + + // 记录日志 + utils.LogJsonInfo("consumer.UserCouponExpire:", string(msg.Body)) + if msg.Body == nil { + _ = msg.Ack(false) + return + } + + // 解析JSON字符串到结构体 + var data UserCouponExpireData + err := json.Unmarshal(msg.Body, &data) + if err != nil { + _ = msg.Ack(false) + return + } + + userCouponId, err := strconv.ParseInt(data.UserCouponId, 10, 64) + if err != nil { + _ = msg.Ack(false) + return + } + + // 获取优惠卷数据 + userCouponDao := dao.UserCouponDao{} + userCoupon, err := userCouponDao.GetUserCouponById(userCouponId) + if err != nil || userCoupon == nil { + utils.LogJsonInfo("consumer.UserCouponExpire:", "无优惠卷数据") + _ = msg.Reject(false) + return + } + + // 检测优惠卷是否被使用 + if userCoupon.UserCouponStatus == 1 { + _ = msg.Reject(false) + return + } + + // 检测优惠卷是否已执行过期处理 + if userCoupon.UserCouponStatus == 3 { + _ = msg.Reject(false) + return + } + + // 检测优惠卷过期时间 + now := time.Now() + validEndTime := time.Time(userCoupon.ValidEndTime) + diffTime := validEndTime.Sub(now) + if diffTime >= 60*time.Second { + // 重新添加入队列 + data := make(map[string]interface{}) + data["user_coupon_id"] = fmt.Sprintf("%d", userCoupon.UserCouponId) + p := rabbitMq.PublishS{ + QueueName: "user.coupon.expired.delay.queue", + ExchangeName: "amqp.delay.direct", + RoutingKey: "UserCouponExpired", + Message: data, + Delay: diffTime, + } + err = p.PublishWithDelay() + if err != nil { + utils.LogJsonErr("consumer.UserCouponExpire:", err.Error()) + // 重回队列 + _ = msg.Reject(true) + return + } + + _ = msg.Ack(false) + return + } + + // 开始事务 + tx := global.Db.Begin() + defer func() { + if r := recover(); r != nil { + tx.Rollback() + utils.LogJsonErr("consumer.UserCouponExpire:", r) + _ = msg.Reject(false) + } + }() + + // 修改用户优惠卷表 + userCouponData := make(map[string]interface{}) + userCouponData["user_coupon_status"] = 3 + err = userCouponDao.EditUserCouponById(tx, userCouponId, userCouponData) + if err != nil { + tx.Rollback() + utils.LogJsonErr("consumer.UserCouponExpire:", err.Error()) + _ = msg.Reject(false) + return + } + + tx.Commit() + + err = msg.Ack(false) +} diff --git a/api/controller/CallBack.go b/api/controller/CallBack.go index b05af41..2682ca4 100644 --- a/api/controller/CallBack.go +++ b/api/controller/CallBack.go @@ -93,6 +93,15 @@ func (r *CallBack) WxPaySingle(c *gin.Context) { return } + // 增加题目支付次数 + questionDao := dao.QuestionDao{} + err = questionDao.Inc(tx, orderSingle.QuestionId, "pay_count", 1) + if err != nil { + tx.Rollback() + responses.FailWithMessage("内部错误", c) + return + } + tx.Commit() c.JSON(http.StatusOK, gin.H{"code": "SUCCESS", "message": "OK"}) } diff --git a/api/crontab/SystemCouponExpire.go b/api/crontab/SystemCouponExpire.go new file mode 100644 index 0000000..b547c84 --- /dev/null +++ b/api/crontab/SystemCouponExpire.go @@ -0,0 +1,55 @@ +package crontab + +import ( + "hepa-calc-api/api/dao" + "hepa-calc-api/api/model" + "hepa-calc-api/utils" + "time" +) + +// SystemCouponExpire 系统优惠卷过期 +func SystemCouponExpire() { + // 获取今日过期优惠卷 + coupons := getSystemExecCoupon() + if len(coupons) == 0 { + return + } + + for _, coupon := range coupons { + // 计算过期时间 + validEndTime := time.Time(coupon.ValidEndTime) + + diffTime := validEndTime.Sub(time.Now()) + if diffTime < 5*time.Second { + diffTime = 5 * time.Second + } + + // 添加处理优惠卷过期队列 + } +} + +// 获取今日过期优惠卷 +func getSystemExecCoupon() (coupon []*model.Coupon) { + now := time.Now() + + // 今天开始时间 + year, month, day := now.Date() + location := now.Location() + startTime := time.Date(year, month, day, 00, 00, 00, 0, location).Format("2006-01-02 15:04:05") + + // 今天结束时间 + endTime := time.Date(year, month, day, 23, 59, 59, 0, location).Format("2006-01-02 15:04:05") + + maps := make(map[string]interface{}) + maps["coupon_status"] = 1 + maps["valid_type"] = 1 + + couponDao := dao.CouponDao{} + coupons, err := couponDao.GetCouponListByValidTime(maps, startTime, endTime) + if err != nil { + utils.LogJsonErr("系统优惠卷过期:", err.Error()) + return nil + } + + return coupons +} diff --git a/api/crontab/UserCouponExpire.go b/api/crontab/UserCouponExpire.go new file mode 100644 index 0000000..9f4c95c --- /dev/null +++ b/api/crontab/UserCouponExpire.go @@ -0,0 +1,75 @@ +package crontab + +import ( + "fmt" + "hepa-calc-api/api/dao" + "hepa-calc-api/api/model" + "hepa-calc-api/extend/rabbitMq" + "hepa-calc-api/utils" + "time" +) + +// UserCouponExpire 用户优惠卷过期 +func UserCouponExpire() { + // 获取今日过期优惠卷 + userCoupons := getUserExecCoupon() + if len(userCoupons) == 0 { + return + } + + for _, userCoupon := range userCoupons { + // 计算过期时间 + validEndTime := time.Time(userCoupon.ValidEndTime) + + delay := validEndTime.Sub(time.Now()) + if delay < 5*time.Second { + delay = 5 * time.Second + } + + // 添加处理优惠卷过期队列 + data := make(map[string]interface{}) + data["user_coupon_id"] = fmt.Sprintf("%d", userCoupon.UserCouponId) + + p := rabbitMq.PublishS{ + QueueName: "user.coupon.expired.delay.queue", + ExchangeName: "amqp.delay.direct", + RoutingKey: "UserCouponExpired", + Message: data, + Delay: delay, + } + err := p.PublishWithDelay() + if err != nil { + utils.LogJsonErr("添加处理用户优惠卷过期队列失败:", err.Error()) + return + } + + fmt.Println(userCoupon.UserCouponId) + now := time.Now().Format("2006-01-02 15:04:05") + fmt.Println("添加队列成功" + now) + } +} + +// 获取今日过期优惠卷 +func getUserExecCoupon() (coupon []*model.UserCoupon) { + now := time.Now() + + // 今天开始时间 + year, month, day := now.Date() + location := now.Location() + startTime := time.Date(year, month, day, 00, 00, 00, 0, location).Format("2006-01-02 15:04:05") + + // 今天结束时间 + endTime := time.Date(year, month, day, 23, 59, 59, 0, location).Format("2006-01-02 15:04:05") + + maps := make(map[string]interface{}) + maps["user_coupon_status"] = 0 + + userCouponDao := dao.UserCouponDao{} + userCoupons, err := userCouponDao.GetUserCouponListByValidTime(maps, startTime, endTime) + if err != nil { + utils.LogJsonErr("系统优惠卷过期:", err.Error()) + return nil + } + + return userCoupons +} diff --git a/api/dao/Coupon.go b/api/dao/Coupon.go index f466675..e318f07 100644 --- a/api/dao/Coupon.go +++ b/api/dao/Coupon.go @@ -106,3 +106,12 @@ func (r *CouponDao) GetCoupon(maps interface{}) (m *model.Coupon, err error) { } return m, nil } + +// GetCouponListByValidTime 获取列表-今天开始时间/过期时间 +func (r *CouponDao) GetCouponListByValidTime(maps interface{}, startTime, endTime string) (m []*model.Coupon, err error) { + err = global.Db.Where(maps).Where("valid_end_time BETWEEN ? AND ?", startTime, endTime).Find(&m).Error + if err != nil { + return nil, err + } + return m, nil +} diff --git a/api/dao/UserCoupon.go b/api/dao/UserCoupon.go index b7b7dba..97bcacd 100644 --- a/api/dao/UserCoupon.go +++ b/api/dao/UserCoupon.go @@ -200,3 +200,12 @@ func (r *UserCouponDao) GetUserCouponPageSearch(req requests.GetUserCouponPage, } return m, totalRecords, nil } + +// GetUserCouponListByValidTime 获取列表-今天开始时间/过期时间 +func (r *UserCouponDao) GetUserCouponListByValidTime(maps interface{}, startTime, endTime string) (m []*model.UserCoupon, err error) { + err = global.Db.Where(maps).Where("valid_end_time BETWEEN ? AND ?", startTime, endTime).Find(&m).Error + if err != nil { + return nil, err + } + return m, nil +} diff --git a/api/router/router.go b/api/router/router.go index 34429ca..052546c 100644 --- a/api/router/router.go +++ b/api/router/router.go @@ -153,7 +153,6 @@ func privateRouter(r *gin.Engine, api controller.Api) { // 增加问题提交次数(提交个人信息进行了算算的人次) clickGroup.PUT("/submit/:question_id", api.Question.PutQuestionSubmitCount) } - } // 用户 @@ -246,5 +245,4 @@ func privateRouter(r *gin.Engine, api controller.Api) { // 获取会员配置数据 memberGroup.GET("", api.SystemMember.GetSystemMember) } - } diff --git a/api/service/OrderMember.go b/api/service/OrderMember.go index 4565109..78c7bb1 100644 --- a/api/service/OrderMember.go +++ b/api/service/OrderMember.go @@ -297,7 +297,7 @@ func (r *OrderMemberService) GetJsapiPrepay(m *model.OrderMember) (prepay *jsapi jsapiRequest := weChat.JsapiRequest{ AppId: config.C.Wechat.AppId, - MchId: config.C.Wechat.Pay1659662936.MchId, + MchId: config.C.Wechat.Pay1281030301.MchId, Description: "肝病算一算", OutTradeNo: m.OrderNo, NotifyUrl: config.C.Wechat.RefundNotifyDomain + config.C.Wechat.RefundNotifyUrl, @@ -331,7 +331,7 @@ func (r *OrderMemberService) GetAppPrepay(m *model.OrderMember) (prepay *app.Pre appRequest := weChat.AppRequest{ AppId: config.C.Wechat.AppId, - MchId: config.C.Wechat.Pay1659662936.MchId, + MchId: config.C.Wechat.Pay1281030301.MchId, Description: "肝病算一算", OutTradeNo: m.OrderNo, NotifyUrl: config.C.Wechat.RefundNotifyDomain + config.C.Wechat.RefundNotifyUrl, diff --git a/api/service/OrderSingle.go b/api/service/OrderSingle.go index ec0d9a2..749d410 100644 --- a/api/service/OrderSingle.go +++ b/api/service/OrderSingle.go @@ -293,7 +293,7 @@ func (r *OrderSingleService) GetJsapiPrepay(m *model.OrderSingle) (prepay *jsapi jsapiRequest := weChat.JsapiRequest{ AppId: config.C.Wechat.AppId, - MchId: config.C.Wechat.Pay1659662936.MchId, + MchId: config.C.Wechat.Pay1281030301.MchId, Description: "肝病算一算", OutTradeNo: m.OrderNo, NotifyUrl: config.C.Wechat.RefundNotifyDomain + config.C.Wechat.RefundNotifyUrl, @@ -327,7 +327,7 @@ func (r *OrderSingleService) GetAppPrepay(m *model.OrderSingle) (prepay *app.Pre appRequest := weChat.AppRequest{ AppId: config.C.Wechat.AppId, - MchId: config.C.Wechat.Pay1659662936.MchId, + MchId: config.C.Wechat.Pay1281030301.MchId, Description: "肝病算一算", OutTradeNo: m.OrderNo, NotifyUrl: config.C.Wechat.RefundNotifyDomain + config.C.Wechat.RefundNotifyUrl, diff --git a/api/service/UserCoupon.go b/api/service/UserCoupon.go index 4baee2a..6648575 100644 --- a/api/service/UserCoupon.go +++ b/api/service/UserCoupon.go @@ -117,7 +117,7 @@ func (r *UserCouponService) ReturnUserCoupon(tx *gorm.DB, userCouponId int64) bo return true } -// GetUserUsableQuestionCoupon 获取患者可使用优惠卷-单项 +// GetUserUsableQuestionCoupon 获取用户可使用优惠卷-单项 func (r *UserCouponService) GetUserUsableQuestionCoupon(userId, questionId int64, amountTotal float64) (g []*dto.UserCouponDto, err error) { // 获取用户数据 userDao := dao.UserDao{} @@ -162,7 +162,7 @@ func (r *UserCouponService) GetUserUsableQuestionCoupon(userId, questionId int64 return g, nil } -// GetUserUsableMemberCoupon 获取患者可使用优惠卷-会员 +// GetUserUsableMemberCoupon 获取用户可使用优惠卷-会员 func (r *UserCouponService) GetUserUsableMemberCoupon(userId, systemMemberId int64, amountTotal float64) (g []*dto.UserCouponDto, err error) { // 获取用户数据 userDao := dao.UserDao{} diff --git a/config.yaml b/config.yaml index 9bd3306..f3b5eca 100644 --- a/config.yaml +++ b/config.yaml @@ -47,15 +47,23 @@ dysms: # [微信] wechat: - app-id: wxc8ac5051745bc795 - app-secret: 678b63a8a7541e528abc3040c3cea809 + app-id: wx68affaa9d23528f8 + app-secret: 2963c90242ddb2421c939591ad9e903d pay-notify-url: callback/wxpay/single refund-notify-url: callback/wxpay/inquiry/refund - refund-notify-domain: https://dev.hospital.applets.igandanyiyuan.com/ - pay-1659662936: - mch-id: 1659662936 - v3-api-secret: gdxz292sjSOadNNad2pCda03NfC2msmY - mch-certificate-serial-number: 12FAA5F61708B795BB5337AE915494E2DC2CA87B - platform-certs: extend/weChat/certs/1659662936/wechatpay_5B5C8A69CC86D1127F6B6AA06AAAF10531EEFE90.pem - private-key: extend/weChat/certs/1659662936/apiclient_key.pem - certificate: extend/weChat/certs/1659662936/apiclient_cert.pem \ No newline at end of file + refund-notify-domain: https://dev-hepa.igandan.com/api/ + pay-1281030301: + mch-id: 1281030301 + v3-api-secret: sB2tCkT70uwEy7cQCu1llA6nilTbek6F + mch-certificate-serial-number: 3D1685894CEDD41753A470211F975D40AE1375F5 + platform-certs: extend/weChat/certs/1281030301/wechatpay_5B5C8A69CC86D1127F6B6AA06AAAF10531EEFE90.pem + private-key: extend/weChat/certs/1281030301/apiclient_key.pem + certificate: extend/weChat/certs/1281030301/apiclient_cert.pem + +# [rabbitMq] +amqp: + host: 127.0.0.1 + port: 5672 + user: gdxz_2022rabbitmq + password: qwr2p&¥e@3.2p + vhost: gdxz_hepa \ No newline at end of file diff --git a/config/amqp.go b/config/amqp.go new file mode 100644 index 0000000..f7d1c63 --- /dev/null +++ b/config/amqp.go @@ -0,0 +1,9 @@ +package config + +type Amqp struct { + Host string `mapstructure:"host" json:"host" yaml:"host"` // 服务器地址 + Port int `mapstructure:"port" json:"port" yaml:"port"` // 服务器端口 + Password string `mapstructure:"password" json:"password" yaml:"password"` // 密码 + User string `mapstructure:"user" json:"user" yaml:"user"` + Vhost string `mapstructure:"vhost" json:"vhost" yaml:"vhost"` +} diff --git a/config/config.go b/config/config.go index 5785c32..4a24ee6 100644 --- a/config/config.go +++ b/config/config.go @@ -13,4 +13,5 @@ type Config struct { Snowflake int64 `mapstructure:"snowflake" json:"snowflake" yaml:"snowflake"` Dysms Dysms `mapstructure:"dysms" json:"dysms" yaml:"dysms"` Wechat Wechat `mapstructure:"wechat" json:"wechat" yaml:"wechat"` + Amqp Amqp `mapstructure:"amqp" json:"amqp" yaml:"amqp"` } diff --git a/config/wechat.go b/config/wechat.go index f13960c..d03f879 100644 --- a/config/wechat.go +++ b/config/wechat.go @@ -6,11 +6,11 @@ type Wechat struct { PayNotifyUrl string `mapstructure:"pay-notify-url" json:"pay-notify-url" yaml:"pay-notify-url"` RefundNotifyDomain string `mapstructure:"refund-notify-domain" json:"refund-notify-domain" yaml:"refund-notify-domain"` // 回调域名 RefundNotifyUrl string `mapstructure:"refund-notify-url" json:"refund-notify-url" yaml:"refund-notify-url"` - Pay1659662936 Pay1659662936 `mapstructure:"pay-1659662936" json:"pay-1659662936" yaml:"pay-1659662936"` + Pay1281030301 Pay1281030301 `mapstructure:"pay-1281030301" json:"pay-1281030301" yaml:"pay-1281030301"` } -// Pay1659662936 成都欣欣相照 -type Pay1659662936 struct { +// Pay1281030301 app +type Pay1281030301 struct { MchId string `mapstructure:"mch-id" json:"mch-id" yaml:"mch-id"` // 商户号 V3ApiSecret string `mapstructure:"v3-api-secret" json:"v3-api-secret" yaml:"v3-api-secret"` // 商户APIv3密钥 MchCertificateSerialNumber string `mapstructure:"mch-certificate-serial-number" json:"mch-certificate-serial-number" yaml:"mch-certificate-serial-number"` // 商户证书序列号 diff --git a/core/cron.go b/core/cron.go index 9b58f86..8ed59bc 100644 --- a/core/cron.go +++ b/core/cron.go @@ -3,16 +3,24 @@ package core import ( "fmt" "github.com/robfig/cron/v3" + "hepa-calc-api/api/crontab" ) func StartCron() { c := cron.New(cron.WithSeconds()) - //_, err := c.AddFunc("0 * * * * *", crontab.HandleQuestionQaExpire) + // 系统优惠卷过期 + //_, err := c.AddFunc("* * * * * *", crontab.SystemCouponExpire) //if err != nil { // panic("定时器启动失败:" + err.Error()) //} + // 用户优惠卷过期 + _, err := c.AddFunc("0 0 0 * * *", crontab.UserCouponExpire) + if err != nil { + panic("定时器启动失败:" + err.Error()) + } + // 启动定时任务调度器 c.Start() diff --git a/core/rabbitMq.go b/core/rabbitMq.go new file mode 100644 index 0000000..cfb5df2 --- /dev/null +++ b/core/rabbitMq.go @@ -0,0 +1,70 @@ +package core + +import ( + "fmt" + "hepa-calc-api/api/amqp/consumer" + "hepa-calc-api/extend/rabbitMq" + "hepa-calc-api/utils" +) + +// StartRabbitMq 启动rabbitmq +func StartRabbitMq() { + err := rabbitMq.NewRabbitMQClient() + if err != nil { + utils.LogJsonErr("启动rabbitmq:", err.Error()) + panic(err.Error()) + } + + // 保持连接 + go rabbitMq.HandleReconnect() + + fmt.Println("初始化rabbitMq成功......") +} + +// StartRabbitMqConsume 启动消费者端 +func StartRabbitMqConsume() { + s := rabbitMq.ConsumeS{ + QueueName: "user.coupon.expired.delay.queue", + ExchangeName: "amqp.delay.direct", + RoutingKey: "UserCouponExpired", + Handler: consumer.UserCouponExpire, + } + + // 用户优惠卷过期 + go startConsumer(s) +} + +// UserCouponExpire 用户优惠卷过期 +//func userCouponExpire() { +// go func() { +// err := rabbitMq.Consume( +// "user.coupon.expired.delay.queue", // 交换机名称 +// "amqp.delay.direct", // 队列名称 +// "UserCouponExpired", // 路由键 +// consumer.UserCouponExpire, // 消息处理函数 +// ) +// if err != nil { +// fmt.Println(err.Error()) +// utils.LogJsonErr("消费用户优惠卷过期队列失败", err) +// } +// }() +//} + +// startConsumer 启动指定的消费者协程 +func startConsumer(s rabbitMq.ConsumeS) { + go func() { + defer func() { + if r := recover(); r != nil { + utils.LogJsonErr(fmt.Sprintf("消费者协程崩溃 - %s", s.QueueName), fmt.Errorf("%v", r)) + // 重新启动客户端 + StartRabbitMq() + } + }() + + err := s.Consume() + if err != nil { + fmt.Println(err.Error()) + utils.LogJsonErr(fmt.Sprintf("启动消费者队列失败 - %s", s.QueueName), err) + } + }() +} diff --git a/extend/rabbitMq/rabbitMq.go b/extend/rabbitMq/rabbitMq.go new file mode 100644 index 0000000..8e9c868 --- /dev/null +++ b/extend/rabbitMq/rabbitMq.go @@ -0,0 +1,284 @@ +package rabbitMq + +import ( + "encoding/json" + "fmt" + amqp "github.com/rabbitmq/amqp091-go" + "hepa-calc-api/config" + "hepa-calc-api/global" + "hepa-calc-api/utils" + "log" + "net/url" + "sync" + "time" +) + +// Client 客户端 +type Client struct { + m *sync.Mutex + queueName string + logger *log.Logger + connection *amqp.Connection + channel *amqp.Channel + done chan bool + notifyConnClose chan *amqp.Error + notifyChanClose chan *amqp.Error + notifyConfirm chan amqp.Confirmation + isReady bool +} + +// ConsumeS 消费端结构体 +type ConsumeS struct { + QueueName string // 队列名称 + ExchangeName string // 交换机名称 + RoutingKey string // 路由键 + Handler func(amqp.Delivery) // 回调处理方法 +} + +// PublishS 生产端结构体 +type PublishS struct { + QueueName string // 队列名称 + ExchangeName string // 交换机名称 + RoutingKey string // 路由键 + Message interface{} // 消息内容 + Delay time.Duration // 延迟时间 +} + +// NewRabbitMQClient 初始化客户端 +func NewRabbitMQClient() error { + var err error + m := config.C.Amqp + + user := url.QueryEscape(m.User) + password := url.QueryEscape(m.Password) + + dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/%s", user, password, m.Host, m.Port, m.Vhost) + + // 连接到 RabbitMQ 服务器 + global.RabbitConn, err = amqp.Dial(dsn) + if err != nil { + return err + } + + // 创建一个通道 + global.RabbitChannel, err = global.RabbitConn.Channel() + if err != nil { + return err + } + + return nil +} + +// Publish 生产 +func Publish(queueName, exchangeName, routingKey string, message interface{}) error { + err := global.RabbitChannel.ExchangeDeclare( + exchangeName, // name + "x-delayed-message", // type + true, // durable + false, // auto-deleted + false, // internal + false, // 阻塞处理 + amqp.Table{"x-delayed-type": "direct"}, // arguments + ) + if err != nil { + return err + } + + _, err = global.RabbitChannel.QueueDeclare( + queueName, // 队列名字 + true, // 消息是否持久化 + false, // 不使用的时候删除队列 + false, // 是否排他 + false, // 是否阻塞 + nil, // arguments + ) + if err != nil { + return err + } + + // 将队列绑定到延迟交换器 + err = global.RabbitChannel.QueueBind( + queueName, // 队列名称 + routingKey, // 路由键 + exchangeName, // 交换器名称 + false, + nil, + ) + if err != nil { + return err + } + + body, err := json.Marshal(message) + if err != nil { + return err + } + + err = global.RabbitChannel.Publish( + exchangeName, // exchange(交换机名字) + routingKey, // + false, // 是否为无法路由的消息进行返回处理 + false, // 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃 + amqp.Publishing{ + ContentType: "text/plain", + Body: body, // 消息内容 + }) + if err != nil { + return err + } + + return nil +} + +// PublishWithDelay 生产延迟消息 +func (p PublishS) PublishWithDelay() error { + err := global.RabbitChannel.ExchangeDeclare( + p.ExchangeName, // name + "x-delayed-message", // type + true, // durable + false, // auto-deleted + false, // internal + false, // 阻塞处理 + amqp.Table{"x-delayed-type": "direct"}, // arguments + ) + if err != nil { + return err + } + + _, err = global.RabbitChannel.QueueDeclare( + p.QueueName, // 队列名字 + true, // 消息是否持久化 + false, // 不使用的时候删除队列 + false, // 是否排他 + false, // 是否阻塞 + nil, // arguments + ) + if err != nil { + return err + } + + // 将队列绑定到延迟交换器 + err = global.RabbitChannel.QueueBind( + p.QueueName, // 队列名称 + p.RoutingKey, // 路由键 + p.ExchangeName, // 交换器名称 + false, + nil, + ) + if err != nil { + return err + } + + body, err := json.Marshal(p.Message) + if err != nil { + return err + } + + err = global.RabbitChannel.Publish( + p.ExchangeName, // exchange + p.RoutingKey, // routing key + false, // 是否为无法路由的消息进行返回处理 + false, // 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃 + amqp.Publishing{ + ContentType: "text/plain", + Body: body, + Headers: amqp.Table{"x-delay": int32(p.Delay / time.Millisecond)}, + }) + if err != nil { + return err + } + + return nil +} + +// Consume 消费 +func (s ConsumeS) Consume() error { + err := global.RabbitChannel.ExchangeDeclare( + s.ExchangeName, // name + "x-delayed-message", // type + true, // durable + false, // auto-deleted + false, // internal + false, // 阻塞处理 + amqp.Table{"x-delayed-type": "direct"}, // arguments + ) + if err != nil { + return err + } + + queue, err := global.RabbitChannel.QueueDeclare( + s.QueueName, // 队列名称 + true, // 是否持久化 + false, // 是否自动删除队列 + false, // 排他 + false, // 阻塞处理 + nil, // arguments + ) + if err != nil { + return err + } + + // 将队列绑定到延迟交换器 + err = global.RabbitChannel.QueueBind( + s.QueueName, // 队列名称 + s.RoutingKey, // 路由键 + s.ExchangeName, // 交换器名称 + false, + nil, + ) + if err != nil { + return err + } + + msgs, err := global.RabbitChannel.Consume( + queue.Name, // queue + "", // 消费者标签 + false, // 是否自动确认 + false, // 是否独占 + false, // 是否无等待 + false, //其他参数 + nil, // 其他参数 + ) + if err != nil { + return err + } + + // 消费消息 + go func() { + for msg := range msgs { + s.Handler(msg) + } + }() + + return nil +} + +// Close 关闭rabbitMq +func Close() { + if global.RabbitChannel != nil { + err := global.RabbitChannel.Close() + if err != nil { + utils.LogJsonErr("关闭rabbitMq:", err.Error()) + return + } + } + if global.RabbitConn != nil { + err := global.RabbitConn.Close() + if err != nil { + utils.LogJsonErr("关闭rabbitMq:", err.Error()) + return + } + } +} + +// HandleReconnect 处理重新连接 +func HandleReconnect() { + for { + notifyClose := make(chan *amqp.Error) + global.RabbitConn.NotifyClose(notifyClose) + err := <-notifyClose + if err != nil { + time.Sleep(30 * time.Second) + _ = NewRabbitMQClient() + } + } +} diff --git a/extend/weChat/base.go b/extend/weChat/base.go index 8cf036f..98a4b9d 100644 --- a/extend/weChat/base.go +++ b/extend/weChat/base.go @@ -13,10 +13,10 @@ import ( // 创建客户端 func createClient() (*core.Client, error) { - mchId := config.C.Wechat.Pay1659662936.MchId // 商户号 - mchCertificateSerialNumber := config.C.Wechat.Pay1659662936.MchCertificateSerialNumber // 商户证书序列号 - v3ApiSecret := config.C.Wechat.Pay1659662936.V3ApiSecret // 商户APIv3密钥 - privateKeyPath := "extend/weChat/certs/" + config.C.Wechat.Pay1659662936.MchId + "/apiclient_key.pem" // 商户私钥文件地址 + mchId := config.C.Wechat.Pay1281030301.MchId // 商户号 + mchCertificateSerialNumber := config.C.Wechat.Pay1281030301.MchCertificateSerialNumber // 商户证书序列号 + v3ApiSecret := config.C.Wechat.Pay1281030301.V3ApiSecret // 商户APIv3密钥 + privateKeyPath := config.C.Wechat.Pay1281030301.PrivateKey // 商户私钥文件地址 if mchId == "" { return nil, errors.New("商户号错误") diff --git a/extend/weChat/certs/1281030301/apiclient_key.pem b/extend/weChat/certs/1281030301/apiclient_key.pem new file mode 100644 index 0000000..35a62ce --- /dev/null +++ b/extend/weChat/certs/1281030301/apiclient_key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDR4fiYRUjnLQA+ +TPMslDvQIIyZm4ajWFcSXg6yozhdi/O+vuRhwgyeGXsE4SpvuxkK4HPZaocrsjqe +68Y46DyhudzXIBy50IF2UeN5ilj7ydq5TJbBOi1iOF1lgOUNK6DGiOQ+folVNKyN +sjrsA3RHUKHF/JdOfcpSGzvV79FnmjSll/KsdxEc9LCiAa5t7GYA9Hx7AIDG4i8b +03d5EJXvE8BGUZQItE6VIpHwyySTHtGQkPyI6lqMKt2Jlx0TFHu0GkTEmhuA2Z5C +5gOqYWclQO+X2ORgulL9Q3mNcpfy9uBynI/CJDbSvVTkNfwYB0Ny1knu/+in7C6p +IPJYfZpPAgMBAAECggEAWb5tJPcjSC5W10ziAiLUPJdeZ2Q4OupQOPtc/4eJV367 +V8maMC7gZE3y61A4bBQtjhgRkVrat5V7OW8JkFXFb0XhJ1+EyPNeGDDFureseuWC +EA+uuqrcsw306a0mw+3uzlXEevByWquuSNx4E2katE/HDLiIHjjtZRReDomAGfLw +vLNZ40RgdNSXmwpVRoHOZnvX+C2Hxh4fKsrxs1HRCjFsSpTxKwsK6/Kdudj4hsaO +Z5glqf4qopPlQIIkToxOe5p7ukwo7vqaqogixx3jOjruOMpzxZ24wU5AxneLcOYr +ZC1659Uwv96TsKpueakWw0lq3TXkv9Qs+wDNI+NC6QKBgQDx4hWXplLYPYwgGgxG +L9Pk+If9aDS9wj78JUaYBXKR5atXmf/62ufahW4IEDfmMugC1bm9MoYC7CmHv1nk +nxckB84B20NOSiaxbkOaJuMHDtnNeXCmL8Gvtb8fTni1AM/g6XEleEw1r40LNZAq +kdyQ1+OUFhRQGOvAe1SRr1rdGwKBgQDeIcauwMPkc3Seh6Fb66FQ4Lgmt1GvvuiG +UJiwgwPFbEaKKczBHBL6hlKoAFweFa2Qw2xJ0H8jwb8RnkkNcAKYUyMc2wi85+Ih +d568pvXMdYl4DGeGdd/sXKf4dGDjcb2AHzNfDuklbCsAojtO3pLWOs5U/RSKe8ps +hhWa8nPO3QKBgQCIxuKU3X1tP+hz4qbcLYFxscQcXIeuYiABrwZrQnFV5Pxtzex9 +Krn+zIK61oj1iAXATKD6Ro6XKnoVg/POHtQUEMHCNP2rUKzumj5p9eFdBV3OHgTA +RLMOrARGLLZ/C9WBBiBwIsVdekaUdxZtrAuAcEQFYjLcVCtDrbnVo8YKzwKBgEsg +V0cBMP+RwM5hBsTE45Er/3wwofLzeUb7+Tgxh1P887qEuphRO2X5ifkB7iXKpSIB +xh0M5AMe4tU9mG1wBaCo9YYr2j+xmTxCbbBWM2mMEwtD/rtuIGabS7/u9FnYPQQZ +CVHMBDRA6iZTuAVLp5PG3cPGuGzBw0uC6cm22E4NAoGAKrbg3nFPKtzBJZ8Z0iNH +G2gNotmHXbd4+gs4e3Iz9xsEabm4wCoEibNojNdMkX8zX267ebgfvesGEobROjW9 +Intvh39xi3aQ2Q5gvXzKdY0lDzgVvQ7udmLQ4dCUyYnpLr7Yac0asRxLge2esCWV +x2uW2YTiUW3zsbHN/N/vJ5I= +-----END PRIVATE KEY----- diff --git a/extend/weChat/parseNotify.go b/extend/weChat/parseNotify.go index dc8b08b..9152cbc 100644 --- a/extend/weChat/parseNotify.go +++ b/extend/weChat/parseNotify.go @@ -14,10 +14,10 @@ import ( // ParseNotify 回调通知的验签与解密 func ParseNotify(c *gin.Context) (notifyReq *notify.Request, t *payments.Transaction, err error) { - mchId := config.C.Wechat.Pay1659662936.MchId // 商户号 - mchCertificateSerialNumber := config.C.Wechat.Pay1659662936.MchCertificateSerialNumber // 商户证书序列号 - v3ApiSecret := config.C.Wechat.Pay1659662936.V3ApiSecret // 商户APIv3密钥 - privateKeyPath := "extend/weChat/certs/" + config.C.Wechat.Pay1659662936.MchId + "/apiclient_key.pem" // 商户私钥文件地址 + mchId := config.C.Wechat.Pay1281030301.MchId // 商户号 + mchCertificateSerialNumber := config.C.Wechat.Pay1281030301.MchCertificateSerialNumber // 商户证书序列号 + v3ApiSecret := config.C.Wechat.Pay1281030301.V3ApiSecret // 商户APIv3密钥 + privateKeyPath := "extend/weChat/certs/" + config.C.Wechat.Pay1281030301.MchId + "/apiclient_key.pem" // 商户私钥文件地址 // 使用 utils 提供的函数从本地文件中加载商户私钥,商户私钥会用来生成请求的签名 mchPrivateKey, err := utils.LoadPrivateKeyWithPath(privateKeyPath) diff --git a/global/global.go b/global/global.go index c026d8c..ac68306 100644 --- a/global/global.go +++ b/global/global.go @@ -5,17 +5,20 @@ import ( ut "github.com/go-playground/universal-translator" "github.com/go-playground/validator/v10" "github.com/go-redis/redis/v8" + "github.com/rabbitmq/amqp091-go" "github.com/sirupsen/logrus" "gorm.io/gorm" ) // 全局变量 var ( - Db *gorm.DB // 数据库 - Logger *logrus.Logger // 日志 - Redis *redis.Client // redis - Validate *validator.Validate // 验证器 - Trans ut.Translator // Validate/v10 全局验证器 - Snowflake *snowflake.Node // 雪花算法 - UserId int64 // 用户id + Db *gorm.DB // 数据库 + Logger *logrus.Logger // 日志 + Redis *redis.Client // redis + Validate *validator.Validate // 验证器 + Trans ut.Translator // Validate/v10 全局验证器 + Snowflake *snowflake.Node // 雪花算法 + UserId int64 // 用户id + RabbitConn *amqp091.Connection // rabbitmq + RabbitChannel *amqp091.Channel // rabbitmq ) diff --git a/go.mod b/go.mod index 333af8d..5dfb2a5 100644 --- a/go.mod +++ b/go.mod @@ -70,6 +70,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/rabbitmq/amqp091-go v1.10.0 // indirect github.com/richardlehane/mscfb v1.0.4 // indirect github.com/richardlehane/msoleps v1.0.3 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect diff --git a/go.sum b/go.sum index 6edd94e..b1dbbc3 100644 --- a/go.sum +++ b/go.sum @@ -164,6 +164,8 @@ github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/richardlehane/mscfb v1.0.4 h1:WULscsljNPConisD5hR0+OyZjwK46Pfyr6mPu5ZawpM= github.com/richardlehane/mscfb v1.0.4/go.mod h1:YzVpcZg9czvAuhk9T+a3avCpcFPMUWm7gK3DypaEsUk= github.com/richardlehane/msoleps v1.0.1/go.mod h1:BWev5JBpU9Ko2WAgmZEuiz4/u3ZYTKbjLycmwiWUfWg= diff --git a/main.go b/main.go index 67a55d3..6ec7bc1 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,7 @@ import ( "hepa-calc-api/api/router" "hepa-calc-api/config" "hepa-calc-api/core" + "hepa-calc-api/extend/rabbitMq" "net/http" "strconv" ) @@ -30,7 +31,16 @@ func main() { // 加载雪花算法 core.Snowflake() - // 加载定时器 + // 确保在应用关闭时关闭 RabbitMQ 连接和通道 + defer rabbitMq.Close() + + // 启动rabbitmq + core.StartRabbitMq() + + // 启动rabbitmq消费者端 + core.StartRabbitMqConsume() + + // 加载定时器-定时器需要在队列后方,不然可能会出现生产失败的问题 core.StartCron() // 初始化路由-加载中间件