本文到这里就结束了,介绍了rabbitmq通信模型中的发布订阅,适合于做模块之间的异步通信。
大家好,我是指北君。
今天指北君带领大家接着学习rabbitmq,了解rabbitmq的五大通信模型之一的发布订阅模型;接下来还会有关于rabbitmq的系列教程,对你有帮助的话记得关注哦~
上一篇文章中,简单的介绍了一下rabbitmq的work模型。这篇文章来学习一下rabbitmq中的发布订阅模型。
发布订阅模型(publish/subscribe):简单的说就是队列里面的消息会被多个消费者同时接受到,消费者接收到的信息一致。
发布订阅模型适合于做模块之间的异步通信。
public class producer {= "exchange_publish_1";(string[] args) throws ioexception, timeoutexception {= connectionutils.getconnection();= connection.createchannel();// 声明交换机.exchangedeclare(exchange_name, "fanout");// 发送消息到交换机(int i = 0; i < 100; i++) {.basicpublish(exchange_name, "", null, ("发布订阅模型的第 " + i + " 条消息").getbytes());}// 关闭资源.close();.close();}}
// 消费者1{= "queue_publish_1";= "exchange_publish_1";(string[] args) throws ioexception, timeoutexception {= connectionutils.getconnection();= connection.createchannel();// 声明队列.queuedeclare(queue_name, false, false, false, null);// 声明交换机.exchangedeclare(exchange_name, "fanout");// 将队列绑定到交换机.queuebind(queue_name, exchange_name, "");= new defaultconsumer(channel) {(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception {.out.println("队列1接收到的消息是:" + new string(body));}};.basicconsume(queue_name, true, defaultconsumer);}}
// 消费者2{= "queue_publish_2";= "exchange_publish_1";(string[] args) throws ioexception, timeoutexception {= connectionutils.getconnection();= connection.createchannel();// 声明队列.queuedeclare(queue_name, false, false, false, null);// 声明交换机.exchangedeclare(exchange_name, "fanout");// 将队列绑定到交换机.queuebind(queue_name, exchange_name, "");= new defaultconsumer(channel) {(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception {.out.println("队列2接收到的消息是:" + new string(body));}};.basicconsume(queue_name, true, defaultconsumer);}}
先启动2个消费者,再启动生产者
可以看出来消费者1和消费者2接收到的消息是一模一样的,每个消费者都收到了生产者发送的消息;
发布订阅模型,用到了一个新的东西-交换机,这里也解释一下相关方法的参数:
// 声明交换机.exchangedeclare(exchange_name, "fanout");// 该方法的最多参数的重载方法是:.declareok exchangedeclare(string exchange,,boolean durable,boolean autodelete,boolean internal,<string, object> arguments) throws ioexception;/** * param1:exchange,交换机名称 * param2:type,交换机类型;直接写 string效果一致;内置了4种交换机类型: * direct(路由模式)、fanout(发布订阅模式)、 * topic(topic模式-模糊匹配)、headers(标头交换,由headers的参数分配,不常用) * param3:durable,是否持久化交换机 false:默认值,不持久化 * param4:autodelete,没有消费者使用时,是否自动删除交换机 false:默认值,不删除 * param5:internal,是否内置,如果设置 为true,则表示是内置的交换器, 客户端程序无法直接发送消息到这个交换器中, 只能通过交换器路由到交换器的方式 false:默认值,允许外部直接访问 * param6:arguments,交换机的一些其他属性,默认值为 null */
// 将队列绑定到交换机.queuebind(queue_name, exchange_name, "");/** * param1:destination,目的地,队列的名字 * param2:source,资源,交换机的名字 * param3:routingkey,路由键(目前没有用到routingkey,填 "" 即可) */
本文到这里就结束了,介绍了rabbitmq通信模型中的发布订阅模型,适合于做模块之间的异步通信。