Skip to main content
Aria Knowledge Central

Data Feed Example Code

Data Feed Examples

Following are instructions for using the attached .zip file for listening to the Data Stream. In addition, sample code from 4 Java classes is included both in this article and in the .zip file (SseExampleMain, SseInputs, SseListener, and AuthFeature). Also included in the .zip file is a maven file (pom.xml) that pulls in dependencies and builds the jar.

Download the .zip: java-sse-client.zip

Below the Data Stream listening code examples are four other Data Stream code examples such as Authenticator and Event Handlers. Click on each ► below to expand the sample code within.

Building 

The classes can be built / jarred with the maven pom file.
mvn clean package will build a simple jar (without dependent libraries)
mvn clean package assembly:single will build an “uber” jar containing all dependent libraries in a single jar

Invoking 

The SseExampleMain is the target Main class to invoke.
The easiest way to invoke this sample is with an uber jar packaging:

java {vm arguments} -jar sse-client-java-1.0-SNAPSHOT-jar-with-dependencies.jar

Invocation Arguments 

The invocation uses the following vm arguments described below:

-DExample.AuthServiceUrl The url to the auth endpoint - mandatory
-DExample.ClientId The client's id (aka client no) - mandatory
-DExample.ClientSecret The client's auth key - mandatory
-DExample.SseUrl The url to the sse endpoint (either the load stream or change stream) - mandatory
-DExample.LastEventId The last event id to resume streaming from - optional
-DExample.From The time to resume streaming from - optional and will be ignored if last event id is provided

 

► SseExampleMain.java
package com.ariasystems.dataextract.examples.sse;

import org.apache.commons.lang3.StringUtils;

/**
 * An example app that creates an authorizer, creates an SSE listener and runs
 * that listener with an event handler that simply prints out each event.
 */
public class SseExampleMain {
    
    public static void main(String[] args) {
        String authServiceUrl = System.getProperty("Example.AuthServiceUrl");
        String clientId = System.getProperty("Example.ClientId");
        String clientSecret = System.getProperty("Example.ClientSecret");
        String sseUrl = System.getProperty("Example.SseUrl");
        String lastEventId = System.getProperty("Example.LastEventId");
        String from = System.getProperty("Example.From");
        System.out.println("client -> "+clientId);
        System.out.println("auth: sseUrl -> "+authServiceUrl);
        System.out.println("calling: sseUrl -> "+sseUrl);
           System.out.println("initializing app");
        System.out.println("validating inputs");
        
        //validate inputs
        try {
            validateNotBlank(authServiceUrl,"authServiceUrl");
            validateNotBlank(sseUrl,"sseUrl");
            validateNotBlank(clientId,"clientId");
            validateNotBlank(clientSecret,"clientSecret");
            validateNotBlank(authServiceUrl,"authServiceUrl");
            
            //assert that the sseUrl is clean of query params
            if(StringUtils.contains(sseUrl, "?")) {
                throw new IllegalArgumentException("sseUrl cannot contain query params in its input string. Please provide these as separate params");
            }  
        } catch (Exception e) {
            e.printStackTrace();
            printUsage();
        }        
        try {
            SseListener.initialize(authServiceUrl,clientId,clientSecret,sseUrl,lastEventId,from);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }    
    
    private static void validateNotBlank(String param, String paramName) {
        if(StringUtils.isBlank(param)) {
            throw new IllegalArgumentException("param: " + paramName + " cannot be blank");    
        }        
    }
    
    private static void printUsage(){
        System.err.println();
        System.err.println();
        System.err.println();
        System.err.println("USAGE: ");
        System.err.println("===========================================");
        System.err.println("-DExample.AuthServiceUrl={the url to the auth endpoint} - mandatory");
        System.err.println("-DExample.ClientId={the client's id} - mandatory");
        System.err.println("-DExample.ClientSecret={the client's auth key} - mandatory");
        System.err.println("-DExample.SseUrl={url to the sse endpoint (either the load stream or change stream)} - mandatory");
        System.err.println("-DExample.LastEventId={the last event id to resume streaming from} - optional");
        System.err.println("-DExample.From={the time to resume streaming from} - optional and will be ignored if last event id is provided");
    }
}
 
► SseInputs.java
package com.ariasystems.dataextract.examples.sse;

import org.apache.commons.lang3.StringUtils;

/**
 * A singleton that holds input state that can be used for recovering lost connections. 
 * the last event id can be updated with the event ids in the latest data collected
 * @author anorman
 *
 */
public class SseInputs {
    public static SseInputs Instance = new SseInputs();
    private String authServiceUrl;
    private String sseUrl;    
    private String lastEventId;
    private String from;
    private String clientId;
    private String clientSecret;
    private String originalLastEventId;

 
  //private constructor / Singleton pattern
  private SseInputs(){
    authServiceUrl = null;
    sseUrl = null;    
    lastEventId = null;
    from = null;
    clientId = null;
    clientSecret = null;
    originalLastEventId = null;
  }
  
