285 lines
6.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package rabbitMq
import (
"encoding/json"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"hepa-calc-api/config"
"hepa-calc-api/global"
"hepa-calc-api/utils"
"log"
"net/url"
"sync"
"time"
)
// Client 客户端
type Client struct {
m *sync.Mutex
queueName string
logger *log.Logger
connection *amqp.Connection
channel *amqp.Channel
done chan bool
notifyConnClose chan *amqp.Error
notifyChanClose chan *amqp.Error
notifyConfirm chan amqp.Confirmation
isReady bool
}
// ConsumeS 消费端结构体
type ConsumeS struct {
QueueName string // 队列名称
ExchangeName string // 交换机名称
RoutingKey string // 路由键
Handler func(amqp.Delivery) // 回调处理方法
}
// PublishS 生产端结构体
type PublishS struct {
QueueName string // 队列名称
ExchangeName string // 交换机名称
RoutingKey string // 路由键
Message interface{} // 消息内容
Delay time.Duration // 延迟时间
}
// NewRabbitMQClient 初始化客户端
func NewRabbitMQClient() error {
var err error
m := config.C.Amqp
user := url.QueryEscape(m.User)
password := url.QueryEscape(m.Password)
dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/%s", user, password, m.Host, m.Port, m.Vhost)
// 连接到 RabbitMQ 服务器
global.RabbitConn, err = amqp.Dial(dsn)
if err != nil {
return err
}
// 创建一个通道
global.RabbitChannel, err = global.RabbitConn.Channel()
if err != nil {
return err
}
return nil
}
// Publish 生产
func Publish(queueName, exchangeName, routingKey string, message interface{}) error {
err := global.RabbitChannel.ExchangeDeclare(
exchangeName, // name
"x-delayed-message", // type
true, // durable
false, // auto-deleted
false, // internal
false, // 阻塞处理
amqp.Table{"x-delayed-type": "direct"}, // arguments
)
if err != nil {
return err
}
_, err = global.RabbitChannel.QueueDeclare(
queueName, // 队列名字
true, // 消息是否持久化
false, // 不使用的时候删除队列
false, // 是否排他
false, // 是否阻塞
nil, // arguments
)
if err != nil {
return err
}
// 将队列绑定到延迟交换器
err = global.RabbitChannel.QueueBind(
queueName, // 队列名称
routingKey, // 路由键
exchangeName, // 交换器名称
false,
nil,
)
if err != nil {
return err
}
body, err := json.Marshal(message)
if err != nil {
return err
}
err = global.RabbitChannel.Publish(
exchangeName, // exchange交换机名字
routingKey, //
false, // 是否为无法路由的消息进行返回处理
false, // 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃
amqp.Publishing{
ContentType: "text/plain",
Body: body, // 消息内容
})
if err != nil {
return err
}
return nil
}
// PublishWithDelay 生产延迟消息
func (p PublishS) PublishWithDelay() error {
err := global.RabbitChannel.ExchangeDeclare(
p.ExchangeName, // name
"x-delayed-message", // type
true, // durable
false, // auto-deleted
false, // internal
false, // 阻塞处理
amqp.Table{"x-delayed-type": "direct"}, // arguments
)
if err != nil {
return err
}
_, err = global.RabbitChannel.QueueDeclare(
p.QueueName, // 队列名字
true, // 消息是否持久化
false, // 不使用的时候删除队列
false, // 是否排他
false, // 是否阻塞
nil, // arguments
)
if err != nil {
return err
}
// 将队列绑定到延迟交换器
err = global.RabbitChannel.QueueBind(
p.QueueName, // 队列名称
p.RoutingKey, // 路由键
p.ExchangeName, // 交换器名称
false,
nil,
)
if err != nil {
return err
}
body, err := json.Marshal(p.Message)
if err != nil {
return err
}
err = global.RabbitChannel.Publish(
p.ExchangeName, // exchange
p.RoutingKey, // routing key
false, // 是否为无法路由的消息进行返回处理
false, // 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃
amqp.Publishing{
ContentType: "text/plain",
Body: body,
Headers: amqp.Table{"x-delay": int32(p.Delay / time.Millisecond)},
})
if err != nil {
return err
}
return nil
}
// Consume 消费
func (s ConsumeS) Consume() error {
err := global.RabbitChannel.ExchangeDeclare(
s.ExchangeName, // name
"x-delayed-message", // type
true, // durable
false, // auto-deleted
false, // internal
false, // 阻塞处理
amqp.Table{"x-delayed-type": "direct"}, // arguments
)
if err != nil {
return err
}
queue, err := global.RabbitChannel.QueueDeclare(
s.QueueName, // 队列名称
true, // 是否持久化
false, // 是否自动删除队列
false, // 排他
false, // 阻塞处理
nil, // arguments
)
if err != nil {
return err
}
// 将队列绑定到延迟交换器
err = global.RabbitChannel.QueueBind(
s.QueueName, // 队列名称
s.RoutingKey, // 路由键
s.ExchangeName, // 交换器名称
false,
nil,
)
if err != nil {
return err
}
msgs, err := global.RabbitChannel.Consume(
queue.Name, // queue
"", // 消费者标签
false, // 是否自动确认
false, // 是否独占
false, // 是否无等待
false, //其他参数
nil, // 其他参数
)
if err != nil {
return err
}
// 消费消息
go func() {
for msg := range msgs {
s.Handler(msg)
}
}()
return nil
}
// Close 关闭rabbitMq
func Close() {
if global.RabbitChannel != nil {
err := global.RabbitChannel.Close()
if err != nil {
utils.LogJsonErr("关闭rabbitMq:", err.Error())
return
}
}
if global.RabbitConn != nil {
err := global.RabbitConn.Close()
if err != nil {
utils.LogJsonErr("关闭rabbitMq:", err.Error())
return
}
}
}
// HandleReconnect 处理重新连接
func HandleReconnect() {
for {
notifyClose := make(chan *amqp.Error)
global.RabbitConn.NotifyClose(notifyClose)
err := <-notifyClose
if err != nil {
time.Sleep(30 * time.Second)
_ = NewRabbitMQClient()
}
}
}