public void close() throws JMSException {
if (connection != null) {
connection.close();
}
}
Consumer的代码也很类似,具体的步骤无非就是1.初始化资源。 2. 接收消息。 3. 必要的时候关闭资源。
初始化资源可以放到构造函数里面:
public Consumer() throws JMSException {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
}
接收和处理消息的方法有两种,分为同步和异步的,一般同步的方式我们是通过MessageConsumer.receive()方法来处理接收
到的消息。而异步的方法则是通过注册一个MessageListener的方法,使用MessageConsumer.setMessageListener()。这里
我们采用异步的方式实现:
public static void main(String[] args) throws JMSException {
Consumer consumer = new Consumer();
for (String stock : args) {
Destination destination =
consumer.getSession().createTopic("STOCKS." + stock);
MessageConsumer messageConsumer =
consumer.getSession().createConsumer(destination);
messageConsumer.setMessageListener(new Listener());
}
}
public Session getSession() {
return session;
}
在前面的代码里我们先找到同样的topic,然后遍历所有的topic去获得消息。对于消息的处理我们专门通过Listener对象来负
责。
Listener对象的职责很简单,主要就是处理接收到的消息:
public class Listener implements MessageListener {
public void onMessage(Message message) {
try {
MapMessage map = (MapMessage)message;
String stock = map.getString("stock");
double price = map.getDouble("price");
double offer = map.getDouble("offer");
boolean up = map.getBoolean("up");
DecimalFormat df = new DecimalFormat( "#,###,###,##0.00" );
System.out.println(stock + "\t" + df.format(price) + "\t" + df.format(offer) +
"\t" + (up?"up":"down"));
} catch (Exception e) {
e.printStackTrace();
}
}
}
它实现了MessageListener接口,里面的onMessage方法就是在接收到消息之后会被调用的方法。
现在,通过实现前面的publisher和consumer我们已经实现了pub-sub模式的一个实例。仔细回想它的步骤的话,主要就是要
两者设定一个共同的topic,有了这个topic之后他们可以实现一方发消息另外一方接收。另外,为了连接到具体的message
server,这里是使用了连接tcp://localhost:16161作为定义ActiveMQConnectionFactory的路径。在publisher端通过session创建