  public String getAuthServiceUrl() {
        return authServiceUrl;
    }

    public void setAuthServiceUrl(String authServiceUrl) {
        this.authServiceUrl = authServiceUrl;
    }

    public String getSseUrl() {
        return sseUrl;
    }

    public void setSseUrl(String sseUrl) {
        this.sseUrl = sseUrl;
    }

    public String getLastEventId() {
        return lastEventId;
    }

    public void setLastEventId(String eventId) {
        if (StringUtils.isNotBlank(eventId)){
            synchronized(this) {
                this.lastEventId = eventId;
            }
        }    
    }

    public String getFrom() {
        return from;
    }

    public void setFrom(String from) {
        this.from = from;
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public String getClientSecret() {
        return clientSecret;
    }

    public void setClientSecret(String clientSecret) {
        this.clientSecret = clientSecret;
    }

    public String getOriginalLastEventId() {
        return originalLastEventId;
    }

    public void setOriginalLastEventId(String originalLastEventId) {
        this.originalLastEventId = originalLastEventId;
    }
    
    public boolean hasCollectedEvents() {
        return (lastEventId != null && originalLastEventId != lastEventId);
    }
}
 
► SseListener.java
package com.ariasystems.dataextract.examples.sse;

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import jakarta.ws.rs.client.*;
import jakarta.ws.rs.core.Feature;
import jakarta.ws.rs.sse.*;
import org.apache.commons.lang3.StringUtils;

/**
 * A simple wrapper for SseEventSource that does basic construction and execution.
 */
public class SseListener {
    public class OnComplete implements Runnable{
        @Override public void run(){
            SseInputs inputs = SseInputs.Instance;
            if (isOpen()) {
                System.err.println("completion detected, closing connection...");
                close(5, TimeUnit.SECONDS);
            }
            System.err.println("server ended transmission outside of the event SSE handling. ");
            String lastEventId = inputs.getLastEventId();
            System.err.println("last successful EventId: " + lastEventId);
            System.err.println("");
            if (inputs.hasCollectedEvents()) {
                System.err.println("restarting from last event id processed");
                start();
            }
            else {
                System.err.println("no events processed since the original startup, verify the input parameters and target urls and restart");            
            }
        }
    }
    
    private SseEventSource eventSource;
    
    private static SseInputs sseInputs = SseInputs.Instance;
    
    /**
     * @param targetUrl URL of a Server-Sent Events source
     * @param authorizer producer of a JAX-RS "feature" that authorizes client requests
     */
    public SseListener(String targetUrl, Feature authorizer) {

        Client sseClient = HttpClientHelper.client(targetUrl);
        sseClient.register( authorizer );
        WebTarget target = sseClient.target(targetUrl);

        eventSource = SseEventSource
            .target(target)
            .reconnectingEvery(10, TimeUnit.SECONDS)
            .build();
    }
       
    static void initialize(
            String authServiceUrl, 
            String clientId,
            String clientSecret,
            String sseUrl, 
            String lastEventId, 
            String from){
        
        sseInputs.setAuthServiceUrl(authServiceUrl);
        if(StringUtils.isBlank(sseUrl)) {
            throw new IllegalArgumentException("sseUrl cannot be blank");    
        }
        sseInputs.setClientId(clientId);
        sseInputs.setClientSecret(clientSecret);

        sseInputs.setSseUrl(sseUrl);
        //if last event id is set, then ignore the from value as the lastEventId should be honored
        if (StringUtils.isBlank(lastEventId)) {
            System.out.println("setting from: " + from);
            sseInputs.setFrom(from);
        }
        else {
            System.out.println("setting lastEventId: " + lastEventId);
            sseInputs.setLastEventId(lastEventId);
            sseInputs.setOriginalLastEventId(lastEventId);
        }
        start();
    }
    
