color IN ('red', 'green', 'white')
Message Queue Clients: Design and Features |
Previous | Next | Contents |
This chapter addresses architectural and configuration issues that depend upon Message Queue’s implementation of the Java Message Specification. It covers the following topics:
The choices you make in designing a JMS client affect portability, allocating work between connections and sessions, reliability and performance, resource use, and ease of administration. This section discusses basic issues that you need to address in client design. It covers the following topics:
The Java Messaging Specification was developed to abstract access to message-oriented middleware systems (MOMs). A client that writes JMS code should be portable to any provider that implements this specification. If code portability is important to you, be sure that you do the following in developing clients:
Make sure your code does not depend on extensions or features that are specific to Message Queue.
Look up, using JNDI, (rather than instantiate) administered objects
for connection factories and destinations.
Administered objects encapsulate provider-specific implementation and
configuration information. Besides allowing for portability,
administered objects also make it much easier to share connection
factories between applications and to tune a JMS application for
performance and resource use. So, even if portability is not important
to you, you might still want to leave the work of creating and
configuring these objects to an administrator. For more information, see
Looking Up a Connection Factory With
JNDI and Looking Up a Destination
With JNDI.
As described in "Messaging Domains" in Open Message Queue Technical Overview, JMS supports two distinct message delivery models: point-to-point (queues) and publish/subscribe (topics). The JMS simplified and classic APIs support both domains. There are also legacy APIs specific to each domain. These four APIs are shown in Table 2-1.
Table 2-1 Interface Classes for JMS APIs
Simplified API | Classic API | Legacy API for Point-to-Point Domain | Legacy API for Publish/Subscribe Domain |
---|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
+ |
|
|
|
|
|
|
|
|
|
|
|
The JMS 2.0 specification provides the Simplified API for unified domains. It provides all the functionality of the Classic API provided by the JMS 1.1 specification but requires fewer interfaces and is simpler to use. You can choose the API that best suits your needs. The legacy domain-specific APIs continue to be supported but are not recommended for new application development.
A connection is a relatively heavy-weight object because of the authentication and communication setup that must be done when a connection is created. For this reason, it’s a good idea to use as few connections as possible. The real allocation of work occurs in sessions, which are light-weight, single-threaded contexts for producing and consuming messages. When you are thinking about structuring your client, it is best to think of the work that is done at the session level.
A session
Is a factory for its message producers and consumers
Supplies provider-optimized message factories
Supports a single series of transactions that combine work spanning its producers and consumers into atomic units
Defines a serial order for the messages it consumes and the messages it produces
Retains messages until they have been acknowledged
Serializes execution of message listeners registered with its message consumers
The requirement that sessions be operated on by a single thread at a time places some restrictions on the combination of producers and consumers that can use the same session. In particular, if a session has an asynchronous consumer, it may not have any other synchronous consumers. For a discussion of the connection and session’s use of threads, see Managing Client Threads. With the exception of these restrictions, let the needs of your application determine the number of sessions, producers, and consumers.
The JMS 2.0 Specification provides the JMSContext
object is an active
connection to a JMS provider and a single-threaded context for sending
and receiving messages. It is used in the Simplified API to combine the
functionality of the Connection
and Session
object to reduce the
number of objects to send and receive messages. See
The JMS Simplified API.
Aside from the reliability your client requires, the design decisions that relate to producers and consumers include the following:
Do you want to use a point-to-point or a publish/subscribe domain?
There are some interesting permutations here. There are times when you
would want to use publish/subscribe even when you have only one
subscriber. On the other hand, performance considerations might make the
point-to-point model more efficient than the publish/subscribe model,
when the work of sorting messages between subscribers is too costly.
Sometimes You cannot make these decisions cannot in the abstract, but
must actually develop and test different prototypes.
Are you using an asynchronous message consumer that does not receive
messages often or a producer that is seldom used?
Let the administrator know how to set the ping interval, so that your
client gets an exception if the connection should fail. For more
information see Using the Client Runtime Ping Feature.
Are you using a synchronous consumer in a distributed application?
You might need to allow a small time interval between connecting and
calling the receiveNoWait()
method in order not to miss a pending
message. For more information, see Synchronous Consumption
in Distributed Applications.
Do you need message compression?
Benefits vary with the size and format of messages, the number of
consumers, network bandwidth, and CPU performance; and benefits are not
guaranteed. For a more detailed discussion, see Message
Compression.
A connection can have a client identifier. This identifier is used to
associate a JMS client’s connection to a message service, with state
information maintained by the message service for that client. The JMS
provider must ensure that a client identifier is unique, and applies to
only one connection at a time. Currently, client identifiers are used to
maintain state for durable subscribers. In defining a client identifier,
you can use a special variable substitution syntax that allows multiple
connections to be created from a single ConnectionFactory
object using
different user name parameters to generate unique client identifiers.
These connections can be used by multiple durable subscribers without
naming conflicts or lack of security.
Message Queue allows client identifiers to be set in one of two ways:
Programmatically: You use the setClientID
method of the Connection
object. If you use this method, you must set the client id before you
use the connection. Once the connection is used, the client identifier
cannot be set or reset.
Administratively: The administrator specifies the client ID when creating the connection factory administrative object. See "Client Identifier" in Open Message Queue Administration Guide.
In general, all messages sent to a destination by a single session are guaranteed to be delivered to a consumer in the order they were sent. However, if they are assigned different priorities, a messaging system will attempt to deliver higher priority messages first.
Beyond this, the ordering of messages consumed by a client can have only a rough relationship to the order in which they were produced. This is because the delivery of messages to a number of destinations and the delivery from those destinations can depend on a number of issues that affect timing, such as the order in which the messages are sent, the sessions from which they are sent, whether the messages are persistent, the lifetime of the messages, the priority of the messages, the message delivery policy of queue destinations (see "Physical Destination Property Reference" in Open Message Queue Administration Guide), and message service availability.
The use of selectors can have a significant impact on the performance of your application. It’s difficult to put an exact cost on the expense of using selectors since it varies with the complexity of the selector expression, but the more you can do to eliminate or simplify selectors the better.
One way to eliminate (or simplify) selectors is to use multiple destinations to sort messages. This has the additional benefit of spreading the message load over more than one producer, which can improve the scalability of your application. For those cases when it is not possible to do that, here are some techniques that you can use to improve the performance of your application when using selectors:
Have consumers share selectors. As of version 3.5 of Message Queue,
message consumers with identical selectors "share" that selector in
imqbrokerd
which can significantly improve performance. So if there is
a way to structure your application to have some selector sharing,
consider doing so.
Use IN
instead of multiple string comparisons. For example, the
following expression:
color IN ('red', 'green', 'white')
is much more efficient than this expression
color = 'red' OR color = 'green' OR color = 'white'
especially if the above expression usually evaluates to false.
* Use BETWEEN
instead of multiple integer comparisons. For example:
size BETWEEN 6 AND 10
is generally more efficient than
size>= 6 AND size <= 10
especially if the above expression usually evaluates to true. * Order the selector expression so that Message Queue can determine its evaluation as soon as possible. (Evaluation proceeds from left to right.) This can easily double or triple performance when using selectors, depending on the complexity of the expression.
If you have two expressions joined by an OR
, put the expression
that is most likely to evaluate to TRUE
first.
If you have two expressions joined by an AND
, put the expression
that is most likely to evaluate to FALSE
first.
For example, if size
is usually greater than 6, but color is rarely
red
you’d want the order of an OR
expression to be:
size> 6 OR color = 'red'
If you are using AND
:
color = 'red' AND size> 6
Reliable messaging is implemented in a variety of ways: through the use of persistent messages, acknowledgments or transactions, durable subscriptions, and connection failover.
In general, the more reliable the delivery of messages, the more overhead and bandwidth are required to achieve it. The trade-off between reliability and performance is a significant design consideration. You can maximize performance and throughput by choosing to produce and consume nonpersistent messages. On the other hand, you can maximize reliability by producing and consuming persistent messages in a transaction using a transacted session. For a detailed discussion of design options and their impact on performance, see Factors Affecting Performance.
Using client threads effectively requires that you balance performance, throughput, and resource needs. To do this, you need to understand JMS restrictions on thread usage, what threads Message Queue allocates for itself, and the architecture of your applications. This section addresses these issues and offers some guidelines for managing client threads.
The Java Messaging Specification mandates that a session not be operated on by more than one thread at a time. This leads to the following restrictions:
A session may not have an asynchronous consumer and a synchronous consumer.
A session that has an asynchronous consumer can only produce messages
from within the onMessage()
method (the message listener). The only
call that you can make outside the message listener is to close the
session.
A session may include any number of synchronous consumers, any number of producers, and any combination of the two. That is, the single-thread requirement cannot be violated by these combinations. However, performance may suffer.
The system does not enforce the requirement that a session be single
threaded. If your client application violates this requirement, you will
get a JMSIllegalState
exception or unexpected results.
When the Message Queue client runtime creates a connection, it creates two threads: one for consuming messages from the socket, and one to manage the flow of messages for the connection. In addition, the client runtime creates a thread for each client session. Thus, at a minimum, for a connection using one session, three threads are created. For a connection using three sessions, five threads are created, and so on.
Managing threads in a JMS application often involves trade-offs between performance and throughput. Weigh the following considerations when dealing with threading issues.
When you create several asynchronous message consumers in the same session, messages are delivered serially by the session thread to these consumers. Sharing a session among several message consumers might starve some consumers of messages while inundating other consumers. If the message rate across these consumers is high enough to cause an imbalance, you might want to separate the consumers into different sessions. To determine whether message flow is unbalanced, you can monitor destinations to see the rate of messages coming in. See Using the Metrics Monitoring API.
You can reduce the number of threads allocated to the client application by using fewer connections and fewer sessions. However, doing this might slow your application’s throughput.
You might be able to use certain JVM runtime options to improve thread
memory usage and performance. For example, if you are running on the
Solaris platform, you may be able to run with the same number (or more)
threads by using the following vm
options with the client: Refer to
the JDK documentation for details.
Use the Xss128K
option to decrease the memory size of the heap.
Use the xconcurrentIO
option to improve thread performance in the
1.3 VM.
This section describes memory and performance issues that you can manage by increasing JVM heap space and by managing the size of your messages. It covers the following topics:
You can also improve performance by having the administrator set connection factory attributes to meter the message flow over the client-broker connection and to limit the message flow for a consumer. For a detailed explanation, please see "Reliability And Flow Control" in Open Message Queue Administration Guide.
A client application running in a JVM needs enough memory to accommodate
messages that flow in from the network as well as messages the client
creates. If your client gets OutOfMemoryError
errors, chances are that
not enough memory was provided to handle the size or the number of
messages being consumed or produced.
Your client might need more than the default JVM heap space. On most systems, the default is 64 MB but you will need to check the default values for your system.
Consider the following guidelines:
Evaluate the normal and peak system memory footprints when sizing heap space.
You can start by doubling the heap size using a command like the
following:
java -Xmx128m MyClass
The best size for the heap space depends on both the operating system and the JDK release. Check the JDK documentation for restrictions.
The size of the VM’s memory allocation pool must be less than or equal to the amount of virtual memory that is available on the system.
In general, for better manageability, you can break large messages into smaller parts, and use sequencing to ensure that the partial messages sent are concatenated properly. You can also use a Message Queue JMS feature to compress the body of a message. This section describes the programming interface that allows you to compress messages and to compare the size of compressed and uncompressed messages.
Message compression and decompression is handled entirely by the client runtime, without involving the broker. Therefore, applications can use this feature with a pervious version of the broker, but they must use version 3.6 or later of the Message Queue client runtime library.
You can use the Message.setBooleanProperty()
method to specify that
the body of a message be compressed. If the JMS_SUN_COMPRESS
property
is set to true
, the client runtime, will compress the body of the
message being sent. This happens after the producer’s send method is
called and before the send method returns to the caller. The compressed
message is automatically decompressed by the client runtime before the
message is delivered to the message consumer.
For example, the following call specifies that a message be compressed:
MyMessage.setBooleanProperty("JMS_SUN_COMPRESS",true);
Compression only affects the message body; the message header and properties are not compressed.
Two read-only JMS message properties are set by the client runtime after a message is sent.
Applications can test the properties (JMS_SUN_UNCOMPRESSED_SIZE
and
JMS_SUN_COMPRESSED_SIZE)
after a send returns to determine whether
compression is advantageous. That is, applications wanting to use this
feature, do not have to explicitly receive a compressed and uncompressed
version of the message to determine whether compression is desired.
If the consumer of a compressed message wants to resend the message in
an uncompressed form, it should call the Message.clearProperties(
) to
clear the JMS_SUN_COMPRESS
property. Otherwise, the message will be
compressed before it is sent to its next destination.
Although message compression has been added to improve performance, such benefit is not guaranteed. Benefits vary with the size and format of messages, the number of consumers, network bandwidth, and CPU performance. For example, the cost of compression and decompression might be higher than the time saved in sending and receiving a compressed message. This is especially true when sending small messages in a high-speed network. On the other hand, applications that publish large messages to many consumers or who publish in a slow network environment, might improve system performance by compressing messages.
Depending on the message body type, compression may also provide minimal
or no benefit. An application client can use the
JMS_SUN_UNCOMPRESSED_SIZE
and JMS_SUN_COMPRESSED_SIZE
properties to
determine the benefit of compression for different message types.
Message consumers deployed with client runtime libraries that precede version 3.6 cannot handle compressed messages. Clients wishing to send compressed messages must make sure that consumers are compatible. C clients cannot currently consume compressed messages.
Example 2-1 shows how you set and send a compressed message:
Example 2-1 Sending a Compressed Message
//topicSession and myTopic are assumed to have been created
topicPublisher publisher = topicSession.createPublisher(myTopic);
BytesMessage bytesMessage=topicSession.createBytesMessage();
//byteArray is assumed to have been created
bytesMessage.writeBytes(byteArray);
//instruct the client runtime to compress this message
bytesMessage.setBooleanProperty("JMS_SUN_COMPRESS", true);
//publish message to the myTopic destination
publisher.publish(bytesMessage);
Example 2-2 shows how you examine compressed and
uncompressed message body size. The bytesMessage
was created as in
Example 2-1:
Example 2-2 Comparing Compressed and Uncompressed Message Size
//get uncompressed body size
int uncompressed=bytesMessage.getIntProperty("JMS_SUN_UNCOMPRESSED_SIZE");
//get compressed body size
int compressed=bytesMessage.getIntProperty("JMS_SUN_COMPRESSED_SIZE");
When a message is deemed undeliverable, it is automatically placed on a special queue called the dead message queue. A message placed on this queue retains all of its original headers (including its original destination) and information is added to the message’s properties to explain why it became a dead message. An administrator or a developer can access this queue, remove a message, and determine why it was placed on the queue.
For an introduction to dead messages and the dead message queue, see "Using the Dead Message Queue" in Open Message Queue Administration Guide.
For a description of the destination properties and of the broker properties that control the system’s use of the dead message queue, see "Physical Destination Property Reference" in Open Message Queue Administration Guide.
This section describes the message properties that you can set or examine programmatically to determine the following:
Whether a dead message can be sent to the dead message queue.
Whether the broker should log information when a message is destroyed or moved to the dead message queue.
Whether the body of the message should also be stored when the message is placed on the dead message queue.
Why the message was placed on the dead message queue and any ancillary information.
Message Queue 3.6 clients can set properties related to the dead message queue on messages and send those messages to clients compiled against earlier versions. However clients receiving such messages cannot examine these properties without recompiling against 3.6 libraries.
The dead message queue is automatically created by the broker and called
mq.sys.dmq.
You can use the message monitoring API, described in
Using the Metrics
Monitoring API, to determine whether that queue is growing, to examine
messages on that queue, and so on.
You can set the properties described in Table 2-2 for any message to control how the broker should handle that message if it deems it to be undeliverable. Note that these message properties are needed only to override destination, or broker-based behavior.
Table 2-2 Message Properties Relating to Dead Message Queue
Property | Description |
---|---|
|
A boolean whose value determines what the broker should do with the message if it is dead. The default value of unset, specifies that the message should be handled
as specified by the A value of A value of |
|
A boolean value that determines how activity relating to dead messages should be logged. The default value of unset, will behave as specified by the broker
configuration property A value of A value of |
|
A boolean value that specifies whether the body of a dead message is truncated. The default value of unset, will behave as specified by the broker
property A value of A value of |
The properties described in Table 2-3 are set by the broker for a message placed in the dead message queue. You can examine the properties for the message to retrieve information about why the message was placed on the queue and to gather other information about the message and about the context within which this action was taken.
Table 2-3 Dead Message Properties
Property | Description |
---|---|
|
An |
|
A |
|
A
If the message was marked dead for multiple reasons, for example it was undeliverable and expired, only one reason will be specified by this property. The |
|
A |
|
A |
|
A |
|
A |
|
A |
When creating a topic or queue destination, the administrator can
specify how the broker should behave when certain memory limits are
reached. Specifically, when the number of messages reaching a physical
destination exceeds the number specified with the maxNumMsgs
property
or when the total amount of memory allowed for messages exceeds the
number specified with the maxTotalMsgBytes
property, the broker takes
one of the following actions, depending on the setting of the
limitBehavior
property:
Slows message producers (FLOW_CONTROL
)
Throws out the oldest message in memory (REMOVE_OLDEST
)
Throws out the lowest priority message in memory
(REMOVE_LOW_PRIORITY
)
Rejects the newest messages (REJECT_NEWEST
)
If the default value REJECT_NEWEST
is specified for the
limitBehavior
property, the broker throws out the newest messages
received when memory limits are exceeded. If the message discarded is a
persistent message, the producing client gets an exception which should
be handled by resending the message later.
If any of the other values is selected for the limitBehavior
property
or if the message is not persistent, the application client is not
notified if a message is discarded. Application clients should let the
administrator know how they prefer this property to be set for best
performance and reliability.
This section describes two problems that consumers might need to manage: the undetected loss of a connection, or the loss of a message for distributed synchronous consumers.
Message Queue defines a connection factory attribute for a ping interval. This attribute specifies the interval at which the client runtime should check the client’s connection to the broker. The ping feature is especially useful to Message Queue clients that exclusively receive messages and might therefore not be aware that the absence of messages is due to a connection failure. This feature could also be useful to producers who don’t send messages frequently and who would want notification that a connection they’re planning to use is not available.
The connection factory attribute used to specify this interval is called
imqPingInterval
. Its default value is 30 seconds. A value of -1 or 0,
specifies that the client runtime should not check the client
connection.
Developers should set (or have the administrator set) ping intervals
that are slightly more frequent than they need to send or receive
messages, to allow time to recover the connection in case the ping
discovers a connection failure. Note also that the ping may not occur at
the exact time specified by the value you supply for interval
; the
underlying operating system’s use of i/o buffers may affect the amount
of time needed to detect a connection failure and trigger an exception.
A failed ping operation results in a JMSException
on the subsequent
method call that uses the connection. If an exception listener is
registered on the connection, it will be called when a ping operation
fails.
It is always possible that a message can be lost for synchronous
consumers in a session using AUTO_ACKNOWLEDGE
mode if the provider
fails. To prevent this possibility, you should either use a transacted
session or a session in CLIENT_ACKNOWLEDGE
mode.
Because distributed applications involve greater processing time, such
an application might not behave as expected if it were run locally. For
example, calling the receiveNoWait
method for a synchronous consumer
might return null
even when there is a message available to be
retrieved.
If a client connects to the broker and immediately calls the
receiveNoWait
method, it is possible that the message queued for the
consuming client is in the process of being transmitted from the broker
to the client. The client runtime has no knowledge of what is on the
broker, so when it sees that there is no message available on the
client’s internal queue, it returns with a null
, indicating no
message.
You can avoid this problem by having your client do either of the following:
Use one of the synchronous receive methods that specifies a timeout interval.
Use a queue browser to check the queue before calling the
receiveNoWait
method.
Application design decisions can have a significant effect on overall messaging performance. The most important factors affecting performance are those that impact the reliability of message delivery; among these are the following:
Other application design factors impacting performance include the following:
The sections that follow describe the impact of each of these factors on messaging performance. As a general rule, there is a trade-off between performance and reliability: factors that increase reliability tend to decrease performance.
Table 2-4 shows how application design factors affect messaging performance. The table shows two scenarios—a high-reliability, low-performance scenario and a high-performance, low-reliability scenario—and the choice of application design factors that characterizes each. Between these extremes, there are many choices and trade-offs that affect both reliability and performance.
Table 2-4 Comparison of High Reliability and High Performance Scenarios
Application Design Factor | High Reliability, Low Performance | High Performance, Low Reliability |
---|---|---|
Delivery mode |
Persistent messages |
Nonpersistent messages |
Use of transactions |
Transacted sessions |
No transactions |
Acknowledgment mode |
|
|
Durable/nondurable subscriptions |
Durable subscriptions |
Nondurable subscriptions |
Use of selectors |
Message filtering |
No message filtering |
Message size |
Small messages |
Large messages |
Message body type |
Complex body types |
Simple body types |
Persistent messages guarantee message delivery in case of broker failure. The broker stores these message in a persistent store until all intended consumers acknowledge that they have consumed the message.
Broker processing of persistent messages is slower than for nonpersistent messages for the following reasons:
A broker must reliably store a persistent message so that it will not be lost should the broker fail.
The broker must confirm receipt of each persistent message it receives. Delivery to the broker is guaranteed once the method producing the message returns without an exception.
Depending on the client acknowledgment mode, the broker might need to confirm a consuming client’s acknowledgment of a persistent message.
For both queues and topics with durable subscribers, performance was
approximately 40% faster for non-persistent messages. We obtained these
results using 10K-size messages and AUTO_ACKNOWLEDGE
mode.
A transaction guarantees that all messages produced in a transacted session and all messages consumed in a transacted session will be either processed or not processed (rolled back) as a unit. Message Queue supports both local and distributed transactions.
A message produced or acknowledged in a transacted session is slower than in a non-transacted session for the following reasons:
Additional information must be stored with each produced message.
In some situations, messages in a transaction are stored when normally they would not be. For example, a persistent message delivered to a topic destination with no subscriptions would normally be deleted, however, at the time the transaction is begun, information about subscriptions is not available.
Information on the consumption and acknowledgment of messages within a transaction must be stored and processed when the transaction is committed.
Other than using transactions, you can ensure reliable delivery by
having the client acknowledge receiving a message. If a session is
closed without the client acknowledging the message or if the message
broker fails before the acknowledgment is processed, the broker
redelivers that message, setting a JMSRedelivered
flag.
For a non-transacted session, the client can choose one of three acknowledgment modes, each of which has its own performance characteristics:
AUTO_ACKNOWLEDGE
. The system automatically acknowledges a message
once the consumer has processed it. This mode guarantees at most one
redelivered message after a provider failure.
CLIENT_ACKNOWLEDGE
. The application controls the point at which
messages are acknowledged. All messages processed in that session since
the previous acknowledgment are acknowledged. If the broker fails while
processing a set of acknowledgments, one or more messages in that group
might be redelivered.
(Using CLIENT_ACKNOWLEDGE
mode is similar to using transactions,
except there is no guarantee that all acknowledgments will be processed
together if a provider fails during processing.)
DUPS_OK_ACKNOWLEDGE
. This mode instructs the system to acknowledge
messages in a lazy manner. Multiple messages can be redelivered after a
provider failure.
NO_ACKNOWLEDGE
In this mode, the broker considers a message
acknowledged as soon as it has been written to the client. The broker
does not wait for an acknowledgment from the receiving client. This mode
is best used by typic subscribers who are not worried about reliability.
Performance is impacted by acknowledgment mode for the following reasons:
Extra control messages between broker and client are required in
AUTO_ACKNOWLEDGE
and CLIENT_ACKNOWLEDGE
modes. The additional
control messages add processing overhead and can interfere with JMS
payload messages, causing processing delays.
In AUTO_ACKNOWLEDGE
and CLIENT_ACKNOWLEDGE
modes, the client must
wait until the broker confirms that it has processed the client’s
acknowledgment before the client can consume more messages. (This broker
confirmation guarantees that the broker will not inadvertently redeliver
these messages.)
The Message Queue persistent store must be updated with the acknowledgment information for all persistent messages received by consumers, thereby decreasing performance.
Subscribers to a topic destination have either durable and nondurable subscriptions. Durable subscriptions provide increased reliability at the cost of slower throughput for the following reasons:
The Message Queue message broker must persistently store the list of messages assigned to each durable subscription so that should the broker fail, the list is available after recovery.
Persistent messages for durable subscriptions are stored persistently, so that should a broker fail, the messages can still be delivered after recovery, when the corresponding consumer becomes active. By contrast, persistent messages for nondurable subscriptions are not stored persistently (should a broker fail, the corresponding consumer connection is lost and the message would never be delivered).
We compared performance for durable and non-durable subscribers in two
cases: persistent and nonpersistent 10k-sized messages. Both cases use
AUTO_ACKNOWLEDGE
acknowledgment mode. We found a performance impact
only in the case of persistent messages, which slowed messages conveyed
to durable subscribers by about 30%.
Application developers can have the messaging provider sort messages
according to criteria specified in the message selector associated with
a consumer and deliver to that consumer only those messages whose
property value matches the message selector. For example, if an
application creates a subscriber to the topic WidgetOrders
and
specifies the expression NumberOfOrders>1000
for the message selector,
messages with a NumberOfOrders
property value of 1001
or more are
delivered to that subscriber.
Creating consumers with selectors lowers performance (as compared to using multiple destinations) because additional processing is required to handle each message. When a selector is used, it must be parsed so that it can be matched against future messages. Additionally, the message properties of each message must be retrieved and compared against the selector as each message is routed. However, using selectors provides more flexibility in a messaging application and may lower resource requirements at the expense of speed.
Message size affects performance because more data must be passed from producing client to broker and from broker to consuming client, and because for persistent messages a larger message must be stored.
However, by batching smaller messages into a single message, the routing and processing of individual messages can be minimized, providing an overall performance gain. In this case, information about the state of individual messages is lost.
In our tests, which compared throughput in kilobytes per second for 1K,
10K, and 100K-sized messages to a queue destination using
AUTO_ACKNOWLEDGE
mode, we found that non-persistent messaging was
about 50% faster for 1K messages, about 20% faster for 10K messages, and
about 5% faster for 100K messages. The size of the message affected
performance significantly for both persistent and non-persistent
messages. 100k messages are about 10 times faster than 10K, and 10K
messages are about 5 times faster than 1K.
JMS supports five message body types, shown below roughly in the order of complexity:
Bytes: Contains a set of bytes in a format determined by the application
Text: Is a simple java.lang.String
Stream: Contains a stream of Java primitive values
Map: Contains a set of name-and-value pairs
Object: Contains a Java serialized object
While, in general, the message type is dictated by the needs of an application, the more complicated types (map and object) carry a performance cost — the expense of serializing and deserializing the data. The performance cost depends on how simple or how complicated the data is.
Connection event notifications allow a Message Queue client to listen for closure and reconnection events and to take appropriate action based on the notification type and the connection state. For example, when a failover occurs and the client is reconnected to another broker, an application might want to clean up its transaction state and proceed with a new transaction.
If the Message Queue provider detects a serious problem with a
connection, it calls the connection object’s registered exception
listener. It does this by calling the listener’s onException
method,
and passing it a JMSException
argument describing the problem. In the
same way, the Message Queue provider offers an event notification API
that allows the client runtime to inform the application about
connection state changes. The notification API is defined by the
following elements:
The com.sun.messaging.jms.notification
package, which defines the
event listener and the notification event objects .
The com.sun.messaging.jms.Connection
interface, which defines
extensions to the jakarta.jms.Connection
interface.
The following sections describe the events that can trigger notification and explain how you can create an event listener.
The following table lists and describes the events that can be returned by the event listener.
Note that the JMS exception listener is not called when a connection event occurs. The exception listener is only called if the client runtime has exhausted its reconnection attempts. The client runtime always calls the event listener before the exception listener.
Table 2-5 Notification Events
Event Type | Meaning |
---|---|
|
The Message Queue client runtime generates this event when it receives a notification from the broker that a connection is about to be closed due to a shutdown requested by the administrator. |
|
The Message Queue client runtime generates this event when a connection is closed due to a broker error or when it is closed due to a shutdown or restart requested by the administrator. When an event listener receives a |
|
The Message Queue client runtime has reconnected to a broker. This could be the same broker to which the client was previously connected or a different broker. An application can use the |
|
The Message Queue client runtime has failed to reconnect to a broker. Each time a reconnect attempt fails, the runtime generates a new event and delivers it to the event listener. The JMS exception listener is not called when a connection event occurs. It is only called if the client runtime has exhausted its reconnection attempts. The client runtime always calls the event listener before the exception listener. |
The following code example illustrates how you set a connection event
listener. Whenever a connection event occurs, the event listener’s
onEvent
method will be invoked by the client runtime.
//create an MQ connection factory.
com.sun.messaging.ConnectionFactory factory =
new com.sun.messaging.ConnectionFactory();
//create an MQ connection.
com.sun.messaging.jms.Connection connection =
(com.sun.messaging.jms.Connection )factory.createConnection();
//construct an MQ event listener. The listener implements
//com.sun.messaging.jms.notification.EventListener interface.
com.sun.messaging.jms.notification.EventListener eListener =
new ApplicationEventListener();
//set event listener to the MQ connection.
connection.setEventListener ( eListener );
In this example, an application chooses to have its event listener log the connection event to the application’s logging system.
public class ApplicationEventListener implements
com.sun.messaging.jms.notification.EventListener {
public void onEvent ( com.sun.messaging.jms.notification.Event connEvent ) {
log (connEvent);
}
private void log ( com.sun.messaging.jms.notification.Event connEvent ) {
String eventCode = connEvent.getEventCode();
String eventMessage = connEvent.getEventMessage();
//write event information to the output stream.
}
}
Consumer event notifications allow a Message Queue client to listen for the existence of consumers on a destination. Thus, for example, a producer client can start or stop producing messages to a given destination based on the existence of consumers on the destination.
The following sections describe the events that can trigger notification and explain how you can create an event listener.
The following table lists and describes the events that can be returned by the event listener.
Table 2-6 Consumer Notification Events
Event Type | Meaning |
---|---|
|
This event is generated when consumer existence
changes on a destination. The event has two possible event codes:
|
The following code example illustrates how you set and remove a consumer
event listener. Whenever a consumer event occurs, the event listener’s
onEvent
method will be invoked by the client runtime.
//create an MQ connection factory.
com.sun.messaging.ConnectionFactory factory =
new com.sun.messaging.ConnectionFactory();
//create an MQ connection.
com.sun.messaging.jms.Connection connection =
(com.sun.messaging.jms.Connection)factory.createConnection();
//create an MQ session
com.sun.messaging.jms.Session session =
(com.sun.messaging.jms.Session)connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//create a queue
com.sun.messaging.Queue queue =
(com.sun.messaging.Queue)session.createQueue(strQueueName);
//construct an MQ event listener. The listener implements
//com.sun.messaging.jms.notification.EventListener interface.
com.sun.messaging.jms.notification.EventListener consEvtListener =
new MyConsumerEventListener();
//set consumer event listener.
connection.setConsumerEventListener
( (com.sun.messaging.Destination)queue, consEvtListener );
//perform activities
//remove consumer event listener.
connection.removeConsumerEventListener
( (com.sun.messaging.Destination)queue );
In this example, an application chooses to have its event listener set a boolean flag to give ongoing consumer availability information.
public class MyEventListener implements
com.sun.messaging.jms.notification.EventListener {
boolean consumerReady = false;
MyEventListener(){
consumerReady = false;
}
public void onEvent(com.sun.messaging.jms.notification.Event evt) {
if (evt.getEventCode().equals(
com.sun.messaging.jms.notification.ConsumerEvent.CONSUMER_NOT_READY
)) {
synchronized(this){
consumerReady=false;
}
} else if (evt.getEventCode().equals(
com.sun.messaging.jms.notification.ConsumerEvent.CONSUMER_READY
)) {
synchronized(this){
consumerReady=true;
}
}
}
}
Message Queue supports client connection failover. A failed connection can be automatically restored not only to the original broker, but to a different broker in a broker cluster. There are circumstances under which the client-side state cannot be restored on any broker during an automatic reconnection attempt; for example, when the client uses transacted sessions or temporary destinations. At such times the connection exception handler is called and the application code has to catch the exception and restore state.
This section explains how automatic reconnection is enabled, how the broker behaves during a reconnect, how automatic reconnection impacts producers and consumers, and how producers and consumers should handle exceptions that result from connection failover. For additional information about this feature, see "Connection Handling" in Open Message Queue Administration Guide.
Message Queue also provides a notification API that allows the client application to listen for closure and reconnection events and to respond to such events based on the notification type and the connection state. These notifications may be valuable in preparing the client for an impending event or for gathering diagnostic data. For more information, see Connection Event Notification.
Starting with version 4.1 of Message Queue, you can cluster brokers in either a conventional cluster or a high-availability cluster. The clustering model used may affect your client design. This section notes such design differences.
If you are using conventional clusters, you enable automatic
reconnection by setting the connection factory imqReconnectEnabled
attribute to true
. If you are using a high availability cluster, the
imqReconnectEnabled
attribute is ignored; the client runtime will
automatically reconnect to a backup broker if the connection is lost and
not regained after no more than imqReconnectAttempts
attempts. This
applies to all deployment configurations: whether Message Queue is used
stand alone or whether the connection is created through a resource
adapter.
No matter which type of cluster you are using, you must also configure the connection factory administered object to specify the following information.
A list of message-service addresses (using the imqAddressList
attribute). Independently of the clustering model used, the client
runtime uses this address list when it establishes the initial
connection.
When you connect to a conventional cluster, the client runtime also uses
the address list when it tries to reestablish a connection to the
message service: it attempts to connect to the brokers in the list until
it finds (or fails to find) an available broker. If you specify only a
single broker instance on the imqAddressList
attribute, the
configuration won’t support recovery from hardware failure.
When you specify more than one broker, you can decide whether to use
parallel brokers or a broker cluster. In a parallel configuration, there
is no communication between brokers, while in a broker cluster, the
brokers interact to distribute message delivery loads. (Refer to
"Cluster Message Delivery" in Open Message Queue
Technical Overview for more information on broker clusters.)
To enable parallel-broker reconnection, set
the`imqAddressListBehavior` attribute to PRIORITY
. Typically, you
would specify no more than a pair of brokers for this type of
reconnection. This way, the messages are published to one broker, and
all clients fail over together from the first broker to the second.
To enable clustered-broker reconnection, set the
imqAddressListBehavior
attribute to RANDOM
. This way, the client
runtime randomizes connection attempts across the list, and client
connections are distributed evenly across the broker cluster.
Each broker in a cluster uses its own separate persistent store (which
means that undelivered persistent messages are unavailable until a
failed broker is back online). If one broker crashes, its client
connections are reestablished on other brokers.
If you use the high availability clustering model, the address list is
dynamically updated to include the brokers that are connected to the
highly available database serving the cluster. In this case, the client
runtime and the brokers use an internal protocol to determine which
broker takes over the persistent data of the failed broker. Therefore
the imqAddressListBehavior
property does not apply to this model.
The number of iterations to be made over the list of brokers (using
the imqAddressListIterations
attribute) when attempting to create a
connection or to reconnect.
For high-availability clusters, the broker will attempt to reconnect
forever (no matter what value you specify for this attribute). If the
client does not want this behavior, it must explicitly close the
connection.
The number of attempts to reconnect to a broker if the first
connection fails (using the imqReconnectAttempts
attribute).
The interval, in milliseconds, between reconnect attempts, using the
imqReconnectInterval
attribute. This attribute applies to both
clustering models.
Configure your connection-factory object as follows:
Example 2-3 Example of Command to Configure a Single Broker
imqobjmgr add -t cf -l "cn=myConnectionFactory" \
-o "imqAddressList=mq://jpgserv/jms" \
-o "imqReconnect=true" \
-o "imqReconnectAttempts=10"
-j "java.naming.factory.initial =
com.sun.jndi.fscontext.RefFSContextFactory
-j "java.naming.provider.url= file:///home/foo/imq_admin_objects"
This command creates a connection-factory object with a single address
in the broker address list. If connection fails, the client runtime will
try to reconnect with the broker 10 times. If an attempt to reconnect
fails, the client runtime will sleep for three seconds (the default
value for the imqReconnectInterval
attribute) before trying again.
After 10 unsuccessful attempts, the application will receive a
JMSException
.
You can ensure that the broker starts automatically at system start-up
time. See "Starting Brokers Automatically" in Open
Message Queue Administration Guide for information on how to configure
automatic broker start-up. For example, on the Solaris platform, you can
use /etc/rc.d
scripts.
Configure your connection-factory objects as follows:
Example 2-4 Example of Command to Configure Parallel Brokers
imqobjmgr add -t cf -l "cn=myCF" \
-o "imqAddressList=myhost1, mqtcp://myhost2:12345/jms" \
-o "imqReconnect=true" \
-o "imqReconnectRetries=5"
-j "java.naming.factory.initial =
com.sun.jndi.fscontext.RefFSContextFactory
-j "java.naming.provider.url= file:///home/foo/imq_admin_objects"
This command creates a connection factory object with two addresses in
the broker list. The first address describes a broker instance running
on the host myhost1
with a standard port number (7676
). The second
address describes a jms
connection service running at a statically
configured port number (12345
).
Configure your connection-factory objects as follows:
Example 2-5 Example of Command to Configure a Broker Cluster
imqobjmgr add -t cf -l "cn=myConnectionFactory" \
-o "imqAddressList=mq://myhost1/ssljms, \
mq://myhost2/ssljms, \
mq://myhost3/ssljms, \
mq://myhost4/ssljms" \
-o "imqReconnect=true" \
-o "imqReconnectRetries=5" \
-o "imqAddressListBehavior=RANDOM"
-j "java.naming.factory.initial =
com.sun.jndi.fscontext.RefFSContextFactory
-j "java.naming.provider.url= file:///home/foo/imq_admin_objects"
This command creates a connection factory object with four addresses in
the imqAddressList
. All the addresses point to jms
services running
on SSL transport on different hosts. Since the imqAddressListBehavior
attribute is set to RANDOM
, the client connections that are
established using this connection factory object will be distributed
randomly among the four brokers in the address list. If you are using a
high availability cluster, the RANDOM
attribute is ignored during a
failover reconnect after losing an existing connection to a broker.
For a conventional cluster, you must configure one of the brokers in the cluster as the master broker.In the connection-factory address list, you can also specify a subset of all the brokers in the cluster.
A broker treats an automatic reconnection as it would a new connection. When the original connection is lost, all resources associated with that connection are released. For example, in a broker cluster, as soon as one broker fails, the other brokers assume that the client connections associated with the failed broker are gone. After auto-reconnect takes place, the client connections are recreated from scratch.
Sometimes the client-side state cannot be fully restored by auto-reconnect. Perhaps a resource that the client needs cannot be recreated. In this case, the client runtime calls the client’s connection exception handler and the client must take appropriate action to restore state. For additional information, see Handling Exceptions When Failover Occurs.
If the client is automatically-reconnected to a different broker instance, effects vary depending on the clustering model used.
In a conventional cluster, persistent messages produced but not yet consumed may only be delivered to the consumer after the original broker recovers. Other state information held by the failed or disconnected broker can be lost. The messages held by the original broker, once it is restored, might be delivered out of order.
In a high availability cluster, messages produced but not yet consumed continue to be delivered to the consumer without the original broker needing to recover.
A transacted session is the most reliable method of ensuring that a
message isn’t lost if you are careful in coding the transaction. If
auto-reconnect happens in the middle of a transaction, any attempt to
produce or consume messages will cause the client runtime to throw a
JMSException
. In this case, applications must call
Session.rollback()
to roll back the transaction.
The Message Queue client runtime may throw a
TransactionRolledBackException
when Session.commit()
is called
during or after a failover occurs. In this case, the transaction is
rolled back and a new transaction is automatically started. Applications
are not required to call Session.rollback()
to rollback the
transaction after receiving a TransactionRolledBackException
.
The Message Queue client runtime may throw a JMSException
when
Session.commit()
is called during or after a failover occurs. In this
case, the transaction state is unknown (may or may not be committed).
Applications should call Session.rollback()
to roll back the
uncommitted transaction.
If you are using a high availability cluster, the only time your transaction might wind up in an unknown state is if it is not possible to reconnect to any brokers in the cluster. This should happen rarely if ever. For additional information, see Handling Exceptions When Failover Occurs.
Automatic reconnection affects producers and consumers differently:
During reconnection, producers cannot send messages. The production of messages (or any operation that involves communication with the message broker) is blocked until the connection is reestablished.
For consumers, automatic reconnection is supported for all client
acknowledgment modes. After a connection is reestablished, the broker
will redeliver all unacknowledged messages it had previously delivered,
marking them with a Redeliver
flag. The client can examine this flag
to determine whether any message has already been consumed (but not yet
acknowledged). In the case of nondurable subscribers, some messages
might be lost because the broker does not hold their messages once their
connections have been closed. Any messages produced for nondurable
subscribers while the connection is down cannot be delivered when the
connections is reestablished. For additional information, see
Handling Exceptions When Failover Occurs.
Notice the following points when using the auto-reconnect feature:
Messages might be redelivered to a consumer after auto-reconnect takes place. In auto-acknowledge mode, you will get no more than one redelivered message. In other session types, all unacknowledged persistent messages are redelivered.
While the client runtime is trying to reconnect, any messages sent by the broker to nondurable topic consumers are lost.
Any messages that are in queue destinations and that are unacknowledged when a connection fails are redelivered after auto-reconnect. However, in the case of queues delivering to multiple consumers, these messages cannot be guaranteed to be redelivered to the original consumers. That is, as soon as a connection fails, an unacknowledged queue message might be rerouted to other connected consumers.
In the case of a conventional broker cluster, the failure of the master broker prevents the following operations from succeeding on any other broker in the cluster:
Creating or destroying a new durable subscription.
Creating or destroying a new physical destination using the
imqcmd create dst
command.
Starting a new broker process. (However, the brokers that are already
running continue to function normally even if the master broker goes
down.)
You can configure the master broker to restart automatically using
Message Queue broker support for rc
scripts or the Windows service
manager.
Auto-reconnect doesn’t work if the client uses a ConnectionConsumer
to consume messages. In that case, the client runtime throws an
exception.
Several kinds of exceptions can occur as a result of the client being reconnected after a failover. How the client application should handle these exceptions depends on whether a session is transacted, on the kind of exception thrown, and on the client’s role—as producer or consumer. The following sections discuss the implications of these factors.
Independently of how the exception is raised, the client application must never call `System.exit()`to exit the application because this would prevent the Message Queue client runtime from reconnecting to an alternate or restarted broker.
When a failover occurs, exception messages may be shown on the application’s console and recorded in the broker’s log. These messages are for information only. They may be useful in troubleshooting, but minimizing or eliminating the impact of a failover is best handled preemptively by the application client in the ways described in the following sections.
Note
|
Message Queue provides a notification API that allows the client application to listen for closure and reconnection events and to respond to such events based on the notification type and the connection state. These notifications may be valuable in preparing the client for an impending event or for gathering diagnostic data. For more information, see Connection Event Notification |
A transacted session might fail to commit and (throw an exception)
either because a failover occurs while statements within the transaction
are being executed or because the failover occurs during the call to
Session.commit()
. In the first case, the failover is said to occur
during an open transaction; in the second case, the failover occurs
during the commit itself.
In the case of a failover during an open transaction, when the client
application calls Session.commit()
, the client runtime will throw a
TransactionRolledBackException
and roll back the transaction causing
the following to happen.
Messages that have been produced (but not committed) in the transacted session are discarded and not delivered to the consumer.
All messages that have been consumed (but not committed) in the
transacted session are redelivered to the consumer with the Redeliver
flag set.
A new transaction is automatically started.
If the client application itself had called Session.rollback
after a
failover (before the Session.commit
is executed) the same things would
happen as if the application had received a
TransactionRollbackException.
After receiving a
TransactionRollbackException
or calling Session.rollback()
, the
client application must retry the failed transaction. That is, it must
re-send and re-consume the messages that were involved in the
failed-over transaction.
In the second case, when the failover occurs during a call to
Session.commit
, there may be three outcomes:
The transaction is committed successfully and the call to
Session.commit
does not return an exception. In this case, the
application client does not have to do anything.
The runtime throws a TransactionRolledbackException
and does not
commit the transaction. The transaction is automatically rolled back by
the Message Queue runtime. In this case, the client application must
retry the transaction as described for the case in which an open
transaction is failed-over.
A JMXException
is thrown. This signals the fact that the transaction
state is unknown: It might have either succeeded or failed. A client
application should handle this case by assuming failure, pausing for
three seconds, calling Session.rollback
, and then retrying the
operations. However, since the commit might have succeeded, when
retrying the transacted operations, a producer should set
application-specific properties on the messages it re-sends to signal
that these might be duplicate messages. Likewise, consumers that retry
receive operations should not assume that a message that is redelivered
is necessarily a duplicate. In other words, to ensure once and only once
delivery, both producers and consumers need to do a little extra work to
handle this edge case. The code samples presented next illustrate good
coding practices for handling this situation.
If you are using a high availability cluster, the only time this
condition might arise is when the client is unable to connect to any
backup broker. This should be extremely rare.
The next two examples illustrate how stand-alone Message Queue producers and consumers should handle transactions during a failover. To run the sample programs, do the following:
Start two high availability brokers. The brokers can be on the same machine or on different machines, but they must be in the same cluster.
Start the example programs. For example:
java —DimqAddressList="localhost:777" test.jmsclient.ha.FailoverQSender java —DimqAddressList="localhost:777" test.jmsclient.ha.FailoverQReceiver
It does not matter in what order you start the programs. The only
property that you must specify is imqAddressList
. The client
application will be automatically failed over to a backup broker if the
connection to its home broker fails. (The imqReconnectEnabled
and
imqAddressListIterations
properties are ignored for a high
availability cluster.)
3. Kill the broker to which the producing or consuming application is
connected. The clients will reconnect, validate, and continue the failed
transaction. A message produced or consumed in a transaction is either
committed or rolled back after a successful failover.
4. You can restart the dead broker and retry the failover operation by
killing the new home broker.
Transacted Session: Failover Producer Example
The following code sample shows the work that a producer in a transacted session needs to do to recover state after a failover. Note how the application tests both for rollback exceptions and for JMS exceptions. Note also the use of a counter to allow the producer and consumer to verify message order and delivery.
/*
* @(#)FailoverQSender.java 1.2 07/04/20
*
* Copyright (c) 2000, 2019 Oracle and/or its licensees. All Rights Reserved.
* This program and the accompanying materials are made available under the
* terms of the Eclipse Distribution License v. 1.0, which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*
*/
package test.jmsclient.ha;
import java.util.Date;
import jakarta.jms.*;
import com.sun.messaging.jms.Connection;
import com.sun.messaging.jms.notification.*;
/**
*
* This sample program uses a transacted session to send messages.
* It is designed to run with test.jmsclient.ha.FailoverQReceiver
* @version 1.0
*/
public class FailoverQSender
implements ExceptionListener, EventListener, Runnable {
//constant - commit property name
public static final String COMMIT_PROPERTY_NAME = "COMMIT_PROPERTY";
//constant - message counter
public static final String MESSAGE_COUNTER = "counter";
//constant - destination name
public static final String TEST_DEST_NAME = "FailoverTestDest001";
//queue connection
QueueConnection conn = null;
//session
QueueSession session = null;
//queue sender
QueueSender sender = null;
//queue destination
Queue queue = null;
//commmitted counter.
private int commitCounter = 0;
//current message counter
private int currentCounter = 0;
//set to true if the application is connected to the broker.
private boolean isConnected = false;
/**
* Default constructor - do nothing.
* Properties are passed in from init() method.
*/
public FailoverQSender() {
//set up JMS environment
setup();
}
/**
* Connection Exception listener.
*/
public void onException (JMSException e) {
//The run() method will exit.
this.isConnected = false;
log ("Exception listener is called.
Connection is closed by MQ client runtime." );
log (e);
}
/**
* this method is called when a MQ connection event occurred.
*/
public void onEvent (Event connectionEvent) {
log(connectionEvent);
}
/**
* Rollback the application data.
*
*/
private void rollBackApplication() {
this.currentCounter = this.commitCounter;
log ("Application rolled back., current (commit) counter: "
+ currentCounter);
}
/**
* Roll back the current jms session.
*/
private void rollBackJMS() {
try {
log("Rolling back JMS ...., commit counter: " + commitCounter);
session.rollback();
} catch (JMSException jmse) {
log("Rollback failed");
log(jmse);
//application may decide to log and continue sending messages
// without closing the application.
close();
}
}
/**
* rollback application data and jms session.
*
*/
private void rollBackAll() {
//rollback jms
rollBackJMS();
//rollback app data
rollBackApplication();
}
/**
* close JMS connection and stop the application
*
*/
private void close() {
try {
if ( conn != null ) {
//close the connection
conn.close();
}
} catch (Exception e) {
//log exception
log (e);
} finally {
//set flag to true. application thread will exit
isConnected = false;
}
}
/**
* Send messages in a loop until the connection is closed.
* Session is committed for each message sent.
*/
public void run () {
//start producing messages
while (isConnected) {
try {
//reset message counter if it reaches max int value
checkMessageCounter();
//create a message
Message m = session.createMessage();
//get the current message counter value
int messageCounter = this.getMessageCounter();
//set message counter to message property
m.setIntProperty(MESSAGE_COUNTER, messageCounter);
//set commit property
m.setBooleanProperty(COMMIT_PROPERTY_NAME, true);
//send the message
sender.send(m);
log("Sending message: " + messageCounter +
", current connected broker: " +
this.getCurrentConnectedBrokerAddress());
//commit the message
this.commit();
// pause 3 seconds
sleep(3000);
} catch (TransactionRolledBackException trbe) {
//rollback app data
rollBackApplication();
} catch (JMSException jmse) {
if (isConnected == true) {
//rollback app data and JMS session
rollBackAll();
}
}
}
}
/**
* Reset all counters if integer max value is reached.
*/
private void checkMessageCounter() {
if ( currentCounter == Integer.MAX_VALUE ) {
currentCounter = 0;
commitCounter = 0;
}
}
/**
* Set up testing parameters - connection, destination, etc
*/
protected void setup() {
try {
//get connection factory
com.sun.messaging.QueueConnectionFactory factory =
new com.sun.messaging.QueueConnectionFactory();
//create a queue connection
conn = factory.createQueueConnection();
//set exception listener
conn.setExceptionListener(this);
//set event listener
( (com.sun.messaging.jms.Connection) conn).setEventListener(this);
//get destination name
String destName = TEST_DEST_NAME;
//create a transacted session
session = conn.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
//get destination
queue = session.createQueue(destName);
//create queue sender
sender = session.createSender(queue);
//set isConnected flag to true.
this.isConnected = true;
} catch (JMSException jmse) {
this.isConnected = false;
}
}
/**
* get the next message counter.
*/
private synchronized int getMessageCounter () {
return ++ currentCounter;
}
/**
* commit the current transaction/session.
*/
private void commit() throws JMSException {
session.commit();
this.commitCounter = currentCounter;
log ("Transaction committed, commit counter: " +commitCounter);
}
/**
* Get the current connencted broker address.
*/
private String getCurrentConnectedBrokerAddress() {
return ((com.sun.messaging.jms.Connection)conn).getBrokerAddress();
}
/**
* log a string message.
* @param msg
*/
private synchronized void log (String msg) {
System.out.println(new Date() + ": " + msg);
}
/**
* Log an exception received.
*/
private synchronized void log (Exception e) {
System.out.println(new Date() + ": Exception:");
e.printStackTrace();
}
/**
* Log the specified MQ event.
*/
private synchronized void log (Event event) {
try {
System.out.println(new Date() + ": Received MQ event notification.");
System.out.println("*** Event code: " + event.getEventCode() );
System.out.println("*** Event message: " + event.getEventMessage());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* pause the specified milli seconds.
*/
private void sleep (long millis) {
try {
Thread.sleep(millis);
} catch (java.lang.InterruptedException inte) {
log (inte);
}
}
/**
* The main program.
*/
public static void main (String args[]) {
FailoverQSender fp = new FailoverQSender();
fp.run();
}
}
Transacted Session: Failover Consumer Example
The following code sample shows the work that a consumer in a transacted session needs to do in order to recover state after a failover. Note how the application tests both for rollback exceptions and JMS exceptions. Note also the use of a counter to allow the producer and consumer to verify message order and delivery.
/*
* @(#)FailoverQReceiver.java 1.4 07/04/20
*
* Copyright (c) 2000, 2019 Oracle and/or its licensees. All Rights Reserved.
* This program and the accompanying materials are made available under the
* terms of the Eclipse Distribution License v. 1.0, which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*
*/
package test.jmsclient.ha;
import java.util.Date;
import java.util.Vector;
import jakarta.jms.*;
import com.sun.messaging.jms.notification.*;
/**
* This sample program uses a transacted session to receive messages.
* It is designed to run with test.jmsclient.ha.FailoverQSender.
*
* @version 1.0
*/
public class FailoverQReceiver
implements ExceptionListener, EventListener, Runnable {
//queue connection
private QueueConnection conn = null;
//queue session
private QueueSession session = null;
//qreceiver
private QueueReceiver qreceiver = null;
//queue destination
private Queue queue = null;
//commmitted counter.
private int commitCounter = 0;
//flag to indicate if the connection is connected to the broker.
private boolean isConnected = false;
//flag to indicate if current connection is to HA broker cluster.
private boolean isHAConnection = false;
//application data holder.
private Vector data = new Vector();
/**
* Default constructor - JMS setup.
*/
public FailoverQReceiver() {
//set up JMS environment
setup();
}
/**
* Connection Exception listener.
*/
public void onException (JMSException e) {
//The run() method will exit.
this.isConnected = false;
log ("Exception listener is called. Connection is closed
by MQ client runtime." );
log (e);
}
/**
* log the connection event.
*/
public void onEvent (Event connectionEvent) {
log (connectionEvent);
}
/**
* Roll back application data.
*/
private void rollBackApplication() {
//reset application data
this.reset();
log ("Rolled back application data, current commit counter:
" + commitCounter);
}
/**
* Clear the application data for the current un-committed transaction.
*/
private void reset() {
data.clear();
}
/**
* Roll back JMS transaction and application.
*/
private void rollBackAll() {
try {
//rollback JMS
rollBackJMS();
//rollback application data
rollBackApplication();
} catch (Exception e) {
log ("rollback failed. closing JMS connection ...");
//application may decide NOT to close connection if rollback failed.
close();
}
}
/**
* Roll back jms session.
*/
private void rollBackJMS() throws JMSException {
session.rollback();
log("JMS session rolled back ...., commit counter:
" + commitCounter);
}
/**
* Close JMS connection and exit the application.
*/
private void close() {
try {
if ( conn != null ) {
conn.close();
}
} catch (Exception e) {
log (e);
} finally {
isConnected = false;
}
}
/**
* Receive, validate, and commit messages.
*/
public void run () {
//produce messages
while (isConnected) {
try {
//receive message
Message m = qreceiver.receive();
//process message -- add message to the data holder
processMessage(m);
//check if the commit flag is set in the message property
if ( shouldCommit(m) ) {
//commit the transaction
commit(m);
}
} catch (TransactionRolledBackException trbe) {
log ("transaction rolled back by MQ ...");
//rollback application data
rollBackApplication();
} catch (JMSException jmse) {
//The exception can happen when receiving messages
//and the connected broker is killed.
if ( isConnected == true ) {
//rollback MQ and application data
rollBackAll();
}
} catch (Exception e) {
log (e);
//application may decide NOT to close the connection
//when an unexpected Exception occurred.
close();
}
}
}
/**
* Set up testing parameters - connection, destination, etc
*/
protected void setup() {
try {
//get connection factory
com.sun.messaging.QueueConnectionFactory factory =
new com.sun.messaging.QueueConnectionFactory();
//create jms connection
conn = factory.createQueueConnection();
//set exception listener
conn.setExceptionListener(this);
//set event listener
( (com.sun.messaging.jms.Connection) conn).setEventListener(this);
//test if this is a HA connection
isHAConnection = ( (com.sun.messaging.jms.Connection)
conn).isConnectedToHABroker();
log ("Is connected to HA broker cluster: " + isHAConnection);
//get destination name
String destName = FailoverQSender.TEST_DEST_NAME;
//create a transacted session
session = conn.createQueueSession(true, -1);
//get destination
queue = session.createQueue(destName);
//create queue receiver
qreceiver = session.createReceiver(queue);
//set isConnected flag to true
isConnected = true;
//start the JMS connection
conn.start();
log("Ready to receive on destination: " + destName);
} catch (JMSException jmse) {
isConnected = false;
log (jmse);
close();
}
}
/**
* Check if we should commit the transaction.
*/
private synchronized boolean shouldCommit(Message m) {
boolean flag = false;
try {
//get the commit flag set by the FailoverQSender
flag = m.getBooleanProperty(FailoverQSender.COMMIT_PROPERTY_NAME);
if ( flag ) {
//check if message property contains expected message counter
validate (m);
}
} catch (JMSException jmse) {
log (jmse);
}
return flag;
}
/**
* A very simple validation only. More logic may be added to validate
* message ordering and message content.
* @param m Message The last message received for the current transaction.
*/
private void validate (Message m) {
try {
//get message counter property
int counter = m.getIntProperty(FailoverQSender.MESSAGE_COUNTER);
//The counter is set sequentially and must be received in right order.
//Each message is committed after validated.
if (counter != (commitCounter + 1)) {
this.printData();
throw new RuntimeException("validation failed.");
}
log ("messages validated. ready to commit ...");
} catch (JMSException jmse) {
log (jmse);
printData();
throw new RuntimeException("Exception occurred during validation:
" + jmse);
}
}
/**
* Get the message counter and put it in the data holder.
* @param m the current message received
*/
private synchronized void processMessage(Message m) throws JMSException {
// get message counter. this value is set by the FailoverQSender.
int ct = m.getIntProperty(FailoverQSender.MESSAGE_COUNTER);
// log the message
log("received message: " + ct
+", current connected broker:
" + this.getCurrentConnectedBrokerAddress());
// saved the data in data holder.
data.addElement(new Integer(ct));
}
/**
* commit the current transaction.
* @param m the last received message to be committed.
* @throws JMSException if commit failed.
*/
private void commit(Message m) throws JMSException {
//commit the transaction
session.commit();
//get the current message counter
int counter = m.getIntProperty(FailoverQSender.MESSAGE_COUNTER);
//set the commit counter
commitCounter = counter;
//clear app data
this.reset();
log ("Messages committed, commitCounter: " + commitCounter);
}
/**
* log exception.
*/
private synchronized void log (Exception e) {
System.out.println(new Date() + ": Exception Stack Trace: ");
e.printStackTrace();
}
/**
* log connection event.
*/
private synchronized void log (Event event) {
try {
System.out.println(new Date()
+ ": Received MQ event notification.");
System.out.println("*** Event Code: " + event.getEventCode() );
System.out.println("*** Event message: " + event.getEventMessage());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Log the specified message.
*/
private void log (String msg) {
System.out.println(new Date() + ": " + msg);
}
/**
* print values stored in the data holder.
*
*/
private void printData() {
for ( int i=0; i< data.size(); i++) {
log (" *** data index " + i + " = " + data.elementAt(i) );
}
}
private String getCurrentConnectedBrokerAddress() {
return ((com.sun.messaging.jms.Connection)conn).getBrokerAddress();
}
/**
* The main method. This starts the failover queue receiver.
*/
public static void main (String args[]) {
FailoverQReceiver fqr = new FailoverQReceiver();
fqr.run();
}
}
If a connection is failed-over for a producer in a non-transacted
session, a client application may receive a JMSException.
The
application thread that receives the exception should pause for a few
seconds and then resend the messages. The client application may want to
set a flag on the resent messages to indicate that they could be
duplicates.
If a connection is failed over for a message consumer, the consequences vary with the sessions acknowledge mode:
In client-acknowledge mode, calling Message.acknowledge
or
MessageConsumer.receive
during a failover will raise a JMSException.
The consumer should call Session.recover
to recover or re-deliver the
unacknowledged messages and then call Message.acknowledge
or
MessageConsumer.receive
.
In auto-acknowledge mode, after getting a JMSException,
the
synchronous consumer should pause a few seconds and then call
MessageConsumer.receive
to continue receiving messages. Any message
that failed to be acknowledged (due to the failover) will be redelivered
with the redelivered flags set to true.
In dups-OK-acknowledge
mode, the synchronous consumer should pause a
few seconds after getting an exception and then call
MessageConsumer.receive
to continue receiving messages. In this case,
it’s possible that messages delivered and acknowledged (before the
failover) could be redelivered.
Failover Producer Example
The following code sample illustrates good coding practices for handling
exceptions during a failover. It is designed to send non-transacted,
persistent messages forever and to handle JMSExceptions when a failover
occurs. The program is able to handle either a true or false setting for
the imqReconnectEnabled
property. To run the program enter one of the
following commands.
java dura.example.FailoverProducer
java -DimqReconnectEnabled=true dura.example.FailoverProducer
/*
* @(#)FailoverProducer.java 1.1 06/06/09
* Copyright (c) 2000, 2019 Oracle and/or its licensees. All Rights Reserved.
* This program and the accompanying materials are made available under the
* terms of the Eclipse Distribution License v. 1.0, which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*
*/
package dura.example;
import jakarta.jms.*;
import com.sun.messaging.ConnectionConfiguration;
import java.util.*;
public class FailoverProducer implements ExceptionListener {
//connection factory
private com.sun.messaging.TopicConnectionFactory factory;
//connection
private TopicConnection pconn = null;
//session
private TopicSession psession = null;
//publisher
private TopicPublisher publisher = null;
//topic
private Topic topic = null;
//This flag indicates whether this test client is closed.
private boolean isClosed = false;
//auto reconnection flag
private boolean autoReconnect = false;
//destination name for this example.
private static final String DURA_TEST_TOPIC = "DuraTestTopic";
//the message counter property name
public static final String MESSAGE_COUNTER = "MESSAGE_COUNTER";
//the message in-doubt-bit property name
public static final String MESSAGE_IN_DOUBT = "MESSAGE_IN_DOUBT";
/**
* Constructor. Get imqReconnectEnabled property value from
* System property.
*/
public FailoverProducer () {
try {
autoReconnect =
Boolean.getBoolean(ConnectionConfiguration.imqReconnectEnabled);
} catch (Exception e) {
this.printException(e);
}
}
/**
* Connection is broken if this handler is called.
* If autoReconnect flag is true, this is called only
* if no more retries from MQ.
*/
public void onException (JMSException jmse) {
this.printException (jmse);
}
/**
* create MQ connection factory.
* @throws JMSException
*/
private void initFactory() throws JMSException {
//get connection factory
factory = new com.sun.messaging.TopicConnectionFactory();
}
/**
* JMS setup. Create a Connection,Session, and Producer.
*
* If any of the JMS object creation fails (due to system failure),
* it retries until it succeeds.
*
*/
private void initProducer() {
boolean isConnected = false;
while ( isClosed == false && isConnected == false ) {
try {
println("producer client creating connection ...");
//create connection
pconn = factory.createTopicConnection();
//set connection exception listener
pconn.setExceptionListener(this);
//create topic session
psession = pconn.createTopicSession(false,
Session.CLIENT_ACKNOWLEDGE);
//get destination
topic = psession.createTopic(DURA_TEST_TOPIC);
//publisher
publisher = psession.createPublisher(topic);
//set flag to true
isConnected = true;
println("producer ready.");
}
catch (Exception e) {
println("*** connect failed ... sleep for 5 secs.");
try {
//close resources.
if ( pconn != null ) {
pconn.close();
}
//pause 5 secs.
Thread.sleep(5000);
} catch (Exception e1) {
;
}
}
}
}
/**
* Start test. This sends JMS messages in a loop (forever).
*/
public void run () {
try {
//create MQ connection factory.
initFactory();
//create JMS connection,session, and producer
initProducer();
//send messages forever.
sendMessages();
} catch (Exception e) {
this.printException(e);
}
}
/**
* Send persistent messages to a topic forever. This shows how
* to handle failover for a message producer.
*/
private void sendMessages() {
//this is set to true if send failed.
boolean messageInDoubt = false;
//message to be sent
TextMessage m = null;
//msg counter
long msgcount = 0;
while (isClosed == false) {
try {
/**
* create a text message
*/
m = psession.createTextMessage();
/**
* the MESSAGE_IN_DOUBT bit is set to true if
* you get an exception for the last message.
*/
if ( messageInDoubt == true ) {
m.setBooleanProperty (MESSAGE_IN_DOUBT, true);
messageInDoubt = false;
println("MESSAGE_IN_DOUBT bit is set to true
for msg: " + msgcount);
} else {
m.setBooleanProperty (MESSAGE_IN_DOUBT, false);
}
//set message counter
m.setLongProperty(MESSAGE_COUNTER, msgcount);
//set message body
m.setText("msg: " + msgcount);
//send the msg
publisher.send(m, DeliveryMode.PERSISTENT, 4, 0);
println("sent msg: " + msgcount);
/**
* reset counetr if reached max long value.
*/
if (msgcount == Long.MAX_VALUE) {
msgcount = 0;
println ("Reset message counter to 0.");
}
//increase counter
msgcount ++;
Thread.sleep(1000);
} catch (Exception e) {
if ( isClosed == false ) {
//set in doubt bit to true.
messageInDoubt = true;
this.printException(e);
//init producer only if auto reconnect is false.
if ( autoReconnect == false ) {
this.initProducer();
}
}
}
}
}
/**
* Close this example program.
*/
public synchronized void close() {
try {
isClosed = true;
pconn.close();
notifyAll();
} catch (Exception e) {
this.printException(e);
}
}
/**
* print the specified exception.
* @param e the exception to be printed.
*/
private void printException (Exception e) {
System.out.println(new Date().toString());
e.printStackTrace();
}
/**
* print the specified message.
* @param msg the message to be printed.
*/
private void println (String msg) {
System.out.println(new Date() + ": " + msg);
}
/**
* Main program to start this example.
*/
public static void main (String args[]) {
FailoverProducer fp = new FailoverProducer();
fp.run();
}
}
Failover Consumer Example
The following code sample, FailoverConsumer,
illustrates good coding
practices for handling exceptions during a failover. The transacted
session is able to receive messages forever. The program sets the auto
reconnect property to true, requiring the Message Queue runtime to
automatically perform a reconnect when the connected broker fails or is
killed. It is designed to work with the dura.example.FailoverProducer
,
shown in the previous section.
To run this program enter the following command.
java dura.example.FailoverConsumer
/*
* @(#)FailoverConsumer.java 1.1 06/06/09
* Copyright (c) 2000, 2019 Oracle and/or its licensees. All Rights Reserved.
* This program and the accompanying materials are made available under the
* terms of the Eclipse Distribution License v. 1.0, which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*
*/
package dura.example;
import java.util.Date;
import jakarta.jms.Destination;
import jakarta.jms.ExceptionListener;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import jakarta.jms.TransactionRolledBackException;
import com.sun.messaging.ConnectionConfiguration;
public class FailoverConsumer implements ExceptionListener, Runnable {
//JMS connection
private Connection conn = null;
//JMS session
private Session session = null;
//JMS Message consumer
private MessageConsumer messageConsumer = null;
//JMS destination.
private Destination destination = null;
//flag indicates whether this program should continue running.
private boolean isConnected = false;
//destination name.
private static final String DURA_TEST_TOPIC = "DuraTestTopic";
//the commit counter, for information only.
private long commitCounter = 0;
/**
* message counter property set by the producer.
*/
public static final String MESSAGE_COUNTER = "MESSAGE_COUNTER";
/**
* Message in doubt bit set by the producer
*/
public static final String MESSAGE_IN_DOUBT = "MESSAGE_IN_DOUBT";
/**
* receive time out
*/
public static final long RECEIVE_TIMEOUT = 0;
/**
* Default constructor -
* Set up JMS Environment.
*/
public FailoverConsumer() {
setup();
}
/* Connection Exception listener. This is called when connection
* breaks and no reconnect attempts are performed by MQ client runtime.
*/
public void onException (JMSException e) {
print ("Reconnect failed. Shutting down the connection ...");
/**
* Set this flag to false so that the run() method will exit.
*/
this.isConnected = false;
e.printStackTrace();
}
/**
* Best effort to roll back a jms session. When a broker crashes, an
* open-transaction should be rolled back. But the re-started broker
* may not have the uncommitted tranaction information due to system
* failure. In a situation like this, an application can just quit
* calling rollback after retrying a few times The uncommitted
* transaction (resources) will eventually be removed by the broker.
*/
private void rollBackJMS() {
//rollback fail count
int failCount = 0;
boolean keepTrying = true;
while ( keepTrying ) {
try {
print ("<<< rolling back JMS ...., consumer commit counter:
" + this.commitCounter);
session.rollback();
print("<<< JMS rolled back ...., consumer commit counter:
" + this.commitCounter);
keepTrying = false;
} catch (JMSException jmse) {
failCount ++;
jmse.printStackTrace();
sleep (3000); //3 secs
if ( failCount == 1 ) {
print ("<<< rollback failed : total count" + failCount);
keepTrying = false;
}
}
}
}
/**
* Close the JMS connection and exit the program.
*
*/
private void close() {
try {
if ( conn != null ) {
conn.close();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
this.isConnected = false;
}
}
/*Receive messages in a loop until closed.*/
public void run () {
while (isConnected) {
try {
/*receive message with specified timeout.*/
Message m = messageConsumer.receive(RECEIVE_TIMEOUT);
/* process the message. */
processMessage(m);
/* commit JMS transaction. */
this.commit();
/*increase the commit counter.*/
this.commitCounter ++;
} catch (TransactionRolledBackException trbe) {
/**
* the transaction is rolled back
* a new transaction is automatically started.
*/
trbe.printStackTrace();
} catch (JMSException jmse) {
/* The transaction is in unknown state.
* We need to roll back the transaction.*/
jmse.printStackTrace();
/* roll back if not closed.
*/
if ( this.isConnected == true ) {
this.rollBackJMS();
}
} catch (Exception e) {
e.printStackTrace();
/* Exit if this is an unexpected Exception.
*/
this.close();
} finally {
;//do nothing
}
}
print(" <<< consumer exit ...");
}
/*Set up connection, destination, etc*/
/
protected void setup() {
try {
//create connection factory
com.sun.messaging.ConnectionFactory factory =
new com.sun.messaging.ConnectionFactory();
//set auto reconnect to true.
factory.setProperty(ConnectionConfiguration.imqReconnectEnabled, "true");
//A value of -1 will retry forever if connection is broken.
factory.setProperty(ConnectionConfiguration.imqReconnectAttempts, "-1");
//retry interval - every 10 seconds
factory.setProperty(ConnectionConfiguration.imqReconnectInterval, "10000");
//create connection
conn = factory.createConnection();
//set client ID
conn.setClientID(DURA_TEST_TOPIC);
//set exception listener
conn.setExceptionListener(this);
//create a transacted session
session = conn.createSession(true, -1);
//get destination
destination = session.createTopic(DURA_TEST_TOPIC);
//message consumer
messageConsumer = session.createDurableSubscriber((Topic)destination,
DURA_TEST_TOPIC);
//set flag to true
this.isConnected = true;
//we are ready, start the connection
conn.start();
print("<<< Ready to receive on destination: " + DURA_TEST_TOPIC);
} catch (JMSException jmse) {
this.isConnected = false;
jmse.printStackTrace();
this.close();
}
}
/**
* Process the received message message.
* This prints received message counter.
* @param m the message to be processed.
*/
private synchronized void processMessage(Message m) {
try {
//in this example, we do not expect a timeout, etc.
if ( m == null ) {
throw new RuntimeException ("<<< Received null message.
Maybe reached max time out. ");
}
//get message counter property
long msgCtr = m.getLongProperty (MESSAGE_COUNTER);
//get message in-doubt bit
boolean indoubt = m.getBooleanProperty(MESSAGE_IN_DOUBT);
if ( indoubt) {
print("<<< received message: " + msgCtr + ", indoubt bit is true");
} else {
print("<<< received message: " + msgCtr);
}
} catch (JMSException jmse) {
jmse.printStackTrace();
}
}
/**
* Commit a JMS transaction.
* @throws JMSException
*/
private void commit() throws JMSException {
session.commit();
}
/**
* Sleep for the specified time.
* @param millis sleep time in milli-seconds.
*/
private void sleep (long millis) {
try {
Thread.sleep(millis);
} catch (java.lang.InterruptedException inte) {
print (inte);
}
}
/**
* Print the specified message.
* @param msg the message to be printed.
*/
private static void print (String msg) {
System.out.println(new Date() + ": " + msg);
}
/**
* Print Exception stack trace.
* @param e the exception to be printed.
*/
private static void print (Exception e) {
System.out.print(e.getMessage());
e.printStackTrace();
}
/**
* Start this example program.
*/
public static void main (String args[]) {
FailoverConsumer fc = new FailoverConsumer();
fc.run();
}
}
Message Queue supports the standard JMS acknowledgment modes (auto-acknowledge, client-acknowledge, and dups-OK-acknowledge). When you create a session for a consumer, you can specify one of these modes. Your choice will affect whether acknowledgment is done explicitly (by the client application) or implicitly (by the session) and will also affect performance and reliability. This section describes additional options you can use to customize acknowledgment behavior:
You can customize the JMS client-acknowledge mode to acknowledge one message at a time.
If performance is key and reliability is not a concern, you can use the proprietary no-acknowledge mode to have the broker consider a message acknowledged as soon as it has been sent to the consuming client.
The following sections explain how you program these options.
For more flexibility, Message Queue lets you customize the JMS
client-acknowledge mode. In client-acknowledge mode, the client
explicitly acknowledges message consumption by invoking the
acknowledge()
method of a message object. The standard behavior of
this method is to cause the session to acknowledge all messages that
have been consumed by any consumer in the session since the last time
the method was invoked. (That is, the session acknowledges the current
message and all previously unacknowledged messages, regardless of who
consumed them.)
In addition to the standard behavior specified by JMS, Message Queue lets you use client-acknowledge mode to acknowledge one message at a time.
Observe the following rules when implementing custom client acknowledgment:
To acknowledge an individual message, call the
acknowledgeThisMessage()
method. To acknowledge all messages consumed
so far, call the acknowledgeUpThroughThisMessage()
method. Both are
shown in the following code example.
public interface com.sun.messaging.jms.Message { void acknowledgeThisMessage() throws JMSException; void acknowledgeUpThroughThisMessage() throws JMSException; }
When you compile the resulting code, include both imq.jar
and
jms.jar
in the class path.
Don’t call acknowledge()
, acknowledgeThisMessage()
, or
acknowledgeUpThroughThisMessage()
in any session except one that uses
client-acknowledge mode. Otherwise, the method call is ignored.
Don’t use custom acknowledgment in transacted sessions. A transacted session defines a specific way to have messages acknowledged.
If a broker fails, any message that was not acknowledged successfully
(that is, any message whose acknowledgment ended in a JMSException
) is
held by the broker for delivery to subsequent clients.
Example 2-6 demonstrates both types of custom client acknowledgment.
Example 2-6 Example of Custom Client Acknowledgment Code
...
import jakarta.jms.*;
...[Look up a connection factory and create a connection.]
Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
...[Create a consumer and receive messages.]
Message message1 = consumer.receive();
Message message2 = consumer.receive();
Message message3 = consumer.receive();
...[Process messages.]
...[Acknowledge one individual message.
Notice that the following acknowledges only message 2.]
((com.sun.messaging.jms.Message)message2).acknowledgeThisMessage();
...[Continue. Receive and process more messages.]
Message message4 = consumer.receive();
Message message5 = consumer.receive();
Message message6 = consumer.receive();
...[Acknowledge all messages up through message 4. Notice that this
acknowledges messages 1, 3, and 4, because message 2 was acknowledged
earlier.]
((com.sun.messaging.jms.Message)message4).acknowledgeUpThroughThisMessage();
...[Continue. Finally, acknowledge all messages consumed in the session.
Notice that this acknowledges all remaining consumed messages, that is,
messages 5 and 6, because this is the standard behavior of the JMS API.]
message5.acknowledge();
No-acknowledge mode is a nonstandard extension to the JMS API. Normally, the broker waits for a client acknowledgment before considering that a message has been acknowledged and discarding it. That acknowledgment must be made programmatically if the client has specified client-acknowledge mode or it can be made automatically, by the session, if the client has specified auto-acknowledge or dups-OK-acknowledge. If a consuming client specifies no-acknowledge mode, the broker discards the message as soon as it has sent it to the consuming client. This feature is intended for use by nondurable subscribers consuming nonpersistent messages, but it can be used by any consumer.
Using this feature improves performance by reducing protocol traffic and broker work involved in acknowledging a message. This feature can also improve performance for brokers dealing with misbehaving clients who do not acknowledge messages and therefore tie down broker memory resources unnecessarily. Using this mode has no effect on producers.
You use this feature by specifying NO_ACKNOWLEDGE
for the
acknowledgeMode
parameter to the createSession
,
createQueueSession
, or createTopicSession
method. No-acknowledge
mode must be used only with the connection methods defined in the
com.sun.messaging.jms
package. Note however that the connection itself
must be created using the jakarta.jms
package.
The following are sample variable declarations for connection
,
queueConnection
and topicConnection
:
jakarta.jms.connection Connection;
jakarta.jms.queueConnection queueConnection
jakarta.jms.topicConnection topicConnection
The following are sample statements to create different kinds of no-acknowledge sessions:
//to create a no ack session
Session noAckSession =
((com.sun.messaging.jms.Connection)connection)
.createSession(com.sun.messaging.jms.Session.NO_ACKNOWLEDGE);
// to create a no ack topic session
TopicSession noAckTopicSession =
((com.sun.messaging.jms.TopicConnection) topicConnection)
.createTopicSession(com.sun.messaging.jms.Session.NO_ACKNOWLEDGE);
//to create a no ack queue session
QueueSession noAckQueueSession =
((com.sun.messaging.jms.QueueConnection) queueConnection)
.createQueueSession(com.sun.messaging.jms.Session.NO_ACKNOWLEDGE);
Specifying no-acknowledge mode for a session results in the following behavior:
The client runtime will throw a JMSException
if Session.recover()
is called.
The client runtime will ignore a call to the Message.acknowledge()
method from a consumer.
Messages can be lost. As opposed to dups-OK-acknowledge
, which can
result in duplicate messages being sent, no-acknowledge mode bypasses
checks and balances built into the system and may result in message
loss.
This Message Queue feature enables validation of the content of a text (not object) XML message against an XML schema at the point the message is sent to the broker.
When XML validation is enabled, the Message Queue client runtime will attempt to validate an XML message against specified XSDs before sending the message to the broker. The location of the XML schema (XSD) is specified as a property of a Message Queue destination. If the specified schema cannot be located or the message cannot be validated, the message is not sent, and an exception is thrown.
If no XSD location is specified, the DTD declaration within the XML document is used to perform DTD validation. (XSD validation, which includes data type and value range validation, is more rigorous than DTD validation.)
Client applications using this feature should upgrade Java SE version to JRE 1.5 or above.
XML schema validation is enabled using the following physical
destination properties: validateXMLSchemaEnabled
, XMLSchemaURIList
,
and reloadXMLSchemaOnFailure
. These properties are described in
"Physical Destination Property Reference" in Open
Message Queue Administration Guide. The property values can be set at
destination creation or update time by using the imqcmd create dst
or
imqcmd update dst
command, respectively. The XML validation properties
should be set when a destination is inactive: that is, when it has no
consumers and producers, and when there are no messages in the
destination.
If any of the XML validation properties are set while a destination is active (for example, if a producer is connected to the destination), the change will not take effect until the producer reconnects to the broker. Similarly, if an XSD is changed, as a result of changing application requirements, all client applications producing XML messages based on the changed XSD must reconnect to the broker.
If the reloadXMLSchemaOnFailure
property is set to true
and XML
validation fails, then the Message Queue client runtime will attempt to
reload the XSD before attempting again to validate a message. The client
runtime will throw an exception if the validation fails using the
reloaded XSD.
Message Queue supports C clients as message producers and consumers.
A Java client consuming messages sent by a C client faces only one restriction: a C client cannot be part of a distributed transaction, and therefore a Java client receiving a message from a C client cannot participate in a distributed transaction either.
A Java client producing messages for a consuming C client must be aware of the following differences in the Java and C interfaces because these differences will affect the C client’s ability to consume messages: C clients
Can only consume messages of type text
and bytes
Cannot consume messages whose body has been compressed
Cannot participate in distributed transactions
Cannot receive SOAP messages
This section describes support for client runtime logging of connection and session-related events.
JDK 1.4 (and above) includes the java.util.logging
library. This
library implements a standard logger interface that can be used for
application-specific logging.
The Message Queue client runtime uses the Java Logging API to implement its logging functions. You can use all the J2SE 1.4 logging facilities to configure logging activities. For example, an application can use the following Java logging facilities to configure how the Message Queue client runtime outputs its logging information:
Logging Handlers
Logging Filters
Logging Formatters
Logging Level
For more information about the Java Logging API, please see the Java
Logging Overview at
http://download.oracle.com/javase/1.4.2/docs/guide/util/logging/overview.html
The Message Queue provider defines a set of logging name spaces associated with logging levels and logging activities that allow Message Queue clients to log connection and session events when a logging configuration is appropriately set.
The root logging name space for the Message Queue client runtime is
defined as jakarta.jms
. All loggers in the Message Queue client runtime
use this name as the parent name space.
The logging levels used for the Message Queue client runtime are the
same as those defined in the java.util.logging.Level
class. This class
defines seven standard log levels and two additional settings that you
can use to turn logging on and off.
OFF
Turns off logging.
SEVERE
Highest priority, highest value. Application-defined.
WARNING
Application-defined.
INFO
Application-defined.
CONFIG
Application-defined
FINE
Application-defined.
FINER
Application-defined.
FINEST
Lowest priority, lowest value. Application-defined.
ALL
Enables logging of all messages.
In general, exceptions and errors that occur in the Message Queue client
runtime are logged by the logger with the jakarta.jms
name space.
Exceptions thrown from the JVM and caught by the client runtime, such
as IOException
, are logged by the logger with the logging name space
jakarta.jms
at level WARNING
.
JMS exceptions thrown from the client runtime, such as
IllegalStateException
, are logged by the logger with the logging name
space jakarta.jms
at level FINER
.
Errors thrown from the JVM and caught by the client runtime, such as
OutOfMemoryError
, are logged by the logger with the logging name space
jakarta.jms
at level SEVERE
.
The following tables list the events that can be logged and the log level that must be set to log events for JMS connections and for sessions.
The following table describes log levels and events for connections.
Table 2-7 Log Levels and Events for jakarta.jms.connection
Name Space
Log Level | Events |
---|---|
|
Connection created |
|
Connection started |
|
Connection closed |
|
Connection broken |
|
Connection reconnected |
|
Miscellaneous connection activities such as |
|
Messages, acknowledgments, Message Queue action and control messages (like committing a transaction) |
For sessions, the following information is recorded in the log record.
Each log record for a message delivered to a consumer includes ConnectionID, SessionID, and ConsumerID.
Each log record for a message sent by a producer includes ConnectionID, SessionID, ProducerID, and destination name.
The table below describes log levels and events for sessions.
Table 2-8 Log Levels and Events for jakarta.jms.session
Name Space
Log Level | Event |
---|---|
|
Session created |
|
Session closed |
|
Producer created |
|
Consumer created |
|
Destination created |
|
Miscellaneous session activities such as committing a session. |
|
Messages produced and consumed. (Message properties and bodies are not logged in the log records.) |
By default, the output log level is inherited from the JRE in which the
application is running. Check the JRE_DIRECTORY/lib/logging.properties
file to determine what that level is.
You can configure logging programmatically or by using configuration files, and you can control the scope within which logging takes place. The following sections describe these possibilities.
The following example shows how you set logging name spaces and levels
in the JRE_DIRECTORY/lib/logging.properties
file, which is used to set
the log level for the Java runtime environment. All applications using
this JRE will have the same logging configuration. The sample
configuration below sets the logging level to INFO
for the
jakarta.jms.connection
name space and specifies that output be written
to java.util.logging.ConsoleHandler
.
#logging.properties file.
# "handlers" specifies a comma separated list of log Handler
# classes. These handlers will be installed during VM startup.
# Note that these classes must be on the system classpath.
# By default we only configure a ConsoleHandler, which will only
# show messages at the INFO and above levels.
handlers= java.util.logging.ConsoleHandler
# Default global logging level.
# This specifies which kinds of events are logged across
# all loggers. For any given facility this global level
# can be overriden by a facility-specific level.
# Note that the ConsoleHandler also has a separate level
# setting to limit messages printed to the console.
.level= INFO
# Limit the messages that are printed on the console to INFO and above.
java.util.logging.ConsoleHandler.level = INFO
java.util.logging.ConsoleHandler.formatter =
java.util.logging.SimpleFormatter
# The logger with jakarta.jms.connection name space will write
# Level.INFO messages to its output handler(s). In this configuration
# the ouput handler is set to java.util.logging.ConsoleHandler.
jakarta.jms.connection.level = INFO
You can also define a logging configuration file from the java command
line that you use to run an application. The application will use the
configuration defined in the specified logging file. In the following
example, configFile
uses the same format as defined in the
JRE_DIRECTORY/lib/logging.properties
file.
java -Djava.util.logging.config.file=configFile MQApplication
The following code uses the java.util.logging
API to log connection
events by changing the jakarta.jms.connection
name space log level to
FINE
. You can include such code in your application to set logging
configuration programmatically.
import java.util.logging.*;
//construct a file handler and output to the mq.log file
//in the system's temp directory.
Handler fh = new FileHandler("%t/mq.log");
fh.setLevel (Level.FINE);
//Get Logger for "jakarta.jms.connection" domain.
Logger logger = Logger.getLogger("jakarta.jms.connection");
logger.addHandler (fh);
//jakarta.jms.connection logger would log activities
//with level FINE and above.
logger.setLevel (Level.FINE);
Previous | Next | Contents |