22.2. 扩展 KIE 服务器以使用自定义数据传输


默认情况下,KIE 服务器扩展通过 REST 或 JMS 数据传输公开。您可以扩展 KIE 服务器,以支持自定义数据传输,以符合您的业务需求。

例如,这个流程向 KIE 服务器添加自定义数据传输,它使用 do ls 扩展,且基于 Apache MINA,它是一个开源 Java 网络应用程序框架。自定义 MINA 传输交换基于字符串的数据,它们依赖于现有的 marshalling 操作并只支持 JSON 格式。

流程

  1. 创建一个空的 Maven 项目,并在项目的 pom.xml 文件中定义以下打包类型和依赖项:

    示例项目中的 pom.xml 文件示例

    <packaging>jar</packaging>
    
    <properties>
      <version.org.kie>7.52.0.Final-redhat-00007</version.org.kie>
    </properties>
    
    <dependencies>
      <dependency>
        <groupId>org.kie</groupId>
        <artifactId>kie-api</artifactId>
        <version>${version.org.kie}</version>
      </dependency>
      <dependency>
        <groupId>org.kie</groupId>
        <artifactId>kie-internal</artifactId>
        <version>${version.org.kie}</version>
      </dependency>
      <dependency>
        <groupId>org.kie.server</groupId>
        <artifactId>kie-server-api</artifactId>
        <version>${version.org.kie}</version>
      </dependency>
      <dependency>
        <groupId>org.kie.server</groupId>
        <artifactId>kie-server-services-common</artifactId>
        <version>${version.org.kie}</version>
      </dependency>
      <dependency>
        <groupId>org.kie.server</groupId>
        <artifactId>kie-server-services-drools</artifactId>
        <version>${version.org.kie}</version>
      </dependency>
      <dependency>
        <groupId>org.drools</groupId>
        <artifactId>drools-core</artifactId>
        <version>${version.org.kie}</version>
      </dependency>
      <dependency>
        <groupId>org.drools</groupId>
        <artifactId>drools-compiler</artifactId>
        <version>${version.org.kie}</version>
      </dependency>
      <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
      </dependency>
      <dependency>
        <groupId>org.apache.mina</groupId>
        <artifactId>mina-core</artifactId>
        <version>2.1.3</version>
      </dependency>
    </dependencies>

  2. 在项目中的 Java 类中实施 org.kie.server.services.api.KieServerExtension 接口,如下例所示:

    KieServerExtension 接口的实现示例

    public class MinaDroolsKieServerExtension implements KieServerExtension {
    
        private static final Logger logger = LoggerFactory.getLogger(MinaDroolsKieServerExtension.class);
    
        public static final String EXTENSION_NAME = "Drools-Mina";
    
        private static final Boolean disabled = Boolean.parseBoolean(System.getProperty("org.kie.server.drools-mina.ext.disabled", "false"));
        private static final String MINA_HOST = System.getProperty("org.kie.server.drools-mina.ext.port", "localhost");
        private static final int MINA_PORT = Integer.parseInt(System.getProperty("org.kie.server.drools-mina.ext.port", "9123"));
    
        // Taken from dependency on the `Drools` extension:
        private KieContainerCommandService batchCommandService;
    
        // Specific to MINA:
        private IoAcceptor acceptor;
    
        public boolean isActive() {
            return disabled == false;
        }
    
        public void init(KieServerImpl kieServer, KieServerRegistry registry) {
    
            KieServerExtension droolsExtension = registry.getServerExtension("Drools");
            if (droolsExtension == null) {
                logger.warn("No Drools extension available, quitting...");
                return;
            }
    
            List<Object> droolsServices = droolsExtension.getServices();
            for( Object object : droolsServices ) {
                // If the given service is null (not configured), continue to the next service:
                if (object == null) {
                    continue;
                }
                if( KieContainerCommandService.class.isAssignableFrom(object.getClass()) ) {
                    batchCommandService = (KieContainerCommandService) object;
                    continue;
                }
            }
            if (batchCommandService != null) {
                acceptor = new NioSocketAcceptor();
                acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
    
                acceptor.setHandler( new TextBasedIoHandlerAdapter(batchCommandService) );
                acceptor.getSessionConfig().setReadBufferSize( 2048 );
                acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
                try {
                    acceptor.bind( new InetSocketAddress(MINA_HOST, MINA_PORT) );
    
                    logger.info("{} -- Mina server started at {} and port {}", toString(), MINA_HOST, MINA_PORT);
                } catch (IOException e) {
                    logger.error("Unable to start Mina acceptor due to {}", e.getMessage(), e);
                }
    
            }
        }
    
        public void destroy(KieServerImpl kieServer, KieServerRegistry registry) {
            if (acceptor != null) {
                acceptor.dispose();
                acceptor = null;
            }
            logger.info("{} -- Mina server stopped", toString());
        }
    
        public void createContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters) {
            // Empty, already handled by the `Drools` extension
    
        }
    
        public void disposeContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters) {
          // Empty, already handled by the `Drools` extension
    
        }
    
        public List<Object> getAppComponents(SupportedTransports type) {
            // Nothing for supported transports (REST or JMS)
            return Collections.emptyList();
        }
    
        public <T> T getAppComponents(Class<T> serviceType) {
    
            return null;
        }
    
        public String getImplementedCapability() {
            return "BRM-Mina";
        }
    
        public List<Object> getServices() {
            return Collections.emptyList();
        }
    
        public String getExtensionName() {
            return EXTENSION_NAME;
        }
    
        public Integer getStartOrder() {
            return 20;
        }
    
        @Override
        public String toString() {
            return EXTENSION_NAME + " KIE Server extension";
        }
    }

    KieServerExtension 接口是 KIE 服务器可用于为新的 MINA 传输提供额外的功能的主要扩展接口。接口由以下组件组成:

    KieServerExtension 接口概述

    public interface KieServerExtension {
    
        boolean isActive();
    
        void init(KieServerImpl kieServer, KieServerRegistry registry);
    
        void destroy(KieServerImpl kieServer, KieServerRegistry registry);
    
        void createContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters);
    
        void disposeContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters);
    
        List<Object> getAppComponents(SupportedTransports type);
    
        <T> T getAppComponents(Class<T> serviceType);
    
        String getImplementedCapability();  1
    
        List<Object> getServices();
    
        String getExtensionName();  2
    
        Integer getStartOrder();  3
    }

    1
    指定此扩展涵盖的功能。该功能必须在 KIE 服务器中唯一。
    2
    为扩展名定义人类可读的名称。
    3
    确定何时启动指定的扩展。对于对其他扩展具有依赖项的扩展,此设置不得与父设置冲突。例如,在这种情况下,此自定义扩展取决于 dolrder 扩展,其 StartOrder 被设置为 0,因此此自定义附加组件扩展必须大于 0 ( 在示例实现中设为 20 )。

    在之前的 Mina Drools KieServerExtension 示例实现中,init 方法是从 dol 扩展收集服务的主要元素,用于引导 MINA 服务器。KieServerExtension 接口中的所有其他方法都可以与标准实施保持下来,以满足接口要求。

    TextBasedIoHandlerAdapter 类是在 MINA 服务器上响应传入请求的处理程序。

  3. 为 MINA 服务器实现 TextBasedIoHandlerAdapter 处理程序,如下例所示:

    TextBasedIoHandlerAdapter 处理程序实现示例

    public class TextBasedIoHandlerAdapter extends IoHandlerAdapter {
    
        private static final Logger logger = LoggerFactory.getLogger(TextBasedIoHandlerAdapter.class);
    
        private KieContainerCommandService batchCommandService;
    
        public TextBasedIoHandlerAdapter(KieContainerCommandService batchCommandService) {
            this.batchCommandService = batchCommandService;
        }
    
        @Override
        public void messageReceived( IoSession session, Object message ) throws Exception {
            String completeMessage = message.toString();
            logger.debug("Received message '{}'", completeMessage);
            if( completeMessage.trim().equalsIgnoreCase("quit") || completeMessage.trim().equalsIgnoreCase("exit") ) {
                session.close(false);
                return;
            }
    
            String[] elements = completeMessage.split("\\|");
            logger.debug("Container id {}", elements[0]);
            try {
                ServiceResponse<String> result = batchCommandService.callContainer(elements[0], elements[1], MarshallingFormat.JSON, null);
    
                if (result.getType().equals(ServiceResponse.ResponseType.SUCCESS)) {
                    session.write(result.getResult());
                    logger.debug("Successful message written with content '{}'", result.getResult());
                } else {
                    session.write(result.getMsg());
                    logger.debug("Failure message written with content '{}'", result.getMsg());
                }
            } catch (Exception e) {
    
            }
        }
    }

    在本例中,处理器类接收文本消息并在 drools 服务中 执行它们

    使用 TextBasedIoHandlerAdapter 处理程序实现时,请考虑以下处理程序要求和行为:

    • 您提交到处理器的任何内容都必须是一个行,因为每个传入的传输请求都是一行。
    • 您必须在此一行中传递 KIE 容器 ID,以便处理程序需要 containerID|payload 格式。
    • 您可以按照 marshaller 生成的方式设置响应。响应可以是多行。
    • 处理程序支持 流模式,允许您在不断开 KIE Server 会话的情况下发送命令。要以流模式结束 KIE 服务器会话,请将 退出退出 命令发送到服务器。
  4. 要使 KIE 服务器可以发现新的数据传输,请在 Maven 项目中创建一个 META-INF/services/org.kie.server.services.api.KieServerExtension 文件,并在文件中添加 KieServerExtension 实现类的完全限定域名。在本例中,文件包含一行 org.kie.server.ext.mina.MinaDroolsKieServerExtension
  5. 构建您的项目并将生成的 JAR 文件和 mina-core-2.0.9.jar 文件复制到项目的 ~/kie-server.war/WEB-INF/lib 目录中。例如,在 Red Hat JBoss EAP 上,此目录的路径是 EAP_HOME/standalone/deployments/kie-server.war/WEB-INF/lib
  6. 启动 KIE 服务器,并将构建的项目部署到正在运行的 KIE 服务器中。您可以使用 Business Central 接口或 KIE Server REST API ( PUT 请求 http://SERVER:PORT/kie-server/services/rest/server/containers/{containerId})来部署项目。

    在项目部署到正在运行的 KIE 服务器上后,您可以在 KIE 服务器日志中查看新数据传输的状态,并开始使用您的新数据传输:

    服务器日志中的新数据传输

    Drools-Mina KIE Server extension -- Mina server started at localhost and port 9123
    Drools-Mina KIE Server extension has been successfully registered as server extension

    在本例中,您可以使用 Telnet 与 KIE 服务器中基于 MINA 的数据传输进行交互:

    启动 Telnet 并在命令终端中连接到端口 9123 上的 KIE 服务器

    telnet 127.0.0.1 9123

    在命令终端中与 KIE 服务器交互示例

    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    
    # Request body:
    demo|{"lookup":"defaultKieSession","commands":[{"insert":{"object":{"org.jbpm.test.Person":{"name":"john","age":25}}}},{"fire-all-rules":""}]}
    
    # Server response:
    {
      "results" : [ {
        "key" : "",
        "value" : 1
      } ],
      "facts" : [ ]
    }
    
    demo|{"lookup":"defaultKieSession","commands":[{"insert":{"object":{"org.jbpm.test.Person":{"name":"mary","age":22}}}},{"fire-all-rules":""}]}
    {
      "results" : [ {
        "key" : "",
        "value" : 1
      } ],
      "facts" : [ ]
    }
    
    demo|{"lookup":"defaultKieSession","commands":[{"insert":{"object":{"org.jbpm.test.Person":{"name":"james","age":25}}}},{"fire-all-rules":""}]}
    {
      "results" : [ {
        "key" : "",
        "value" : 1
      } ],
      "facts" : [ ]
    }
    exit
    Connection closed by foreign host.

    服务器日志输出示例

    16:33:40,206 INFO  [stdout] (NioProcessor-2) Hello john
    16:34:03,877 INFO  [stdout] (NioProcessor-2) Hello mary
    16:34:19,800 INFO  [stdout] (NioProcessor-2) Hello james

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.