    static private void start() {
        try {    
            System.out.println("(re)starting...");
            System.out.println("constructing, invoking auth");
            Feature authFeature = new AuthFeature(sseInputs.getAuthServiceUrl(), sseInputs.getClientId(), sseInputs.getClientSecret()).build();
    
            String ssePath = constructSsePath();
            System.out.println("constructing SSE listener");
            SseListener sseListener = new SseListener(ssePath, authFeature);
    
            System.out.println("invoking SSE listener");
            sseListener.run((sseEvent) -> showEvent(sseEvent));
       }
       catch(Exception e){
           System.err.println("received error during startup, retrying after 5 seconds delay: " + e);
           try {
               Thread.sleep(5000);
               start();
           }
           catch (InterruptedException ie){
               System.err.println("retry sleep interrupted, aborting retry: " + ie);
               throw new RuntimeException(ie);
           }
       }
    }

    static private String constructSsePath() {
        StringBuilder sb = new StringBuilder(sseInputs.getSseUrl());
        sb.append("?");
        //we only construct the url with the from value if the lastEventId is blank
        if(StringUtils.isBlank(sseInputs.getLastEventId())) {
          if (StringUtils.isNotBlank(sseInputs.getFrom())){
              sb.append("from=").append(sseInputs.getFrom());
          }
        }
        else {
          sb.append("lastEventId=").append(sseInputs.getLastEventId());
        }
        String path = sb.toString();
        System.out.println("constructing path: " + path);
        return path;
    }
    
    static void showEvent(InboundSseEvent sseEvent) {
        // **** YOUR CODE HERE ****
        // This is a very simple example of what you might do.
        // Ideally this would contain logic that quickly writes to a file, queue, or kafka topic that can be processed by a downstream system 

        if (sseEvent.getName() != null) {
            System.out.println("----- EVENT --------------------------------");
            if(sseEvent.getId() != null) {
                SseInputs.Instance.setLastEventId(sseEvent.getId());
            }
            
            System.out.println("id:    " + sseEvent.getId());
            System.out.println("event: " + sseEvent.getName());
            

            String datas = sseEvent.readData();
            for (String data : datas.split("\n")) {
                System.out.println("data:  " + data);
            }
            System.out.println("raw:  " + sseEvent);            
        }
    }
    
    /**
     * @param eventHandler executed for every received SSE event
     */
    public void run(Consumer<InboundSseEvent> eventHandler) {
        try{
            eventSource.register(eventHandler, (e) -> failHandler(e), new OnComplete());
            eventSource.open();
          }
          catch(Exception e){
              System.err.println("fatal exception starting event source: " + e);
              e.printStackTrace();
          }
    }

    /** Close this listener by closing the event source. */
    public boolean close(long timeout, TimeUnit unit) {
        return eventSource.close(timeout, unit);
    }

    /** Is this event source open? */
    public boolean isOpen() {
        return eventSource.isOpen();
    }

    /** Invoked upon a unrecoverable error encountered by a SseEventSource */
    void failHandler(Throwable e) {
        //failover logic goes here
        System.err.println("received unrecoverable error: " + e);
        e.printStackTrace();
        System.err.println("last successful EventId: " + SseInputs.Instance.getLastEventId());
        
        start();
    }
}

 
► HttpClientHelper.java
package com.ariasystems.dataextract.examples.sse;

import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

import org.apache.commons.lang3.StringUtils;

import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;

/**
 * a helper that configures the http client used for calls over the wire
 * @author anorman
 *
 */
public class HttpClientHelper {

    /**
     * sets up the http or https connections update this logic with your actual
     * Https trust management and valid hosts here
     */
    public static Client client(String target) {
        try {
            ClientBuilder builder = ClientBuilder.newBuilder();
               System.out.println("setting up http client for target: " + target);
                if (StringUtils.startsWith(target, "https")) {
                   System.out.println("detecting https for target: " + target);

                    SSLContext context = SSLContext.getInstance("TLSv1.2");
                System.setProperty("https.protocols", "TLSv1.2");
                // TODO: use a real trustmanager for a live implementation. This demo is
                // configured to accept all certs!
                TrustManager[] trustAllCerts = { 
                    new X509TrustManager() {
                        @Override
                        public void checkClientTrusted(X509Certificate[] chain, String authType) {
                            // Everyone is trusted!
                        }
    
                        @Override
                        public void checkServerTrusted(final X509Certificate[] chain, final String authType)
                                throws CertificateException {
                            // Everyone is trusted!
                        }
    
                        @Override
                        public X509Certificate[] getAcceptedIssuers() {
                            return new X509Certificate[0];
                        }
                    } 
                };
                context.init(null, trustAllCerts, new java.security.SecureRandom());

                // TODO: use a real HostnameVerifier for a live implementation. This demo is
                // configured to accept all hosts!
                HostnameVerifier allHostsValid = new HostnameVerifier() {

                    @Override
                    public boolean verify(String hostname, SSLSession session) {
                        return true;
                    }
                };

                builder.sslContext(context);

                // Install the all-trusting host verifier
                HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid);
            }
            return builder.build();
        } catch (

        Exception e) {
            throw new RuntimeException("issue setting up client for target url: " + target, e);
        }

    }

}
 
► AuthFeature.java
package com.ariasystems.dataextract.examples.sse;

import com.auth0.jwt.JWT;
import com.auth0.jwt.algorithms.Algorithm;

import org.apache.commons.lang3.StringUtils;
import org.glassfish.jersey.client.oauth2.OAuth2ClientSupport;

import jakarta.ws.rs.Produces;
import jakarta.ws.rs.client.*;
import jakarta.ws.rs.core.*;
import java.util.Date;

/** Produces a JAX-RS "feature" that, when registered with a client, causes client requests to be extended with
 *  an HTTP Authorization header whose access token comes from querying an Aria authentication service.
 *  <pre>
 *         Feature authFeature = new AuthFeature(url, clientId, secret).build();
 *         Client sseClient = ClientBuilder.newClient();
 *         sseClient.register( authFeature );
 *  </pre>
 */
public class AuthFeature {

