package rabbitMq import ( "encoding/json" "fmt" amqp "github.com/rabbitmq/amqp091-go" "hepa-calc-api/config" "hepa-calc-api/global" "hepa-calc-api/utils" "log" "net/url" "sync" "time" ) // Client 客户端 type Client struct { m *sync.Mutex queueName string logger *log.Logger connection *amqp.Connection channel *amqp.Channel done chan bool notifyConnClose chan *amqp.Error notifyChanClose chan *amqp.Error notifyConfirm chan amqp.Confirmation isReady bool } // ConsumeS 消费端结构体 type ConsumeS struct { QueueName string // 队列名称 ExchangeName string // 交换机名称 RoutingKey string // 路由键 Handler func(amqp.Delivery) // 回调处理方法 } // PublishS 生产端结构体 type PublishS struct { QueueName string // 队列名称 ExchangeName string // 交换机名称 RoutingKey string // 路由键 Message interface{} // 消息内容 Delay time.Duration // 延迟时间 } // NewRabbitMQClient 初始化客户端 func NewRabbitMQClient() error { var err error m := config.C.Amqp user := url.QueryEscape(m.User) password := url.QueryEscape(m.Password) dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/%s", user, password, m.Host, m.Port, m.Vhost) // 连接到 RabbitMQ 服务器 global.RabbitConn, err = amqp.Dial(dsn) if err != nil { return err } // 创建一个通道 global.RabbitChannel, err = global.RabbitConn.Channel() if err != nil { return err } return nil } // Publish 生产 func Publish(queueName, exchangeName, routingKey string, message interface{}) error { err := global.RabbitChannel.ExchangeDeclare( exchangeName, // name "x-delayed-message", // type true, // durable false, // auto-deleted false, // internal false, // 阻塞处理 amqp.Table{"x-delayed-type": "direct"}, // arguments ) if err != nil { return err } _, err = global.RabbitChannel.QueueDeclare( queueName, // 队列名字 true, // 消息是否持久化 false, // 不使用的时候删除队列 false, // 是否排他 false, // 是否阻塞 nil, // arguments ) if err != nil { return err } // 将队列绑定到延迟交换器 err = global.RabbitChannel.QueueBind( queueName, // 队列名称 routingKey, // 路由键 exchangeName, // 交换器名称 false, nil, ) if err != nil { return err } body, err := json.Marshal(message) if err != nil { return err } err = global.RabbitChannel.Publish( exchangeName, // exchange(交换机名字) routingKey, // false, // 是否为无法路由的消息进行返回处理 false, // 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃 amqp.Publishing{ ContentType: "text/plain", Body: body, // 消息内容 }) if err != nil { return err } return nil } // PublishWithDelay 生产延迟消息 func (p PublishS) PublishWithDelay() error { err := global.RabbitChannel.ExchangeDeclare( p.ExchangeName, // name "x-delayed-message", // type true, // durable false, // auto-deleted false, // internal false, // 阻塞处理 amqp.Table{"x-delayed-type": "direct"}, // arguments ) if err != nil { return err } _, err = global.RabbitChannel.QueueDeclare( p.QueueName, // 队列名字 true, // 消息是否持久化 false, // 不使用的时候删除队列 false, // 是否排他 false, // 是否阻塞 nil, // arguments ) if err != nil { return err } // 将队列绑定到延迟交换器 err = global.RabbitChannel.QueueBind( p.QueueName, // 队列名称 p.RoutingKey, // 路由键 p.ExchangeName, // 交换器名称 false, nil, ) if err != nil { return err } body, err := json.Marshal(p.Message) if err != nil { return err } err = global.RabbitChannel.Publish( p.ExchangeName, // exchange p.RoutingKey, // routing key false, // 是否为无法路由的消息进行返回处理 false, // 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃 amqp.Publishing{ ContentType: "text/plain", Body: body, Headers: amqp.Table{"x-delay": int32(p.Delay / time.Millisecond)}, }) if err != nil { return err } return nil } // Consume 消费 func (s ConsumeS) Consume() error { err := global.RabbitChannel.ExchangeDeclare( s.ExchangeName, // name "x-delayed-message", // type true, // durable false, // auto-deleted false, // internal false, // 阻塞处理 amqp.Table{"x-delayed-type": "direct"}, // arguments ) if err != nil { return err } queue, err := global.RabbitChannel.QueueDeclare( s.QueueName, // 队列名称 true, // 是否持久化 false, // 是否自动删除队列 false, // 排他 false, // 阻塞处理 nil, // arguments ) if err != nil { return err } // 将队列绑定到延迟交换器 err = global.RabbitChannel.QueueBind( s.QueueName, // 队列名称 s.RoutingKey, // 路由键 s.ExchangeName, // 交换器名称 false, nil, ) if err != nil { return err } msgs, err := global.RabbitChannel.Consume( queue.Name, // queue "", // 消费者标签 false, // 是否自动确认 false, // 是否独占 false, // 是否无等待 false, //其他参数 nil, // 其他参数 ) if err != nil { return err } // 消费消息 go func() { for msg := range msgs { s.Handler(msg) } }() return nil } // Close 关闭rabbitMq func Close() { if global.RabbitChannel != nil { err := global.RabbitChannel.Close() if err != nil { utils.LogJsonErr("关闭rabbitMq:", err.Error()) return } } if global.RabbitConn != nil { err := global.RabbitConn.Close() if err != nil { utils.LogJsonErr("关闭rabbitMq:", err.Error()) return } } } // HandleReconnect 处理重新连接 func HandleReconnect() { for { notifyClose := make(chan *amqp.Error) global.RabbitConn.NotifyClose(notifyClose) err := <-notifyClose if err != nil { time.Sleep(30 * time.Second) _ = NewRabbitMQClient() } } }