2024-05-29 15:54:39 +08:00

156 lines
3.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package rabbitMq
import (
"encoding/json"
"fmt"
"github.com/streadway/amqp"
"hospital-admin-api/config"
"net/url"
"time"
)
// RabbitMQ bundles one AMQP connection with a single channel opened on it.
// The methods of this type perform all broker operations through that channel.
type RabbitMQ struct {
	conn    *amqp.Connection // broker connection, closed by Close
	channel *amqp.Channel    // channel used for declare/publish/consume calls
}
// NewRabbitMQClient dials the broker described by config.C.Amqp and opens a
// channel on the resulting connection.
//
// The user name and password are escaped with url.UserPassword rather than
// url.QueryEscape: QueryEscape encodes a space as "+", which is not decoded
// back to a space in the userinfo part of a URL, so such credentials would be
// sent to the broker corrupted.
//
// Errors from amqp.Dial and Connection.Channel are returned unchanged. When
// opening the channel fails, the already established connection is closed
// before returning so that it is not leaked.
func NewRabbitMQClient() (*RabbitMQ, error) {
	cfg := config.C.Amqp

	// Userinfo renders as "user:password" with userinfo-safe escaping.
	credentials := url.UserPassword(cfg.User, cfg.Password)
	dsn := fmt.Sprintf("amqp://%s@%s:%d/%s", credentials, cfg.Host, cfg.Port, cfg.Vhost)

	conn, err := amqp.Dial(dsn)
	if err != nil {
		return nil, err
	}

	ch, err := conn.Channel()
	if err != nil {
		// The channel error is the one worth reporting; the close error is
		// deliberately dropped because the connection is being abandoned.
		_ = conn.Close()
		return nil, err
	}

	return &RabbitMQ{
		conn:    conn,
		channel: ch,
	}, nil
}
// Publish declares queueName as a durable queue, JSON-encodes message and
// publishes the result to exchangeName using routingKey.
//
// The first error reported by the queue declaration, the JSON encoding or the
// publish call is returned as is; nil is returned when none of them reported
// an error.
func (r *RabbitMQ) Publish(queueName, exchangeName, routingKey string, message interface{}) error {
	if _, declareErr := r.channel.QueueDeclare(
		queueName, // queue name
		true,      // durable
		false,     // auto-delete when unused
		false,     // exclusive
		false,     // no-wait
		nil,       // arguments
	); declareErr != nil {
		return declareErr
	}

	payload, encodeErr := json.Marshal(message)
	if encodeErr != nil {
		return encodeErr
	}

	outgoing := amqp.Publishing{
		ContentType: "text/plain",
		Body:        payload, // JSON-encoded message
	}
	return r.channel.Publish(
		exchangeName, // exchange
		routingKey,   // routing key
		false,        // mandatory: no return handling for unroutable messages
		false,        // immediate: removed in RabbitMQ 3.0
		outgoing,
	)
}
// PublishWithDelay JSON-encodes message and publishes it to exchangeName with
// routingKey, attaching an "x-delay" header that carries delay in
// milliseconds.
//
// Before publishing, exchangeName is declared as a durable exchange of type
// "x-delayed-message" with "x-delayed-type" set to "direct". The exchange that
// is declared is the one that is published to; declaring it under queueName
// would leave exchangeName undeclared whenever the two names differ.
//
// queueName is kept for signature compatibility and is not used.
// NOTE(review): no queue is declared or bound to the exchange here; confirm
// that the binding is set up elsewhere.
//
// An error is returned when delay does not fit in the 32-bit millisecond
// header value (instead of silently wrapping around), or when the exchange
// declaration, the JSON encoding or the publish call fails.
func (r *RabbitMQ) PublishWithDelay(queueName, exchangeName, routingKey string, message interface{}, delay time.Duration) error {
	// Validate the delay before touching the broker: the header is an int32,
	// so a value that does not survive the round trip would overflow.
	delayMillis := int64(delay / time.Millisecond)
	if delayMillis != int64(int32(delayMillis)) {
		return fmt.Errorf("delay %v does not fit in a 32-bit millisecond x-delay header", delay)
	}

	err := r.channel.ExchangeDeclare(
		exchangeName,                           // name
		"x-delayed-message",                    // type
		true,                                   // durable
		false,                                  // auto-deleted
		false,                                  // internal
		false,                                  // no-wait
		amqp.Table{"x-delayed-type": "direct"}, // arguments
	)
	if err != nil {
		return err
	}

	body, err := json.Marshal(message)
	if err != nil {
		return err
	}

	return r.channel.Publish(
		exchangeName, // exchange
		routingKey,   // routing key
		false,        // mandatory: no return handling for unroutable messages
		false,        // immediate: removed in RabbitMQ 3.0
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        body,
			Headers:     amqp.Table{"x-delay": int32(delayMillis)},
		},
	)
}
// Consume declares queueName and starts a consumer on it, returning the
// channel on which deliveries arrive.
//
// The queue is declared durable so that the declaration matches the one made
// by Publish. Declaring an existing queue with different properties is
// rejected by the broker, so the two methods must agree for the same queue
// name to be usable from both.
//
// The consumer is registered with an empty consumer tag and auto-ack enabled.
// Errors from the queue declaration or the consume call are returned
// unchanged.
func (r *RabbitMQ) Consume(queueName string) (<-chan amqp.Delivery, error) {
	q, err := r.channel.QueueDeclare(
		queueName, // queue name
		true,      // durable, same as in Publish
		false,     // auto-delete when unused
		false,     // exclusive
		false,     // no-wait
		nil,       // arguments
	)
	if err != nil {
		return nil, err
	}

	msgs, err := r.channel.Consume(
		q.Name, // queue
		"",     // consumer tag
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	if err != nil {
		return nil, err
	}
	return msgs, nil
}
// Close closes the channel and then the connection, skipping whichever of the
// two is nil.
//
// The connection is always closed, even when closing the channel reports an
// error (for example because the channel is already closed); returning early
// in that case would leak the connection. Close has no return value, so both
// close errors are intentionally discarded.
func (r *RabbitMQ) Close() {
	if r.channel != nil {
		// Best effort: a channel close failure must not block the
		// connection shutdown below.
		_ = r.channel.Close()
	}
	if r.conn != nil {
		// Nothing can be reported to the caller from here.
		_ = r.conn.Close()
	}
}