感谢支持
我们一直在努力

RabbitMQ入门教程

前面声明本文都是RabbitMQ的官方指南翻译过来的,由于本人水平有限难免有翻译不当的地方,如发现不对的地方,请联系下我,好及时改正。好了,正文开始:

RabbitMQ 是一个消息代理。这主要的原理十分简单,就是通过接受和转发消息。你可以把它想象成邮局:当你将一个包裹送到邮局,你会相信邮递员先生最终会将邮件送到接件人手上。RabbitMQ就好比一个邮箱,邮局或邮递员。

邮局和RabbitMQ两种主要的不同之处在于,RabbitMQ不处理文件,而是接受,并存储和以二进制形式将消息转发。

RabbitMQ,在消息的传送过程中,我们使用一些标准称呼。

生产过程就像发送过程,发送消息的程序就是一个生产者,我们使用“P”来描述它。

producer

队列是好比邮筒的称呼,它位于RabbitMQ内部,虽然消息流通过RabbitMQ和你的应用程序,但是它们仅仅存储在队列中。一个队列没有范围限制,你可以想存储多少就存储多少,本质上来说它是无限大的缓存。多个生产者可以通过一个队列发送消息,同样多个消费者也可以通同一个消息队列中接收消息。队列是画成这样,名字在它的上面:

queue

消费过程与接收相似,一个消费者通常是一个等着接受消息的程序,我们使用”C”来描述:

consumer

注意,那生产者,消费者和代理者不需要一定在一个机器上,事实上,大多数应用程序中,他们并不在一个机器上。

 

“Hello World”

 

(使用Java客户端)

在这部分指南中,我们将要使用java写两个程序;一个发送简单消息的生产者和一个接收消息并输出出来的消费者。我们会忽视掉一些Java API的细节,为了开始仅仅精选在这简单的事情上,这是一个”Hello World”消息。

java-one

Java 客户端库
RabbitMQ 遵循AMQP协议,那是一个开放的,并且通用的消息协议。在不同语言中有数种AMQP客户端,我们使用由RabbitMQ提供的Java客户端。
下载客户端库包,检验签名,将它解压缩到你的工作路径,从解压到的路径中提取JAR文件:

