package rabbitMq import ( "encoding/json" "fmt" "github.com/streadway/amqp" "hospital-open-api/config" "net/url" "time" ) type RabbitMQ struct { conn *amqp.Connection channel *amqp.Channel } // NewRabbitMQClient 初始化客户端 func NewRabbitMQClient() (*RabbitMQ, error) { m := config.C.Amqp user := url.QueryEscape(m.User) password := url.QueryEscape(m.Password) dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/%s", user, password, m.Host, m.Port, m.Vhost) conn, err := amqp.Dial(dsn) if err != nil { return nil, err } ch, err := conn.Channel() if err != nil { return nil, err } return &RabbitMQ{ conn: conn, channel: ch, }, nil } // Publish 生产 func (r *RabbitMQ) Publish(queueName, exchangeName, routingKey string, message interface{}) error { _, err := r.channel.QueueDeclare( queueName, // 队列名字 true, // 消息是否持久化 false, // 不使用的时候删除队列 false, // 是否排他 false, // 是否阻塞 nil, // arguments ) if err != nil { return err } body, err := json.Marshal(message) if err != nil { return err } err = r.channel.Publish( exchangeName, // exchange(交换机名字) routingKey, // false, // 是否为无法路由的消息进行返回处理 false, // 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃 amqp.Publishing{ ContentType: "text/plain", Body: body, // 消息内容 }) if err != nil { return err } return nil } func (r *RabbitMQ) PublishWithDelay(queueName, exchangeName, routingKey string, message interface{}, delay time.Duration) error { err := r.channel.ExchangeDeclare( queueName, // name "x-delayed-message", // type true, // durable false, // auto-deleted false, // internal false, // 阻塞处理 amqp.Table{"x-delayed-type": "direct"}, // arguments ) if err != nil { return err } body, err := json.Marshal(message) if err != nil { return err } err = r.channel.Publish( exchangeName, // exchange routingKey, // routing key false, // 是否为无法路由的消息进行返回处理 false, // 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃 amqp.Publishing{ ContentType: "text/plain", Body: body, Headers: amqp.Table{"x-delay": int32(delay / time.Millisecond)}, }) if err != nil { return err } return nil } // Consume 消费 func (r *RabbitMQ) Consume(queueName string) (<-chan amqp.Delivery, error) { q, err := r.channel.QueueDeclare( queueName, // 队列名称 false, // 是否持久化 false, // 是否自动删除队列 false, // 排他 false, // 阻塞处理 nil, // arguments ) if err != nil { return nil, err } msgs, err := r.channel.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { return nil, err } return msgs, nil } func (r *RabbitMQ) Close() { if r.channel != nil { err := r.channel.Close() if err != nil { return } } if r.conn != nil { err := r.conn.Close() if err != nil { return } } }