Following are instructions for using the attached .zip file for listening to the data stream. In addition, sample code from 4 Java classes is included both in this article and in the .zip file (SseExampleMain, SseInputs, SseListener, and AuthFeature). Also included in the .zip file is a maven file (pom.xml) that pulls in dependencies and builds the jar.
Example Data Feed Code
Download the .zip: java-sse-client.zip
Building
The classes can be built / jarred with the maven pom file.
mvn clean package will build a simple jar (without dependent libraries)
mvn clean package assembly:single will build an “uber” jar containing all dependent libraries in a single jar
Invoking
The SseExampleMain is the target Main class to invoke.
The easiest way to invoke this sample is with an uber jar packaging:
java {vm arguments} -jar sse-client-java-1.0-SNAPSHOT-jar-with-dependencies.jar
Invocation Arguments
The invocation uses the following vm arguments described below:
-DExample.AuthServiceUrl | The url to the auth endpoint - mandatory |
-DExample.ClientId | The client's id (aka client no) - mandatory |
-DExample.ClientSecret | The client's auth key - mandatory |
-DExample.SseUrl | The url to the sse endpoint (either the load stream or change stream) - mandatory |
-DExample.LastEventId | The last event id to resume streaming from - optional |
-DExample.From | The time to resume streaming from - optional and will be ignored if last event id is provided |
SseExampleMain.java
package com.ariasystems.dataextract.examples.sse; import org.apache.commons.lang3.StringUtils; /** * An example app that creates an authorizer, creates an SSE listener and runs * that listener with an event handler that simply prints out each event. */ public class SseExampleMain { public static void main(String[] args) { String authServiceUrl = System.getProperty("Example.AuthServiceUrl"); String clientId = System.getProperty("Example.ClientId"); String clientSecret = System.getProperty("Example.ClientSecret"); String sseUrl = System.getProperty("Example.SseUrl"); String lastEventId = System.getProperty("Example.LastEventId"); String from = System.getProperty("Example.From"); System.out.println("client -> "+clientId); System.out.println("auth: sseUrl -> "+authServiceUrl); System.out.println("calling: sseUrl -> "+sseUrl); System.out.println("initializing app"); System.out.println("validating inputs"); //validate inputs try { validateNotBlank(authServiceUrl,"authServiceUrl"); validateNotBlank(sseUrl,"sseUrl"); validateNotBlank(clientId,"clientId"); validateNotBlank(clientSecret,"clientSecret"); validateNotBlank(authServiceUrl,"authServiceUrl"); //assert that the sseUrl is clean of query params if(StringUtils.contains(sseUrl, "?")) { throw new IllegalArgumentException("sseUrl cannot contain query params in its input string. Please provide these as separate params"); } } catch (Exception e) { e.printStackTrace(); printUsage(); } try { SseListener.initialize(authServiceUrl,clientId,clientSecret,sseUrl,lastEventId,from); } catch (Exception e) { e.printStackTrace(); } } private static void validateNotBlank(String param, String paramName) { if(StringUtils.isBlank(param)) { throw new IllegalArgumentException("param: " + paramName + " cannot be blank"); } } private static void printUsage(){ System.err.println(); System.err.println(); System.err.println(); System.err.println("USAGE: "); System.err.println("==========================================="); System.err.println("-DExample.AuthServiceUrl={the url to the auth endpoint} - mandatory"); System.err.println("-DExample.ClientId={the client's id} - mandatory"); System.err.println("-DExample.ClientSecret={the client's auth key} - mandatory"); System.err.println("-DExample.SseUrl={url to the sse endpoint (either the load stream or change stream)} - mandatory"); System.err.println("-DExample.LastEventId={the last event id to resume streaming from} - optional"); System.err.println("-DExample.From={the time to resume streaming from} - optional and will be ignored if last event id is provided"); } }
SseInputs.java
package com.ariasystems.dataextract.examples.sse; import org.apache.commons.lang3.StringUtils; /** * A singleton that holds input state that can be used for recovering lost connections. * the last event id can be updated with the event ids in the latest data collected * @author anorman * */ public class SseInputs { public static SseInputs Instance = new SseInputs(); private String authServiceUrl; private String sseUrl; private String lastEventId; private String from; private String clientId; private String clientSecret; private String originalLastEventId; //private constructor / Singleton pattern private SseInputs(){ authServiceUrl = null; sseUrl = null; lastEventId = null; from = null; clientId = null; clientSecret = null; originalLastEventId = null; } public String getAuthServiceUrl() { return authServiceUrl; } public void setAuthServiceUrl(String authServiceUrl) { this.authServiceUrl = authServiceUrl; } public String getSseUrl() { return sseUrl; } public void setSseUrl(String sseUrl) { this.sseUrl = sseUrl; } public String getLastEventId() { return lastEventId; } public void setLastEventId(String eventId) { if (StringUtils.isNotBlank(eventId)){ synchronized(this) { this.lastEventId = eventId; } } } public String getFrom() { return from; } public void setFrom(String from) { this.from = from; } public String getClientId() { return clientId; } public void setClientId(String clientId) { this.clientId = clientId; } public String getClientSecret() { return clientSecret; } public void setClientSecret(String clientSecret) { this.clientSecret = clientSecret; } public String getOriginalLastEventId() { return originalLastEventId; } public void setOriginalLastEventId(String originalLastEventId) { this.originalLastEventId = originalLastEventId; } public boolean hasCollectedEvents() { return (lastEventId != null && originalLastEventId != lastEventId); } }
SseListener.java
package com.ariasystems.dataextract.examples.sse; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import jakarta.ws.rs.client.*; import jakarta.ws.rs.core.Feature; import jakarta.ws.rs.sse.*; import org.apache.commons.lang3.StringUtils; /** * A simple wrapper for SseEventSource that does basic construction and execution. */ public class SseListener { public class OnComplete implements Runnable{ @Override public void run(){ SseInputs inputs = SseInputs.Instance; if (isOpen()) { System.err.println("completion detected, closing connection..."); close(5, TimeUnit.SECONDS); } System.err.println("server ended transmission outside of the event SSE handling. "); String lastEventId = inputs.getLastEventId(); System.err.println("last successful EventId: " + lastEventId); System.err.println(""); if (inputs.hasCollectedEvents()) { System.err.println("restarting from last event id processed"); start(); } else { System.err.println("no events processed since the original startup, verify the input parameters and target urls and restart"); } } } private SseEventSource eventSource; private static SseInputs sseInputs = SseInputs.Instance; /** * @param targetUrl URL of a Server-Sent Events source * @param authorizer producer of a JAX-RS "feature" that authorizes client requests */ public SseListener(String targetUrl, Feature authorizer) { Client sseClient = HttpClientHelper.client(targetUrl); sseClient.register( authorizer ); WebTarget target = sseClient.target(targetUrl); eventSource = SseEventSource .target(target) .reconnectingEvery(10, TimeUnit.SECONDS) .build(); } static void initialize( String authServiceUrl, String clientId, String clientSecret, String sseUrl, String lastEventId, String from){ sseInputs.setAuthServiceUrl(authServiceUrl); if(StringUtils.isBlank(sseUrl)) { throw new IllegalArgumentException("sseUrl cannot be blank"); } sseInputs.setClientId(clientId); sseInputs.setClientSecret(clientSecret); sseInputs.setSseUrl(sseUrl); //if last event id is set, then ignore the from value as the lastEventId should be honored if (StringUtils.isBlank(lastEventId)) { System.out.println("setting from: " + from); sseInputs.setFrom(from); } else { System.out.println("setting lastEventId: " + lastEventId); sseInputs.setLastEventId(lastEventId); sseInputs.setOriginalLastEventId(lastEventId); } start(); } static private void start() { try { System.out.println("(re)starting..."); System.out.println("constructing, invoking auth"); Feature authFeature = new AuthFeature(sseInputs.getAuthServiceUrl(), sseInputs.getClientId(), sseInputs.getClientSecret()).build(); String ssePath = constructSsePath(); System.out.println("constructing SSE listener"); SseListener sseListener = new SseListener(ssePath, authFeature); System.out.println("invoking SSE listener"); sseListener.run((sseEvent) -> showEvent(sseEvent)); } catch(Exception e){ System.err.println("received error during startup, retrying after 5 seconds delay: " + e); try { Thread.sleep(5000); start(); } catch (InterruptedException ie){ System.err.println("retry sleep interrupted, aborting retry: " + ie); throw new RuntimeException(ie); } } } static private String constructSsePath() { StringBuilder sb = new StringBuilder(sseInputs.getSseUrl()); sb.append("?"); //we only construct the url with the from value if the lastEventId is blank if(StringUtils.isBlank(sseInputs.getLastEventId())) { if (StringUtils.isNotBlank(sseInputs.getFrom())){ sb.append("from=").append(sseInputs.getFrom()); } } else { sb.append("lastEventId=").append(sseInputs.getLastEventId()); } String path = sb.toString(); System.out.println("constructing path: " + path); return path; } static void showEvent(InboundSseEvent sseEvent) { // **** YOUR CODE HERE **** // This is a very simple example of what you might do. // Ideally this would contain logic that quickly writes to a file, queue, or kafka topic that can be processed by a downstream system if (sseEvent.getName() != null) { System.out.println("----- EVENT --------------------------------"); if(sseEvent.getId() != null) { SseInputs.Instance.setLastEventId(sseEvent.getId()); } System.out.println("id: " + sseEvent.getId()); System.out.println("event: " + sseEvent.getName()); String datas = sseEvent.readData(); for (String data : datas.split("\n")) { System.out.println("data: " + data); } System.out.println("raw: " + sseEvent); } } /** * @param eventHandler executed for every received SSE event */ public void run(Consumer<InboundSseEvent> eventHandler) { try{ eventSource.register(eventHandler, (e) -> failHandler(e), new OnComplete()); eventSource.open(); } catch(Exception e){ System.err.println("fatal exception starting event source: " + e); e.printStackTrace(); } } /** Close this listener by closing the event source. */ public boolean close(long timeout, TimeUnit unit) { return eventSource.close(timeout, unit); } /** Is this event source open? */ public boolean isOpen() { return eventSource.isOpen(); } /** Invoked upon a unrecoverable error encountered by a SseEventSource */ void failHandler(Throwable e) { //failover logic goes here System.err.println("received unrecoverable error: " + e); e.printStackTrace(); System.err.println("last successful EventId: " + SseInputs.Instance.getLastEventId()); start(); } }
HttpClientHelper.java
package com.ariasystems.dataextract.examples.sse; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSession; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; import org.apache.commons.lang3.StringUtils; import jakarta.ws.rs.client.Client; import jakarta.ws.rs.client.ClientBuilder; /** * a helper that configures the http client used for calls over the wire * @author anorman * */ public class HttpClientHelper { /** * sets up the http or https connections update this logic with your actual * Https trust management and valid hosts here */ public static Client client(String target) { try { ClientBuilder builder = ClientBuilder.newBuilder(); System.out.println("setting up http client for target: " + target); if (StringUtils.startsWith(target, "https")) { System.out.println("detecting https for target: " + target); SSLContext context = SSLContext.getInstance("TLSv1.2"); System.setProperty("https.protocols", "TLSv1.2"); // TODO: use a real trustmanager for a live implementation. This demo is // configured to accept all certs! TrustManager[] trustAllCerts = { new X509TrustManager() { @Override public void checkClientTrusted(X509Certificate[] chain, String authType) { // Everyone is trusted! } @Override public void checkServerTrusted(final X509Certificate[] chain, final String authType) throws CertificateException { // Everyone is trusted! } @Override public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } } }; context.init(null, trustAllCerts, new java.security.SecureRandom()); // TODO: use a real HostnameVerifier for a live implementation. This demo is // configured to accept all hosts! HostnameVerifier allHostsValid = new HostnameVerifier() { @Override public boolean verify(String hostname, SSLSession session) { return true; } }; builder.sslContext(context); // Install the all-trusting host verifier HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid); } return builder.build(); } catch ( Exception e) { throw new RuntimeException("issue setting up client for target url: " + target, e); } } }
AuthFeature.java
package com.ariasystems.dataextract.examples.sse; import com.auth0.jwt.JWT; import com.auth0.jwt.algorithms.Algorithm; import org.apache.commons.lang3.StringUtils; import org.glassfish.jersey.client.oauth2.OAuth2ClientSupport; import jakarta.ws.rs.Produces; import jakarta.ws.rs.client.*; import jakarta.ws.rs.core.*; import java.util.Date; /** Produces a JAX-RS "feature" that, when registered with a client, causes client requests to be extended with * an HTTP Authorization header whose access token comes from querying an Aria authentication service. * <pre> * Feature authFeature = new AuthFeature(url, clientId, secret).build(); * Client sseClient = ClientBuilder.newClient(); * sseClient.register( authFeature ); * </pre> */ public class AuthFeature { public String getAuthSvcUrl() { return authSvcUrl; } public void setAuthSvcUrl(String authSvcUrl) { this.authSvcUrl = authSvcUrl; } public String getClientId() { return clientId; } public void setClientId(String clientId) { this.clientId = clientId; } public String getClientSecret() { return clientSecret; } public void setClientSecret(String clientSecret) { this.clientSecret = clientSecret; } private String authSvcUrl; private String clientId; private String clientSecret; /** * @param authSvcUrl url of an Aria authorization service * @param clientId Aria client id * @param clientSecret secret for the Aria client */ public AuthFeature(String authSvcUrl, String clientId, String clientSecret) { this.authSvcUrl = authSvcUrl; this.clientId = clientId; this.clientSecret = clientSecret; } /** * @return a feature to be registered with a JAX-RS client */ public Feature build() { return OAuth2ClientSupport.feature(getAriaAccessToken()); } /** * This is the method that makes the OAuth2 request to get an access token. * @return a fresh access token */ private String getAriaAccessToken() { // Create the JWT that will be the secret in the auth request String jwt = JWT.create() .withIssuedAt(new Date()) .sign(Algorithm.HMAC256(clientSecret)); // Construct the auth request Entity<?> authRequestEntity = Entity.entity( new Oauth2ClientCredentialsRequest(clientId, jwt), MediaType.APPLICATION_JSON ); Client authClient = ClientBuilder.newClient(); try { String path = ""; if (!StringUtils.contains(authSvcUrl, "/oauth2/token")) { path = "/oauth2/token"; } // Send the auth request Response authResponse = HttpClientHelper.client(authSvcUrl) .target(authSvcUrl) .path(path) .request(MediaType.APPLICATION_JSON) .post(authRequestEntity); // Read the response OAuth2Response oauthResponse = authResponse.readEntity(OAuth2Response.class); if (oauthResponse.error != null) { throw new RuntimeException("Unable to get OAuth2 token: " + oauthResponse.error_description); } return oauthResponse.access_token; } catch (Exception e){ System.out.println("error generating auth: " + e); e.printStackTrace(); throw e; } finally { authClient.close(); } } /** OAuth2 authentication requests of grant_type "client_credentials" */ @Produces(MediaType.APPLICATION_JSON) public static class Oauth2ClientCredentialsRequest { public String grant_type = "client_credentials"; public String client_id; public String client_secret; public Oauth2ClientCredentialsRequest(String clientId, String secret) { client_id = clientId; client_secret = secret; } } /** Responses to OAuth2 authentication requests */ @Produces(MediaType.APPLICATION_JSON) public static class OAuth2Response { public OAuth2Response() { super(); } public String token_type; public String expires_in; public String access_token; public String error; public String error_description; } }