39点博客

39点博客
像小蜜蜂一样生活

RabbitMQ的三种Exchange,java客户端实例

由于最近项目需要用到Rabbitmq的数据持久化技术,利用空闲时间,分别对Rabbitmq的三种常用的Exchange:direct、fanout、topic 写了个测试实例

1.direct(发布与订阅、完全匹配)


1.1生产者(Direct)

public class Direct {  
    private static final String EXCHANGE_NAME = "temp_direct";    
    private static final String[] TYPE = { "info", "warning", "error" };    
      
    public static void main(String[] argv) throws java.io.IOException    
    {    
        // 创建连接和频道    
        ConnectionFactory factory = new ConnectionFactory();    
        factory.setHost("192.168.32.129");    
        Connection connection = factory.newConnection();    
        Channel channel = connection.createChannel();    
        // 声明转发器的类型    
        channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);    
    
        //发送6条消息    
        for (int i = 0; i <TYPE.length; i++)    
        {    
            String rand = getRandom();    
            String message = rand + "_log :" + UUID.randomUUID().toString();    
            //持久化  
            channel.queueDeclare(TYPE[i], true, false, false, null);  
            //流量  
            channel.basicQos(1);  
            //将消息队列绑定到Exchange  
            channel.queueBind(TYPE[i], EXCHANGE_NAME, TYPE[i]);  
            // 发布消息至转发器,指定routingkey    
            channel.basicPublish(EXCHANGE_NAME, TYPE[i], null, message    
                    .getBytes());    
            System.out.println("队列" + TYPE[i] + "绑定成功!");    
        }    
    
        channel.close();    
        connection.close();    
    }    
    
    /**  
     * 随机产生一种日志类型  
     *   
     * @return  
     */    
    private static String getRandom()    
    {    
        Random random = new Random();    
        int ranVal = random.nextInt(3);    
        return TYPE[ranVal];    
    }    
}

1.2消费者

public class ReceiveDirect {  
    private static final String EXCHANGE_NAME = "temp_direct";    
    private final static String HOST = "192.168.32.129";  
    private static final String[] TYPE = { "info", "warning", "error" };    
      
    public static void main(String[] argv) throws java.io.IOException,    
    java.lang.InterruptedException    
    {    
    // 创建连接和频道    
    ConnectionFactory factory = new ConnectionFactory();    
    factory.setHost(HOST);    
    Connection connection = factory.newConnection();    
    final Channel channel = connection.createChannel();    
    // 声明direct类型转发器    
    channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);    
    for (int i = 0; i < TYPE.length; i++) {  
        //持久化  
        channel.queueDeclare(TYPE[i], true, false, false, null);  
        //流量控制  
        channel.basicQos(1);  
        //将消息队列绑定到Exchange  
        channel.queueBind(TYPE[i], EXCHANGE_NAME, TYPE[i]);  
           
        System.out.println("队列" + TYPE[i] + "绑定成功!");  
    }  
      
    for (int i = 0; i < TYPE.length; i++) {  
          final String queue = TYPE[i];  
          new Thread(){  
            public void run() {  
              try {  
                receive(channel, queue);  
              } catch (Exception e) {  
                e.printStackTrace();  
              }  
            }  
          }.start();  
        }  
    }    
      
    private static void receive(Channel channel,String QUEUE_NAME) throws Exception {  
        // 声明消费者  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        channel.basicConsume(QUEUE_NAME, false, consumer);  
        while (true) {  
          // 等待队列推送消息  
          QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
          String message = new String(delivery.getBody(), "UTF-8");  
          System.out.println(QUEUE_NAME + " Received '" + message + "'");  
          // 反馈给服务器表示收到信息  
          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
        }  
    }  
}

2.fanout(广播)

2.1生产者

public class Fanout {  
     private final static String HOST = "192.168.32.129";  
     private final static String EXCHANGE_NAME = "fanout";  
     private final static String QUEUE = "temp_fanout";  
     private final static String ROUTKEY = "mq.fanout";  
     private final static boolean DURABLE = true;  
     public static void main(String[] args) throws IOException, TimeoutException{  
            // 创建连接和频道    
            ConnectionFactory factory = new ConnectionFactory();    
            factory.setHost(HOST);    
            Connection connection = factory.newConnection();    
            Channel channel = connection.createChannel();    
            // 声明转发器和类型    
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );    
            //持久化  
            channel.queueDeclare(QUEUE, DURABLE, false, false, null);  
            channel.basicQos(1);  
            channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTKEY);  
            String message = new Date().getTime()+" : fanout something";    
            // 往转发器上发送消息    
            channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());    
        
            System.out.println(" [x] Sent '" + message + "'");    
        
            channel.close();    
            connection.close();    
     }  
}

