飞道的博客

golang rabbitMQ 生产者复用channel以及生产者组分发策略

684人阅读  评论(0)

引用的是rabbitMQ官方示例的库github.com/rabbitmq/amqp091-go

在网络编程中我们知道tcp连接的创建、交互、销毁等相关操作的"代价"都是很高的,所以就要去实现如何复用这些连接,并要做到高效并可靠。

预期效果:

项目初始化构建时可以自定义选择生产者开启多个connection,每个connection可以启动多少个channel【都是全局复用的】,因为rabbitMQ所有的命令都是基本都是通过channel去操作完成的,所以这个channel很重要,也是我们想要复用的重点。

初始化创建完connection和channel后,当生产者需要发送一条消息的时候,我们可以通过一些策略去选择它发送到哪个connection和channel,我这里采用的就是随机选择,也可以采用哈希取模、轮询权重算法等,这个可以根据自身业务来做。

我简单画了一个效果图:

定义RabbitMQ的Config、Connection、Channel结构体


   
  1. type Config struct {
  2. Host string
  3. Port int
  4. User string
  5. Password string
  6. }
  7. type Channel struct {
  8. m *sync.Mutex
  9. ch *amqp.Channel
  10. }
  11. type Connection struct {
  12. ctx context.Context
  13. n int
  14. conn *amqp.Connection
  15. ch []*Channel
  16. }

实例化RabbitMQ结构体


   
  1. func (mq *Connection) New(config Config) (rabbitmq *Connection) {
  2. configString := fmt.Sprintf( "amqp://%s:%s@%s:%d/", config.User, config.Password, config.Host, config.Port)
  3. conn, err := amqp.Dial(configString)
  4. if err != nil {
  5. log.Panicf( "amqp connect error: %v \n", err)
  6. }
  7. rabbitmq = &Connection{
  8. ctx: context.Background(),
  9. conn: conn,
  10. }
  11. return
  12. }

一、创建消费者


   
  1. // ConsumeWithWork rabbitmq消费消息[work模式 channelNums可以设置当前连接开启多少个channel]
  2. func (mq *Connection) ConsumeWithWork(queueName string, channelNums int) {
  3. for i := 0; i < channelNums; i++ {
  4. go func(i int) {
  5. ch, err := mq.conn.Channel()
  6. if err != nil {
  7. log.Panicf( "amqp open a channel error: %v \n", err)
  8. }
  9. q, err := ch.QueueDeclare(
  10. queueName, // name
  11. true, // durable
  12. false, // delete when unused
  13. false, // exclusive
  14. false, // no-wait
  15. nil, // arguments
  16. )
  17. if err != nil {
  18. log.Panicf( "amqp declare a queue error: %v \n", err)
  19. }
  20. err = ch.Qos(
  21. 1, // prefetch count
  22. 0, // prefetch size
  23. false, // global
  24. )
  25. if err != nil {
  26. log.Panicf( "amqp set QoS error: %v \n", err)
  27. }
  28. msg, err := ch.Consume(
  29. q.Name, // queue
  30. "", // consumer
  31. false, // auto-ack
  32. false, // exclusive
  33. false, // no-local
  34. false, // no-wait
  35. nil, // args
  36. )
  37. if err != nil {
  38. log.Panicf( "amqp register a consumer error: %v \n", err)
  39. }
  40. log.Printf( " [work-%d] Waiting for messages. To exit press CTRL+C", i)
  41. for d := range msg {
  42. time.Sleep(time.Second)
  43. fmt.Printf( "[work-%d] Received a message: %s \n", i, d.Body)
  44. err = d.Ack( false)
  45. if err != nil {
  46. log.Printf( "work_one Ack Err: %v", err)
  47. }
  48. }
  49. }(i)
  50. }
  51. var forever chan struct{}
  52. <-forever
  53. }

二、创建生产者组


   
  1. // NewPlusherGroups 创建生产者组
  2. func NewPlusherGroups(config Config, connNums, channelNums int) (plusherGroups map[ int]*Connection) {
  3. plusherGroups = make( map[ int]*Connection, connNums)
  4. for i := 0; i < connNums; i++ {
  5. var rabbitmq *Connection
  6. rabbitmq = rabbitmq.New(config)
  7. rabbitmq.n = i
  8. for cN := 0; cN < channelNums; cN++ {
  9. ch, err := rabbitmq.conn.Channel()
  10. if err != nil {
  11. log.Panicf( "amqp open a channel error: %v \n", err)
  12. }
  13. rabbitmq.ch = append(rabbitmq.ch, &Channel{ch: ch, m: &sync.Mutex{}})
  14. }
  15. plusherGroups[i] = rabbitmq
  16. }
  17. return
  18. }