    public String getAuthSvcUrl() {
        return authSvcUrl;
    }

    public void setAuthSvcUrl(String authSvcUrl) {
        this.authSvcUrl = authSvcUrl;
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public String getClientSecret() {
        return clientSecret;
    }

    public void setClientSecret(String clientSecret) {
        this.clientSecret = clientSecret;
    }

    private String authSvcUrl;
    private String clientId;
    private String clientSecret;

    /**
     * @param authSvcUrl  url of an Aria authorization service
     * @param clientId  Aria client id
     * @param clientSecret  secret for the Aria client
     */
    public AuthFeature(String authSvcUrl, String clientId, String clientSecret) {
        this.authSvcUrl = authSvcUrl;
        this.clientId = clientId;
        this.clientSecret = clientSecret;
    }

    /**
     * @return a feature to be registered with a JAX-RS client
     */
    public Feature build() {
        return OAuth2ClientSupport.feature(getAriaAccessToken());
    }

    /**
     * This is the method that makes the OAuth2 request to get an access token.
     * @return a fresh access token
     */
    private String getAriaAccessToken() {

        // Create the JWT that will be the secret in the auth request
        String jwt = JWT.create()
                .withIssuedAt(new Date())
                .sign(Algorithm.HMAC256(clientSecret));

        // Construct the auth request
        Entity<?> authRequestEntity = Entity.entity(
                new Oauth2ClientCredentialsRequest(clientId, jwt),
                MediaType.APPLICATION_JSON
        );

        Client authClient = ClientBuilder.newClient();
        try {
            String path = "";
            if (!StringUtils.contains(authSvcUrl, "/oauth2/token")) {
                path = "/oauth2/token";
            }
            
            // Send the auth request
            Response authResponse = HttpClientHelper.client(authSvcUrl)
                    .target(authSvcUrl)
                    .path(path)
                    .request(MediaType.APPLICATION_JSON)
                    .post(authRequestEntity);

            // Read the response
            OAuth2Response oauthResponse = authResponse.readEntity(OAuth2Response.class);

            if (oauthResponse.error != null) {
                throw new RuntimeException("Unable to get OAuth2 token: " + oauthResponse.error_description);
            }

            return oauthResponse.access_token;
        }
        catch (Exception e){
            System.out.println("error generating auth: " + e);     
            e.printStackTrace();
            throw e;
        }
        finally {
            authClient.close();
        }
    }
    
    /** OAuth2 authentication requests of grant_type "client_credentials" */
    @Produces(MediaType.APPLICATION_JSON)
    public static class Oauth2ClientCredentialsRequest {
        public String grant_type = "client_credentials";
        public String client_id;
        public String client_secret;

        public Oauth2ClientCredentialsRequest(String clientId, String secret) {
            client_id = clientId;
            client_secret = secret;
        }
    }

    /** Responses to OAuth2 authentication requests */
    @Produces(MediaType.APPLICATION_JSON)
    public static class OAuth2Response {
        public OAuth2Response() {
            super();
        }
        public String token_type;
        public String expires_in;
        public String access_token;
        public String error;
        public String error_description;
    }
}          

 
► Data Feed Example Authenticator Code
import jwt  # to install: pip install PyJWT
import requests  # to install: pip install requests
import time
from requests.auth import AuthBase

class AriaOauth2(AuthBase):
    """
    Authenticator that retrieves access token from Aria auth service.
    There does not appear to be an off-the-shelf lib that does this at this time, a value that can be assigned
    to an http request's "auth" parameter that retrieves the access token using an Oauth2 grant type request.
    """

