延迟消费
在生产者可以设置消息的延迟级别(并不能设定一个具体的值),当然这种延迟只是消费者的延迟,消息其实已经发送给了消费者。
这种延迟级别代表的延迟时间我们可以再控制台中看到具体的数值,上面设置的级别为3就是延迟10秒。
批量发送
发送一次消息,一次发送多条消息,注意这种发送消息和单次发送一条消息的区别
public static void sendBatch() throws Exception{
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("test");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
List<Message> list = new ArrayList<Message>();
for (int i = 0; i < 10; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
list.add(msg);
}
// 一次性发送10条消息,批量发送
SendResult sendResult = producer.send(list);
System.out.printf("%s%n", sendResult);
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
这里还需要注意的是RocketMQ一次发送的消息最好不要超过1mb,因为一次发送的消息超过1mb性能就会下降,而一旦超过4mb那么就会报错。所以一般只有在一次发送的消息不超过1mb的时候才会使用批量发送。下面是一条消息大小的计算方式
(topic名称长度+消息传递过去的参数+消息体大小+20b(日志数据大小))=单条消息大小
消息分割
前面也说过了,一次最好不要发送超过1mb的消息,当然我们无法避免有些时候必须一次发送超过1mb的消息,这个时候我们可以选择通过消息分割的方式,把一次的消息分次发送。
首先需要一个工具类,用来切分消息,这个工具类在官网中也有介绍
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1000 * 1000; //1024*1024 = 1mb
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override public boolean hasNext() {
return currIndex < messages.size();
}
@Override public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
//test 4 消息体大小 testMessage10: 13
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; //for log overhead 如果1条消息 直接超出了1MB的话 他这里是没办法分割的
if (tmpSize > SIZE_LIMIT) {
//it is unexpected that single message exceeds the SIZE_LIMIT
//here just let it go, otherwise it will block the splitting process
if (nextIndex - currIndex == 0) {
//if the next sublist has no element, add this one and then break, otherwise just break
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
// 10 3
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
@Override
public void remove() {
}
@Override
public void forEachRemaining(Consumer<? super List<Message>> action) {
}
}
下面就是消息分割后的发送方式
public static void sendSegmentation() throws Exception{
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("test");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
List<Message> list = new ArrayList<Message>();
for (int i = 0; i < 10; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
list.add(msg);
}
ListSplitter listSplitter = new ListSplitter(list);
while (listSplitter.hasNext()) {
try {
List<Message> listItem = listSplitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
//handle the error
}
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
分布式事务
mq可以实现分布式事务,但是无论哪个mq,都只会保证最终一致性而不是强一致性,RocketMQ也同样如此。
下面就是RocketMQ分布式事务的大致流程,在整个过程中RocketMQ可以保证消息100%不会丢失,这也是保证
对于RocketMQ分布式事务的使用其实很简单,我们只需要一个TransactionListener的实现类就够了,由它负责实现我们的事务逻辑。同时不能够使用我们的默认的生产者对象DefaultMQProducer,而是使用它的子类TransactionMQProducer。
public class TranstionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setTransactionListener(transactionListener);
producer.start();
//t1提交 t2回滚 其他挂起
String[] tags = new String[] {"t1", "t2", "t3"};
for (int i = 0; i < 3; i++) {
try {
Message msg =
new Message("test2", tags[i],
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
RocketMQ提供了TransactionListener接口,我们要做的就是实现它,并实现其中的方法,在executeLocalTransaction方法中实现我们的事务逻辑,而checkLocalTransaction方法中则是查看我们本地事务情况的回调。
public class TransactionListenerImpl implements TransactionListener {
//本地事务在此处执行
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("执行事务");
//本地事务执行过长的时候 可以挂起他
//本地事务执行结果依赖别的事务执行的结果的时候可以挂起
if (msg.getTags().equals("t1")){
// connection.commit()
return LocalTransactionState.COMMIT_MESSAGE;
} else if (msg.getTags().equals("t2")) {
// try {
//
// }catch (){
//
// }
// connection.rollback()
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
//A B
//提供给MQ的回调
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//确认本地事务的执行情况 决定 提交还是回滚
//代码检查B事务有没有执行完
System.out.println("确认回调");
return LocalTransactionState.COMMIT_MESSAGE;
}
}
转载:https://blog.csdn.net/qq_35262405/article/details/105621594