Class EventSource

java.lang.Object
org.glassfish.jersey.media.sse.EventSource
All Implemented Interfaces:
EventListener

public class EventSource extends Object implements EventListener
Client for reading and processing incoming Server-Sent Events.

Instances of this class are thread safe. To build a new instance, you can use one of the available public EventSource constructors that produce pre-configured event source instances. Alternatively, you can create a new EventSource.Builder instance using EventSource.target(endpoint) factory method. Compared to EventSource constructors, an event source builder provides greater flexibility when custom-configuring a new event source builder.

Once an EventSource is created, it opens a connection to the associated web target and starts processing any incoming inbound events. Whenever a new event is received, an onEvent(InboundEvent) method is called as well as any registered event listeners are notified (see register(EventListener) and register(EventListener, String, String...).

Reconnect support

The EventSource supports automated recuperation from a connection loss, including negotiation of delivery of any missed events based on the last received SSE event id field value, provided this field is set by the server and the negotiation facility is supported by the server. In case of a connection loss, the last received SSE event id field value is send in the "Last-Event-ID" HTTP request header as part of a new connection request sent to the SSE endpoint. Upon a receipt of such reconnect request, the SSE endpoint that supports this negotiation facility is expected to replay all missed events. Note however, that this is a best-effort mechanism which does not provide any guaranty that all events would be delivered without a loss. You should therefore not rely on receiving every single event and design your client application code accordingly.

By default, when a connection the the SSE endpoint is lost, the event source will wait 500L ms before attempting to reconnect to the SSE endpoint. The SSE endpoint can however control the client-side retry delay by including a special retry field value in the any send event. Jersey EventSource implementation tracks any received SSE event retry field values set by the endpoint and adjusts the reconnect delay accordingly, using the last received retry field value as the reconnect delay.

In addition to handling the standard connection losses, Jersey EventSource automatically deals with any HTTP 503 Service Unavailable responses from SSE endpoint, that contain a "Retry-After" HTTP header with a valid value. The HTTP 503 + "Retry-After" technique is often used by HTTP endpoints as a means of connection and traffic throttling. In case a HTTP 503 + "Retry-After" response is received in return to a connection request, Jersey EventSource will automatically schedule a new reconnect attempt and use the received "Retry-After" HTTP header value as a one-time override of the reconnect delay.

Using HTTP persistent connections

The experience has shown that persistent HTTP connection management in the HttpURLConnection, that is used as a default Jersey client connector, is fragile. It is unfortunately quite possible, that under heavy load the client and server connections may get out of sync, causing Jersey EventSource hang on a connection that has already been closed by a server, but has not been properly cleaned up by the HttpURLConnection management code, and has been reused for a re-connect request instead. To avoid this issue, Jersey EventSource implementation by default disables persistent HTTP connections when connecting (or reconnecting) to the SSE endpoint.

In case you are using Jersey event source with a Jersey client ClientConfig.connectorProvider(org.glassfish.jersey.client.spi.ConnectorProvider) connector provider configured to use some other client ConnectorProvider implementation able to reliably manage persistent HTTP connections (such as org.glassfish.jersey.grizzly.connector.GrizzlyConnectorProvider or org.glassfish.jersey.apache.connector.ApacheConnectorProvider), or in case you simply need to use persistent HTTP connections, you may do so by invoking the usePersistentConnections() method on an event source builder prior to creating a new event source instance.

Author:
Pavel Bucek, Marek Potociar
  • Field Details

    • RECONNECT_DEFAULT

      public static final long RECONNECT_DEFAULT
      Default SSE EventSource reconnect delay value in milliseconds.
      Since:
      2.3
      See Also:
  • Constructor Details

    • EventSource

      public EventSource(WebTarget endpoint)
      Create new SSE event source and open a connection it to the supplied SSE streaming web target. This constructor is performs the same series of actions as a call to:
      EventSource.target(endpoint).open()

      The created event source instance automatically opens a connection to the supplied SSE streaming web target and starts processing incoming events.

      The incoming events are processed by the event source in an asynchronous task that runs in an internal single-threaded scheduled executor service.

      Parameters:
      endpoint - SSE streaming endpoint. Must not be null.
      Throws:
      NullPointerException - in case the supplied web target is null.
    • EventSource

      public EventSource(WebTarget endpoint, boolean open)
      Create new SSE event source pointing at a SSE streaming web target. This constructor is performs the same series of actions as a call to:
       if (open) {
           EventSource.target(endpoint).open();
       } else {
           EventSource.target(endpoint).build();
       }

      If the supplied open flag is true, the created event source instance automatically opens a connection to the supplied SSE streaming web target and starts processing incoming events. Otherwise, if the open flag is set to false, the created event source instance is not automatically connected to the web target. In this case it is expected that the user who created the event source will manually invoke its open() method.

      Once the event source is open, the incoming events are processed by the event source in an asynchronous task that runs in an internal single-threaded scheduled executor service.

      Parameters:
      endpoint - SSE streaming endpoint. Must not be null.
      open - if true, the event source will immediately connect to the SSE endpoint, if false, the connection will not be established until open() method is called explicitly on the event stream.
      Throws:
      NullPointerException - in case the supplied web target is null.
  • Method Details

    • target

      public static EventSource.Builder target(WebTarget endpoint)
      Create a new event source builder that provides convenient way how to configure and fine-tune various aspects of a newly prepared event source instance.
      Parameters:
      endpoint - SSE streaming endpoint. Must not be null.
      Returns:
      a builder of a new event source instance pointing at the specified SSE streaming endpoint.
      Throws:
      NullPointerException - in case the supplied web target is null.
      Since:
      2.3
    • open

      public void open()
      Open the connection to the supplied SSE underlying web target and start processing incoming events.
      Throws:
      IllegalStateException - in case the event source has already been opened earlier.
    • isOpen

      public boolean isOpen()
      Check if this event source instance has already been opened.
      Returns:
      true if this event source is open, false otherwise.
    • register

      public void register(EventListener listener)
      Register new event listener to receive all streamed SSE events.
      Parameters:
      listener - event listener to be registered with the event source.
      See Also:
    • register

      public void register(EventListener listener, String eventName, String... eventNames)
      Add name-bound event listener which will be called only for incoming SSE events whose name is equal to the specified name(s).
      Parameters:
      listener - event listener to register with this event source.
      eventName - inbound event name.
      eventNames - additional event names.
      See Also:
    • onEvent

      public void onEvent(InboundEvent inboundEvent)
      Invoked when an event is received.

      The default EventSource implementation is empty, users can override this method to handle incoming InboundEvents.

      Note that overriding this method may be necessary to make sure no InboundEvent incoming events are lost in case the event source is constructed using EventSource(jakarta.ws.rs.client.WebTarget) constructor or in case a true flag is passed to the EventSource(jakarta.ws.rs.client.WebTarget, boolean) constructor, since the connection is opened as as part of the constructor call and the event processing starts immediately. Therefore any EventListeners registered later after the event source has been constructed may miss the notifications about the one or more events that arrive immediately after the connection to the event source is established.

      Specified by:
      onEvent in interface EventListener
      Parameters:
      inboundEvent - received inbound event.
    • close

      public void close()
      Close this event source. The method will wait up to 5 seconds for the internal event processing task to complete.
    • close

      public boolean close(long timeout, TimeUnit unit)
      Close this event source and wait for the internal event processing task to complete for up to the specified amount of wait time.

      The method blocks until the event processing task has completed execution after a shutdown request, or until the timeout occurs, or the current thread is interrupted, whichever happens first.

      In case the waiting for the event processing task has been interrupted, this method restores the interrupt flag on the thread before returning false.

      Parameters:
      timeout - the maximum time to wait.
      unit - the time unit of the timeout argument.
      Returns:
      true if this executor terminated and false if the timeout elapsed before termination or the termination was interrupted.