3.2. 从数据科学管道运行分布式数据科学工作负载
要从管道运行分布式工作负载,您必须首先更新管道,使其包含指向您的 Ray 集群镜像的链接。
先决条件
- 您可以访问配置为运行分布式工作负载的数据科学项目,如 管理分布式工作负载 中所述。
您可以从数据科学集群访问以下软件:
- 与硬件架构兼容的 Ray 集群镜像
- 工作负载使用的数据集和模型
- 工作负载的 Python 依赖项,可以在 Ray 镜像或您自己的 Python Package Index (PyPI)服务器中
- 您可以访问包含工作台的数据科学项目,工作台正在运行包含 CodeFlare SDK 的默认工作台镜像,如 Standard Data Science workbench。有关项目和工作台 的详情,请参考使用数据科学项目。
具有数据科学项目的管理员访问权限。
- 如果创建项目,则会自动具有管理员访问权限。
- 如果没有创建项目,您的集群管理员必须授予管理员访问权限。
- 您可以访问 S3 兼容对象存储。
- 您已登陆到 Red Hat OpenShift AI。
流程
- 创建连接以将对象存储连接到您的数据科学项目,如添加与 数据科学项目的连接 中所述。
- 将管道服务器配置为使用连接,如 配置管道服务器 中所述。
创建数据科学管道,如下所示:
安装所有管道所需的
kfpPython 软件包:$ pip install kfp- 安装管道所需的任何其他依赖项。
在 Python 代码中构建您的数据科学管道。
例如,使用以下内容创建名为
compile_example.py的文件:from kfp import dsl @dsl.component( base_image="registry.redhat.io/ubi9/python-311:latest", packages_to_install=['codeflare-sdk'] ) def ray_fn(): import ray1 from codeflare_sdk import Cluster, ClusterConfiguration, generate_cert2 cluster = Cluster(3 ClusterConfiguration( namespace="my_project",4 name="raytest", num_workers=1, head_cpu_requests="500m", head_cpu_limits="500m", worker_memory_requests=1, worker_memory_limits=1, worker_extended_resource_requests={"nvidia.com/gpu": 1},5 image="quay.io/modh/ray:2.47.1-py311-cu121",6 local_queue="local_queue_name",7 ) ) print(cluster.status()) cluster.up()8 cluster.wait_ready()9 print(cluster.status()) print(cluster.details()) ray_dashboard_uri = cluster.cluster_dashboard_uri() ray_cluster_uri = cluster.cluster_uri() print(ray_dashboard_uri, ray_cluster_uri) # Enable Ray client to connect to secure Ray cluster that has mTLS enabled generate_cert.generate_tls_cert(cluster.config.name, cluster.config.namespace)10 generate_cert.export_env(cluster.config.name, cluster.config.namespace) ray.init(address=ray_cluster_uri) print("Ray cluster is up and running: ", ray.is_initialized()) @ray.remote def train_fn():11 # complex training function return 100 result = ray.get(train_fn.remote()) assert 100 == result ray.shutdown() cluster.down()12 auth.logout() return result @dsl.pipeline(13 name="Ray Simple Example", description="Ray Simple Example", ) def ray_integration(): ray_fn() if __name__ == '__main__':14 from kfp.compiler import Compiler Compiler().compile(ray_integration, 'compiled-example.yaml')- 1
- 导入 Ray。
- 2
- 从 CodeFlare SDK 中导入软件包以定义集群功能。
- 3
- 指定 Ray 集群配置:将这些示例值替换为 Ray 集群的值。
- 4
- 可选:指定创建 Ray 集群的项目。将示例值替换为项目的名称。如果省略这一行,则会在当前项目中创建 Ray 集群。
- 5
- 可选:为 Ray 集群指定请求的加速器(本例中为 1 NVIDIA GPU)。如果不使用 NVIDIA GPU,请将
nvidia.com/gpu替换为加速器的正确值;例如,为 AMD GPU 指定amd.com/gpu。如果不需要加速器,请将值设为 0 或省略该行。 - 6
- 指定 Ray 集群镜像的位置。Ray 集群镜像中的 Python 版本必须与工作台中的 Python 版本相同。如果省略此行,则使用默认 CUDA 兼容 Ray 集群镜像之一,具体取决于工作台中检测到的 Python 版本。默认的 Ray 镜像是 AMD64 镜像,可能不适用于其他架构。如果您在断开连接的环境中运行这个代码,请将默认值替换为您的环境的位置。有关最新可用培训镜像及其预安装的软件包的详情,请参考 Red Hat OpenShift AI: 支持的配置。
- 7
- 指定将向其提交 Ray 集群的本地队列。如果配置了默认的本地队列,您可以省略这一行。
- 8
- 使用指定的镜像和配置创建 Ray 集群。
- 9
- 在继续操作前,等待 Ray 集群就绪。
- 10
- 启用 Ray 客户端连接到启用了 mutual Transport Layer Security (mTLS)的安全 Ray 集群。OpenShift AI 中的 CodeFlare 组件中默认启用 mTLS。
- 11
- 将本节中的示例详情替换为您的工作负载详情。
- 12
- 当工作负载完成后,删除 Ray 集群。
- 13
- 将示例名称和描述替换为您的工作负载值。
- 14
- 编译 Python 代码,并将输出保存到 YAML 文件中。
编译 Python 文件(本例中为
compile_example.py文件):$ python compile_example.py此命令创建一个 YAML 文件(本例中为
compiled-example.yaml),您可以在下一步中导入该文件。
- 导入您的数据科学管道,如 导入数据科学管道 中所述。
- 调度管道运行,如 调度管道运行 中所述。
- 当管道运行完成后,确认它包含在触发的管道运行列表中,如 查看管道运行的详情 中所述。
验证
YAML 文件已创建,管道运行会完成且没有错误。
您可以查看运行详情,如 查看管道运行的详情 中所述。