3.5. S3 select 操作
開発者は、S3 select を実行してスループットを加速できます。ユーザーは、メディエーターなしで S3 select クエリーを直接実行できます。
S3 select ワークフローには、CSV、Apache Parquet (Parquet)、および JSON の 3 つがあり、CSV、Parquet、および JSON オブジェクトを使用した S3 select 操作を提供します。
- CSV ファイルには、表形式のデータがプレーンテキスト形式で格納されます。ファイルの各行はデータレコードです。
- Parquet は、効率的なデータの保存と取得のために設計された、オープンソースのカラム型のデータファイル形式です。複雑なデータをまとめて処理するための強化されたパフォーマンスを備えた、非常に効率的なデータ圧縮およびエンコーディングスキームを提供します。Parquet を使用すると、S3 select エンジンが列とチャンクをスキップできるため、(CSV および JSON 形式とは対照的に) IOPS が大幅に削減されます。
- JSON はフォーマット構造です。S3 select エンジンは、JSON リーダーを使用して JSON 形式の入力データ上で SQL ステートメントを使用できるようにし、高度にネストされた複雑な JSON 形式のデータのスキャンを可能にします。
たとえば、数ギガバイトのデータを持つ CSV、Parquet または JSON S3 オブジェクトの場合、ユーザーは次のクエリーを使用して、別の列によってフィルター処理された単一の列を抽出できます。
例
select customerid from s3Object where age>30 and age<65;
現時点で、S3 オブジェクトはデータのフィルタリングおよび抽出の前に、Ceph Object Gateway 経由で Ceph OSD からデータを取得する必要があります。オブジェクトのサイズが大きく、クエリーが具体的な場合に、パフォーマンスが向上します。Parquet 形式は、CSV よりも効率的に処理できます。
前提条件
- 稼働中の Red Hat Ceph Storage クラスターがある。
- RESTful クライアント。
- ユーザーアクセスで作成された S3 ユーザー。
3.5.1. S3 select content from an object
select object content API は、構造化されたクエリー言語 (SQL) でオブジェクトの内容をフィルターします。インベントリーオブジェクトに含める必要がある内容の記述例は、AWS Systems Manager User Guide の Metadata collected by inventory セクションを参照してください。インベントリーの内容は、そのインベントリーに対して実行する必要があるクエリーのタイプに影響します。重要な情報を提供できる可能性のある SQL ステートメントの数は多いものの、S3 select は SQL に似たユーティリティーであるため、group-by
や join
などの一部の演算子はサポートされていません。
CSV の場合のみ、オブジェクトのコンマ区切りの値であるデータのシリアライズ形式を指定して、指定のコンテンツを取得する必要があります。Parquet はバイナリー形式であるため、区切り文字はありません。Amazon Web Services (AWS) のコマンドラインインターフェイス (CLI) 選択オブジェクトコンテンツは、CSV または Parquet 形式を使用してオブジェクトデータをレコードに解析し、クエリーで指定されたレコードのみを返します。
応答のデータシリアライゼーション形式を指定する必要があります。この操作には s3:GetObject
パーミッションが必要です。
-
InputSerialization
要素は、クエリーされるオブジェクトに含まれるデータの形式を記述します。オブジェクトは、CSV または Parquet 形式にすることができます。 -
OutputSerialization
要素は AWS-CLI ユーザークライアントの一部で、出力データのフォーマット方法を記述します。Ceph は AWS-CLI のサーバークライアントを実装しているため、現在 CSV のみであるOutputSerialization
に従って同じ出力を提供します。 -
InputSerialization
の形式は、OutputSerialization
の形式と一致する必要はありません。そのため、たとえばInputSerialization
で Parquet を指定し、OutputSerialization
で CSV を指定することもできます。
構文
POST /BUCKET/KEY?select&select-type=2 HTTP/1.1\r\n
例
POST /testbucket/sample1csv?select&select-type=2 HTTP/1.1\r\n POST /testbucket/sample1parquet?select&select-type=2 HTTP/1.1\r\n
要求エンティティー
Bucket
- 説明
- オブジェクトコンテンツを選択するバケット。
- 型
- String
- 必須
- はい
キー
- 説明
- オブジェクトキー。
- 長さに関する制約
- 最小長は 1 です。
- 型
- String
- 必須
- はい
SelectObjectContentRequest
- 説明
- select オブジェクトコンテンツ要求パラメーターのルートレベルタグ。
- 型
- String
- 必須
- はい
式
- 説明
- オブジェクトのクエリーに使用される式。
- 型
- String
- 必須
- はい
ExpressionType
- 説明
- SQL など、提供された式のタイプ。
- 型
- String
- 有効な値
- SQL
- 必須
- はい
InputSerialization
- 説明
- クエリーされるオブジェクトに含まれるデータの形式を記述します。
- 型
- String
- 必須
- はい
OutputSerialization
- 説明
- コンマセパレーターおよび改行で返されるデータの形式。
- 型
- String
- 必須
- はい
応答エンティティー
アクションに成功すると、サービスは HTTP 200
応答を返します。データは、サービスによって XML 形式で返されます。
Payload
- 説明
- ペイロードパラメーターのルートレベルタグ。
- 型
- String
- 必須
- はい
Records
- 説明
- レコードイベント。
- 型
- base64 でエンコードされたバイナリーデータオブジェクト
- 必須
- いいえ
Stats
- 説明
- stats イベント。
- 型
- Long
- 必須
- いいえ
Ceph Object Gateway は以下の応答をサポートします。
例
{:event-type,records} {:content-type,application/octet-stream} {:message-type,event}
構文 (CSV の場合)
aws --endpoint-URL http://localhost:80 s3api select-object-content --bucket BUCKET_NAME --expression-type 'SQL' --input-serialization '{"CSV": {"FieldDelimiter": "," , "QuoteCharacter": "\"" , "RecordDelimiter" : "\n" , "QuoteEscapeCharacter" : "\\" , "FileHeaderInfo": "USE" }, "CompressionType": "NONE"}' --output-serialization '{"CSV": {}}' --key OBJECT_NAME.csv --expression "select count(0) from s3object where int(_1)<10;" output.csv
例 (CSV の場合)
aws --endpoint-url http://localhost:80 s3api select-object-content --bucket testbucket --expression-type 'SQL' --input-serialization '{"CSV": {"FieldDelimiter": "," , "QuoteCharacter": "\"" , "RecordDelimiter" : "\n" , "QuoteEscapeCharacter" : "\\" , "FileHeaderInfo": "USE" }, "CompressionType": "NONE"}' --output-serialization '{"CSV": {}}' --key testobject.csv --expression "select count(0) from s3object where int(_1)<10;" output.csv
構文 (Parquet の場合)
aws --endpoint-url http://localhost:80 s3api select-object-content --bucket BUCKET_NAME --expression-type 'SQL' --input-serialization '{"Parquet": {}, {"CompressionType": "NONE"}' --output-serialization '{"CSV": {}}' --key OBJECT_NAME.parquet --expression "select count(0) from s3object where int(_1)<10;" output.csv
例 (Parquet の場合)
aws --endpoint-url http://localhost:80 s3api select-object-content --bucket testbucket --expression-type 'SQL' --input-serialization '{"Parquet": {}, {"CompressionType": "NONE"}' --output-serialization '{"CSV": {}}' --key testobject.parquet --expression "select count(0) from s3object where int(_1)<10;" output.csv
構文 (JSON の場合)
aws --endpoint-URL http://localhost:80 s3api select-object-content --bucket BUCKET_NAME --expression-type 'SQL' --input-serialization '{"JSON": {"CompressionType": "NONE"}' --output-serialization '{"CSV": {}}}' --key OBJECT_NAME.json --expression "select count(0) from s3object where int(_1)<10;" output.csv
例 (JSON の場合)
aws --endpoint-url http://localhost:80 s3api select-object-content --bucket testbucket --expression-type 'SQL' --input-serialization '{"JSON": {"CompressionType": "NONE"}' --output-serialization '{"CSV": {}}}' --key testobject.json --expression "select count(0) from s3object where int(_1)<10;" output.csv
例 (BOTO3 の場合)
import pprint import boto3 from botocore.exceptions import ClientError def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',esc_char='\\',csv_header_info="NONE"): s3 = boto3.client('s3', endpoint_url=endpoint, aws_access_key_id=access_key, region_name=region_name, aws_secret_access_key=secret_key) result = "" try: r = s3.select_object_content( Bucket=bucket, Key=key, ExpressionType='SQL', InputSerialization = {"CSV": {"RecordDelimiter" : row_delim, "FieldDelimiter" : column_delim,"QuoteEscapeCharacter": esc_char, "QuoteCharacter": quot_char, "FileHeaderInfo": csv_header_info}, "CompressionType": "NONE"}, OutputSerialization = {"CSV": {}}, Expression=query, RequestProgress = {"Enabled": progress}) except ClientError as c: result += str(c) return result for event in r['Payload']: if 'Records' in event: result = "" records = event['Records']['Payload'].decode('utf-8') result += records if 'Progress' in event: print("progress") pprint.pprint(event['Progress'],width=1) if 'Stats' in event: print("Stats") pprint.pprint(event['Stats'],width=1) if 'End' in event: print("End") pprint.pprint(event['End'],width=1) return result run_s3select( "my_bucket", "my_csv_object", "select int(_1) as a1, int(_2) as a2 , (a1+a2) as a3 from s3object where a3>100 and a3<300;")
サポートされる機能
現時点で、AWS s3 select コマンドの一部のみがサポートされます。
関連情報
- 詳細は、Amazon's S3 Select Object Content API を参照してください。
3.5.2. S3 supported select functions
S3 select は、.Timestamp の機能をサポートします。
- to_timestamp(string)
- 説明
- 文字列をタイムスタンプの基本型に変換します。文字列形式では、'時刻' の値が欠落している場合はゼロが入力されます。月と日の値が欠落している場合は、1 がデフォルト値として入力されます。'タイムゾーン' の形式は +/-HH:mm または Z で、文字 'Z' は協定世界時 (UTC) を示します。タイムゾーンの値の範囲は、-12:00 から +14:00 です。
- サポート対象
現在、次の文字列形式をタイムスタンプに変換できます。
- YYYY-MM-DDTHH:mm:ss.SSSSSS+/-HH:mm
- YYYY-MM-DDTHH:mm:ss.SSSSSSZ
- YYYY-MM-DDTHH:mm:ss+/-HH:mm
- YYYY-MM-DDTHH:mm:ssZ
- YYYY-MM-DDTHH:mm+/-HH:mm
- YYYY-MM-DDTHH:mmZ
- YYYY-MM-DDT
- YYYYT
- to_string(timestamp, format_pattern)
- 説明
- 入力したタイムスタンプの文字列表現を、指定した入力文字列形式で返します。
- パラメーター
形式 | 例 | 説明 |
---|---|---|
yy | 69 | 年を表す 2 桁の値。 |
y | 1969 | 年を表す 4 桁の値。 |
yyyy | 1969 | ゼロパディングされた 4 桁の年。 |
M | 1 | 月。 |
MM | 01 | ゼロパディングされた月。 |
MMM | Jan | 月の名称の省略形。 |
MMMM | January | 月の正式名称。 |
MMMMM | J |
月の最初の 1 文字。 |
d | 2 | 日付 (1 - 31)。 |
dd | 02 | ゼロパディングされた日付 (01 - 31)。 |
a | AM | 午前または午後。 |
h | 3 | 時刻 (1 - 12)。 |
hh | 03 | ゼロパディングされた時刻 (01 - 12)。 |
H | 3 | 時刻 (0 - 23)。 |
HH | 03 | ゼロパディングされた時刻 (00 - 23)。 |
m | 4 | 分 (0 - 59)。 |
mm | 04 | ゼロパディングされた分 (00 - 59)。 |
s | 5 | 秒 (0 - 59)。 |
ss | 05 | ゼロパディングされた秒 (00 - 59)。 |
S | 1 | 秒の小数部 (精度: 0.1、範囲: 0.0 - 0.9)。 |
SS | 12 | 秒の小数部 (精度: 0.01、範囲: 0.0 - 0.99)。 |
SSS | 123 | 秒の小数部 (精度: 0.01、範囲: 0.0 - 0.999)。 |
SSSS | 1234 | 秒の小数部 (精度: 0.001、範囲: 0.0 - 0.9999)。 |
SSSSSS | 123456 | 秒の小数部 (最大精度: 1 ナノ秒、範囲: 0.0 - 0.999999)。 |
n | 60000000 | ナノ秒部。 |
X | +07 または Z | 時間単位のオフセット。オフセットが 0 の場合は “Z”。 |
XX または XXXX | +0700 または Z | 時間と分単位のオフセット。オフセットが 0 の場合は “Z”。 |
XXX または XXXXX | +07:00 または Z | 時間と分単位のオフセット。オフセットが 0 の場合は “Z”。 |
x | 7 | 時間単位のオフセット。 |
xx または xxxx | 700 | 時間と分単位のオフセット。 |
xxx または xxxxx | +07:00 | 時間と分単位のオフセット。 |
- extract(date-part from timestamp)
- 説明
- 入力タイムスタンプからの date-part の抽出に従って整数を返します。
- サポート対象
- year、month、week、day、hour、minute、second、timezone_hour、timezone_minute。
- date_add(date-part ,integer,timestamp)
- 説明
- 入力されたタイムスタンプと date-part の結果に基づいて計算されたタイムスタンプを返します。
- サポート対象
- year、month、day、hour、minute、second。
- date_diff(date-part,timestamp,timestamp)
- 説明
- 整数を返します。これは、date-part に応じた 2 つのタイムスタンプの差の計算結果です。
- サポート対象
- year、month、day、hour、minute、second。
- utcnow()
- 説明
- 現在の時刻のタイムスタンプを返します。
集約
- count()
- 説明
- (条件がある場合) 条件と一致する行数に基づいて整数を返します。
- sum(expression)
- 説明
- (条件がある場合) 条件と一致する各行の式の概要を返します。
- avg(expression)
- 説明
- (条件がある場合) 条件に一致する各行の平均式を返します。
- max(expression)
- 説明
- (条件がある場合) 条件に一致するすべての式について最大結果を返します。
- min(expression)
- 説明
- (条件がある場合) 条件に一致するすべての式の最小結果を返します。
String
- substring (string,from,for)
- 説明
- from、for の入力に従って入力文字列から抽出した文字列を返します。
- Char_length
- 説明
- 文字列の文字数を返します。Character_length も同じです。
- trim([[leading | trailing | both remove_chars] from] string )
- 説明
- ターゲット文字列から先頭/末尾 (または両方) の文字を削除します。デフォルト値は空白文字です。
- Upper\lower
- 説明
- 文字を大文字または小文字に変換します。
NULL
NULL
値が見つからないか、不明な値で、NULL
が任意の演算に値を生成できません。同じことが算術比較にも当てはまります。NULL
との比較は不明である NULL
です。
A is NULL | Result(NULL=UNKNOWN) |
---|---|
Not A |
|
A または alse |
|
A or True |
|
A or A |
|
A and False |
|
A and True |
|
A and A |
|
関連情報
- 詳細は、Amazon's S3 Select Object Content API を参照してください。
3.5.3. S3 alias programming construct
エイリアスプログラミング構築は、多くの列または複雑なクエリーを含むオブジェクトを持つプログラミングを容易にするため、s3 select 言語に不可欠な部分です。エイリアス構造を含むステートメントを解析すると、エイリアスを適切な投影列への参照に置き換え、クエリーの実行時に参照が他の式として評価されます。エイリアスは結果キャッシュを維持します。つまり、エイリアスが複数回使用された場合は、キャッシュからの結果が使用されるため、同じ式は評価されず、同じ結果が返されます。現在、Red Hat は列エイリアスをサポートしています。
例
select int(_1) as a1, int(_2) as a2 , (a1+a2) as a3 from s3object where a3>100 and a3<300;")
3.5.4. S3 解析の説明
S3 select エンジンには、CSV、Parquet、JSON の 3 つのファイル形式すべてに対応するパーサーがあり、コマンドをより処理しやすいコンポーネントに分割します。コンポーネントは、各コンポーネントを定義するタグにアタッチされます。
3.5.4.1. S3 CSV の解析
入力シリアライゼーションを含む CSV 定義では、次のデフォルト値が使用されます。
-
行区切り文字には
{\n}`
を使用します。 -
引用には
{“}
を使用します。 -
エスケープ文字には
{\}
を使用します。
csv-header-info
は、AWS-CLI に表示される USE
で解析されます。これは、スキーマを含む入力オブジェクトの最初の行です。現在、シリアル化および圧縮タイプの出力はサポートされていません。S3 select エンジンには、S3-objects を解析する CSV パーサーがあります。
- 各行は、行区切り文字で終わります。
- フィールド区切り文字は、隣接する列を区切ります。
-
連続するフィールドの区切り文字は
NULL
列を定義します。 - 引用符は、フィールド区切り文字をオーバーライドします。フィールド区切り文字は、引用符の間の任意の文字です。
- エスケープ文字は、行区切り文字以外の特殊文字を無効にします。
以下は、CSV 解析ルールの例です。
機能 | 説明 | 入力 (トークン) |
---|---|---|
| 連続するフィールド区切り文字 |
|
| 引用符は、フィールドの区切り文字をオーバーライドします。 |
|
| エスケープ文字はメタ文字をオーバーライドします。 |
オブジェクトの所有者の |
| 終わりの引用符はありません。行区切り文字は終了行になります。 |
|
| FileHeaderInfo タグ | USE の値は、最初の行の各トークンが column-name であることを示します。IGNORE 値は最初の行をスキップすることを意味します。 |
関連情報
- 詳細は、Amazon's S3 Select Object Content API を参照してください。
3.5.4.2. S3 Parquet の解析
Apache Parquet は、効率的なデータの保存と取得のために設計された、オープンソースのコラム型のデータファイル形式です。
S3 select エンジンの Parquet パーサーは、S3 オブジェクトを次のように解析します。
例
4-byte magic number "PAR1" <Column 1 Chunk 1 + Column Metadata> <Column 2 Chunk 1 + Column Metadata> ... <Column N Chunk 1 + Column Metadata> <Column 1 Chunk 2 + Column Metadata> <Column 2 Chunk 2 + Column Metadata> ... <Column N Chunk 2 + Column Metadata> ... <Column 1 Chunk M + Column Metadata> <Column 2 Chunk M + Column Metadata> ... <Column N Chunk M + Column Metadata> File Metadata 4-byte length in bytes of file metadata 4-byte magic number "PAR1"
- 上の例のテーブルには N 列あり、M 個の行グループに分割されています。ファイルメタデータには、すべての列メタデータの開始位置が含まれています。
- シングルパス書き込みを可能にするために、メタデータはデータの後に書き込まれます。
- すべての列チャンクはファイルメタデータ内にあり、後で順次読み取る必要があります。
- この形式は、メタデータをデータから分離するように明示的に設計されています。これにより、列を複数のファイルに分割したり、単一のメタデータファイルで複数の parquet ファイルを参照したりすることができます。
3.5.4.3. S3 JSON の解析
JSON ドキュメントでは、オブジェクトまたは配列内で制限なく値をネストできます。S3 select エンジンで JSON ドキュメント内の特定の値をクエリーする場合、値の場所は SELECT
ステートメントのパスで指定されます。
JSON ドキュメントの一般的な構造には、CSV や Parquet のような行と列の構造はありません。代わりに、SQL ステートメント自体が、JSON ドキュメントをクエリーするときに行と列を定義します。
S3 select エンジンの JSON パーサーは、S3 オブジェクトを次のように解析します。
-
SELECT
ステートメントのFROM
句は行の境界を定義します。 - JSON ドキュメント内の行は、行区切り文字を使用して CSV オブジェクトの行を定義する方法や、行グループを使用して Parquet オブジェクトの行を定義する方法と似ています。
以下の例を考慮してください。
例
{ "firstName": "Joe", "lastName": "Jackson", "gender": "male", "age": "twenty" }, { "firstName": "Joe_2", "lastName": "Jackson_2", "gender": "male", "age": 21 }, "phoneNumbers": [ { "type": "home1", "number": "734928_1","addr": 11 }, { "type": "home2", "number": "734928_2","addr": 22 } ], "key_after_array": "XXX", "description" : { "main_desc" : "value_1", "second_desc" : "value_2" } # the from-clause define a single row. # _1 points to root object level. # _1.age appears twice in Documnet-row, the last value is used for the operation. query = "select _1.firstname,_1.key_after_array,_1.age+4,_1.description.main_desc,_1.description.second_desc from s3object[*].aa.bb.cc;"; expected_result = Joe_2,XXX,25,value_1,value_2
-
このステートメントは、リーダーにパス
aa.bb.cc
を検索するように指示し、このパスの出現に基づいて行の境界を定義します。 -
行は、リーダーがパスを検出したときに始まり、リーダーがパスの最も内側の部分 (この場合はオブジェクト
cc
) を出たときに終了します。
-
このステートメントは、リーダーにパス
3.5.5. Ceph Object Gateway と Trino の統合
Ceph Object Gateway を Trino と統合します。Trino は、ユーザーが S3 オブジェクト上で SQL クエリーを 9 倍の速度で実行できるようにする重要なユーティリティーです。
Trino を使用するいくつかの利点を以下に示します。
- Trino は完全な SQL エンジンです。
- S3 select リクエストをプッシュダウンします。プッシュダウンでは、Trino エンジンが、サーバー側で実行するとコスト効率が高い SQL ステートメントの一部を特定します。
- Ceph/S3select の最適化ルールを使用してパフォーマンスを向上します。
- Red Hat Ceph Storage のスケーラビリティを活用して、元のオブジェクトを複数の部分に等分し、S3 select リクエストを実行して、リクエストをマージします。
trino を使用してクエリーしているときに s3select
構文が機能しない場合は、SQL 構文を使用してください。
前提条件
- Ceph Object Gateway がインストールされた Red Hat Ceph Storage クラスターが実行中である。
- Docker または Podman がインストールされている。
- バケットが作成されている。
- オブジェクトがアップロードされている。
手順
Trino と hive をデプロイします。
例
[cephuser@host01 ~]$ git clone https://github.com/ceph/s3select.git [cephuser@host01 ~]$ cd s3select
S3 エンドポイント、アクセスキー、およびシークレットキーを使用して
hms_trino.yaml
ファイルを変更します。例
[cephuser@host01 s3select]$ cat container/trino/hms_trino.yaml version: '3' services: hms: image: galsl/hms:dev container_name: hms environment: # S3_ENDPOINT the CEPH/RGW end-point-url - S3_ENDPOINT=http://rgw_ip:port - S3_ACCESS_KEY=abc - S3_SECRET_KEY=abc # the container starts with booting the hive metastore command: sh -c '. ~/.bashrc; start_hive_metastore' ports: - 9083:9083 networks: - trino_hms trino: image: trinodb/trino:405 container_name: trino volumes: # the trino directory contains the necessary configuration - ./trino:/etc/trino ports: - 8080:8080 networks: - trino_hms networks: trino_hm
S3 エンドポイント、アクセスキー、およびシークレットキーを使用して
hive.properties
ファイルを変更します。例
[cephuser@host01 s3select]$ cat container/trino/trino/catalog/hive.properties connector.name=hive hive.metastore.uri=thrift://hms:9083 #hive.metastore.warehouse.dir=s3a://hive/ hive.allow-drop-table=true hive.allow-rename-table=true hive.allow-add-column=true hive.allow-drop-column=true hive.allow-rename-column=true hive.non-managed-table-writes-enabled=true hive.s3select-pushdown.enabled=true hive.s3.aws-access-key=abc hive.s3.aws-secret-key=abc # should modify per s3-endpoint-url hive.s3.endpoint=http://rgw_ip:port #hive.s3.max-connections=1 #hive.s3select-pushdown.max-connections=1 hive.s3.connect-timeout=100s hive.s3.socket-timeout=100s hive.max-splits-per-second=10000 hive.max-split-size=128MB
Trino コンテナーを起動して Ceph Object Gateway を統合します。
例
[cephuser@host01 s3select]$ sudo docker compose -f ./container/trino/hms_trino.yaml up -d
統合を確認します。
例
[cephuser@host01 s3select]$ sudo docker exec -it trino /bin/bash trino@66f753905e82:/$ trino trino> create schema hive.csvbkt1schema; trino> create table hive.csvbkt1schema.polariondatacsv(c1 varchar,c2 varchar, c3 varchar, c4 varchar, c5 varchar, c6 varchar, c7 varchar, c8 varchar, c9 varchar) WITH ( external_location = 's3a://csvbkt1/',format = 'CSV'); trino> select * from hive.csvbkt1schema.polariondatacsv;
注記外部の場所は、ファイルの末尾ではなく、バケット名またはディレクトリーを指す必要があります。