修改了amqp的使用

This commit is contained in:
wucongxing8150 2024-05-29 14:49:24 +08:00
parent a04e59c3de
commit e425546627
6 changed files with 185 additions and 57 deletions

View File

@ -0,0 +1 @@
package service

View File

@ -8,6 +8,7 @@ import (
dtoV1 "hospital-open-api/api/dto/v1"
"hospital-open-api/api/model"
requestsV1 "hospital-open-api/api/requests/v1"
"hospital-open-api/extend/rabbitMq"
"hospital-open-api/global"
"strconv"
"time"
@ -55,6 +56,10 @@ func (r *CouponService) ReceiveCoupon(req requestsV1.ReceiveCoupon) (g *dtoV1.Re
}
}()
// 建立队列连接
rabbitMQ, err := rabbitMq.NewRabbitMQClient()
defer rabbitMQ.Close()
// 获取优惠卷数据
couponDao := dao.CouponDao{}
userCouponDao := dao.UserCouponDao{}
@ -164,12 +169,12 @@ func (r *CouponService) ReceiveCoupon(req requestsV1.ReceiveCoupon) (g *dtoV1.Re
}
// 有效类型1:绝对时效xxx-xxx时间段有效 2:相对时效 n天内有效
if coupon.CouponType == 1 {
if coupon.ValidType == 1 {
UserCouponModel.ValidStartTime = time.Time(coupon.ValidStartTime)
UserCouponModel.ValidEndTime = time.Time(coupon.ValidEndTime)
}
if coupon.CouponType == 2 {
if coupon.ValidType == 2 {
UserCouponModel.ValidStartTime = now
UserCouponModel.ValidEndTime = now.AddDate(0, 0, coupon.ValidDays)
}
@ -228,8 +233,22 @@ func (r *CouponService) ReceiveCoupon(req requestsV1.ReceiveCoupon) (g *dtoV1.Re
year, month, day := now.Date()
location := now.Location()
endOfDay := time.Date(year, month, day, 23, 59, 59, 0, location)
if userCoupon.ValidEndTime.After(endOfDay) {
if userCoupon.ValidEndTime.Before(endOfDay) {
// 需添加队列
data := make(map[string]interface{})
data["user_coupon_id"] = fmt.Sprintf("%d", userCoupon.UserCouponId)
delay := userCoupon.ValidEndTime.Sub(time.Now())
if delay < 10 {
delay = 10 * time.Second
}
err = rabbitMQ.PublishWithDelay("amqp.delay.direct", "UserCouponExpired", data, delay)
if err != nil {
tx.Rollback()
return nil, err
}
}
}

View File

@ -1,42 +0,0 @@
package core
import (
"fmt"
"github.com/streadway/amqp"
"hospital-open-api/config"
"hospital-open-api/global"
"net/url"
)
func Amqp() {
m := config.C.Amqp
user := url.QueryEscape(m.User)
password := url.QueryEscape(m.Password)
dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/%s", user,
password, m.Host, m.Port, m.Vhost)
conn, err := amqp.Dial(dsn)
if err != nil {
panic("rabbitMq初始化失败! " + err.Error())
}
defer func(conn *amqp.Connection) {
_ = conn.Close()
}(conn)
ch, err := conn.Channel()
if err != nil {
panic("rabbitMq初始化失败! " + err.Error())
}
defer func(ch *amqp.Channel) {
_ = ch.Close()
}(ch)
global.AmqpConn = conn
global.AmqpChannel = ch
fmt.Println("初始化rabbitMq成功......")
}

156
extend/rabbitMq/rabbitMq.go Normal file
View File

@ -0,0 +1,156 @@
package rabbitMq
import (
"encoding/json"
"fmt"
"github.com/streadway/amqp"
"hospital-open-api/config"
"net/url"
"time"
)
type RabbitMQ struct {
conn *amqp.Connection
channel *amqp.Channel
}
// NewRabbitMQClient 初始化客户端
func NewRabbitMQClient() (*RabbitMQ, error) {
m := config.C.Amqp
user := url.QueryEscape(m.User)
password := url.QueryEscape(m.Password)
dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/%s", user, password, m.Host, m.Port, m.Vhost)
conn, err := amqp.Dial(dsn)
if err != nil {
return nil, err
}
ch, err := conn.Channel()
if err != nil {
return nil, err
}
return &RabbitMQ{
conn: conn,
channel: ch,
}, nil
}
// Publish 生产
func (r *RabbitMQ) Publish(queueName, exchangeName string, message interface{}) error {
q, err := r.channel.QueueDeclare(
queueName, // 队列名字
false, // 消息是否持久化
false, // 不使用的时候删除队列
false, // 排他
false, // 是否等待服务器确认
nil, // arguments
)
if err != nil {
return err
}
// Serialize the message
body, err := json.Marshal(message)
if err != nil {
return err
}
err = r.channel.Publish(
exchangeName, // exchange交换机名字
q.Name, //
false, // 是否为无法路由的消息进行返回处理
false, // 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃
amqp.Publishing{
ContentType: "text/plain",
Body: body, // 消息内容
})
if err != nil {
return err
}
return nil
}
func (r *RabbitMQ) PublishWithDelay(exchangeName, routingKey string, message interface{}, delay time.Duration) error {
err := r.channel.ExchangeDeclare(
exchangeName, // name
"x-delayed-message", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
amqp.Table{"x-delayed-type": "direct"}, // arguments
)
if err != nil {
return err
}
body, err := json.Marshal(message)
if err != nil {
return err
}
err = r.channel.Publish(
exchangeName, // exchange
routingKey, // routing key
false, // 是否为无法路由的消息进行返回处理
false, // 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃
amqp.Publishing{
ContentType: "text/plain",
Body: body,
Headers: amqp.Table{"x-delay": int32(delay / time.Millisecond)},
})
if err != nil {
return err
}
return nil
}
// Consume 消费
func (r *RabbitMQ) Consume(queueName, exchangeName string) (<-chan amqp.Delivery, error) {
q, err := r.channel.QueueDeclare(
queueName, // 队列名称
false, // 消息者名称
false, // 是否确认消费
false, // 排他
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
msgs, err := r.channel.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return nil, err
}
return msgs, nil
}
func (r *RabbitMQ) Close() {
if r.channel != nil {
err := r.channel.Close()
if err != nil {
return
}
}
if r.conn != nil {
err := r.conn.Close()
if err != nil {
return
}
}
}

View File

@ -6,18 +6,15 @@ import (
"github.com/go-playground/validator/v10"
"github.com/go-redis/redis/v8"
"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"gorm.io/gorm"
)
// 全局变量
var (
Db *gorm.DB // 数据库
Logger *logrus.Logger // 日志
Redis *redis.Client // redis
Validate *validator.Validate // 验证器
Trans ut.Translator // Validate/v10 全局验证器
Snowflake *snowflake.Node // 雪花算法
AmqpConn *amqp.Connection // rabbitMq队列
AmqpChannel *amqp.Channel // rabbitMq队列
Db *gorm.DB // 数据库
Logger *logrus.Logger // 日志
Redis *redis.Client // redis
Validate *validator.Validate // 验证器
Trans ut.Translator // Validate/v10 全局验证器
Snowflake *snowflake.Node // 雪花算法
)

View File

@ -30,9 +30,6 @@ func main() {
// 加载雪花算法
core.Snowflake()
// 加载队列
core.Amqp()
// 初始化路由-加载中间件
r := router.Init()