如果只是网络抖动 出现异常那么直接进入死信队列 那么是不合理的
这就可以使用延时重试队列
原理:
1.发送到业务队里 如果正常收到 正常运行
2.如果处理失败 重试 并投入延时队列 如果超过延时时间 重新投入业务队列
3.如果重试次数大于3 那么进入死信队列
1.业务队列
-
@Configuration
-
public
class BusinessConfig {
-
-
/**
-
* yewu1模块direct交换机的名字
-
*/
-
public
static
final String YEWU1_EXCHANGE =
"yewu1_direct_exchange";
-
-
/**
-
* demo业务的队列名称
-
*/
-
public
static
final String YEWU1_DEMO_QUEUE =
"yewu1_demo_queue";
-
-
/**
-
* demo业务的routekey
-
*/
-
public
static
final String YEWU1_DEMO_ROUTINGKEY =
"yewu1_demo_key";
-
-
/**
-
* 业务交换机交换机(一个项目一个业务交换机即可)
-
* 1.定义direct exchange,绑定queueTest
-
* 2.durable="true" rabbitmq重启的时候不需要创建新的交换机
-
* 3.direct交换器相对来说比较简单,匹配规则为:如果路由键匹配,消息就被投送到相关的队列
-
* fanout交换器中没有路由键的概念,他会把消息发送到所有绑定在此交换器上面的队列中。
-
* topic交换器你采用模糊匹配路由键的原则进行转发消息到队列中
-
*/
-
@Bean
-
public DirectExchange yewu1Exchange() {
-
DirectExchange directExchange =
new DirectExchange(YEWU1_EXCHANGE,
true,
false);
-
return directExchange;
-
}
-
-
/**
-
* 新建队列(一个业务需要一个队列一个routekey 命名格式 项目名-业务名)
-
* 1.队列名称
-
* 2.durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
-
* 3.exclusive 表示该消息队列是否只在当前connection生效,默认是false
-
* 4.auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
-
* 5.对nack或者发送超时的 发送给死信队列 args是绑定死信队列
-
*
-
*/
-
@Bean
-
public Queue yewu1DemoQueue() {
-
return
new Queue(YEWU1_DEMO_QUEUE,
true,
false,
false);
-
}
-
-
/**
-
* 交换机与routekey绑定
-
*
-
* @return
-
*/
-
@Bean
-
public Binding yewu1DemoBinding() {
-
return BindingBuilder.bind(yewu1DemoQueue()).to(yewu1Exchange())
-
.with(YEWU1_DEMO_ROUTINGKEY);
-
}
-
}
2.延时队列
-
@Configuration
-
public
class RetryConfig {
-
-
/**
-
* 延时队列 交换机配置标识符(固定)
-
*/
-
public
static
final String RETRY_LETTER_QUEUE_KEY =
"x-dead-letter-exchange";
-
-
/**
-
* 延时队列交换机绑定配置键标识符(固定)
-
*/
-
public
static
final String RETRY_LETTER_ROUTING_KEY =
"x-dead-letter-routing-key";
-
-
/**
-
* 延时队列消息的配置超时时间枚举(固定)
-
*/
-
public
static
final String RETRY_MESSAGE_TTL =
"x-message-ttl";
-
-
/**
-
* yewu1模块延时队列交换机
-
*/
-
public
final
static String YEWU1_RETRY_EXCHANGE_NAME =
"yewu1_retry_exchange";
-
-
/**
-
* yewu1模块DEMO业务延时队列
-
*/
-
public
final
static String YEWU1_DEMO_RETRY_QUEUE_NAME =
"yewu1_demo_retry_queue";
-
-
/**
-
* yewu1模块DEMO延时队列routekey
-
*/
-
public
final
static String YEWU1_DEMO_RETRY_ROUTING_KEY =
"yewu1_demo_retry_key";
-
-
/**
-
* 延时队列交换机
-
*
-
* @return
-
*/
-
@Bean
-
public DirectExchange yewu1RetryExchange() {
-
DirectExchange directExchange =
new DirectExchange(YEWU1_RETRY_EXCHANGE_NAME,
true,
false);
-
return directExchange;
-
}
-
-
/**
-
* 新建延时队列 一个业务队列需要一个延时队列
-
*
-
* @return
-
*/
-
@Bean
-
public Queue yewu1DemoRetryQueue() {
-
Map<String, Object> args =
new ConcurrentHashMap<>(
3);
-
// 将消息重新投递到业务交换机Exchange中
-
args.put(RETRY_LETTER_QUEUE_KEY, BusinessConfig.YEWU1_EXCHANGE);
-
args.put(RETRY_LETTER_ROUTING_KEY, BusinessConfig.YEWU1_DEMO_ROUTINGKEY);
-
// 消息在队列中延迟3s后超时,消息会重新投递到x-dead-letter-exchage对应的队列中,routingkey为自己指定
-
args.put(RETRY_MESSAGE_TTL,
3 *
1000);
-
return
new Queue(YEWU1_DEMO_RETRY_QUEUE_NAME,
true,
false,
false, args);
-
}
-
-
/**
-
* 绑定以上定义关系
-
*
-
* @return
-
*/
-
@Bean
-
public Binding retryDirectBinding() {
-
return BindingBuilder.bind(yewu1DemoRetryQueue()).to(yewu1RetryExchange())
-
.with(YEWU1_DEMO_RETRY_ROUTING_KEY);
-
}
-
-
}
3.死信队列
-
@Configuration
-
public
class DeadConfig {
-
-
/**
-
* 死信队列
-
*/
-
public
final
static String FAIL_QUEUE_NAME =
"fail_queue";
-
-
/**
-
* 死信交换机
-
*/
-
public
final
static String FAIL_EXCHANGE_NAME =
"fail_exchange";
-
-
/**
-
* 死信routing
-
*/
-
public
final
static String FAIL_ROUTING_KEY =
"fail_routing";
-
-
/**
-
* 创建配置死信队列
-
*
-
*/
-
@Bean
-
public Queue deadQueue() {
-
return
new Queue(FAIL_QUEUE_NAME,
true,
false,
false);
-
}
-
-
/**
-
* 死信交换机
-
*
-
* @return
-
*/
-
@Bean
-
public DirectExchange deadExchange() {
-
DirectExchange directExchange =
new DirectExchange(FAIL_EXCHANGE_NAME,
true,
false);
-
return directExchange;
-
}
-
-
/**
-
* 绑定关系
-
*
-
* @return
-
*/
-
@Bean
-
public Binding failBinding() {
-
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);
-
}
-
-
}
4.生产者
-
@RestController
-
@RequestMapping("/TestRabbit")
-
public
class ProducerDemo {
-
-
@Resource
-
private RabbitTemplate rabbitTemplate;
-
-
//@RequestMapping("/sendDirect")
-
String sendDirect(
@RequestBody String message) throws Exception {
-
System.
out.println(
"开始生产");
-
CorrelationData
data = new CorrelationData(UUID.randomUUID().toString());
-
rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, BusinessConfig.YEWU1_DEMO_ROUTINGKEY,
-
message,
data);
-
System.
out.println(
"结束生产");
-
System.
out.println(
"发送id:" +
data);
-
return
"OK,sendDirect:" + message;
-
}
-
}
5.消费者
-
public
enum RabbitEnum {
-
-
/**
-
* 处理成功
-
*/
-
ACCEPT,
-
-
/**
-
* 可以重试的错误
-
*/
-
RETRY,
-
-
/**
-
* 无需重试的错误
-
*/
-
REJECT
-
@Component
-
public
class
ConsumerDemo {
-
-
private final
static Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);
-
-
@Resource
-
private RabbitTemplate rabbitTemplate;
-
-
// @RabbitListener(queues = "yewu1_demo_queue")
-
protected void consumer(Message message, Channel channel) throws Exception {
-
RabbitEnum ackSign = RabbitEnum.RETRY;
-
System.
out.println(message.getMessageProperties().getCorrelationId());
-
try {
-
// 可以加入重复消费判断
-
int i =
1 /
0;
-
-
}
catch (Exception e) {
-
ackSign = RabbitEnum.RETRY;
-
throw e;
-
}
finally {
-
// 通过finally块来保证Ack/Nack会且只会执行一次
-
if (ackSign == RabbitEnum.ACCEPT) {
-
channel.basicAck(message.getMessageProperties().getDeliveryTag(),
false);
-
}
else
if (ackSign == RabbitEnum.RETRY) {
-
String correlationData =
-
(String)message.getMessageProperties().getHeaders().
get(
"spring_returned_message_correlation");
-
System.
out.println(message.getMessageProperties().getCorrelationId());
-
long retryCount = getRetryCount(message.getMessageProperties());
-
if (retryCount >=
3) {
-
// 重试次数超过3次,则将消息发送到失败队列等待特定消费者处理或者人工处理
-
try {
-
channel.basicAck(message.getMessageProperties().getDeliveryTag(),
false);
-
rabbitTemplate.convertAndSend(DeadConfig.FAIL_EXCHANGE_NAME, DeadConfig.FAIL_ROUTING_KEY,
-
message,
new CorrelationData(correlationData));
-
logger.info(
"连续失败三次,将消息发送到死信队列,发送消息:" +
new String(message.getBody()));
-
}
catch (Exception e1) {
-
channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false,
false);
-
logger.error(
"发送死信队列报错:" + e1.getMessage() +
",原始消息:" +
new String(message.getBody()));
-
}
-
}
else {
-
try {
-
channel.basicAck(message.getMessageProperties().getDeliveryTag(),
false);
-
// 重试次数不超过3次,则将消息发送到重试队列等待重新被消费
-
rabbitTemplate.convertAndSend(RetryConfig.YEWU1_RETRY_EXCHANGE_NAME,
-
RetryConfig.YEWU1_DEMO_RETRY_ROUTING_KEY, message,
-
new CorrelationData(correlationData));
-
logger.info(
"消费失败,消息发送到重试队列;" +
"原始消息:" +
new String(message.getBody()) +
";第"
-
+ (retryCount +
1) +
"次重试");
-
}
catch (Exception e1) {
-
channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false,
false);
-
logger.error(
"消息发送到重试队列的时候,异常了:" + e1.getMessage() +
",重新发送消息");
-
}
-
}
-
}
-
}
-
}
-
-
-
-
/**
-
* 获取消息被重试的次数
-
*/
-
public long getRetryCount(MessageProperties messageProperties) {
-
Long retryCount =
0L;
-
if (
null != messageProperties) {
-
List<Map<String, ?>> deaths = messageProperties.getXDeathHeader();
-
if (deaths !=
null && deaths.size() >
0) {
-
Map<String, Object> death = (Map<String, Object>)deaths.
get(
0);
-
retryCount = (Long)death.
get(
"count");
-
}
-
}
-
return retryCount;
-
}
-
}
参考:https://www.cnblogs.com/mfrank/p/11260355.html
转载:https://blog.csdn.net/qq_20143059/article/details/106214436
查看评论