2.2消费者

public class ReceiveFanout {  
     private final static String HOST = "192.168.32.129";  
     private final static String EXCHANGE_NAME = "fanout";  
     private final static String QUEUE = "temp_fanout";  
     private final static String ROUTKEY = "mq.fanout";  
     private final static boolean DURABLE = true;  
    public static void main(String[] args) throws java.io.IOException,    
    java.lang.InterruptedException, TimeoutException{  
        // 创建连接和频道    
        ConnectionFactory factory = new ConnectionFactory();    
        factory.setHost(HOST);    
        Connection connection = factory.newConnection();    
        Channel channel = connection.createChannel();    
    
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");   
        //持久化  
        channel.queueDeclare(QUEUE, DURABLE, false, false, null);  
        channel.basicQos(1);  
        // 创建一个非持久的、唯一的且自动删除的队列    
        //String queueName = channel.queueDeclare().getQueue();    
        // 为转发器指定队列,设置binding    
        channel.queueBind(QUEUE, EXCHANGE_NAME,ROUTKEY);    
    
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");    
    
        QueueingConsumer consumer = new QueueingConsumer(channel);    
        // 指定接收者,第二个参数为自动应答,无需手动应答    
        channel.basicConsume(QUEUE, true, consumer);    
    
        while (true)    
        {    
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();    
            String message = new String(delivery.getBody());    
            System.out.println(" [x] Received '" + message + "'");    
    
        }    
    }  
}

3.topic(主题、规则匹配)

3.1生产者

public class Topic {  
    private final static String HOST = "192.168.32.129";  
    private static final String EXCHANGE_NAME = "topic_Exc";  
    private static final String QUEUE = "temp_wwww";  
    private static final String ROUTKEY="*_topic";  
    private static final boolean durable = true;  
    public static void main(String[] argv) throws Exception    
    {    
        // 创建连接和频道    
        ConnectionFactory factory = new ConnectionFactory();    
        factory.setHost(HOST);    
        Connection connection = factory.newConnection();    
        Channel channel = connection.createChannel();    
        //声明转发器  
        channel.exchangeDeclare(EXCHANGE_NAME, "topic",durable);    
        //持久化  
        channel.queueDeclare(QUEUE, durable, false, false, null);  
        channel.basicQos(1);  
        //将消息队列绑定到Exchange  
        channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTKEY);  
          
        String msg = UUID.randomUUID().toString();  
          
        channel.basicPublish(EXCHANGE_NAME, ROUTKEY, MessageProperties.PERSISTENT_TEXT_PLAIN, msg    
                .getBytes());   
        System.out.println(msg);  
    
        channel.close();    
        connection.close();    
    }    
}

3.2消费者

public class ReceiveTopicFortopic{  
    private final static String HOST = "192.168.32.129";  
    private static final String EXCHANGE_NAME = "topic_Exc";  
    private static final String QUEUE = "temp_topic";  
    private static final String ROUTKEY="*_topic";  
     private static final boolean durable = true;  
     public static void main(String[] argv) throws Exception    
        {    
            // 创建连接和频道    
            ConnectionFactory factory = new ConnectionFactory();    
            factory.setHost(HOST);    
            Connection connection = factory.newConnection();    
            Channel channel = connection.createChannel();    
            // 声明转发器    
            channel.exchangeDeclare(EXCHANGE_NAME, "topic",durable);   
            //持久化  
            channel.queueDeclare(QUEUE, durable, false, false, null);  
            channel.basicQos(1);  
            //将消息队列绑定到Exchange  
            channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTKEY);  
  
            System.out    
                    .println(" [*] Waiting for critical messages. To exit press CTRL+C");    
            //声明消费者  
            QueueingConsumer consumer = new QueueingConsumer(channel);    
            channel.basicConsume(QUEUE, true, consumer);    
        
            while (true)    
            {    
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();    
                String message = new String(delivery.getBody());    
                String routingKey = delivery.getEnvelope().getRoutingKey();    
        
                System.out.println(" [x] Received routingKey = " + routingKey    
                        + ",msg = " + message + ".");    
            }    
        }    
}

   

1. Exchange只用于Producer,RoutingKey用来绑定Exchange和Queue,这个一般在Producer这端做的。

2. 假如Producer把消息发到没有绑定Queue的Exchange消息会丢失!

原创类文章未经允许请勿转载:39点博客 » RabbitMQ的三种Exchange,java客户端实例

分享到: +More

评论 沙了个发

换个身份

取消评论