Chapter 6. Complex event processing (CEP)


In Red Hat Decision Manager, an event is a record of a significant change of state in the application domain at a point in time. Depending on how the domain is modeled, the change of state may be represented by a single event, multiple atomic events, or hierarchies of correlated events. From a complex event processing (CEP) perspective, an event is a type of fact or object that occurs at a specific point in time, and a business rule is a definition of how to react to the data from that fact or object. For example, in a stock broker application, a change in security prices, a change in ownership from seller to buyer, or a change in an account holder’s balance are all considered to be events because a change has occurred in the state of the application domain at a given time.

The decision engine in Red Hat Decision Manager uses complex event processing (CEP) to detect and process multiple events within a collection of events, to uncover relationships that exist between events, and to infer new data from the events and their relationships.

CEP use cases share several requirements and goals with business rule use cases.

From a business perspective, business rule definitions are often defined based on the occurrence of scenarios triggered by events. In the following examples, events form the basis of business rules:

  • In an algorithmic trading application, a rule performs an action if the security price increases by X percent above the day's opening price. The price increases are denoted by events in the stock trading application.
  • In a monitoring application, a rule performs an action if the temperature in the server room increases X degrees in Y minutes. The sensor readings are denoted by events.

From a technical perspective, business rule evaluation and CEP have the following key similarities:

  • Both business rule evaluation and CEP require seamless integration with the enterprise infrastructure and applications. This is particularly important with life-cycle management, auditing, and security.
  • Both business rule evaluation and CEP have functional requirements such as pattern matching, and non-functional requirements such as response time limits and query-rule explanations.

CEP scenarios have the following key characteristics:

  • Scenarios usually process large numbers of events, but only a small percentage of the events are relevant.
  • Events are usually immutable and represent a record of change in state.
  • Rules and queries run against events and must react to detected event patterns.
  • Related events usually have a strong temporal relationship.
  • Individual events are not prioritized. The CEP system prioritizes patterns of related events and the relationships between them.
  • Events usually need to be composed and aggregated.

Given these common CEP scenario characteristics, the CEP system in Red Hat Decision Manager supports the following features and functions to optimize event processing:

  • Event processing with proper semantics
  • Event detection, correlation, aggregation, and composition
  • Event stream processing
  • Temporal constraints to model the temporal relationships between events
  • Sliding windows of significant events
  • Session-scoped unified clock
  • Required volumes of events for CEP use cases
  • Reactive rules
  • Adapters for event input into the decision engine (pipeline)

6.1. Events in complex event processing

Events have the following key characteristics:

  • Are immutable: An event is a record of change that has occurred at some time in the past and cannot be changed.

    Note

    The decision engine does not enforce immutability on the Java objects that represent events. This behavior makes event data enrichment possible. Your application should be able to populate unpopulated event attributes, and these attributes are used by the decision engine to enrich the event with inferred data. However, you should not change event attributes that have already been populated.

  • Have strong temporal constraints: Rules involving events usually require the correlation of multiple events that occur at different points in time relative to each other.
  • Have managed life cycles: Because events are immutable and have temporal constraints, they are usually only relevant for a specified period of time. This means that the decision engine can automatically manage the life cycle of events.
  • Can use sliding windows: You can define sliding windows of time or length with events. A sliding time window is a specified period of time during which events can be processed. A sliding length window is a specified number of events that can be processed.
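
The two window types can be pictured with a small plain-Java sketch. It does not use the decision engine API. The SlidingWindowSketch class, its method names, the millisecond time stamps, and the inclusive window boundary are all assumptions made for illustration only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Illustrative only: keeps event time stamps (in ms) in arrival order. */
final class SlidingWindowSketch {
    private final Deque<Long> timestamps = new ArrayDeque<>();

    /** Records an event; events are assumed to arrive in chronological order. */
    void add(long timestampMillis) {
        timestamps.addLast(timestampMillis);
    }

    /** Sliding time window: events that are at most windowMillis older than now. */
    List<Long> timeWindow(long nowMillis, long windowMillis) {
        List<Long> result = new ArrayList<>();
        for (long t : timestamps) {
            if (t <= nowMillis && nowMillis - t <= windowMillis) {
                result.add(t);
            }
        }
        return result;
    }

    /** Sliding length window: the most recent `length` events. */
    List<Long> lengthWindow(int length) {
        List<Long> all = new ArrayList<>(timestamps);
        int from = Math.max(0, all.size() - length);
        return new ArrayList<>(all.subList(from, all.size()));
    }
}
```

In this sketch, a time window selects events by age relative to the current time, while a length window selects by count regardless of age.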

6.2. Declaring facts as events

You can declare facts as events in your Java class or DRL rule file so that the decision engine handles the facts as events during complex event processing. You can declare the facts as interval-based events or point-in-time events. Interval-based events have a duration time and persist in the working memory of the decision engine until their duration time has lapsed. Point-in-time events have no duration and are essentially interval-based events with a duration of zero.

Procedure

For the relevant fact type in your Java class or DRL rule file, enter the @role( event ) metadata tag and parameter. The @role metadata tag accepts the following two values:

  • fact: (Default) Declares the type as a regular fact
  • event: Declares the type as an event

For example, the following snippet declares that the StockPoint fact type in a stock broker application must be handled as an event:

Declare fact type as an event

import some.package.StockPoint

declare StockPoint
  @role( event )
end

If StockPoint is a fact type declared in the DRL rule file instead of in a pre-existing class, you can assign the event role in-line in the same type declaration:

Declare fact type in-line and assign it to event role

declare StockPoint
  @role( event )

  datetime : java.util.Date
  symbol : String
  price : double
end

6.3. Metadata tags for events

The decision engine uses the following metadata tags for events that are inserted into the working memory of the decision engine. You can change the default metadata tag values in your Java class or DRL rule file as needed.

Note

The examples in this section that refer to the VoiceCall class assume that the sample application domain model includes the following class details:

VoiceCall fact class in an example Telecom domain model

import java.util.Date;

public class VoiceCall {
  private String  originNumber;
  private String  destinationNumber;
  private Date    callDateTime;  // start of the call
  private long    callDuration;  // in milliseconds

  // Constructors, getters, and setters
}

@role

This tag determines whether a given fact type is handled as a regular fact or an event in the decision engine during complex event processing.

Default parameter: fact

Supported parameters: fact, event

@role( fact | event )

Example: Declare VoiceCall as event type

declare VoiceCall
  @role( event )
end

@timestamp

This tag is automatically assigned to every event in the decision engine. By default, the time is provided by the session clock and assigned to the event when it is inserted into the working memory of the decision engine. You can specify a custom time stamp attribute instead of the default time stamp added by the session clock.

Default parameter: The time added by the decision engine session clock

Supported parameters: Session clock time or custom time stamp attribute

@timestamp( <attributeName> )

Example: Declare VoiceCall timestamp attribute

declare VoiceCall
  @role( event )
  @timestamp( callDateTime )
end

@duration

This tag determines the duration time for events in the decision engine. Events can be interval-based events or point-in-time events. Interval-based events have a duration time and persist in the working memory of the decision engine until their duration time has lapsed. Point-in-time events have no duration and are essentially interval-based events with a duration of zero. By default, every event in the decision engine has a duration of zero. You can specify a custom duration attribute instead of the default.

Default parameter: Null (zero)

Supported parameters: Custom duration attribute

@duration( <attributeName> )

Example: Declare VoiceCall duration attribute

declare VoiceCall
  @role( event )
  @timestamp( callDateTime )
  @duration( callDuration )
end
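
With both @timestamp and @duration declared, a VoiceCall event occupies an interval that starts at callDateTime and ends callDuration milliseconds later. The following plain-Java sketch shows that arithmetic. The VoiceCallInterval helper is hypothetical and is not part of the decision engine API.

```java
import java.util.Date;

/** Illustrative only: derives the interval that a VoiceCall event occupies. */
final class VoiceCallInterval {
    /** Start time stamp in ms: the value of the @timestamp attribute. */
    static long start(Date callDateTime) {
        return callDateTime.getTime();
    }

    /** End time stamp in ms: the start plus the @duration attribute (milliseconds). */
    static long end(Date callDateTime, long callDuration) {
        return callDateTime.getTime() + callDuration;
    }
}
```

A point-in-time event is the special case in which the duration is zero, so the start and end time stamps are equal.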

@expires

This tag determines the time duration before an event expires in the working memory of the decision engine. By default, an event expires when the event can no longer match and activate any of the current rules. You can define an amount of time after which an event should expire. This tag definition also overrides the implicit expiration offset calculated from temporal constraints and sliding windows in the KIE base. This tag is available only when the decision engine is running in stream mode.

Default parameter: Null (event expires after event can no longer match and activate rules)

Supported parameters: Custom timeOffset attribute in the format [#d][#h][#m][#s][#[ms]]

@expires( <timeOffset> )

Example: Declare expiration offset for VoiceCall events

declare VoiceCall
  @role( event )
  @timestamp( callDateTime )
  @duration( callDuration )
  @expires( 1h35m )
end
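
To make the time offset format concrete, the following sketch converts an offset such as 1h35m into milliseconds. It is a stand-alone illustration and not the parser that the decision engine uses. The TimeOffsetSketch class and its toMillis method are hypothetical names, and the sketch assumes that a trailing number without a unit means milliseconds.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: converts an offset such as "1h35m" to milliseconds. */
final class TimeOffsetSketch {
    // Optional days, hours, minutes, seconds, then milliseconds ("ms" suffix optional).
    private static final Pattern OFFSET = Pattern.compile(
            "(?:(\\d+)d)?(?:(\\d+)h)?(?:(\\d+)m(?!s))?(?:(\\d+)s)?(?:(\\d+)(?:ms)?)?");

    static long toMillis(String offset) {
        String text = offset.trim();
        Matcher m = OFFSET.matcher(text);
        if (text.isEmpty() || !m.matches()) {
            throw new IllegalArgumentException("Unsupported time offset: " + offset);
        }
        return part(m, 1) * 86_400_000L   // days
             + part(m, 2) * 3_600_000L    // hours
             + part(m, 3) * 60_000L       // minutes
             + part(m, 4) * 1_000L        // seconds
             + part(m, 5);                // milliseconds
    }

    /** Returns the numeric value of a group, or 0 if the unit was omitted. */
    private static long part(Matcher m, int group) {
        String value = m.group(group);
        return value == null ? 0L : Long.parseLong(value);
    }
}
```

For the example declaration, TimeOffsetSketch.toMillis("1h35m") evaluates to 5700000, which is 95 minutes expressed in milliseconds.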

6.4. Event processing modes in the decision engine

The decision engine runs in either cloud mode or stream mode. In cloud mode, the decision engine processes facts as facts with no temporal constraints, independent of time, and in no particular order. In stream mode, the decision engine processes facts as events with strong temporal constraints, in real time or near real time. Stream mode uses synchronization to make event processing possible in Red Hat Decision Manager.

Cloud mode

Cloud mode is the default operating mode of the decision engine. In cloud mode, the decision engine treats events as an unordered cloud. Events still have time stamps, but the decision engine running in cloud mode cannot draw relevance from the time stamp because cloud mode ignores the present time. This mode uses the rule constraints to find the matching tuples to activate and execute rules.

Cloud mode does not impose any kind of additional requirements on facts. However, because the decision engine in this mode has no concept of time, it cannot use temporal features such as sliding windows or automatic life-cycle management. In cloud mode, events must be explicitly retracted when they are no longer needed.

The following requirements are not imposed in cloud mode:

  • No clock synchronization because the decision engine has no notion of time
  • No ordering of events because the decision engine processes events as an unordered cloud, against which the decision engine matches rules

You can specify cloud mode either by setting the system property in the relevant configuration files or by using the Java client API:

Set cloud mode using system property

drools.eventProcessingMode=cloud

Set cloud mode using Java client API

import org.kie.api.conf.EventProcessingOption;
import org.kie.api.KieBaseConfiguration;
import org.kie.api.KieServices;

KieBaseConfiguration config = KieServices.Factory.get().newKieBaseConfiguration();

config.setOption(EventProcessingOption.CLOUD);

You can also specify cloud mode using the eventProcessingMode="<mode>" KIE base attribute in the KIE module descriptor file (kmodule.xml) for a specific Red Hat Decision Manager project:

Set cloud mode using project kmodule.xml file

<kmodule>
  ...
  <kbase name="KBase2" default="false" eventProcessingMode="cloud" packages="org.domain.pkg2, org.domain.pkg3" includes="KBase1">
    ...
  </kbase>
  ...
</kmodule>

Stream mode

Stream mode enables the decision engine to process events chronologically and in real time as they are inserted into the decision engine. In stream mode, the decision engine synchronizes streams of events (so that events in different streams can be processed in chronological order), implements sliding windows of time or length, and enables automatic life-cycle management.

The following requirements apply to stream mode:

  • Events in each stream must be ordered chronologically.
  • A session clock must be present to synchronize event streams.

Note

Your application does not need to enforce ordering events between streams, but using event streams that have not been synchronized may cause unexpected results.
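
The ordering requirement can be illustrated with a plain-Java sketch that merges several streams into one chronological sequence. This is not how the decision engine synchronizes streams internally. The StreamMergeSketch class is hypothetical, and events are reduced to millisecond time stamps for brevity. The merge produces a correct chronological result only if each input stream is already ordered, which is why that requirement applies to each stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Illustrative only: merges streams that are each already in chronological order. */
final class StreamMergeSketch {
    static List<Long> merge(List<List<Long>> streams) {
        // Each queue entry is {streamIndex, positionInStream}, ordered by time stamp.
        PriorityQueue<int[]> heads = new PriorityQueue<>(
                (a, b) -> Long.compare(streams.get(a[0]).get(a[1]),
                                       streams.get(b[0]).get(b[1])));
        for (int i = 0; i < streams.size(); i++) {
            if (!streams.get(i).isEmpty()) {
                heads.add(new int[] {i, 0});
            }
        }
        List<Long> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            int[] head = heads.poll();
            List<Long> stream = streams.get(head[0]);
            merged.add(stream.get(head[1]));
            if (head[1] + 1 < stream.size()) {
                // Advance to the next event of the stream that was just consumed.
                heads.add(new int[] {head[0], head[1] + 1});
            }
        }
        return merged;
    }
}
```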

You can specify stream mode either by setting the system property in the relevant configuration files or by using the Java client API:

Set stream mode using system property

drools.eventProcessingMode=stream

Set stream mode using Java client API

import org.kie.api.conf.EventProcessingOption;
import org.kie.api.KieBaseConfiguration;
import org.kie.api.KieServices;

KieBaseConfiguration config = KieServices.Factory.get().newKieBaseConfiguration();

config.setOption(EventProcessingOption.STREAM);

You can also specify stream mode using the eventProcessingMode="<mode>" KIE base attribute in the KIE module descriptor file (kmodule.xml) for a specific Red Hat Decision Manager project:

Set stream mode using project kmodule.xml file

<kmodule>
  ...
  <kbase name="KBase2" default="false" eventProcessingMode="stream" packages="org.domain.pkg2, org.domain.pkg3" includes="KBase1">
    ...
  </kbase>
  ...
</kmodule>

6.4.1. Negative patterns in decision engine stream mode

A negative pattern is a pattern for conditions that are not met. For example, the following DRL rule activates a fire alarm if a fire is detected and the sprinkler is not activated:

Fire alarm rule with a negative pattern

rule "Sound the alarm"
when
  $f : FireDetected()
  not(SprinklerActivated())
then
  // Sound the alarm.
end

In cloud mode, the decision engine assumes all facts (regular facts and events) are known in advance and evaluates negative patterns immediately. In stream mode, the decision engine can support temporal constraints on facts to wait for a set time before activating a rule.

The same example rule in stream mode activates the fire alarm as usual, but applies a 10-second delay.

Fire alarm rule with a negative pattern and time delay (stream mode only)

rule "Sound the alarm"
when
  $f : FireDetected()
  not(SprinklerActivated(this after[0s,10s] $f))
then
  // Sound the alarm.
end

The following modified fire alarm rule expects one Heartbeat event to occur every 10 seconds. If the expected event does not occur, the rule is executed. This rule uses the same type of object in both the first pattern and the negative pattern. The negative pattern uses a temporal constraint to wait 0 to 10 seconds before the rule is executed, and it excludes the Heartbeat event bound to $h. The bound event $h must be explicitly excluded because the temporal constraint [0s, …] does not inherently exclude that event: $h would match the negative pattern itself, and the rule would never be executed.

Fire alarm rule excluding a bound event in a negative pattern (stream mode only)

rule "Sound the alarm"
when
  $h: Heartbeat() from entry-point "MonitoringStream"
  not(Heartbeat(this != $h, this after[0s,10s] $h) from entry-point "MonitoringStream")
then
  // Sound the alarm.
end
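
The logic of the Heartbeat rule can be restated in plain Java to show why the bound event must be excluded. This sketch does not use the decision engine. The HeartbeatTimeoutSketch class is hypothetical, heartbeats are reduced to millisecond time stamps, and the sketch assumes that the alarm is raised only after the full 10 seconds have passed.

```java
import java.util.List;

/** Illustrative only: mirrors the Heartbeat rule with plain time stamps in ms. */
final class HeartbeatTimeoutSketch {
    static final long TIMEOUT_MILLIS = 10_000L;

    /**
     * Returns true if the heartbeat at index i has no other heartbeat
     * 0 to 10 seconds after it and the 10-second wait has elapsed.
     */
    static boolean shouldSoundAlarm(List<Long> heartbeats, int i, long nowMillis) {
        long h = heartbeats.get(i);
        if (nowMillis - h <= TIMEOUT_MILLIS) {
            return false; // still waiting for a possible follow-up heartbeat
        }
        for (int j = 0; j < heartbeats.size(); j++) {
            if (j == i) {
                continue; // corresponds to "this != $h" in the rule
            }
            long distance = heartbeats.get(j) - h;
            if (distance >= 0 && distance <= TIMEOUT_MILLIS) {
                return false; // a follow-up heartbeat arrived in time
            }
        }
        return true;
    }
}
```

Without the `j == i` check, the heartbeat would find itself at a distance of 0, which lies inside the 0 to 10 second range, and the method would never return true.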

6.5. Property-change settings and listeners for fact types

By default, the decision engine does not re-evaluate all fact patterns for fact types each time a rule is triggered, but instead reacts only to modified properties that are constrained or bound inside a given pattern. For example, if a rule calls modify() as part of the rule actions but the action does not generate new data in the KIE base, the decision engine does not automatically re-evaluate all fact patterns because no data was modified. This property reactivity behavior prevents unwanted recursions in the KIE base and results in more efficient rule evaluation. This behavior also means that you do not always need to use the no-loop rule attribute to avoid infinite recursion.

You can modify or disable this property reactivity behavior with the following KnowledgeBuilderConfiguration options, and then use a property-change setting in your Java class or DRL files to fine-tune property reactivity as needed:

  • ALWAYS: (Default) All types are property reactive, but you can disable property reactivity for a specific type by using the @classReactive property-change setting.
  • ALLOWED: No types are property reactive, but you can enable property reactivity for a specific type by using the @propertyReactive property-change setting.
  • DISABLED: No types are property reactive. All property-change listeners are ignored.

Example property reactivity setting in KnowledgeBuilderConfiguration

import org.kie.internal.builder.*;
import org.kie.internal.builder.conf.PropertySpecificOption;

KnowledgeBuilderConfiguration config = KnowledgeBuilderFactory.newKnowledgeBuilderConfiguration();
config.setOption(PropertySpecificOption.ALLOWED);
KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder(config);

Alternatively, you can update the drools.propertySpecific system property in the standalone.xml file of your Red Hat Decision Manager distribution:

Example property reactivity setting in system properties

<system-properties>
  ...
  <property name="drools.propertySpecific" value="ALLOWED"/>
  ...
</system-properties>

The decision engine supports the following property-change settings and listeners for fact classes or declared DRL fact types:

@classReactive

If property reactivity is set to ALWAYS in the decision engine (all types are property reactive), this tag disables the default property reactivity behavior for a specific Java class or a declared DRL fact type. You can use this tag if you want the decision engine to re-evaluate all fact patterns for the specified fact type each time the rule is triggered, instead of reacting only to modified properties that are constrained or bound inside a given pattern.

Example: Disable default property reactivity in a DRL type declaration

declare Person
  @classReactive
    firstName : String
    lastName : String
end

Example: Disable default property reactivity in a Java class

import org.kie.api.definition.type.ClassReactive;

@ClassReactive
public static class Person {
    private String firstName;
    private String lastName;
}

@propertyReactive

If property reactivity is set to ALLOWED in the decision engine (no types are property reactive unless specified), this tag enables property reactivity for a specific Java class or a declared DRL fact type. You can use this tag if you want the decision engine to react only to modified properties that are constrained or bound inside a given pattern for the specified fact type, instead of re-evaluating all fact patterns for the fact each time the rule is triggered.

Example: Enable property reactivity in a DRL type declaration (when reactivity is disabled globally)

declare Person
  @propertyReactive
    firstName : String
    lastName : String
end

Example: Enable property reactivity in a Java class (when reactivity is disabled globally)

import org.kie.api.definition.type.PropertyReactive;

@PropertyReactive
public static class Person {
    private String firstName;
    private String lastName;
}

@watch

This tag enables property reactivity for additional properties that you specify in-line in fact patterns in DRL rules. This tag is supported only if property reactivity is set to ALWAYS in the decision engine, or if property reactivity is set to ALLOWED and the relevant fact type uses the @propertyReactive tag. You can use this tag in DRL rules to add or exclude specific properties in fact property reactivity logic.

Default parameter: None

Supported parameters: Property name, * (all), ! (not), !* (no properties)

<factPattern> @watch ( <property> )

Example: Enable or disable property reactivity in fact patterns

// Listens for changes in both `firstName` (inferred) and `lastName`:
Person(firstName == $expectedFirstName) @watch( lastName )

// Listens for changes in all properties of the `Person` fact:
Person(firstName == $expectedFirstName) @watch( * )

// Listens for changes in `lastName` and explicitly excludes changes in `firstName`:
Person(firstName == $expectedFirstName) @watch( lastName, !firstName )

// Listens for changes in all properties of the `Person` fact except `age`:
Person(firstName == $expectedFirstName) @watch( *, !age )

// Excludes changes in all properties of the `Person` fact (equivalent to using `@classReactive` tag):
Person(firstName == $expectedFirstName) @watch( !* )

The decision engine generates a compilation error if you use the @watch tag for properties in a fact type that uses the @classReactive tag (disables property reactivity) or when property reactivity is set to ALLOWED in the decision engine and the relevant fact type does not use the @propertyReactive tag. Compilation errors also arise if you duplicate properties in listener annotations, such as @watch( firstName, ! firstName ).
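
The way the @watch entries combine with the properties inferred from constraints can be sketched in plain Java. This is an interpretation of the examples above, not the decision engine implementation. The WatchSketch class is hypothetical, and the sketch assumes that exclusions are applied after additions regardless of their order in the tag.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: resolves which properties a pattern listens to. */
final class WatchSketch {
    static Set<String> watched(Set<String> allProps, Set<String> constrained,
                               List<String> watchEntries) {
        Set<String> added = new LinkedHashSet<>();
        Set<String> removed = new LinkedHashSet<>();
        for (String entry : watchEntries) {
            boolean negated = entry.startsWith("!");
            String name = (negated ? entry.substring(1) : entry).trim();
            Set<String> target = negated ? removed : added;
            Set<String> opposite = negated ? added : removed;
            if (!target.add(name) || opposite.contains(name)) {
                // Mirrors the compilation error for entries such as firstName, ! firstName.
                throw new IllegalArgumentException("Duplicate property in @watch: " + name);
            }
        }
        Set<String> result = new LinkedHashSet<>(constrained); // inferred from constraints
        if (added.contains("*")) {
            result.addAll(allProps);
        } else {
            result.addAll(added);
        }
        if (removed.contains("*")) {
            result.clear();
        } else {
            result.removeAll(removed);
        }
        return result;
    }
}
```

For a Person type with the properties firstName, lastName, and age, and a pattern that constrains firstName, the entries `lastName, !firstName` resolve to lastName only, and `*, !age` resolves to firstName and lastName.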

@propertyChangeSupport

For facts that implement support for property changes as defined in the JavaBeans Specification, this tag enables the decision engine to monitor changes in the fact properties.

Example: Declare property change support in JavaBeans object

declare Person
    @propertyChangeSupport
end
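
For reference, a fact class that supports property changes in the JavaBeans style typically looks like the following sketch. It uses only the standard java.beans classes. The ObservablePerson class name is hypothetical and stands in for a fact type such as Person.

```java
import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport;

/** A JavaBeans-style fact that notifies listeners when a property changes. */
final class ObservablePerson {
    private final PropertyChangeSupport changes = new PropertyChangeSupport(this);
    private String firstName;

    public void addPropertyChangeListener(PropertyChangeListener listener) {
        changes.addPropertyChangeListener(listener);
    }

    public void removePropertyChangeListener(PropertyChangeListener listener) {
        changes.removePropertyChangeListener(listener);
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        String old = this.firstName;
        this.firstName = firstName;
        // No event is fired if the old and new values are equal and non-null.
        changes.firePropertyChange("firstName", old, firstName);
    }
}
```

A listener registered through addPropertyChangeListener is notified each time setFirstName changes the value, which is the mechanism that a monitoring component can rely on to observe the fact.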

6.6. Temporal operators for events

In stream mode, the decision engine supports the following temporal operators for events that are inserted into the working memory of the decision engine. You can use these operators to define the temporal reasoning behavior of the events that you declare in your Java class or DRL rule file. Temporal operators are not supported when the decision engine is running in cloud mode.

  • after
  • before
  • coincides
  • during
  • includes
  • finishes
  • finished by
  • meets
  • met by
  • overlaps
  • overlapped by
  • starts
  • started by

    after

    This operator specifies if the current event occurs after the correlated event. This operator can also define an amount of time after which the current event can follow the correlated event, or a delimiting time range during which the current event can follow the correlated event.

    For example, the following pattern matches if $eventA starts between 3 minutes and 30 seconds and 4 minutes after $eventB finishes. If $eventA starts earlier than 3 minutes and 30 seconds after $eventB finishes, or later than 4 minutes after $eventB finishes, then the pattern is not matched.

    $eventA : EventA(this after[3m30s, 4m] $eventB)

    You can also express this operator in the following way:

    3m30s <= $eventA.startTimestamp - $eventB.endTimestamp <= 4m

    The after operator supports up to two parameter values:

    • If two values are defined, the interval starts on the first value (3 minutes and 30 seconds in the example) and ends on the second value (4 minutes in the example).
    • If only one value is defined, the interval starts on the provided value and runs indefinitely with no end time.
    • If no value is defined, the interval starts at 1 millisecond and runs indefinitely with no end time.

    The after operator also supports negative time ranges:

    $eventA : EventA(this after[-3m30s, -2m] $eventB)

    If the first value is greater than the second value, the decision engine automatically reverses them. For example, the following two patterns are interpreted by the decision engine in the same way:

    $eventA : EventA(this after[-3m30s, -2m] $eventB)
    $eventA : EventA(this after[-2m, -3m30s] $eventB)
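
The parameter rules for the after operator can be summarized in a plain-Java sketch that works on millisecond time stamps. The AfterOperatorSketch class is hypothetical and does not use the decision engine API. It only restates the inequality and the default values described above.

```java
/** Illustrative only: evaluates "A after[min, max] B" on time stamps in ms. */
final class AfterOperatorSketch {
    /** No parameters: the interval starts at 1 ms and has no upper bound. */
    static boolean after(long aStart, long bEnd) {
        return after(aStart, bEnd, 1L, Long.MAX_VALUE);
    }

    /** One parameter: the interval starts at min and has no upper bound. */
    static boolean after(long aStart, long bEnd, long min) {
        return after(aStart, bEnd, min, Long.MAX_VALUE);
    }

    /** Two parameters: min <= aStart - bEnd <= max, reversing the bounds if needed. */
    static boolean after(long aStart, long bEnd, long first, long second) {
        long min = Math.min(first, second);
        long max = Math.max(first, second);
        long distance = aStart - bEnd;
        return distance >= min && distance <= max;
    }
}
```

The before operator follows the same pattern with the distance computed as the start of the correlated event minus the end of the current event.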

    before

    This operator specifies if the current event occurs before the correlated event. This operator can also define an amount of time before which the current event can precede the correlated event, or a delimiting time range during which the current event can precede the correlated event.

    For example, the following pattern matches if $eventA finishes between 3 minutes and 30 seconds and 4 minutes before $eventB starts. If the gap between the end of $eventA and the start of $eventB is shorter than 3 minutes and 30 seconds or longer than 4 minutes, then the pattern is not matched.

    $eventA : EventA(this before[3m30s, 4m] $eventB)

    You can also express this operator in the following way:

    3m30s <= $eventB.startTimestamp - $eventA.endTimestamp <= 4m

    The before operator supports up to two parameter values:

    • If two values are defined, the interval starts on the first value (3 minutes and 30 seconds in the example) and ends on the second value (4 minutes in the example).
    • If only one value is defined, the interval starts on the provided value and runs indefinitely with no end time.
    • If no value is defined, the interval starts at 1 millisecond and runs indefinitely with no end time.

    The before operator also supports negative time ranges:

    $eventA : EventA(this before[-3m30s, -2m] $eventB)

    If the first value is greater than the second value, the decision engine automatically reverses them. For example, the following two patterns are interpreted by the decision engine in the same way:

    $eventA : EventA(this before[-3m30s, -2m] $eventB)
    $eventA : EventA(this before[-2m, -3m30s] $eventB)

    coincides

    This operator specifies if the two events occur at the same time, with the same start and end times.

    For example, the following pattern matches if both the start and end time stamps of $eventA and $eventB are identical:

    $eventA : EventA(this coincides $eventB)

    The coincides operator supports up to two parameter values for the distance between the event start and end times, if they are not identical:

    • If only one parameter is given, the parameter is used to set the threshold for both the start and end times of both events.
    • If two parameters are given, the first is used as a threshold for the start time and the second is used as a threshold for the end time.

    The following pattern uses start and end time thresholds:

    $eventA : EventA(this coincides[15s, 10s] $eventB)

    The pattern matches if the following conditions are met:

    abs($eventA.startTimestamp - $eventB.startTimestamp) <= 15s
    &&
    abs($eventA.endTimestamp - $eventB.endTimestamp) <= 10s

    Warning

    The decision engine does not support negative intervals for the coincides operator. If you use negative intervals, the decision engine generates an error.
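
The coincides conditions translate directly into plain Java. The following sketch is a hypothetical helper (CoincidesOperatorSketch) that works on millisecond time stamps and is not part of the decision engine API. It rejects negative thresholds to mirror the warning above.

```java
/** Illustrative only: evaluates "A coincides[start, end] B" on time stamps in ms. */
final class CoincidesOperatorSketch {
    /** Two parameters: separate thresholds for the start and end times. */
    static boolean coincides(long aStart, long aEnd, long bStart, long bEnd,
                             long startThreshold, long endThreshold) {
        if (startThreshold < 0 || endThreshold < 0) {
            // Negative intervals are not supported for this operator.
            throw new IllegalArgumentException("Negative intervals are not supported");
        }
        return Math.abs(aStart - bStart) <= startThreshold
            && Math.abs(aEnd - bEnd) <= endThreshold;
    }

    /** One parameter: the same threshold applies to the start and end times. */
    static boolean coincides(long aStart, long aEnd, long bStart, long bEnd, long threshold) {
        return coincides(aStart, aEnd, bStart, bEnd, threshold, threshold);
    }

    /** No parameters: the start and end time stamps must be identical. */
    static boolean coincides(long aStart, long aEnd, long bStart, long bEnd) {
        return coincides(aStart, aEnd, bStart, bEnd, 0L, 0L);
    }
}
```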

    during

    This operator specifies if the current event occurs within the time frame of when the correlated event starts and ends. The current event must start after the correlated event starts and must end before the correlated event ends. (With the coincides operator, the start and end times are the same or nearly the same.)

    For example, the following pattern matches if $eventA starts after $eventB starts and ends before $eventB ends:

    $eventA : EventA(this during $eventB)

    You can also express this operator in the following way:

    $eventB.startTimestamp < $eventA.startTimestamp <= $eventA.endTimestamp < $eventB.endTimestamp

    The during operator supports one, two, or four optional parameters:

    • If one value is defined, this value is the maximum distance between the start times of the two events and the maximum distance between the end times of the two events.
    • If two values are defined, these values are a threshold between which the current event start time and end time must occur in relation to the correlated event start and end times.

      For example, if the values are 5s and 10s, the current event must start between 5 and 10 seconds after the correlated event starts and must end between 5 and 10 seconds before the correlated event ends.

    • If four values are defined, the first and second values are the minimum and maximum distances between the start times of the events, and the third and fourth values are the minimum and maximum distances between the end times of the two events.
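
The one-, two-, and four-parameter forms of the during operator can be expressed as one general check with different defaults. The following plain-Java sketch is hypothetical (DuringOperatorSketch is not a decision engine class) and uses millisecond time stamps. It assumes that "strictly after" and "strictly before" correspond to a minimum distance of 1 millisecond.

```java
/** Illustrative only: evaluates "A during B" with optional distance bounds in ms. */
final class DuringOperatorSketch {
    /** Four parameters: separate minimum and maximum distances for starts and ends. */
    static boolean during(long aStart, long aEnd, long bStart, long bEnd,
                          long startMin, long startMax, long endMin, long endMax) {
        long startDistance = aStart - bStart; // how long after B starts A starts
        long endDistance = bEnd - aEnd;       // how long before B ends A ends
        return startDistance >= startMin && startDistance <= startMax
            && endDistance >= endMin && endDistance <= endMax;
    }

    /** Two parameters: the same minimum and maximum apply to both distances. */
    static boolean during(long aStart, long aEnd, long bStart, long bEnd, long min, long max) {
        return during(aStart, aEnd, bStart, bEnd, min, max, min, max);
    }

    /** One parameter: a maximum distance; A must still lie strictly inside B. */
    static boolean during(long aStart, long aEnd, long bStart, long bEnd, long max) {
        return during(aStart, aEnd, bStart, bEnd, 1L, max, 1L, max);
    }

    /** No parameters: A starts after B starts and ends before B ends. */
    static boolean during(long aStart, long aEnd, long bStart, long bEnd) {
        return during(aStart, aEnd, bStart, bEnd, 1L, Long.MAX_VALUE, 1L, Long.MAX_VALUE);
    }
}
```

Swapping the roles of the two events in these checks gives the behavior described for the includes operator.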

    includes

    This operator specifies if the correlated event occurs within the time frame of when the current event occurs. The correlated event must start after the current event starts and must end before the current event ends. (The behavior of this operator is the reverse of the during operator behavior.)

    For example, the following pattern matches if $eventB starts after $eventA starts and ends before $eventA ends:

    $eventA : EventA(this includes $eventB)

    You can also express this operator in the following way:

    $eventA.startTimestamp < $eventB.startTimestamp <= $eventB.endTimestamp < $eventA.endTimestamp

    The includes operator supports one, two, or four optional parameters:

    • If one value is defined, this value is the maximum distance between the start times of the two events and the maximum distance between the end times of the two events.
    • If two values are defined, these values are a threshold between which the correlated event start time and end time must occur in relation to the current event start and end times.

      For example, if the values are 5s and 10s, the correlated event must start between 5 and 10 seconds after the current event starts and must end between 5 and 10 seconds before the current event ends.

    • If four values are defined, the first and second values are the minimum and maximum distances between the start times of the events, and the third and fourth values are the minimum and maximum distances between the end times of the two events.

    finishes

    This operator specifies if the current event starts after the correlated event but both events end at the same time.

    For example, the following pattern matches if $eventA starts after $eventB starts and ends at the same time when $eventB ends:

    $eventA : EventA(this finishes $eventB)

    You can also express this operator in the following way:

    $eventB.startTimestamp < $eventA.startTimestamp
    &&
    $eventA.endTimestamp == $eventB.endTimestamp

    The finishes operator supports one optional parameter that sets the maximum time allowed between the end times of the two events:

    $eventA : EventA(this finishes[5s] $eventB)

    This pattern matches if these conditions are met:

    $eventB.startTimestamp < $eventA.startTimestamp
    &&
    abs($eventA.endTimestamp - $eventB.endTimestamp) <= 5s

    Warning

    The decision engine does not support negative intervals for the finishes operator. If you use negative intervals, the decision engine generates an error.

    finished by

    This operator specifies if the correlated event starts after the current event but both events end at the same time. (The behavior of this operator is the reverse of the finishes operator behavior.)

    For example, the following pattern matches if $eventB starts after $eventA starts and ends at the same time when $eventA ends:

    $eventA : EventA(this finishedby $eventB)

    You can also express this operator in the following way:

    $eventA.startTimestamp < $eventB.startTimestamp
    &&
    $eventA.endTimestamp == $eventB.endTimestamp

    The finished by operator supports one optional parameter that sets the maximum time allowed between the end times of the two events:

    $eventA : EventA(this finishedby[5s] $eventB)

    This pattern matches if these conditions are met:

    $eventA.startTimestamp < $eventB.startTimestamp
    &&
    abs($eventA.endTimestamp - $eventB.endTimestamp) <= 5s

    Warning

    The decision engine does not support negative intervals for the finished by operator. If you use negative intervals, the decision engine generates an error.

    meets

    This operator specifies if the current event ends at the same time when the correlated event starts.

    For example, the following pattern matches if $eventA ends at the same time when $eventB starts:

    $eventA : EventA(this meets $eventB)

    You can also express this operator in the following way:

    abs($eventB.startTimestamp - $eventA.endTimestamp) == 0

    The meets operator supports one optional parameter that sets the maximum time allowed between the end time of the current event and the start time of the correlated event:

    $eventA : EventA(this meets[5s] $eventB)

    This pattern matches if these conditions are met:

    abs($eventB.startTimestamp - $eventA.endTimestamp) <= 5s

    Warning

    The decision engine does not support negative intervals for the meets operator. If you use negative intervals, the decision engine generates an error.

    met by

    This operator specifies if the correlated event ends at the same time when the current event starts. (The behavior of this operator is the reverse of the meets operator behavior.)

    For example, the following pattern matches if $eventB ends at the same time when $eventA starts:

    $eventA : EventA(this metby $eventB)

    You can also express this operator in the following way:

    abs($eventA.startTimestamp - $eventB.endTimestamp) == 0

    The met by operator supports one optional parameter that sets the maximum distance between the end time of the correlated event and the start time of the current event:

    $eventA : EventA(this metby[5s] $eventB)

    This pattern matches if these conditions are met:

    abs($eventA.startTimestamp - $eventB.endTimestamp) <= 5s

    Warning

    The decision engine does not support negative intervals for the met by operator. If you use negative intervals, the decision engine generates an error.

    overlaps

    This operator specifies if the current event starts before the correlated event starts and it ends during the time frame that the correlated event occurs. The current event must end between the start and end times of the correlated event.

    For example, the following pattern matches if $eventA starts before $eventB starts and then ends while $eventB occurs, before $eventB ends:

    $eventA : EventA(this overlaps $eventB)

    The overlaps operator supports up to two parameters:

    • If one parameter is defined, the value is the maximum distance between the start time of the correlated event and the end time of the current event.
    • If two parameters are defined, the values are the minimum distance (first value) and the maximum distance (second value) between the start time of the correlated event and the end time of the current event.

    overlapped by

    This operator specifies if the correlated event starts before the current event starts and it ends during the time frame that the current event occurs. The correlated event must end between the start and end times of the current event. (The behavior of this operator is the reverse of the overlaps operator behavior.)

    For example, the following pattern matches if $eventB starts before $eventA starts and then ends while $eventA occurs, before $eventA ends:

    $eventA : EventA(this overlappedby $eventB)

    The overlapped by operator supports up to two parameters:

    • If one parameter is defined, the value is the maximum distance between the start time of the current event and the end time of the correlated event.
    • If two parameters are defined, the values are the minimum distance (first value) and the maximum distance (second value) between the start time of the current event and the end time of the correlated event.

    starts

    This operator specifies if the two events start at the same time but the current event ends before the correlated event ends.

    For example, the following pattern matches if $eventA and $eventB start at the same time, and $eventA ends before $eventB ends:

    $eventA : EventA(this starts $eventB)

    You can also express this operator in the following way:

    $eventA.startTimestamp == $eventB.startTimestamp
    &&
    $eventA.endTimestamp < $eventB.endTimestamp

    The starts operator supports one optional parameter that sets the maximum distance between the start times of the two events:

    $eventA : EventA(this starts[5s] $eventB)

    This pattern matches if these conditions are met:

    abs($eventA.startTimestamp - $eventB.startTimestamp) <= 5s
    &&
    $eventA.endTimestamp < $eventB.endTimestamp

    Warning

    The decision engine does not support negative intervals for the starts operator. If you use negative intervals, the decision engine generates an error.

    started by

    This operator specifies if the two events start at the same time but the correlated event ends before the current event ends. (The behavior of this operator is the reverse of the starts operator behavior.)

    For example, the following pattern matches if $eventA and $eventB start at the same time, and $eventB ends before $eventA ends:

    $eventA : EventA(this startedby $eventB)

    You can also express this operator in the following way:

    $eventA.startTimestamp == $eventB.startTimestamp
    &&
    $eventA.endTimestamp > $eventB.endTimestamp

    The started by operator supports one optional parameter that sets the maximum distance between the start times of the two events:

    $eventA : EventA(this startedby[5s] $eventB)

    This pattern matches if these conditions are met:

    abs($eventA.startTimestamp - $eventB.startTimestamp) <= 5s
    &&
    $eventA.endTimestamp > $eventB.endTimestamp

    Warning

    The decision engine does not support negative intervals for the started by operator. If you use negative intervals, the decision engine generates an error.
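
The equivalent expressions listed for the interval operators above can be checked outside the decision engine with a few lines of plain Java. The following is a minimal sketch, not decision engine code: the TemporalOperatorSketch class, the Interval type, and the millisecond parameters are hypothetical names introduced for illustration, and the overlaps method assumes that the two parameters bound the distance from the start of the correlated event to the end of the current event, as described above.

```java
class TemporalOperatorSketch {

    /** Hypothetical interval event with start and end time stamps in milliseconds. */
    static final class Interval {
        final long start;
        final long end;

        Interval(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /** a finishes[tol] b: a starts after b, and the end times differ by at most tolMs. */
    static boolean finishes(Interval a, Interval b, long tolMs) {
        return b.start < a.start && Math.abs(a.end - b.end) <= tolMs;
    }

    /** a meets[tol] b: the end of a and the start of b differ by at most tolMs. */
    static boolean meets(Interval a, Interval b, long tolMs) {
        return Math.abs(b.start - a.end) <= tolMs;
    }

    /** a starts[tol] b: the start times differ by at most tolMs, and a ends before b. */
    static boolean starts(Interval a, Interval b, long tolMs) {
        return Math.abs(a.start - b.start) <= tolMs && a.end < b.end;
    }

    /** a overlaps[min,max] b: a starts first, ends while b occurs, and the overlap is within bounds (assumed reading). */
    static boolean overlaps(Interval a, Interval b, long minMs, long maxMs) {
        long distance = a.end - b.start;
        return a.start < b.start && b.start < a.end && a.end < b.end
                && distance >= minMs && distance <= maxMs;
    }

    public static void main(String[] args) {
        Interval b = new Interval(0, 10_000);
        Interval a = new Interval(4_000, 12_000);
        System.out.println(finishes(a, b, 0));     // false: end times differ by 2 seconds
        System.out.println(finishes(a, b, 5_000)); // true: 2 seconds is within the 5-second tolerance
    }
}
```

In this model, the reverse operators follow by swapping the arguments: for example, finishes(b, a, tol) corresponds to $eventA finishedby[tol] $eventB.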

6.7. Session clock implementations in the decision engine

During complex event processing, events in the decision engine may have temporal constraints and therefore require a session clock that provides the current time. For example, if a rule needs to determine the average price of a given stock over the last 60 minutes, the decision engine must be able to compare the stock price event time stamp with the current time in the session clock.

The decision engine supports a real-time clock and a pseudo clock. You can use one or both clock types depending on the scenario:

  • Rules testing: Testing requires a controlled environment, and when the tests include rules with temporal constraints, you must be able to control the input rules and facts and the flow of time.
  • Regular execution: The decision engine reacts to events in real time and therefore requires a real-time clock.
  • Special environments: Specific environments may have specific time control requirements. For example, clustered environments may require clock synchronization or Java Enterprise Edition (JEE) environments may require a clock provided by the application server.
  • Rules replay or simulation: In order to replay or simulate scenarios, the application must be able to control the flow of time.

Consider your environment requirements as you decide whether to use a real-time clock or pseudo clock in the decision engine.

Real-time clock

The real-time clock is the default clock implementation in the decision engine and uses the system clock to determine the current time for time stamps. To configure the decision engine to use the real-time clock, set the KIE session configuration parameter to realtime:

Configure real-time clock in KIE session

import org.kie.api.KieServices;
import org.kie.api.runtime.KieSessionConfiguration;
import org.kie.api.runtime.conf.ClockTypeOption;

// Create a new KIE session configuration.
KieSessionConfiguration config = KieServices.Factory.get().newKieSessionConfiguration();

// Select the real-time clock, which is also the default clock type.
config.setOption(ClockTypeOption.get("realtime"));

Pseudo clock

The pseudo clock implementation in the decision engine is helpful for testing temporal rules and it can be controlled by the application. To configure the decision engine to use the pseudo clock, set the KIE session configuration parameter to pseudo:

Configure pseudo clock in KIE session

import org.kie.api.KieServices;
import org.kie.api.runtime.KieSessionConfiguration;
import org.kie.api.runtime.conf.ClockTypeOption;

// Create a new KIE session configuration.
KieSessionConfiguration config = KieServices.Factory.get().newKieSessionConfiguration();

// Select the pseudo clock so that the application controls the flow of time.
config.setOption(ClockTypeOption.get("pseudo"));

You can also use additional configurations and fact handlers to control the pseudo clock:

Control pseudo clock behavior in KIE session

import java.util.concurrent.TimeUnit;

import org.drools.core.time.SessionPseudoClock;
import org.kie.api.KieServices;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.KieSessionConfiguration;
import org.kie.api.runtime.conf.ClockTypeOption;
import org.kie.api.runtime.rule.FactHandle;

// Configure the KIE session to use the pseudo clock.
KieSessionConfiguration conf = KieServices.Factory.get().newKieSessionConfiguration();
conf.setOption(ClockTypeOption.get("pseudo"));

// kbase is assumed to be an existing KieBase; tick1, tick2, and tick3 are assumed to be existing event objects.
KieSession session = kbase.newKieSession(conf, null);

// Obtain the pseudo clock from the session so that the application can control it.
SessionPseudoClock clock = session.getSessionClock();

// While inserting facts, advance the clock as necessary.
FactHandle handle1 = session.insert(tick1);
clock.advanceTime(10, TimeUnit.SECONDS);

FactHandle handle2 = session.insert(tick2);
clock.advanceTime(30, TimeUnit.SECONDS);

FactHandle handle3 = session.insert(tick3);
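
To see what the advanceTime calls in the previous example imply for the three inserted facts, the following plain Java sketch models an application-controlled clock. The ManualClock class is a hypothetical stand-in written for this illustration and is not the decision engine's pseudo clock; it only mirrors the shape of the advanceTime call used above and assumes that the clock starts at 0.

```java
import java.util.concurrent.TimeUnit;

class ManualClockSketch {

    /** Hypothetical clock that moves only when the application advances it. */
    static final class ManualClock {
        private long currentTimeMs; // assumed to start at 0

        long getCurrentTime() {
            return currentTimeMs;
        }

        /** Advances the clock by the given amount and returns the new time in milliseconds. */
        long advanceTime(long amount, TimeUnit unit) {
            currentTimeMs += unit.toMillis(amount);
            return currentTimeMs;
        }
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock();

        long tick1Time = clock.getCurrentTime(); // time at which tick1 would be inserted
        clock.advanceTime(10, TimeUnit.SECONDS);

        long tick2Time = clock.getCurrentTime(); // 10 seconds after tick1
        clock.advanceTime(30, TimeUnit.SECONDS);

        long tick3Time = clock.getCurrentTime(); // 40 seconds after tick1

        System.out.println(tick1Time + " " + tick2Time + " " + tick3Time); // 0 10000 40000
    }
}
```

Because time moves only when the application advances it, a test can place events at exact offsets and obtain the same result on every run.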

6.8. Event streams and entry points

The decision engine can process high volumes of events in the form of event streams. In DRL rule declarations, a stream is also known as an entry point. When you declare an entry point in a DRL rule or Java application, the decision engine, at compile time, identifies and creates the proper internal structures to use data from only that entry point to evaluate that rule.

Facts from one entry point, or stream, can join facts from any other entry point in addition to facts already in the working memory of the decision engine. Facts always remain associated with the entry point through which they entered the decision engine. Facts of the same type can enter the decision engine through several entry points, but facts that enter the decision engine through entry point A can never match a pattern from entry point B.

Event streams have the following characteristics:

  • Events in the stream are ordered by time stamp. The time stamps may have different semantics for different streams, but they are always ordered internally.
  • Event streams usually have a high volume of events.
  • Atomic events in streams are usually not useful individually, only collectively in a stream.
  • Event streams can be homogeneous and contain a single type of event, or heterogeneous and contain events of different types.

6.8.1. Declaring entry points for rule data

You can declare an entry point (event stream) for events so that the decision engine uses data from only that entry point to evaluate the rules. You can declare an entry point either implicitly by referencing it in DRL rules or explicitly in your Java application.

Procedure

Use one of the following methods to declare the entry point:

  • In the DRL rule file, specify from entry-point "<name>" for the inserted fact:

    Authorize withdrawal rule with "ATM Stream" entry point

    rule "Authorize withdrawal"
    when
      WithdrawRequest($ai : accountId, $am : amount) from entry-point "ATM Stream"
      CheckingAccount(accountId == $ai, balance > $am)
    then
      // Authorize withdrawal.
    end

    Apply fee rule with "Branch Stream" entry point

    rule "Apply fee on withdraws on branches"
    when
      WithdrawRequest($ai : accountId, processed == true) from entry-point "Branch Stream"
      CheckingAccount(accountId == $ai)
    then
      // Apply a $2 fee on the account.
    end

    Both example DRL rules from a banking application match the event WithdrawRequest together with the fact CheckingAccount, but from different entry points. At run time, the decision engine evaluates the "Authorize withdrawal" rule using WithdrawRequest events from only the "ATM Stream" entry point, and evaluates the "Apply fee on withdraws on branches" rule using WithdrawRequest events from only the "Branch Stream" entry point. Any events inserted into the "ATM Stream" entry point can never match patterns for the "Apply fee on withdraws on branches" rule, and any events inserted into the "Branch Stream" entry point can never match patterns for the "Authorize withdrawal" rule.

  • In the Java application code, use the getEntryPoint() method to specify and obtain an EntryPoint object and insert facts into that entry point accordingly:

    Java application code with EntryPoint object and inserted facts

    import org.kie.api.runtime.KieSession;
    import org.kie.api.runtime.rule.EntryPoint;
    
    // Create your KIE base and KIE session as usual.
    KieSession session = ...
    
    // Create a reference to the entry point.
    EntryPoint atmStream = session.getEntryPoint("ATM Stream");
    
    // Start inserting your facts into the entry point.
    atmStream.insert(aWithdrawRequest);

    Any DRL rules that specify from entry-point "ATM Stream" are then evaluated based on the data in this entry point only.

6.9. Sliding windows of time or length

In stream mode, the decision engine can process events from a specified sliding window of time or length. A sliding time window is a specified period of time during which events can be processed. A sliding length window is a specified number of events that can be processed. When you declare a sliding window in a DRL rule or Java application, the decision engine, at compile time, identifies and creates the proper internal structures to use data from only that sliding window to evaluate that rule.

For example, the following DRL rule snippets instruct the decision engine to process only the stock points from the last 2 minutes (sliding time window) or to process only the last 10 stock points (sliding length window):

Process stock points from the last 2 minutes (sliding time window)

StockPoint() over window:time(2m)

Process the last 10 stock points (sliding length window)

StockPoint() over window:length(10)

6.9.1. Declaring sliding windows for rule data

You can declare a sliding window of time (flow of time) or length (number of occurrences) for events so that the decision engine uses data from only that window to evaluate the rules.

Procedure

In the DRL rule file, specify over window:<time_or_length>(<value>) for the inserted fact.

For example, the following two DRL rules activate a fire alarm based on an average temperature. However, the first rule uses a sliding time window to calculate the average over the last 10 minutes while the second rule uses a sliding length window to calculate the average over the last one hundred temperature readings.

Average temperature over sliding time window

rule "Sound the alarm if temperature rises above threshold"
when
  TemperatureThreshold($max : max)
  Number(doubleValue > $max) from accumulate(
    SensorReading($temp : temperature) over window:time(10m),
    average($temp))
then
  // Sound the alarm.
end

Average temperature over sliding length window

rule "Sound the alarm if temperature rises above threshold"
when
  TemperatureThreshold($max : max)
  Number(doubleValue > $max) from accumulate(
    SensorReading($temp : temperature) over window:length(100),
    average($temp))
then
  // Sound the alarm.
end

When evaluating these rules, the decision engine disregards any SensorReading events that are more than 10 minutes old (sliding time window) or that are not among the last one hundred readings (sliding length window), and recalculates the average as the window "slides" forward with each passing minute or new reading.

The decision engine does not automatically remove outdated events from the KIE session because other rules without sliding window declarations might depend on those events. The decision engine stores events in the KIE session until the events expire either by explicit rule declarations or by implicit reasoning within the decision engine based on inferred data in the KIE base.
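
The difference between the two window types in the rules above can be illustrated in plain Java. The following is a minimal sketch, not decision engine code: the SlidingWindowSketch class, the Reading type, and both method names are hypothetical, and the sketch assumes that a reading exactly as old as the window is still included.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class SlidingWindowSketch {

    /** Hypothetical sensor reading: a time stamp in milliseconds and a temperature. */
    static final class Reading {
        final long timestampMs;
        final double temperature;

        Reading(long timestampMs, double temperature) {
            this.timestampMs = timestampMs;
            this.temperature = temperature;
        }
    }

    /** Averages only the readings that are at most windowMs old at time nowMs. */
    static double averageOverTimeWindow(List<Reading> readings, long nowMs, long windowMs) {
        double sum = 0;
        int count = 0;
        for (Reading r : readings) {
            long age = nowMs - r.timestampMs;
            if (age >= 0 && age <= windowMs) {
                sum += r.temperature;
                count++;
            }
        }
        return count == 0 ? Double.NaN : sum / count;
    }

    /** Averages only the last maxLength readings in insertion order (maxLength must be at least 1). */
    static double averageOverLengthWindow(List<Reading> readings, int maxLength) {
        Deque<Double> window = new ArrayDeque<>();
        for (Reading r : readings) {
            if (window.size() == maxLength) {
                window.removeFirst(); // the oldest reading slides out of the window
            }
            window.addLast(r.temperature);
        }
        double sum = 0;
        for (double t : window) {
            sum += t;
        }
        return window.isEmpty() ? Double.NaN : sum / window.size();
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        List<Reading> readings = Arrays.asList(
                new Reading(0, 20.0),
                new Reading(5 * minute, 30.0),
                new Reading(12 * minute, 40.0));

        // At minute 12, the reading from minute 0 is outside a 10-minute window.
        System.out.println(averageOverTimeWindow(readings, 12 * minute, 10 * minute)); // 35.0
        // A length window of 2 keeps only the last two readings.
        System.out.println(averageOverLengthWindow(readings, 2)); // 35.0
    }
}
```

In both cases the oldest reading stays in the list and is only left out of the calculation, which parallels the point above that events outside a window are not automatically removed from the KIE session.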

6.10. Memory management for events

In stream mode, the decision engine uses automatic memory management to maintain events that are stored in KIE sessions. The decision engine can retract from a KIE session any events that no longer match any rule due to their temporal constraints and release any resources held by the retracted events.

The decision engine uses either explicit or inferred expiration to retract outdated events:

  • Explicit expiration: The decision engine removes events that are explicitly set to expire by an @expires tag in the event type declaration:

    DRL rule snippet with explicit expiration

    declare StockPoint
      @expires( 30m )
    end

    This example declaration sets any StockPoint events to expire after 30 minutes and to be removed from the KIE session if no other rules use the events.

  • Inferred expiration: The decision engine can calculate the expiration offset for a given event implicitly by analyzing the temporal constraints in the rules:

    DRL rule with temporal constraints

    rule "Correlate orders"
    when
      $bo : BuyOrder($id : id)
      $ae : AckOrder(id == $id, this after[0,10s] $bo)
    then
      // Perform an action.
    end

    For this example rule, the decision engine automatically calculates that whenever a BuyOrder event occurs, the decision engine needs to store the event for up to 10 seconds and wait for the matching AckOrder event. After 10 seconds, the decision engine infers the expiration and removes the BuyOrder event from the KIE session. An AckOrder event can match only a BuyOrder event that already exists, so the decision engine infers that an AckOrder event does not need to be stored and removes it immediately if no match occurs.

    The decision engine analyzes the entire KIE base to find the offset for every event type and to ensure that no other rules use the events that are pending removal. Whenever an implicit expiration clashes with an explicit expiration value, the decision engine uses the greater time frame of the two to store the event longer.
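
The rule for resolving a clash between explicit and inferred expiration can be sketched in plain Java. This is an illustration under stated assumptions, not decision engine code: the ExpirationSketch class and its method names are hypothetical, and the sketch assumes that an event counts as expired only when its age is strictly greater than the offset.

```java
import java.time.Duration;

class ExpirationSketch {

    /** Returns the greater of the explicit and inferred offsets, so that the event is stored longer. */
    static Duration effectiveOffset(Duration explicit, Duration inferred) {
        return explicit.compareTo(inferred) >= 0 ? explicit : inferred;
    }

    /** An event is treated as expired when its age exceeds the offset (assumed boundary). */
    static boolean isExpired(long eventTimestampMs, long nowMs, Duration offset) {
        return nowMs - eventTimestampMs > offset.toMillis();
    }

    public static void main(String[] args) {
        Duration explicit = Duration.ofMinutes(30); // for example, @expires( 30m )
        Duration inferred = Duration.ofSeconds(10); // for example, from after[0,10s]

        Duration offset = effectiveOffset(explicit, inferred);
        System.out.println(offset.toMinutes());            // 30
        System.out.println(isExpired(0, 60_000L, offset)); // false: the event is only 1 minute old
    }
}
```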
