RabbitMQ详解(三):消息模式(fanout、direct、topic、work)

消费模式
参考官网:https://www.rabbitmq.com/getstarted.html
简单模式 Simple, 参考RabbitMQ详解(二):消息模式 Simple(简单)模式
简单模式是最简单的消息模式,它包含一个生产者、一个消费者和一个队列。生产者向队列里发送消息,消费者从队列中获取消息并消费。
发布订阅模式 fanout
同时向多个消费者发送消息的模式(类似广播的形式)
路由模式 direct
根据路由键选择性给多个消费者发送消息的模式
主题模式 topic
是direct模式上的一种叠加,增加了模糊路由RoutingKey的模式
工作模式 work
分发机制
…
消息模式-fanout(发布订阅)模式
类型:fanout
特点:Fanout—发布与订阅模式,是一种广播机制,它是没有路由key的模式。
创建交换机
注意 type 类型为fanout
绑定队列
图像化管理页面新建queue02、queue03队列
点击交换器后,绑定创建的三个队列
绑定成功后会如图所示
定义生产者
package com.cn.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* fanout(发布订阅) 生产者
*/
public class Producer {
public static void main(String[] args) {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置工厂属性
factory.setHost("请填写自己的ip地址");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//3.从连接工厂中获取连接
connection = factory.newConnection("生产者1");
//4.从连接中获取通道
channel = connection.createChannel();
//5.申请队列存储信息,此步骤不需要了,我们手动在图形管理页面创建好交换机及绑定好队列queue01、queue02、queue03
//6.准备发送消息的内容
String message = "hello,rabbitmq!";
//7.1.准备交换机
String exchangeName = "fanout-exchange";
//7.2.定义路由key,fanout模式没有routingKey参数
String routingKey = "";
// 7.3: 发送消息给中间件rabbitmq-server
/*
* @params1: 交换机exchange
* @params2: 队列名称/routingkey
* @params3: 属性配置
* @params4: 发送消息的内容
*/
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception e) {
e.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 8: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
启动生产者, 会看到每个队列都投递了一条消息
定义消费者
package com.cn.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.Charset;
/**
* fanout(发布订阅) 消费者
*/
public class Consumer {
public static Runnable runnable = new Runnable(){
@Override
public void run() {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置工厂属性
factory.setHost("请填写自己的ip地址");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
//3.从连接工厂中获取连接
connection = factory.newConnection("生产者1");
//4.从连接中获取通道
channel = connection.createChannel();
//5.接收消息
channel.basicConsume(queueName, true, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(queueName + "收到消息是:" + new String(delivery.getBody(), Charset.defaultCharset()));
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println("接收消息失败了...");
}
});
System.out.println(queueName + "开始接收消息 ")