    def __init__(self, client_id, client_secret, oauth2_url):
        """
        :param client_id:  The Aria client's id
        :param client_secret:  The Aria client's secret
        :param oauth2_url:  The URL of the authentication service's token endpoint
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.oauth2_url = oauth2_url
        self.token = None
        self.expires = None

    def __call__(self, r):
        """
        If we don't have an access token in hand, or it has expired, get a new token from the auth service.
        Then set the access token in the request's Authorization header.
        """
        now = time.time()
        if not self.token or now > self.expires:
            self.token = self.get_fresh_token()
            self.expires = now + self.token['expires_in']

        r.headers['Authorization'] = 'Bearer ' + self.token['access_token']
        return r

    def get_fresh_token(self):
        """
        Get an authorization header that contains a valid access token for the client
        """

        # The token used to sign the grant request itself
        jwt_token = jwt.encode({'iat': int(time.time())}, self.client_secret, algorithm='HS256')

        # Make the request to the auth service to get an access token for the client
        resp = requests.post(
            self.oauth2_url,
            data = {'grant_type': 'client_credentials', 'client_id': self.client_id, 'client_secret': jwt_token},
            verify = False,
            allow_redirects = False
        )

        json_resp = resp.json()

        if 'access_token' in json_resp:
            return json_resp
        elif 'error' in json_resp:
            raise Exception("OAuth failed: %s: %s" % (json_resp['error'], json_resp.get('error_description')))
        else:
            raise Exception("OAuth failed: %s" % (str(json_resp)))

 
► Data Feed Example Command Line Arguments
import argparse

class DemoCliArgs(object):
    """
    Comm  These inputs are required to set up an SSE client stream:
        * URL of an SSE service
        * URL of an OAuth2 service (for authentication)
        * Aria client ID (for authentication)
        * Aria client secret (for authentication)
    """

    def __init__(self):
        # Define the command line argument parser for this example code
        parser = argparse.ArgumentParser()
        parser.add_argument('-e', '--sse',      help='URL of a Server-Sent Events source')
        parser.add_argument('-o', '--oauth',    help='URL of an OAuth2 token service')
        parser.add_argument('-c', '--clientid', help='Aria client ID')
        parser.add_argument('-s', '--secret',   help='Secret for the Aria client')

        # Get argument values from the command line
        args = parser.parse_args()
        self.sse_service_url = args.sse
        self.oauth2_url      = args.oauth
        self.client_id       = args.clientid
        self.client_secret   = args.secret

 
► Data Feed Example Event Handlers
# Handlers for the stream events

def handle_create(data):
    """The data part of a 'create' event has two lines, ref of the created entity, then its data"""
    lines = data.split("\n", 2)
    print("CREATE to: %s\ndata: %s" % (lines[0], lines[1]))

def handle_update(data):
    """The data part of a 'update' event has two lines, ref of the updated entity, then its data"""
    lines = data.split("\n", 2)
    print("UPDATE to: %s\ndata: %s" % (lines[0], lines[1]))

def handle_delete(data):
    """The data part of a 'delete' event has one line, ref of the deleted entity"""
    print("DELETE %s" % (data))

def handle_heartbeat(data):
    """These messages appear periodically just to keep the stream connection alive."""
    print("(beat)")


class SseHandlers(object):
    def __init__(self):
        # Map event type to handler
        self.event_handlers = {
            "create": handle_create,
            "update": handle_update,
            "delete": handle_delete,
            "message": handle_heartbeat
        }

    def handleMsg(self, msg):
        # Get the handler for the event type.  Call that handler with the event's data
        # event_handlers.get(msg.event)(msg.data)
        self.event_handlers.get(msg.event)(msg.data)

 
► Data Feed Example Store For Last Event ID
class StreamStatusStore(object):
    """
    A dummy store for a stream's last event id.

    The purpose of this is to save the stream's last event id so that in the event that this app crashes,
    the app knows where to restart.  A real implementation should periodically write the value to some kind of
    persistent store, such as a file system, memcache or database.  On restart the implementation should reread
    the value.
    """

    def __init__(self):
        self.saved_last_id = None

    def save_last_id(self, last_id):
        self.saved_last_id = last_id

    def get_last_id(self):
        return self.saved_last_id
  • Was this article helpful?