$ unzip rabbitmq-java-client-bin-*.zip
$ cp rabbitmq-java-client-bin-*/*.jar ./

(RabbitMQ Java客户端也存在Maven中央库中,groupIdcom.rabbitmq,artifactIdamqp-client.)

现在我们已经有了Java客户端和依赖文件,我们可以写一些代码了。

 

发送

sending.png

我们将会让我们的消息发送者发送消息,我们的接收者接收消息。发送者连接到RabbitMQ上,发送一个简单的消息,然后退出。

Send.java,我们需要引入一些类:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

建立这个类,为队列命名:

public class Send {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException {
      ...
  }
}

接着,我们创建一个服务器的连接:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

抽象的socket连接,注意协议版本的处理以及授权,诸如此类的事情。
这里我们连接到本地机器上的代理,因此它是localhost。如果我们想连接到不同机器上的代理,只需要说明它的主机名和IP地址。

接下来我们创建一个通道,获取操作的大多数API都位于这上。

对于发送,我们必须声明一个发送队列,然后我们把消息发送到这个队列上:

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

Declaring a queue is idempotent – it will only be created if it doesn’t exist already. The message content is a byte array, so you can encode whatever you like there.

Lastly, we close the channel and the connection;
声明一个队列是幂等的,仅仅在要声明的队列不存在时才创建。消息内容是二进制数组,所以你可以随你喜好编码。

channel.close();

connection.close();

Here’s the whole Send.java class.

发送没有起作用

如果你是第一次使用RabbitMQ并且你没有看到”Sent”消息,你可能抓耳挠腮的想到底是哪里出的问题。可能是代理启动时没有足够空间(默认它需要至少1Gb 空间),因此拒绝接受消息。通过检查代理的日志文件来确定这个问题,必要情况下可以降低限制大小。配置文件的文档将会告诉你怎样设置disk_free_limit

 

接收

上面代码是构建我们的发送者。我们的接收者是从RabbitMQ中提取消息,所以不像发送者那样发送一个简单的消息,我们需要一直运行监听消息并且输出消息。
receiving

在Recv.java中的代码有与Send中几乎相同的引用:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

这额外的QueueingConsumer类是用来缓存从服务器那里发出来的信息。

跟创建发送者相同,我们打开一个连接和一个通道,声明一个我们要消费的队列。注意要与发送的队列相匹配。

             java.lang.InterruptedException {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException,
             java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    ...
    }
}

注意我们在这里同样声明了一个队列。以为我们可能在发送者之前启动接收者,在我们从中获取消息之前我们想要确定这队列是否真实存在。
我们通知服务器通过此队列给我们发送消息。因此服务器会异步的给我们推送消息,在这里我们提供一个回调对象用来缓存消息,直到我们准备好再使用它们。这就是QueueingConsumer所做的事。

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);

while (true) {
  QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  String message = new String(delivery.getBody());
  System.out.println(" [x] Received '" + message + "'");
}

QueueingConsumer.nextDelivery()在另一个来自服务器的消息到来之前它会一直阻塞着。

这是整个Recv.java类。

 

把所有放在一起

你可以在RabbitMQ Java客户端的类路径上编译这些文件:

$ javac -cp rabbitmq-client.jar Send.java Recv.java

为了运行它们,你需要rabbitma-client.jar和它在类路径上的的依赖文件。在一个终端上,运行发送者:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send

然后,运行接收者:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv

在windows环境中,我们使用分号代替冒号来分隔类路径上的选项。

接收者将会输出从RabbitMQ中获取到来自发送者的消息。接收者会一直保持运行,等待消息(使用Ctrl-C停止),所以试着用另一个终端运行发送者。
如果你想检验队列,试着使用rabbitmqctl list_queues0

Hello World!

时间移动到第二部分,构建一个简单的工作队列。

提示
为了保存输入,你可以将类路径设置到环境变量中

\$ export CP=.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
\$ java -cp $CP Send

或者在 Windows环境中:

\> set CP=.;commons-io-1.2.jar;commons-cli-1.1.jar;rabbitmq-client.jar
\> java -cp %CP% Send

CentOS 5.6 安装RabbitMQ http://www.linuxidc.com/Linux/2013-02/79508.htm

RabbitMQ客户端C++安装详细记录 http://www.linuxidc.com/Linux/2012-02/53521.htm

用Python尝试RabbitMQ http://www.linuxidc.com/Linux/2011-12/50653.htm

RabbitMQ集群环境生产实例部署 http://www.linuxidc.com/Linux/2012-10/72720.htm

Ubuntu下PHP + RabbitMQ使用 http://www.linuxidc.com/Linux/2010-07/27309.htm

在CentOS上安装RabbitMQ流程 http://www.linuxidc.com/Linux/2011-12/49610.htm

RabbitMQ概念及环境搭建 http://www.linuxidc.com/Linux/2014-12/110449.htm

更多详情见请继续阅读下一页的精彩内容: http://www.linuxidc.com/Linux/2015-02/113983p2.htm

工作队列

 

(使用Java客户端)

java-two
在这第一指南部分,我们写了通过同一命名的队列发送和接受消息。在这一部分,我们将会创建一个工作队列,在多个工作者之间使用分布式时间任务。
工作队列(亦称:任务队列)背后主要的思想是避免立即处理一个资源密集型任务并且不得不一直等待完成。相反我们可以计划着让任务后续执行。我们将任务封装成消息,发送到队列中。一个工作者进程在后台运行,获取任务并最终执行任务。当你运行多个工作者,所有的任务将会被他们所共享。

在web应用程序中,这个理念是特别有用的,你无法在一个短暂的http请求中处理一个复杂的任务。

 

准备

在先前的指南中,我们发送了一个包含”Hello World!“消息。现在我们将要发送一些字符串,用来代表复杂的任务。我们没有一个真实的任务,比如图片的调整大小或者pdf文件渲染,所以我们通过Thread.sleep()函数,伪装一个我们是很忙景象。我们将会把字符串中点的数量来代表它的复杂度;每一个点将要花费一秒的工作。例如,一个使用Hello...描述的假任务会发送三秒。

我们将会轻量的修改我们以前例子中Send.java代码,使其允许任意的消息可以通过命令行发出。这个程序将要计划安排任务到我们的工作队列中,所以我们把它命名为NewTask.java:

String message = getMessage(argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

一些帮助从命令行中获取消息参数:

private static String getMessage(String[] strings){
    if (strings.length < 1)
        return "Hello World!";
    return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
}

我们老的Recv.java程序也要求做些改变:它需要将消息体中每个点伪装成一秒。从队列中获取消息,运行任务,所以我们将它称之为Worker.java:

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());

    System.out.println(" [x] Received '" + message + "'");        
    doWork(message);
    System.out.println(" [x] Done");
}

我们伪装的任务中冒充执行时间:

private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}

在第一部分指南中那样编译它们(jar 文件需要再工作路径上):

$ javac -cp rabbitmq-client.jar NewTask.java Worker.java

 

循环分派

使用任务队列的优势之一是我们是容易并行处理。如果我们正在处理一些堆积的文件的话,我们仅仅需要增加更多的工作者,通过这种方式我们是容易扩展的。
首先,让我们试着在同一时间运行两个工作者实例。他们都会从队列中获取消息,但是具体怎样做呢?让我们一起来看一看。
你需要三个打开的控制平台,其中两个用来运行工作者程序。他们将会是我们的两个消费者-C1和C2。

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C

在这第三个控制平台我们用来发布新的任务。一旦你启动消费者,你就可以发布消息了:

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask First message.
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Second message..
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Third message...
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fourth message....
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fifth message.....

让我们看看什么被投递到我们工作者那里:

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Received 'Third message...'
[x] Received 'Fifth message.....'
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

默认情况想,RabbitMQ将会把每一个消息发送给下一个消费者。平均下来每个消费者获取的消息数量是相同的。这种分布式消息方式被称为轮询。试试三个或更多的工作者。

 

消息确认

处理一个任务可能花费数秒时间,你可能会好奇如果一个消费者开始一个长任务,并且在处理完成部分的情况下就死掉了会发生什么情况。就我们当前的代码来说,一旦RabbitMQ将消息传递给消费者,它就会立即将消息从内存中删除。在这种情况下,如果你杀掉一个正在处理的工作者你会丢失它正在处理的消息。我们也同时失去了已经分配给这个工作者并且没有开始处理的消息。
但是我们不想丢失任何任务,如果一个工作者死掉,我们期望将任务传递给另一个工作者。
为了保证每一个消息不会丢失,RabbitMQ支持消息确认机制。一个消息确认是由消费者发出,告诉RabbitMQ这个消息已经被接受,处理完成,RabbitMQ 可以删除它了。
如果一个消费者没有发送确认信号,RabbitMQ将会认定这个消息没有完全处理成功,将会把它传递给另一个消费者。通过这种方式,即使工作者有时会死掉,你依旧可以保证没有消息会被丢失。
这里不存在消息超时;RabbitMQ只会在工作者连接死掉才重新传递这个消息。即使一个消息要被处理很长很长时间,也不是问题。
消息确认机制默认情况下是开着的。在先前的例子中我们是明确的将这个功能关闭no_ack=True。是时候移除这个标识了,一旦我们完成一个任务,工作者需要发送一个确认信号。

QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);

while (true) {
  QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  //...      
  channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

使用这段代码,我们可以保证即使你将一个正在处理消息的工作者通过CTRL+C来终止它运行,依旧没有消息会丢失。稍后,工作者死亡后没有发送确认的消息会被重新传递。

忘掉确认

这是一个普遍的错误,就是忘记确认。这是一个很简单的错误,但是这后果是严重的。当你的客户端退出,消息会重新传递(看上去是随机传递的),RabbitMQ会越来越占用内存,因为它不会释放哪些没有发送确认的消息。

为了调试这种类型的错误,你可以使用rabbitmqctl打印出messages_unacknowledged属性:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

 

消息持久化

我们已经学习了如何在确定消费者是否已经死掉,并且保证任务不被丢失。但是如果RabbitMQ服务器停止,我们的任务依旧会丢失。

当RabbitMQ退出或者崩溃,它将会忘记这队列和消息,除非你告诉它不要这样做。两个事情需要做来保证消息不会丢失:我们标记队列和消息持久化。

首先,我们需要确保RabbitMQ不会丢失我们的队列,为了这样做,我们需要将它声明为持久化:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

虽然这命令是正确的,但它不会立即在我们的程序里运行。那是因为我们已经定义了一个不持久化的hello队列。RabbitMQ不允许你使用不同的参数重新定义一个存在的队列,如果你试着那样做它会返回一个错误。有个快速的变通方案-让我们声明一个不同名字的队列,比如task_queue:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

这个queuqDeclare的改变需要应用在生产者和消费者的代码中。
在这点上,我们可以保证即使RabbitMQ重启,task_queue队列也不会丢失。现在我们需要标记消息持久化 – 通过设置MessageProperties(实现了BasicProperties)的值为PERSISTENT_TEXT_PLAIN

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue", 
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

注意消息持久化
标记消息持久化不能完全保证消息不会被丢失,虽然这样会告诉RabbitMQ保存消息到硬盘上。但是对于RabbitMQ依旧有个短暂的时间窗口对于接收一个消息并且还没有完成保存。同样,RabbitMQ不能让每个消息同步–它可能仅仅保存在缓存中,还没有真正的写入到硬盘中。这持久化的保证不是健壮的,但是对我们的简单的任务队列来说是足够了。如果你需要更健壮的持久化保证,你可以使用出版者确认。

 

公平分发

你可能注意到了,分发过程并没有如我们想的那样运作。例如,在一个情况下有两个工作者,当所有奇数消息是重的和所有偶数是轻的,一个工作者会一直忙碌下去,而另一个则会几乎不做什么事情。好吧,RabbitMQ不会在意那个事情,它会一直均匀的分发消息。
这种情况发生因为RabbitMQ仅仅分发消息到队列中��它不关心有多少消息没有由发送者发送确认信号。它仅仅盲目的将N个消息发送到N个消费者。
prefetch-count.png

为了解决这个问题,我们可以使用basicQos方法,设置prefetchCount=1。这样将会告知RabbitMQ不要同时给一个工作者超过一个任务,或者换句话说在一个工作者处理完成,发送确认之前不要给它分发一个新的消息。代替,把消息分发到下一个不繁忙的工作者。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

注意队列大小

如果你的所有工作者是在忙碌,你的队列就会被填满。你将会想关注这件事,可能要添加更多的工作者,或者有些其他策略。

 

把它们放在一起

我们的NewTask.java最终代码:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) 
                      throws java.io.IOException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    String message = getMessage(argv);

    channel.basicPublish( "", TASK_QUEUE_NAME, 
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }      
  //...
}

(NewTask.java source)
我们的Worker.java代码:

                      java.lang.InterruptedException {
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv)
                      throws java.io.IOException,
                      java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());

      System.out.println(" [x] Received '" + message + "'");   
      doWork(message); 
      System.out.println(" [x] Done" );

      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
  }
  //...
}

(Worker.java source)
使用消息确认和预读数量你可以建立一个工作队列。持久化选项使得RabbitMQ重启之后任务依旧存在。

想要了解更多关于通道方法和消息属性,你可以浏览javadocs online

现在我们可以移到指南3了,学习怎么样将相同的消息传递给多个消费者

出版和订阅

 

(使用Java 客户端)

在先前的指南中,我们创建了一个工作队列。这工作队列后面的假想是每一个任务都被准确的传递给工作者。在这部分我们将会做一些完全不同的事情–我们将一个消息传递给多个消费者。这部分被认知为“出版和订阅”。

为了说明这个部分,我们会建立一个简单德日志系统,它是由两个程序组成–第一个发出日志消息,第二个接收和打印它们。

在我们的日志系统中,每一个运行的接收者拷贝程序将会获得信息。通过这个方式我们可以运行一个接收者,直接的把日志记录到硬盘中;在同一时间我们可以运行另一个接收者,在屏幕上看这些日志。
本质上,发布日志消息等同于广播到所有接收者。

 

交换

在先前指南部分,我们将消息发送到队列里,并从队列中接收消息。现在是时候介绍RabbitMQ中全消息模型。
让我们快速温习下在先前指南中我们掌握的:

一个发送消息的生产者是一个用户程序。
一个存储消息的队列是一个缓冲。
一个接收消息的消费者是一个用户程序。
在RabbitMQ消息模型中核心的思想是生产者从不直接将消息发送给队列。实际上,生产者常常甚至不知道是否一个消息会被传递到队列中。

相反,生产者仅能将消息发送到一个交易所。一个交易所是一个非常简单的事物。在它的一遍,它从生产者那里接收消息,另一边将消息推送到队列中。这个交换所必须清楚的知道它所接收到的消息要如何处理。是否将它附加到一个特别的队列中?是否将它附加到多个队列中?或者是否它应该被丢弃。规则的定义是由交换类型决定的。
exchanges.png
有几个交换类型:directtopicdeadersfanout。我们来关注最后一个–fanout。让我们创建一个这种类型的交易所并且称呼它为logs:

channel.exchangeDeclare("logs", "fanout");

fanout交易所是非常简单的。通过这个名字你可能已经猜出它的用处了,它会将接收的所有消息都广播到所有它所知道的所有队列。这个真正是我们的记录器所需要的。

交易所列表
为了列出服务器中所有交易所,你可以运行着有用的rabbitmqctl

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
        direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.

在这个列表里有一些以amq.打头的交易所和默认(未命名)的交易所。这些是默认创建的,但是不太可能你会在某个时刻使用它们。
匿名交易所
在先前的指南中我们对交易所毫无了解,但是我们依旧能将消息发送到队列中。那是可能实现的,因为我们使用的是默认交易所,通过我们使用空字符串(““)标识它。
回想一下我们以前是如何发送消息的:

channel.basicPublish("", "hello", null, message.getBytes());

这第一个参数是交易所的名字。空字符串说明它是默认的或者匿名的交易所:路由关键字存在的话,消息通过路由关键字的名字路由到特定的队列上。

现在,我们可以发布我们自己命名的交易所:

channel.basicPublish( "logs", "", null, message.getBytes());

 

临时队列

你可能会想起先前我们使用的队列是有特定的名字的(是否记得hellotask_queue)。命名一个队列对我们来说是至关重要的–我们需要指定工作者到这相同的队列上。当你想把队列分享给生产者和消费者,给队列名是重要的。
但是那不是我们记录器的实例。我们想监听所有日志消息,不仅仅是它们中的子集。我们同样是对当前的消息流感兴趣,而不是旧的。为了解决这个我们需要两件事。
首先,无论我们什么时候连接RabbitMQ,我们需要一个新的,空的队列。为了做到这些,我们可以创建一个随机名字的队列或者更胜一筹-让服务器为我们选择一个随机的名字。
第二部,一旦我们将消费者的连接断开,队列应该自动删除。
在Java客户端里,当我们使用无参数调用queueDeclare()方法,我们创建一个自动产生的名字,不持久化,独占的,自动删除的队列。

String queueName = channel.queueDeclare().getQueue();

在这点,队列名中包含一个随机队列名。例如名字像amq.gen-JzTY20BRgKO-HjmUJj0wLg

 

绑定

bindings.png

我们已经创建了一个fanout交易所和队列。现在我们需要告诉交易所发送消息给我们的队列上。这交易所和队列之间的关系称之为一个绑定。

channel.queueBind(queueName, "logs", "");

从现在开始,日志交换所将要附加消息到我们的队列中。

绑定列表
你可以列出存在的绑定使用,使用rabbitmqctl list_bindings

 

把所有放在一起

python-three-overall.png
这发送日志消息的生产者程序,跟以前指南中的程序没有多少不同。这最重要的改变是我们将匿名的交易所替换为我们想要消息发布到的日志交易所。当发送是我们需要申请一个路由关键字,但是在广播消息是它的值会被忽略。这是EmitLog.java程序的代码:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
    //...
}

(EmitLog.java source)
如你所知,建立连接后我们声明一个交易所。这个步骤是必须的,因为发布到一个不存在的交易所是禁止的。

如果队列还没有绑定到交易所上,消息将会丢失,但是这个对我们来说是ok的;如果没有消费者正在监听,我们可以安全的丢弃消息。
ReceiveLogs.java代码:

                  java.lang.InterruptedException {
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogs {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException,
                  java.lang.InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(" [x] Received '" + message + "'");
        }
    }
}

(ReceiveLogs.java source)
如以前那样编译,我们已经做了。

$ javac -cp rabbitmq-client.jar EmitLog.java ReceiveLogs.java

如果你想把日志保存到文件中,仅仅打开一个控制平台,键入:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log

如果你想在你的屏幕上看这些日志, 新建一个终端并且运行:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs

当然,为了发出日志键入:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar EmitLog

使用rabbitmactl list_bindings你可以验证这代码确实创建绑定和我们想要的队列。随着两个ReceiveLogs.java程序的运行你可以看到一些如:

 $ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

这结果的解释是直白简单的:来自交易所的日志流向服务器安排的两个队列中。并且那确实我们所期望的。
为了弄明白如何监听一个消息的子集,让我们移到指南的第四部分。

路由

 

(使用Java客户端)

在先前的指南中,我们建立了一个简单德日志系统。我们可以将我们的日志信息广播到多个接收者。
在这部分的指南中,我们将要往其中添加一个功能-让仅仅订阅一个消息的子集成为可能。例如,我们可以直接将关键的错误信息指向到日志文件(保存在爱硬盘空间),同时依旧能打印所有日志信息到平台上。

 

绑定

在之前的例子里我们已经创建绑定。你可以回顾下代码:

channel.queueBind(queueName, EXCHANGE_NAME, "");

A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.
一个绑定是一个交换所和一个队列之间的关系。这个很容易理解为:这个队列是对这交易所的消息感兴趣。

绑定可以带上额外的路由关键字参数。为了消除对basic_publish参数的迷惑,我们将会将它称之为绑定关键字。以下是我们如何通过一个关键字创建一个绑定:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

这绑定关键字的意义取决于交易所类型。这fanout交易所,我们之前使用的那个,仅仅忽略它的值。

 

直接交换

我们当前的日志系统将所有消息广播到所有消费者。我们想扩展它,让其允许依据其严格的规则过滤消息。例如我们可能想让一个往硬盘中写日志消息的程序仅仅接收关键的错误,而不是将硬盘空间浪费在警告和信息的日志消息上。
我们使用fanout类型的交易所,那个不会给我们太多的灵活性-它仅仅能胜任没头脑的广播。

我们可以使用direct类型的交易所来替代。一个direct交易所背后的路由算法是简单的-一个消息将会进入那些队列的绑定关键字与消息中路由关键字匹配的队列中。

为了说明那个,考虑接下来结构:
direct-exchange.png
在这个结构里,我们看见了这direct类型的交易所绑定了两个队列。第一个队列装有orange绑定关键字,这第二个有两个绑定,一个是black绑定关键字并且另一个是green关键之。
在这个结构里,发送到交易所里的消息,其中消息中带路由关键字为orange将要路由到队列Q1上,消息中带路由关键字为blackgreen将路由到队列Q2上。所有其他类型的消息会被丢弃。

 

多种绑定

direct-exchange-multiple.png
将一个绑定关键字绑定到货个队列上是十分合法的。在我们例子中使用绑定关键字blackXQ1绑定在一起。既然那样,这direct类型的交易所与fanout类型相似,同样会广播这消息到所有符合的队列中。一个路由关键位balck的关键字将会被传递到Q1Q2

 

发出日志

我们将会为我们的日志系统使用这个模型。使用direct类型的交易所来代替fanout类型,发送消息。由于这路由关键字我们可以严格的记录。接收程序通过这种方式可以严格接收它想接收的。让我们首先关注发布日志。
总之,我们首先需要创建个交易所。

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

我们准备发送一个消息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

为了简化这个事情,我们保证这severityinof,warning,error中的一个。

 

订阅

接收消息如先前那样工作,有一个例外,我们会把每一个我们感兴趣的severity创建一个新的绑定。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){    
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

 

把它们放在一起

python-four.png
EmitLogDirect.java类的代码:

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();
    }
    //..
}

ReceiveLogsDirect.java类的代码:

                  java.lang.InterruptedException {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException,
                  java.lang.InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1){
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        for(String severity : argv){
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}

如平常那样编译(看指南第一部分,编译和类路径的建议)。为了方便,当我们运行实例是,我们现在使用一个环境变量$CP(在windows环境上是%CP%)表示类路径。
如果你想仅保存warningerror记录不包含info记录信息到文件里,打开一个控制平台并输入:

$ java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

如果你想在你的屏幕上看所有的日志信息,打开一个新的终端并键入:

$ java -cp $CP ReceiveLogsDirect info warning error
 [*] Waiting for logs. To exit press CTRL+C

例如,为了发布一个错误日志信息,仅需要键入:

$ java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
 [x] Sent 'error':'Run. Run. Or it will explode.'

EmitLogDirect.java source和ReceiveLogsDirect.java source的所有源代码。

阅览指南第五部分,查看如何根据一个模式来监听消息。

主题(topic)

 

(使用Java客户端)

在先前的指南中我们改进了我们的日志系统。取代使用fanout类型的交易所,那个仅仅有能力实现哑的广播,我们使用一个direct类型的交易所,获得一个可以有选择性的接收日志。

虽然使用direct交易所类型已经改善了我们的系统,但它依旧有限制-它不能根据多个条件进行路由。

我们的日志系统中,我们可能想要订阅不仅仅基于严格的日志,同样基于发布日志的源码。你可能了解到syslog unix tool的概念,那个基于严格的(info/warn/crit…)和灵巧的(auth/cron/kern…)路由日志。

那个将会给我们许多灵活性-我们可能仅仅想监听来自于cron的关键性的错误和所有来自于kern的日志。

为了在我们日志系统中实现那个,我们需要学习更复杂的topic类型交易所。

 

topic类型交易所

发送到topic类型的交易所不能有任意的路由的关键字-它必须是一个关键字列表,由点分隔。这关键字可以是任意的,但是通常可以说明消息的基本的联系。几个合法的路由关键字例子:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。可能有很多你想要的路由关键字,上限是255个字节。
这绑定关键字必须也在这同样的表单里。topic交易所逻辑背后是与direct交易所类型类似-一个带特别的路由关键字的消息将会被传递到所有匹配绑定的关键字的队列。但是有两个特别重要的绑定关键字。

>* (星标) 能替代任意一个单词。
># (哈希) 能代替零个或多个单词。

这个例子中是很容易解释的:
python-five.png

在这个例子中,我们发送的消息都描述的是动物。被发送的消息的路由关键字是由三个单词(两个点)组成。路由关键字中第一个单词描述的是速度,第二个描述的是颜色,第三个是物种:
“<速度>.<颜色>.<物种>“。

我们创建了三个绑定:Q1s是由绑定关键字”。orange.“所约束,Q2由”。rabbit”和”lazy.#“所约束。
这些绑定可以概括为:

>Q1 是对orange颜色的动物感兴趣。
>Q2 想了解关于兔子的所有信息和所有慢吞吞的动物信息。

一个路由关键字为”quick.orange.rabbit”消息将会被传递到所有队列。消息”lazy.orange.elephant”同样也传递到所有队列。另一方面”quick.orange.fox”仅进入第一队列,“lazy.brown.fox”仅进入第二个队列。“lazy.pink.rabbit”仅传递到第二个队列一次,即使它会匹配两个绑定。“quick.brown.fox”不符合任何绑定关键字,所以会被丢弃。

如果我们打破我们的约定,发送一个消息带一个或四个单词的关键字,像”orange”或”quick.orange.male.rabbit”,会发生什么呢?好吧,这些消息不会匹配任何绑定,将会丢失。
另一方面”lazy.orange.male.rabbit”,即使它有四个单词,将会匹配最最后那个绑定,将消息传递到第二个队列。

topic类型交易所
topic类型交易所是强大的,能表现的像其他的交易所。
Topic exchange is powerful and can behave like other exchanges.
当一个队列绑定到了”#“(哈希)绑定关键字-它会接收所有消息,不管路由关键字是什么-类似于fanout类型交易所
topic类型交易所中没有使用像”*“(星标)和”#“(哈希)的特殊字符,它的行为类似于direct类型交易所

 

把所有放在一起

我们将会在我们的日志系统中使用topic类型交易所。我们假设我们的工作的日志消息的路由关键字是由两个单词组成,格式为:“ . “。
这代码和先前的几乎一样:
EmitLogTopic.java的代码:

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
                  throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = getRouting(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

        connection.close();
    }
    //...
}

ReceiveLogsTopic.java的代码:

public class ReceiveLogsTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
                  throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1){
            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }

        for(String bindingKey : argv){
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}

运行接下来的例子,在windows环境中,使用%CP%,包含指南一种的类路径。
接收所有日志:

$ java -cp $CP ReceiveLogsTopic "#"

接收所有灵巧的kern日志:

$ java -cp $CP ReceiveLogsTopic "kern.*"

或者你仅仅想接收’critical’日志:

$ java -cp $CP ReceiveLogsTopic "*.critical"

你可以创建多个绑定:

$ java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

发出一个路由关键字为”kern.critical”的日志,输入:

$ java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"

跟这些程序玩的开心。注意代码没有对特定的路由和绑定关键字做臆断,你可以操作多于两个的路由关键字参数。

一些难题:

>““绑定会捕获路由关键字是空的消息吗?
>“#.
“会捕获消息中关键字带”..“的吗?它会捕获一个单词的关键字吗?
>“a.*.#和”a.#“之间有什么不同?

EmitLogTopic.java 和 ReceiveLogsTopic.java的源代码。

接下来,让我们在指南的第六部分,弄清当一个远端程序被调用,如何做一个一个往返的消息。

远程过程调用(RPC)
(使用Java客户端)

在指南的第二部分,我们学习了如何使用工作队列将耗时的任务分布到多个工作者中。

但是假如我们需要调用远端计算机的函数,等待结果呢?好吧,这又是另一个故事了。这模式通常被称为远程过程调用或RPC。

在这部分,我们将会使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务器。由于我们还没有值得分散的耗时任务,我们将会创建一个虚拟的RPC服务,用来返回Fibonacci(斐波纳契数列)。

用户接口

为了说明RPC服务如何使用,我们将会创建一个简单德客户端类。它会暴露一个叫call的方法,用来发送一个RPC请求,在响应回复之前都会一直阻塞:

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();   
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

RPC方面的注意
虽然RPC在电脑运算方面是一个十分普通的模式,但是它依旧常常受批判的。
如果一个程序员没有意识到函数call是本地的还是一个迟钝的RPC。这结果是不可预知的很让你困惑的,并且会增加不必要的复杂调试。与简化软件相反,误用RPC会导致不可维护的意大利面条代码(译者注:原文是spaghetti code可能形容代码很长很乱)。

思想中煎熬,考虑下接下来的建议:
确保明显区分哪个是函数call是本地调用的,哪个是远端调用的。

给你的系统加上文档,让组件之间的依赖项清晰可见的。

处理错误事件。当RPC服务器很久没有响应了,客户端应该如何响应?
当关于RPC的所有疑问消除,在你可以的情况下,你应该使用一个异步的管道,代替RPC中阻塞,结果会异步的放入接下来的计算平台。

回收队列

一般来说在RabbitMQ上做RPC是容易的。一个客户端发送一个请求消息,一个服务器返回响应消息。为了接受到响应,我们需要再请求中带上一个callback队列的地址。我们可以使用默认队列(那个在Java客户端上市独占的)。让我们试一下:

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue ...

消息属性
这AMQP协议预先确定了消息中的14个属性。他们大多数属性很少使用,除了下面这些例外:
deliveryMode:将一个消息标记为持久化(值为2)或者瞬态的(其他值)。你可能从第二部分中记起这个属性。
contentType:用来描述媒体类型的编码。例如常常使用的JSON编码,这是一个好的惯例,设置这个属性为:application/json
replyTo:通常来命名回收队列的名字。
correlationId:对RPC加速响应请求是很有用的。

我们需要这个新的引用:

import com.rabbitmq.client.AMQP.BasicProperties;

相关性ID (原:Correlation Id)

在当前方法中我们建议为每一个RPC请求创建一个回收队列。这个效率十分低下的,但幸运的是有一个更好的方式- 让我们为每一个客户端创建一个单一的回收队列。
这样又出现了新的问题,没有清晰的判断队列中的响应是属于哪个请求的。这个时候coorrelationId属性发挥了作用。我们将每个请求的这个属性设置为唯一值。以后当我们在回收队列中接收消息时,我们将会查看这个属性,依据这个属性值,我们是能将每个响应匹配的对应的请求上。如果我们遇见个未知的correlationId值,我们可以安全的丢弃这个消息-因为它不属于任何一个我们的请求。

你可能会问,为什么我们要忽略哪些在回收队列中未知的消息,而不是以一个错误结束?因为在服务器竟态条件下,这种情况是可能的。RPC服务器发送给我们答应之后,在发送一个确认消息之前,就死掉了,虽然这种可能性不大,但是它依旧存在可能。如果这事情发生了,RPC服务器重启之后,将会再一次处理请求。这就是为什么我们要温和地处理重复的响应,这RPC理想情况下是幂等的。

摘要

python-six.png
我们的RPC将会像这样工作:

当客户端启动,它会创建一个匿名的独占的回收队列。
对于一个RPC请求,客户端会发送一个消息中有两个属性:replyTo,要发送的的回收队列和correlationId,对于每一个请求都是唯一值。
这请求发送到rpc_queue队列中。
这RPC工作者(亦称:服务器)等候队列中的请求。当请求出现,它处理这工作并发送携带结果的信息到客户端,使用的队列是消息属性replTo中的那个。
客户端等待回收队列中的数据。当一个消息出现,它会检查correlationId属性。如果它符合请求中的值,它会返回这响应给应用程序。

把所有的放在一起

斐波那契任务:

private static int fib(int n) throws Exception {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}

我们声明我们的斐波那契函数。它假定一个合法的正整数做为输入参数。(不要期望这个可以处理大量数字,它可能是最慢的递归实现了)。
我们的RPC服务器RPCServer.java的代码:

private static final String RPC_QUEUE_NAME = "rpc_queue";

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

System.out.println(" [x] Awaiting RPC requests");

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();

    BasicProperties props = delivery.getProperties();
    BasicProperties replyProps = new BasicProperties
                                     .Builder()
                                     .correlationId(props.getCorrelationId())
                                     .build();

    String message = new String(delivery.getBody());
    int n = Integer.parseInt(message);

    System.out.println(" [.] fib(" + message + ")");
    String response = "" + fib(n);

    channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

这服务器代码是相当简单明了的:
如往常一样,我们开始建立连接,通道和声明队列。
我们可能想运行不止一个服务器进程。为了均衡的负载到多个服务器上,我们需要设置channel.basicQos中的prefetchCount属性。
我们使用basicConsume访问队列。然后进入while循环,我们等待请求消息,处理工作,发送响应。

我们RPC客户端RPCClient.java的代码:

private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;

public RPCClient() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    connection = factory.newConnection();
    channel = connection.createChannel();

    replyQueueName = channel.queueDeclare().getQueue(); 
    consumer = new QueueingConsumer(channel);
    channel.basicConsume(replyQueueName, true, consumer);
}

public String call(String message) throws Exception {     
    String response = null;
    String corrId = java.util.UUID.randomUUID().toString();

    BasicProperties props = new BasicProperties
                                .Builder()
                                .correlationId(corrId)
                                .replyTo(replyQueueName)
                                .build();

    channel.basicPublish("", requestQueueName, props, message.getBytes());

    while (true) {
        QueueingConsumer.Delivery delivery =consumer.nextDelivery();
        if (delivery.getProperties().getCorrelationId().equals(corrId)) {
            response = new String(delivery.getBody());
            break;
        }
    }

    return response; 
}

public void close() throws Exception {
    connection.close();
}

The client code is slightly more involved:
这客户端代码是更加清晰:
我们建立一个连接和通道并且声明一个独占的callback队列用来等待答复。
我们订阅这个callback队列,以便于我们可以接收到RPC响应。
我们的call方法做这真正的RPC请求。
接着,我们首次生成一个唯一的correlationId数字并且保存它,在循环中使用这个值找到合适的响应。
接下来,我们发布请求消息,带着两个属性:replyTocorrelationId
这时候,我们可以坐下来,等着合适的响应抵达。
这循环中做了个简单德工作,检查每一个响应消息中correlationId值,是否是它要寻找的。如果是,它会保存这响应。
最终,我们把响应返回给用户。

制造客户端请求:

RPCClient fibonacciRpc = new RPCClient();

System.out.println(" [x] Requesting fib(30)");   
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");

fibonacciRpc.close();

现在是时候让我们回顾下我们RPCClient.java和RPCServer.java中的全部例子的源码(包含基本的异常处理)。
编译和如往常一样建立类路径(看指南的第一部分)

$ javac -cp rabbitmq-client.jar RPCClient.java RPCServer.java

我们的RPC服务现在准备好了,我们启动着服务器:

$ java -cp $CP RPCServer
 [x] Awaiting RPC requests

为了请求一个斐波那契数字,运行客户端:

$ java -cp $CP RPCClient
 [x] Requesting fib(30)

现在的设计不仅仅可以实现一个RPC服务,并且它还有几项重要的优势:
如果RPC服务器反应太迟缓,你可以通过运行另一个程序来扩展。试着通过一个新的控制平台来运行第二个RPC服务器。在客户端这边,RPC要求仅发送和接收一个消息。像queueDeclare非同步调用是被要求的。因此,RPC客户端仅仅需要一个网络循环的单一RPC请求。

我们的代码一直是十分简单的,不能试着解决更复杂(但是重要)的问题,比如:
如果没有服务器运行,客户端如何响应?
客户端是否对RPC的超时有处理?
如果服务器发生故障,抛出一个异常,是否应该传递到客户端?
在处理之前把进入来的非法消息隔离掉(检查界限,类型)。

如果你想实验下,你会发现rabbitmq-management插件,对观察队列是很有帮助的。

RabbitMQ 的详细介绍:请点这里
RabbitMQ 的下载地址:请点这里

本文永久更新链接地址:http://www.linuxidc.com/Linux/2015-02/113983.htm

赞(0) 打赏
转载请注明出处:服务器评测 » RabbitMQ入门教程
分享到: 更多 (0)

听说打赏我的人,都进福布斯排行榜啦!

支付宝扫一扫打赏

微信扫一扫打赏