Data Feed Example Code
Data Feed Examples
Following are instructions for using the attached .zip file for listening to the Data Stream. In addition, sample code from 4 Java classes is included both in this article and in the .zip file (SseExampleMain, SseInputs, SseListener, and AuthFeature). Also included in the .zip file is a maven file (pom.xml) that pulls in dependencies and builds the jar.
Download the .zip: java-sse-client.zip
Below the Data Stream listening code examples are four other Data Stream code examples such as Authenticator and Event Handlers. Click on each ► below to expand the sample code within.
Building
The classes can be built / jarred with the maven pom file.
mvn clean package will build a simple jar (without dependent libraries)
mvn clean package assembly:single will build an “uber” jar containing all dependent libraries in a single jar
Invoking
The SseExampleMain is the target Main class to invoke.
The easiest way to invoke this sample is with an uber jar packaging:
java {vm arguments} -jar sse-client-java-1.0-SNAPSHOT-jar-with-dependencies.jar
Invocation Arguments
The invocation uses the following vm arguments described below:
-DExample.AuthServiceUrl | The url to the auth endpoint - mandatory |
-DExample.ClientId | The client's id (aka client no) - mandatory |
-DExample.ClientSecret | The client's auth key - mandatory |
-DExample.SseUrl | The url to the sse endpoint (either the load stream or change stream) - mandatory |
-DExample.LastEventId | The last event id to resume streaming from - optional |
-DExample.From | The time to resume streaming from - optional and will be ignored if last event id is provided |
- ► SseExampleMain.java
-
package com.ariasystems.dataextract.examples.sse; import org.apache.commons.lang3.StringUtils; /** * An example app that creates an authorizer, creates an SSE listener and runs * that listener with an event handler that simply prints out each event. */ public class SseExampleMain { public static void main(String[] args) { String authServiceUrl = System.getProperty("Example.AuthServiceUrl"); String clientId = System.getProperty("Example.ClientId"); String clientSecret = System.getProperty("Example.ClientSecret"); String sseUrl = System.getProperty("Example.SseUrl"); String lastEventId = System.getProperty("Example.LastEventId"); String from = System.getProperty("Example.From"); System.out.println("client -> "+clientId); System.out.println("auth: sseUrl -> "+authServiceUrl); System.out.println("calling: sseUrl -> "+sseUrl); System.out.println("initializing app"); System.out.println("validating inputs"); //validate inputs try { validateNotBlank(authServiceUrl,"authServiceUrl"); validateNotBlank(sseUrl,"sseUrl"); validateNotBlank(clientId,"clientId"); validateNotBlank(clientSecret,"clientSecret"); validateNotBlank(authServiceUrl,"authServiceUrl"); //assert that the sseUrl is clean of query params if(StringUtils.contains(sseUrl, "?")) { throw new IllegalArgumentException("sseUrl cannot contain query params in its input string. Please provide these as separate params"); } } catch (Exception e) { e.printStackTrace(); printUsage(); } try { SseListener.initialize(authServiceUrl,clientId,clientSecret,sseUrl,lastEventId,from); } catch (Exception e) { e.printStackTrace(); } } private static void validateNotBlank(String param, String paramName) { if(StringUtils.isBlank(param)) { throw new IllegalArgumentException("param: " + paramName + " cannot be blank"); } } private static void printUsage(){ System.err.println(); System.err.println(); System.err.println(); System.err.println("USAGE: "); System.err.println("==========================================="); System.err.println("-DExample.AuthServiceUrl={the url to the auth endpoint} - mandatory"); System.err.println("-DExample.ClientId={the client's id} - mandatory"); System.err.println("-DExample.ClientSecret={the client's auth key} - mandatory"); System.err.println("-DExample.SseUrl={url to the sse endpoint (either the load stream or change stream)} - mandatory"); System.err.println("-DExample.LastEventId={the last event id to resume streaming from} - optional"); System.err.println("-DExample.From={the time to resume streaming from} - optional and will be ignored if last event id is provided"); } }
- ► SseInputs.java
-
package com.ariasystems.dataextract.examples.sse; import org.apache.commons.lang3.StringUtils; /** * A singleton that holds input state that can be used for recovering lost connections. * the last event id can be updated with the event ids in the latest data collected * @author anorman * */ public class SseInputs { public static SseInputs Instance = new SseInputs(); private String authServiceUrl; private String sseUrl; private String lastEventId; private String from; private String clientId; private String clientSecret; private String originalLastEventId; //private constructor / Singleton pattern private SseInputs(){ authServiceUrl = null; sseUrl = null; lastEventId = null; from = null; clientId = null; clientSecret = null; originalLastEventId = null; } public String getAuthServiceUrl() { return authServiceUrl; } public void setAuthServiceUrl(String authServiceUrl) { this.authServiceUrl = authServiceUrl; } public String getSseUrl() { return sseUrl; } public void setSseUrl(String sseUrl) { this.sseUrl = sseUrl; } public String getLastEventId() { return lastEventId; } public void setLastEventId(String eventId) { if (StringUtils.isNotBlank(eventId)){ synchronized(this) { this.lastEventId = eventId; } } } public String getFrom() { return from; } public void setFrom(String from) { this.from = from; } public String getClientId() { return clientId; } public void setClientId(String clientId) { this.clientId = clientId; } public String getClientSecret() { return clientSecret; } public void setClientSecret(String clientSecret) { this.clientSecret = clientSecret; } public String getOriginalLastEventId() { return originalLastEventId; } public void setOriginalLastEventId(String originalLastEventId) { this.originalLastEventId = originalLastEventId; } public boolean hasCollectedEvents() { return (lastEventId != null && originalLastEventId != lastEventId); } }
- ► SseListener.java
-
package com.ariasystems.dataextract.examples.sse; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import jakarta.ws.rs.client.*; import jakarta.ws.rs.core.Feature; import jakarta.ws.rs.sse.*; import org.apache.commons.lang3.StringUtils; /** * A simple wrapper for SseEventSource that does basic construction and execution. */ public class SseListener { public class OnComplete implements Runnable{ @Override public void run(){ SseInputs inputs = SseInputs.Instance; if (isOpen()) { System.err.println("completion detected, closing connection..."); close(5, TimeUnit.SECONDS); } System.err.println("server ended transmission outside of the event SSE handling. "); String lastEventId = inputs.getLastEventId(); System.err.println("last successful EventId: " + lastEventId); System.err.println(""); if (inputs.hasCollectedEvents()) { System.err.println("restarting from last event id processed"); start(); } else { System.err.println("no events processed since the original startup, verify the input parameters and target urls and restart"); } } } private SseEventSource eventSource; private static SseInputs sseInputs = SseInputs.Instance; /** * @param targetUrl URL of a Server-Sent Events source * @param authorizer producer of a JAX-RS "feature" that authorizes client requests */ public SseListener(String targetUrl, Feature authorizer) { Client sseClient = HttpClientHelper.client(targetUrl); sseClient.register( authorizer ); WebTarget target = sseClient.target(targetUrl); eventSource = SseEventSource .target(target) .reconnectingEvery(10, TimeUnit.SECONDS) .build(); } static void initialize( String authServiceUrl, String clientId, String clientSecret, String sseUrl, String lastEventId, String from){ sseInputs.setAuthServiceUrl(authServiceUrl); if(StringUtils.isBlank(sseUrl)) { throw new IllegalArgumentException("sseUrl cannot be blank"); } sseInputs.setClientId(clientId); sseInputs.setClientSecret(clientSecret); sseInputs.setSseUrl(sseUrl); //if last event id is set, then ignore the from value as the lastEventId should be honored if (StringUtils.isBlank(lastEventId)) { System.out.println("setting from: " + from); sseInputs.setFrom(from); } else { System.out.println("setting lastEventId: " + lastEventId); sseInputs.setLastEventId(lastEventId); sseInputs.setOriginalLastEventId(lastEventId); } start(); } static private void start() { try { System.out.println("(re)starting..."); System.out.println("constructing, invoking auth"); Feature authFeature = new AuthFeature(sseInputs.getAuthServiceUrl(), sseInputs.getClientId(), sseInputs.getClientSecret()).build(); String ssePath = constructSsePath(); System.out.println("constructing SSE listener"); SseListener sseListener = new SseListener(ssePath, authFeature); System.out.println("invoking SSE listener"); sseListener.run((sseEvent) -> showEvent(sseEvent)); } catch(Exception e){ System.err.println("received error during startup, retrying after 5 seconds delay: " + e); try { Thread.sleep(5000); start(); } catch (InterruptedException ie){ System.err.println("retry sleep interrupted, aborting retry: " + ie); throw new RuntimeException(ie); } } } static private String constructSsePath() { StringBuilder sb = new StringBuilder(sseInputs.getSseUrl()); sb.append("?"); //we only construct the url with the from value if the lastEventId is blank if(StringUtils.isBlank(sseInputs.getLastEventId())) { if (StringUtils.isNotBlank(sseInputs.getFrom())){ sb.append("from=").append(sseInputs.getFrom()); } } else { sb.append("lastEventId=").append(sseInputs.getLastEventId()); } String path = sb.toString(); System.out.println("constructing path: " + path); return path; } static void showEvent(InboundSseEvent sseEvent) { // **** YOUR CODE HERE **** // This is a very simple example of what you might do. // Ideally this would contain logic that quickly writes to a file, queue, or kafka topic that can be processed by a downstream system if (sseEvent.getName() != null) { System.out.println("----- EVENT --------------------------------"); if(sseEvent.getId() != null) { SseInputs.Instance.setLastEventId(sseEvent.getId()); } System.out.println("id: " + sseEvent.getId()); System.out.println("event: " + sseEvent.getName()); String datas = sseEvent.readData(); for (String data : datas.split("\n")) { System.out.println("data: " + data); } System.out.println("raw: " + sseEvent); } } /** * @param eventHandler executed for every received SSE event */ public void run(Consumer<InboundSseEvent> eventHandler) { try{ eventSource.register(eventHandler, (e) -> failHandler(e), new OnComplete()); eventSource.open(); } catch(Exception e){ System.err.println("fatal exception starting event source: " + e); e.printStackTrace(); } } /** Close this listener by closing the event source. */ public boolean close(long timeout, TimeUnit unit) { return eventSource.close(timeout, unit); } /** Is this event source open? */ public boolean isOpen() { return eventSource.isOpen(); } /** Invoked upon a unrecoverable error encountered by a SseEventSource */ void failHandler(Throwable e) { //failover logic goes here System.err.println("received unrecoverable error: " + e); e.printStackTrace(); System.err.println("last successful EventId: " + SseInputs.Instance.getLastEventId()); start(); } }
- ► HttpClientHelper.java
-
package com.ariasystems.dataextract.examples.sse; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSession; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; import org.apache.commons.lang3.StringUtils; import jakarta.ws.rs.client.Client; import jakarta.ws.rs.client.ClientBuilder; /** * a helper that configures the http client used for calls over the wire * @author anorman * */ public class HttpClientHelper { /** * sets up the http or https connections update this logic with your actual * Https trust management and valid hosts here */ public static Client client(String target) { try { ClientBuilder builder = ClientBuilder.newBuilder(); System.out.println("setting up http client for target: " + target); if (StringUtils.startsWith(target, "https")) { System.out.println("detecting https for target: " + target); SSLContext context = SSLContext.getInstance("TLSv1.2"); System.setProperty("https.protocols", "TLSv1.2"); // TODO: use a real trustmanager for a live implementation. This demo is // configured to accept all certs! TrustManager[] trustAllCerts = { new X509TrustManager() { @Override public void checkClientTrusted(X509Certificate[] chain, String authType) { // Everyone is trusted! } @Override public void checkServerTrusted(final X509Certificate[] chain, final String authType) throws CertificateException { // Everyone is trusted! } @Override public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } } }; context.init(null, trustAllCerts, new java.security.SecureRandom()); // TODO: use a real HostnameVerifier for a live implementation. This demo is // configured to accept all hosts! HostnameVerifier allHostsValid = new HostnameVerifier() { @Override public boolean verify(String hostname, SSLSession session) { return true; } }; builder.sslContext(context); // Install the all-trusting host verifier HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid); } return builder.build(); } catch ( Exception e) { throw new RuntimeException("issue setting up client for target url: " + target, e); } } }
- ► AuthFeature.java
-
package com.ariasystems.dataextract.examples.sse; import com.auth0.jwt.JWT; import com.auth0.jwt.algorithms.Algorithm; import org.apache.commons.lang3.StringUtils; import org.glassfish.jersey.client.oauth2.OAuth2ClientSupport; import jakarta.ws.rs.Produces; import jakarta.ws.rs.client.*; import jakarta.ws.rs.core.*; import java.util.Date; /** Produces a JAX-RS "feature" that, when registered with a client, causes client requests to be extended with * an HTTP Authorization header whose access token comes from querying an Aria authentication service. * <pre> * Feature authFeature = new AuthFeature(url, clientId, secret).build(); * Client sseClient = ClientBuilder.newClient(); * sseClient.register( authFeature ); * </pre> */ public class AuthFeature { public String getAuthSvcUrl() { return authSvcUrl; } public void setAuthSvcUrl(String authSvcUrl) { this.authSvcUrl = authSvcUrl; } public String getClientId() { return clientId; } public void setClientId(String clientId) { this.clientId = clientId; } public String getClientSecret() { return clientSecret; } public void setClientSecret(String clientSecret) { this.clientSecret = clientSecret; } private String authSvcUrl; private String clientId; private String clientSecret; /** * @param authSvcUrl url of an Aria authorization service * @param clientId Aria client id * @param clientSecret secret for the Aria client */ public AuthFeature(String authSvcUrl, String clientId, String clientSecret) { this.authSvcUrl = authSvcUrl; this.clientId = clientId; this.clientSecret = clientSecret; } /** * @return a feature to be registered with a JAX-RS client */ public Feature build() { return OAuth2ClientSupport.feature(getAriaAccessToken()); } /** * This is the method that makes the OAuth2 request to get an access token. * @return a fresh access token */ private String getAriaAccessToken() { // Create the JWT that will be the secret in the auth request String jwt = JWT.create() .withIssuedAt(new Date()) .sign(Algorithm.HMAC256(clientSecret)); // Construct the auth request Entity<?> authRequestEntity = Entity.entity( new Oauth2ClientCredentialsRequest(clientId, jwt), MediaType.APPLICATION_JSON ); Client authClient = ClientBuilder.newClient(); try { String path = ""; if (!StringUtils.contains(authSvcUrl, "/oauth2/token")) { path = "/oauth2/token"; } // Send the auth request Response authResponse = HttpClientHelper.client(authSvcUrl) .target(authSvcUrl) .path(path) .request(MediaType.APPLICATION_JSON) .post(authRequestEntity); // Read the response OAuth2Response oauthResponse = authResponse.readEntity(OAuth2Response.class); if (oauthResponse.error != null) { throw new RuntimeException("Unable to get OAuth2 token: " + oauthResponse.error_description); } return oauthResponse.access_token; } catch (Exception e){ System.out.println("error generating auth: " + e); e.printStackTrace(); throw e; } finally { authClient.close(); } } /** OAuth2 authentication requests of grant_type "client_credentials" */ @Produces(MediaType.APPLICATION_JSON) public static class Oauth2ClientCredentialsRequest { public String grant_type = "client_credentials"; public String client_id; public String client_secret; public Oauth2ClientCredentialsRequest(String clientId, String secret) { client_id = clientId; client_secret = secret; } } /** Responses to OAuth2 authentication requests */ @Produces(MediaType.APPLICATION_JSON) public static class OAuth2Response { public OAuth2Response() { super(); } public String token_type; public String expires_in; public String access_token; public String error; public String error_description; } }
- ► Data Feed Example Authenticator Code
-
import jwt # to install: pip install PyJWT import requests # to install: pip install requests import time from requests.auth import AuthBase class AriaOauth2(AuthBase): """ Authenticator that retrieves access token from Aria auth service. There does not appear to be an off-the-shelf lib that does this at this time, a value that can be assigned to an http request's "auth" parameter that retrieves the access token using an Oauth2 grant type request. """ def __init__(self, client_id, client_secret, oauth2_url): """ :param client_id: The Aria client's id :param client_secret: The Aria client's secret :param oauth2_url: The URL of the authentication service's token endpoint """ self.client_id = client_id self.client_secret = client_secret self.oauth2_url = oauth2_url self.token = None self.expires = None def __call__(self, r): """ If we don't have an access token in hand, or it has expired, get a new token from the auth service. Then set the access token in the request's Authorization header. """ now = time.time() if not self.token or now > self.expires: self.token = self.get_fresh_token() self.expires = now + self.token['expires_in'] r.headers['Authorization'] = 'Bearer ' + self.token['access_token'] return r def get_fresh_token(self): """ Get an authorization header that contains a valid access token for the client """ # The token used to sign the grant request itself jwt_token = jwt.encode({'iat': int(time.time())}, self.client_secret, algorithm='HS256') # Make the request to the auth service to get an access token for the client resp = requests.post( self.oauth2_url, data = {'grant_type': 'client_credentials', 'client_id': self.client_id, 'client_secret': jwt_token}, verify = False, allow_redirects = False ) json_resp = resp.json() if 'access_token' in json_resp: return json_resp elif 'error' in json_resp: raise Exception("OAuth failed: %s: %s" % (json_resp['error'], json_resp.get('error_description'))) else: raise Exception("OAuth failed: %s" % (str(json_resp)))
- ► Data Feed Example Command Line Arguments
-
import argparse class DemoCliArgs(object): """ Comm These inputs are required to set up an SSE client stream: * URL of an SSE service * URL of an OAuth2 service (for authentication) * Aria client ID (for authentication) * Aria client secret (for authentication) """ def __init__(self): # Define the command line argument parser for this example code parser = argparse.ArgumentParser() parser.add_argument('-e', '--sse', help='URL of a Server-Sent Events source') parser.add_argument('-o', '--oauth', help='URL of an OAuth2 token service') parser.add_argument('-c', '--clientid', help='Aria client ID') parser.add_argument('-s', '--secret', help='Secret for the Aria client') # Get argument values from the command line args = parser.parse_args() self.sse_service_url = args.sse self.oauth2_url = args.oauth self.client_id = args.clientid self.client_secret = args.secret
- ► Data Feed Example Event Handlers
-
# Handlers for the stream events def handle_create(data): """The data part of a 'create' event has two lines, ref of the created entity, then its data""" lines = data.split("\n", 2) print("CREATE to: %s\ndata: %s" % (lines[0], lines[1])) def handle_update(data): """The data part of a 'update' event has two lines, ref of the updated entity, then its data""" lines = data.split("\n", 2) print("UPDATE to: %s\ndata: %s" % (lines[0], lines[1])) def handle_delete(data): """The data part of a 'delete' event has one line, ref of the deleted entity""" print("DELETE %s" % (data)) def handle_heartbeat(data): """These messages appear periodically just to keep the stream connection alive.""" print("(beat)") class SseHandlers(object): def __init__(self): # Map event type to handler self.event_handlers = { "create": handle_create, "update": handle_update, "delete": handle_delete, "message": handle_heartbeat } def handleMsg(self, msg): # Get the handler for the event type. Call that handler with the event's data # event_handlers.get(msg.event)(msg.data) self.event_handlers.get(msg.event)(msg.data)
- ► Data Feed Example Store For Last Event ID
-
class StreamStatusStore(object): """ A dummy store for a stream's last event id. The purpose of this is to save the stream's last event id so that in the event that this app crashes, the app knows where to restart. A real implementation should periodically write the value to some kind of persistent store, such as a file system, memcache or database. On restart the implementation should reread the value. """ def __init__(self): self.saved_last_id = None def save_last_id(self, last_id): self.saved_last_id = last_id def get_last_id(self): return self.saved_last_id