Data Feed Example Code

Data Feed Examples

The following are instructions for using the attached .zip file to listen to the Data Stream. Sample code from five Java classes (SseExampleMain, SseInputs, SseListener, HttpClientHelper, and AuthFeature) is included both in this article and in the .zip file. The .zip file also contains a Maven file (pom.xml) that pulls in dependencies and builds the jar.

Download the .zip:

Below the Java Data Stream listening example are four additional Data Stream code examples, written in Python: Authenticator, Command Line Arguments, Event Handlers, and Store For Last Event ID. Click each ► below to expand the sample code.


The classes can be built and packaged into a jar with the Maven pom file:
mvn clean package — builds a simple jar (without dependent libraries).
mvn clean package assembly:single — builds an “uber” jar containing all dependent libraries in a single jar.


SseExampleMain is the main class to invoke.
The easiest way to run this sample is from the uber jar:

java {vm arguments} -jar sse-client-java-1.0-SNAPSHOT-jar-with-dependencies.jar

Invocation Arguments 

The invocation takes the following VM arguments (Java system properties):

-DExample.AuthServiceUrl The url to the auth endpoint - mandatory
-DExample.ClientId The client's id (aka client no) - mandatory
-DExample.ClientSecret The client's auth key - mandatory
-DExample.SseUrl The url to the sse endpoint (either the load stream or change stream) - mandatory
-DExample.LastEventId The last event id to resume streaming from - optional
-DExample.From The time to resume streaming from - optional and will be ignored if last event id is provided


package com.ariasystems.dataextract.examples.sse;

import org.apache.commons.lang3.StringUtils;

/**
 * An example app that creates an authorizer, creates an SSE listener and runs
 * that listener with an event handler that simply prints out each event.
 */
