由于最近项目需要用到RabbitMQ的数据持久化技术,我利用空闲时间,分别针对RabbitMQ三种常用的Exchange类型(direct、fanout、topic)各写了一个测试实例。
1.direct(发布与订阅、完全匹配)
1.1生产者(Direct)
public class Direct { private static final String EXCHANGE_NAME = "temp_direct"; private static final String[] TYPE = { "info", "warning", "error" }; public static void main(String[] argv) throws java.io.IOException { // 创建连接和频道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.32.129"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明转发器的类型 channel.exchangeDeclare(EXCHANGE_NAME, "direct",true); //发送6条消息 for (int i = 0; i <TYPE.length; i++) { String rand = getRandom(); String message = rand + "_log :" + UUID.randomUUID().toString(); //持久化 channel.queueDeclare(TYPE[i], true, false, false, null); //流量 channel.basicQos(1); //将消息队列绑定到Exchange channel.queueBind(TYPE[i], EXCHANGE_NAME, TYPE[i]); // 发布消息至转发器,指定routingkey channel.basicPublish(EXCHANGE_NAME, TYPE[i], null, message .getBytes()); System.out.println("队列" + TYPE[i] + "绑定成功!"); } channel.close(); connection.close(); } /** * 随机产生一种日志类型 * * @return */ private static String getRandom() { Random random = new Random(); int ranVal = random.nextInt(3); return TYPE[ranVal]; } }
1.2消费者
public class ReceiveDirect { private static final String EXCHANGE_NAME = "temp_direct"; private final static String HOST = "192.168.32.129"; private static final String[] TYPE = { "info", "warning", "error" }; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { // 创建连接和频道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); // 声明direct类型转发器 channel.exchangeDeclare(EXCHANGE_NAME, "direct",true); for (int i = 0; i < TYPE.length; i++) { //持久化 channel.queueDeclare(TYPE[i], true, false, false, null); //流量控制 channel.basicQos(1); //将消息队列绑定到Exchange channel.queueBind(TYPE[i], EXCHANGE_NAME, TYPE[i]); System.out.println("队列" + TYPE[i] + "绑定成功!"); } for (int i = 0; i < TYPE.length; i++) { final String queue = TYPE[i]; new Thread(){ public void run() { try { receive(channel, queue); } catch (Exception e) { e.printStackTrace(); } } }.start(); } } private static void receive(Channel channel,String QUEUE_NAME) throws Exception { // 声明消费者 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, consumer); while (true) { // 等待队列推送消息 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody(), "UTF-8"); System.out.println(QUEUE_NAME + " Received '" + message + "'"); // 反馈给服务器表示收到信息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
2.fanout(广播)
2.1生产者
public class Fanout { private final static String HOST = "192.168.32.129"; private final static String EXCHANGE_NAME = "fanout"; private final static String QUEUE = "temp_fanout"; private final static String ROUTKEY = "mq.fanout"; private final static boolean DURABLE = true; public static void main(String[] args) throws IOException, TimeoutException{ // 创建连接和频道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明转发器和类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout" ); //持久化 channel.queueDeclare(QUEUE, DURABLE, false, false, null); channel.basicQos(1); channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTKEY); String message = new Date().getTime()+" : fanout something"; // 往转发器上发送消息 channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
2.2消费者
public class ReceiveFanout { private final static String HOST = "192.168.32.129"; private final static String EXCHANGE_NAME = "fanout"; private final static String QUEUE = "temp_fanout"; private final static String ROUTKEY = "mq.fanout"; private final static boolean DURABLE = true; public static void main(String[] args) throws java.io.IOException, java.lang.InterruptedException, TimeoutException{ // 创建连接和频道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //持久化 channel.queueDeclare(QUEUE, DURABLE, false, false, null); channel.basicQos(1); // 创建一个非持久的、唯一的且自动删除的队列 //String queueName = channel.queueDeclare().getQueue(); // 为转发器指定队列,设置binding channel.queueBind(QUEUE, EXCHANGE_NAME,ROUTKEY); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); // 指定接收者,第二个参数为自动应答,无需手动应答 channel.basicConsume(QUEUE, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
3.topic(主题、规则匹配)
3.1生产者
public class Topic { private final static String HOST = "192.168.32.129"; private static final String EXCHANGE_NAME = "topic_Exc"; private static final String QUEUE = "temp_wwww"; private static final String ROUTKEY="*_topic"; private static final boolean durable = true; public static void main(String[] argv) throws Exception { // 创建连接和频道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明转发器 channel.exchangeDeclare(EXCHANGE_NAME, "topic",durable); //持久化 channel.queueDeclare(QUEUE, durable, false, false, null); channel.basicQos(1); //将消息队列绑定到Exchange channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTKEY); String msg = UUID.randomUUID().toString(); channel.basicPublish(EXCHANGE_NAME, ROUTKEY, MessageProperties.PERSISTENT_TEXT_PLAIN, msg .getBytes()); System.out.println(msg); channel.close(); connection.close(); } }
3.2消费者
public class ReceiveTopicFortopic{ private final static String HOST = "192.168.32.129"; private static final String EXCHANGE_NAME = "topic_Exc"; private static final String QUEUE = "temp_topic"; private static final String ROUTKEY="*_topic"; private static final boolean durable = true; public static void main(String[] argv) throws Exception { // 创建连接和频道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明转发器 channel.exchangeDeclare(EXCHANGE_NAME, "topic",durable); //持久化 channel.queueDeclare(QUEUE, durable, false, false, null); channel.basicQos(1); //将消息队列绑定到Exchange channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTKEY); System.out .println(" [*] Waiting for critical messages. To exit press CTRL+C"); //声明消费者 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received routingKey = " + routingKey + ",msg = " + message + "."); } } }
1. Producer只与Exchange打交道:消息是发布到Exchange上的,而不是直接发到Queue。RoutingKey(绑定键)用来把Queue绑定到Exchange,这一步一般在Producer这端完成(上面的示例中,Consumer端也做了同样的声明和绑定)。
2. 假如Producer把消息发到没有绑定任何Queue的Exchange,消息会丢失!
原创类文章未经允许请勿转载:39点博客 » RabbitMQ的三种Exchange,java客户端实例