Skip to main content
Aria Knowledge Central

Example Data Feed Code

Following are instructions for using the attached .zip file to listen to the data stream. In addition, sample code from 5 Java classes is included both in this article and in the .zip file (SseExampleMain, SseInputs, SseListener, HttpClientHelper, and AuthFeature). Also included in the .zip file is a Maven file (pom.xml) that pulls in dependencies and builds the jar.

Download the .zip: java-sse-client.zip

Building

The classes can be built / jarred with the maven pom file.
mvn clean package will build a simple jar (without dependent libraries)
mvn clean package assembly:single will build an “uber” jar containing all dependent libraries in a single jar

Invoking

The SseExampleMain is the target Main class to invoke.
The easiest way to invoke this sample is with an uber jar packaging:

java {vm arguments} -jar sse-client-java-1.0-SNAPSHOT-jar-with-dependencies.jar

Invocation Arguments

The invocation uses the VM arguments (Java system properties) described below:

-DExample.AuthServiceUrl The url to the auth endpoint - mandatory
-DExample.ClientId The client's id (aka client no) - mandatory
-DExample.ClientSecret The client's auth key - mandatory
-DExample.SseUrl The url to the sse endpoint (either the load stream or change stream) - mandatory
-DExample.LastEventId The last event id to resume streaming from - optional
-DExample.From The time to resume streaming from - optional and will be ignored if last event id is provided

SseExampleMain.java

package com.ariasystems.dataextract.examples.sse;

import org.apache.commons.lang3.StringUtils;

/**
 * An example app that reads its settings from {@code -DExample.*} system
 * properties, validates them, and hands them to {@link SseListener#initialize},
 * which authorizes against the auth service and starts an SSE listener whose
 * event handler simply prints out each event.
 */
public class SseExampleMain {

