16.2. 扩展 KIE 服务器以使用自定义数据传输
默认情况下,KIE 服务器扩展通过 REST 或 JMS 数据传输提供。您可以扩展 KIE 服务器以支持自定义数据传输,以根据您的业务需求调整 KIE 服务器传输协议。
例如,这个步骤将自定义数据传输添加到使用 Drools 扩展的 KIE 服务器中,该扩展基于 Apache MINA,它是一个开源 Java 网络应用程序框架。示例自定义 MINA 传输基于字符串的数据,这些数据依赖于现有的 marshalling 操作,且只支持 JSON 格式。
流程
创建一个空的 Maven 项目,并在项目的
pom.xml文件中定义以下打包类型和依赖项:示例项目中的 pom.xml 文件示例
<packaging>jar</packaging> <properties> <version.org.kie>7.67.0.Final-redhat-00019</version.org.kie> </properties> <dependencies> <dependency> <groupId>org.kie</groupId> <artifactId>kie-api</artifactId> <version>${version.org.kie}</version> </dependency> <dependency> <groupId>org.kie</groupId> <artifactId>kie-internal</artifactId> <version>${version.org.kie}</version> </dependency> <dependency> <groupId>org.kie.server</groupId> <artifactId>kie-server-api</artifactId> <version>${version.org.kie}</version> </dependency> <dependency> <groupId>org.kie.server</groupId> <artifactId>kie-server-services-common</artifactId> <version>${version.org.kie}</version> </dependency> <dependency> <groupId>org.kie.server</groupId> <artifactId>kie-server-services-drools</artifactId> <version>${version.org.kie}</version> </dependency> <dependency> <groupId>org.drools</groupId> <artifactId>drools-core</artifactId> <version>${version.org.kie}</version> </dependency> <dependency> <groupId>org.drools</groupId> <artifactId>drools-compiler</artifactId> <version>${version.org.kie}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-core</artifactId> <version>2.1.3</version> </dependency> </dependencies>在项目中的 Java 类中实施
org.kie.server.services.api.KieServerExtension接口,如下例所示:KieServerExtension接口的实现示例public class MinaDroolsKieServerExtension implements KieServerExtension { private static final Logger logger = LoggerFactory.getLogger(MinaDroolsKieServerExtension.class); public static final String EXTENSION_NAME = "Drools-Mina"; private static final Boolean disabled = Boolean.parseBoolean(System.getProperty("org.kie.server.drools-mina.ext.disabled", "false")); private static final String MINA_HOST = System.getProperty("org.kie.server.drools-mina.ext.port", "localhost"); private static final int MINA_PORT = Integer.parseInt(System.getProperty("org.kie.server.drools-mina.ext.port", "9123")); // Taken from dependency on the `Drools` extension: private KieContainerCommandService batchCommandService; // Specific to MINA: private IoAcceptor acceptor; public boolean isActive() { return disabled == false; } public void init(KieServerImpl kieServer, KieServerRegistry registry) { KieServerExtension droolsExtension = registry.getServerExtension("Drools"); if (droolsExtension == null) { logger.warn("No Drools extension available, quitting..."); return; } List<Object> droolsServices = droolsExtension.getServices(); for( Object object : droolsServices ) { // If the given service is null (not configured), continue to the next service: if (object == null) { continue; } if( KieContainerCommandService.class.isAssignableFrom(object.getClass()) ) { batchCommandService = (KieContainerCommandService) object; continue; } } if (batchCommandService != null) { acceptor = new NioSocketAcceptor(); acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" )))); acceptor.setHandler( new TextBasedIoHandlerAdapter(batchCommandService) ); acceptor.getSessionConfig().setReadBufferSize( 2048 ); acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 ); try { acceptor.bind( new InetSocketAddress(MINA_HOST, MINA_PORT) ); logger.info("{} -- Mina server started at {} and port {}", toString(), MINA_HOST, MINA_PORT); } catch (IOException e) { logger.error("Unable to start Mina acceptor due to {}", e.getMessage(), e); } } } public void destroy(KieServerImpl kieServer, KieServerRegistry registry) { if (acceptor != null) { acceptor.dispose(); acceptor = null; } logger.info("{} -- Mina server stopped", toString()); } public void createContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters) { // Empty, already handled by the `Drools` extension } public void disposeContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters) { // Empty, already handled by the `Drools` extension } public List<Object> getAppComponents(SupportedTransports type) { // Nothing for supported transports (REST or JMS) return Collections.emptyList(); } public <T> T getAppComponents(Class<T> serviceType) { return null; } public String getImplementedCapability() { return "BRM-Mina"; } public List<Object> getServices() { return Collections.emptyList(); } public String getExtensionName() { return EXTENSION_NAME; } public Integer getStartOrder() { return 20; } @Override public String toString() { return EXTENSION_NAME + " KIE Server extension"; } }KieServerExtension接口是 KIE 服务器可以用来为新的 MINA 传输提供额外功能的主扩展接口。接口由以下组件组成:KieServerExtension接口概述public interface KieServerExtension { boolean isActive(); void init(KieServerImpl kieServer, KieServerRegistry registry); void destroy(KieServerImpl kieServer, KieServerRegistry registry); void createContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters); void disposeContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters); List<Object> getAppComponents(SupportedTransports type); <T> T getAppComponents(Class<T> serviceType); String getImplementedCapability();1 List<Object> getServices(); String getExtensionName();2 Integer getStartOrder();3 }在前面的
MinaDroolsKieServerExtension示例实现中,init方法是从Drools扩展收集服务的主要元素,用于引导 MINA 服务器。KieServerExtension接口中的其他方法都可以与标准实施保持一致,以满足接口要求。TextBasedIoHandlerAdapter类是响应传入请求的 MINA 服务器上的处理程序。为 MINA 服务器实施 text
BasedIoHandlerAdapter处理程序,如下例所示:TextBasedIoHandlerAdapter处理程序的实现示例public class TextBasedIoHandlerAdapter extends IoHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(TextBasedIoHandlerAdapter.class); private KieContainerCommandService batchCommandService; public TextBasedIoHandlerAdapter(KieContainerCommandService batchCommandService) { this.batchCommandService = batchCommandService; } @Override public void messageReceived( IoSession session, Object message ) throws Exception { String completeMessage = message.toString(); logger.debug("Received message '{}'", completeMessage); if( completeMessage.trim().equalsIgnoreCase("quit") || completeMessage.trim().equalsIgnoreCase("exit") ) { session.close(false); return; } String[] elements = completeMessage.split("\\|"); logger.debug("Container id {}", elements[0]); try { ServiceResponse<String> result = batchCommandService.callContainer(elements[0], elements[1], MarshallingFormat.JSON, null); if (result.getType().equals(ServiceResponse.ResponseType.SUCCESS)) { session.write(result.getResult()); logger.debug("Successful message written with content '{}'", result.getResult()); } else { session.write(result.getMsg()); logger.debug("Failure message written with content '{}'", result.getMsg()); } } catch (Exception e) { } } }在本例中,处理器类接收文本信息,并在
Drools服务中执行它们。在使用
TextBasedIoHandlerAdapter处理程序实现时,请考虑以下处理器要求和行为:- 您提交到处理程序的任何对象都必须是一行,因为每个传入的传输请求都是一行。
-
您必须在此一行中传递 KIE 容器 ID,以便处理程序需要格式
containerID|payload。 - 您可以使用 marshaller 生成的方式设置响应。响应可以是多行。
-
该处理程序支持 流模式,允许您发送命令而不与 KIE 服务器会话断开连接。要在流模式下结束 KIE 服务器会话,请将
exit或 exit 命令发送至服务器。
-
要使新数据传输可被 KIE 服务器发现,请在 文件中创建一个
META-INF/services/org.kie.server.api.KieServerExtension文件,并在该文件中添加完全限定的类名称。在本例中,该文件包含一行org.kie.server.ext.mina.MinastandaloneKieServerExtension。 -
构建您的项目,并将生成的 JAR 文件复制到项目的
mina-core-2.0.9.jar文件(此示例中取决于该扩展)到项目的~/kie-server.war/WEB-INF/lib目录中。例如,在红帽 JBoss EAP 上,此目录的路径是EAP_HOME/standalone/deployments/kie-server.war/WEB-INF/lib。 启动 KIE 服务器并将构建的项目部署到正在运行的 KIE 服务器。您可以使用 Business Central 接口或 KIE 服务器 REST API 部署项目(
PUT请求到http://SERVER:PORT/kie-server/services/rest/server/containers/{containerId})。在正在运行的 KIE 服务器上部署项目后,您可以在 KIE 服务器日志中查看新数据传输的状态,然后开始使用您的新数据传输:
服务器日志中的新数据传输
Drools-Mina KIE Server extension -- Mina server started at localhost and port 9123 Drools-Mina KIE Server extension has been successfully registered as server extension在本例中,您可以使用 Telnet 与 KIE 服务器中的新的基于 MINA 的数据传输进行交互:
在命令终端中启动 Telnet 并连接到端口 9123 上的 KIE 服务器
telnet 127.0.0.1 9123在命令终端中与 KIE 服务器交互示例
Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. # Request body: demo|{"lookup":"defaultKieSession","commands":[{"insert":{"object":{"org.jbpm.test.Person":{"name":"john","age":25}}}},{"fire-all-rules":""}]} # Server response: { "results" : [ { "key" : "", "value" : 1 } ], "facts" : [ ] } demo|{"lookup":"defaultKieSession","commands":[{"insert":{"object":{"org.jbpm.test.Person":{"name":"mary","age":22}}}},{"fire-all-rules":""}]} { "results" : [ { "key" : "", "value" : 1 } ], "facts" : [ ] } demo|{"lookup":"defaultKieSession","commands":[{"insert":{"object":{"org.jbpm.test.Person":{"name":"james","age":25}}}},{"fire-all-rules":""}]} { "results" : [ { "key" : "", "value" : 1 } ], "facts" : [ ] } exit Connection closed by foreign host.服务器日志输出示例
16:33:40,206 INFO [stdout] (NioProcessor-2) Hello john 16:34:03,877 INFO [stdout] (NioProcessor-2) Hello mary 16:34:19,800 INFO [stdout] (NioProcessor-2) Hello james