C#用RabbitMQ实现消息订阅与发布

编辑: admin 分类: c#语言 发布时间: 2021-11-25 来源:互联网
目录
  • Fanout交换机模型
  • RabbitMQ控制台操作
    • 新增两个队列
    • 绑定fanout交换机
  • 示例效果图
    • 核心代码
      • 消息发布
      • 消息订阅

    Fanout交换机模型

    扇形交换机,采【来源:http://www.yidunidc.com/hkgf.html网络转载请说明出处】用广播模式,根据绑定的交换机,路由到与之对应的所有队列。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

    RabbitMQ控制台操作

    新增两个队列

    在同一个Virtual host下新增两个队列Q1,Q2,如下图所示:

    绑定fanout交换机

    将两个队列绑定到系统默认的fanout交换机,如下所示:

    示例效果图

    生产者,采用Fanout类型交换机发布消息,如下图所示:

     当生产者发布 一条消息时,Q1,Q2两个队列均会收到,如下图所示:

    当启动消费者后,两个消费者,均会订阅到相关消息,如下图所示:

    核心代码

    消息发布

    建立连接后,将通道声明类型为Fanout的交换机,如下所示:

    /// <summary>
        /// fanout类型交换机,发送消息
        /// </summary>
        public class RabbitMqFanoutSendHelper : RabbitMqHelper {
            /// <summary>
            /// 发送消息
            /// </summary>
            /// <param name="msg"></param>
            /// <returns></returns>
            public bool SendMsg(string msg)
            {
                try
                {
                    using (var conn = GetConnection("/Alan.hsiang"))
                    {
                        using (var channel = conn.CreateModel())
                        {
                            channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
    
                            var body = Encoding.UTF8.GetBytes(msg);
    
                            channel.BasicPublish(exchange: "amq.fanout",
                                                 routingKey: "",
                                                 basicProperties: null,
                                                 body: body);
    
                            //Console.WriteLine(" [x] Sent {0}", message);
                        };
                    };
                    return true;
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
        }

    消息订阅

    建立连接后,通道声明类型为Fanout的交换机,并绑定队列进行订阅,如下所示:

    /// <summary>
        /// 扇形交换机接收消息
        /// </summary>
        public class RabbitMqFanoutReceiveHelper : RabbitMqHelper
        {
            public RabbitMqReceiveEventHandler OnReceiveEvent;
    
            private IConnection conn;
    
            private IModel channel;
    
            private EventingBasicConsumer consumer;
    
            public bool StartReceiveMsg(string queueName)
            {
                try
                {
                    conn = GetConnection("/Alan.hsiang");
    
                    channel = conn.CreateModel();
                    channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
                    //此处随机取出交换机下的队列
                    //var queueName = channel.QueueDeclare().QueueName;
                    channel.QueueBind(queue: queueName, exchange: "amq.fanout", routingKey: "");
                    consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);
                        //Console.WriteLine(" [x] Received {0}", message);
                        if (OnReceiveEvent != null)
                        {
                            OnReceiveEvent(queueName+"::"+message);
                        }
                    };
                    channel.BasicConsume(queue: queueName,
                                            autoAck: true,
                                            consumer: consumer);
                    return true;
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
        }

    作者:Alan.hsiang
    出处:http://www.cnblogs.com/hsiang/

    以上就是C#用RabbitMQ实现消息订阅与发布的详细内容,更多关于C#用RabbitMQ实现消息订阅与发布的资料请关注海外IDC网其它相关文章!

    【本文由:日本cn2服务器 提供 转载请保留URL】