public class SseExampleMain {
    public static void main(String[] args) {
        String authServiceUrl = System.getProperty("Example.AuthServiceUrl");
        String clientId = System.getProperty("Example.ClientId");
        String clientSecret = System.getProperty("Example.ClientSecret");
        String sseUrl = System.getProperty("Example.SseUrl");
        String lastEventId = System.getProperty("Example.LastEventId");
        String from = System.getProperty("Example.From");
        System.out.println("client -> "+clientId);
        System.out.println("auth: sseUrl -> "+authServiceUrl);
        System.out.println("calling: sseUrl -> "+sseUrl);
           System.out.println("initializing app");
        System.out.println("validating inputs");
        //validate inputs
        try {
            //assert that the sseUrl is clean of query params
            if(StringUtils.contains(sseUrl, "?")) {
                throw new IllegalArgumentException("sseUrl cannot contain query params in its input string. Please provide these as separate params");
        } catch (Exception e) {
        try {
        } catch (Exception e) {
    private static void validateNotBlank(String param, String paramName) {
        if(StringUtils.isBlank(param)) {
            throw new IllegalArgumentException("param: " + paramName + " cannot be blank");    
    private static void printUsage(){
        System.err.println("USAGE: ");
        System.err.println("-DExample.AuthServiceUrl={the url to the auth endpoint} - mandatory");
        System.err.println("-DExample.ClientId={the client's id} - mandatory");
        System.err.println("-DExample.ClientSecret={the client's auth key} - mandatory");
        System.err.println("-DExample.SseUrl={url to the sse endpoint (either the load stream or change stream)} - mandatory");
        System.err.println("-DExample.LastEventId={the last event id to resume streaming from} - optional");
        System.err.println("-DExample.From={the time to resume streaming from} - optional and will be ignored if last event id is provided");
package com.ariasystems.dataextract.examples.sse;

import org.apache.commons.lang3.StringUtils;

/**
 * A singleton that holds input state that can be used for recovering lost connections.
 * The last event id can be updated with the event ids in the latest data collected.
 * @author anorman
 */
public class SseInputs {
    public static SseInputs Instance = new SseInputs();
    private String authServiceUrl;
    private String sseUrl;    
    private String lastEventId;
    private String from;
    private String clientId;
    private String clientSecret;
    private String originalLastEventId;

  //private constructor / Singleton pattern
  private SseInputs(){
    authServiceUrl = null;
    sseUrl = null;    
    lastEventId = null;
    from = null;
    clientId = null;
    clientSecret = null;
    originalLastEventId = null;
  public String getAuthServiceUrl() {
        return authServiceUrl;

    public void setAuthServiceUrl(String authServiceUrl) {
        this.authServiceUrl = authServiceUrl;

    public String getSseUrl() {
        return sseUrl;

    public void setSseUrl(String sseUrl) {
        this.sseUrl = sseUrl;

    public String getLastEventId() {
        return lastEventId;

    public void setLastEventId(String eventId) {
        if (StringUtils.isNotBlank(eventId)){
            synchronized(this) {
                this.lastEventId = eventId;

    public String getFrom() {
        return from;

    public void setFrom(String from) {
        this.from = from;

    public String getClientId() {
        return clientId;

    public void setClientId(String clientId) {
        this.clientId = clientId;

    public String getClientSecret() {
        return clientSecret;

    public void setClientSecret(String clientSecret) {
        this.clientSecret = clientSecret;

    public String getOriginalLastEventId() {
        return originalLastEventId;

    public void setOriginalLastEventId(String originalLastEventId) {
        this.originalLastEventId = originalLastEventId;
    public boolean hasCollectedEvents() {
        return (lastEventId != null && originalLastEventId != lastEventId);
package com.ariasystems.dataextract.examples.sse;

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;

/**
 * A simple wrapper for SseEventSource that does basic construction and execution.
 */
public class SseListener {
    public class OnComplete implements Runnable{
        @Override public void run(){
            SseInputs inputs = SseInputs.Instance;
            if (isOpen()) {
                System.err.println("completion detected, closing connection...");
                close(5, TimeUnit.SECONDS);
            System.err.println("server ended transmission outside of the event SSE handling. ");
            String lastEventId = inputs.getLastEventId();
            System.err.println("last successful EventId: " + lastEventId);
            if (inputs.hasCollectedEvents()) {
                System.err.println("restarting from last event id processed");
            else {
                System.err.println("no events processed since the original startup, verify the input parameters and target urls and restart");            
    private SseEventSource eventSource;
    private static SseInputs sseInputs = SseInputs.Instance;
     * @param targetUrl URL of a Server-Sent Events source
     * @param authorizer producer of a JAX-RS "feature" that authorizes client requests
    public SseListener(String targetUrl, Feature authorizer) {

        Client sseClient = HttpClientHelper.client(targetUrl);
        sseClient.register( authorizer );
        WebTarget target =;

        eventSource = SseEventSource
            .reconnectingEvery(10, TimeUnit.SECONDS)
    static void initialize(
            String authServiceUrl, 
            String clientId,
            String clientSecret,
            String sseUrl, 
            String lastEventId, 
            String from){
        if(StringUtils.isBlank(sseUrl)) {
            throw new IllegalArgumentException("sseUrl cannot be blank");    

        //if last event id is set, then ignore the from value as the lastEventId should be honored
        if (StringUtils.isBlank(lastEventId)) {
            System.out.println("setting from: " + from);
        else {
            System.out.println("setting lastEventId: " + lastEventId);
    static private void start() {
        try {    
            System.out.println("constructing, invoking auth");
            Feature authFeature = new AuthFeature(sseInputs.getAuthServiceUrl(), sseInputs.getClientId(), sseInputs.getClientSecret()).build();
            String ssePath = constructSsePath();
            System.out.println("constructing SSE listener");
            SseListener sseListener = new SseListener(ssePath, authFeature);
            System.out.println("invoking SSE listener");
   -> showEvent(sseEvent));
       catch(Exception e){
           System.err.println("received error during startup, retrying after 5 seconds delay: " + e);
           try {
           catch (InterruptedException ie){
               System.err.println("retry sleep interrupted, aborting retry: " + ie);
               throw new RuntimeException(ie);

    static private String constructSsePath() {
        StringBuilder sb = new StringBuilder(sseInputs.getSseUrl());
        //we only construct the url with the from value if the lastEventId is blank
        if(StringUtils.isBlank(sseInputs.getLastEventId())) {
          if (StringUtils.isNotBlank(sseInputs.getFrom())){
        else {
        String path = sb.toString();
        System.out.println("constructing path: " + path);
        return path;
    static void showEvent(InboundSseEvent sseEvent) {
        // **** YOUR CODE HERE ****
        // This is a very simple example of what you might do.
        // Ideally this would contain logic that quickly writes to a file, queue, or kafka topic that can be processed by a downstream system 

        if (sseEvent.getName() != null) {
            System.out.println("----- EVENT --------------------------------");
            if(sseEvent.getId() != null) {
            System.out.println("id:    " + sseEvent.getId());
            System.out.println("event: " + sseEvent.getName());

            String datas = sseEvent.readData();
            for (String data : datas.split("\n")) {
                System.out.println("data:  " + data);
            System.out.println("raw:  " + sseEvent);            
     * @param eventHandler executed for every received SSE event
    public void run(Consumer<InboundSseEvent> eventHandler) {
            eventSource.register(eventHandler, (e) -> failHandler(e), new OnComplete());
          catch(Exception e){
              System.err.println("fatal exception starting event source: " + e);

    /** Close this listener by closing the event source. */
    public boolean close(long timeout, TimeUnit unit) {
        return eventSource.close(timeout, unit);

    /** Is this event source open? */
    public boolean isOpen() {
        return eventSource.isOpen();

    /** Invoked upon a unrecoverable error encountered by a SseEventSource */
    void failHandler(Throwable e) {
        //failover logic goes here
        System.err.println("received unrecoverable error: " + e);
        System.err.println("last successful EventId: " + SseInputs.Instance.getLastEventId());

package com.ariasystems.dataextract.examples.sse;



import org.apache.commons.lang3.StringUtils;


/**
 * A helper that configures the HTTP client used for calls over the wire.
 * @author anorman
 */
public class HttpClientHelper {

     * sets up the http or https connections update this logic with your actual
     * Https trust management and valid hosts here
    public static Client client(String target) {
        try {
            ClientBuilder builder = ClientBuilder.newBuilder();
               System.out.println("setting up http client for target: " + target);
                if (StringUtils.startsWith(target, "https")) {
                   System.out.println("detecting https for target: " + target);

                    SSLContext context = SSLContext.getInstance("TLSv1.2");
                System.setProperty("https.protocols", "TLSv1.2");
                // TODO: use a real trustmanager for a live implementation. This demo is
                // configured to accept all certs!
                TrustManager[] trustAllCerts = { 
                    new X509TrustManager() {
                        public void checkClientTrusted(X509Certificate[] chain, String authType) {
                            // Everyone is trusted!
                        public void checkServerTrusted(final X509Certificate[] chain, final String authType)
                                throws CertificateException {
                            // Everyone is trusted!
                        public X509Certificate[] getAcceptedIssuers() {
                            return new X509Certificate[0];
                context.init(null, trustAllCerts, new;

                // TODO: use a real HostnameVerifier for a live implementation. This demo is
                // configured to accept all hosts!
                HostnameVerifier allHostsValid = new HostnameVerifier() {

                    public boolean verify(String hostname, SSLSession session) {
                        return true;


                // Install the all-trusting host verifier
        } catch (

        Exception e) {
            throw new RuntimeException("issue setting up client for target url: " + target, e);


package com.ariasystems.dataextract.examples.sse;

import com.auth0.jwt.JWT;
import com.auth0.jwt.algorithms.Algorithm;

import org.apache.commons.lang3.StringUtils;
import org.glassfish.jersey.client.oauth2.OAuth2ClientSupport;

import java.util.Date;

/** Produces a JAX-RS "feature" that, when registered with a client, causes client requests to be extended with
 *  an HTTP Authorization header whose access token comes from querying an Aria authentication service.
 *  <pre>
 *         Feature authFeature = new AuthFeature(url, clientId, secret).build();
 *         Client sseClient = ClientBuilder.newClient();
 *         sseClient.register( authFeature );
 *  </pre>
 */
public class AuthFeature {

    public String getAuthSvcUrl() {
        return authSvcUrl;

    public void setAuthSvcUrl(String authSvcUrl) {
        this.authSvcUrl = authSvcUrl;

    public String getClientId() {
        return clientId;

    public void setClientId(String clientId) {
        this.clientId = clientId;

    public String getClientSecret() {
        return clientSecret;

    public void setClientSecret(String clientSecret) {
        this.clientSecret = clientSecret;

    private String authSvcUrl;
    private String clientId;
    private String clientSecret;

     * @param authSvcUrl  url of an Aria authorization service
     * @param clientId  Aria client id
     * @param clientSecret  secret for the Aria client
    public AuthFeature(String authSvcUrl, String clientId, String clientSecret) {
        this.authSvcUrl = authSvcUrl;
        this.clientId = clientId;
        this.clientSecret = clientSecret;

     * @return a feature to be registered with a JAX-RS client
    public Feature build() {
        return OAuth2ClientSupport.feature(getAriaAccessToken());

     * This is the method that makes the OAuth2 request to get an access token.
     * @return a fresh access token
    private String getAriaAccessToken() {

        // Create the JWT that will be the secret in the auth request
        String jwt = JWT.create()
                .withIssuedAt(new Date())

        // Construct the auth request
        Entity<?> authRequestEntity = Entity.entity(
                new Oauth2ClientCredentialsRequest(clientId, jwt),

        Client authClient = ClientBuilder.newClient();
        try {
            String path = "";
            if (!StringUtils.contains(authSvcUrl, "/oauth2/token")) {
                path = "/oauth2/token";
            // Send the auth request
            Response authResponse = HttpClientHelper.client(authSvcUrl)

            // Read the response
            OAuth2Response oauthResponse = authResponse.readEntity(OAuth2Response.class);

            if (oauthResponse.error != null) {
                throw new RuntimeException("Unable to get OAuth2 token: " + oauthResponse.error_description);

            return oauthResponse.access_token;
        catch (Exception e){
            System.out.println("error generating auth: " + e);     
            throw e;
        finally {
    /** OAuth2 authentication requests of grant_type "client_credentials" */
    public static class Oauth2ClientCredentialsRequest {
        public String grant_type = "client_credentials";
        public String client_id;
        public String client_secret;

        public Oauth2ClientCredentialsRequest(String clientId, String secret) {
            client_id = clientId;
            client_secret = secret;

    /** Responses to OAuth2 authentication requests */
    public static class OAuth2Response {
        public OAuth2Response() {
        public String token_type;
        public String expires_in;
        public String access_token;
        public String error;
        public String error_description;

► Data Feed Example Authenticator Code
import jwt  # to install: pip install PyJWT
import requests  # to install: pip install requests
import time
from requests.auth import AuthBase

class AriaOauth2(AuthBase):
    """
    Authenticator that retrieves an access token from the Aria auth service.

    There does not appear to be an off-the-shelf lib that does this at this time: a value that can be assigned
    to an http request's "auth" parameter that retrieves the access token using an Oauth2 grant type request.
    """

    def __init__(self, client_id, client_secret, oauth2_url):
        """
        :param client_id:  The Aria client's id
        :param client_secret:  The Aria client's secret
        :param oauth2_url:  The URL of the authentication service's token endpoint
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.oauth2_url = oauth2_url
        # Token response (a dict) and its absolute expiry time; both unset until the first request
        self.token = None
        self.expires = None

    def __call__(self, r):
        """
        If we don't have an access token in hand, or it has expired, get a new token from the auth service.
        Then set the access token in the request's Authorization header.

        :param r:  The outgoing request whose headers are updated
        :return:  The same request, with the Authorization header set
        """
        now = time.time()
        if not self.token or now > self.expires:
            self.token = self.get_fresh_token()
            # expires_in may arrive as a JSON number or as a numeric string; normalise before adding
            self.expires = now + float(self.token['expires_in'])

        r.headers['Authorization'] = 'Bearer ' + self.token['access_token']
        return r

    def get_fresh_token(self):
        """
        Get a fresh token response for the client from the auth service.

        :return:  The decoded JSON response, which contains 'access_token'
        :raises Exception:  If the response does not contain an access token
        """

        # The token used to sign the grant request itself
        jwt_token = jwt.encode({'iat': int(time.time())}, self.client_secret, algorithm='HS256')

        # Make the request to the auth service to get an access token for the client
        # NOTE(review): the call target was missing from this listing; a POST to self.oauth2_url is assumed - confirm
        # NOTE(review): verify=False disables TLS certificate verification; acceptable for a demo only
        resp = requests.post(
            self.oauth2_url,
            data={'grant_type': 'client_credentials', 'client_id': self.client_id, 'client_secret': jwt_token},
            verify=False,
            allow_redirects=False
        )

        json_resp = resp.json()

        if 'access_token' in json_resp:
            return json_resp
        elif 'error' in json_resp:
            raise Exception("OAuth failed: %s: %s" % (json_resp['error'], json_resp.get('error_description')))
        else:
            raise Exception("OAuth failed: %s" % (str(json_resp)))

► Data Feed Example Command Line Arguments
import argparse

class DemoCliArgs(object):
    """
    Command line arguments for the Data Feed example.

    These inputs are required to set up an SSE client stream:
        * URL of an SSE service
        * URL of an OAuth2 service (for authentication)
        * Aria client ID (for authentication)
        * Aria client secret (for authentication)
    """

    def __init__(self, argv=None):
        """
        Parse the command line and expose each value as an attribute.

        :param argv:  Optional list of argument strings to parse. When None (the default),
                      argparse reads the process command line.
        """
        # Define the command line argument parser for this example code
        parser = argparse.ArgumentParser()
        parser.add_argument('-e', '--sse',      help='URL of a Server-Sent Events source')
        parser.add_argument('-o', '--oauth',    help='URL of an OAuth2 token service')
        parser.add_argument('-c', '--clientid', help='Aria client ID')
        parser.add_argument('-s', '--secret',   help='Secret for the Aria client')

        # Get argument values from the command line; an option that is not supplied is left as None
        args = parser.parse_args(argv)
        self.sse_service_url = args.sse
        self.oauth2_url      = args.oauth
        self.client_id       = args.clientid
        self.client_secret   = args.secret

► Data Feed Example Event Handlers
# Handlers for the stream events

def handle_create(data):
    """
    Print a 'create' event.

    The data part of a 'create' event has two parts: the ref of the created entity on the
    first line, then the entity's data.

    :param data:  The event's data string
    """
    # Split at the first newline only, so a payload that itself contains newlines is kept whole
    # (split("\n", 2) produced up to three pieces and dropped everything after the second line).
    # If there is no newline at all, the payload is printed as an empty string.
    ref, _, payload = data.partition("\n")
    print("CREATE to: %s\ndata: %s" % (ref, payload))

def handle_update(data):
    """
    Print an 'update' event.

    The data part of an 'update' event has two parts: the ref of the updated entity on the
    first line, then the entity's data.

    :param data:  The event's data string
    """
    # Split at the first newline only, so a payload that itself contains newlines is kept whole
    # (split("\n", 2) produced up to three pieces and dropped everything after the second line).
    # If there is no newline at all, the payload is printed as an empty string.
    ref, _, payload = data.partition("\n")
    print("UPDATE to: %s\ndata: %s" % (ref, payload))

def handle_delete(data):
    """Print a 'delete' event; its data part is a single line, the ref of the deleted entity."""
    message = "DELETE %s" % data
    print(message)

def handle_heartbeat(data):
    """Deliberately do nothing: these messages arrive periodically only to keep the stream connection alive."""
    return None

class SseHandlers(object):
    """Routes each stream message to the handler registered for its event type."""

    def __init__(self):
        # Map event type to handler
        self.event_handlers = {
            "create": handle_create,
            "update": handle_update,
            "delete": handle_delete,
            "message": handle_heartbeat
        }

    def handleMsg(self, msg):
        """
        Get the handler for the event type and call that handler with the event's data.

        :param msg:  A stream message; its ``event`` attribute selects the handler
        """
        # NOTE(review): the dispatch line was truncated in this listing; passing msg.data is
        # assumed from the comment "call that handler with the event's data" - confirm
        handler = self.event_handlers.get(msg.event)
        if handler is None:
            # No handler registered for this type: report it instead of calling None (a TypeError)
            print("ignoring unrecognized event type: %s" % (msg.event,))
            return
        handler(msg.data)

► Data Feed Example Store For Last Event ID
class StreamStatusStore(object):
    """
    A dummy store for a stream's last event id.

    The purpose of this is to save the stream's last event id so that in the event that this app crashes,
    the app knows where to restart.  A real implementation should periodically write the value to some kind of
    persistent store, such as a file system, memcache or database.  On restart the implementation should reread
    the value.
    """

    def __init__(self):
        # Held in memory only, so the value is lost when the process exits
        self.saved_last_id = None

    def save_last_id(self, last_id):
        """
        Remember the given event id, replacing any previously saved one.

        :param last_id:  The event id to save
        """
        self.saved_last_id = last_id

    def get_last_id(self):
        """
        :return:  The most recently saved event id, or None if nothing has been saved
        """
        return self.saved_last_id
