from kfp import components, dsl
def ray_fn(openshift_server: str, openshift_token: str) -> int:
    """Bring up a CodeFlare-managed Ray cluster, run one remote task, tear down.

    Args:
        openshift_server: API server URL of the target OpenShift cluster.
        openshift_token: Bearer token used to authenticate against that server.

    Returns:
        The integer produced by the remote training task (always 100 here).
    """
    # Imports live inside the function so KFP can serialize it as a component.
    import ray
    from codeflare_sdk.cluster.auth import TokenAuthentication
    from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration

    # Authenticate against OpenShift; TLS verification is skipped for this demo.
    auth = TokenAuthentication(
        token=openshift_token, server=openshift_server, skip_tls=True
    )
    login_status = auth.login()

    cluster = Cluster(
        ClusterConfiguration(
            name="raytest",
            # namespace must exist
            namespace="pipeline-example",
            num_workers=1,
            head_cpus="500m",
            min_memory=1,
            max_memory=1,
            num_gpus=0,
            image="quay.io/project-codeflare/ray:latest-py39-cu118",
            instascale=False,
        )
    )
    print(cluster.status())
    cluster.up()
    cluster.wait_ready()
    print(cluster.status())
    print(cluster.details())

    dashboard_uri = cluster.cluster_dashboard_uri()
    cluster_uri = cluster.cluster_uri()
    print(dashboard_uri, cluster_uri)
    # Before proceeding, ensure that the cluster exists and that its URI contains a value
    assert cluster_uri, "Ray cluster must be started and set before proceeding"

    ray.init(address=cluster_uri)
    print("Ray cluster is up and running: ", ray.is_initialized())

    @ray.remote
    def train_fn():
        # complex training function
        return 100

    outcome = ray.get(train_fn.remote())
    assert outcome == 100

    # Clean up: disconnect the driver, delete the cluster, log out.
    ray.shutdown()
    cluster.down()
    auth.logout()
    return outcome
@dsl.pipeline(
    name="Ray Simple Example",
    description="Ray Simple Example",
)
def ray_integration(openshift_server, openshift_token):
    """Single-step pipeline that executes ``ray_fn`` against a Ray cluster."""
    # Wrap ray_fn as a lightweight component; the CodeFlare SDK is installed
    # into the base image at task runtime.
    ray_component = components.create_component_from_func(
        ray_fn,
        base_image='registry.redhat.io/ubi8/python-39:latest',
        packages_to_install=["codeflare-sdk"],
    )
    ray_component(openshift_server, openshift_token)
if __name__ == '__main__':
    # Compile the pipeline into a Tekton PipelineRun manifest on disk.
    from kfp_tekton.compiler import TektonCompiler

    TektonCompiler().compile(ray_integration, 'compiled-example.yaml')
from kfp import components, dsl
def ray_fn(openshift_server: str, openshift_token: str) -> int:
    """Bring up a CodeFlare-managed Ray cluster, run one remote task, tear down.

    Fix: removed stray integer literals that corruption had injected into the
    body — inside the ``TokenAuthentication(...)``, ``Cluster(...)`` and
    ``ClusterConfiguration(...)`` argument lists they became bogus positional
    arguments (e.g. ``TokenAuthentication(2, token=...)`` raises ``TypeError``).

    Args:
        openshift_server: API server URL of the target OpenShift cluster.
        openshift_token: Bearer token used to authenticate against that server.

    Returns:
        The integer produced by the remote training task (always 100 here).
    """
    # Imports live inside the function so KFP can serialize it as a component.
    import ray
    from codeflare_sdk.cluster.auth import TokenAuthentication
    from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration

    # Authenticate against OpenShift; TLS verification is skipped for this demo.
    auth = TokenAuthentication(
        token=openshift_token, server=openshift_server, skip_tls=True
    )
    auth_return = auth.login()

    cluster = Cluster(
        ClusterConfiguration(
            name="raytest",
            # namespace must exist
            namespace="pipeline-example",
            num_workers=1,
            head_cpus="500m",
            min_memory=1,
            max_memory=1,
            num_gpus=0,
            image="quay.io/project-codeflare/ray:latest-py39-cu118",
            instascale=False,
        )
    )
    print(cluster.status())
    cluster.up()
    cluster.wait_ready()
    print(cluster.status())
    print(cluster.details())

    ray_dashboard_uri = cluster.cluster_dashboard_uri()
    ray_cluster_uri = cluster.cluster_uri()
    print(ray_dashboard_uri, ray_cluster_uri)
    # Before proceeding, ensure that the cluster exists and that its URI contains a value
    assert ray_cluster_uri, "Ray cluster must be started and set before proceeding"

    ray.init(address=ray_cluster_uri)
    print("Ray cluster is up and running: ", ray.is_initialized())

    @ray.remote
    def train_fn():
        # complex training function
        return 100

    result = ray.get(train_fn.remote())
    assert 100 == result

    # Clean up: disconnect the driver, delete the cluster, log out.
    ray.shutdown()
    cluster.down()
    auth.logout()
    return result
@dsl.pipeline(
    name="Ray Simple Example",
    description="Ray Simple Example",
)
def ray_integration(openshift_server, openshift_token):
    """Single-step pipeline that executes ``ray_fn`` against a Ray cluster.

    Fix: removed a stray ``10`` literal that corruption had injected into the
    ``@dsl.pipeline(...)`` decorator call, where it acted as an invalid
    positional argument.
    """
    # Wrap ray_fn as a lightweight component; the CodeFlare SDK is installed
    # into the base image at task runtime.
    ray_op = components.create_component_from_func(
        ray_fn,
        base_image='registry.redhat.io/ubi8/python-39:latest',
        packages_to_install=["codeflare-sdk"],
    )
    ray_op(openshift_server, openshift_token)
if __name__ == '__main__':
    # Fix: removed a stray `11` junk statement injected by corruption.
    # Compile the pipeline into a Tekton PipelineRun manifest on disk.
    from kfp_tekton.compiler import TektonCompiler

    TektonCompiler().compile(ray_integration, 'compiled-example.yaml')