Wednesday, September 26, 2012

A few variations on JMS

PRELIMINARY NOTE: The asynchronous receiver example in this message is not deprecated, and it should work just fine with Java EE 7, but I would consider it obsolete. I would advise the reader to consider this message instead.

In this message, I will provide a few examples that slightly change a synchronous sender-receiver interaction. In the basic example, the sender just sends a simple message, "OH OH OH", and the receiver just prints it. Note that the message goes through the JBoss Message Provider. For these examples to run you need to register a user, set up the queue and start the provider, just as I showed in "Java Message Service with JBoss AS 7". We will change this basic example to create an asynchronous receiver, a reply sent through a temporary destination, a durable subscription, a filtered consumer and a pair of transacted sessions.

The Basic Case

For the sender and a synchronous receiver, look here: Java Message Service with JBoss AS 7.

The Asynchronous Receiver

Let us now change the Receiver to get messages asynchronously. The main difference is that the Receiver class now implements the MessageListener interface, so the provider calls onMessage() for each message that arrives. Note that the main thread must be kept waiting; otherwise it returns and kills the daemon thread of the session that is waiting for the message. When you run this receiver, press Enter in the console to finish it. The sender remains unchanged.


import java.io.IOException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;


public class Receiver2 implements MessageListener {
    private ConnectionFactory cf;
    private Connection c;
    private Session s;
    private Destination d;
    private MessageConsumer mc;

    public Receiver2() throws NamingException, JMSException {
        InitialContext init = new InitialContext();
        this.cf = (ConnectionFactory) init.lookup("jms/RemoteConnectionFactory");
        this.d = (Destination) init.lookup("jms/queue/PlayQueue");
        this.c = this.cf.createConnection("joao", "pedro");
        this.s = this.c.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.mc = this.s.createConsumer(this.d);
        // Register the listener first, then start delivery.
        this.mc.setMessageListener(this);
        this.c.start();
    }

    // Called from the session's thread for each incoming message.
    @Override
    public void onMessage(Message msg) {
        if (!(msg instanceof TextMessage)) {
            System.err.println("Ignoring non-text message: " + msg);
            return;
        }
        try {
            System.out.println(((TextMessage) msg).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    private void close() throws JMSException {
        this.c.close();
    }

    /**
     * Waits for Enter on the console, then closes the connection.
     */
    public static void main(String[] args) throws NamingException, JMSException, IOException {
        Receiver2 r = new Receiver2();
        System.in.read(); // keeps the main thread alive until Enter is pressed
        r.close();
    }
}
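
The receiver above keeps the main thread alive with System.in.read(). If you would rather finish as soon as a message has arrived, one option is to wait on a latch that the listener releases. The sketch below is only a stand-in that uses plain Java threads and no JMS at all: the class LatchWaitSketch, the method names and the "fake-session" daemon thread are my own assumptions, meant to mimic the provider calling onMessage() from its own thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Toy model, NOT JMS: a daemon thread plays the role of the session thread.
final class LatchWaitSketch {

    // Hands one text to the listener from a daemon thread, much like the
    // provider calling onMessage() from its own thread.
    static Thread deliverAsync(String text, Consumer<String> listener) {
        Thread t = new Thread(() -> listener.accept(text), "fake-session");
        t.setDaemon(true); // a daemon thread does not keep the JVM alive
        t.start();
        return t;
    }

    // Blocks the caller until the listener has run or the timeout expires.
    // Returns true if the message arrived in time.
    static boolean receive(String text, StringBuilder sink, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        deliverAsync(text, m -> {
            sink.append(m);   // what onMessage() would do with the message
            done.countDown(); // release the waiting main thread
        });
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        StringBuilder sink = new StringBuilder();
        boolean arrived = receive("OH OH OH", sink, 2000);
        System.out.println(arrived + ": " + sink);
    }
}
```

In the real receiver, the countDown() call would go at the end of onMessage() and the await() would replace System.in.read() in main.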


ReplyTo and Temporary Destinations

Let us now play with replyTo plus a temporary destination. The sender creates a temporary queue and tells the receiver, through the JMSReplyTo header, to send its reply there. I include only the parts of the sender and the receiver that change, starting with the sender.



Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer mp = s.createProducer(d);
TextMessage msg = s.createTextMessage();
Destination replyto = s.createTemporaryQueue();
msg.setJMSReplyTo(replyto);
msg.setText("OH OH OH");
mp.send(msg);
MessageConsumer mc = s.createConsumer(replyto);
TextMessage reply = (TextMessage) mc.receive();
System.out.println("Sender got back: " + reply.getText());
c.close();

The receiver:


Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer mc = s.createConsumer(d);
TextMessage msg = (TextMessage) mc.receive();
System.out.println(msg.getText());
TextMessage reply = s.createTextMessage(msg.getText() + " to you too my friend!");
MessageProducer mp = s.createProducer(msg.getJMSReplyTo());
mp.send(reply);

c.close();
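
If the flow is hard to follow from the two fragments, here is the same round trip as a toy model that runs without any provider. It is not JMS: the class ReplyToSketch, the Msg type and the use of BlockingQueue are assumptions of mine. A Msg carries its text plus the queue to reply to (the role of JMSReplyTo), and the sender creates a private queue for the answer (the role of the temporary queue).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model, NOT JMS: in-memory queues stand in for the provider.
final class ReplyToSketch {

    // Hypothetical stand-in for a message: a text plus the reply destination.
    static final class Msg {
        final String text;
        final BlockingQueue<Msg> replyTo; // plays the role of JMSReplyTo
        Msg(String text, BlockingQueue<Msg> replyTo) {
            this.text = text;
            this.replyTo = replyTo;
        }
    }

    // Receiver side: take one request and answer on the queue it names.
    static void serveOne(BlockingQueue<Msg> requests) {
        try {
            Msg request = requests.take();
            request.replyTo.put(new Msg(request.text + " to you too my friend!", null));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Sender side: create a private reply queue, send, then wait for the answer.
    // Returns null if no reply arrives within two seconds.
    static String request(BlockingQueue<Msg> requests, String text) {
        BlockingQueue<Msg> temporary = new LinkedBlockingQueue<>();
        try {
            requests.put(new Msg(text, temporary));
            Msg reply = temporary.poll(2, TimeUnit.SECONDS);
            return reply == null ? null : reply.text;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Msg> playQueue = new LinkedBlockingQueue<>();
        Thread receiver = new Thread(() -> serveOne(playQueue));
        receiver.setDaemon(true);
        receiver.start();
        System.out.println("Sender got back: " + request(playQueue, "OH OH OH"));
    }
}
```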



Durable Subscriptions


You need to make a few changes to get a durable subscription. First, you need to give an id to the connection, as follows (only the second line is new):


c = cf.createConnection("joao", "pedro");
c.setClientID("joao");

Next, create the message consumer as follows. This is pretty much what you would do for a regular message consumer, except that the destination must be a topic and you need to add a subscriptionid (a regular string):

MessageConsumer mconsumer = s.createDurableSubscriber(topic, subscriptionid);

This means that the clientid is independent of the identifier of the subscription (presumably, the client can have multiple durable subscriptions, although I didn't explicitly try it).
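
To make the relation between the two identifiers concrete, here is a toy in-memory model (not JMS, and certainly not how the provider stores things). It assumes that a durable subscription is identified by the client id together with the subscription name, and that messages published while the client is away are kept for it. The class DurableTopicSketch and the subscription names "news" and "sports" are made up for the illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, NOT JMS: one backlog per (clientId, subscriptionName) pair.
final class DurableTopicSketch {
    private final Map<String, Deque<String>> backlogs = new LinkedHashMap<>();

    // Registers the subscription if it is new; an existing backlog is kept.
    void subscribe(String clientId, String subscriptionName) {
        backlogs.computeIfAbsent(clientId + "/" + subscriptionName, k -> new ArrayDeque<>());
    }

    // Every registered subscription gets a copy, connected or not.
    void publish(String text) {
        for (Deque<String> backlog : backlogs.values()) {
            backlog.add(text);
        }
    }

    // Returns and removes everything that accumulated for this subscription.
    List<String> drain(String clientId, String subscriptionName) {
        Deque<String> backlog = backlogs.get(clientId + "/" + subscriptionName);
        List<String> out = new ArrayList<>();
        if (backlog != null) {
            out.addAll(backlog);
            backlog.clear();
        }
        return out;
    }

    public static void main(String[] args) {
        DurableTopicSketch topic = new DurableTopicSketch();
        topic.subscribe("joao", "news");   // same client id,
        topic.subscribe("joao", "sports"); // two subscriptions
        topic.publish("OH OH OH");         // nobody is connected right now
        System.out.println(topic.drain("joao", "news"));
        System.out.println(topic.drain("joao", "sports"));
    }
}
```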

We also need to allow the role "guest" to make durable subscriptions. For this we edit the standalone-full.xml file (see my previous post). Note the createDurableQueue and deleteDurableQueue lines:

                <security-settings>
                    <security-setting match="#">
                        <permission type="send" roles="guest"/>
                        <permission type="consume" roles="guest"/>
                        <permission type="createNonDurableQueue" roles="guest"/>
                        <permission type="deleteNonDurableQueue" roles="guest"/>
                        <permission type="createDurableQueue" roles="guest"/>
                        <permission type="deleteDurableQueue" roles="guest"/>
                    </security-setting>
                </security-settings>

Now, just restart JBoss AS 7.



Filtering


Let us again consider sending a message. The sender can call one of the set*Property methods on the message to attach a property that receivers can use to filter. Hence, just add a single line (the setLongProperty call) to the original sender:

TextMessage msg = s.createTextMessage();
msg.setText("OH OH OH");
msg.setLongProperty("Idade", 15);
mp.send(msg);

The receiver needs to add a filter like this, at the time it creates the consumer:

MessageConsumer mc = s.createConsumer(d, "Idade < 80");

You can now play with the ages on the sender and receiver sides. One obvious question is: when the receiver filters out a message, does the message provider keep it, or is the message lost forever? E.g., assume that we don't care for any age above 45, but the sender sends a 46. Will we get this 46 once we change the receiving filter to 50?
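
As far as I understand the JMS specification, on a queue the messages that a selector does not select remain on the queue, whereas on a topic a filtered-out message is never delivered to that subscriber. The toy model below shows the queue case only. It is not JMS and does not parse selector strings: the class SelectorQueueSketch is my own invention, each stored number stands for one message's "Idade" property, and a LongPredicate plays the role of a selector such as "Idade < 45".

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongPredicate;

// Toy model, NOT JMS: a queue whose consumers pass a predicate
// instead of a selector string.
final class SelectorQueueSketch {
    // Each entry stands for one message, reduced to its "Idade" property.
    private final List<Long> ages = new ArrayList<>();

    void send(long idade) {
        ages.add(idade);
    }

    // Returns and removes the oldest message that matches the selector,
    // or returns null if none matches. Non-matching messages stay queued.
    Long receive(LongPredicate selector) {
        for (Iterator<Long> it = ages.iterator(); it.hasNext();) {
            Long idade = it.next();
            if (selector.test(idade)) {
                it.remove();
                return idade;
            }
        }
        return null;
    }

    int pending() {
        return ages.size();
    }

    public static void main(String[] args) {
        SelectorQueueSketch queue = new SelectorQueueSketch();
        queue.send(46);
        System.out.println(queue.receive(a -> a < 45)); // selector rejects it
        System.out.println(queue.pending());            // still on the queue
        System.out.println(queue.receive(a -> a < 50)); // a wider selector gets it
    }
}
```

It is still worth running the experiment against the real provider, both with a queue and with a topic, to see the difference for yourself.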


Transactions


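Finally, transactions, starting again with the sender (go back to the original example, once again). The first argument of createSession makes the session transacted, so nothing is actually sent until commit() is called: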


Session s = c.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer mp = s.createProducer(d);
TextMessage msg = s.createTextMessage();
msg.setText("OH OH OH");
mp.send(msg);
s.commit();
c.close();


The receiver:


Session s = c.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer mc = s.createConsumer(d);
TextMessage msg = (TextMessage) mc.receive();
System.out.println(msg.getText());
s.commit();
c.close();

To fully explore this example, you may want to do the following: delete the commit of the receiver and send one message. Run the receiver over and over again. It will always get the message, because the receive is never committed and the message goes back to the queue. Restore the commit on the receiver and delete the commit on the sender side. Now, you will see that the message never gets to the receiver.
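
The two experiments can be mimicked with a toy in-memory model, which may help to predict what you should see. It is not JMS: the class TransactedQueueSketch and its nested session classes are hypothetical, and I assume that closing without commit rolls the session back, which is what closing the connection does to a transacted session.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model, NOT JMS: a queue with transacted sender and receiver sessions.
final class TransactedQueueSketch {
    private final Deque<String> queue = new ArrayDeque<>(); // held by the "provider"

    // Sender session: sends stay in a local buffer until commit().
    final class SenderSession {
        private final List<String> pending = new ArrayList<>();
        void send(String text) { pending.add(text); }
        void commit() { queue.addAll(pending); pending.clear(); }
        void close() { pending.clear(); } // no commit: the sends are discarded
    }

    // Receiver session: received messages go back to the queue unless committed.
    final class ReceiverSession {
        private final List<String> taken = new ArrayList<>();
        String receive() {
            String text = queue.poll(); // null when the queue is empty
            if (text != null) taken.add(text);
            return text;
        }
        void commit() { taken.clear(); }
        void close() {
            // no commit: put the messages back at the head, in their original order
            for (int i = taken.size() - 1; i >= 0; i--) queue.addFirst(taken.get(i));
            taken.clear();
        }
    }

    SenderSession sender() { return new SenderSession(); }
    ReceiverSession receiver() { return new ReceiverSession(); }

    public static void main(String[] args) {
        TransactedQueueSketch q = new TransactedQueueSketch();
        SenderSession s = q.sender();
        s.send("OH OH OH");
        s.commit();
        s.close();
        for (int run = 1; run <= 3; run++) { // receiver without commit
            ReceiverSession r = q.receiver();
            System.out.println("run " + run + ": " + r.receive());
            r.close();
        }
    }
}
```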

