Chapter 13. Intercepting Messages
With AMQ Broker you can intercept packets entering or exiting the broker, allowing you to audit packets or filter messages. Interceptors can change the packets they intercept, which makes them powerful, but also potentially dangerous.
You can develop interceptors to meet your business requirements. Interceptors are protocol specific and must implement the appropriate interface.
Interceptors must implement the intercept() method, which returns a boolean value. If the value is true, the message packet continues onward. If the value is false, the process is aborted, no other interceptors are called, and the message packet is not processed further.
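The return-value contract described above can be illustrated with a minimal, self-contained sketch. It does not use the AMQ Broker API: SketchInterceptor, InterceptorChainSketch, and runChain are hypothetical names invented for this illustration, and a plain String stands in for a packet. It only models the idea that a chain stops at the first interceptor that returns false.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a protocol-specific interceptor interface.
// This is NOT the AMQ Broker API, only a model of the true/false contract.
interface SketchInterceptor {
    boolean intercept(String packet);
}

class InterceptorChainSketch {

    // Calls each interceptor in order and stops at the first one that returns false.
    // Returns true if the packet should continue to be processed.
    // callLog records which interceptors were actually invoked.
    static boolean runChain(List<SketchInterceptor> chain, String packet, List<String> callLog) {
        for (int i = 0; i < chain.size(); i++) {
            callLog.add("interceptor-" + i);
            if (!chain.get(i).intercept(packet)) {
                return false; // abort: later interceptors are never called
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<SketchInterceptor> chain = new ArrayList<>();
        chain.add(p -> true);             // accepts every packet
        chain.add(p -> p.length() <= 8);  // rejects packets longer than 8 characters
        chain.add(p -> true);             // reached only if the size check passes

        List<String> log = new ArrayList<>();
        boolean processed = runChain(chain, "a-rather-long-packet", log);
        System.out.println("processed=" + processed + ", called=" + log);
    }
}
```

In this sketch the second interceptor rejects the packet, so the third interceptor is never invoked and the packet is not processed.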
13.1. Creating Interceptors
You can create your own incoming and outgoing interceptors. All interceptors are protocol specific. Incoming interceptors are called for every packet entering the broker, and outgoing interceptors are called for every packet exiting it. This allows you to create interceptors to meet business requirements such as auditing packets. Because interceptors can change the packets they intercept, they are powerful as well as potentially dangerous, so use them with caution.
Interceptors and their dependencies must be placed in the Java classpath of the broker. You can use the BROKER_INSTANCE_DIR/lib directory since it is part of the classpath by default.
Procedure
The following examples demonstrate how to create an interceptor that checks the size of each packet passed to it. Note that the examples implement a specific interface for each protocol.
Implement the appropriate interface and override its intercept() method.

If you are using the AMQP protocol, implement the org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor interface.

    package com.example;

    import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
    import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor;
    import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;

    public class MyInterceptor implements AmqpInterceptor
    {
      private final int ACCEPTABLE_SIZE = 1024;

      @Override
      public boolean intercept(final AMQPMessage message, RemotingConnection connection)
      {
        int size = message.getEncodeSize();
        if (size <= ACCEPTABLE_SIZE) {
          System.out.println("This AMQPMessage has an acceptable size.");
          return true;
        }
        return false;
      }
    }
If you are using the Core protocol, implement the org.apache.activemq.artemis.api.core.Interceptor interface.

    package com.example;

    import org.apache.activemq.artemis.api.core.ActiveMQException;
    import org.apache.activemq.artemis.api.core.Interceptor;
    import org.apache.activemq.artemis.core.protocol.core.Packet;
    import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;

    public class MyInterceptor implements Interceptor
    {
      private final int ACCEPTABLE_SIZE = 1024;

      @Override
      public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException
      {
        // Let the packet through only if it is within the size limit.
        int size = packet.getPacketSize();
        if (size <= ACCEPTABLE_SIZE) {
          System.out.println("This Packet has an acceptable size.");
          return true;
        }
        return false;
      }
    }
If you are using the MQTT protocol, implement the org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor interface.

    package com.example;

    import org.apache.activemq.artemis.api.core.ActiveMQException;
    import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
    import io.netty.handler.codec.mqtt.MqttMessage;
    import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;

    public class MyInterceptor implements MQTTInterceptor
    {
      private final int ACCEPTABLE_SIZE = 1024;

      @Override
      public boolean intercept(MqttMessage mqttMessage, RemotingConnection connection) throws ActiveMQException
      {
        // Uses the length of the message's string form as its size.
        byte[] msg = (mqttMessage.toString()).getBytes();
        int size = msg.length;
        if (size <= ACCEPTABLE_SIZE) {
          System.out.println("This MqttMessage has an acceptable size.");
          return true;
        }
        return false;
      }
    }
If you are using the STOMP protocol, implement the org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor interface.

    package com.example;

    import org.apache.activemq.artemis.api.core.ActiveMQException;
    import org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor;
    import org.apache.activemq.artemis.core.protocol.stomp.StompFrame;
    import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;

    public class MyInterceptor implements StompFrameInterceptor
    {
      private final int ACCEPTABLE_SIZE = 1024;

      @Override
      public boolean intercept(StompFrame stompFrame, RemotingConnection connection) throws ActiveMQException
      {
        // Let the frame through only if it is within the size limit.
        int size = stompFrame.getEncodedSize();
        if (size <= ACCEPTABLE_SIZE) {
          System.out.println("This StompFrame has an acceptable size.");
          return true;
        }
        return false;
      }
    }
13.2. Configuring the Broker to Use Interceptors
Once you have created an interceptor, you must configure the broker to use it.
Prerequisites
You must create an interceptor class and add it (and its dependencies) to the Java classpath of the broker before you can configure the broker to use it. You can use the BROKER_INSTANCE_DIR/lib directory since it is part of the classpath by default.
Procedure
Configure the broker to use an interceptor by adding configuration to BROKER_INSTANCE_DIR/etc/broker.xml.
If your interceptor is intended for incoming messages, add its class-name to the list of remoting-incoming-interceptors.

    <configuration>
      <core>
        ...
        <remoting-incoming-interceptors>
          <class-name>org.example.MyIncomingInterceptor</class-name>
        </remoting-incoming-interceptors>
        ...
      </core>
    </configuration>
If your interceptor is intended for outgoing messages, add its class-name to the list of remoting-outgoing-interceptors.

    <configuration>
      <core>
        ...
        <remoting-outgoing-interceptors>
          <class-name>org.example.MyOutgoingInterceptor</class-name>
        </remoting-outgoing-interceptors>
      </core>
    </configuration>
13.3. Interceptors on the Client Side
Clients can use interceptors to intercept packets either sent by the client to the server or by the server to the client. As in the case of a broker-side interceptor, if an interceptor returns false, no other interceptors are called and the client does not process the packet further. This process happens transparently to the client except when an outgoing packet is sent in a blocking fashion. In those cases, an ActiveMQException is thrown to the caller because blocking sends provide reliability. The ActiveMQException contains the name of the interceptor that returned false.
As on the server, the client interceptor classes and their dependencies must be added to the Java classpath of the client to be properly instantiated and invoked.
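The difference between blocking and non-blocking sends described in this section can be modeled with a short, self-contained sketch. It is an illustration only and does not use the AMQ Broker client API: OutgoingCheck, SizeLimit, SendRejectedException, and ClientSendSketch are hypothetical names, and SendRejectedException merely stands in for the ActiveMQException that a real client would receive.

```java
import java.util.List;

// Hypothetical model types; none of these are AMQ Broker client classes.
interface OutgoingCheck {
    boolean intercept(String packet);
}

// A named interceptor, so that its class name can be reported to the caller.
class SizeLimit implements OutgoingCheck {
    private final int maxLength;

    SizeLimit(int maxLength) {
        this.maxLength = maxLength;
    }

    @Override
    public boolean intercept(String packet) {
        return packet.length() <= maxLength;
    }
}

// Stand-in for the exception a blocking send raises when an interceptor returns false.
class SendRejectedException extends Exception {
    SendRejectedException(String interceptorName) {
        super("Packet rejected by interceptor " + interceptorName);
    }
}

class ClientSendSketch {

    // Returns true if the packet passed every outgoing interceptor.
    // A rejected non-blocking send is dropped silently (returns false);
    // a rejected blocking send is reported with an exception naming the interceptor.
    static boolean send(String packet, List<OutgoingCheck> interceptors, boolean blocking)
            throws SendRejectedException {
        for (OutgoingCheck check : interceptors) {
            if (!check.intercept(packet)) {
                if (blocking) {
                    throw new SendRejectedException(check.getClass().getName());
                }
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<OutgoingCheck> interceptors = List.of(new SizeLimit(8));
        try {
            send("a-rather-long-packet", interceptors, true);
        } catch (SendRejectedException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The sketch uses a named class rather than a lambda for the interceptor so that the reported name is meaningful to the caller.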