242.8. 自定义管道
自定义频道管道允许用户插入自定义处理程序、编码器和解码器,而无需在 Netty Endpoint URL 中逐一指定它们,从而以非常简单的方式让用户完全控制处理程序/拦截器链。
要添加自定义管道,必须通过上下文 registry (JNDIRegistry 或 camel-spring ApplicationContextRegistry 等)创建并注册自定义频道管道工厂。
自定义管道工厂必须构建如下
- Producer 链接的频道管道工厂必须扩展抽象类 ClientInitializerFactory(旧版文档中称为 ClientPipelineFactory)。
- 消费者链接的频道管道工厂必须扩展抽象类 ServerInitializerFactory。
- 这些类应覆盖 initChannel() 方法,以插入自定义处理程序、编码器和解码器。如果不覆盖 initChannel() 方法,创建的管道将不包含任何处理程序、编码器或解码器。
以下示例显示如何创建 ServerInitializerFactory 工厂
242.8.1. 使用自定义管道工厂
import io.netty.channel.Channel; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import org.apache.camel.component.netty4.NettyConsumer; import org.apache.camel.component.netty4.ServerInitializerFactory; import org.apache.camel.component.netty4.handlers.ServerChannelHandler; public class SampleServerInitializerFactory extends ServerInitializerFactory { private int maxLineSize = 1024; NettyConsumer consumer; public SampleServerInitializerFactory(NettyConsumer consumer) { this.consumer = consumer; } @Override public ServerInitializerFactory createPipelineFactory(NettyConsumer consumer) { return new SampleServerInitializerFactory(consumer); } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline channelPipeline = channel.pipeline(); channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8)); channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter())); channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8)); // here we add the default Camel ServerChannelHandler for the consumer, to allow Camel to route the message etc. channelPipeline.addLast("handler", new ServerChannelHandler(consumer)); } }
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import org.apache.camel.component.netty4.NettyConsumer;
import org.apache.camel.component.netty4.ServerInitializerFactory;
import org.apache.camel.component.netty4.handlers.ServerChannelHandler;
/**
 * Sample {@link ServerInitializerFactory} for a Camel Netty4 consumer.
 *
 * <p>Each channel it initializes gets, in order: a UTF-8 string encoder, a
 * line-delimiter frame decoder limited to {@code maxLineSize}, a UTF-8 string
 * decoder, and finally Camel's {@link ServerChannelHandler} for the consumer.
 */
public class SampleServerInitializerFactory extends ServerInitializerFactory {

    /** Maximum frame length handed to the delimiter-based frame decoder. */
    private int maxLineSize = 1024;

    /** Consumer passed to the {@link ServerChannelHandler} that ends the pipeline. */
    NettyConsumer consumer;

    /**
     * Creates a factory bound to the given consumer.
     *
     * @param consumer the Netty consumer whose channels this factory initializes
     */
    public SampleServerInitializerFactory(NettyConsumer consumer) {
        this.consumer = consumer;
    }

    /**
     * Returns a new factory instance of this same type for the given consumer.
     *
     * @param consumer the Netty consumer the new factory is bound to
     * @return a fresh {@code SampleServerInitializerFactory}
     */
    @Override
    public ServerInitializerFactory createPipelineFactory(NettyConsumer consumer) {
        return new SampleServerInitializerFactory(consumer);
    }

    /**
     * Installs the codec handlers and the Camel handler on the channel's pipeline.
     *
     * @param channel the channel being initialized
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void initChannel(Channel channel) throws Exception {
        // Handlers are appended in the same order as before; each addLast returns the pipeline.
        channel.pipeline()
            .addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8))
            .addLast("decoder-DELIM",
                new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()))
            .addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8))
            // The default Camel ServerChannelHandler goes last so Camel can route the decoded message.
            .addLast("handler", new ServerChannelHandler(consumer));
    }
}
然后,可以将自定义频道管道工厂添加到 registry 中,并按如下方式在 Camel 路由中实例化和使用:
Registry registry = camelContext.getRegistry(); ServerInitializerFactory factory = new TestServerInitializerFactory(nettyConsumer); registry.bind("spf", factory); context.addRoutes(new RouteBuilder() { public void configure() { String netty_ssl_endpoint = "netty4:tcp://0.0.0.0:5150?serverInitializerFactory=#spf" String return_string = "When You Go Home, Tell Them Of Us And Say," + "For Your Tomorrow, We Gave Our Today."; from(netty_ssl_endpoint) .process(new Processor() { public void process(Exchange exchange) throws Exception { exchange.getOut().setBody(return_string); } } } });
// Bind the custom initializer factory under the name "spf" so that the endpoint URI
// below can reference it as "#spf".
Registry registry = camelContext.getRegistry();
ServerInitializerFactory factory = new SampleServerInitializerFactory(nettyConsumer);
registry.bind("spf", factory);

// Add a route that consumes from the Netty endpoint and replies with a fixed string.
camelContext.addRoutes(new RouteBuilder() {
    @Override
    public void configure() {
        // serverInitializerFactory=#spf looks up the factory bound in the registry above.
        final String nettyEndpoint =
            "netty4:tcp://0.0.0.0:5150?serverInitializerFactory=#spf";
        final String returnString =
            "When You Go Home, Tell Them Of Us And Say,"
            + "For Your Tomorrow, We Gave Our Today.";

        from(nettyEndpoint)
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    // Set the fixed reply as the body of the out message.
                    exchange.getOut().setBody(returnString);
                }
            });
    }
});