RabbitMQ是一个消息队列,我们可以使用RabbitMQ 做消息队列,消息通知的业务功能,而且根据网上的不可靠消息得出,RabbitMQ 的性能水平甚至比 activeMQ 还要好,所以也是我选择认真去学习RabbitMQ的原因,当然我也有做个关于 activeMQ 的一些简单的 Demo 有机会的话可以分享出来~
Rabbit 使用 Erlang 来编写的AMQP 服务器。 什么是 AMQP 本人已经百度给大家了:
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。
哎哟,先从按照说起吧~ 我使用macbook 来进行学习和开发的 顺便提一下 本人是位果粉,别喷~ 使用 MAC OS 安非常简单,还我是建议使用 MAC OS 的同学使用 brew 进行开发,没有别的 就是因为方便。
使用这个命令之前 如果没有安装过 Homebrew 就安装一下吧 有机会我也分享一下安装方式 顺便介绍一下这个让你懒癌晚期的Homebrew。
打开终端输入:
brew install rabbitmq
然后会告诉你 无法获得 /usr/local/sbin 的写入权限。 因为是英文的错误提示,本人会看不会写 所以直接写中文给各位看了~ 还是那句 别喷~ 谢谢
好,接下来让他有权限,其实这个目录是没有的sbin。
mkdir /usr/local/sbin 创建目录
chmod 777 /usr/local/sbin 开权限,全开别烦
brew link rabbitmq link 一下
然后就没有然后了
启动 rabbitMQ
MacBook-Pro:~ Tony$ /usr/local/sbin/rabbitmq-server
RabbitMQ 3.5.6. Copyright (C) 2007-2015 Pivotal Software, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
###### ## /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
##########
Starting broker... completed with 10 plugins.
目前为止已经安装完 rabbitMQ 并且启动起来 了 简单到飞
开始废话 说说概念,别方 我也是抄书的~
消费者和生产者,由于书本的篇幅太长,就抄了!简单明了告诉你们,消息的产生者就是生产者,获得消息的称为消息消费者。我QQ M了你一下 我就是消息的生产者了,你在QQ看到我的消息了,你就是消息的消费者了,这才是正常写书方式嘛··· 举个开发的业务背景的例子,我支付了一张订单,这个业务产生了几个次要的业务,例如积分业务,我购买了订单,订单核心功能处理订单的业务,然后生产了一个消息并通过rabbitMQ 发送了一个消息,内容为产生了一个订单,然后订阅了消息的消息消费者就会消费消息,然后根据消息处理自己的业务,该干嘛干嘛去~ 积分业务就去加你的积分等等。当然这个只是举例,别吐槽 可以在台服务器或者一个业务方法上实现。 上面的举例对于一些分布式的微服务上是非常常见的方式了~
几个重要的概念:
信道:在连接到rabbitMQ 之后,你将可以创建并获得一个AMQP信道,信道是创建在“真实的”TCP连接内的虚拟链接。所有的AMQP命令都是通过 AMQP 信道发出去的,每条信道会被指派一个唯一的ID。所有发布消息还是订阅都是通过AMQP 信道去处理的。这个概念非常像端口号【但是不是端口啊 ,别说我混淆你】,而且他的英文名字可以理解到他的意思channel 有使用NIO 的开发者已经比较了解~ 其实使用AMQP信道的原因非常简单,建立TCP连接的性能代价是非常高的,当你在不同的线程当中去创建TCP连接的方式去和RabbitMQ进行通信的代价是十分高的,但是在多个不同的线程当中通过channel 我们就可以通过同一个TCP连接进行通信了。好像写得好乱不知道他们理不理解我写什么,唉 算了 本人的水平有限,况且大多数情况下都是我自己看而已,放弃~ 抽象图如下 丑,不过先将就~
交换器 与 路由键: 这个好难解释啊~ 直接说工作流程吧~ 哎哟其实应该把 队列写到这里来,算了吧 不改了。 先记住,交换器当中里面有N个队列,而路由键 是让交换器知道消息应该放到哪个队列当中去。别废话举例: 产生消息 要告诉 RabbitMQ 这个消息是放到那个交换器上,【注意:交换器是由开发者去创建的你可以搞N个交换器出来,没有管你】
上代码:
this.channel.basicPublish(“hello-exchange”,”hola”,properties, SerializationUtils.serialize(msg));
首先 hello-exchange 是交换器的名字, hola 是路由键 这个让hello-exchange 去判断应该分发到那个队列当中,SerializationUtils.serialize(msg) 将一个消息序列化。 channel 就不用说啦上面说了,是一个AMQP信道。
交换器有很多不同的类型一会会一一介绍。交换器的类型不同也注定了消息会分发到哪个队列当中,所有尤其重要。
工作流程:
1、消费者订阅消息,先创建一个交换器,如果交换器已经存在即返回一个交换器。当然你可以passive 为 true 如果 passive 为 true , 交换器已经存在返回一个已经存在的交换器,否则报错。我暂时不知道用在哪里,所以 先记住一般情况下 创建一个交换器,如果存在就返回一个已经存在的交换器,如果不存在则创建并返回。
JAVA 代码如下【直接给源码的方法定义,是不是很贴心】:
备注是自己翻译的别喷
/**
* Declare an exchange, via an interface that allows the complete set of
* arguments.
* @see com.rabbitmq.client.AMQP.Exchange.Declare
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
* @param exchange the name of the exchange 【交换器名称】
* @param type the exchange type 【交换器类型 一会说 别着急】
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) 【是否持久化交换器,这个后面说,还是简单说一下,纠结帝~ 就是交换器中的消息会持久化,就是会存在硬盘当中,宕机或重启服务时会自动恢复当时会牺牲性能,除非你的SSD硬盘好快,当我没有说过~】
* @param autoDelete true if the server should delete the exchange when it is no longer in use 【当这个交换器 已经没有人使用的时候 会自动删除,longer 长时期 当时我不知道长时期是什么时候,所以我会记住是 没人用就自己删掉 自动消失】
* @param internal true if the exchange is internal, i.e. can't be directly
* published to by a client. 【不知道什么鬼意思,如果是内部 不会直接发送到客户端,我一般写false ,真心不知道他什么意思,后面学下去应该明白】
* @param arguments other properties (construction arguments) for the exchange 【其他参数 暂时没有用到】
* @return a declaration-confirm method to indicate the exchange was successfully declared
* @throws java.io.IOException if an error is encountered 【会抛出IO异常】
*/
Exchange.DeclareOk exchangeDeclare(String exchange,String type, boolean durable, boolean autoDelete, boolean internal,Map<String, Object> arguments) throws IOException;
2、消费者创建队列,并绑定队列到交换器当中,上写路由键,让交换器知道这个路由键的消息是跑到这个队列当中的。
JAVA 代码如下【直接给源码的方法定义】:
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue 【队列名称】
* @param durable true if we are declaring a durable queue (the queue will survive a server restart) 【持久化,这里特别说一下,如果你想消息是持久化的,必须消息是持久化的,交换器也是持久化的,队列更是持久化的,其中一个不是也无法恢复消息】
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection) 【私有的,独有的。 这个队列之后这个应用可以消费,上面的英文注释是 说restricted to this connection 就是限制在这个连接可以消费,就是说不限制channel信道咯,具体没有试过,但是应该是这样,除非备注骗我,我读得书少,你唔好呃我!!!】
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) 【没有人使用自动删除】 注意:如果exclusive为true 最好 autodelete都为true 至于为什么 这么简单自己想~
* @param arguments other properties (construction arguments) for the queue 【其他参数没有玩过】
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
绑定队列和交换器 并指定路由键
/**
* Bind a queue to an exchange.
* @see com.rabbitmq.client.AMQP.Queue.Bind
* @see com.rabbitmq.client.AMQP.Queue.BindOk
* @param queue the name of the queue 【队列名称】
* @param exchange the name of the exchange 【交换器名称】
* @param routingKey the routine key to use for the binding 【路由键】
* @param arguments other properties (binding parameters) 【其他参数 还是那句没有玩过】
* @return a binding-confirm method if the binding was successfully created
* @throws java.io.IOException if an error is encountered
*/
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
3、然后消费者就去订阅消息咯,注意在JAVA里面的订阅方法会产生线程阻塞的。
JAVA 代码:
订阅消息并消费
/**
* Start a non-nolocal, non-exclusive consumer, with
* a server-generated consumerTag.
* @param queue the name of the queue 【所订阅消费的队列】
* @param autoAck true if the server should consider messages 【是否为自动确定消息,好像TCP的ack syn 啊,可怕的7层模型!!!一般写true就可以】
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param callback an interface to the consumer object 【回调callback 这个马上上代码看看】
* @return the consumerTag generated by the server
* @throws java.io.IOException if an error is encountered
* @see com.rabbitmq.client.AMQP.Basic.Consume
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
接收到消息之后,马上有回调,来看看回调接口:
/**
* Interface for application callback objects to receive notifications and messages from
* a queue by subscription.
* Most implementations will subclass {@link DefaultConsumer}.
* <p/>
* The methods of this interface are invoked in a dispatch
* thread which is separate from the {@link Connection}'s thread. This
* allows {@link Consumer}s to call {@link Channel} or {@link
* Connection} methods without causing a deadlock.
* <p/>
* The {@link Consumer}s on a particular {@link Channel} are invoked serially on one or more
* dispatch threads. {@link Consumer}s should avoid executing long-running code
* because this will delay dispatch of messages to other {@link Consumer}s on the same
* {@link Channel}.
*
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, java.util.Map, Consumer)
* @see Channel#basicCancel
*/
public interface Consumer {
/**
* Called when the consumer is registered by a call to any of the
* {@link Channel#basicConsume} methods.
* @param consumerTag the <i>consumer tag</i> associated with the consumer
*/
void handleConsumeOk(String consumerTag);
/**
* Called when the consumer is cancelled by a call to {@link Channel#basicCancel}.
* @param consumerTag the <i>consumer tag</i> associated with the consumer
*/
void handleCancelOk(String consumerTag);
/**
* Called when the consumer is cancelled for reasons <i>other than</i> by a call to
* {@link Channel#basicCancel}. For example, the queue has been deleted.
* See {@link #handleCancelOk} for notification of consumer
* cancellation due to {@link Channel#basicCancel}.
* @param consumerTag the <i>consumer tag</i> associated with the consumer
* @throws IOException
*/
void handleCancel(String consumerTag) throws IOException;
/**
* Called when either the channel or the underlying connection has been shut down.
* @param consumerTag the <i>consumer tag</i> associated with the consumer
* @param sig a {@link ShutdownSignalException} indicating the reason for the shut down
*/
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
/**
* Called when a <code><b>basic.recover-ok</b></code> is received
* in reply to a <code><b>basic.recover</b></code>. All messages
* received before this is invoked that haven't been <i>ack</i>'ed will be
* re-delivered. All messages received afterwards won't be.
* @param consumerTag the <i>consumer tag</i> associated with the consumer
*/
void handleRecoverOk(String consumerTag);
/**
* Called when a <code><b>basic.deliver</b></code> is received for this consumer.
* @param consumerTag the <i>consumer tag</i> associated with the consumer
* @param envelope packaging data for the message
* @param properties content header data for the message
* @param body the message body (opaque, client-specific byte array)
* @throws IOException if the consumer encounters an I/O error while processing the message
* @see Envelope 宝宝累了 看重点,这个方法就是消息到达的时候回调的方法其他你们喜欢研究 可以认真看看英文备注。
*/
void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException;
}
注意一下:其实也可以使用其他订阅方式去消费消息的,可以使用get的方式去获得一条消息,然后会在队列当中给你一条消息,但是你会想就不需要使用上面的订阅方法啦,直接跑个循环就可以啦。其实get方法会开启订阅获得一条消息后关闭订阅,这样会产生不必要的性能开销的,除非老板让你搞卡一点,让客户掏点优化费,不然就别这样干了。get的方法这里就先不介绍啦~
4、上面所说的都是消息消费者的工作,这个step开始说消息生产者要做的。开启一个交换器,当然这个交换器应该是存在不用创建的因为 上面所说消费者已经创建过了,但是其实谁去先创建都是可以的,消费者也可以创建,生产者也可以,这个倒是没有什么关系,主要是看你的业务需求【这句话甩锅用】。由于已经贴出过创建交换器的代码所以就不贴了,看上面。
与上面的交换器创建一样。不贴代码了自己拖上去看吧~
5、好像没有什么别的事情了,就是产生一个消息然后发过去。上代码
JAVA代码:
/**
* Publish a message
* @see com.rabbitmq.client.AMQP.Basic.Publish
* @param exchange the exchange to publish the message to 【发送到那个交换器上】
* @param routingKey the routing key 【路由键 决定消息去哪个队列里面混】
* @param props other properties for the message - routing headers etc 【其他消息参数, 哎~ 这个我玩过,一会DEMO时间,表演一下】
* @param body the message body 【消息体,一般把对象序列化一下发过去】
* @throws java.io.IOException if an error is encountered
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
然后就没有然后了 上面 第三步的 订阅方法中的回调 就会有这个消息了。 好又到说概念的时候了。 困的同学先休息。
消息的投递方式,我也困了其实:
【1】 消息MESSAGE_A 到达 queue_A 队列
【2】 RabbitMQ 把 MESSAGE_A 发送到 APPLICATION A(消费者)
【3】 APPLICATION A 确定收到了消息 发个ack 高兴一下
【4】 RabbitMQ 把 消息从 queue_A 队列删除。上面其实也有说过,autoack参数 有印象吧~ 所以autoack 设置成true 就没错了。这里就可以开展其他话题了,如果多个订阅者怎么办呢,是不是多个订阅者都会收到消息呢,答案是否定的。RabbitMQ的队列会对订阅者做一个循环,例如目前有两个订阅者订阅了同一个队列,serverA 和 serverB 他会循环去发送消息,队列有 ID 1-10 的消息,ID 1消息会由 serverA 处理 ID 2 消息会由 serverB 处理 ID 3 消息会由 serverA 去处理 如此类推。
如果serverA 在发送ACK应答给 RabbitMQ之前 断开连接【就是服务挂了,宕机了】RabbitMQ 会认为这个消息没有处理,即没有消费到这个消息,会把这个消息发送到 serverB 去处理。
而且在消息没有ACK之前,RabbitMQ不会发你第二条消息的,所以如果你想等待消息的任务处理完之后再给第二个消息的话,可以将autoDelete设置成false,这样你就可以在消息处理完之后再去ack。���样也是一个非常通用的场景,以防扎堆处理消息。
其实这样的设计也是非常好的,如果你出现了业务错误,执行消息处理的时候,刚好出现问题了,你可以通过断开连接的方式【这里所指的断开是断开RabbitMQ的connection】让这个消息交给下个订阅者,【这里说一下,如果队列没有消费者去订阅消息的话,消息会存在RabbitMQ当中等待有消费者去订阅再去发送】,当然这也是在你没有ack的情况下。在高级版本的RabbitMQ中可以使用reject命令,让这个消息直接传递到下个订阅者。【在RabbitMQ2.0可以使用】
AMQP信道channel与订阅
如果 一个channel 订阅了一个队列就不能订阅别的队列了,也就是说一个channel只能订阅一个队列。所以你需要取消原来的订阅,并将信道设置为“传输”模式。后面有时间写后面的文章可能会说到。
交换器的类型:
direct :这个简单,只要路由键一模一样就投递到相应的队列当中。
fanout :这个也简单,消息会投递到所有绑定到这个交换器的队列当中。
topic :这个就复杂一点了,我很客观的简单就简单 复杂就复杂 从不忽悠。但是这个也好常用不能不学,好累。在路由键当中可以使用通配符。怎么说呢,好纠结啊 不知道怎么解释啊 怎么办 在线等。
举个例子吧,绑定队列:
//绑定队列 路由键是 tony.teamA 有路由键为tony.teamA的消息就投递到这里
this.channel.queueBind("queueA","topic_exchangeA","tony.teamA",null);
//绑定队列 路由键是 yan.teamA 有路由键为tony.teamB的消息就投递到这里
this.channel.queueBind("queueB","topic_exchangeA","yan.teamA",null);
//绑定队列 路由键是 *.teamA 有路由键为.teamA结尾的消息就投递到这里
this.channel.queueBind("queueC","topic_exchangeA","*.teamA",null);
//绑定队列 路由键是 chao.teamB 有路由键为chao.teamB的消息就投递到这里
this.channel.queueBind("queueD","topic_exchangeA","chao.teamB",null);
//绑定队列 路由键是 *.teamB 有路由键为.teamB结尾的消息就投递到这里
this.channel.queueBind("queueE","topic_exchangeA","*.teamB",null);
//绑定队列 路由键是 # 所有在这个交换器的消息都投递到这里
this.channel.queueBind("queueF","topic_exchangeA","#",null);
// queueA[路由键:tony.teamA]、queueC[路由键:*.teamA]、queueF[路由键:#]会收到消息
this.channel.basicPublish("topic_exchangeA","tony.teamA",properties,SerializationUtils.serialize(msg));
// queueD[路由键:chao.teamB]、queueE[路由键:*.teamB]、queueF[路由键:#]会收到消息
this.channel.basicPublish("topic_exchangeA","chao.teamB",properties,SerializationUtils.serialize(msg));
然后基本的rabbitMQ 的东西就搞好了。不过这只是开始~
简单说一下虚拟主机和隔离吧,简单来说想MySQL 你可以创建很多个库,里面有很多个表。然后呢 rabbitMQ可以创建很多个虚拟主机,虚拟主机里面有很多个交换器,交换器里面有很多个队列,解释得完美。默认会提供一个 默认虚拟主机 vhost : “/”。后面找时间再说这个 vhost 吧。
重头大戏上DEMO,由于方便我阅读回忆,所以我忽略的封装性,一切以容易快速看懂和回忆为目标。别喷~
生产者:
package com.maxfunner;
import com.maxfunner.mq.EndPoint;
import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Producer
*/
public class Producer {
private Connection connection;
private Channel channel;
private Map<Long, String> messageMap = new HashMap<Long, String>();
private int maxID = 0;
private static final String EXCHANGE_NAME = "MY_EXCHANGE";
public void createConnectionAndChannel() throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1"); //服务器地址
factory.setUsername("guest"); //默认用户名
factory.setPassword("guest"); //默认密码
factory.setPort(5672); //默认端口,对就是这么屌全部默认的
this.connection = factory.newConnection(); //创建链接
this.channel = this.connection.createChannel();
}
public void initChannelAndCreateExchange() throws IOException {
this.channel.confirmSelect(); //启用消息确认已经投递成功的回调
/**
* 创建了一个交换器,类型为 direct 非持久化 自动删除 没有额外参数
*/
this.channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, true, null);
this.channel.addConfirmListener(new ConfirmListener() {
/**
* 成功的时候回调【这个是当消息到达交换器的时候回调】
* @param deliveryTag 每一条消息都有一个唯一ID【只是同一个channel唯一】,每次发出消息递增1 因为同一个channel所有也保证了消息的流水性。
* @param multiple
* @throws IOException
*/
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
String message = messageMap.get(deliveryTag);
System.out.println("message : " + message + " ! 发送成功");
messageMap.remove(message);
//最后一个消息都搞掂之后 关闭所有东西
if (deliveryTag >= maxID) {
closeAnything();
}
}
/**
* 失败的时候回调
* @param deliveryTag 每一条消息都有一个唯一ID【只是同一个channel唯一】,每次发出消息递增1 因为同一个channel所有也保证了消息的流水性。
* @param multiple
* @throws IOException
*/
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
String message = messageMap.get(deliveryTag);
System.out.println("message : " + message + " ! 发送失败");
messageMap.remove(message); //发送失败就不重发了,发脾气
//最后一个消息都搞掂之后 关闭所有东西
if (deliveryTag >= maxID) {
closeAnything();
}
}
});
}
public void sendMessage(String message) throws IOException {
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.contentType("text/plain") //指定是一个文本
.build();
// 发送一个消息 到 EXCHANGE_NAME 的交换器中 路由键为 KEY_A 发送 message 之前序列化一下 具体用什么包上面import自己看
this.channel.basicPublish(EXCHANGE_NAME, "KEY_A", properties, SerializationUtils.serialize(message));
}
public void closeAnything() throws IOException {
this.channel.close(); //跪安吧 小channel
this.connection.close(); //你也滚吧 connection
}
public static void main(String[] args) throws IOException {
Producer producer = new Producer();
producer.createConnectionAndChannel();
producer.initChannelAndCreateExchange();
List<String> messageList = new ArrayList<String>();
messageList.add("message_A");
messageList.add("message_B");
messageList.add("message_C");
messageList.add("message_D");
messageList.add("message_E");
messageList.add("message_F");
producer.maxID = messageList.size(); //记录最后一个ID 当最后一个消息发送成功后关闭连接
//注意:因为channel产生的ID 是从1开始的
for (int i = 1; i <= messageList.size(); i++) {
producer.messageMap.put(new Long(i), messageList.get(i - 1)); //这里看懂了吗?没看懂也没有办法了,这里我真不知道怎么解释
producer.sendMessage(messageList.get(i - 1));
}
}
}
消费者:
package com.maxfunner;
import com.maxfunner.mq.QueueConsumer;
import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* Consumer
*/
public class Consumer {
private Connection connection;
private Channel channel;
private Map<Integer,String> messageMap = new HashMap<Integer, String>();
private static final String EXCHANGE_NAME = "MY_EXCHANGE";
/**
* 对,你猜得一点都没有错,我是复制的
* @throws IOException
*/
public void createConnectionAndChannel() throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1"); //服务器地址
factory.setUsername("guest"); //默认用户名
factory.setPassword("guest"); //默认密码
factory.setPort(5672); //默认端口,对就是这么屌全部默认的
this.connection = factory.newConnection(); //创建链接
this.channel = this.connection.createChannel();
}
public void createAndBindQueue() throws IOException {
/**
* 创建了一个交换器,类型为 direct 非持久化 自动删除 没有额外参数
*/
this.channel.exchangeDeclare(EXCHANGE_NAME,"direct",false,true,null); //最好也创建一下交换器,反正已经创建也没有关系
/**
* 创建了一个队列, 名称为 QUEUE_A 非持久化 非独有的 自动删除的 没有额外删除的
*/
this.channel.queueDeclare("QUEUE_A",false,false,true,null);
this.channel.queueBind("QUEUE_A",EXCHANGE_NAME,"KEY_A");
}
public static void main(String args[]) throws IOException {
final Consumer consumer = new Consumer();
consumer.createConnectionAndChannel();
consumer.createAndBindQueue();
System.out.println("等待消息中。。。。");
new Thread(new Runnable() {
public void run() {
try {
/**
* 订阅消息,订阅队列QUEUE_A 获得消息后自动确认
*/
consumer.channel.basicConsume("QUEUE_A", true, new com.rabbitmq.client.Consumer() {
public void handleConsumeOk(String consumerTag) {
}
public void handleCancelOk(String consumerTag) {
}
public void handleCancel(String consumerTag) throws IOException {
}
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
}
public void handleRecoverOk(String consumerTag) {
}
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("message : " + SerializationUtils.deserialize(body));
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
}
生产者运行结果:
message : message_A ! 发送成功
message : message_B ! 发送成功
message : message_C ! 发送成功
message : message_D ! 发送成功
message : message_E ! 发送成功
message : message_F ! 发送成功
消费者运行结果:
等待消息中。。。。
message : message_A
message : message_B
message : message_C
message : message_D
message : message_E
message : message_F
项目我是用maven创建的贴一下maven 的pom.xml文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.maxfunner</groupId>
<artifactId>rabbitlearning</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>rabbitlearning</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.0.4</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
</dependency>
</dependencies>
</project>
最后一个重点问题
消息“黑洞”,如果在没有绑定队列和交换器之前,所有发出的消息都无法匹配到相应的队列当中,那些消息将永远不会被消费。而且confirm的callback也是会返回成功的即使消息进入了消息“黑洞”。所以在发送消息之前 必须确定队列已经绑定,确保消息能分配到相应的队列当中。 测试很简单,上面的DEMO 先运行 Producer 显示所有消息发送成功,然后再运行 Consumer 发现没有消息可以接收。 再尝试先运行Consumer 再运行 Producer 就发现一切都正常了,这也是为什么我把autoDelete设置为true的原因,有了autoDelete当队列没有人用的时候就会自动删除。所以每次运行都可以测试出问题。要保证消息能够到达指定的队列最好也在Producer中建立队列 而且进行相关的绑定 然后再发送消息 修改一下 Producer 的其中一方法即可。 不写了 累了~
public void initChannelAndCreateExchange() throws IOException {
this.channel.confirmSelect(); //启用消息确认已经投递成功的回调
/**
* 创建了一个交换器,类型为 direct 非持久化 自动删除 没有额外参数
*/
this.channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, true, null);
this.channel.addConfirmListener(new ConfirmListener() {
/**
* 成功的时候回调【这个是当消息到达交换器的时候回调】
* @param deliveryTag 每一条消息都有一个唯一ID【只是同一个channel唯一】,每次发出消息递增1 因为同一个channel所有也保证了消息的流水性。
* @param multiple
* @throws IOException
*/
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
String message = messageMap.get(deliveryTag);
System.out.println("message : " + message + " ! 发送成功");
messageMap.remove(message);
//最后一个消息都搞掂之后 关闭所有东西
if (deliveryTag >= maxID) {
closeAnything();
}
}
/**
* 失败的时候回调
* @param deliveryTag 每一条消息都有一个唯一ID【只是同一个channel唯一】,每次发出消息递增1 因为同一个channel所有也保证了消息的流水性。
* @param multiple
* @throws IOException
*/
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
String message = messageMap.get(deliveryTag);
System.out.println("message : " + message + " ! 发送失败");
messageMap.remove(message); //发送失败就不重发了,发脾气
//最后一个消息都搞掂之后 关闭所有东西
if (deliveryTag >= maxID) {
closeAnything();
}
}
});
/**
* 创建了一个队列, 名称为 QUEUE_A 非持久化 非独有的 自动删除的 没有额外删除的
*/
this.channel.queueDeclare("QUEUE_A",false,false,true,null);
this.channel.queueBind("QUEUE_A",EXCHANGE_NAME,"KEY_A");
CentOS7下安装RabbitMQ http://www.linuxidc.com/Linux/2016-11/136812.htm
CentOS 7.2 下 RabbitMQ 集群搭建 http://www.linuxidc.com/Linux/2016-12/137812.htm
CentOS7环境安装使用专业的消息队列产品RabbitMQ http://www.linuxidc.com/Linux/2016-11/13673.htm
配置与管理RabbitMQ http://www.linuxidc.com/Linux/2016-11/136815.htm
RabbitMQ概念及环境搭建 http://www.linuxidc.com/Linux/2014-12/110449.htm
RabbitMQ入门教程 http://www.linuxidc.com/Linux/2015-02/113983.htm
RabbitMQ 的详细介绍:请点这里
RabbitMQ 的下载地址:请点这里
本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-11/136813.htm