3.5. Auditing messages by intercepting packets
Intercept packets entering or exiting the broker to audit packets or filter messages. Interceptors can change the packets that they intercept, which makes them powerful but also potentially dangerous.
Develop interceptors to meet your business requirements. Interceptors are protocol specific and must implement the appropriate interface.
Interceptors must implement the intercept() method, which returns a boolean value. If the value is true, the message packet continues onward. If false, the process is aborted, no other interceptors are called, and the message packet is not processed further.
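The return-value rule above can be illustrated with a small, self-contained sketch. It is not the broker's implementation: the NamedInterceptor type, the process() method, and the interceptor names are hypothetical stand-ins that only model how a false result stops the chain.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Simplified model of an interceptor chain; none of these types are Artemis classes.
public class InterceptorChainSketch {

    // Hypothetical named interceptor; the predicate plays the role of intercept().
    static final class NamedInterceptor {
        final String name;
        final Predicate<String> check;

        NamedInterceptor(String name, Predicate<String> check) {
            this.name = name;
            this.check = check;
        }
    }

    // Runs the chain in order and records each interceptor that was invoked.
    // Returns true only if every interceptor let the packet continue.
    static boolean process(String packet, List<NamedInterceptor> chain, List<String> called) {
        for (NamedInterceptor interceptor : chain) {
            called.add(interceptor.name);
            if (!interceptor.check.test(packet)) {
                return false; // abort: later interceptors are never called
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<NamedInterceptor> chain = Arrays.asList(
            new NamedInterceptor("audit", p -> true),
            new NamedInterceptor("sizeLimit", p -> p.length() <= 8),
            new NamedInterceptor("neverReached", p -> true));

        List<String> called = new ArrayList<>();
        boolean delivered = process("a packet that is too long", chain, called);

        // Prints: false [audit, sizeLimit]
        System.out.println(delivered + " " + called);
    }
}
```

In this model the third interceptor is skipped because the second one returned false, which mirrors the behavior described for intercept().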
3.5.1. Creating interceptors
Interceptors can change the packets they intercept. You can create your own incoming and outgoing interceptors. All interceptors are protocol specific: incoming interceptors are called for every packet entering the broker, and outgoing interceptors are called for every packet exiting it. This allows you to create interceptors to meet business requirements such as auditing packets.
Interceptors and their dependencies must be placed in the Java classpath of the broker. You can use the <broker_instance_dir>/lib directory because it is part of the classpath by default.
The following examples demonstrate how to create an interceptor that checks the size of each packet passed to it.
The examples implement a specific interface for each protocol.
Procedure
Implement the appropriate interface and override its intercept() method.

If you are using the AMQP protocol, implement the org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor interface.

    package com.example;

    import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
    import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor;
    import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;

    public class MyInterceptor implements AmqpInterceptor {

        // Largest encoded message size, in bytes, that is allowed through
        private static final int ACCEPTABLE_SIZE = 1024;

        @Override
        public boolean intercept(final AMQPMessage message, RemotingConnection connection) {
            int size = message.getEncodeSize();
            if (size <= ACCEPTABLE_SIZE) {
                System.out.println("This AMQPMessage has an acceptable size.");
                return true;
            }
            // Too large: stop processing this message
            return false;
        }
    }

If you are using Core Protocol, your interceptor must implement the org.apache.activemq.artemis.api.core.Interceptor interface.

    package com.example;

    import org.apache.activemq.artemis.api.core.ActiveMQException;
    import org.apache.activemq.artemis.api.core.Interceptor;
    import org.apache.activemq.artemis.core.protocol.core.Packet;
    import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;

    public class MyInterceptor implements Interceptor {

        // Largest packet size, in bytes, that is allowed through
        private static final int ACCEPTABLE_SIZE = 1024;

        @Override
        public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
            int size = packet.getPacketSize();
            if (size <= ACCEPTABLE_SIZE) {
                System.out.println("This Packet has an acceptable size.");
                return true;
            }
            // Too large: stop processing this packet
            return false;
        }
    }

If you are using the MQTT protocol, implement the org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor interface.

    package com.example;

    import org.apache.activemq.artemis.api.core.ActiveMQException;
    import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
    import io.netty.handler.codec.mqtt.MqttMessage;
    import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;

    public class MyInterceptor implements MQTTInterceptor {

        // Largest message size, in bytes, that is allowed through
        private static final int ACCEPTABLE_SIZE = 1024;

        @Override
        public boolean intercept(MqttMessage mqttMessage, RemotingConnection connection) throws ActiveMQException {
            // Size is approximated from the message's string representation
            byte[] msg = (mqttMessage.toString()).getBytes();
            int size = msg.length;
            if (size <= ACCEPTABLE_SIZE) {
                System.out.println("This MqttMessage has an acceptable size.");
                return true;
            }
            // Too large: stop processing this message
            return false;
        }
    }

If you are using the STOMP protocol, implement the org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor interface.

    package com.example;

    import org.apache.activemq.artemis.api.core.ActiveMQException;
    import org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor;
    import org.apache.activemq.artemis.core.protocol.stomp.StompFrame;
    import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;

    public class MyInterceptor implements StompFrameInterceptor {

        // Largest encoded frame size, in bytes, that is allowed through
        private static final int ACCEPTABLE_SIZE = 1024;

        @Override
        public boolean intercept(StompFrame stompFrame, RemotingConnection connection) throws ActiveMQException {
            int size = stompFrame.getEncodedSize();
            if (size <= ACCEPTABLE_SIZE) {
                System.out.println("This StompFrame has an acceptable size.");
                return true;
            }
            // Too large: stop processing this frame
            return false;
        }
    }
3.5.2. Configuring the broker to use interceptors
Prerequisites
- Create an interceptor class and add it (and its dependencies) to the Java classpath of the broker. You can use the <broker_instance_dir>/lib directory because it is part of the classpath by default.
Procedure
- Open <broker_instance_dir>/etc/broker.xml.

- Configure the broker to use an interceptor by adding configuration to <broker_instance_dir>/etc/broker.xml.

  If the interceptor is intended for incoming messages, add its class-name to the list of remoting-incoming-interceptors.

      <configuration>
        <core>
          ...
          <remoting-incoming-interceptors>
            <class-name>org.example.MyIncomingInterceptor</class-name>
          </remoting-incoming-interceptors>
          ...
        </core>
      </configuration>

  If the interceptor is intended for outgoing messages, add its class-name to the list of remoting-outgoing-interceptors.

      <configuration>
        <core>
          ...
          <remoting-outgoing-interceptors>
            <class-name>org.example.MyOutgoingInterceptor</class-name>
          </remoting-outgoing-interceptors>
        </core>
      </configuration>
3.5.3. Interceptors on the client side
Clients can use interceptors to intercept packets sent either by the client to the server or by the server to the client. If a client-side interceptor returns false, no other interceptors are called and the client does not process the packet further. This happens transparently unless an outgoing packet is sent in a blocking fashion. In that case, an ActiveMQException is thrown to the caller, and the exception contains the name of the interceptor that returned false.
As on the server, the client interceptor classes and their dependencies must be added to the Java classpath of the client so that they can be properly instantiated and invoked.
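The difference between non-blocking and blocking sends described in this section can be sketched in a self-contained way. This is only a model under stated assumptions: RejectedException stands in for ActiveMQException, and NamedInterceptor, sendNonBlocking(), and sendBlocking() are hypothetical names that are not part of the client API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Simplified model of client-side outgoing interceptors; not the Artemis client API.
public class ClientSendSketch {

    // Hypothetical stand-in for ActiveMQException; carries the rejecting interceptor's name.
    static final class RejectedException extends Exception {
        RejectedException(String interceptorName) {
            super("Packet rejected by interceptor " + interceptorName);
        }
    }

    // Hypothetical named interceptor; the predicate plays the role of intercept().
    static final class NamedInterceptor {
        final String name;
        final Predicate<String> check;

        NamedInterceptor(String name, Predicate<String> check) {
            this.name = name;
            this.check = check;
        }
    }

    // Returns the name of the first interceptor that rejects the packet, or null if none does.
    static String firstRejecting(String packet, List<NamedInterceptor> outgoing) {
        for (NamedInterceptor interceptor : outgoing) {
            if (!interceptor.check.test(packet)) {
                return interceptor.name;
            }
        }
        return null;
    }

    // Non-blocking send: a rejection is silent, the packet is simply not sent.
    static boolean sendNonBlocking(String packet, List<NamedInterceptor> outgoing) {
        return firstRejecting(packet, outgoing) == null;
    }

    // Blocking send: a rejection is reported to the caller as an exception.
    static void sendBlocking(String packet, List<NamedInterceptor> outgoing) throws RejectedException {
        String rejectedBy = firstRejecting(packet, outgoing);
        if (rejectedBy != null) {
            throw new RejectedException(rejectedBy);
        }
    }

    public static void main(String[] args) {
        List<NamedInterceptor> outgoing = Arrays.asList(
            new NamedInterceptor("com.example.SizeLimit", p -> p.length() <= 8));

        // Prints: false
        System.out.println(sendNonBlocking("far too long a packet", outgoing));

        try {
            sendBlocking("far too long a packet", outgoing);
        } catch (RejectedException e) {
            // Prints: Packet rejected by interceptor com.example.SizeLimit
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the caller of a non-blocking send has to check the return value to notice a rejection, while the caller of a blocking send learns which interceptor rejected the packet from the exception message.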