285 lines
6.6 KiB
Go
285 lines
6.6 KiB
Go
package rabbitMq
|
||
|
||
import (
|
||
"encoding/json"
|
||
"fmt"
|
||
amqp "github.com/rabbitmq/amqp091-go"
|
||
"hepa-calc-api/config"
|
||
"hepa-calc-api/global"
|
||
"hepa-calc-api/utils"
|
||
"log"
|
||
"net/url"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
// Client 客户端
|
||
type Client struct {
|
||
m *sync.Mutex
|
||
queueName string
|
||
logger *log.Logger
|
||
connection *amqp.Connection
|
||
channel *amqp.Channel
|
||
done chan bool
|
||
notifyConnClose chan *amqp.Error
|
||
notifyChanClose chan *amqp.Error
|
||
notifyConfirm chan amqp.Confirmation
|
||
isReady bool
|
||
}
|
||
|
||
// ConsumeS 消费端结构体
|
||
type ConsumeS struct {
|
||
QueueName string // 队列名称
|
||
ExchangeName string // 交换机名称
|
||
RoutingKey string // 路由键
|
||
Handler func(amqp.Delivery) // 回调处理方法
|
||
}
|
||
|
||
// PublishS 生产端结构体
|
||
type PublishS struct {
|
||
QueueName string // 队列名称
|
||
ExchangeName string // 交换机名称
|
||
RoutingKey string // 路由键
|
||
Message interface{} // 消息内容
|
||
Delay time.Duration // 延迟时间
|
||
}
|
||
|
||
// NewRabbitMQClient 初始化客户端
|
||
func NewRabbitMQClient() error {
|
||
var err error
|
||
m := config.C.Amqp
|
||
|
||
user := url.QueryEscape(m.User)
|
||
password := url.QueryEscape(m.Password)
|
||
|
||
dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/%s", user, password, m.Host, m.Port, m.Vhost)
|
||
|
||
// 连接到 RabbitMQ 服务器
|
||
global.RabbitConn, err = amqp.Dial(dsn)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 创建一个通道
|
||
global.RabbitChannel, err = global.RabbitConn.Channel()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// Publish 生产
|
||
func Publish(queueName, exchangeName, routingKey string, message interface{}) error {
|
||
err := global.RabbitChannel.ExchangeDeclare(
|
||
exchangeName, // name
|
||
"x-delayed-message", // type
|
||
true, // durable
|
||
false, // auto-deleted
|
||
false, // internal
|
||
false, // 阻塞处理
|
||
amqp.Table{"x-delayed-type": "direct"}, // arguments
|
||
)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
_, err = global.RabbitChannel.QueueDeclare(
|
||
queueName, // 队列名字
|
||
true, // 消息是否持久化
|
||
false, // 不使用的时候删除队列
|
||
false, // 是否排他
|
||
false, // 是否阻塞
|
||
nil, // arguments
|
||
)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 将队列绑定到延迟交换器
|
||
err = global.RabbitChannel.QueueBind(
|
||
queueName, // 队列名称
|
||
routingKey, // 路由键
|
||
exchangeName, // 交换器名称
|
||
false,
|
||
nil,
|
||
)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
body, err := json.Marshal(message)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
err = global.RabbitChannel.Publish(
|
||
exchangeName, // exchange(交换机名字)
|
||
routingKey, //
|
||
false, // 是否为无法路由的消息进行返回处理
|
||
false, // 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃
|
||
amqp.Publishing{
|
||
ContentType: "text/plain",
|
||
Body: body, // 消息内容
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// PublishWithDelay 生产延迟消息
|
||
func (p PublishS) PublishWithDelay() error {
|
||
err := global.RabbitChannel.ExchangeDeclare(
|
||
p.ExchangeName, // name
|
||
"x-delayed-message", // type
|
||
true, // durable
|
||
false, // auto-deleted
|
||
false, // internal
|
||
false, // 阻塞处理
|
||
amqp.Table{"x-delayed-type": "direct"}, // arguments
|
||
)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
_, err = global.RabbitChannel.QueueDeclare(
|
||
p.QueueName, // 队列名字
|
||
true, // 消息是否持久化
|
||
false, // 不使用的时候删除队列
|
||
false, // 是否排他
|
||
false, // 是否阻塞
|
||
nil, // arguments
|
||
)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 将队列绑定到延迟交换器
|
||
err = global.RabbitChannel.QueueBind(
|
||
p.QueueName, // 队列名称
|
||
p.RoutingKey, // 路由键
|
||
p.ExchangeName, // 交换器名称
|
||
false,
|
||
nil,
|
||
)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
body, err := json.Marshal(p.Message)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
err = global.RabbitChannel.Publish(
|
||
p.ExchangeName, // exchange
|
||
p.RoutingKey, // routing key
|
||
false, // 是否为无法路由的消息进行返回处理
|
||
false, // 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃
|
||
amqp.Publishing{
|
||
ContentType: "text/plain",
|
||
Body: body,
|
||
Headers: amqp.Table{"x-delay": int32(p.Delay / time.Millisecond)},
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// Consume 消费
|
||
func (s ConsumeS) Consume() error {
|
||
err := global.RabbitChannel.ExchangeDeclare(
|
||
s.ExchangeName, // name
|
||
"x-delayed-message", // type
|
||
true, // durable
|
||
false, // auto-deleted
|
||
false, // internal
|
||
false, // 阻塞处理
|
||
amqp.Table{"x-delayed-type": "direct"}, // arguments
|
||
)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
queue, err := global.RabbitChannel.QueueDeclare(
|
||
s.QueueName, // 队列名称
|
||
true, // 是否持久化
|
||
false, // 是否自动删除队列
|
||
false, // 排他
|
||
false, // 阻塞处理
|
||
nil, // arguments
|
||
)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 将队列绑定到延迟交换器
|
||
err = global.RabbitChannel.QueueBind(
|
||
s.QueueName, // 队列名称
|
||
s.RoutingKey, // 路由键
|
||
s.ExchangeName, // 交换器名称
|
||
false,
|
||
nil,
|
||
)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
msgs, err := global.RabbitChannel.Consume(
|
||
queue.Name, // queue
|
||
"", // 消费者标签
|
||
false, // 是否自动确认
|
||
false, // 是否独占
|
||
false, // 是否无等待
|
||
false, //其他参数
|
||
nil, // 其他参数
|
||
)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 消费消息
|
||
go func() {
|
||
for msg := range msgs {
|
||
s.Handler(msg)
|
||
}
|
||
}()
|
||
|
||
return nil
|
||
}
|
||
|
||
// Close 关闭rabbitMq
|
||
func Close() {
|
||
if global.RabbitChannel != nil {
|
||
err := global.RabbitChannel.Close()
|
||
if err != nil {
|
||
utils.LogJsonErr("关闭rabbitMq:", err.Error())
|
||
return
|
||
}
|
||
}
|
||
if global.RabbitConn != nil {
|
||
err := global.RabbitConn.Close()
|
||
if err != nil {
|
||
utils.LogJsonErr("关闭rabbitMq:", err.Error())
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
// HandleReconnect 处理重新连接
|
||
func HandleReconnect() {
|
||
for {
|
||
notifyClose := make(chan *amqp.Error)
|
||
global.RabbitConn.NotifyClose(notifyClose)
|
||
err := <-notifyClose
|
||
if err != nil {
|
||
time.Sleep(30 * time.Second)
|
||
_ = NewRabbitMQClient()
|
||
}
|
||
}
|
||
}
|