HTML5 WebSockets is a powerful way to implement full duplex, reliable messaging over HTTP/S. However, HTML5’s Server-Sent Events (SSE) are an alternative (albeit not as full featured) way of sending dynamic updates from an HTTP/S server to code in a browser, and between applications written in any language. SSE doesn’t require a separate WebSockets server; it works over HTTP and HTTPS, it’s firewall friendly, and it’s simple.
In this article, we’ll explore simple messaging with HTML5’s SSE using a Java Servlet as the server and some JavaScript code in a web page as the client. We’ll also implement Java Servlets to support both topic-based publish-and-subscribe and queue-based point-to-point messaging over SSE, with standalone Java-based clients.
What Are Server-Sent Events?
HTML5 SSE messaging is based on two main components: the text/event-stream MIME type where text-based messages are sent according to a simple protocol, and the EventSource interface with event listeners to receive messages. Let’s examine the protocol first.
SSE message data is simple: it’s text that contains a field name, a colon, and the field’s value (or data). Comments—lines that starts with a colon followed by optional text—are ignored, but are useful as “keep alive” heartbeats to keep the HTTP connection open. A single message is made up of all the fields and data, parsed until an empty message (made up of just a carriage return or line feed) is received. Here are some examples:
event: message\n data: this is an important message\n \n
Listing 1 – A simple HTML5 SSE event
This SSE message results in a MessageEvent object with a data field set to “this is an important message”. The second example is similar:
event: message\n data: this is an important message\n data: that has two lines\n \n
Listing 2 – A single SSE event with two lines of data
This message results in a MessageEvent object with a data field set to “this is an important message\nthat has two lines”. Simply split the text string by a carriage return (or newline) character in your application to retrieve the individual lines of data.
SSE Field Names
Except for a comment, each SSE message starts with a fieldname. The complete set of pre-defined field names include:
- event: the event type, such as message, or another defined by your application
- data: the field data itself
- retry: an integer value indicating the reconnection time in case of a disconnect
- id: message id value
For example, the following delivers two MessageEvent objects:
:First name event: message\n data: John\n \n :Last name data: Doe\n \n
Listing 3 – Two discrete SSE event messages (separated by an empty line)
The first Message object’s data field will contain the text “John”, while the second Message object (queued and delivered after the first) will contain the text “Doe”. Both message streams contain comment fields—“First name” for the first message, “Last name” for the second—but are ignored.
Note that although the second message stream doesn’t specifically contain the event type, it’s implied to be of type “message” by default unless otherwise specified. You can create custom event types and then provide handlers to generate the MessageObjects with data sent for them. To explore this in detail, let’s look at the EventSource interface, along with sample JavaScript code.
Using the EventSource Interface
HTML5 defines an EventSource interface (see Listing 4), which you implement to receive SSE events as MessageEvent objects. This interface extends the DOM EventTarget interface, which defines methods to add and remove event listeners, and also to dispatch messages accordingly (implemented by the browser).
[Constructor(DOMString url, optional EventSourceInit eventSourceInitDict)] interface EventSource : EventTarget { readonly attribute DOMString url; readonly attribute boolean withCredentials; // ready state const unsigned short CONNECTING = 0; const unsigned short OPEN = 1; const unsigned short CLOSED = 2; readonly attribute unsigned short readyState; // networking attribute EventHandler onopen; attribute EventHandler onmessage; attribute EventHandler onerror; void close(); }; dictionary EventSourceInit { boolean withCredentials = false; };
Listing 4 – The IDL to define the HTML5 EventSource interface.
The EventSource constructor requires a server URL (where the events are sent from) and an optional parameter for optional security credentials (see Listing 5).
var people = new EventSource("http://my.example.com/chat"); people.onmessage = function (event) { var data = event.data.split('\n'); var name = data[0]; var action = data[1]; console.log( name + " has “ + action + “ the chat!"); }; people.onerror = function(event) { console.log("SSE onerror " + event); } people.onopen = function() { console.log("SSE onopen"); }
Listing 5 – JavaScript code to set up an SSE EventSource
The JavaScript code in this example receives SSE events sent as people join an online chat session. The EventSource object is given a URL for the chat server, and the onmessage, onerror, and onopen functions are defined. The onmessage() function is implemented to handle SSE events that comply with the format shown earlier in Listing 2. Here’s a specific example:
:Person joined chat event: message\n data: John Doe\n data: joined\n \n
When this SSE event is received by the code in Listing 5, the following text will be logged to the console:
John Doe has joined the chat!
Subsequently, the following SSE event will be sent when John Doe leaves the chat:
:Person joined chat event: message\n data: John Doe\n data: left\n \n
However, since “joined” and “left” are effectively two different events, you can handle them separately by defining two new SSE event types along with JavaScript handlers for each (see Listing 6).
var chat = new EventSource(‘http://my.example.com/chat’); chat.addEventListener(‘joined’, joinedChatHandler, false); chat.addEventListener(‘left’, leftChatHandler, false);
Listing 6 – Setting up custom SSE event type listeners.
:Person joined chat event: joined\n data: John Doe\n \n
The handler functions are also defined (see Listing 7).
function joinedChatHandler(event) { console.log( event.data + ” has joined the chat!”); } function leftChatHandler(event) { console.log( event.data + ” has left the chat!”); }
Listing 7 – The custom SSE event handler functions.
Note that the MessageEvent object’s data field no longer contains anything other than the name of the person. The “action” data (“joined” or “left”) has been removed since it’s now indicated by the event type.
SSE Connection States
Each SSE connection has a state with the following possible values:
- CONNECTING: The connection has not been established, or is currently being established. Note: the state may have been previously closed, but is now be reestablished.
- OPEN: The connection is open, active, and ready to dispatch events
- CLOSED: The connection is not open, is not currently being reestablished. This is a result of an unrecoverable error, or the result of a call to the close() function.
By default, when an EventSource object is created, its readyState is set to CONNECTING. It will then transition to OPEN when connected, or CLOSED if it fails entirely. Once open, the onopen event handler will be called if defined in your code. If an error occurs at any time, the onerror event handler will be called. When a message is dispatched, it’s delivered via the onmessage event handler.
Connectionless Push Notifications
Given the volume of HTML requests that come from mobile devices, the HTML5 SSE specification supports connectionless push notifications. This design pattern helps conserve power on mobile devices via a proxy server that maintains the connection to the web server on behalf of the device. The device can then sleep and resume as user activity dictates, while the proxy maintains the connection. Here’s how it works:
- The mobile browser loads an HTML5 web page that contains an EventSource request
- The mobile device connects to the server specified in the EventSource object
- An HTML5 SSE connection is established
- To conserve power when idle, the mobile device will go into low-power mode, taking the following steps:
- The mobile browser contacts a carrier network service to request that a proxy server maintain the HTTP connection
- The carrier proxy sets up an SSE connection with the server
- The mobile browser disconnects and allows the device to enter low power mode
- Later, when the server sends an SSE message to the proxy server:
- The proxy server sends a mobile push message, as defined by the Open Mobile Alliance (OMA)
- The mobile device wakes up to handle the SSE message
Remote Patient Monitoring IoT Demo
The first SSE example an application that represents a remote patient monitoring system. In this scenario, there are three simulated medical devices that, when used, transmit data. These include a blood pressure cuff, pulse oximeter (for heart rate and blood oxygenation percentage), and weight scale. The readings from these devices are simulated, and are strictly to make the demo more realistic. When the patient takes a reading (triggered by clicking on a button in the demo user interface), the data is transmitted from the simulated devices to the HTML5 code over SSE.
In the web page, shown in Figure 1, there are three main sections: data from each of the medical devices, instructions, and a toolbar along the top. The HTML5 UI is meant to resemble a tablet form factor, to be used by a remotely monitored patient. The HTML5 code is available for download here.
Figure 1 – The HTML5 remote patient monitoring application UI
When the page loads, an HTML5 DOM page load event listener is set to call the onPageLoad function (see Listing 8), which in turn calls setupEventSource.
…
Listing 8 – Handling the HTML5 page load event
A JavaScript SSE Client
The setupEventSource() JS function handles all of the SSE messaging, from subscribing to messages on the server, to handling the message events (see Listing 9).
function setupEventSource() { if (typeof(EventSource) == “undefined”) { Alert(“Error: Server-Sent Events are not supported in your browser”); return; } // Check if already connected to avoid getting duplicate messages if (typeof(source) !== “undefined”) { source.close(); } console.log(“Setting up new event source”); source = new EventSource(“sse?msg=healthcare”); source.onopen = function() { console.log(“SSE onopen”); } source.onmessage = function(event) { console.log(“SSE onmessage: ” + event.data); … } source.onerror = function(event) { console.log(“SSE onerror ” + event); … } }
Listing 9 – Setting up the SSE event listener
First, recall that variable source is defined globally (see Listing 8). Next, two things are checked:
- The browser supports SSE by checking for the presence of the EventSource interface.
- The SSE event source hasn’t already been set up. If so, the previous event source is closed, and a new one created. This can occur if the user refreshes the browser.
When the EventSource object is created, it’s given the URL of the server that sources the events. In this case, it’s the same host that served up the web page (which is http://localhost if you run this locally), and then a path to a Servlet named sse that handles the requests and sends the SSE event messages. A parameter is sent to indicate the type of messages that the client is interested in.
Once connected, the onopen function will be called. Subsequently, when messages are available, the onmessage function will be called (see Listing 10). If an error occurs for any reason (i.e. the network connection is lost), the onerror function is called with a parameter indicating the error.
source.onmessage = function(event) { console.log(“SSE onmessage: ” + event.data); if ( event.data===’start’ ) { // safely ignore this return; } // Parse the message and update the values var obj = JSON.parse(event.data); if ( obj.id == lastMsgId ) { // ignore duplicate return; } lastMsgId = obj.id if ( obj.type === ‘devicedata’ ) { setSystolic(obj.systolic); setDiastolic(obj.diastolic); setSPO2(obj.spo); setHeartrate(obj.heartrate); setWeight(obj.weight); } }
Listing 10 – The onmessage function is called when an SSE event is received
When the connection is first created, the code in this example sends a “start” message to indicate success. This isn’t required, but it’s useful when debugging. The data sent by the SSE server in this sample is in JSON format. Therefore, the next step is to convert it to an object via the JSON.parse function, built into the JavaScript engine of most browsers.
It’s best to compare the message’s ID to that of the previous message first. This ensures you don’t process a duplicate message if it’s redelivered. Finally, the appropriate HTML5 UI components are updated with the data received.
Next, let’s look at the server, where SSE subscription requests are handled and event messages are sent.
A Java Servlet SSE Server
For this example, the SSE server is implemented as a Java EE Servlet. Conceptually, HTML5 SSE processing is straightforward, but in practice there are a few tricks to ensure it works smoothly. First, SSE message subscription begins when a request arrives, handled by the Servlet processRequest method (Listing 11).
protected void processRequest(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { try { response.setContentType(“text/html;charset=UTF-8”); String param = “”; … // Is this a message subsciption request? param = request.getParameter(“msg”); if ( param != null && param.length() > 0 ) { // ** Set up new event listener ** // Important: set content type and header response.setContentType(“text/event-stream”); response.setCharacterEncoding(“UTF-8”); response.setHeader(“Connection”, “keep-alive”); // Store the listener’s writer to send messages PrintWriter out = response.getWriter(); synchronized ( listeners ) { listeners.add(out); } out.write(“event: message\n”); out.write(“retry: 300000\n”); out.write(“data: ” + “start” + “\n\n”); // Send heartbeats continuously while ( true ) { // Sending SSE heartbeat out.write(“: \n\n”); if ( out.checkError() ) { // Subscriber error, break out of loop break; } Utility.delay(HEARTBEAT_INTERVAL); } // Remove listener and return to stop sending messages removeListener(out); return; } catch ( Exception e ) { e.printStackTrace(); } }
Listing 11 – The Java Servlet handles SSE subscription requests
The following steps are taken:
- Check for the HTTP request parameter named “msg”. This indicates that a client is subscribing for SSE messages.
- Set the content type to “text/event-stream”, the encoding to “UTF-8”, and the Connection header parameter to “keep-alive”.
- Store the client’s PrintWriter object so that healthcare device messages can be sent to it.
- Set the connection retry interval to 5 minutes (300,000 milliseconds) in case of connection timeout. This is done as part of the “start” message that’s sent.
Finally, the code enters a loop to continuously send empty messages as connection heartbeats to keep the HTML5 SSE connection active. If the code returns from the processRequest method here, or fails to send heartbeats, the connection will be lost and the client will need to reconnect. Notice this loop doesn’t exit unless an SSE error occurs (i.e. the result of the browser closing).
Medical device data is sent to the Servlet asynchronously via demo driver code (or from Bluetooth, for example, if this were a real implementation), which is sent to all SSE listeners using the PrintWriter objects (stored in step 3 above). This occurs in the onDeviceData method (see Listing 12), which takes the simulated device readings and packages the update formatted as a JSON string.
public void onDeviceData(String device, String data) { // parse the device data … // Format the JSON data (just a String) String msg = “{ ” + ” \”id\”:” + (++id) + “,” + ” \”demomessagecount\”:” + demoMessageCount + “,” + ” \”demomessage\”:” + “\””+ demoMessage + “\”,” + ” \”diastolic\”:” + diastolic + “,” + ” \”systolic\”:” + systolic + “,” + ” \”spo\”:” + spo + “,” + ” \”heartrate\”:” + heartrate + “,” + ” \”weight\”:” + weight + “,” + ” \”type\”:” + “\””+ “devicedata” + “\”” + ” }”; // Send of the SSE event message with data sendUpdate(“devicedata”, msg); }
Listing 12 – Handle device data and package it as part of a JSON string
private void sendUpdate(String type, String msg) { if ( listeners == null || listeners.isEmpty() ) return; try { // Clone the list of listeners to safely iterate // Vector toSend; synchronized ( listeners ) { toSend = (Vector)listeners.clone(); } // Send update to all listeners // Iterator iter = toSend.iterator(); while ( iter.hasNext() ) { PrintWriter out = iter.next(); if ( out == null ) { continue; } try { // Send SSE update // out.write(“event: message\n”); out.write(“data: ” + msg + “\n\n”); out.flush(); } catch( Exception e ) { // Bad listener. Remove from original list and move on e.printStackTrace(); try { removeListener(out); } catch ( Exception e1 ) { } } } } catch ( Exception e ) { e.printStackTrace(); } }
Listing 13 – The SSE message sending code
The bulk of the processing is in cloning the list of listeners, and then iterating over it. For each listener, the SSE event is sent as a data message, as expected. If an error occurs while data is sent, it’s handled in the exception handler, where the error is logged and the listener is removed from the original list (it’s assumed the listener’s connection is closed).
Running the Healthcare SSE Demo Application
To start the demo, click the “START” link in the upper right of the UI toolbar, next to the battery icon. The text’s onclick function calls the startSimulation function, which sends an HTTP POST message to the Java Servlet (see Listing 14).
function startSimulation() { var request = new XMLHttpRequest(); request.open(‘POST’, ‘sse?SIM=start’, true); request.setRequestHeader(‘X-Requested-With’, ‘XMLHttpRequest’); request.send(null); }
Listing 14 – JavaScript code for a POST request to start the demo
When the demo is started, the numbers for the healthcare devices will continually update (see Figure 2) with simulated values.
Figure 2 – When started, the demo updates the simulated healthcare device readings
Next, let’s look at a more general SSE implementation, which you can use to send messages of any type.
Publish-Subscribe and Queue Messaging with SSE
The next example implements publish-and-subscribe (or topic-based) messaging, as well as point-to-point (or queue-based) messaging. You can read more about these forms of messaging here, but in summary:
- Topic-based (publish-and-subscribe): multiple listeners “subscribe” to a topic; senders “publish” to a topic. All active subscribers receive all published messages while subscribed. Messages are not saved or delivered to future subscribers.
- Queue-based (point-to-point): multiple listeners listen to a queue; senders place messages in the queue. Only one listener (out of potentially many) receives a message, which is then removed from the queue. Messages are saved and delivered later if no receivers are listening on the queue when a message is sent.
First, two Java Servlets are implemented: one for queue-based messaging named QueueServlet, and one for publish-and-subscribe named TopicServlet. Next, the base class, Destination, is defined (see Listing 15) to contain the data relevant to both queues and topics.
public class Destination { String name; final Vector listeners = new Vector<>(); }
Listing 15 – The Destination base class
TopicServlet contains a class Topic that extends Destination, merely adding a constructor with no variables. QueueServlet, however, contains a class Queue (see Listing 16) that extends Destination with data relevant to queue-based messaging, such as a data structure to hold undelivered messages in FIFO order.
class Queue extends Destination { int listenerCount = 0; final ConcurrentLinkedQueue messageQ = new ConcurrentLinkedQueue(); Runnable runnable; public Queue(String name) { this.name = name; } private Queue() { } }
Listing 16 – The Queue Destination class
Both Servlets share some common code (in base class Messenger). This includes SSE connection heartbeat code, code to send SSE event messages, subscriber management code, and more (see Listing 17).
public abstract class Messenger extends HttpServlet { static final int HEARTBEAT_INTERVAL = 5000; // milliseconds @Resource private ManagedExecutorService managedExecutorService; Logger logger = null; final ConcurrentHashMapdestinations = new ConcurrentHashMap<>(); protected Runnable _heartbeat(AsyncContext ac) { ac.setTimeout(0); Runnable runnable = new Runnable() { @Override public void run() { try { // Continually send heartbeats or connection will time out // while ( true ) { out.write(“: \n\n”); if ( out.checkError() ) { // checkError calls flush() System.out.println(“Subscriber error”); break; } try { Thread.sleep(HEARTBEAT_INTERVAL); } catch ( Exception e ) { } } } catch ( Exception e1 ) { } removeListener(ac); ac.complete(); } }; managedExecutorService.submit(runnable); return runnable; } protected boolean sendUpdate(String data, Vector listeners) { if ( listeners != null ) { // Clone the list of listeners for safety Vector toSend = null; synchronized ( listeners ) { toSend = (Vector)listeners.clone(); } // Send update to each listnner of this destination for ( AsyncContext ac: toSend ) { try { sendSSEMessage( ac.getResponse().getWriter(), data ); return true; } catch( Exception e ) { logger.log(Level.WARNING, “Listener error: ” + e.toString() + “. Removing from list” ); removeListener(ac); } } } return false; } protected boolean sendSSEMessage(PrintWriter out, String data) { out.write(“event: message\n”); out.write(“data: ” + data + “\n\n”); return ! out.checkError(); // no need to call flush() } protected void removeListener(AsyncContext ac) { … } }
Listing 17 – The SSE Message Servlet base class
This SSE implementation makes use of asynchronous Servlet contexts. This allows the Servlet to return when a request is made. As a result, heartbeat processing is performed in a Runnable handled by a thread pool. The code in method _heartbeat creates a Runnable per caller, provides its reference to a ManagedExectutorService object, and returns. This Java Executor handles the mechanics of the thread pool. The sendUpdate and sendSSEMessage methods work together to send messages as SSE event messages to the subscribers for the appropriate topic or queue.
Topic-based SSE Messaging Implementation
The TopicServlet implementation is straightforward, with only three methods of its own (recall that it inherits from the Messenger base class described above). First, the processRequest method (see Listing 18) allows HTTP-based callers to either publish or subscribe to a topic by name.
protected void processRequest(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { response.setContentType(“text/html;charset=UTF-8”); // Check for publish and subscribe requsts String publisher = request.getParameter(“publish”); String subscriber = request.getParameter(“subscribe”); String topicName = request.getParameter(“name”); if ( subscriber != null ) { onSubscribe(topicName, request, response); } else if ( publisher != null ) { String data = request.getParameter(“data”); onPublish(topicName, data, request, response); } else { try (PrintWriter out = response.getWriter()) { // output some informational HTML here… } } }
Listing 18 – The TopicServlet code to process HTTP requests
To subscribe, the HTTP parameter “subscribe” is specified with a topic name:
http://
To publish, the URL will look similar, but with the actual data (as text):
http://
A wider range of data types can be sent using HTTP POSTs. The code to handle subscriptions (see Listing 19) is similar to earlier examples on SSE processing.
private void onSubscribe( String topicName, HttpServletRequest request, HttpServletResponse response) { try { if ( topicName == null || topicName.length() == 0 ) { response.getWriter().println(“Error: no topic name provided”); return; } // Set content type response.setContentType(“text/event-stream”); response.setCharacterEncoding(“UTF-8”); response.setHeader(“Connection”, “keep-alive”); response.setHeader(“Cache-Control”, “no-cache”); // Store to send updates final AsyncContext ac; synchronized ( destinations ) { Topic topic = (Topic)destinations.get(topicName); if ( topic == null ) { topic = new Topic(topicName); destinations.put(topicName, topic); } // Add new subscriber for this topic ac = request.startAsync(request, response); topic.listeners.add(ac); } _heartbeat(ac); } catch ( Exception e ) { e.printStackTrace(); } }
Listing 19 – Topic subscriber handling code
First, the SSE content type, encoding and header data are set. Next, the listener to the specified Topic object, which is created if it’s a new topic name. Finally, the asynchronous Servlet Context processing—and SSE heartbeats—is started via calls to request.startAsynch and _heartbeat, respectively.
Message publishing is handled in onPublish—shown in Listing 20—which (aside from error checking) simply calls the base class’ sendUpdate method.
private void onPublish( String topicName, String data, HttpServletRequest request, HttpServletResponse response) { try { PrintWriter out = response.getWriter(); … Topic topic = (Topic)destinations.get(topicName); if ( topic != null ) { sendUpdate(data, topic.listeners); } } catch ( Exception e ) { e.printStackTrace(); } }
Listing 20 – Publishing an SSE message to a topic
Next, let’s look at queue-based messaging, which is a little more involved.
Queue-based SSE Messaging Implementation
The QueueServlet implementation is similar to TopicServlet, except that it needs to store messages for future delivery when there are no active listeners for a queue. It also needs to ensure only one listener receives each message when there are multiple listeners for the same queue. The processRequest method (see Listing 21), where HTTP Servlet requests are handled, is similar to the publish-and-subscribe implementation.
protected void processRequest(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { response.setContentType(“text/html;charset=UTF-8”); // Check for send and listen requests String sender = request.getParameter(“send”); String listener = request.getParameter(“listen”); String queueName = request.getParameter(“name”); if ( listener != null ) { onListen(queueName, request, response); } else if ( sender != null ) { String data = request.getParameter(“data”); onSend(queueName, data, request, response); } else { // output informational HTML… } }
Listing 21 – The QueueServlet HTTP request handler
The onSend method differs significantly from publish-and-subscribe messaging (see Listing 22).
private void onSend(String queueName, String data, HttpServletRequest request, HttpServletResponse response) { try { PrintWriter out = response.getWriter(); … Queue queue = getQueue(queueName); // First, attempt delivery boolean delivered = sendUpdate(data, queue.listeners); if ( ! delivered ) { // Message not delivered, enqueue message queue.messageQ.add(data); } } catch ( Exception e ) { e.printStackTrace(); } }
Listing 22 – QueueServlet message delivery
First, message delivery is attempted (the details on this are described below). However, unlike with TopicServlet, if there are no current listeners, the message is placed in an internal FIFO queue for future delivery. This internal queue is serviced when new listeners connect, and during heartbeat processing for those listeners after they connect.
Next, let’s look at the differences in the sendUpdate method (see Listing 23), which is overridden from the Messenger base class implementation.
@Override protected boolean sendUpdate(String data, Vector listeners) { // Find one listener to send the message to: // -Temporarily remove the listener from the list of listeners // -Remove the message from the internal list of messages // -Deliver message // -If successful, replace listener in list // -If failure, remove listener and replace message in list // AsyncContext listenerAC = null; try { if ( listeners != null ) { synchronized ( listeners ) { listenerAC = listeners.remove(0); } } } catch ( Exception e ) { } if ( listenerAC == null ) { return false; // No listeners } try { // Attempt message delivery boolean delivered = sendSSEMessage( listenerAC.getResponse().getWriter(), data ); if ( delivered ) { // Replace listener at head of queue synchronized ( listeners ) { listeners.add(0, listenerAC); } return true; } else { return false; } } catch( Exception e ) { // Bad listener. Remove from original list and move on // logger.log(Level.WARNING, “Listener error: ” + e.toString() + “. Removing from list” ); removeListener(listenerAC); return sendUpdate(data, listeners); } }
Listing 23 – The QueueServlet’s overridden sendUpdate implementation
Recall that each queued message must be delivered to, at most, one listener. To guarantee this, and to allow parallel message delivery processing of the remainder of the queued messages to other listeners, the first listener for this queue is removed from the Queue object’s internal list. Next, the message is attempted to be delivered to this listener. If successful, the listener is placed back into the Queue object’s internal list of listeners, and the method returns true.
If message delivery fails, this listener is not added back into the list, and the sendUpdate method calls itself recursively. This call will perform the same processing using the updated list, and will return true if the message is delivered successfully, or false otherwise (leaving the message in the queue for future delivery).
The Messenger base class defines an empty method, processDestination, which gets called periodically during heartbeat processing. In the TopicServlet implementation, this method isn’t needed. QueueServlet, however, overrides it to check the applicable queue to see if there are undelivered messages. If there are, then message delivery is resumed for that queue. As a result, periodic heartbeat processing also serves as a message delivery pump.
Next, let take a look at the topic and queue-based messaging in action.
SSE Java Clients
The code for the publisher, whether to a topic or a queue, is nearly identical. The only difference is in the URL. For instance, this URL is used to publish to a topic:
http://
The following URL is used to send a message to a queue:
http://
The code to send a message to either a queue or topic is shown in Listing 24.
public void sendMessage(String serverURL, String data) { try { // Construct the request data = URLEncoder.encode(data, “UTF-8”); serverURL += “&data=” + data; // Send the request URL url= new URL(serverURL); URI uri = new URI( url.getProtocol(), url.getUserInfo(), url.getHost(), url.getPort(), url.getPath(), url.getQuery(), url.getRef() ); url = new URL( uri.toASCIIString() ); URLConnection conn = url.openConnection(); conn.setDoOutput(true); BufferedReader rd = new BufferedReader( new InputStreamReader( conn.getInputStream())); rd.close(); } catch (Exception e) { e.printStackTrace(); } }
Listing 24 – Sending an SSE message to a queue or topic
First, the data is URL encoded since it’s included as an HTTP request parameter. Next, a connection is made to the Servlet (HTTP server). Opening the connection and reading from the InputStream triggers the request to complete, and the message is sent to the server.
Listening for data, either on a queue or topic, is shown in Listing 25.
public void listen(String serverURl) { try { String data = URLEncoder.encode(“”, “UTF-8”); URL url = new URL(serverURl); URLConnection conn = url.openConnection(); conn.setDoOutput(true); conn.setConnectTimeout(0); BufferedReader rd = new BufferedReader( new InputStreamReader( conn.getInputStream())); // Begin InputReader loop waiting for messages String line; while ((line = rd.readLine()) != null) { if ( line == null || line.length() <= 0 ) continue; // Did we get a heartbeat or useful data? if ( line.startsWith(“data:”) ) { System.out.println(line.substring(“data:”.length())); } } rd.close(); } catch (Exception e) { e.printStackTrace(); } }
Listing 25 - Java code to listen for an SSE message
First, an HTTP connection is made to the Servlet. Next, an InputReader is opened on the connection and a loop is entered, waiting for data to arrive from the InputReader. When it does, the BufferedReader object ensures we get complete lines of text (data). This is convenient since each SSE message ends in a line break.
You can build publishers and subscribers in any language. For instance, the following HTML5 page (see Listing 26) contains JavaScript code that subscribes to a queue named “q1”. The Java code from above can be used to send messages to that queue, or another JavaScript client, or one written in Python, and so on. HTML5 SSE messaging is language and platform independent.
SSE Test Page
Listing 26 – HTML5 JavaScript code that subscribes to an SSE queue
You can download the code for QueueServlet, TopicServlet, both Java clients, and the JavaScript sample here.
Conclusion: HTML5 Server Sent Events SSE Rocks!
As shown throughout this article, HTML5 Server Sent Events (SSE) is versatile, powerful, and easy to use for firewall friendly, reliable distributed software message delivery. It works well across platforms and languages, and doesn’t require much to set up.
Hopefully the code included in this article serves as a good starting point for your own projects. The SSE based queue and topic Servlets are usable as-is, but to be truly reliable, you’ll need to add some sort of persistence to queue-based messages to ensure they’re not lost if the server stops running.