三、将消息随机分发给不同的connection、channel


   
  1. // SendMessageWithWork 生产者发送消息[work模式+(many conn and many channel)]
  2. func SendMessageWithWork(plusherGroups map[int]*Connection, queueName, body string) bool {
  3. if plusherGroups == nil {
  4. log.Panicln( "SendMessageWithWork plusherGroups params is nil!")
  5. }
  6. rand.Seed(time.Now().UnixNano())
  7. //获取连接个数
  8. connNums := len(plusherGroups)
  9. //随机分配一个连接对象
  10. randConnIndex := rand.Intn(connNums)
  11. //选择随机分配的连接对象
  12. conn := plusherGroups[randConnIndex]
  13. //获取当前对象的channel个数
  14. channelNums := len(conn.ch)
  15. //随机分配一个channel对象
  16. randChannelIndex := rand.Intn(channelNums)
  17. //选择随机分配的channel
  18. ch := conn.ch[randChannelIndex]
  19. //既然采用了发布者复用conn、channel的形式那么一定要加锁处理
  20. //这里为每个对象的操作进行加锁(非线程安全,不加锁会报错的)
  21. //至于在存在并发竞争的情况下会存在一定性能损耗,但是我们配置好适量的conn和channel这个基本可以忽略
  22. ch.m.Lock()
  23. defer ch.m.Unlock()
  24. q, err := ch.ch.QueueDeclare(
  25. queueName, // name
  26. true, // durable
  27. false, // delete when unused
  28. false, // exclusive
  29. false, // no-wait
  30. nil, // arguments
  31. )
  32. if err != nil {
  33. log.Panicf( "amqp declare a queue error: %v \n", err)
  34. }
  35. body = fmt.Sprintf( "conn[%d] channel[%d] send message : %s", randConnIndex, randChannelIndex, body)
  36. err = ch.ch.PublishWithContext(conn.ctx,
  37. "", // exchange
  38. q.Name, // routing key
  39. false, // mandatory
  40. false,
  41. amqp.Publishing{
  42. DeliveryMode: amqp.Persistent,
  43. ContentType: "text/plain",
  44. Body: [] byte(body),
  45. })
  46. if err != nil {
  47. log.Panicf( "amqp publish a message error: %v \n", err)
  48. }
  49. return true
  50. }

四、main函数调用消费者


   
  1. package main
  2. import (
  3. rabbitmq "go-test/rabbitmq/package"
  4. )
  5. func main() {
  6. queueName := "task_queue"
  7. config := rabbitmq.Config{
  8. Host: "192.168.6.103",
  9. Port: 5672,
  10. User: "root",
  11. Password: "root",
  12. }
  13. var mq *rabbitmq.Connection
  14. mq = mq.New(config)
  15. //开启N个消费者
  16. mq.ConsumeWithWork(queueName, 3)
  17. }

五、main函数调用生产者组发送消息


   
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/gin-gonic/gin"
  5. rabbitmq "go-test/rabbitmq/package"
  6. "net/http"
  7. "time"
  8. )
  9. func main() {
  10. var messageNo int
  11. queueName := "task_queue"
  12. config := rabbitmq.Config{
  13. Host: "192.168.6.103",
  14. Port: 5672,
  15. User: "root",
  16. Password: "root",
  17. }
  18. //conn连接数
  19. connNums := 2
  20. //channel连接数
  21. channelNums := 3
  22. //启动N个不同conn的连接,并且每个连接对应的channel为N个的rabbitmq实例
  23. plusherGroup := rabbitmq.NewPlusherGroups(config, connNums, channelNums)
  24. e := gin.Default()
  25. e.GET( "/", func(c *gin.Context) {
  26. body := fmt.Sprintf( "这是第%d条消息...", messageNo)
  27. if rabbitmq.SendMessageWithWork(plusherGroup, queueName, body) == true {
  28. messageNo++
  29. c.JSON( 200, gin.H{
  30. "code": 200,
  31. "msg": "success",
  32. })
  33. } else {
  34. c.JSON( 200, gin.H{
  35. "code": 500,
  36. "msg": "error",
  37. })
  38. }
  39. })
  40. server := &http.Server{
  41. Addr: ":18776",
  42. Handler: e,
  43. ReadTimeout: time.Minute,
  44. WriteTimeout: time.Minute,
  45. }
  46. if err := server.ListenAndServe(); err != nil {
  47. panic(any( "HttpServer启动失败"))
  48. }
  49. }

执行流程:

  1. 启动消费者进程

可以看到我们用3个协程开启了3个work,也就是对应了3个channel

  1. 启动生产者组进程

这里用的gin框架,正常启动

我们可以看到rabbitMQ的控制台中,一共3个连接,1个是消费者进程,另外2个是生产者组进程,这2个正好和我们上面配置的connNums参数匹配

我们可以看到rabbitMQ的控制台中,一共9个channel,3个是消费者进程,另外6个是生产者组进程,这6个正好和我们上面配置的channelNums参数匹配

  1. 调用发送消息

ab.exe -n 1000 -c 1000 http://127.0.0.1:18776/

我们来看消费者日志打印情况,标红的可以证明我们在发送消息时让生产者根据我们的随机分配策略选择connection和channel


转载:https://blog.csdn.net/Enjun97/article/details/128974918
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场