Creating an agentic AI to deploy ARO clusters using Terraform with Red Hat OpenShift AI on ARO and Azure OpenAI

Learn how to create an artificial intelligence (AI) agent that can deploy Microsoft Azure Red Hat® OpenShift® clusters using Terraform. Explore prompt-based infrastructure and how it can be used to maintain clusters in your own environment.

This learning path is for operations teams and system administrators.

Developers may want to check out Getting started with Red Hat OpenShift AI on developers.redhat.com. 


Creating deployment agents for agentic AI using Terraform

5 mins

Now you will create the Azure Red Hat® OpenShift® deployment agent: a wrapper that dynamically generates Terraform configurations for Azure Red Hat OpenShift deployments and manages the complete lifecycle, including state management, Azure authentication, and resource provisioning. You will also build the orchestration layer of the agent.

What will you learn?

  • How to create deployment agents for Azure Red Hat OpenShift
  • How to create a simulator for the agent

What do you need before starting?

  • Azure Red Hat OpenShift cluster
  • Azure OpenAI model access
  • Setup file created in Azure
  • OpenAI parser created

Creating the deployment agent

To create the deployment agent, copy the lines below and save them as deployment.py.

import os
import tempfile
import json
import subprocess
from pathlib import Path

class ARODeploymentAgent:
    def __init__(self, debug=False):
        self.debug = debug
        self.work_dir = Path(tempfile.mkdtemp())
        
        required_env_vars = ["AZURE_SUBSCRIPTION_ID", "AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET"]
        missing_vars = [var for var in required_env_vars if not os.getenv(var)]
        if missing_vars:
            raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")
        
        self._login_to_azure()        
        self._create_terraform_files()

    def _run_command(self, command, cwd=None, stream_output=False):
        try:
            if stream_output:  
                process = subprocess.Popen(
                    command,
                    shell=True,
                    text=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    cwd=cwd or self.work_dir,
                    env=os.environ.copy(),
                    bufsize=1  
                )
                output = ""
                for line in iter(process.stdout.readline, ''):
                    if line:
                        print(line.rstrip())
                        output += line
                process.wait()
                return process.returncode == 0, output
            else:
                result = subprocess.run(
                    command,
                    shell=True,
                    text=True,
                    capture_output=True,
                    cwd=cwd or self.work_dir,
                    env=os.environ.copy()
                )
                return result.returncode == 0, result.stdout if result.returncode == 0 else result.stderr
        except Exception as e:
            return False, str(e)
    
    def _login_to_azure(self):
        cmd = f"az login --service-principal --username \"{os.getenv('AZURE_CLIENT_ID')}\" --password \"{os.getenv('AZURE_CLIENT_SECRET')}\" --tenant \"{os.getenv('AZURE_TENANT_ID')}\""
        success, output = self._run_command(cmd)
        if not success:
            raise RuntimeError(f"Azure login failed: {output}")
    
    def _create_terraform_files(self):
        variables_tf = '''variable "cluster_name" {
  type        = string
  description = "ARO cluster name"
}
variable "location" {
  type        = string
  description = "Azure region"
}
variable "aro_version" {
  type        = string
  description = "ARO version"
}
variable "worker_node_count" {
  type        = number
  default     = 3
  description = "Number of worker nodes"
}
variable "worker_vm_size" {
  type        = string
  default     = "Standard_D4s_v3"
  description = "VM size for worker nodes"
}
variable "subscription_id" {
  type        = string
  description = "Azure Subscription ID"
}
variable "api_server_profile" {
  type        = string
  default     = "Public"
  description = "API Server visibility (Public or Private)"
}
variable "ingress_profile" {
  type        = string
  default     = "Public"
  description = "Ingress visibility (Public or Private)"
}'''
        
        main_tf = f'''locals {{
  resource_group_name = "${{var.cluster_name}}-rg"
}}
resource "azurerm_resource_group" "aro" {{
  name     = local.resource_group_name
  location = var.location
}}
resource "azurerm_virtual_network" "aro" {{
  name                = "${{var.cluster_name}}-vnet"
  location            = azurerm_resource_group.aro.location
  resource_group_name = azurerm_resource_group.aro.name
  address_space       = ["10.0.0.0/16"]
}}
resource "azurerm_subnet" "master_subnet" {{
  name                 = "master-subnet"
  resource_group_name  = azurerm_resource_group.aro.name
  virtual_network_name = azurerm_virtual_network.aro.name
  address_prefixes     = ["10.0.0.0/23"]
  service_endpoints    = ["Microsoft.ContainerRegistry"]
  private_endpoint_network_policies_enabled = false
  private_link_service_network_policies_enabled = false
}}
resource "azurerm_subnet" "worker_subnet" {{
  name                 = "worker-subnet"
  resource_group_name  = azurerm_resource_group.aro.name
  virtual_network_name = azurerm_virtual_network.aro.name
  address_prefixes     = ["10.0.2.0/23"]
  service_endpoints    = ["Microsoft.ContainerRegistry"]
  private_endpoint_network_policies_enabled = false
  private_link_service_network_policies_enabled = false
}}
resource "azurerm_redhat_openshift_cluster" "aro" {{
  name                = var.cluster_name
  location            = azurerm_resource_group.aro.location
  resource_group_name = azurerm_resource_group.aro.name
  cluster_profile {{
    domain                     = var.cluster_name
    version                    = var.aro_version
    managed_resource_group_name = "${{azurerm_resource_group.aro.name}}-managed"
  }}
  network_profile {{
    pod_cidr     = "10.128.0.0/14"
    service_cidr = "172.30.0.0/16"
  }}
  main_profile {{
    vm_size   = "Standard_D8s_v3"
    subnet_id = azurerm_subnet.master_subnet.id
  }}
  worker_profile {{
    subnet_id    = azurerm_subnet.worker_subnet.id
    disk_size_gb = 128
    node_count   = var.worker_node_count
    vm_size      = var.worker_vm_size
  }}
  service_principal {{
    client_id     = "{os.getenv("AZURE_CLIENT_ID")}"
    client_secret = "{os.getenv("AZURE_CLIENT_SECRET")}"
  }}
  api_server_profile {{
    visibility = var.api_server_profile
  }}
  ingress_profile {{
    visibility = var.ingress_profile
  }}
}}
output "console_url" {{
  value = azurerm_redhat_openshift_cluster.aro.console_url
}}
output "api_url" {{
  value = azurerm_redhat_openshift_cluster.aro.api_server_profile[0].url
}}'''
        
        provider_tf = f'''terraform {{
  required_providers {{
    azurerm = {{
      source  = "hashicorp/azurerm"
      version = "~>3.0"
    }}
  }}
}}
provider "azurerm" {{
  features {{}}
  subscription_id = var.subscription_id
  tenant_id       = "{os.getenv("AZURE_TENANT_ID")}"
  client_id       = "{os.getenv("AZURE_CLIENT_ID")}"
  client_secret   = "{os.getenv("AZURE_CLIENT_SECRET")}"
}}'''
        
        (self.work_dir / "variables.tf").write_text(variables_tf)
        (self.work_dir / "main.tf").write_text(main_tf)
        (self.work_dir / "provider.tf").write_text(provider_tf)
            
    def create_terraform_vars(self, params):
        cluster_name = params.get("name", "agentic-aro")
        region = params.get("region", "westus")
        worker_vm_size = params.get("worker_vm_size", "Standard_D4s_v3")
        worker_node_count = params.get("worker_node_count", 3)
        aro_version = params.get("version") or self.get_latest_aro_version(region)
        is_private = params.get("is_private", False)
        
        tfvars_content = f'''subscription_id = "{os.getenv('AZURE_SUBSCRIPTION_ID')}"
cluster_name = "{cluster_name}"
location = "{region}"
aro_version = "{aro_version}"
worker_node_count = {worker_node_count}
worker_vm_size = "{worker_vm_size}"
api_server_profile = "{'Private' if is_private else 'Public'}"
ingress_profile = "{'Private' if is_private else 'Public'}"'''
        
        (self.work_dir / "terraform.tfvars").write_text(tfvars_content)
        return str(self.work_dir)
                    
    def get_latest_aro_version(self, region="westus"):
        cmd = f"az aro get-versions -l {region} --output tsv"
        success, output = self._run_command(cmd)
        
        if success and output.strip():
            versions = [v.strip() for v in output.strip().split('\n') if v.strip()]
            if versions:
                stable_versions = [v for v in versions if not any(x in v for x in ['-tech-preview', '-dev', 'nightly'])]
                if stable_versions:
                    # Sort numerically: a plain string sort would rank 4.9.x above 4.16.x
                    stable_versions.sort(key=lambda v: [int(p) for p in v.split('.') if p.isdigit()], reverse=True)
                    return stable_versions[0]
        
        return "4.16.30"

    def deploy_cluster(self, params):
        print(f"Processing deployment request for cluster: {params.get('name', 'agentic-aro')}")
        
        rg_name = f"{params.get('name', 'agentic-aro')}-rg"
        cmd = f"az group exists --name {rg_name} --output tsv"
        success, output = self._run_command(cmd)
        
        if success and output.strip().lower() == 'true':
            return {"status": "error", "message": f"Resource group {rg_name} already exists"}
        
        try:
            work_dir = self.create_terraform_vars(params)
    
            print("Running terraform init...")
            success, output = self._run_command("terraform init", cwd=work_dir, stream_output=True)
            if not success:
                return {"status": "error", "message": f"Terraform init failed: {output}"}
    
            print("\nRunning terraform plan...")
            success, output = self._run_command("terraform plan -out=aro.plan", cwd=work_dir, stream_output=True)
            if not success:
                return {"status": "error", "message": f"Terraform plan failed: {output}"}
    
            print("\nRunning terraform apply (this takes 30-45 minutes)...")
            success, output = self._run_command("terraform apply -auto-approve aro.plan", cwd=work_dir, stream_output=True)
            if not success:
                return {"status": "error", "message": f"Terraform apply failed: {output}"}
            
            print("\nDeployment completed!")
            
            # Get outputs
            success, output = self._run_command("terraform output -json", cwd=work_dir)
            if success:
                try:
                    outputs = json.loads(output)
                    console_url = outputs.get("console_url", {}).get("value")
                    api_url = outputs.get("api_url", {}).get("value")
                    
                    cluster_name = params.get("name", "agentic-aro")
                    cmd = f"az aro list-credentials --resource-group {rg_name} --name {cluster_name} --output json"
                    success, creds_output = self._run_command(cmd)
                    
                    if success:
                        credentials = json.loads(creds_output)
                        username = credentials.get("kubeadminUsername")
                        password = credentials.get("kubeadminPassword")
                        
                        return {
                            "status": "success",
                            "message": f"Successfully deployed ARO cluster {cluster_name}!",
                            "console_url": console_url,
                            "api_url": api_url,
                            "resource_group": rg_name,
                            "username": username,
                            "password": password
                        }
                    else:
                        return {
                            "status": "success",
                            "message": f"ARO cluster deployed",
                            "console_url": console_url,
                            "api_url": api_url,
                            "resource_group": rg_name
                        }
                except Exception:
                    return {"status": "success", "message": "ARO cluster deployed"}
            
            return {"status": "success", "message": "ARO cluster deployed successfully!"}
            
        except Exception as e:
            return {"status": "error", "message": f"Deployment failed: {str(e)}"}
            
    def destroy_cluster(self, resource_group):
        print(f"Starting destruction of resource group: {resource_group}")
        
        try:
            cmd = f"az group exists --name {resource_group} --output tsv"
            success, output = self._run_command(cmd)
            
            if not success or output.strip().lower() != 'true':
                return {"status": "error", "message": f"Resource group {resource_group} not found"}
            
            print("Deleting resource group (this may take several minutes)...")
            cmd = f"az group delete --name {resource_group} --yes"
            success, output = self._run_command(cmd)
            
            if success:
                print("Destruction completed!")
                return {"status": "success", "message": f"ARO cluster in {resource_group} destroyed"}
            else:
                return {"status": "error", "message": f"Failed to destroy: {output}"}
        except Exception as e:
            return {"status": "error", "message": f"Error: {str(e)}"}

Once this file is saved, you are ready to proceed with the orchestration layer.
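One detail in deployment.py worth understanding is how get_latest_aro_version picks a version: it filters out preview and nightly builds and then selects the newest stable release. Note that the sort must be numeric, because a plain lexicographic sort would rank 4.9.x above 4.16.x. A self-contained sketch of that selection logic (the helper name and sample version list are illustrative, not part of the file above):

```python
def pick_stable_version(versions):
    """Filter out preview/dev builds and return the newest stable version.

    Compares version components numerically: a plain string sort
    would rank "4.9.45" above "4.16.30".
    """
    stable = [v for v in versions
              if not any(tag in v for tag in ("-tech-preview", "-dev", "nightly"))]
    if not stable:
        return None
    return max(stable, key=lambda v: [int(p) for p in v.split(".") if p.isdigit()])

sample = ["4.14.16", "4.16.30", "4.9.45", "4.17.0-tech-preview"]
print(pick_stable_version(sample))  # → 4.16.30
```

If the `az aro get-versions` call fails, the agent falls back to a hard-coded default version, so a deployment request never stalls on version discovery.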

Creating the simulator

Next, you will create the orchestrator, the brain of the agent. This simulator is a high-level orchestration layer that routes each natural-language request to a deployment or deletion operation, and it supports both mock and real runs of either.
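The routing is intentionally simple: any request containing a deletion verb goes to the teardown path, and everything else is treated as a deployment. Isolated from the class (the classify_request name is just for illustration), the decision made in process_request looks like this:

```python
# Deletion verbs recognized by the orchestrator's dispatch step.
DELETE_WORDS = ("delete", "destroy", "remove", "tear down")

def classify_request(request: str) -> str:
    """Mirror process_request's dispatch: deletion verbs win, otherwise deploy."""
    if any(word in request.lower() for word in DELETE_WORDS):
        return "delete"
    return "deploy"

print(classify_request("Please tear down the demo cluster"))   # → delete
print(classify_request("Deploy a 3-node ARO cluster in eastus"))  # → deploy
```

Parameter extraction itself (cluster name, region, size) is delegated to the Azure OpenAI parser you created earlier; the keyword check only decides which parser method and which agent operation to invoke.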

Copy the lines below and save them as simulator.py.

import time
from pathlib import Path

class AROAzureOpenAISimulator:
    def __init__(self, debug=False, mock=False, azure_openai_deployment="gpt-4o-mini"):
        self.debug = debug
        self.mock = mock
        self.azure_openai_deployment = azure_openai_deployment
        self.openai_parser = None
        self.setup_status = None
        self.cluster_temp_dirs = {}
    
    def setup(self):
        from setup import setup_environment
        self.setup_status = setup_environment()
        
        from parser import AzureOpenAIParser
        self.openai_parser = AzureOpenAIParser(
            deployment_name=self.azure_openai_deployment,
            debug=self.debug
        )
        
        return self.setup_status
    
    def process_request(self, request):
        if any(word in request.lower() for word in ["delete", "destroy", "remove", "tear down"]):
            return self._handle_deletion(request)
        else:
            return self._handle_deployment(request)
    
    def _handle_deployment(self, request):
        print(f"Parsing request: '{request}'")
        params = self.openai_parser.extract_aro_parameters(request)
        
        if not params:
            return {"status": "error", "message": "Failed to extract parameters"}
        
        print(f"Extracted parameters: {params}")
        return self._deploy_aro_cluster(params)
    
    def _handle_deletion(self, request):
        params = self.openai_parser.extract_deletion_parameters(request)
        
        if not params or not params.get("name"):
            return {"status": "error", "message": "Failed to extract cluster name"}
        
        return self._destroy_aro_cluster(params.get("name"))
    
    def _deploy_aro_cluster(self, params):
        cluster_name = params.get('name', 'agentic-aro')
        
        if self.mock:
            return self._mock_aro_deployment(params)
        
        print("\n" + "="*60)
        print(f"Starting ARO Cluster Deployment: {cluster_name}")
        print("="*60 + "\n")
        
        from deployment import ARODeploymentAgent
        try:
            aro_deployer = ARODeploymentAgent(debug=self.debug)
            result = aro_deployer.deploy_cluster(params)
            
            if result.get("status") == "success" and hasattr(aro_deployer, "work_dir"):
                self.cluster_temp_dirs[cluster_name] = str(aro_deployer.work_dir)
            
            return result
        except Exception as e:
            return {"status": "error", "message": f"Error deploying ARO cluster: {str(e)}"}
    
    def _destroy_aro_cluster(self, cluster_name):
        resource_group = f"{cluster_name}-rg"
        
        if self.mock:
            return self._mock_aro_destroy(cluster_name)
        
        from deployment import ARODeploymentAgent
        try:
            aro_deployer = ARODeploymentAgent(debug=self.debug)
            
            if cluster_name in self.cluster_temp_dirs:
                aro_deployer.work_dir = Path(self.cluster_temp_dirs[cluster_name])
            
            result = aro_deployer.destroy_cluster(resource_group)
            
            if result.get("status") == "success" and cluster_name in self.cluster_temp_dirs:
                del self.cluster_temp_dirs[cluster_name]
            
            return result
        except Exception as e:
            return {"status": "error", "message": f"Error destroying ARO cluster: {str(e)}"}
    
    def _mock_aro_deployment(self, params):
        cluster_name = params.get('name', 'agentic-aro')
        region = params.get('region', 'westus')
        rg_name = f"{cluster_name}-rg"
        
        time.sleep(6)
        
        console_url = f"https://console-openshift-console.apps.{cluster_name}.{region}.aroapp.io"
        api_url = f"https://api.{cluster_name}.{region}.aroapp.io:6443"
        
        return {
            "status": "success",
            "message": f"[MOCK] Successfully deployed ARO cluster {cluster_name}",
            "console_url": console_url,
            "api_url": api_url,
            "resource_group": rg_name,
            "username": "kubeadmin",
            "password": "Mock-Pass-123!"
        }
    
    def _mock_aro_destroy(self, cluster_name):
        resource_group = f"{cluster_name}-rg"
        time.sleep(3)
        
        return {
            "status": "success",
            "message": f"[MOCK] ARO cluster {cluster_name} destroyed"
        }

After saving this file, you can proceed to integrate all of the files and credentials into a notebook.
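Because the mock and real paths return a result dictionary with the same keys (status, message, and, on success, optionally console_url, api_url, resource_group, and credentials), notebook code can handle either uniformly. A minimal, hypothetical rendering helper, shown here with the mock payload shape from simulator.py:

```python
def summarize_result(result: dict) -> str:
    """Render the agent's result dict into a short status report."""
    if result.get("status") != "success":
        return f"FAILED: {result.get('message', 'unknown error')}"
    lines = [result.get("message", "Deployed")]
    for label, key in (("Console", "console_url"), ("API", "api_url"),
                       ("Resource group", "resource_group")):
        if result.get(key):
            lines.append(f"{label}: {result[key]}")
    return "\n".join(lines)

mock_result = {
    "status": "success",
    "message": "[MOCK] Successfully deployed ARO cluster demo",
    "console_url": "https://console-openshift-console.apps.demo.westus.aroapp.io",
    "resource_group": "demo-rg",
}
print(summarize_result(mock_result))
```

Keeping a single result contract for both paths means you can develop and test the notebook entirely in mock mode before running a real 30-45 minute deployment.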

© 2026 Red Hat