Chapter 6. Understanding compute engines in Feature Store


You can configure compute engines to manage feature pipelines, transformations, and materialization in Feature Store. By integrating your Feature Store with distributed processing frameworks, you can centralize feature management and improve data reusability across your organization.

6.1. Using compute engines in Feature Store

You can use compute engines to run feature pipelines on back ends such as Spark, PyArrow, Pandas, or Ray. These pipelines perform transformations, aggregations, joins, and materializations.

Use the compute engine to build and run directed acyclic graphs (DAGs) for modular and scalable workflows.

Available operations:

  • materialize(): Generate features for offline and online stores in batch and stream modes.
  • get_historical_features(): Retrieve point-in-time training datasets.

Key concepts for compute engines

Understand the following components to execute materialization and retrieval tasks effectively:

  • Compute engine: The interface for executing materialization and retrieval tasks.
  • Feature builder: Constructs a directed acyclic graph (DAG) from a feature view definition for a specific back end.
  • Feature resolver: Arranges tasks in the correct sequence, so that each step runs only after its dependencies.
  • DAG node: A single DAG operation, such as read, aggregate, or join.
  • Execution plan: Runs the nodes in the correct sequence and saves the results.
  • Execution context: Collects configuration, registry, stores, entity data, and node outputs.

Understanding the feature resolver and builder

The feature builder starts a feature resolver that extracts a DAG from FeatureView definitions, resolving dependencies and ensuring the correct execution order. A FeatureView represents a logical data source, whereas a DataSource represents the physical data source.

When defining a feature view, the source can be a physical data source, a derived feature view, or a list of feature views. Use the feature resolver to organize data sources into a directed acyclic graph (DAG). The resolver identifies node dependencies to generate the final output. The FeatureBuilder then builds DAG node objects for each operation, such as read, join, filter, or aggregate.
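The dependency ordering that the feature resolver performs can be illustrated with a small topological sort. The following is an illustrative sketch only, not the Feast implementation; the view names and dependency map are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: each feature view lists the sources
# (a physical data source or other feature views) it is derived from.
dependencies = {
    "driver_trips_source": [],                       # physical data source
    "driver_stats": ["driver_trips_source"],         # reads the source
    "driver_stats_daily": ["driver_stats"],          # derived feature view
    "training_view": ["driver_stats", "driver_stats_daily"],
}

# Resolve an execution order in which every view runs only after its
# dependencies, as the resolver does when it extracts a DAG from view
# definitions.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Any order the sorter produces places the physical source first and the final derived view last, which is exactly the guarantee the resolver provides to the execution plan.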

Table 6.1. Supported compute engines

  • Spark compute engine: Distributed DAG execution using Apache Spark. Supports point-in-time joins and large-scale materialization. Integrates with the Spark offline store and the Spark materialization job.
  • Ray compute engine: Provides distributed DAG execution with automatic resource management and optimization. Integrates with the Ray offline store and the Ray materialization job.
  • Local compute engine: Runs on Arrow with a back end that you specify, such as Pandas or Polars. Enables local development, testing, and lightweight feature generation. Supports the local materialization job and the local historical retrieval job.

Feature builder node details

Use the feature builder to build a directed acyclic graph (DAG) from a feature view definition to determine the operation order. The feature resolver identifies data sources and sorts the nodes to resolve dependencies.

Table 6.2. Feature builder nodes

  • Source read node: Reads the data source. The process begins with this node.
  • Transformation node or join node: If a feature transformation is defined, the system applies a transformation node. If there are multiple sources, the system applies a join node.
  • Filter node: Always included, to apply time to live (TTL) parameters or user-defined filters.
  • Aggregation node: Applied if the feature view includes defined aggregations.
  • Deduplication node: Applied for get_historical_features requests if no aggregation is defined.
  • Validation node: Applied if enable_validation is set to true.
  • Output node: Retrieval output for get_historical_features requests; online store write or offline store write for materialize requests.
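The selection rules in Table 6.2 can be sketched as a small planning function. This is an illustrative sketch of the rules above, not the Feast builder itself; all names and flags are hypothetical.

```python
def plan_nodes(has_transformation=False, multiple_sources=False,
               has_aggregation=False, enable_validation=False,
               request="get_historical_features"):
    """Return the node sequence implied by the feature builder rules."""
    nodes = ["source_read"]
    if has_transformation:
        nodes.append("transformation")
    if multiple_sources:
        nodes.append("join")
    nodes.append("filter")  # always applied (TTL or user-defined filters)
    if has_aggregation:
        nodes.append("aggregation")
    elif request == "get_historical_features":
        nodes.append("deduplication")  # only when no aggregation is defined
    if enable_validation:
        nodes.append("validation")
    # The output node depends on the request type
    nodes.append("retrieval_output" if request == "get_historical_features"
                 else "store_write")
    return nodes

print(plan_nodes(has_aggregation=True, request="materialize"))
```

For example, a materialize request with aggregations skips deduplication and ends with a store write, whereas a plain get_historical_features request deduplicates and ends with a retrieval output.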

6.2. Ray compute engine in Feature Store

The Ray compute engine is a distributed compute implementation that uses Ray to execute feature pipelines, including transformations, aggregations, joins, and materializations. It provides scalable and efficient distributed processing for both materialize() and get_historical_features() operations.

The Ray RAG template features:

  • Parallel embedding generation: Uses the Ray compute engine to generate embeddings across multiple workers
  • Vector search integration: Works with Milvus for semantic similarity search
  • Complete RAG pipeline: The Ray compute engine automatically distributes the embedding generation across available workers, making it ideal for processing large datasets efficiently

Ray compute engine features:

  • Distributed DAG execution: Executes the feature computation directed acyclic graph (DAG) across Ray clusters
  • Intelligent join strategies: Automatically selects between broadcast and distributed joins
  • Lazy evaluation: Defers execution for optimal performance
  • Resource management: Automatically scales and optimizes resources
  • Point-in-time joins: Performs efficient temporal joins for historical feature retrieval
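A point-in-time join guarantees that each entity event sees only the feature values that existed at or before its event time. The following pure-Python sketch illustrates that semantic with hypothetical data; it is not the engine's implementation.

```python
from datetime import datetime

# Hypothetical feature rows: (event timestamp, value), sorted by time.
feature_rows = [
    (datetime(2025, 1, 1, 8), 0.2),
    (datetime(2025, 1, 1, 10), 0.5),
    (datetime(2025, 1, 1, 12), 0.9),
]

def point_in_time_value(rows, as_of):
    """Return the latest feature value at or before `as_of` (None if absent)."""
    value = None
    for ts, v in rows:
        if ts <= as_of:
            value = v
        else:
            break
    return value

# An entity event at 11:00 sees the 10:00 value, never the later 12:00 value,
# which prevents feature leakage in training datasets.
print(point_in_time_value(feature_rows, datetime(2025, 1, 1, 11)))
```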

6.3. Getting started using the Ray template

Use the Ray retrieval-augmented generation (RAG) template to build scalable, high-performance applications. This end-to-end framework enables parallel processing of large datasets, which reduces the processing time and memory intensity required on a single machine.

Prerequisites

  • You have a data science project with an active workbench.
  • Your workbench image includes the Feature Store.

Procedure

  1. Apply the Ray RAG template for retrieval-augmented generation (RAG) applications with distributed embedding generation:

    feast init -t ray_rag my_rag_project
    cd my_rag_project/feature_repo

Your Ray template is now active.

Configure the Ray compute engine in Feature Store by defining Ray-specific settings in the feature_store.yaml file. This enables distributed execution of feature pipelines for materialization and historical feature retrieval.

Prerequisites

  • Your Ray cluster is running.

Procedure

  1. Configure the Ray compute engine in your feature_store.yaml file:

project: my_project
registry: data/registry.db
provider: local
offline_store:
    type: ray
    storage_path: data/ray_storage
batch_engine:
    type: ray.engine
    max_workers: 4                      # Maximum number of workers
    broadcast_join_threshold_mb: 100    # Broadcast join threshold (MB)
    max_parallelism_multiplier: 2       # Parallelism multiplier
    target_partition_size_mb: 64        # Target partition size (MB)
    window_size_for_joins: "1H"         # Time window for distributed joins
    ray_address: localhost              # Ray cluster address

Table 6.3. Ray configuration options in Feature Store

  • type (string, default: ray.engine): Must be ray.engine.
  • max_workers (integer, default: none; uses all cores): Maximum number of Ray workers.
  • enable_optimization (boolean, default: true): Enables performance optimizations.
  • broadcast_join_threshold_mb (integer, default: 100): Size threshold for broadcast joins, in MB.
  • max_parallelism_multiplier (integer, default: 2): Multiplier applied to the number of CPU cores to determine the maximum parallelism.
  • target_partition_size_mb (integer, default: 64): Target partition size, in MB.
  • window_size_for_joins (string, default: 1H): Time window for distributed joins.
  • ray_address (string, default: none): Ray cluster address; setting it enables remote mode.
  • use_kuberay (boolean, default: none): Enables KubeRay mode and overrides ray_address.
  • kuberay_conf (dictionary, default: none): KubeRay configuration dictionary with the keys cluster_name (required), namespace (default: default), auth_token, auth_server, and skip_tls (default: false).
  • enable_ray_logging (boolean, default: false): Enables Ray progress bars and logging.
  • enable_distributed_joins (boolean, default: true): Enables distributed joins for large datasets.
  • staging_location (string, default: none): Remote path for batch materialization jobs.
  • ray_conf (dictionary, default: none): Ray configuration parameters, such as memory and CPU limits.
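Two of these options combine arithmetically with cluster and dataset size: the worker pool is scaled by max_parallelism_multiplier, and the partition count follows from target_partition_size_mb. The following sketch shows that arithmetic only; it is illustrative, and the function name is hypothetical rather than part of the engine's API.

```python
def plan_resources(cpu_cores, dataset_size_mb,
                   max_parallelism_multiplier=2, target_partition_size_mb=64):
    """Derive parallelism and partition count from the documented defaults."""
    # Maximum parallelism = CPU cores x multiplier
    parallelism = cpu_cores * max_parallelism_multiplier
    # Partition count = dataset size / target partition size, rounded up
    partitions = max(1, -(-dataset_size_mb // target_partition_size_mb))
    return parallelism, partitions

# An 8-core worker with a 1000 MB dataset and the defaults above
print(plan_resources(cpu_cores=8, dataset_size_mb=1000))
```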

The Ray compute engine automatically detects the execution mode. The detection precedence is as follows:

  1. Environment variable: if FEAST_RAY_USE_KUBERAY=true, KubeRay mode
  2. kuberay_conf in the configuration: KubeRay mode
  3. ray_address in the configuration: remote mode
  4. Default: local mode
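The precedence above can be expressed as a short decision function. This is an illustrative sketch of the documented order, not engine code; the function name is hypothetical.

```python
import os

def detect_mode(config, env=os.environ):
    """Apply the documented precedence for Ray execution-mode detection."""
    if env.get("FEAST_RAY_USE_KUBERAY") == "true":
        return "kuberay"          # environment variable wins
    if config.get("kuberay_conf"):
        return "kuberay"          # explicit KubeRay configuration
    if config.get("ray_address"):
        return "remote"           # address of an existing Ray cluster
    return "local"                # fall back to a local Ray runtime

print(detect_mode({"ray_address": "ray://ray-cluster:10001"}, env={}))
```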
Note

It is recommended that you use KubeRay mode.

Note

For more information about Ray compute engine usage examples, see Ray compute engine usage examples.

Use Ray directed acyclic graph (DAG) node types to build scalable, high-performance feature generation workflows. Ray optimizes resources and reduces execution time by handling data partitioning and statically allocating buffers.

Ray read node

Reads data from Ray-compatible sources:

  • Supports Parquet, comma-separated values (CSV), and other formats
  • Handles partitioning and schema inference
  • Applies field mappings and filters

Ray join node

Performs distributed joins:

  • Broadcast join: Use for small datasets (<100 MB)
  • Distributed join: Use for large datasets with time-based windowing
  • Automatic strategy selection: Based on dataset size and cluster resources

Ray filter node

Applies filters and time-based constraints:

  • Time to live (TTL)-based filtering
  • Timestamp range filtering
  • Custom predicate filtering

Ray aggregation node

Handles feature aggregations:

  • Windowed aggregations
  • Grouped aggregations
  • Custom aggregation functions

Ray transformation node

Applies feature transformations:

  • Row-level transformations
  • Column-level transformations
  • Custom transformation functions

Ray write node

Writes results to various targets:

  • Online stores
  • Offline stores
  • Temporary storage

6.7. Using Ray join strategies in Feature Store

The Ray compute engine automatically selects optimal join strategies:

Broadcast join

Used for small feature datasets:

  • Selected automatically when the feature data is smaller than 100 MB.
  • Caches the feature data in Ray’s object store.
  • Distributes entities across the cluster.
  • Sends a copy of the feature data to each worker.

Distributed windowed join

Used for large feature datasets:

  • Selected automatically when the feature data is larger than 100 MB.
  • Partitions data by time windows.
  • Performs point-in-time joins within each window.
  • Combines results across windows.

Example of using strategy selection logic

def select_join_strategy(feature_size_mb, threshold_mb):
    if feature_size_mb < threshold_mb:
        return "broadcast"
    else:
        return "distributed_windowed"
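For the distributed windowed strategy, data is first bucketed into fixed time windows, such as the "1H" window from window_size_for_joins, and joins then run within each bucket. The following is an illustrative, pure-Python sketch of that bucketing step with hypothetical timestamps; it is not the engine's partitioner.

```python
from collections import defaultdict
from datetime import datetime, timedelta

def bucket_by_window(timestamps, window=timedelta(hours=1)):
    """Group timestamps into fixed windows, as a windowed join partitions data."""
    epoch = datetime(1970, 1, 1)
    buckets = defaultdict(list)
    for ts in timestamps:
        # Floor each timestamp to the start of its window
        start = epoch + ((ts - epoch) // window) * window
        buckets[start].append(ts)
    return dict(buckets)

events = [datetime(2025, 1, 1, 9, 15), datetime(2025, 1, 1, 9, 45),
          datetime(2025, 1, 1, 10, 5)]
print({k.isoformat(): len(v) for k, v in bucket_by_window(events).items()})
```

Rows that share a window can be joined independently of other windows, which is what lets the engine run the per-window joins in parallel and combine the results afterward.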

Ray is a distributed execution engine that scales Feast feature engineering and retrieval-augmented generation (RAG) workloads. By processing large datasets in parallel, Ray accelerates pipelines and reduces costs compared to single-node processing.

Use the Ray compute engine's automatic optimizations for increased efficiency. The engine includes several automatic optimizations:

  • Partition optimization: Automatically determines optimal partition sizes
  • Join strategy selection: Chooses between broadcast and distributed joins
  • Resource allocation: Scales workers based on available resources
  • Memory management: Handles out-of-core processing for large datasets
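The lazy-evaluation and out-of-core ideas behind these optimizations can be sketched with Python generators: building a pipeline does no work, and rows stream through it on demand instead of being held in memory. This is a conceptual illustration only; the function names are hypothetical and unrelated to the engine's API.

```python
def read_source(n):
    # Lazily yield rows instead of materializing them all up front
    for i in range(n):
        yield {"id": i}

def keep_ids_from(rows, threshold):
    # A deferred filter stage: nothing runs until rows are consumed
    return (r for r in rows if r["id"] >= threshold)

# Composing the stages performs no work yet; rows are scanned only when
# a result is requested, so memory stays bounded even for large inputs.
pipeline = keep_ids_from(read_source(1_000_000), threshold=999_999)
print(next(pipeline))
```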

Manual tuning example

If you have specific workloads that require custom tuning, you can fine-tune performance:

batch_engine:
    type: ray.engine
    # Fine-tuning for high-throughput scenarios
    broadcast_join_threshold_mb: 200      # Larger broadcast threshold
    max_parallelism_multiplier: 1        # Conservative parallelism
    target_partition_size_mb: 512        # Larger partitions
    window_size_for_joins: "2H"          # Larger time windows

You can check cluster resources and monitor job progress when working with the Ray compute engine.

See the following Python example for how to extract monitoring and metrics data:

import ray

# Check cluster resources
resources = ray.cluster_resources()
print(f"Available CPUs: {resources.get('CPU', 0)}")
print(f"Available memory: {resources.get('memory', 0) / 1e9:.2f} GB")

# Monitor job progress
job = store.get_historical_features(...)
# Ray compute engine provides built-in progress tracking

Use the Spark compute engine to run distributed batch materialization and historical retrieval operations. Batch materialization includes materialize and materialize-incremental operations. The engine processes large-scale data from offline stores, such as Snowflake, Google BigQuery, and Apache Spark SQL.

The Spark compute engine can read various data sources and perform distributed or custom transformations. You can use the engine to perform these tasks:

  • Read from various data sources, such as Apache Spark SQL, Google BigQuery, and Snowflake.
  • Execute distributed feature transformations and aggregations.
  • Run custom transformations by using Apache Spark SQL or user-defined functions (UDFs).

Configure the Spark compute engine in Feature Store by defining Spark-specific settings in the feature_store.yaml file or programmatically using a Feast RepoConfig. This enables distributed batch materialization and historical feature retrieval using Spark.

Prerequisites

  • Your Spark cluster is running.

Procedure

  1. Configure the Spark compute engine in your feature_store.yaml file:
...
offline_store:
  type: snowflake.offline
...
batch_engine:
  type: spark.engine
  partitions: 10 # number of partitions when writing to the online or offline store
  spark_conf:
    spark.master: "local[*]"
    spark.app.name: "Feast Spark Engine"
    spark.sql.shuffle.partitions: 100
    spark.executor.memory: "4g"

Configuring the Spark offline store example

You can configure the feature store by using the feature_store.py file. This configuration uses Amazon DynamoDB for the online store, the Spark offline store, and the Spark compute engine for batch materialization.

Note

In the following code, replace [YOUR_BUCKET] with the name of your specific S3 bucket.

from feast import FeatureStore, RepoConfig
from feast.repo_config import RegistryConfig
from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig
from feast.infra.offline_stores.contrib.spark_offline_store.spark import SparkOfflineStoreConfig

repo_config = RepoConfig(
    registry="s3://[YOUR_BUCKET]/feast-registry.db",
    project="feast_repo",
    provider="aws",
    offline_store=SparkOfflineStoreConfig(
      spark_conf={
        "spark.ui.enabled": "false",
        "spark.eventLog.enabled": "false",
        "spark.sql.catalogImplementation": "hive",
        "spark.sql.parser.quotedRegexColumnNames": "true",
        "spark.sql.session.timeZone": "UTC"
      }
    ),
    batch_engine={
      "type": "spark.engine",
      "partitions": 10
    },
    online_store=DynamoDBOnlineStoreConfig(region="us-west-1"),
    entity_key_serialization_version=3
)
store = FeatureStore(config=repo_config)

You can integrate Ray with Spark, cloud storage, and feature transformations. These integrations enable distributed processing of large-scale machine learning workloads, from feature engineering to serving, and efficient handling of compute-intensive tasks.

Integrating Ray with the Spark offline store in Feature Store

# Use Ray compute engine with Spark offline store
offline_store:
    type: spark
    spark_conf:
        spark.executor.memory: "4g"
        spark.executor.cores: "2"
batch_engine:
    type: ray.engine
    max_workers: 8
    enable_optimization: true

Integrating Ray with cloud storage in Feature Store

# Use Ray compute engine with cloud storage
offline_store:
    type: ray
    storage_path: s3://my-bucket/feast-data
batch_engine:
    type: ray.engine
    ray_address: "ray://ray-cluster:10001"
    broadcast_join_threshold_mb: 50

Integrating Ray with feature transformations

from feast import FeatureView, Field
from feast.types import Float64
from feast.on_demand_feature_view import on_demand_feature_view

@on_demand_feature_view(
    sources=["driver_stats"],
    schema=[Field(name="trips_per_hour", dtype=Float64)]
)
def trips_per_hour(features_df):
    features_df["trips_per_hour"] = features_df["avg_daily_trips"] / 24
    return features_df

# Ray compute engine handles transformations efficiently
features = store.get_historical_features(
    entity_df=entity_df,
    features=["trips_per_hour:trips_per_hour"]
)

Ray native transformations

For distributed transformations that use Ray’s dataset and parallel processing capabilities, set mode="ray" in your BatchFeatureView:

from datetime import timedelta

from feast import BatchFeatureView, Field
from feast.types import Array, Float32, String

# Feature view with Ray transformation mode. The entity `document`, the
# source `movies_source`, and the UDF `generate_embeddings_ray_native` are
# defined elsewhere in the feature repository.
document_embeddings_view = BatchFeatureView(
    name="document_embeddings",
    entities=[document],
    mode="ray",  # Enable Ray native transformation
    ttl=timedelta(days=365),
    schema=[
        Field(name="document_id", dtype=String),
        Field(name="embedding", dtype=Array(Float32), vector_index=True),
        Field(name="movie_name", dtype=String),
        Field(name="movie_director", dtype=String),
    ],
    source=movies_source,
    udf=generate_embeddings_ray_native,
    online=True,
)