    /**
     * Entry point. Reads the six {@code Example.*} system properties, validates the
     * mandatory ones, and starts the listener.
     *
     * <p>If validation fails, the error and the usage text are printed and the
     * method returns without starting the listener.
     *
     * @param args ignored; all configuration comes from system properties
     */
    public static void main(String[] args) {
        String authServiceUrl = System.getProperty("Example.AuthServiceUrl");
        String clientId = System.getProperty("Example.ClientId");
        String clientSecret = System.getProperty("Example.ClientSecret");
        String sseUrl = System.getProperty("Example.SseUrl");
        String lastEventId = System.getProperty("Example.LastEventId");
        String from = System.getProperty("Example.From");

        // The client secret is deliberately not echoed.
        System.out.println("client -> " + clientId);
        System.out.println("auth: authServiceUrl -> " + authServiceUrl);
        System.out.println("calling: sseUrl -> " + sseUrl);
        System.out.println("initializing app");
        System.out.println("validating inputs");

        // Validate inputs; do not start the listener with inputs known to be bad.
        try {
            validateNotBlank(authServiceUrl, "authServiceUrl");
            validateNotBlank(sseUrl, "sseUrl");
            validateNotBlank(clientId, "clientId");
            validateNotBlank(clientSecret, "clientSecret");

            // The listener appends its own query string (from / lastEventId), so the
            // base sseUrl must not already carry one.
            if (StringUtils.contains(sseUrl, "?")) {
                throw new IllegalArgumentException("sseUrl cannot contain query params in its input string. Please provide these as separate params");
            }
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
            printUsage();
            return;
        }

        try {
            SseListener.initialize(authServiceUrl, clientId, clientSecret, sseUrl, lastEventId, from);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Rejects a null, empty, or whitespace-only parameter.
     *
     * @param param     the value to check
     * @param paramName the name used in the error message
     * @throws IllegalArgumentException if {@code param} is blank
     */
    private static void validateNotBlank(String param, String paramName) {
        if (StringUtils.isBlank(param)) {
            throw new IllegalArgumentException("param: " + paramName + " cannot be blank");
        }
    }

    /** Prints the supported {@code -DExample.*} arguments to standard error. */
    private static void printUsage() {
        System.err.println();
        System.err.println();
        System.err.println();
        System.err.println("USAGE: ");
        System.err.println("===========================================");
        System.err.println("-DExample.AuthServiceUrl={the url to the auth endpoint} - mandatory");
        System.err.println("-DExample.ClientId={the client's id} - mandatory");
        System.err.println("-DExample.ClientSecret={the client's auth key} - mandatory");
        System.err.println("-DExample.SseUrl={url to the sse endpoint (either the load stream or change stream)} - mandatory");
        System.err.println("-DExample.LastEventId={the last event id to resume streaming from} - optional");
        System.err.println("-DExample.From={the time to resume streaming from} - optional and will be ignored if last event id is provided");
    }
}

SseInputs.java

package com.ariasystems.dataextract.examples.sse;

import org.apache.commons.lang3.StringUtils;

/**
 * A singleton that holds input state that can be used for recovering lost connections.
 * The last event id can be updated with the event ids in the latest data collected,
 * so that a restarted listener can resume from the last event that was processed.
 *
 * @author anorman
 */
public class SseInputs {
    /** The single shared instance. */
    public static final SseInputs Instance = new SseInputs();

    private String authServiceUrl;
    private String sseUrl;
    // Written by the event handler and read by the completion/failure handlers;
    // volatile so the latest id is visible to whichever thread reads it.
    private volatile String lastEventId;
    private String from;
    private String clientId;
    private String clientSecret;
    // The lastEventId supplied at startup (if any), kept to tell "resumed but
    // nothing received yet" apart from "received at least one new event".
    private String originalLastEventId;

    /** Private constructor (singleton pattern); all fields start out null. */
    private SseInputs() {
    }

    public String getAuthServiceUrl() {
        return authServiceUrl;
    }

    public void setAuthServiceUrl(String authServiceUrl) {
        this.authServiceUrl = authServiceUrl;
    }

    public String getSseUrl() {
        return sseUrl;
    }

    public void setSseUrl(String sseUrl) {
        this.sseUrl = sseUrl;
    }

    public String getLastEventId() {
        return lastEventId;
    }

    /**
     * Records the id of the most recently processed event.
     * Blank (null, empty, or whitespace-only) ids are ignored so that an event
     * without an id never erases the last known resume position.
     *
     * @param eventId the id of the event just processed
     */
    public void setLastEventId(String eventId) {
        if (StringUtils.isNotBlank(eventId)) {
            this.lastEventId = eventId;
        }
    }

    public String getFrom() {
        return from;
    }

    public void setFrom(String from) {
        this.from = from;
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public String getClientSecret() {
        return clientSecret;
    }

    public void setClientSecret(String clientSecret) {
        this.clientSecret = clientSecret;
    }

    public String getOriginalLastEventId() {
        return originalLastEventId;
    }

    public void setOriginalLastEventId(String originalLastEventId) {
        this.originalLastEventId = originalLastEventId;
    }

    /**
     * Reports whether a last event id is known and differs from the one
     * supplied at startup.
     *
     * @return {@code true} if a last event id is recorded and its value differs
     *         from the original last event id; {@code false} otherwise
     */
    public boolean hasCollectedEvents() {
        // Read the volatile field once so the null check and the comparison agree.
        String current = lastEventId;
        // Compare by value: two distinct String objects can carry the same id.
        return current != null && !current.equals(originalLastEventId);
    }
}

SseListener.java

package com.ariasystems.dataextract.examples.sse;

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import jakarta.ws.rs.client.*;
import jakarta.ws.rs.core.Feature;
import jakarta.ws.rs.sse.*;
import org.apache.commons.lang3.StringUtils;

/**
 * A simple wrapper for SseEventSource that does basic construction and execution.
 * The static entry points ({@link #initialize} and the private restart logic) keep
 * the connection inputs in the shared {@link SseInputs} singleton so that a new
 * listener can be built from the last recorded event id.
 */
public class SseListener {

    /**
     * Completion callback registered with the event source. Closes this listener's
     * event source if it is still open, then restarts from the last processed event id,
     * but only if at least one new event has been recorded since startup.
     */
    public class OnComplete implements Runnable {
        @Override
        public void run() {
            SseInputs inputs = SseInputs.Instance;
            if (isOpen()) {
                System.err.println("completion detected, closing connection...");
                close(5, TimeUnit.SECONDS);
            }
            System.err.println("server ended transmission outside of the event SSE handling. ");
            String lastEventId = inputs.getLastEventId();
            System.err.println("last successful EventId: " + lastEventId);
            System.err.println("");
            if (inputs.hasCollectedEvents()) {
                System.err.println("restarting from last event id processed");
                start();
            } else {
                // Nothing new was ever received: restarting would most likely just repeat the same failure.
                System.err.println("no events processed since the original startup, verify the input parameters and target urls and restart");
            }
        }
    }

    private final SseEventSource eventSource;

    private static final SseInputs sseInputs = SseInputs.Instance;

    /**
     * Builds an event source for the target URL with a 10-second reconnect delay.
     *
     * @param targetUrl URL of a Server-Sent Events source
     * @param authorizer producer of a JAX-RS "feature" that authorizes client requests
     */
    public SseListener(String targetUrl, Feature authorizer) {
        Client sseClient = HttpClientHelper.client(targetUrl);
        sseClient.register(authorizer);
        WebTarget target = sseClient.target(targetUrl);

        eventSource = SseEventSource
            .target(target)
            .reconnectingEvery(10, TimeUnit.SECONDS)
            .build();
    }

    /**
     * Stores the connection inputs in the shared {@link SseInputs} and starts listening.
     * If {@code lastEventId} is provided it takes precedence and {@code from} is not stored.
     *
     * @param authServiceUrl url of the auth endpoint
     * @param clientId the client's id
     * @param clientSecret the client's auth key
     * @param sseUrl url of the SSE endpoint, without query parameters
     * @param lastEventId event id to resume from; may be blank
     * @param from time to resume from; only used when {@code lastEventId} is blank
     * @throws IllegalArgumentException if {@code sseUrl} is blank
     */
    static void initialize(
            String authServiceUrl,
            String clientId,
            String clientSecret,
            String sseUrl,
            String lastEventId,
            String from) {

        // Validate before touching the shared state.
        if (StringUtils.isBlank(sseUrl)) {
            throw new IllegalArgumentException("sseUrl cannot be blank");
        }
        sseInputs.setAuthServiceUrl(authServiceUrl);
        sseInputs.setClientId(clientId);
        sseInputs.setClientSecret(clientSecret);
        sseInputs.setSseUrl(sseUrl);

        //if last event id is set, then ignore the from value as the lastEventId should be honored
        if (StringUtils.isBlank(lastEventId)) {
            System.out.println("setting from: " + from);
            sseInputs.setFrom(from);
        } else {
            System.out.println("setting lastEventId: " + lastEventId);
            sseInputs.setLastEventId(lastEventId);
            sseInputs.setOriginalLastEventId(lastEventId);
        }
        start();
    }

    /**
     * Authorizes, builds a fresh listener from the current {@link SseInputs} and opens it.
     * If any step throws, waits 5 seconds and tries again, indefinitely. The retry is a
     * loop rather than a recursive call so a long outage cannot exhaust the stack.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         retry delay is interrupted
     */
    static private void start() {
        while (true) {
            try {
                System.out.println("(re)starting...");
                System.out.println("constructing, invoking auth");
                Feature authFeature = new AuthFeature(sseInputs.getAuthServiceUrl(), sseInputs.getClientId(), sseInputs.getClientSecret()).build();

                String ssePath = constructSsePath();
                System.out.println("constructing SSE listener");
                SseListener sseListener = new SseListener(ssePath, authFeature);

                System.out.println("invoking SSE listener");
                sseListener.run((sseEvent) -> showEvent(sseEvent));
                return;
            } catch (Exception e) {
                System.err.println("received error during startup, retrying after 5 seconds delay: " + e);
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException ie) {
                    System.err.println("retry sleep interrupted, aborting retry: " + ie);
                    // Preserve the interrupt status for whoever owns this thread.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ie);
                }
            }
        }
    }

    /**
     * Builds the request URL from the stored sseUrl plus a query string:
     * {@code lastEventId=...} when a last event id is known, otherwise
     * {@code from=...} when a from value is set, otherwise just a trailing "?".
     * Values are appended as-is (no URL encoding is applied here).
     */
    static private String constructSsePath() {
        StringBuilder sb = new StringBuilder(sseInputs.getSseUrl());
        sb.append("?");
        //we only construct the url with the from value if the lastEventId is blank
        if (StringUtils.isBlank(sseInputs.getLastEventId())) {
            if (StringUtils.isNotBlank(sseInputs.getFrom())) {
                sb.append("from=").append(sseInputs.getFrom());
            }
        } else {
            sb.append("lastEventId=").append(sseInputs.getLastEventId());
        }
        String path = sb.toString();
        System.out.println("constructing path: " + path);
        return path;
    }

    /**
     * Example event handler: records the event id as the resume position and prints
     * the event. Events without a name are skipped entirely, so their ids are not recorded.
     *
     * @param sseEvent the received event
     */
    static void showEvent(InboundSseEvent sseEvent) {
        // **** YOUR CODE HERE ****
        // This is a very simple example of what you might do.
        // Ideally this would contain logic that quickly writes to a file, queue, or kafka topic that can be processed by a downstream system

        if (sseEvent.getName() != null) {
            System.out.println("----- EVENT --------------------------------");
            if (sseEvent.getId() != null) {
                SseInputs.Instance.setLastEventId(sseEvent.getId());
            }

            System.out.println("id:    " + sseEvent.getId());
            System.out.println("event: " + sseEvent.getName());

            String datas = sseEvent.readData();
            for (String data : datas.split("\n")) {
                System.out.println("data:  " + data);
            }
            System.out.println("raw:  " + sseEvent);
        }
    }

    /**
     * Registers the handlers and opens the event source. An exception thrown while
     * registering or opening is printed and not rethrown.
     *
     * @param eventHandler executed for every received SSE event
     */
    public void run(Consumer<InboundSseEvent> eventHandler) {
        try {
            eventSource.register(eventHandler, (e) -> failHandler(e), new OnComplete());
            eventSource.open();
        } catch (Exception e) {
            System.err.println("fatal exception starting event source: " + e);
            e.printStackTrace();
        }
    }

    /** Close this listener by closing the event source. */
    public boolean close(long timeout, TimeUnit unit) {
        return eventSource.close(timeout, unit);
    }

    /** Is this event source open? */
    public boolean isOpen() {
        return eventSource.isOpen();
    }

    /**
     * Invoked upon a unrecoverable error encountered by a SseEventSource.
     * Logs the error and the last recorded event id, then starts a new listener.
     * NOTE(review): this listener's event source is not closed here before the
     * restart; confirm whether it can still reconnect alongside the new one.
     */
    void failHandler(Throwable e) {
        //failover logic goes here
        System.err.println("received unrecoverable error: " + e);
        e.printStackTrace();
        System.err.println("last successful EventId: " + SseInputs.Instance.getLastEventId());

        start();
    }
}


HttpClientHelper.java

package com.ariasystems.dataextract.examples.sse;

import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

import org.apache.commons.lang3.StringUtils;

import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;

/**
 * A helper that configures the http client used for calls over the wire.
 *
 * <p><strong>Demo only:</strong> for https targets the client built here accepts
 * every certificate and every host name. Replace the trust manager and host name
 * verifier before using this against a live system.
 *
 * @author anorman
 */
public class HttpClientHelper {

    /**
     * Sets up the http or https client. Update this logic with your actual
     * https trust management and valid hosts here.
     *
     * <p>For targets starting with "https" a TLSv1.2 context with an accept-all
     * trust manager is installed on the client, together with an accept-all
     * host name verifier. The verifier is registered on this client only, so other
     * HTTPS connections in the same JVM keep their default host name checks.
     * The {@code https.protocols} system property is also set to TLSv1.2,
     * which is a JVM-wide setting.
     *
     * @param target the URL the client will be used for; only its scheme prefix is inspected
     * @return a new client
     * @throws RuntimeException wrapping any failure while configuring the client
     */
    public static Client client(String target) {
        try {
            ClientBuilder builder = ClientBuilder.newBuilder();
            System.out.println("setting up http client for target: " + target);
            if (StringUtils.startsWith(target, "https")) {
                System.out.println("detecting https for target: " + target);

                SSLContext context = SSLContext.getInstance("TLSv1.2");
                System.setProperty("https.protocols", "TLSv1.2");

                // TODO: use a real trustmanager for a live implementation. This demo is
                // configured to accept all certs!
                TrustManager[] trustAllCerts = {
                    new X509TrustManager() {
                        @Override
                        public void checkClientTrusted(X509Certificate[] chain, String authType) {
                            // Everyone is trusted!
                        }

                        @Override
                        public void checkServerTrusted(final X509Certificate[] chain, final String authType)
                                throws CertificateException {
                            // Everyone is trusted!
                        }

                        @Override
                        public X509Certificate[] getAcceptedIssuers() {
                            return new X509Certificate[0];
                        }
                    }
                };
                context.init(null, trustAllCerts, new java.security.SecureRandom());

                // TODO: use a real HostnameVerifier for a live implementation. This demo is
                // configured to accept all hosts!
                HostnameVerifier allHostsValid = new HostnameVerifier() {
                    @Override
                    public boolean verify(String hostname, SSLSession session) {
                        return true;
                    }
                };

                builder.sslContext(context);

                // Scope the all-trusting host verifier to this client instead of
                // replacing the JVM-wide default.
                builder.hostnameVerifier(allHostsValid);
            }
            return builder.build();
        } catch (Exception e) {
            throw new RuntimeException("issue setting up client for target url: " + target, e);
        }
    }

}

AuthFeature.java

package com.ariasystems.dataextract.examples.sse;

import com.auth0.jwt.JWT;
import com.auth0.jwt.algorithms.Algorithm;

import org.apache.commons.lang3.StringUtils;
import org.glassfish.jersey.client.oauth2.OAuth2ClientSupport;

import jakarta.ws.rs.Produces;
import jakarta.ws.rs.client.*;
import jakarta.ws.rs.core.*;
import java.util.Date;

/** Produces a JAX-RS "feature" that, when registered with a client, causes client requests to be extended with
 *  an HTTP Authorization header whose access token comes from querying an Aria authentication service.
 *  The token is fetched once, when {@link #build()} is called.
 *  <pre>
 *         Feature authFeature = new AuthFeature(url, clientId, secret).build();
 *         Client sseClient = ClientBuilder.newClient();
 *         sseClient.register( authFeature );
 *  </pre>
 */
public class AuthFeature {

    private String authSvcUrl;
    private String clientId;
    private String clientSecret;

    /**
     * @param authSvcUrl  url of an Aria authorization service
     * @param clientId  Aria client id
     * @param clientSecret  secret for the Aria client
     */
    public AuthFeature(String authSvcUrl, String clientId, String clientSecret) {
        this.authSvcUrl = authSvcUrl;
        this.clientId = clientId;
        this.clientSecret = clientSecret;
    }

    public String getAuthSvcUrl() {
        return authSvcUrl;
    }

    public void setAuthSvcUrl(String authSvcUrl) {
        this.authSvcUrl = authSvcUrl;
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public String getClientSecret() {
        return clientSecret;
    }

    public void setClientSecret(String clientSecret) {
        this.clientSecret = clientSecret;
    }

    /**
     * Requests an access token from the auth service and wraps it in an OAuth2 client feature.
     *
     * @return a feature to be registered with a JAX-RS client
     * @throws RuntimeException if the token request fails or the service reports an error
     */
    public Feature build() {
        return OAuth2ClientSupport.feature(getAriaAccessToken());
    }

    /**
     * This is the method that makes the OAuth2 request to get an access token.
     * The client secret itself is not sent; it is used as the HMAC256 key that signs
     * a JWT, and that JWT is sent as the {@code client_secret} of the request.
     *
     * @return a fresh access token
     * @throws RuntimeException if the request fails, the response has no body that can be
     *         read as an {@link OAuth2Response}, or the response carries an error
     */
    private String getAriaAccessToken() {

        // Create the JWT that will be the secret in the auth request
        String jwt = JWT.create()
                .withIssuedAt(new Date())
                .sign(Algorithm.HMAC256(clientSecret));

        // Construct the auth request
        Entity<?> authRequestEntity = Entity.entity(
                new Oauth2ClientCredentialsRequest(clientId, jwt),
                MediaType.APPLICATION_JSON
        );

        Client authClient = null;
        Response authResponse = null;
        try {
            // Append the token path only when the configured url does not already contain it.
            String path = "";
            if (!StringUtils.contains(authSvcUrl, "/oauth2/token")) {
                path = "/oauth2/token";
            }

            // Use (and later close) the same client that actually sends the request.
            authClient = HttpClientHelper.client(authSvcUrl);

            // Send the auth request
            authResponse = authClient
                    .target(authSvcUrl)
                    .path(path)
                    .request(MediaType.APPLICATION_JSON)
                    .post(authRequestEntity);

            // Read the response
            OAuth2Response oauthResponse = authResponse.readEntity(OAuth2Response.class);

            if (oauthResponse == null) {
                throw new RuntimeException("Unable to get OAuth2 token: empty response, http status " + authResponse.getStatus());
            }
            if (oauthResponse.error != null) {
                throw new RuntimeException("Unable to get OAuth2 token: " + oauthResponse.error_description);
            }

            return oauthResponse.access_token;
        }
        catch (RuntimeException e) {
            System.out.println("error generating auth: " + e);
            e.printStackTrace();
            throw e;
        }
        finally {
            // Release the connection and the client whether or not the request succeeded.
            if (authResponse != null) {
                authResponse.close();
            }
            if (authClient != null) {
                authClient.close();
            }
        }
    }

    /** OAuth2 authentication requests of grant_type "client_credentials" */
    @Produces(MediaType.APPLICATION_JSON)
    public static class Oauth2ClientCredentialsRequest {
        public String grant_type = "client_credentials";
        public String client_id;
        public String client_secret;

        /**
         * @param clientId the client id to send as {@code client_id}
         * @param secret the value to send as {@code client_secret}
         */
        public Oauth2ClientCredentialsRequest(String clientId, String secret) {
            client_id = clientId;
            client_secret = secret;
        }
    }

    /** Responses to OAuth2 authentication requests */
    @Produces(MediaType.APPLICATION_JSON)
    public static class OAuth2Response {
        public OAuth2Response() {
            super();
        }
        public String token_type;
        public String expires_in;
        public String access_token;
        public String error;
        public String error_description;
    }
}
          
  • Was this article helpful?