Message Queues - 1.4
Extension ID
com.castsoftware.mqe
What’s new?
Please see Release Notes for more information.
Description
This extension should be installed when analyzing projects containing Message Queue applications, and you want to view a transaction consisting of queue calls and queue receive objects with their corresponding links.
Messaging library coverage
Apache ActiveMQ
Library name | Version Supported | Java package fullname |
---|---|---|
ActiveMQ :: Client | 5.8.x to 6.1.x | org.apache.activemq |
ActiveMQ Artemis Client Core | 1.0.x to 2.37.x | org.apache.activemq.artemis.api.core.client |
ActiveMQ Artemis JMS Client | 1.0.x to 2.37.x | org.apache.activemq.artemis.jms.client |
ActiveMQ Artemis JMS Client - standard JMS API | 1.0.x to 2.37.x | org.apache.activemq.artemis.api.jms |
Spring JMS : Java API + XML based configuration | 2.0.x to 6.1.x | org.springframework.jms.core |
JMS :: Spring Boot | 1.4.x to 3.3.x | org.springframework.boot.autoconfigure.jms.activemq org.springframework.boot.autoconfigure.jms.artemis |
STOMP | 1.0, 1.1, 1.2 | org.apache.activemq.transport.stomp org.springframework.messaging.simp org.springframework.messaging.simp.stomp |
IBM MQ
Library name | Version Supported | Java package fullname |
---|---|---|
IBM MQ Java client API (MQI) | 6.0, 7.x, 8.0, 9.x | com.ibm.mq |
IBM MQ Java client API (JMS) | 6.0, 7.x, 8.0, 9.x | com.ibm.mq.jms |
Spring JMS : Java API + XML and Annotation based configuration | 3.0 to 6.1.x | org.springframework.jms.core |
RabbitMQ
Library name | Version Supported | Java package fullname |
---|---|---|
RabbitMQ Java Client (AMQP) | <=3.11 | com.rabbitmq.client |
Spring AMQP | 1.0.x to 3.1.x | org.springframework.amqp |
Spring RabbitMQ Support and Spring Rabbit with XML based configuration | - | org.springframework.amqp.rabbit.core org.springframework.amqp.rabbit.core.RabbitTemplate org.springframework.amqp.core.AmqpTemplate |
Spring Boot Starter AMQP | 1.0.x to 3.3.x | org.springframework.boot.autoconfigure.amqp |
JMS
Library name | Version Supported | Java package fullname |
---|---|---|
JMS API | 1.0.2, 1.1, 2.0 for JEE 1.3 to 8.0 | javax.jms |
Spring Boot Actuate JMS | - | org.springframework.boot.actuate.autoconfigure.jms |
Spring Boot Starter JMS | - | org.springframework.boot.autoconfigure.jms |
Jakarta Messaging
Library name | Version Supported | Java package fullname |
---|---|---|
Jakarta Messaging API | 2.x, 3.x for Jakarta 8.0 to 11.0 | jakarta.jms |
Apache Kafka
Library name | Version Supported | Java package fullname |
---|---|---|
Apache Kafka client | 1.0.x to 3.8.x | org.apache.kafka.clients |
Spring for Apache Kafka | 1.3.x to 3.2.x | org.springframework.kafka |
Spring Boot Starter Kafka | - | org.springframework.kafka.annotation.KafkaListener |
Amazon Simple Queue Service (SQS)
Library name | Version Supported | Java package fullname |
---|---|---|
AWS Java SDK For Amazon SQS v1 | 1.9.x to 1.12.x | com.amazonaws.services.sqs com.amazon.sqs.javamessaging |
AWS Java SDK For Amazon SQS v2 | 1.9.x to 1.12.x | software.amazon.awssdk.services.sqs |
Compatibility
Release | Operating System | Supported |
---|---|---|
v3/8.4.x | Microsoft Windows / Linux | ✅ |
v2/8.3.x | Microsoft Windows | ✅ |
Dependencies with other extensions
Some CAST extensions require the presence of other CAST extensions in order to function correctly. The Message Queue extension requires that the following other CAST extensions are also installed:
- Universal Linker
- CAST AIP Internal extension (internal technical extension)
What results can you expect?
Objects
Icons | Description |
---|---|
|
|
|
|
|
|
|
|
|
Links
Messaging System | Link Type | Caller Object | Callee Object |
---|---|---|---|
IBM MQ | Calllink at analyzer level | Producer method IBM MQ Java Queue Consumer Producer method IBM MQ Java Topic Subscriber |
IBM MQ Java Queue Producer Consumer method IBM MQ Java Topic Publisher Consumer method |
IBM MQ | Calllink at application level by Universal Linker | IBM MQ Java Queue Producer IBM MQ Java Topic Publisher |
IBM MQ Java Queue Consumer IBM MQ Java Topic Subscriber |
RabbitMQ | Calllink at analyzer level | Producer method RabbitMQ Java Queue Subscriber |
RabbitMQ Java Queue Producer Consumer method |
RabbitMQ | Calllink at application level by Universal Linker | RabbitMQ Java Queue Publisher | RabbitMQ Java Queue Subscriber |
JMS | Calllink at analyzer level | Producer method JMS Java Queue Consumer Producer method JMS Java Topic Consumer |
JMS Java Queue Producer Consumer method JMS Java Topic Producer Consumer method |
JMS | Calllink at application level by Universal Linker | JMS Java Queue Producer JMS Java Topic Producer |
JMS Java Queue Consumer JMS Java Topic Consumer |
Apache Kafka | Calllink at analyzer level | Producer method JMS Java Topic Consumer |
JMS Java Topic Producer Consumer method |
Apache Kafka | Calllink at application level by Universal Linker | JMS Java Topic Producer | JMS Java Topic Consumer |
AWS-SQS | Calllink at analyzer level | Producer method Java AWS Simple Queue Service Consumer |
Java AWS Simple Queue Service Producer Consumer method |
AWS-SQS | Calllink at application level by Universal Linker | Java AWS Simple Queue Service Producer | Java AWS Simple Queue Service Consumer |
STOMP | Calllink at analyzer level | Producer method STOMP Java Queue Consumer Producer method STOMP Java Topic Consumer |
STOMP Java Queue Producer Consumer method STOMP Java Topic Producer Consumer method |
STOMP | Calllink at application level by Universal Linker | STOMP Java Queue Producer STOMP Java Topic Producer |
STOMP Java Queue Consumer STOMP Java Topic Consumer |
JMS with ActiveMQ
Example of JMS with ActiveMQ (Spring-XML)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- JmsTemplate Definition -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="destinationQueue" />
<property name="messageConverter" ref="myMessageConverter" />
</bean>
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<constructor-arg index="0" value="tcp://localhost:61616" />
</bean>
<!-- ConnectionFactory Definition -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<constructor-arg ref="amqConnectionFactory" />
</bean>
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="IN_QUEUE" />
</bean>
<bean id="SampleJmsMessageSender" class="com.baeldung.spring.jms.SampleJmsMessageSender">
<property name="queue" ref="destinationQueue" />
<property name="jmsTemplate" ref="jmsTemplate" />
</bean>
<bean id="myMessageConverter" class="com.baeldung.spring.jms.SampleMessageConverter" />
<!-- this is the Message-Driven POJO (MDP) -->
<bean id="messageListener" class="com.baeldung.spring.jms.SampleListener">
<property name="jmsTemplate" ref="jmsTemplate" />
<property name="queue" ref="destinationQueue" />
</bean>
<bean id="errorHandler" class="com.baeldung.spring.jms.SampleJmsErrorHandler" />
<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destinationName" value="IN_QUEUE" />
<property name="messageListener" ref="messageListener" />
<property name="errorHandler" ref="errorHandler" />
</bean>
</beans>
Example of JMS with ActiveMQ Producer convertAndSend API - Queue is stored in XML file
private JmsTemplate jmsTemplate;
private Queue queue;
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void setQueue(Queue queue) {
this.queue = queue;
}
public void sendMessage(final Employee employee) {
this.jmsTemplate.convertAndSend(employee);
}
Example of JMS with ActiveMQ Producer send API - Queue is stored in XML file
private JmsTemplate jmsTemplate;
private Queue queue;
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void setQueue(Queue queue) {
this.queue = queue;
}
public void simpleSend() {
jmsTemplate.send(queue, s -> s.createTextMessage("hello queue world"));
}
Example of JMS with ActiveMQ Consumer (Springboot)
public class OrderConsumer {
public static final String ORDER_QUEUE = "Queue_Anno";
private static Logger log = LoggerFactory.getLogger(OrderConsumer.class);
Order received;
private CountDownLatch countDownLatch;
@JmsListener(destination = ORDER_QUEUE)
public void receiveMessage(@Payload Order order,
@Headers MessageHeaders headers,
Message message, Session session) {
}
}
Example of JMS with ActiveMQ - JNDI is used to store Queue
public QBorrower() throws NamingException, JMSException {
Context ctx=new InitialContext();
QueueConnectionFactory connectionFactory=(QueueConnectionFactory)ctx.lookup("ConnectionFactory");
queueConnection=connectionFactory.createQueueConnection();
requestQueue=(Queue)ctx.lookup("jms.LoanRequestQueue");
responseQueue=(Queue)ctx.lookup("jms.LoanResponseQueue");
queueConnection.start();
queueSession=queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
}
private void sendLoanRequest(double salary,double loanAmount) throws JMSException {
MapMessage message=queueSession.createMapMessage();
message.setDoubleProperty("salary", salary);
message.setDoubleProperty("loanAmount", loanAmount);
message.setJMSReplyTo(responseQueue);
QueueSender sender=queueSession.createSender(requestQueue);
QueueReceiver queueReceiver=queueSession.createReceiver(responseQueue);
sender.send(message);
}
In order to recognize that ActiveMQ is analyzed, the created objects have the properties CAST_RabbitMQ_Queue.exchangeName for topic and CAST_MQE_QueueCall.messengingSystem for queue set to ActiveMQ value.
IBM MQ
Example of IBM MQ Producer and Consumer (Plain Java)
com.ibm.mq .MQDestination.put and com.ibm.mq .MQDestination.get APIs are associated with com.ibm.mq .MQQueueManager.accessQueue and com.ibm.mq .MQQueueManager.accessTopic APIs. Here is an example with accessQueue API which indicates the name of the queue where the message is sent.
public static void main(String args[]) {
int openOptions = MQC.MQOO_INQUIRE + MQC.MQOO_FAIL_IF_QUIESCING + MQC.MQOO_INPUT_SHARED;
MQQueue q = qMgr.accessQueue("SYSTEM.DEFAULT.LOCAL.QUEUE",openOptions,null,null,null);
MQMessage mBuf = new MQMessage();
MQPutMessageOptions pmo = new MQPutMessageOptions();
do {
runShow = br.readLine();
if (runShow.length() > 0) {
mBuf.clearMessage(); // reset the buffer
mBuf.correlationId = 1; // set correlationId
mBuf.messageId = 1; // set messageId
mBuf.writeString(runShow); // set actual message
System.out.println("--> writing message to queue");
q.put(mBuf,pmo); // put the message out on the queue
}
} while (runShow.length() > 0);
q.close();
qMgr.disconnect();
}
} catch (MQException ex) {
System.out.println(
"WMQ exception occurred : Completion code ");
}
}
private void read() throws MQException
{
MQQueue queue = _queueManager.accessQueue( inputQName,
openOptions,
null, // default q manager
null, // no dynamic q name
null ); // no alternate user id
MQGetMessageOptions getOptions = new MQGetMessageOptions();
getOptions.options = MQC.MQGMO_NO_WAIT + MQC.MQGMO_FAIL_IF_QUIESCING + MQC.MQGMO_CONVERT;
while(true)
{
MQMessage message = new MQMessage();
try
{
queue.get(message, getOptions);
byte[] b = new byte[message.getMessageLength()];
message.readFully(b);
System.out.println(new String(b));
message.clearMessage();
}
}
queue.close();
_queueManager.disconnect();
}
Example of IBM MQ Producer (JMS Interface)
public int sendMessage(int type, String msg) {
System.out.println("sendMessage type "+type);
System.out.println("msg = "+msg);
if(type == TYPE_CAP)
{
port=1414;
queueManager="QM1";
queueName="IBM_QUEUE_1";
}
else if(type == TYPE_MEASURE)
{
port=1415;
queueManager="QM2";
queueName="IBM_QUEUE_2";
}
else if(type == TYPE_WOOUT)
{
port=1415;
queueManager="QM3";
queueName="IBM_QUEUE_3";
}
else
return -1;
System.out.println(port+","+queueManager+","+queueName);
int status = 200;
MQQueueConnectionFactory cf = null;
MQQueueConnection connection = null;
MQQueueSession session = null;
MQQueue queue = null;
MQQueueSender sender = null;
try {
cf = new MQQueueConnectionFactory();
cf.setHostName(host);// host
cf.setPort(port);// port
cf.setTransportType(1);// JMSC.MQJMS_TP_CLIENT_MQ_TCPIP
cf.setQueueManager(queueManager);// queue
cf.setChannel(channel);// channel
connection = (MQQueueConnection) cf.createQueueConnection();
session = (MQQueueSession) connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queue = (MQQueue) session.createQueue(queueName);// queue
// name
sender = (MQQueueSender) session.createSender(queue);
JMSTextMessage message = (JMSTextMessage) session.createTextMessage(msg);
// Start the connection
connection.start();
// DO NOT MAKE LOOP!!!
sender.send(message);
} catch (JMSException e){
e.printStackTrace();
} finally {
try {
sender.close();
} catch (Exception e) {
}
try {
session.close();
} catch (Exception e) {
}
if(connection != null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
return status;
}
Example of IBM MQ Topic Producer (JMS Interface)
public class SimplePubSub {
public static void main(String[] args) {
try {
MQTopicConnectionFactory cf = new MQTopicConnectionFactory();
// Config
cf.setHostName("localhost");
cf.setPort(1414);
cf.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
cf.setQueueManager("QM_thinkpad");
cf.setChannel("SYSTEM.DEF.SVRCONN");
MQTopicConnection connection = (MQTopicConnection) cf.createTopicConnection();
MQTopicSession session = (MQTopicSession) connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
MQTopic topic = (MQTopic) session.createTopic("topic://foo");
MQTopicPublisher publisher = (MQTopicPublisher) session.createPublisher(topic);
long uniqueNumber = System.currentTimeMillis() % 1000;
JMSTextMessage message = (JMSTextMessage) session.createTextMessage("SimplePubSub "+ uniqueNumber);
// Start the connection
connection.start();
publisher.publish(message);
System.out.println("Sent message:\\n" + message);
publisher.close();
session.close();
connection.close();
System.out.println("\\nSUCCESS\\n");
}
catch (JMSException jmsex) {
System.out.println(jmsex);
System.out.println("\\nFAILURE\\n");
}
catch (Exception ex) {
System.out.println(ex);
System.out.println("\\nFAILURE\\n");
}
}
}
Example of com.ibm.mq.MQQueueManager.put for Queue and Topic destinations
import com.ibm.mq.*;
import com.ibm.mq.constants.MQConstants;
public class MQPutExample {
public static void main(String[] args) {
// MQ Configuration
String queueManagerName = "QM1"; // Queue Manager name
String queueName = "QUEUE1"; // Target queue name (for putting messages to a queue)
String topicName = "topic://MY.TOPIC"; // Target topic string (for publishing messages)
String hostname = "localhost"; // MQ server hostname
int port = 1414; // MQ server port
String channel = "SYSTEM.DEF.SVRCONN"; // MQ channel name
String messageContent = "Hello, MQ!"; // Message content
// Set MQ environment configuration
MQEnvironment.hostname = hostname;
MQEnvironment.port = port;
MQEnvironment.channel = channel;
MQQueueManager queueManager = null;
try {
// Connect to the queue manager
queueManager = new MQQueueManager(queueManagerName);
// Create the message
MQMessage message = new MQMessage();
message.writeString(messageContent);
// Example 1: Put message onto a queue
System.out.println("Putting message onto the queue: " + queueName);
queueManager.put(MQConstants.MQOT_Q, queueName, message);
// Example 2: Publish message to a topic
System.out.println("Publishing message to the topic: " + topicName);
queueManager.put(MQConstants.MQOT_TOPIC, topicName, message);
System.out.println("Message sent successfully to both queue and topic!");
} catch (MQException mqe) {
System.err.println("An MQException occurred: Completion Code = " + mqe.completionCode +
", Reason Code = " + mqe.reasonCode);
} catch (Exception e) {
e.printStackTrace();
} finally {
// Cleanup: Disconnect from the queue manager
if (queueManager != null) {
try {
queueManager.disconnect();
System.out.println("Disconnected from the queue manager.");
} catch (MQException mqe) {
System.err.println("Failed to disconnect: " + mqe.getMessage());
}
}
}
}
}
MQOT_Q constant parameter corresponds to queues.
MQOT_TOPIC constant parameter corresponds to topics.
RabbitMQ
Supported APIs:
Producer | Consumer |
---|---|
org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive org.springframework.amqp.core.AmqpTemplate.convertAndSend org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive org.springframework.amqp.rabbit.core.RabbitTemplate.send org.springframework.amqp.core.AmqpTemplate.convertAndSend org.springframework.amqp.core.AmqpTemplate.convertSendAndReceive org.springframework.amqp.core.AmqpTemplate.convertSendAndReceiveAsType org.springframework.amqp.core.AmqpTemplate.send org.springframework.amqp.core.AmqpTemplate.sendAndReceive com.rabbitmq.client.Channel.basicPublish |
org.springframework.amqp.rabbit.core.RabbitTemplate.receiveAndConvert org.springframework.amqp.rabbit.core.RabbitTemplate.receiveAndReply org.springframework.amqp.rabbit.core.RabbitTemplate.receive org.springframework.amqp.core.AmqpTemplate.receive org.springframework.amqp.core.AmqpTemplate.receiveAndReply org.springframework.amqp.core.AmqpTemplate.receiveAndConvert com.rabbitmq.client.Channel.basicConsume |
Configuration APIs |
---|
com.rabbitmq.client.Channel.exchangeDeclare com.rabbitmq.client.Channel.queueBind com.rabbitmq.client.Channel.exchangeBind org.springframework.amqp.core.BindingBuilder.bind org.springframework.amqp.core.BindingBuilder.DestinationConfigurer.to org.springframework.amqp.core.BindingBuilder.DirectExchangeRoutingKeyConfigurer.with org.springframework.amqp.core.BindingBuilder.GenericExchangeRoutingKeyConfigurer.with org.springframework.amqp.core.BindingBuilder.TopicExchangeRoutingKeyConfigurer.with |
Using these configuration APIs, we create three new objects, called configuration objects (RabbitMQ Java Exchange Declaration, RabbitMQ Java Queue Bind, RabbitMQ Java Exchange Bind), which help us link the RabbitMQ Producer and the RabbitMQ Consumer following the linking rules established by Universal Linker.
Starting com.castsoftware.mqe 1.3.0-alpha1 you must use com.castsoftware.wbslinker 1.7.25-funcrel because this new release is based on the new linking protocol for RabbitMQ.
Example of Spring AMQP RabbitMQ Producer
@Service
public class CustomMessageSender {
private static final Logger log = LoggerFactory.getLogger(CustomMessageSender.class);
private final RabbitTemplate rabbitTemplate;
@Autowired
public CustomMessageSender(final RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@Scheduled(fixedDelay = 3000L)
public void sendMessage() {
final CustomMessage message = new CustomMessage("Hello there!", new Random().nextInt(50), false);
log.info("Sending message...");
rabbitTemplate.convertAndSend(MessagingApplication.EXCHANGE_NAME, MessagingApplication.ROUTING_KEY, message);
} }
Example of Spring AMQP RabbitMQ Consumer (support of @RabbitListener annotation)
@Service
public class CustomMessageListener {
private static final Logger log = LoggerFactory.getLogger(CustomMessageListener.class);
@RabbitListener(queues = MessagingApplication.QUEUE_GENERIC_NAME)
public void receiveMessage(final Message message) {
log.info("Received message as generic: {}", message.toString());
}
@RabbitListener(queues = MessagingApplication.QUEUE_SPECIFIC_NAME)
public void receiveMessageSpecific(final CustomMessage customMessage) {
log.info("Received message as specific class: {}", customMessage.toString());
} }
Example of SpringBoot RabbitMQ Exchange-Queue Binding configuration
public class MessagingApplication implements RabbitListenerConfigurer{
public static final String EXCHANGE_NAME = "appExchange";
public static final String QUEUE_GENERIC_NAME = "appGenericQueue";
public static final String QUEUE_SPECIFIC_NAME = "appSpecificQueue";
public static final String ROUTING_KEY = "messages.key";
public static void main(String[] args) {
SpringApplication.run(MessagingApplication.class, args);
}
@Bean
public TopicExchange appExchange() {
return new TopicExchange(EXCHANGE_NAME);
}
@Bean
public Queue appQueueGeneric() {
return new Queue(QUEUE_GENERIC_NAME);
}
@Bean
public Queue appQueueSpecific() {
return new Queue(QUEUE_SPECIFIC_NAME);
}
@Bean
public Binding declareBindingGeneric() {
return BindingBuilder.bind (appQueueGeneric()).to(appExchange()).with(ROUTING_KEY);
}
@Bean
public Binding declareBindingSpecific() {
return BindingBuilder.bind(appQueueSpecific()).to(appExchange()).with(ROUTING_KEY);
}
One to Many: RabbitMQ Topic Exchange bound to two Queues
Configuration objects used by web service linker to do the linking between Producer and Consumer
Example of RabbitMQ Java Exchange Declaration object properties:
Example of RabbitMQ Java Queue Bind object properties:
RabbitMQ basicPublish and exchangeDeclare example with topic-exchange
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = "tp_key";
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
catch (Exception e) {
e.printStackTrace();
}
finally {
if (connection != null) {
try {
connection.close();
}
catch (Exception ignore) {}
}
}
}
...
}
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = "topic_queue";
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}
channel.queueBind(queueName, EXCHANGE_NAME, "tp_key");
...
channel.basicConsume(queueName, true, consumer);
}
RabbitMQ Producer object properties:
RabbitMQ Consumer object properties:
RabbitMQ MessageListener with spring xml queue declaration
import javax.jms.*;
public class MessageReceiver implements MessageListener {
public void onMessage(Message message) {
if(message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
System.out.println(String.format("Received: %s",text));
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
<!-- Queues -->
<rabbit:queue id="springQueue" name="spring.queue" auto-delete="true" durable="false"/>
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="springQueue" ref="messageListener"/>
</rabbit:listener-container>
<bean id="messageListener" class="com.ndpar.spring.rabbitmq.MessageHandler"/>
<!-- Bindings -->
<rabbit:fanout-exchange name="amq.fanout">
<rabbit:bindings>
<rabbit:binding queue="springQueue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
RabbitMQ Queue object properties:
@RabbitListener with @RabbitHandler
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import java.util.Date;
@RabbitListener(queues = "foo")
public class Listerner {
@RabbitHandler
public void process(@Payload String foo) {
System.out.println(new Date() + ": " + foo);
}
}
Example of application where all 3 configuration objects are present
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
private static final String EXCHANGE_NAME = "exchange";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String message = argv.length < 1 ? "info: Hello Worldss!" :
String.join(" ", argv);
channel.basicPublish(EXCHANGE_NAME, "test1", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
import com.rabbitmq.client.*;
public class Subs {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHostName("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = channel.queueDeclare("queueName", false, false, false, null).getQueue();
String queueName1 = channel.queueDeclare("queueName1", false, false, false, null).getQueue();
String queueName2 = channel.queueDeclare("queueName2", false, false, false, null).getQueue();
String queueName3 = channel.queueDeclare("queueName3", false, false, false, null).getQueue();
String queueName5 = channel.queueDeclare("queueName5", false, false, false, null).getQueue();
channel.exchangeDeclare("exchange", "fanout");
channel.exchangeDeclare("exchange2", "direct");
channel.exchangeDeclare("exchange3", "direct");
channel.exchangeBind("exchange", "exchange2", "test1");
channel.exchangeBind("exchange3", "exchange", "test1");
channel.queueBind(queueName, "exchange", "test1");
channel.queueBind(queueName2, "exchange", "testxx");
channel.queueBind(queueName1, "exchange2", "test1");
channel.queueBind(queueName3, "exchange2", "test5");
channel.queueBind(queueName5, "exchange3", "test1");
channel.queueBind(queueName5, "exchange3", "test2");
System.out.println(" [*] Waiting for logs.");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
channel.basicConsume(queueName1, true, consumer);
channel.basicConsume(queueName2, true, consumer);
channel.basicConsume(queueName3, true, consumer);
channel.basicConsume(queueName5, true, consumer);
System.out.println(" Press [enter] to exit.");
System.in.read();
channel.close();
connection.close();
}
}
Example of RabbitMQ Java Exchange Declaration object properties:
Example of RabbitMQ Java Queue Bind object properties:
Example of RabbitMQ Java Exchange Bind object properties:
JMS
Example of JMS Queue with send and receive patterns using JNDI binding for Queue names defined in beans
public String transmit(String xmlRequest) throws Throwable {
String xmlResponse = null; // Transmit the message and get a response.
String requestQueue = "java:comp/env/ServiceRequestQueue";
String responseQueue = "java:comp/env/ServiceResponseQueue";
JMSDestination messageDest = new JMSDestination(requestQueue, responseQueue);
//19.1 Queue changes end
xmlResponse = messageDest.sendAndReceive(xmlRequest);
}
The sendAndReceive() method:
public String sendAndReceive(String message) throws ServiceException {
String responseXml = null;
QueueConnection connection = null;
QueueSession session = null;
Throwable thrown = null;
try {
// Create a connection and start it.
connection = qcf.createQueueConnection();
connection.start();
// Create a session.
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
String correlationID = send(message, session);
responseXml = receive(correlationID, session, message);
} catch (ServiceException serviceException ) {
throw serviceException ;
} finally {
// Release resources.
close(session);
close(connection);
}
// Return the response.
return responseXml;
}
The send() method:
public String send(String message, QueueSession session) throws Throwable {
QueueSender sender = null;
try {
// Create the sender queue.
sender = session.createSender(requestQueue);
sender.setTimeToLive(expiry);
TextMessage outMessage = (TextMessage) session.createTextMessage(message);
outMessage.setJMSReplyTo(responseQueue);
outMessage.setJMSCorrelationID(correlationID);
// Override dead message queue with desired response queue
outMessage.setBooleanProperty(Constants.PRESERVE_UNDELIVERED, true);
sender.send(outMessage);
...
}
}
The receive() method:
public String receive(String correlationID, QueueSession session, String message) throws Throwable {
...
QueueReceiver receiver = null;
try {
receiver = session.createReceiver(responseQueue, ...);
TextMessage inMessage = (TextMessage) receiver.receive(timeout);
}
...
}
The XML file where binding is defined:
<session name="ServiceApplication" simple-binding-name="ejb/com/iwm/example/services/ServiceApplication">
<resource-ref name="ServiceRequestQueue" binding-name="jms/ServiceRequestQueue" />
JMS with send and receive patterns using JNDI binding for Queue names defined in beans.
JMS with send and receive patterns using JNDI binding for Queue names not defined in beans.
Example of JMS Topic with publish pattern
public class JMSDestination {
...
requestTopic = 'pub/jms/topic';
public String send(String msg, TopicSession session) throws Throwable
{
TopicPublisher publisher = null;
try
{
...
publisher = session.createPublisher(requestTopic);
publisher.setTimeToLive(expiry);
TextMessage outMsg = session.createTextMessage(msg);
publisher.publish(outMsg);
}
...
}
}
private void main() {
String xmlRq = "messageToSend";
JMSDestination msgDest = new JMSDestination();
String xmlRs = msgDest.send(xmlRq);
}
JMS with publish pattern for Topic
Example of JMS asynchronous messaging
The receive() method from MessageConsumer class alows receiving messages synchronously. When calling this method the message is received or not. The MessageListener interface defines a listener for receiving messages asynchronously. In this case, the onMessage() method will be called when a new message is received at the destination.The listener is registered using the setMessageListener() method from MessageConsumer() class.
private TopicConnection getTopicConnection() throws JMSException, NamingException, FileNotFoundException,
IOException, SQLException
{
try
{
Properties jmsProperties = SenderUtils.loadPropertiesFromFile("jms.properties");
String jTopicName = "topicListener";
final String JMS_FACTORY = "javax.jms.TopicConnectionFactory";
InitialContext ctx = getInitialContext(url);
TopicConnectionFactory tconFactory = (TopicConnectionFactory) ctx.lookup(JMS_FACTORY);
jtcon = tconFactory.createTopicConnection();
jtsession = jtcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic jtopic = (Topic) ctx.lookup(jTopicName);
jtopicPublisher = jtsession.createPublisher(jtopic);
TopicSubscriber jtopicSubscriber = jtsession.createSubscriber(jtopic, selectorString, false);
MsgListener jtopicListener = new MsgListener(service);
jtopicSubscriber.setMessageListener(jtopicListener);
jtcon.setExceptionListener(new ExceptionListener()
{
public void onException(JMSException arg0)
{
logger.error("onException invoked for: " + arg0.getMessage());
restartConnection();
}
});
return jtcon;
}
}
JMS with setMessageListener pattern for Topic (asynchronous messaging)
Example of JMS request-reply pattern
In some cases, the JMS client will want the message consumers to reply to a temporary topic or queue set up by the JMS client. When a JMS message consumer receives a message that includes a JMSReplyTo destination, it can reply using that destination. A JMS consumer is not required to send a reply, but in some JMS applications, clients are programmed to do so.
The JMSReplyTo header indicates which destination, if any, a JMS consumer should reply to. The JMSReplyTo header is set explicitly by the JMS client; its contents will be a javax.jms.Destination object (either Topic or Queue).
@Value("${jms.queue.name}")
private String queueName;
private void sendMessages() {
...
try {
jmsTemplate.convertAndSend(queueName);
} catch (Exception e) {
LOG.debug("Error ", e);
}
}
}
@JmsListener(destination = "${jms.queue.name}", containerFactory = "jmsListenerContainerFactory")
public void onMessage(final Message message) {
...
}
JMS with request-reply pattern
Example of JMS with JmsTemplate send API
Application.Properties
mq.hostName=MQ_SERVER_IP
mq.port=PORT
mq.queueManager=QUEUE.MANAGER.NAME
mq.CCSID=437
mq.username=mqm
mq.password=
mq.pubSubDomain=false
mq.receiveTimeout=20000
mq.myDestination=QUEUE_NAME
public class JmsQueueSender {
private JmsTemplate jmsTemplate;
//Referring to the value in the property file
@Value("${mq.myDestination}")
private String myDestination;
public void simpleSend(final String message) {
this.jmsTemplate.send(myDestination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
}
JMS with Message Driven Bean Class
Session beans allow you to send JMS messages and to receive them. The
message-driven bean class must implement the javax.jms.MessageListener
interface and the onMessage
method.
Example of Message Driven Beans to receive messages synchronously:
Application.Properties
@MessageDriven(
activationConfig = {
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "connectionFactoryJndiName", propertyValue = "jms/hConnectionFactory")
},
mappedName = "jms/destinationQueue")
@TransactionManagement(TransactionManagementType.CONTAINER)
@TransactionAttribute (TransactionAttributeType.NOT_SUPPORTED)
public class GenHealthMDB implements MessageListener {
private static final String INSTANCE_COUNT = "instanceCount";
private static final String MAKE_ACTIVE = "ACTIVE";
private static final String MAKE_INACTIVE = "INACTIVE";
private static Logger logger = Logger.getLogger(GenHealthMDB.class);
@Override
public void onMessage(Message message) {
...
}
}
Example of Message Driven Beans to receive messages asynchronously, xml defined queue:
Spring.XML
<message-driven>
<description>Message-Driven configured by using XML.</description>
<display-name>MDBFilTraitementAsyn</display-name>
<ejb-name>MDBFilTraitementAsyn</ejb-name>
<ejb-class>fr.mi.siv.mti.cip.trait.core.fil.mdb.MDBFilTraitementAsyn</ejb-class>
<message-destination-type>javax.jms.Queue</message-destination-type>
<activation-config>
<activation-config-property>
<activation-config-property-name>destination</activation-config-property-name>
<activation-config-property-value>queueTraitRequeteASyn</activation-config-property-value>
</activation-config-property>
<activation-config-property>
<activation-config-property-name>destinationType</activation-config-property-name>
<activation-config-property-value>javax.jms.Queue</activation-config-property-value>
</activation-config-property>
</activation-config>
</message-driven>
Application.Properties
public class MDBFilTraitementAsyn implements MessageListener {
public void onMessage(final Message message)
{
...
}
}
Example of Message-Driven Beans to receive messages asynchronously:
Application. Properties
mq.myDestination=QUEUE_NAME
Spring.XML
<bean id="jmsQueueListener" class="hu.vanio.jms.spring3.ibmmq.JmsQueueListener" />
<!-- and this is the message listener container -->
<jms:listener-container connection-factory="jmsQueueConnectionFactory">
<jms:listener destination="${mq.myDestination}" ref="jmsQueueListener" />
</jms:listener-container>
public class JmsQueueListener implements MessageListener {
public void onMessage(Message message) {
...
}
}
Example of Message Driven Beans with weblogic:
weblogic-ejb-jar.xml
Spring.XML
<?xml version="1.0" encoding="UTF-8"?>
<wls:weblogic-ejb-jar xmlns:wls="http://xmlns.oracle.com/weblogic/weblogic-ejb-jar"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/ejb-jar_3_0.xsd http://xmlns.oracle.com/weblogic/weblogic-ejb-jar http://xmlns.oracle.com/weblogic/weblogic-ejb-jar/1.2/weblogic-ejb-jar.xsd">
<wls:weblogic-enterprise-bean>
<wls:ejb-name>NotifieMDB</wls:ejb-name>
<wls:message-driven-descriptor>
<wls:pool>
<wls:max-beans-in-free-pool>15</wls:max-beans-in-free-pool>
<wls:initial-beans-in-free-pool>15</wls:initial-beans-in-free-pool>
</wls:pool>
<wls:destination-jndi-name>Notification_Queue</wls:destination-jndi-name>
</wls:message-driven-descriptor>
<wls:enable-call-by-reference>true</wls:enable-call-by-reference>
<wls:dispatch-policy>IFT.notification</wls:dispatch-policy>
</wls:weblogic-enterprise-bean>
</wls:weblogic-ejb-jar>
ejb-jar.xml
Spring.XML
<?xml version="1.0" encoding="UTF-8"?>
<ejb-jar xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/ejb-jar_3_1.xsd"
version="3.0">
<enterprise-beans>
<message-driven>
<ejb-name>NotifieMDB</ejb-name>
<ejb-class>com.notification.mdb.NotifieMDB</ejb-class>
<transaction-type>Bean</transaction-type>
<message-destination-type>javax.jms.Queue</message-destination-type>
</message-driven>
</enterprise-beans>
</ejb-jar>
import javax.jms.Message;
import javax.jms.ObjectMessage;
public class NotifieMDB
{
public void onMessage(Message msg)
{
...
}
}
In order to recognize that Message Driven Bean is analyzed, the created objects have the properties CAST_RabbitMQ_Queue.exchangeName for topic and CAST_MQE_QueueCall.messengingSystem for queue set to MessageDrivenBean value.
JMS with JMSContext
JMSContext is the main interface in the simplified JMS API which combines in a single object Connection and Session.
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.Queue;
import javax.jms.Topic;
...
public class Vendor {
@Resource(lookup = "java:comp/DefaultJMSConnectionFactory")
private static ConnectionFactory connectionFactory;
@Resource(lookup = "jms/AQueue")
private static Queue vendorOrderQueue;
@Resource(lookup = "jms/CQueue")
private static Queue vendorConfirmQueue;
@Resource(lookup = "jms/OTopic")
private static Topic supplierOrderTopic;
static Random rgen = new Random();
static int throwException = 1;
public static void main(String[] args) {
JMSConsumer vendorOrderReceiver;
MapMessage orderMessage;
JMSConsumer vendorConfirmReceiver;
VendorMessageListener listener;
Message inMessage;
MapMessage vendorOrderMessage;
Message endOfMessageStream;
Order order;
int quantity;
...
try (JMSContext context =
connectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
JMSContext asyncContext =
context.createContext(JMSContext.SESSION_TRANSACTED);) {
vendorOrderReceiver = context.createConsumer(vendorOrderQueue);
orderMessage = context.createMapMessage();
vendorConfirmReceiver = asyncContext.createConsumer(
vendorConfirmQueue);
listener = new VendorMessageListener(asyncContext, 2);
vendorConfirmReceiver.setMessageListener(listener);
while (true) {
try {
inMessage = vendorOrderReceiver.receive();
if (inMessage instanceof MapMessage) {
vendorOrderMessage = (MapMessage) inMessage;
} else {
endOfMessageStream = context.createMessage();
endOfMessageStream.setJMSReplyTo(
vendorConfirmQueue);
context.createProducer().send(supplierOrderTopic,
endOfMessageStream);
context.commit();
break;
}
if (rgen.nextInt(4) == throwException) {
throw new JMSException(
"Simulated database concurrent access "
+ "exception");
}
order = new Order(vendorOrderMessage);
orderMessage.setInt(
"VendorOrderNumber",
order.orderNumber);
orderMessage.setJMSReplyTo(vendorConfirmQueue);
quantity = vendorOrderMessage.getInt("Quantity");
System.out.println(
"Vendor: Retailer ordered " + quantity
+ " " + vendorOrderMessage.getString("Item"));
orderMessage.setString("Item", "");
orderMessage.setInt("Quantity", quantity);
context.createProducer().send(supplierOrderTopic,
orderMessage);
System.out.println(
"Vendor: Ordered " + quantity
+ " CPU(s) and hard drive(s)");
context.commit();
System.out.println(
" Vendor: Committed transaction 1");
} catch (JMSException e) {
System.err.println(
"Vendor: JMSException occurred: "
+ e.toString());
context.rollback();
System.err.println(
" Vendor: Rolled back transaction 1");
}
}
listener.monitor.waitTillDone();
} catch (JMSRuntimeException e) {
System.err.println(
"Vendor: Exception occurred: " + e.toString());
}
}
}
Result of Queue creation:
Result of Topic creation:
JMS with AWS-SQS
SQSConnection class extends javax.jms.Connection. It can be used together with the JMS standard connection methods in order to create new queues.
import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper;
import com.amazon.sqs.javamessaging.SQSConnection;
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazon.sqs.javamessaging.ProviderConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import javax.jms.*;
public class App
{
private static String queueName = "ymq_jms_example";
public static void main( String[] args ) throws JMSException
{
SQSConnectionFactory connectionFactory = new SQSConnectionFactory(
new ProviderConfiguration(),
AmazonSQSClientBuilder.standard()
.withRegion("ru-central1")
.withEndpointConfiguration(new EndpointConfiguration(
"https://message-queue.api.cloud.yandex.net",
"ru-central1"
))
);
SQSConnection connection = connectionFactory.createConnection();
AmazonSQSMessagingClientWrapper client = connection.getWrappedAmazonSQSClient();
if( !client.queueExists(queueName) ) {
client.createQueue( queueName );
}
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
Message message = session.createTextMessage("test message");
producer.send(message);
}
}
STOMP
Supported APIs:
Producer | Consumer |
---|---|
org.apache.activemq.transport.stomp.StompConnection.send org.springframework.messaging.simp.stomp.StompSession.send org.springframework.messaging.simp.SimpMessagingTemplate.send org.springframework.messaging.simp.SimpMessagingTemplate.doSend org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser |
org.apache.activemq.transport.stomp.StompConnection.subscribe org.springframework.messaging.simp.stomp.StompSession.subscribe org.springframework.messaging.handler.annotation.MessageMapping org.springframework.messaging.handler.annotation.SendTo org.springframework.messaging.handler.annotation.SendToUser |
The prefix in STOMP /queue/ or /topic/ decides whether we create a Queue or a Topic. It is removed from the string before passing it to ActiveMQ Classic as a JMS destination.
Example with org.apache.activemq.transport.stomp.StompConnection.send API
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;
public class StompConnectionExample {
public static void main(String[] args) {
StompConnection stompConnection = new StompConnection();
try {
// 1. Connect to ActiveMQ STOMP server
stompConnection.open("localhost", 61613); // Default STOMP port for ActiveMQ
// 2. Connect with STOMP protocol
stompConnection.connect("admin", "admin"); // Use ActiveMQ default credentials
// 3. Send a message to a queue
String queueName = "/queue/testQueue";
String message = "Hello from StompConnection!";
stompConnection.send(queueName, message);
System.out.println("Sent message: " + message);
// 4. Disconnect
stompConnection.disconnect();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
stompConnection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
Example of @MessageMapping with @SendTo annotations
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.stereotype.Controller;
@Controller
public class SocketController {
private static final Logger __logger = LoggerFactory.getLogger(SocketController.class);
@MessageMapping("/message.sendMessage")
@SendTo("/topic/input")
public SocketMessage sendMessage(@Payload SocketMessage socketMessage) {
return socketMessage;
}
@MessageMapping("/message.addClient")
@SendTo("/topic/client")
public SocketMessage addClient(
@Payload SocketMessage socketMessage, SimpMessageHeaderAccessor headerAccessor) {
headerAccessor.getSessionAttributes().put("clientuuid", socketMessage.getSender());
return socketMessage;
}
}
Example with org.apache.activemq.transport.stomp.StompConnection.subscribe API
import org.apache.activemq.transport.stomp.StompConnection;
public class StompTopicSubscriber {
public static void main(String[] args) {
String brokerUrl = "localhost";
int brokerPort = 61613;
String topicName = "/topic/example.topic";
StompConnection connection = new StompConnection();
try {
// Connect to ActiveMQ STOMP endpoint
connection.open(brokerUrl, brokerPort);
connection.connect("admin", "admin"); // Default ActiveMQ credentials
// Subscribe to the topic
connection.subscribe(topicName, "client-individual");
System.out.println("Subscribed to topic: " + topicName);
// Receive messages
while (true) {
org.apache.activemq.transport.stomp.StompFrame frame = connection.receive();
System.out.println("Received message: " + frame.getBody());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
connection.disconnect();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
AWS-SQS
Supported APIs:
Producer | Consumer |
---|---|
com.amazonaws.services.sqs.AmazonSQS.sendMessage com.amazonaws.services.sqs.AmazonSQSClient.sendMessage com.amazonaws.services.sqs.AmazonSQS.sendMessageBatch com.amazonaws.services.sqs.AmazonSQSAsync.sendMessageBatchAsync com.amazonaws.services.sqs.AmazonSQSAsync.sendMessageAsync software.amazon.awssdk.services.sqs.SqsClient.sendMessage software.amazon.awssdk.services.sqs.SqsClient.sendMessageBatch org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend org.springframework.messaging.core.AbstractMessageSendingTemplate.send org.springframework.messaging.core.MessageSendingOperations.convertAndSend org.springframework.messaging.core.MessageSendingOperations.convertAndSend org.springframework.messaging.core.MessageSendingOperations.send io.awspring.cloud.messaging.core.support.AbstractMessageChannelMessagingSendingTemplate.convertAndSend io.awspring.cloud.messaging.core.support.AbstractMessageChannelMessagingSendingTemplate.send io.awspring.cloud.messaging.core.support.AbstractMessageChannelMessagingSendingTemplate.doSend |
com.amazonaws.services.sqs.AmazonSQS.receiveMessage com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage com.amazonaws.services.sqs.AmazonSQSAsync.receiveMessageAsync software.amazon.awssdk.services.sqs.SqsClient.receiveMessage io.awspring.cloud.messaging.core.QueueMessagingTemplate.receive io.awspring.cloud.messaging.core.QueueMessagingTemplate.receiveAndConvert org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate.receive org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate.receiveAndConvert org.springframework.cloud.aws.messaging.listener.annotation.SqsListener |
AWS-SQS with sendMessage and receiveMessage APIs
public class AWSResources {
public static final String SQS_QUEUE_NAME = "reinvent-memes";
}
import static com.amazonaws.memes.AWSResources.SQS_QUEUE_NAME
public class MemeUtils {
public ImageMacro submitJob(String topCaption, String bottomCaption, String imageKey, String createdBy) {
String queueUrl = SQS.getQueueUrl(new GetQueueUrlRequest(SQS_QUEUE_NAME)).getQueueUrl();
SQS.sendMessage(new SendMessageRequest(queueUrl, macro.getId()));
}
}
import static com.amazonaws.memes.AWSResources.SQS_QUEUE_NAME
public void run() {
System.out.println("MemeWorker listening for work");
String queueUrl = SQS.getQueueUrl(new GetQueueUrlRequest(SQS_QUEUE_NAME)).getQueueUrl();
while (true) {
try {
ReceiveMessageResult result = SQS.receiveMessage(
new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(1));
for (Message msg : result.getMessages()) {
executorService.submit(new MessageProcessor(queueUrl, msg));
}
sleep(1000);
} catch (InterruptedException e) {
Thread.interrupted();
throw new RuntimeException("Worker interrupted");
} catch (Exception e) {
// ignore and retry
}
}
}
AWS-SQS with sendMessage and receiveMessage APIs
AWS-SQS with sendMessage API; cross technologies linking with JmsListener API
private static final String DEFAULT_QUEUE_NAME = "test-sdk";
@RequestMapping(
path = "/sqs/message",
method = RequestMethod.POST,
consumes = MediaType.APPLICATION_JSON_VALUE)
public HttpEntity addMessage(@RequestBody @Valid SimpleMessage simpleMessage){
AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
GetQueueUrlResult getQueueUrlResult = sqs.getQueueUrl(DEFAULT_QUEUE_NAME);
String queueUrl = getQueueUrlResult.getQueueUrl();
SendMessageRequest sendMessageRequest = new SendMessageRequest();
sendMessageRequest.setQueueUrl(queueUrl);
sendMessageRequest.setMessageBody(simpleMessage.getMessage());
SendMessageResult messageResult = sqs.sendMessage(sendMessageRequest);
return new ResponseEntity(messageResult, HttpStatus.CREATED);
}
AWS-SQS with sendMessage API
AWS-SQS with @SqsListener annotation example
@SqsListener(value = "${cloud.aws.queue.queueUrl}", deletionPolicy = SqsMessageDeletionPolicy.ALWAYS)
public void queueListener(@Payload Note message, @Headers Map<String, Object> sqsHeaders) {
...
}
AWS-SQS with QueueMessagingTemplate convertAndSend API
import com.amazonaws.services.sqs.AmazonSQSAsync;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.QueueMessageChannel;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.stereotype.Service;
@Service
public abstract class SqsQueueSender {
public void sendMessage(String queueUrl, Object message, String customerId) {
QueueMessageChannel messageChannel = new QueueMessageChannel(amazonSqs, queueUrl);
Map<String, Object> headers = new HashMap<>();
headers.put("CustomerId", customerId);
this.queueMessagingTemplate.convertAndSend(messageChannel, message, headers);
}
}
import com.amazonaws.services.sqs.AmazonSQSAsync;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class SqsPersonNotificaitonProducer extends SqsQueueSender {
@Value("${cloud.aws.queue.queueUrl}")
private String queueName;
@Autowired
public SqsNotificaitonProducer(AmazonSQSAsync amazonSqs) {
super(amazonSqs);
}
public void sendNotificationMessage(Object message, String customerId) {
sendMessage(queueName, message, customerId);
}
}
cloud:
aws:
queue:
queueUrl: "${cloud.aws.queue.hostName}/${cloud.aws.queue.queueName}"
Apache Kafka Patterns
KafkaProducer send() API example
public class Example {
public static void main(String[] args) {
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); // basic serializer class for key
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); // basic serializer class for value
KafkaProducer producer = new KafkaProducer<String, String>(kafkaProps);
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products","France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
}
}
KafkaConsumer subscribe() API example
public class Example {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Collections.singletonList("customerCountries"));
}
}
Spring Kafka
@KafkaListener annotation on method example
@KafkaListener(topics = "${topic.name}", id="id")
public void listen(@Payload String message,
@Header(KafkaHeaders.OFFSET) int offset,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition){
...
}
Topic value is present in properties file:
topic.name=KAFKA_TOPIC
@KafkaListener annotation on method example (with list of topics)
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = {"${kafka.event.contracting.topic}", "${kafka.legacyConsumerTopic}"})
public void listen(String message) {
...
}
}
Topic values are present in properties file:
kafka.event.contracting.topic={kafka.event.contracting.topic}
kafka.legacyConsumerTopic={kafka.legacyConsumerTopic}
@KafkaListener annotation on class with @KafkaHandler annotation example
@Service
@KafkaListener(topics = "${topic.name}")
public class KafkaConsumerService {
@KafkaHandler
public void listen(String message) {
}
}
Topic value is present in properties file:
topic.name=KAFKA_TOPIC
@KafkaListener and @SendTo annotations on method example
public class Receiver {
private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);
@SendTo(BAR_TOPIC)
@KafkaListener(topics = FOO_TOPIC)
public Double calculate(Double data) {
LOG.info("calculating square root from='{}'", data);
return Math.sqrt(data);
}
@KafkaListener(topics = BAR_TOPIC)
public void result(Double data) {
LOG.info("received square root='{}'", data);
}
}
@Service
public class Sender {
private static final Logger LOG = LoggerFactory.getLogger(Sender.class);
@Autowired
private KafkaTemplate<String, Double> kafkaTemplate;
public void send(Double data){
LOG.info("sending data='{}' to topic='{}'", data, FOO_TOPIC);
kafkaTemplate.send(FOO_TOPIC, data);
}
}
public class Constants {
public static final String FOO_TOPIC = "foo.t";
public static final String BAR_TOPIC = "bar.t";
}
There are cases where @SendTo annotation has no value. This means that the default value is used: KafkaHeaders.REPLY_TOPIC. In this case we don’t create any object.
@KafkaListener meta annotation example
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@KafkaListener
public @interface MyListener{
@AliasFor(annotation = KafkaListener.class, attribute = "id")
String id();
@AliasFor(annotation = KafkaListener.class, attribute = "groupId")
String groupId() default "";
@AliasFor(annotation = KafkaListener.class, attribute = "topics")
String[] value() default {};
@AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
String concurrency() default "3";
}
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@MyListener(id = "my.group", topics = "${topic.name}")
public void listen(String in) {
}
}
Topic value is present in properties file:
topic.name=KAFKA_TOPIC
KafkaTemplate send API example
@Value("${kafka.event.topic}")
private String topic;
@Override
public void send(Map<String, String> requestMap) {
String message = constructKafkaMessage(requestMap);
kafkaTemplate.send(topic, message);
}
Topic value is present in properties file:
kafka.event.topic={kafka.event.topic}
ReplyingKafkaTemplate sendAndReceive API example
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
System.out.println("Return value: " + consumerRecord.value());
};
}
ContainerProperties setMessageListener API example (with KafkaMessageListenerContainer)
@Bean
public KafkaMessageListenerContainer<?, ?> container(ConsumerFactory<?, ?> consumerFactory) {
ContainerProperties props = new ContainerProperties("perf");
Map<String, Object> configs = new HashMap<>(
((DefaultKafkaConsumerFactory<?, ?>) consumerFactory).getConfigurationProperties());
configs.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 2 * 1024 * 1024);
configs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);
configs.put(ConsumerConfig.CHECK_CRCS_CONFIG, false);
props.setPollTimeout(100);
props.setConsumerRebalanceListener(new RebalanceListener());
Listener messageListener = new Listener();
props.setMessageListener(messageListener);
KafkaMessageListenerContainer<Object, Object> container = new KafkaMessageListenerContainer<>(
new DefaultKafkaConsumerFactory<>(configs), props);
messageListener.setContainer(container);
return container;
}
Messaging service type
For JMS, ActiveMQ, KAFKA and MessageDrivenBean, a JMS object is created, decorated with a property which specifies the vendor. Below is a table listing these properties and their values. For STOMP objects please refer to ActiveMQ section.
Vendor | Queue/Topic | Property name | Property value |
---|---|---|---|
JMS | Queue Producer | CAST_MQE_QueueCall.messengingSystem | JMS |
JMS | Queue Consumer | CAST_MQE_QueueReceive.messengingSystem | JMS |
JMS | Topic Producer | CAST_RabbitMQ_Exchange.exchangeName | JMS |
JMS | Topic Consumer | CAST_RabbitMQ_Queue.exchangeName | JMS |
ActiveMQ | Queue Producer | CAST_MQE_QueueCall.messengingSystem | ActiveMQ |
ActiveMQ | Queue Consumer | CAST_MQE_QueueReceive.messengingSystem | ActiveMQ |
ActiveMQ | Topic Producer | CAST_RabbitMQ_Exchange.exchangeName | ActiveMQ |
ActiveMQ | Topic Consumer | CAST_RabbitMQ_Queue.exchangeName | ActiveMQ |
KAFKA | Topic Producer | CAST_RabbitMQ_Exchange.exchangeName | KAFKA |
KAFKA | Topic Consumer | CAST_RabbitMQ_Queue.exchangeName | KAFKA |
MessageDrivenBean | Queue Consumer | CAST_MQE_QueueReceive.messengingSystem | MessageDrivenBean |
MessageDrivenBean | Topic Consumer | CAST_RabbitMQ_Queue.exchangeName | MessageDrivenBean |
For IBMMQ, RabbitMQ and AWS-SQS, specific objects are created.
Limitations
The following cases are not handled:
- When the queue name is given at the runtime i.e. when Queue name is not initialized anywhere in the code and is given dynamically during the session/connection.