2.8. Creating a function to post your filtered data to your bucket
Create a function that filters your data and adds it to the storage account you created to share with Red Hat. You can use the example Python script to collect the cost data related to your Red Hat expenses from your cost exports and add it to the storage account. This script filters the cost data you created with BigQuery to remove non-Red Hat information, creates .csv files, stores them in the bucket you created, and sends the data to Red Hat.
Procedure
- In the Google Cloud Console, search for secret and select the Secret Manager result to set up a secret that authenticates your function with Red Hat without storing your credentials in the function. If you prefer to create the secrets programmatically, see the sketch after these sub-steps.
  - On the Secret Manager page, click Create Secret.
  - Name your secret, add your Red Hat username, and click Create Secret.
  - Repeat this process to save a secret for your Red Hat password.
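The following is a minimal sketch of an equivalent programmatic approach, not part of the console procedure above. It assumes the google-cloud-secret-manager client library, and the secret IDs rh-username and rh-password, the project ID placeholder, and the credential placeholders are illustrative names you can change.

from google.cloud import secretmanager

PROJECT_ID = "<project_id>"  # placeholder: your GCP project ID
client = secretmanager.SecretManagerServiceClient()

for secret_id, value in [("rh-username", b"<red_hat_username>"),
                         ("rh-password", b"<red_hat_password>")]:
    # Create the empty secret container.
    secret = client.create_secret(
        request={
            "parent": f"projects/{PROJECT_ID}",
            "secret_id": secret_id,
            "secret": {"replication": {"automatic": {}}},
        }
    )
    # Store the credential value as the first version of the secret.
    client.add_secret_version(
        request={"parent": secret.name, "payload": {"data": value}}
    )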
- In the Google Cloud Console search bar, search for functions and select the Cloud Functions result.
- On the Cloud Functions page, click Create function.
- Name the function. In this example, use customer-data-function.
- In the Trigger section, click Save to accept the HTTP trigger type.
- In Runtime, build, connections and security settings, click Security and image repository, reference the secrets you created, click Done, and then click Next. The example script below expects the secrets to be exposed to the function as the environment variables username and password.
- On the Cloud Functions Code page, set the runtime to Python 3.9.
- Open the requirements.txt file and paste the following lines at the end of the file:

requests
google-cloud-bigquery
google-cloud-storage
- Open the main.py file.
  - Set the Entry Point to get_filtered_data.
  - Paste the following Python script. Change the values in the section marked # Required vars to update to the values for your environment.

import csv
import datetime
import uuid
import os
import requests
from google.cloud import bigquery
from google.cloud import storage
from itertools import islice
from dateutil.relativedelta import relativedelta

query_range = 5
now = datetime.datetime.now()
delta = now - relativedelta(days=query_range)
year = now.strftime("%Y")
month = now.strftime("%m")
day = now.strftime("%d")
report_prefix = f"{year}/{month}/{day}/{uuid.uuid4()}"

# Required vars to update
USER = os.getenv('username')         # Cost management username
PASS = os.getenv('password')         # Cost management password
INTEGRATION_ID = "<integration_id>"  # Cost management integration_id
BUCKET = "<bucket>"                  # Filtered data GCP Bucket
PROJECT_ID = "<project_id>"          # Your project ID
DATASET = "<dataset>"                # Your dataset name
TABLE_ID = "<table_id>"              # Your table ID

gcp_big_query_columns = [
    "billing_account_id",
    "service.id",
    "service.description",
    "sku.id",
    "sku.description",
    "usage_start_time",
    "usage_end_time",
    "project.id",
    "project.name",
    "project.labels",
    "project.ancestry_numbers",
    "labels",
    "system_labels",
    "location.location",
    "location.country",
    "location.region",
    "location.zone",
    "export_time",
    "cost",
    "currency",
    "currency_conversion_rate",
    "usage.amount",
    "usage.unit",
    "usage.amount_in_pricing_units",
    "usage.pricing_unit",
    "credits",
    "invoice.month",
    "cost_type",
    "resource.name",
    "resource.global_name",
]
table_name = ".".join([PROJECT_ID, DATASET, TABLE_ID])

BATCH_SIZE = 200000


def batch(iterable, n):
    """Yields successive n-sized chunks from iterable"""
    it = iter(iterable)
    while chunk := tuple(islice(it, n)):
        yield chunk


def build_query_select_statement():
    """Helper to build query select statement."""
    columns_list = gcp_big_query_columns.copy()
    columns_list = [
        f"TO_JSON_STRING({col})" if col in ("labels", "system_labels", "project.labels", "credits") else col
        for col in columns_list
    ]
    columns_list.append("DATE(_PARTITIONTIME) as partition_date")
    return ",".join(columns_list)


def create_reports(query_date):
    # Select one day of Red Hat related cost rows and write them to the bucket as CSV part files.
    query = f"SELECT {build_query_select_statement()} FROM {table_name} WHERE DATE(_PARTITIONTIME) = '{query_date}' AND (sku.description LIKE '%RedHat%' OR sku.description LIKE '%Red Hat%' OR service.description LIKE '%Red Hat%') ORDER BY usage_start_time"
    client = bigquery.Client()
    query_job = client.query(query).result()
    column_list = gcp_big_query_columns.copy()
    column_list.append("partition_date")
    daily_files = []
    storage_client = storage.Client()
    bucket = storage_client.bucket(BUCKET)
    for i, rows in enumerate(batch(query_job, BATCH_SIZE)):
        csv_file = f"{report_prefix}/{query_date}_part_{str(i)}.csv"
        daily_files.append(csv_file)
        blob = bucket.blob(csv_file)
        with blob.open(mode='w') as f:
            writer = csv.writer(f)
            writer.writerow(column_list)
            writer.writerows(rows)
    return daily_files


def post_data(files_list):
    # Post CSV's to console.redhat.com API
    url = "https://console.redhat.com/api/cost-management/v1/ingress/reports/"
    json_data = {"source": INTEGRATION_ID, "reports_list": files_list, "bill_year": year, "bill_month": month}
    resp = requests.post(url, json=json_data, auth=(USER, PASS))
    return resp


def get_filtered_data(request):
    # HTTP entry point: build reports for the last query_range days and post them to cost management.
    files_list = []
    query_dates = [delta + datetime.timedelta(days=x) for x in range(query_range)]
    for query_date in query_dates:
        files_list += create_reports(query_date.date())
    resp = post_data(files_list)
    return f'Files posted! {resp}'
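The batch() helper in the script determines how many .csv part files each day produces. As a quick standalone illustration (not part of the function itself), the same helper applied to a small iterable shows the chunking behavior; with BATCH_SIZE set to 200000, BigQuery result rows are split the same way.

from itertools import islice

def batch(iterable, n):
    """Yields successive n-sized chunks from iterable"""
    it = iter(iterable)
    while chunk := tuple(islice(it, n)):
        yield chunk

# Seven rows with a batch size of three produce three chunks, so
# create_reports() would write three part files for that day.
print(list(batch(range(7), 3)))  # [(0, 1, 2), (3, 4, 5), (6,)]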
- Click Deploy.
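After the function deploys, you can trigger a run by sending an HTTP request to its trigger URL. The URL below is a placeholder in the typical Cloud Functions format; copy the actual trigger URL from the function's details page, and note that an authenticated request may be required depending on the invocation settings you chose.

import requests

# Placeholder URL: replace with the trigger URL shown for customer-data-function.
FUNCTION_URL = "https://<region>-<project_id>.cloudfunctions.net/customer-data-function"

resp = requests.post(FUNCTION_URL)
print(resp.status_code, resp.text)  # e.g. 200 and "Files posted! <Response [...]>"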