ActiveMQ(22):Consumer高级特性之消息分组(Message Groups)
一、简介
Message Groups就是对消息分组,它是Exclusive Consumer功能的增强。
逻辑上,Message Groups 可以看成是一种并发的Exclusive Consumer。跟所有的消息都由唯一的consumer处理不同,JMS 消息属性JMSXGroupID 被用来区分message group。Message Groups特性保证所有具有相同JMSXGroupID的消息会被分发到相同的consumer(只要这个consumer保持active)。
另外一方面,Message Groups特性也是一种负载均衡的机制。在一个消息被分发到consumer之前,broker首先检查消息JMSXGroupID属性。如果存在,那么broker会检查是否有某个consumer拥有这个message group。如果没有,那么broker会选择一个consumer,并将它关联到这个message group。此后,这个consumer会接收这个message group的所有消息,直到:
1:Consumer被关闭
2:Message group被关闭,通过发送一个消息,并设置这个消息的JMSXGroupSeq为-1
二、操作
2.1 创建一个Message Groups
创建一个Message Groups,只需要在message对象上设置属性即可,如下:
message.setStringProperty("JMSXGroupID","GroupA");2.2 关闭一个Message Groups
关闭一个Message Groups,只需要在message对象上设置属性即可,如下:
message.setStringProperty("JMSXGroupID","GroupA");
message.setIntProperty("JMSXGroupSeq", -1);发送:
public void test4() throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test-queue4");
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 3; i++) {
TextMessage message = session.createTextMessage("messageGroupA--" + i);
message.setStringProperty("JMSXGroupID", "GroupA");
producer.send(message);
TextMessage message2 = session.createTextMessage("GroupB--" + i);
message.setStringProperty("JMSXGroupID", "GroupB");
producer.send(message2);
}
session.commit();
session.close();
connection.close();
}接收:
public void test4() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)");
Connection connection = cf.createConnection();
connection.start();
final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test-queue4");
for (int i = 0; i < 2; i++) {
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage msg = (TextMessage) message;
try {
System.out.println(consumer + "收到消息:" + msg.getText());
session.commit();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}效果:
本文出自 “我爱大金子” 博客,请务必保留此出处http://1754966750.blog.51cto.com/7455444/1924848
文章来自:http://1754966750.blog.51cto.com/7455444/1924848
