22.2. 扩展 KIE 服务器以使用自定义数据传输
默认情况下,KIE 服务器扩展通过 REST 或 JMS 数据传输公开。您可以扩展 KIE 服务器,以支持自定义数据传输,以符合您的业务需求。
例如,这个流程向 KIE 服务器添加自定义数据传输,它使用 do ls
扩展,且基于 Apache MINA,它是一个开源 Java 网络应用程序框架。自定义 MINA 传输交换基于字符串的数据,它们依赖于现有的 marshalling 操作并只支持 JSON 格式。
流程
创建一个空的 Maven 项目,并在项目的
pom.xml
文件中定义以下打包类型和依赖项:示例项目中的 pom.xml 文件示例
<packaging>jar</packaging> <properties> <version.org.kie>7.52.0.Final-redhat-00007</version.org.kie> </properties> <dependencies> <dependency> <groupId>org.kie</groupId> <artifactId>kie-api</artifactId> <version>${version.org.kie}</version> </dependency> <dependency> <groupId>org.kie</groupId> <artifactId>kie-internal</artifactId> <version>${version.org.kie}</version> </dependency> <dependency> <groupId>org.kie.server</groupId> <artifactId>kie-server-api</artifactId> <version>${version.org.kie}</version> </dependency> <dependency> <groupId>org.kie.server</groupId> <artifactId>kie-server-services-common</artifactId> <version>${version.org.kie}</version> </dependency> <dependency> <groupId>org.kie.server</groupId> <artifactId>kie-server-services-drools</artifactId> <version>${version.org.kie}</version> </dependency> <dependency> <groupId>org.drools</groupId> <artifactId>drools-core</artifactId> <version>${version.org.kie}</version> </dependency> <dependency> <groupId>org.drools</groupId> <artifactId>drools-compiler</artifactId> <version>${version.org.kie}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-core</artifactId> <version>2.1.3</version> </dependency> </dependencies>
在项目中的 Java 类中实施
org.kie.server.services.api.KieServerExtension
接口,如下例所示:KieServerExtension
接口的实现示例public class MinaDroolsKieServerExtension implements KieServerExtension { private static final Logger logger = LoggerFactory.getLogger(MinaDroolsKieServerExtension.class); public static final String EXTENSION_NAME = "Drools-Mina"; private static final Boolean disabled = Boolean.parseBoolean(System.getProperty("org.kie.server.drools-mina.ext.disabled", "false")); private static final String MINA_HOST = System.getProperty("org.kie.server.drools-mina.ext.port", "localhost"); private static final int MINA_PORT = Integer.parseInt(System.getProperty("org.kie.server.drools-mina.ext.port", "9123")); // Taken from dependency on the `Drools` extension: private KieContainerCommandService batchCommandService; // Specific to MINA: private IoAcceptor acceptor; public boolean isActive() { return disabled == false; } public void init(KieServerImpl kieServer, KieServerRegistry registry) { KieServerExtension droolsExtension = registry.getServerExtension("Drools"); if (droolsExtension == null) { logger.warn("No Drools extension available, quitting..."); return; } List<Object> droolsServices = droolsExtension.getServices(); for( Object object : droolsServices ) { // If the given service is null (not configured), continue to the next service: if (object == null) { continue; } if( KieContainerCommandService.class.isAssignableFrom(object.getClass()) ) { batchCommandService = (KieContainerCommandService) object; continue; } } if (batchCommandService != null) { acceptor = new NioSocketAcceptor(); acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" )))); acceptor.setHandler( new TextBasedIoHandlerAdapter(batchCommandService) ); acceptor.getSessionConfig().setReadBufferSize( 2048 ); acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 ); try { acceptor.bind( new InetSocketAddress(MINA_HOST, MINA_PORT) ); logger.info("{} -- Mina server started at {} and port {}", toString(), MINA_HOST, MINA_PORT); } catch (IOException e) { logger.error("Unable to start Mina acceptor due to {}", e.getMessage(), e); } } } public void destroy(KieServerImpl kieServer, KieServerRegistry registry) { if (acceptor != null) { acceptor.dispose(); acceptor = null; } logger.info("{} -- Mina server stopped", toString()); } public void createContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters) { // Empty, already handled by the `Drools` extension } public void disposeContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters) { // Empty, already handled by the `Drools` extension } public List<Object> getAppComponents(SupportedTransports type) { // Nothing for supported transports (REST or JMS) return Collections.emptyList(); } public <T> T getAppComponents(Class<T> serviceType) { return null; } public String getImplementedCapability() { return "BRM-Mina"; } public List<Object> getServices() { return Collections.emptyList(); } public String getExtensionName() { return EXTENSION_NAME; } public Integer getStartOrder() { return 20; } @Override public String toString() { return EXTENSION_NAME + " KIE Server extension"; } }
KieServerExtension
接口是 KIE 服务器可用于为新的 MINA 传输提供额外的功能的主要扩展接口。接口由以下组件组成:KieServerExtension
接口概述public interface KieServerExtension { boolean isActive(); void init(KieServerImpl kieServer, KieServerRegistry registry); void destroy(KieServerImpl kieServer, KieServerRegistry registry); void createContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters); void disposeContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters); List<Object> getAppComponents(SupportedTransports type); <T> T getAppComponents(Class<T> serviceType); String getImplementedCapability(); 1 List<Object> getServices(); String getExtensionName(); 2 Integer getStartOrder(); 3 }
在之前的
Mina
示例实现中,Drools
KieServerExtensioninit
方法是从 dol 扩展收集服务的主要元素,用于引导 MINA 服务器。KieServerExtension
接口中的所有其他方法都可以与标准实施保持下来,以满足接口要求。TextBasedIoHandlerAdapter
类是在 MINA 服务器上响应传入请求的处理程序。为 MINA 服务器实现
TextBasedIoHandlerAdapter
处理程序,如下例所示:TextBasedIoHandlerAdapter
处理程序实现示例public class TextBasedIoHandlerAdapter extends IoHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(TextBasedIoHandlerAdapter.class); private KieContainerCommandService batchCommandService; public TextBasedIoHandlerAdapter(KieContainerCommandService batchCommandService) { this.batchCommandService = batchCommandService; } @Override public void messageReceived( IoSession session, Object message ) throws Exception { String completeMessage = message.toString(); logger.debug("Received message '{}'", completeMessage); if( completeMessage.trim().equalsIgnoreCase("quit") || completeMessage.trim().equalsIgnoreCase("exit") ) { session.close(false); return; } String[] elements = completeMessage.split("\\|"); logger.debug("Container id {}", elements[0]); try { ServiceResponse<String> result = batchCommandService.callContainer(elements[0], elements[1], MarshallingFormat.JSON, null); if (result.getType().equals(ServiceResponse.ResponseType.SUCCESS)) { session.write(result.getResult()); logger.debug("Successful message written with content '{}'", result.getResult()); } else { session.write(result.getMsg()); logger.debug("Failure message written with content '{}'", result.getMsg()); } } catch (Exception e) { } } }
在本例中,处理器类接收文本消息并在 drools 服务中
执行它们
。使用
TextBasedIoHandlerAdapter
处理程序实现时,请考虑以下处理程序要求和行为:- 您提交到处理器的任何内容都必须是一个行,因为每个传入的传输请求都是一行。
-
您必须在此一行中传递 KIE 容器 ID,以便处理程序需要
containerID|payload
格式。 - 您可以按照 marshaller 生成的方式设置响应。响应可以是多行。
-
处理程序支持 流模式,允许您在不断开 KIE Server 会话的情况下发送命令。要以流模式结束 KIE 服务器会话,请将
退出
或退出
命令发送到服务器。
-
要使 KIE 服务器可以发现新的数据传输,请在 Maven 项目中创建一个
META-INF/services/org.kie.server.services.api.KieServerExtension
文件,并在文件中添加KieServerExtension
实现类的完全限定域名。在本例中,文件包含一行org.kie.server.ext.mina.MinaDroolsKieServerExtension
。 -
构建您的项目并将生成的 JAR 文件和
mina-core-2.0.9.jar
文件复制到项目的~/kie-server.war/WEB-INF/lib
目录中。例如,在 Red Hat JBoss EAP 上,此目录的路径是EAP_HOME/standalone/deployments/kie-server.war/WEB-INF/lib
。 启动 KIE 服务器,并将构建的项目部署到正在运行的 KIE 服务器中。您可以使用 Business Central 接口或 KIE Server REST API (
PUT
请求http://SERVER:PORT/kie-server/services/rest/server/containers/{containerId}
)来部署项目。在项目部署到正在运行的 KIE 服务器上后,您可以在 KIE 服务器日志中查看新数据传输的状态,并开始使用您的新数据传输:
服务器日志中的新数据传输
Drools-Mina KIE Server extension -- Mina server started at localhost and port 9123 Drools-Mina KIE Server extension has been successfully registered as server extension
在本例中,您可以使用 Telnet 与 KIE 服务器中基于 MINA 的数据传输进行交互:
启动 Telnet 并在命令终端中连接到端口 9123 上的 KIE 服务器
telnet 127.0.0.1 9123
在命令终端中与 KIE 服务器交互示例
Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. # Request body: demo|{"lookup":"defaultKieSession","commands":[{"insert":{"object":{"org.jbpm.test.Person":{"name":"john","age":25}}}},{"fire-all-rules":""}]} # Server response: { "results" : [ { "key" : "", "value" : 1 } ], "facts" : [ ] } demo|{"lookup":"defaultKieSession","commands":[{"insert":{"object":{"org.jbpm.test.Person":{"name":"mary","age":22}}}},{"fire-all-rules":""}]} { "results" : [ { "key" : "", "value" : 1 } ], "facts" : [ ] } demo|{"lookup":"defaultKieSession","commands":[{"insert":{"object":{"org.jbpm.test.Person":{"name":"james","age":25}}}},{"fire-all-rules":""}]} { "results" : [ { "key" : "", "value" : 1 } ], "facts" : [ ] } exit Connection closed by foreign host.
服务器日志输出示例
16:33:40,206 INFO [stdout] (NioProcessor-2) Hello john 16:34:03,877 INFO [stdout] (NioProcessor-2) Hello mary 16:34:19,800 INFO [stdout] (NioProcessor-2) Hello james