Message Queues - 1.3

Extension ID

com.castsoftware.mqe

What’s new?

Please see Message Queues 1.3 - Release Notes for more information.

Description

This extension should be installed when analyzing projects containing Message Queue applications, and you want to view a transaction consisting of queue calls and queue receive objects with their corresponding links. This version supports Message Queues for:

Technology Description Supported
Java Plain Java
Java Spring
Java JMS
Java AWS-SQS
Mainframe Supported via the Mainframe analyzer. See Support for IBM MQSeries

Supported Message Queue versions

Message Queue Version Support
ActiveMQ 5.15.3
  • OpenWire + JMS
  • Spring + JMS with XML based configuration
  • JMS with SpringBoot
IBM MQ 6.0.0, 8.0.0,
  • 9.0.0
    • Spring + JMS with XML and Annotation based configuration
    • SpringBoot (when queue is autowired in different file)
    • Plain Java
    RabbitMQ 3.6.9
    • AMQP + SLF4J
    • Spring AMQP + Spring Rabbit with XML based configuration
    • Spring AMQP with SpringBoot
    JMS 1.x, 2.x
    • JMS Queue
    • JMS Topic
    • JMSContext
    AWS-SQS 1.x, 2.x, 3.x
    • Simple Queue Service
    KAFKA 2.6.0
    • Apache Kafka Patterns : send/subscribe
    • Spring Kafka

    Compatibility

    CAST Imaging Core release Supported
    ≥ 8.3.x

    Dependencies with other extensions

    Some CAST extensions require the presence of other CAST extensions in order to function correctly. The Message Queue extension requires that the following other CAST extensions are also installed:

    Download and installation instructions

    The extension will not be automatically downloaded and installed in CAST Console. If you need to use it, should manually install the extension using the Application - Extensions interface:

    Source code discovery

    The Message Queues extension does not contain any discoverers or extractors, therefore, no “Message Queue” specific projects will be detected. Your Message Queue source code should be part of a larger Java/JEE related project which you are also analyzing, and as such, JEE Analysis Units will be created  - simply ensure that the path to your Message Queues source code is included in these JEE Analysis Units: browse to the Application - Config panel and expand the JEE Technology option (3):

    What results can you expect?

    Objects

    Icons Description
    • IBM MQ Java Queue Publisher
    • IBM MQ Java Topic Publisher
    • RabbitMQ Java Queue Publisher
    • JMS Java Queue Publisher
    • JMS Java Topic Publisher
    • Java AWS Simple Queue Service Publisher

    IBM MQ Java Queue Receiver
  • IBM MQ Java Topic Receiver
  • RabbitMQ Java Queue Receiver
  • JMS Java Queue Receiver
  • JMS Java Topic Receiver
  • Java AWS Simple Queue Service Receiver

  • IBM MQ Java Unknown Queue Publisher
  • IBM MQ Java Unknown Topic Publisher
  • RabbitMQ Unknown Java Queue Publisher
  • JMS Java Unknown Queue Publisher
  • JMS Java Unknown Topic Publisher
  • Java AWS Simple Queue Service Unknown Publisher

  • IBM MQ Java Unknown Queue Receiver
  • IBM MQ Java Unknown Topic Receiver
  • RabbitMQ Unknown Java Queue Receiver
  • JMS Java Unknown Queue Receiver
  • JMS Java Unknown Topic Receiver
  • Java AWS Simple Queue Service Unknown Receiver

  • RabbitMQ Java Exchange Declaration
  • RabbitMQ Java Queue Bind
  • RabbitMQ Java Exchange Bind
  • For IBM MQ, Call link is created between:

    • Producer method object and IBM MQ Java Queue Publisher object, at the analyser level
    • IBM MQ Java Queue Receiver object and consumer method object at the analyser level
    • IBM MQ Java Queue Publisher object and IBM MQ Java Queue Receiver object, at the Application level by Web Services Linker
    • Producer method object and IBM MQ Java Topic Publisher object, at analyzer level
    • IBM MQ Java Topic Receiver object and Consumer method object, at analyzer level
    • IBM MQ Java Topic Publisher object and IBM MQ Java Topic Receiver object, at the Application level by Web Services Linker

    For RabbitMQ, Call link is created between:

    • Producer method object and RabbitMQ Java Queue Publisher object, at the analyser level
    • RabbitMQ Java Queue Receiver object and Consumer method object, at the analyser level
    • RabbitMQ Java Queue Publisher object and RabbitMQ Java Queue Receiver object, at the application level by Web Services Linker

    For JMS, Call link is created between:

    • Producer method object and JMS Java Queue Publisher object, at analyzer level
    • JMS Java Queue Receiver object and consumer method object, at analyzer level
    • JMS Java Queue Publisher object and JMS Java Queue Receiver object, at the Application level by Web Services Linker
    • Producer method object and JMS Java Topic Publisher object, at analyzer level
    • JMS Java Topic Receiver object and Consumer method object, at analyzer level
    • JMS Java Topic Publisher object and JMS Java Topic Receiver object, at the Application level by Web Services Linker

    For Kafka, Call link is created between:

    • Producer method object and JMS Java Topic Publisher object, at analyzer level
    • JMS Java Topic Receiver object and Consumer method object, at analyzer level
    • JMS Java Topic Publisher object and JMS Java Topic Receiver object, at the Application level by Web Services Linker

    For AWS-SQS, Call link is created between:

    • Producer method object and Java AWS Simple Queue Service Publisher object, at analyzer level
    • Java AWS Simple Queue Service Receiver object and Consumer method object, at analyzer level
    • Java AWS Simple Queue Service Publisher object and Java AWS Simple Queue Service Receiver object, at the Application level by Web Services Linker

    JMS with ActiveMQ

    Example of JMS with ActiveMQ (Spring-XML)

     Expand source

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
      http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    
        <!-- JmsTemplate Definition -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="defaultDestination" ref="destinationQueue" />
            <property name="messageConverter" ref="myMessageConverter" />
        </bean>
    
        <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <constructor-arg index="0" value="tcp://localhost:61616" />
        </bean>
    
        <!-- ConnectionFactory Definition -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <constructor-arg ref="amqConnectionFactory" />
        </bean>
    
        <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg index="0" value="IN_QUEUE" />
        </bean>
    
        <bean id="SampleJmsMessageSender" class="com.baeldung.spring.jms.SampleJmsMessageSender">
            <property name="queue" ref="destinationQueue" />
            <property name="jmsTemplate" ref="jmsTemplate" />
        </bean>
    
        <bean id="myMessageConverter" class="com.baeldung.spring.jms.SampleMessageConverter" />
    
        <!-- this is the Message-Driven POJO (MDP) -->
        <bean id="messageListener" class="com.baeldung.spring.jms.SampleListener">
            <property name="jmsTemplate" ref="jmsTemplate" />
            <property name="queue" ref="destinationQueue" />
        </bean>
    
        <bean id="errorHandler" class="com.baeldung.spring.jms.SampleJmsErrorHandler" />
    
        <!-- and this is the message listener container -->
        <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destinationName" value="IN_QUEUE" />
            <property name="messageListener" ref="messageListener" />
            <property name="errorHandler" ref="errorHandler" />
        </bean>
    </beans>
    

    Example of JMS with ActiveMQ Publisher convertAndSend API - Queue is stored in XML file

     Expand source

    private JmsTemplate jmsTemplate;
    private Queue queue;
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
    
    public void setQueue(Queue queue) {
        this.queue = queue;
    }
    public void sendMessage(final Employee employee) {
         this.jmsTemplate.convertAndSend(employee);
    }
    

    Example of JMS with ActiveMQ Publisher send API - Queue is stored in XML file

     Expand source

    private JmsTemplate jmsTemplate;
    private Queue queue;
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
    
    public void setQueue(Queue queue) {
        this.queue = queue;
    }
    public void simpleSend() {
        jmsTemplate.send(queue, s -> s.createTextMessage("hello queue world"));
    }
    

    Example of JMS with ActiveMQ Receiver (Springboot)

     Expand source

    public class OrderConsumer {
        public static final String ORDER_QUEUE = "Queue_Anno";
        private static Logger log = LoggerFactory.getLogger(OrderConsumer.class);
    
        Order received;
        private CountDownLatch countDownLatch;
    
    
        @JmsListener(destination = ORDER_QUEUE)
        public void receiveMessage(@Payload Order order,
                                   @Headers MessageHeaders headers,
                                   Message message, Session session) {
        }
    }
    

    Example of JMS with ActiveMQ - JNDI is used to store Queue

     Expand source

      public QBorrower() throws NamingException, JMSException {
        Context ctx=new InitialContext();
        QueueConnectionFactory connectionFactory=(QueueConnectionFactory)ctx.lookup("ConnectionFactory");
        queueConnection=connectionFactory.createQueueConnection();
        requestQueue=(Queue)ctx.lookup("jms.LoanRequestQueue");
        responseQueue=(Queue)ctx.lookup("jms.LoanResponseQueue");
        queueConnection.start();
        queueSession=queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        }
     
     private void sendLoanRequest(double salary,double loanAmount) throws JMSException {
        MapMessage message=queueSession.createMapMessage();
        message.setDoubleProperty("salary", salary);
        message.setDoubleProperty("loanAmount", loanAmount);
        message.setJMSReplyTo(responseQueue);
        QueueSender sender=queueSession.createSender(requestQueue);
            QueueReceiver queueReceiver=queueSession.createReceiver(responseQueue);
        sender.send(message);
        }
    

    In order to recognize that ActiveMQ is analyzed, the created objects have the properties CAST_RabbitMQ_Queue.exchangeName for topic and CAST_MQE_QueueCall.messengingSystem for queue set to ActiveMQ value.

    IBM MQ

    Example of IBM MQ Producer and Consumer (Plain Java)

    com.ibm.mq.MQDestination.put and com.ibm.mq.MQDestination.get APIs are associated with com.ibm.mq.MQQueueManager.accessQueue and com.ibm.mq.MQQueueManager.accessTopic APIs. Here is an example with accessQueue API which indicates the name of the queue where the message is sent.

     Expand source

    public static void main(String args[]) {
                int openOptions = MQC.MQOO_INQUIRE + MQC.MQOO_FAIL_IF_QUIESCING + MQC.MQOO_INPUT_SHARED;
                MQQueue q = qMgr.accessQueue("SYSTEM.DEFAULT.LOCAL.QUEUE",openOptions,null,null,null);
                MQMessage mBuf = new MQMessage();
                MQPutMessageOptions pmo = new MQPutMessageOptions();
                do {
                    runShow = br.readLine();
                    if (runShow.length() > 0) {
                        mBuf.clearMessage();                // reset the buffer
                        mBuf.correlationId = 1; // set correlationId
                        mBuf.messageId = 1;     // set messageId
                        mBuf.writeString(runShow);          // set actual message
                        System.out.println("--> writing message to queue");
                        q.put(mBuf,pmo);      // put the message out on the queue
                        }
                    } while (runShow.length() > 0);
                q.close();
                qMgr.disconnect();
                }
            } catch (MQException ex) {
                System.out.println(
            "WMQ exception occurred : Completion code ");
            }
        }
    

     Expand source

     private void read() throws MQException
     {
      
       MQQueue queue = _queueManager.accessQueue( inputQName,
                                       openOptions,
                                       null,           // default q manager
                                       null,           // no dynamic q name
                                       null );         // no alternate user id
    
       MQGetMessageOptions getOptions = new MQGetMessageOptions();
       getOptions.options = MQC.MQGMO_NO_WAIT + MQC.MQGMO_FAIL_IF_QUIESCING + MQC.MQGMO_CONVERT;
       while(true)
       {
         MQMessage message = new MQMessage();
         try
         {
          queue.get(message, getOptions);
          byte[] b = new byte[message.getMessageLength()];
          message.readFully(b);
          System.out.println(new String(b));
          message.clearMessage();
         }
       }
       queue.close();
       _queueManager.disconnect();
     }
    

    Example of IBM MQ Publisher (JMS Interface)

     Expand source

    public int sendMessage(int type, String msg) { 
            System.out.println("sendMessage type "+type);
            System.out.println("msg = "+msg);
                
            if(type == TYPE_CAP)
            {
                port=1414;
                queueManager="QM1";
                queueName="IBM_QUEUE_1";
            }
            else if(type == TYPE_MEASURE)
            {
                port=1415;
                queueManager="QM2";
                queueName="IBM_QUEUE_2";
            }
            else if(type == TYPE_WOOUT)
            {
                port=1415;
                queueManager="QM3";
                queueName="IBM_QUEUE_3";
            }
            else
                return -1;
            
                System.out.println(port+","+queueManager+","+queueName);
    
              int status = 200;
              MQQueueConnectionFactory cf = null;
              MQQueueConnection connection = null;
              MQQueueSession session = null;
              MQQueue queue = null;
              MQQueueSender sender = null;
    
              try {
               cf = new MQQueueConnectionFactory();
               cf.setHostName(host);// host
               cf.setPort(port);// port
               cf.setTransportType(1);// JMSC.MQJMS_TP_CLIENT_MQ_TCPIP
               cf.setQueueManager(queueManager);// queue
               cf.setChannel(channel);// channel
    
               connection = (MQQueueConnection) cf.createQueueConnection();
               session = (MQQueueSession) connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
               queue = (MQQueue) session.createQueue(queueName);// queue
                                      // name
               sender = (MQQueueSender) session.createSender(queue);
               JMSTextMessage message = (JMSTextMessage) session.createTextMessage(msg);
               
               // Start the connection
               connection.start();
            
               // DO NOT MAKE LOOP!!!
               sender.send(message);  
               
              } catch (JMSException e){
                  e.printStackTrace();
              } finally {
                   try {
                    sender.close();
                   } catch (Exception e) {
                   }
                   try {
                    session.close();
                   } catch (Exception e) {
                   }
                   if(connection != null){
                       try {
                        connection.close();
                       } catch (JMSException e) {
                           e.printStackTrace();
                       }
                   }
              }
              
              return status; 
    }
    

    Example of IBM MQ Topic Publisher (JMS Interface)

     Expand source

    public class SimplePubSub {   
      
        public static void main(String[] args) {     
            try {       
                MQTopicConnectionFactory cf = new MQTopicConnectionFactory(); 
                // Config       
                cf.setHostName("localhost");       
                cf.setPort(1414);       
                cf.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);       
                cf.setQueueManager("QM_thinkpad");       
                cf.setChannel("SYSTEM.DEF.SVRCONN");       
                
                MQTopicConnection connection = (MQTopicConnection) cf.createTopicConnection();       
                MQTopicSession session = (MQTopicSession) connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);       
                MQTopic topic = (MQTopic) session.createTopic("topic://foo");       
                MQTopicPublisher publisher =  (MQTopicPublisher) session.createPublisher(topic);                   
                long uniqueNumber = System.currentTimeMillis() % 1000;       
                
                JMSTextMessage message = (JMSTextMessage) session.createTextMessage("SimplePubSub "+ uniqueNumber);            
                // Start the connection       
                connection.start();       
                publisher.publish(message);       
                System.out.println("Sent message:\\n" + message);
                
                publisher.close();           
                session.close();       
                connection.close();       
                System.out.println("\\nSUCCESS\\n");     
            }   
            catch (JMSException jmsex) {       
                System.out.println(jmsex);       
                System.out.println("\\nFAILURE\\n");     
            }     
            catch (Exception ex) {       
                System.out.println(ex);       
                System.out.println("\\nFAILURE\\n");     
            }   
        } 
    }
    

    RabbitMQ

    Supported APIs:

    Publisher Receiver

    org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend

    org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive

    org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive

    org.springframework.amqp.core.AmqpTemplate.convertAndSend

    org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive

    org.springframework.amqp.rabbit.core.RabbitTemplate.send

    com.rabbitmq.client.Channel.basicPublish

    org.springframework.amqp.rabbit.core.RabbitTemplate.receiveAndConvert

    org.springframework.amqp.rabbit.core.RabbitTemplate.receiveAndReply

    org.springframework.amqp.rabbit.core.RabbitTemplate.receive

    com.rabbitmq.client.Channel.basicConsume


    Configuration APIs

    com.rabbitmq.client.Channel.exchangeDeclare

    com.rabbitmq.client.Channel.queueBind

    com.rabbitmq.client.Channel.exchangeBind


    org.springframework.amqp.core.BindingBuilder.bind

    org.springframework.amqp.core.BindingBuilder.DestinationConfigurer.to

    org.springframework.amqp.core.BindingBuilder.DirectExchangeRoutingKeyConfigurer.with

    org.springframework.amqp.core.BindingBuilder.GenericExchangeRoutingKeyConfigurer.with

    org.springframework.amqp.core.BindingBuilder.TopicExchangeRoutingKeyConfigurer.with

    Using these configuration APIs, we create three new objects, called configuration objects (RabbitMQ Java Exchange Declaration, RabbitMQ Java Queue Bind, RabbitMQ Java Exchange Bind), which help us link the RabbitMQ Publisher and the RabbitMQ Receiver following the linking rules established by Web Services Linker.

    Starting com.castsoftware.mqe 1.3.0-alpha1 you must use com.castsoftware.wbslinker 1.7.25-funcrel because this new release is based on the new linking protocol for RabbitMQ.

    Example of Spring AMQP RabbitMQ Publisher

     Expand source

             @Service
             public class CustomMessageSender {
                 private static final Logger log = LoggerFactory.getLogger(CustomMessageSender.class);
                 private final RabbitTemplate rabbitTemplate;
                 @Autowired
                 public CustomMessageSender(final RabbitTemplate rabbitTemplate) {
                     this.rabbitTemplate = rabbitTemplate;
                 }
             @Scheduled(fixedDelay = 3000L)
             public void sendMessage() {
                 final CustomMessage message = new CustomMessage("Hello there!", new Random().nextInt(50), false);
                 log.info("Sending message...");
                 rabbitTemplate.convertAndSend(MessagingApplication.EXCHANGE_NAME, MessagingApplication.ROUTING_KEY, message);
             }  }
    

    Example of Spring AMQP RabbitMQ Receiver (support of @RabbitListener annotation)

     Expand source

        @Service
        public class CustomMessageListener {
            private static final Logger log = LoggerFactory.getLogger(CustomMessageListener.class);
            @RabbitListener(queues = MessagingApplication.QUEUE_GENERIC_NAME)
            public void receiveMessage(final Message message) {
                log.info("Received message as generic: {}", message.toString());
            }
            @RabbitListener(queues = MessagingApplication.QUEUE_SPECIFIC_NAME)
            public void receiveMessageSpecific(final CustomMessage customMessage) {
                log.info("Received message as specific class: {}", customMessage.toString());
            } }
    

    Example of SpringBoot RabbitMQ Exchange-Queue Binding configuration

     Expand source

    public class MessagingApplication implements RabbitListenerConfigurer{
         public static final String EXCHANGE_NAME = "appExchange";
         public static final String QUEUE_GENERIC_NAME = "appGenericQueue";
         public static final String QUEUE_SPECIFIC_NAME = "appSpecificQueue";
         public static final String ROUTING_KEY = "messages.key";
         public static void main(String[] args) {
              SpringApplication.run(MessagingApplication.class, args);
         }
         @Bean
          public TopicExchange appExchange() {
              return new TopicExchange(EXCHANGE_NAME);
         }
         @Bean
         public Queue appQueueGeneric() {
              return new Queue(QUEUE_GENERIC_NAME);
         }
         @Bean
         public Queue appQueueSpecific() {
              return new Queue(QUEUE_SPECIFIC_NAME);
         }
         @Bean
         public Binding declareBindingGeneric() {
              return BindingBuilder.bind (appQueueGeneric()).to(appExchange()).with(ROUTING_KEY);
         }
         @Bean
         public Binding declareBindingSpecific() {
              return BindingBuilder.bind(appQueueSpecific()).to(appExchange()).with(ROUTING_KEY);
         }
    

    *One to Many: RabbitMQ Topic Exchange bound to two Queues *

    Configuration objects used by web service linker to do the linking between Publisher and  Receiver

    Example of RabbitMQ Java Exchange Declaration object properties:

    Example of RabbitMQ Java Queue Bind object properties:

    RabbitMQ basicPublish and exchangeDeclare example with topic-exchange

     Expand source

    public class EmitLogTopic {
    
      private static final String EXCHANGE_NAME = "topic_logs";
    
      public static void main(String[] argv) {
        Connection connection = null;
        Channel channel = null;
        try {
          ConnectionFactory factory = new ConnectionFactory();
          factory.setHost("localhost");
    
          connection = factory.newConnection();
          channel = connection.createChannel();
    
          channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
          String routingKey = "tp_key";
          String message = getMessage(argv);
    
          channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
          System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
    
        }
        catch  (Exception e) {
          e.printStackTrace();
        }
        finally {
          if (connection != null) {
            try {
              connection.close();
            }
            catch (Exception ignore) {}
          }
        }
      }
    ...
    }
    

     Expand source

    public class ReceiveLogsTopic {
    
      private static final String EXCHANGE_NAME = "topic_logs";
    
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = "topic_queue";
    
        if (argv.length < 1) {
          System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
          System.exit(1);
        }
        channel.queueBind(queueName, EXCHANGE_NAME, "tp_key");
        ...
        channel.basicConsume(queueName, true, consumer);
    }
    

    RabbitMQ Publisher object properties:

    RabbitMQ Receiver object properties:

    RabbitMQ MessageListener with spring xml queue declaration

     Expand source

    import javax.jms.*;
    
    public class MessageReceiver implements MessageListener {
    
        public void onMessage(Message message) {
            if(message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    String text = textMessage.getText();
                    System.out.println(String.format("Received: %s",text));
                    try {
                        Thread.sleep(100);
    
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

     Expand source

        <!-- Queues -->
    
        <rabbit:queue id="springQueue" name="spring.queue" auto-delete="true" durable="false"/>
    
        <rabbit:listener-container connection-factory="connectionFactory">
            <rabbit:listener queues="springQueue" ref="messageListener"/>
        </rabbit:listener-container>
    
        <bean id="messageListener" class="com.ndpar.spring.rabbitmq.MessageHandler"/>
    
        <!-- Bindings -->
    
        <rabbit:fanout-exchange name="amq.fanout">
            <rabbit:bindings>
                <rabbit:binding queue="springQueue"/>
            </rabbit:bindings>
        </rabbit:fanout-exchange>
    

    RabbitMQ Queue object properties:

    @RabbitListener with @RabbitHandler

     Expand source

    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.messaging.handler.annotation.Payload;
    
    import java.util.Date;
    
    @RabbitListener(queues = "foo")
    public class Listerner {
    
        @RabbitHandler
        public void process(@Payload String foo) {
            System.out.println(new Date() + ": " + foo);
        }
    }
    

    Example of application where all 3 configuration objects are present

     Expand source

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Send {
    
        private static final String EXCHANGE_NAME = "exchange";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
        Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
                String message = argv.length < 1 ? "info: Hello Worldss!" :
                        String.join(" ", argv);
    
                channel.basicPublish(EXCHANGE_NAME, "test1", null, message.getBytes("UTF-8"));
    
                System.out.println(" [x] Sent '" + message + "'");        
        }
    }
    

     Expand source

    import com.rabbitmq.client.*;
    
    public class Subs {
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHostName("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            String queueName = channel.queueDeclare("queueName", false, false, false, null).getQueue();
            String queueName1 = channel.queueDeclare("queueName1", false, false, false, null).getQueue();
            String queueName2 = channel.queueDeclare("queueName2", false, false, false, null).getQueue();
            String queueName3 = channel.queueDeclare("queueName3", false, false, false, null).getQueue();
            String queueName5 = channel.queueDeclare("queueName5", false, false, false, null).getQueue();
    
            channel.exchangeDeclare("exchange", "fanout");
            channel.exchangeDeclare("exchange2", "direct");
            channel.exchangeDeclare("exchange3", "direct");
    
            channel.exchangeBind("exchange", "exchange2", "test1");
            channel.exchangeBind("exchange3", "exchange", "test1");
    
            channel.queueBind(queueName, "exchange", "test1");
            channel.queueBind(queueName2, "exchange", "testxx");
            channel.queueBind(queueName1, "exchange2", "test1");
            channel.queueBind(queueName3, "exchange2", "test5");
            channel.queueBind(queueName5, "exchange3", "test1");
            channel.queueBind(queueName5, "exchange3", "test2");
    
            System.out.println(" [*] Waiting for logs.");
    
            Consumer consumer = new DefaultConsumer(channel) {
            @Override
              public void handleDelivery(String consumerTag, Envelope envelope,
                                         AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
              }
            };
    
            channel.basicConsume(queueName, true, consumer);
            channel.basicConsume(queueName1, true, consumer);
            channel.basicConsume(queueName2, true, consumer);
            channel.basicConsume(queueName3, true, consumer);
            channel.basicConsume(queueName5, true, consumer);
    
            System.out.println(" Press [enter] to exit.");
            System.in.read();
    
            channel.close();
            connection.close();
        }
    }
    

    Example of RabbitMQ Java Exchange Declaration object properties:

    Example of RabbitMQ Java Queue Bind object properties:

    Example of RabbitMQ Java Exchange Bind object properties:

    JMS

    Example of JMS Queue with send and receive patterns using JNDI binding for Queue names defined in beans

     Expand source

    public String transmit(String xmlRequest) throws Throwable {
       String xmlResponse = null; // Transmit the message and get a response.
       String requestQueue = "java:comp/env/ServiceRequestQueue";
       String responseQueue = "java:comp/env/ServiceResponseQueue";
       JMSDestination messageDest = new JMSDestination(requestQueue, responseQueue);
       //19.1 Queue changes end
       xmlResponse = messageDest.sendAndReceive(xmlRequest);
    }
    

    The sendAndReceive() method:

     Expand source

    public String sendAndReceive(String message) throws ServiceException {
    
            String responseXml = null;
    
            QueueConnection connection = null;
            QueueSession session = null;
            Throwable thrown = null;
    
            try {
                // Create a connection and start it.
    
                connection = qcf.createQueueConnection();
                connection.start();
    
                // Create a session.
                session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    
                String correlationID = send(message, session);
    
                responseXml = receive(correlationID, session, message);
            } catch (ServiceException serviceException ) {
                throw serviceException ;
            } finally {
                // Release resources.
                close(session);
                close(connection);
            }
            // Return the response.
            return responseXml;
        }
    

    The send() method:

     Expand source

    public String send(String message, QueueSession session) throws Throwable {
            QueueSender sender = null;
    
            try {
                // Create the sender queue.
                sender = session.createSender(requestQueue);
                sender.setTimeToLive(expiry);
    
                TextMessage outMessage = (TextMessage) session.createTextMessage(message);
    
                outMessage.setJMSReplyTo(responseQueue);
                outMessage.setJMSCorrelationID(correlationID);
    
                // Override dead message queue with desired response queue
                outMessage.setBooleanProperty(Constants.PRESERVE_UNDELIVERED, true);
    
                sender.send(outMessage);
                ...
            }
    }
    

    The receive() method:

     Expand source

    public String receive(String correlationID, QueueSession session, String message) throws Throwable {
            ...
            QueueReceiver receiver = null;
            try {
                receiver = session.createReceiver(responseQueue, ...);
    
                TextMessage inMessage = (TextMessage) receiver.receive(timeout);
    
            }
            ...
        }
    

    The XML file where binding is defined:

    <session name="ServiceApplication" simple-binding-name="ejb/com/iwm/example/services/ServiceApplication">
        <resource-ref name="ServiceRequestQueue" binding-name="jms/ServiceRequestQueue" />
    

    JMS with send and receive patterns using JNDI binding for Queue names defined in beans.

    JMS with send and receive patterns using JNDI binding for Queue names not defined in beans.

    Example of JMS Topic with publish pattern

     Expand source

    public class JMSDestination {
        ...
        requestTopic = 'pub/jms/topic';
        public String send(String msg, TopicSession session) throws Throwable
        {
            TopicPublisher publisher = null;
            try
            {
                ...
                publisher = session.createPublisher(requestTopic);
                publisher.setTimeToLive(expiry);
                TextMessage outMsg = session.createTextMessage(msg);
                publisher.publish(outMsg);
    
            }
            ...
        }
    }
    

     Expand source

        private void main() {
            String xmlRq = "messageToSend";
            JMSDestination msgDest = new JMSDestination();
            String xmlRs = msgDest.send(xmlRq);
                            
        }
    

    JMS with publish pattern for Topic

    *
    *

    Example of JMS asynchronous messaging

    The receive() method from MessageConsumer class alows receiving messages synchronously. When calling this method the message is received or not. The MessageListener interface defines a listener for receiving messages asynchronously. In this case, the onMessage() method will be called when a new message is received at the destination.The listener is registered using the setMessageListener() method from MessageConsumer() class.

     Expand source

       private TopicConnection getTopicConnection() throws JMSException, NamingException, FileNotFoundException,
          IOException, SQLException
       {
          try
          {
             Properties jmsProperties = SenderUtils.loadPropertiesFromFile("jms.properties");
             String jTopicName = "topicListener";
             final String JMS_FACTORY = "javax.jms.TopicConnectionFactory";
    
             InitialContext ctx = getInitialContext(url);
             TopicConnectionFactory tconFactory = (TopicConnectionFactory) ctx.lookup(JMS_FACTORY);
             jtcon = tconFactory.createTopicConnection();
             jtsession = jtcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
             Topic jtopic = (Topic) ctx.lookup(jTopicName);
             jtopicPublisher = jtsession.createPublisher(jtopic);
    
             TopicSubscriber jtopicSubscriber = jtsession.createSubscriber(jtopic, selectorString, false);
             MsgListener jtopicListener = new MsgListener(service);
             jtopicSubscriber.setMessageListener(jtopicListener);
             jtcon.setExceptionListener(new ExceptionListener()
             {
                public void onException(JMSException arg0)
                {
                   logger.error("onException invoked for: " + arg0.getMessage());
                   restartConnection();
                }
             });
             return jtcon;
          }
       }
    

    JMS with setMessageListener pattern for Topic (asynchronous messaging)

    *
    *

    Example of JMS request-reply pattern

    In some cases, the JMS client will want the message consumers to reply to a temporary topic or queue set up by the JMS client. When a JMS message consumer receives a message that includes a JMSReplyTo destination, it can reply using that destination. A JMS consumer is not required to send a reply, but in some JMS applications, clients are programmed to do so.

    The JMSReplyTo header indicates which destination, if any, a JMS consumer should reply to. The JMSReplyTo header is set explicitly by the JMS client; its contents will be a javax.jms.Destination object (either Topic or Queue).

     Expand source

    @Value("${jms.queue.name}")
    private String queueName;
    
    private void sendMessages() {
            ...
                try {
                    jmsTemplate.convertAndSend(queueName);
                } catch (Exception e) {
                    LOG.debug("Error ", e);
                }
            }
        }
    

     Expand source

    @JmsListener(destination = "${jms.queue.name}", containerFactory = "jmsListenerContainerFactory")
    public void onMessage(final Message message) {
         ...
    }
    

    *JMS with request-reply pattern
    *

    *
    *

    Example of JMS with JmsTemplate send API

    Application. Properties Expand source

    mq.hostName=MQ_SERVER_IP
    mq.port=PORT
    mq.queueManager=QUEUE.MANAGER.NAME
    mq.CCSID=437
    mq.username=mqm
    
    mq.password= 
    mq.pubSubDomain=false
    mq.receiveTimeout=20000
    
    mq.myDestination=QUEUE_NAME
    

     Expand source

    public class JmsQueueSender {
    
        private JmsTemplate jmsTemplate;
        //Referring to the value in the property file
        @Value("${mq.myDestination}")
        private String myDestination;
    
        public void simpleSend(final String message) {
            this.jmsTemplate.send(myDestination, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage(message);
                }
            });
        }
    }
    

    JMS with Message Driven Bean Class

    Session beans allow you to send JMS messages and to receive them. The message-driven bean class must implement the javax.jms.MessageListener interface and the onMessage method.

    Example of Message Driven Beans to receive messages synchronously:

    Application. Properties Expand source

    @MessageDriven(
            activationConfig = {
                    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                    @ActivationConfigProperty(propertyName = "connectionFactoryJndiName", propertyValue = "jms/hConnectionFactory")
            },
            mappedName = "jms/destinationQueue")
    @TransactionManagement(TransactionManagementType.CONTAINER)
    @TransactionAttribute (TransactionAttributeType.NOT_SUPPORTED)
    public class GenHealthMDB implements MessageListener {
        private static final String INSTANCE_COUNT = "instanceCount";
        private static final String MAKE_ACTIVE = "ACTIVE";
        private static final String MAKE_INACTIVE = "INACTIVE";
        
        private static Logger logger = Logger.getLogger(GenHealthMDB.class);
    
        @Override
        public void onMessage(Message message) {
            ...
        }
    }
    

    Example of Message Driven Beans to receive messages asynchronously, xml defined queue:

    Spring.XML Expand source

           <message-driven>
                <description>Message-Driven configured by using XML.</description>
                <display-name>MDBFilTraitementAsyn</display-name>
                <ejb-name>MDBFilTraitementAsyn</ejb-name>
                <ejb-class>fr.mi.siv.mti.cip.trait.core.fil.mdb.MDBFilTraitementAsyn</ejb-class>
                <message-destination-type>javax.jms.Queue</message-destination-type>
                <activation-config>
                    <activation-config-property>
                        <activation-config-property-name>destination</activation-config-property-name>
                        <activation-config-property-value>queueTraitRequeteASyn</activation-config-property-value>
                    </activation-config-property>
                    <activation-config-property>
                        <activation-config-property-name>destinationType</activation-config-property-name>
                        <activation-config-property-value>javax.jms.Queue</activation-config-property-value>
                    </activation-config-property>
                </activation-config>
            </message-driven>
    

    Application. Properties Expand source

    public class MDBFilTraitementAsyn implements MessageListener {
        public void onMessage(final Message message)
        {
            ...
        }
    }
    

    Example of Message-Driven Beans to receive messages asynchronously:

    Application. Properties

    mq.myDestination=QUEUE_NAME
    

    Spring.XML Expand source

    <bean id="jmsQueueListener" class="hu.vanio.jms.spring3.ibmmq.JmsQueueListener" />
    
        <!-- and this is the message listener container -->
        <jms:listener-container connection-factory="jmsQueueConnectionFactory">
            <jms:listener destination="${mq.myDestination}" ref="jmsQueueListener" />
        </jms:listener-container>
    

     Expand source

    public class JmsQueueListener implements MessageListener {
    
        public void onMessage(Message message) {
            ...
        }
    
    }
    

    Example of Message Driven Beans with weblogic:

    weblogic-ejb-jar.xml

    Spring.XML Expand source

    <?xml version="1.0" encoding="UTF-8"?>
    <wls:weblogic-ejb-jar xmlns:wls="http://xmlns.oracle.com/weblogic/weblogic-ejb-jar" 
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
        xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/ejb-jar_3_0.xsd http://xmlns.oracle.com/weblogic/weblogic-ejb-jar http://xmlns.oracle.com/weblogic/weblogic-ejb-jar/1.2/weblogic-ejb-jar.xsd">
        <wls:weblogic-enterprise-bean>
            <wls:ejb-name>NotifieMDB</wls:ejb-name>
            <wls:message-driven-descriptor>
                <wls:pool>
                    <wls:max-beans-in-free-pool>15</wls:max-beans-in-free-pool>
                    <wls:initial-beans-in-free-pool>15</wls:initial-beans-in-free-pool>
                </wls:pool>
                <wls:destination-jndi-name>Notification_Queue</wls:destination-jndi-name>
            </wls:message-driven-descriptor>
            <wls:enable-call-by-reference>true</wls:enable-call-by-reference>
            <wls:dispatch-policy>IFT.notification</wls:dispatch-policy>
        </wls:weblogic-enterprise-bean>
    </wls:weblogic-ejb-jar>
    

    ejb-jar.xml

    Spring.XML Expand source

    <?xml version="1.0" encoding="UTF-8"?>
    <ejb-jar xmlns="http://java.sun.com/xml/ns/javaee"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/ejb-jar_3_1.xsd"
             version="3.0">
        <enterprise-beans>
            <message-driven>
                <ejb-name>NotifieMDB</ejb-name>
                <ejb-class>com.notification.mdb.NotifieMDB</ejb-class>
                <transaction-type>Bean</transaction-type>
                <message-destination-type>javax.jms.Queue</message-destination-type>
            </message-driven>
        </enterprise-beans>
    </ejb-jar>
    

     Expand source

    import javax.jms.Message;
    import javax.jms.ObjectMessage;
    public class NotifieMDB
    {
        public void onMessage(Message msg)
        {
            ...
        }
    }
    

    In order to recognize that Message Driven Bean is analyzed, the created objects have the properties CAST_RabbitMQ_Queue.exchangeName for topic and CAST_MQE_QueueCall.messengingSystem for queue set to MessageDrivenBean value.

    JMS with JMSContext

    JMSContext is the main interface in the simplified JMS API which combines in a single object Connection and Session.

     Expand source

    import javax.jms.JMSConsumer;
    import javax.jms.JMSContext;
    import javax.jms.Queue;
    import javax.jms.Topic;
    ...
    public class Vendor {
    
        @Resource(lookup = "java:comp/DefaultJMSConnectionFactory")
        private static ConnectionFactory connectionFactory;
        @Resource(lookup = "jms/AQueue")
        private static Queue vendorOrderQueue;
        @Resource(lookup = "jms/CQueue")
        private static Queue vendorConfirmQueue;
        @Resource(lookup = "jms/OTopic")
        private static Topic supplierOrderTopic;
        static Random rgen = new Random();
        static int throwException = 1;
    
        public static void main(String[] args) {
            JMSConsumer vendorOrderReceiver;
            MapMessage orderMessage;
            JMSConsumer vendorConfirmReceiver;
            VendorMessageListener listener;
            Message inMessage;
            MapMessage vendorOrderMessage;
            Message endOfMessageStream;
            Order order;
            int quantity;
            ...
            try (JMSContext context =
                    connectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
                    JMSContext asyncContext =
                    context.createContext(JMSContext.SESSION_TRANSACTED);) {
                vendorOrderReceiver = context.createConsumer(vendorOrderQueue);
                orderMessage = context.createMapMessage();
                vendorConfirmReceiver = asyncContext.createConsumer(
                        vendorConfirmQueue);
                listener = new VendorMessageListener(asyncContext, 2);
                vendorConfirmReceiver.setMessageListener(listener);
                while (true) {
                    try {
                        inMessage = vendorOrderReceiver.receive();
    
                        if (inMessage instanceof MapMessage) {
                            vendorOrderMessage = (MapMessage) inMessage;
                        } else {
                            endOfMessageStream = context.createMessage();
                            endOfMessageStream.setJMSReplyTo(
                                    vendorConfirmQueue);
                            context.createProducer().send(supplierOrderTopic,
                                    endOfMessageStream);
                            context.commit();
    
                            break;
                        }
                        if (rgen.nextInt(4) == throwException) {
                            throw new JMSException(
                                    "Simulated database concurrent access "
                                    + "exception");
                        }
                        order = new Order(vendorOrderMessage);
                        orderMessage.setInt(
                                "VendorOrderNumber",
                                order.orderNumber);
                        orderMessage.setJMSReplyTo(vendorConfirmQueue);
                        quantity = vendorOrderMessage.getInt("Quantity");
                        System.out.println(
                                "Vendor: Retailer ordered " + quantity
                                + " " + vendorOrderMessage.getString("Item"));
                        orderMessage.setString("Item", "");
                        orderMessage.setInt("Quantity", quantity);
                        context.createProducer().send(supplierOrderTopic,
                                orderMessage);
                        System.out.println(
                                "Vendor: Ordered " + quantity
                                + " CPU(s) and hard drive(s)");
                        context.commit();
                        System.out.println(
                                "  Vendor: Committed transaction 1");
                    } catch (JMSException e) {
                        System.err.println(
                                "Vendor: JMSException occurred: "
                                + e.toString());
                        context.rollback();
                        System.err.println(
                                "  Vendor: Rolled back transaction 1");
                    }
                }
                listener.monitor.waitTillDone();
            } catch (JMSRuntimeException e) {
                System.err.println(
                        "Vendor: Exception occurred: " + e.toString());
            }
        }
    }
    

    Result of Queue creation:

    Result of Topic creation:

    JMS with AWS-SQS

    SQSConnection class extends javax.jms.Connection. It can be used together with the JMS standard connection methods in order to create new queues.

     Expand source

    import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper;
    import com.amazon.sqs.javamessaging.SQSConnection;
    import com.amazon.sqs.javamessaging.SQSConnectionFactory;
    import com.amazon.sqs.javamessaging.ProviderConfiguration;
    import com.amazonaws.auth.AWSCredentials;
    import com.amazonaws.auth.AWSStaticCredentialsProvider;
    import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
    import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
    import javax.jms.*;
    
    public class App
    {
        private static String queueName = "ymq_jms_example";
        public static void main( String[] args ) throws JMSException
        {
            SQSConnectionFactory connectionFactory = new SQSConnectionFactory(
                    new ProviderConfiguration(),
                    AmazonSQSClientBuilder.standard()
                            .withRegion("ru-central1")
                            .withEndpointConfiguration(new EndpointConfiguration(
                                "https://message-queue.api.cloud.yandex.net",
                                "ru-central1"
                            ))
            );
    
            SQSConnection connection = connectionFactory.createConnection();
            AmazonSQSMessagingClientWrapper client = connection.getWrappedAmazonSQSClient();
            if( !client.queueExists(queueName) ) {
                client.createQueue( queueName );
            }
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(queueName);
            MessageProducer producer = session.createProducer(queue);
            Message message = session.createTextMessage("test message");
            producer.send(message);
        }
    }
    

    AWS-SQS

    Supported APIs:

    Producer Consumer

    com.amazonaws.services.sqs.AmazonSQS.sendMessage

    com.amazonaws.services.sqs.AmazonSQSClient.sendMessage

    com.amazonaws.services.sqs.AmazonSQS.sendMessageBatch

    com.amazonaws.services.sqs.AmazonSQSAsync.sendMessageBatchAsync

    com.amazonaws.services.sqs.AmazonSQSAsync.sendMessageAsync

    software.amazon.awssdk.services.sqs.SqsClient.sendMessage

    software.amazon.awssdk.services.sqs.SqsClient.sendMessageBatch

    com.amazonaws.services.sqs.AmazonSQS.receiveMessage

    com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage

    com.amazonaws.services.sqs.AmazonSQSAsync.receiveMessageAsync

    software.amazon.awssdk.services.sqs.SqsClient.receiveMessage

    AWS-SQS with sendMessage and receiveMessage APIs

     Expand source

    public class AWSResources {
    
        public static final String SQS_QUEUE_NAME = "reinvent-memes";
    }
    

     Expand source

    import static com.amazonaws.memes.AWSResources.SQS_QUEUE_NAME
    public class MemeUtils {
    
        public ImageMacro submitJob(String topCaption, String bottomCaption, String imageKey, String createdBy) {
            String queueUrl = SQS.getQueueUrl(new GetQueueUrlRequest(SQS_QUEUE_NAME)).getQueueUrl();
            SQS.sendMessage(new SendMessageRequest(queueUrl, macro.getId()));  
        }
    }
    

     Expand source

    import static com.amazonaws.memes.AWSResources.SQS_QUEUE_NAME
    
    public void run() {
            System.out.println("MemeWorker listening for work");
            String queueUrl = SQS.getQueueUrl(new GetQueueUrlRequest(SQS_QUEUE_NAME)).getQueueUrl();
            
            while (true) {
                try {
                    ReceiveMessageResult result = SQS.receiveMessage(
                            new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(1));
                    for (Message msg : result.getMessages()) {
                        executorService.submit(new MessageProcessor(queueUrl, msg));
                    }
                    sleep(1000);
                } catch (InterruptedException e) {
                    Thread.interrupted();
                    throw new RuntimeException("Worker interrupted");
                } catch (Exception e) {
                    // ignore and retry
                }
            }
    }
    

    AWS-SQS with sendMessage and receiveMessage APIs

    AWS-SQS with sendMessage API; cross technologies linking with JmsListener API

     Expand source

        private static final String DEFAULT_QUEUE_NAME = "test-sdk";
    
        @RequestMapping(
                path = "/sqs/message",
                method = RequestMethod.POST,
        consumes = MediaType.APPLICATION_JSON_VALUE)
        public HttpEntity addMessage(@RequestBody  @Valid SimpleMessage simpleMessage){
    
            AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
            GetQueueUrlResult getQueueUrlResult = sqs.getQueueUrl(DEFAULT_QUEUE_NAME);
            String queueUrl = getQueueUrlResult.getQueueUrl();
    
            SendMessageRequest sendMessageRequest = new SendMessageRequest();
            sendMessageRequest.setQueueUrl(queueUrl);
            sendMessageRequest.setMessageBody(simpleMessage.getMessage());
    
            SendMessageResult messageResult = sqs.sendMessage(sendMessageRequest);
    
            return new ResponseEntity(messageResult, HttpStatus.CREATED);
        }
    

    AWS-SQS with sendMessage API

    Apache Kafka Patterns

    KafkaProducer send() API example

     Expand source

    public class Example {
        public static void main(String[] args) {
            private Properties kafkaProps = new Properties();
            kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
            kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); // basic serializer class for key
            kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); // basic serializer class for value
             
            KafkaProducer producer = new KafkaProducer<String, String>(kafkaProps);
            ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products","France");
            try {
                producer.send(record);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    KafkaConsumer subscribe() API example

     Expand source

    public class Example {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092,broker2:9092");
            props.put("group.id", "CountryCounter");
            props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
             
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
            consumer.subscribe(Collections.singletonList("customerCountries"));
        }
    }
    

    Spring Kafka

    @KafkaListener annotation on method example

     Expand source

    @KafkaListener(topics = "${topic.name}", id="id")
    public void listen(@Payload String message,
                           @Header(KafkaHeaders.OFFSET) int offset,
                           @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition){
        ...
    }
    

    Topic value is present in properties file:

    topic.name=KAFKA_TOPIC
    

    @KafkaListener annotation on method example (with list of topics)

     Expand source

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;
    
    @Service
    public class KafkaConsumerService {
    
        @KafkaListener(topics = {"${kafka.event.contracting.topic}", "${kafka.legacyConsumerTopic}"})
        public void listen(String message) {
            ...
        }
    }
    

    Topic values are present in properties file:

    kafka.event.contracting.topic={kafka.event.contracting.topic}
    kafka.legacyConsumerTopic={kafka.legacyConsumerTopic}
    

    @KafkaListener annotation on class with @KafkaHandler annotation example

     Expand source

    @Service
    @KafkaListener(topics = "${topic.name}")
    public class KafkaConsumerService {
    
        @KafkaHandler
        public void listen(String message) {
        
        }
    }
    

    Topic value is present in properties file:

    topic.name=KAFKA_TOPIC
    

    @KafkaListener and @SendTo annotations on method example

     Expand source

    public class Receiver {
    
        private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);
    
        @SendTo(BAR_TOPIC)
        @KafkaListener(topics = FOO_TOPIC)
        public Double calculate(Double data) {
            LOG.info("calculating square root from='{}'", data);
            return Math.sqrt(data);
        }
    
        @KafkaListener(topics = BAR_TOPIC)
        public void result(Double data) {
            LOG.info("received square root='{}'", data);
        }
    
    }
    

     Expand source

    @Service
    public class Sender {
    
        private static final Logger LOG = LoggerFactory.getLogger(Sender.class);
    
        @Autowired
        private KafkaTemplate<String, Double> kafkaTemplate;
    
        public void send(Double data){
            LOG.info("sending data='{}' to topic='{}'", data, FOO_TOPIC);
            kafkaTemplate.send(FOO_TOPIC, data);
        }
    }
    

     Expand source

    public class Constants {
    
        public static final String FOO_TOPIC = "foo.t";
        public static final String BAR_TOPIC = "bar.t";
    
    }
    

    There are cases where @SendTo annotation has no value. This means that the default value is used: KafkaHeaders.REPLY_TOPIC. In this case we don’t create any object.

    @KafkaListener meta annotation example

     Expand source

    @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @KafkaListener
    public @interface MyListener{
    
        @AliasFor(annotation = KafkaListener.class, attribute = "id")
        String id();
    
        @AliasFor(annotation = KafkaListener.class, attribute = "groupId")
        String groupId() default "";
    
        @AliasFor(annotation = KafkaListener.class, attribute = "topics")
        String[] value() default {};
    
        @AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
        String concurrency() default "3";
    }
    

     Expand source

    @SpringBootApplication
    public class Application {
    
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
    
        @MyListener(id = "my.group", topics = "${topic.name}")
        public void listen(String in) {
        
        }
    
    }
    

    Topic value is present in properties file:

    topic.name=KAFKA_TOPIC
    

    KafkaTemplate send API example

     Expand source

    @Value("${kafka.event.topic}")
    private String topic;
     
    @Override
    public void send(Map<String, String> requestMap) {
        String message = constructKafkaMessage(requestMap);
        kafkaTemplate.send(topic, message);
    }
    

    Topic value is present in properties file:

    kafka.event.topic={kafka.event.topic}
    

    ReplyingKafkaTemplate sendAndReceive API example

     Expand source

        @Bean
        public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
            return args -> {
                ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
                RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
                SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
                System.out.println("Sent ok: " + sendResult.getRecordMetadata());
                ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
                System.out.println("Return value: " + consumerRecord.value());
            };
        }
    

    ContainerProperties setMessageListener API example (with KafkaMessageListenerContainer)

     Expand source

        @Bean
        public KafkaMessageListenerContainer<?, ?> container(ConsumerFactory<?, ?> consumerFactory) {
            ContainerProperties props = new ContainerProperties("perf");
            Map<String, Object> configs = new HashMap<>(
                    ((DefaultKafkaConsumerFactory<?, ?>) consumerFactory).getConfigurationProperties());
            configs.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 2 * 1024 * 1024);
            configs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);
            configs.put(ConsumerConfig.CHECK_CRCS_CONFIG, false);
            props.setPollTimeout(100);
            props.setConsumerRebalanceListener(new RebalanceListener());
            Listener messageListener = new Listener();
            props.setMessageListener(messageListener);
            KafkaMessageListenerContainer<Object, Object> container = new KafkaMessageListenerContainer<>(
                    new DefaultKafkaConsumerFactory<>(configs), props);
            messageListener.setContainer(container);
            return container;
        }
    

    Messaging service type

    For JMS, ActiveMQ, KAFKA and MessageDrivenBean, a JMS object is created, decorated with a property which specifies the vendor. Below is a table listing these properties and their values:

    Vendor Queue/Topic Property name Property value
    JMS


    Queue Publisher CAST_MQE_QueueCall.messengingSystem JMS
    Queue Receiver CAST_MQE_QueueReceive.messengingSystem JMS
    Topic Publisher CAST_RabbitMQ_Exchange.exchangeName JMS
    Topic Receiver CAST_RabbitMQ_Queue.exchangeName JMS
    ActiveMQ


    Queue Publisher CAST_MQE_QueueCall.messengingSystem ActiveMQ
    Queue Receiver CAST_MQE_QueueReceive.messengingSystem ActiveMQ
    Topic Publisher CAST_RabbitMQ_Exchange.exchangeName ActiveMQ
    Topic Receiver CAST_RabbitMQ_Queue.exchangeName ActiveMQ
    KAFKA Topic Publisher

    CAST_RabbitMQ_Exchange.exchangeName

    KAFKA
    Topic Receiver CAST_RabbitMQ_Queue.exchangeName KAFKA
    MessageDrivenBean Queue Receiver CAST_MQE_QueueReceive.messengingSystem MessageDrivenBean
    Topic Receiver CAST_RabbitMQ_Queue.exchangeName MessageDrivenBean

    For IBMMQ, RabbitMQ and AWS-SQS, specific objects are created.

    Limitations

    The following cases are not handled:

    • When the queue name is given at the runtime i.e. when Queue name is not initialized anywhere in the code and is given dynamically during the session/connection