3.5. S3 选择操作
作为开发者,您可以运行 S3 选择来加快吞吐量。用户可以在没有介质器的情况下直接运行 S3 选择查询。
有三个 S3 选择工作流 - CSV、Apache Parquet (Parquet)和 JSON,为 CSV、Bquet 和 JSON 对象提供 S3 选择操作:
- CSV 文件以纯文本格式存储表格数据。文件的每一行都是数据记录。
- Parquet 是一个开源、面向列的数据文件格式,旨在高效数据存储和检索。它提供了高效率的数据压缩和编码方案,以提高性能,以批量处理复杂数据。Parquet 启用 S3 select-engine 跳过列和块,从而减少 IOPS (与 CSV 和 JSON 格式持续)。
- JSON 是格式结构。S3 选择引擎使用 JSON 格式输入数据顶部的 SQL 语句,从而可以扫描高度嵌套和复杂的 JSON 格式数据。
例如,带有数 GB 数据的 CSV、Parquet 或 JSON S3 对象允许用户提取单个列,使用以下查询过滤另一个列:
示例
select customerid from s3Object where age>30 and age<65;
目前,S3 对象必须通过 Ceph 对象网关从 Ceph OSD 检索数据,然后才能过滤和提取数据。当对象较大且查询更为具体时,性能会提高性能。比 CSV 更高效地处理 Parquet 格式。
先决条件
- 一个正在运行的 Red Hat Ceph Storage 集群。
- RESTful 客户端。
- 创建的用户访问权限的 S3 用户。
3.5.1. S3 从对象中选择内容 复制链接链接已复制到粘贴板!
选择对象内容 API 通过结构化查询语言(SQL)过滤对象的内容。如需清单对象中应驻留的内容的说明,请参阅 AWS 系统管理器用户指南中的清单收集的元数据部分。清单内容会影响应针对该清单运行的查询类型。可能提供重要信息的 SQL 语句数量较大,但 S3 选择是类 SQL 的实用程序,因此不支持某些运算符,如 group-by 并加入。
仅对于 CSV,您必须将数据序列化格式指定为以逗号分隔的对象值,以检索指定的内容。Parquet 没有分隔符,因为它采用二进制格式。Amazon Web Services (AWS)命令行界面(CLI)选择对象内容使用 CSV 或 Parquet 格式将对象数据解析到记录中,仅返回查询中指定的记录。
您必须为响应指定数据序列化格式。这个操作必须具有 s3:GetObject 权限。
-
InputSerialization元素描述了正在查询的对象中的数据格式。对象可以是 CSV 或 Parquet 格式。 -
OutputSerialization元素是 AWS-CLI 用户客户端的一部分,并描述了如何格式化输出数据。Ceph 为 AWS-CLI 实施了服务器客户端,因此,根据当前仅 CSV 的OutputSerialization提供相同的输出。 -
InputSerialization的格式不需要与OutputSerialization的格式匹配。例如,您可以在InputSerialization中指定 Parquet,在OutputSerialization中指定 CSV。
语法
POST /BUCKET/KEY?select&select-type=2 HTTP/1.1\r\n
示例
POST /testbucket/sample1csv?select&select-type=2 HTTP/1.1\r\n
POST /testbucket/sample1parquet?select&select-type=2 HTTP/1.1\r\n
请求实体
Bucket- 描述
- 要从中选择对象内容的存储桶。
- Type
- 字符串
- 必需
- 是
键- 描述
- 对象键。
- 长度限制
- 最小长度为 1.
- Type
- 字符串
- 必需
- 是
SelectObjectContentRequest- 描述
- 选择对象内容请求参数的根级别标签。
- Type
- 字符串
- 必需
- 是
表达式- 描述
- 用于查询对象的表达式。
- Type
- 字符串
- 必需
- 是
ExpressionType- 描述
- 示例 SQL 提供的表达式的类型。
- Type
- 字符串
- 有效值
- SQL
- 必需
- 是
InputSerialization- 描述
- 描述正在查询的对象中数据的格式。
- Type
- 字符串
- 必需
- 是
OutputSerialization- 描述
- 以逗号分隔符和新行返回的数据格式。
- Type
- 字符串
- 必需
- 是
响应实体
如果操作成功,服务会返回 HTTP 200 响应。服务以 XML 格式返回数据:
payload- 描述
- 有效负载参数的根级别标签。
- Type
- 字符串
- 必需
- 是
Records- 描述
- 记录事件。
- Type
- base64 编码的二进制数据对象
- 必需
- 否
Stats- 描述
- stats 事件。
- Type
- Long
- 必需
- 否
Ceph 对象网关支持以下响应:
示例
{:event-type,records} {:content-type,application/octet-stream} {:message-type,event}
语法(对于 CSV)
aws --endpoint-URL http://localhost:80 s3api select-object-content
--bucket BUCKET_NAME
--expression-type 'SQL'
--input-serialization
'{"CSV": {"FieldDelimiter": "," , "QuoteCharacter": "\"" , "RecordDelimiter" : "\n" , "QuoteEscapeCharacter" : "\\" , "FileHeaderInfo": "USE" }, "CompressionType": "NONE"}'
--output-serialization '{"CSV": {}}'
--key OBJECT_NAME.csv
--expression "select count(0) from s3object where int(_1)<10;" output.csv
示例(对于 CSV)
aws --endpoint-url http://localhost:80 s3api select-object-content
--bucket testbucket
--expression-type 'SQL'
--input-serialization
'{"CSV": {"FieldDelimiter": "," , "QuoteCharacter": "\"" , "RecordDelimiter" : "\n" , "QuoteEscapeCharacter" : "\\" , "FileHeaderInfo": "USE" }, "CompressionType": "NONE"}'
--output-serialization '{"CSV": {}}'
--key testobject.csv
--expression "select count(0) from s3object where int(_1)<10;" output.csv
语法(针对 Parquet)
aws --endpoint-url http://localhost:80 s3api select-object-content
--bucket BUCKET_NAME
--expression-type 'SQL'
--input-serialization
'{"Parquet": {}, {"CompressionType": "NONE"}'
--output-serialization '{"CSV": {}}'
--key OBJECT_NAME.parquet
--expression "select count(0) from s3object where int(_1)<10;" output.csv
示例(对于 Parquet)
aws --endpoint-url http://localhost:80 s3api select-object-content
--bucket testbucket
--expression-type 'SQL'
--input-serialization
'{"Parquet": {}, {"CompressionType": "NONE"}'
--output-serialization '{"CSV": {}}'
--key testobject.parquet
--expression "select count(0) from s3object where int(_1)<10;" output.csv
语法(用于 JSON)
aws --endpoint-URL http://localhost:80 s3api select-object-content
--bucket BUCKET_NAME
--expression-type 'SQL'
--input-serialization
'{"JSON": {"CompressionType": "NONE"}'
--output-serialization '{"CSV": {}}}'
--key OBJECT_NAME.json
--expression "select count(0) from s3object where int(_1)<10;" output.csv
示例(用于 JSON)
aws --endpoint-url http://localhost:80 s3api select-object-content
--bucket testbucket
--expression-type 'SQL'
--input-serialization
'{"JSON": {"CompressionType": "NONE"}'
--output-serialization '{"CSV": {}}}'
--key testobject.json
--expression "select count(0) from s3object where int(_1)<10;" output.csv
示例(用于 BOTO3)
import pprint
import boto3
from botocore.exceptions import ClientError
def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',esc_char='\\',csv_header_info="NONE"):
s3 = boto3.client('s3',
endpoint_url=endpoint,
aws_access_key_id=access_key,
region_name=region_name,
aws_secret_access_key=secret_key)
result = ""
try:
r = s3.select_object_content(
Bucket=bucket,
Key=key,
ExpressionType='SQL',
InputSerialization = {"CSV": {"RecordDelimiter" : row_delim, "FieldDelimiter" : column_delim,"QuoteEscapeCharacter": esc_char, "QuoteCharacter": quot_char, "FileHeaderInfo": csv_header_info}, "CompressionType": "NONE"},
OutputSerialization = {"CSV": {}},
Expression=query,
RequestProgress = {"Enabled": progress})
except ClientError as c:
result += str(c)
return result
for event in r['Payload']:
if 'Records' in event:
result = ""
records = event['Records']['Payload'].decode('utf-8')
result += records
if 'Progress' in event:
print("progress")
pprint.pprint(event['Progress'],width=1)
if 'Stats' in event:
print("Stats")
pprint.pprint(event['Stats'],width=1)
if 'End' in event:
print("End")
pprint.pprint(event['End'],width=1)
return result
run_s3select(
"my_bucket",
"my_csv_object",
"select int(_1) as a1, int(_2) as a2 , (a1+a2) as a3 from s3object where a3>100 and a3<300;")
支持的功能
目前只支持 AWS s3 选择命令的一部分:
3.5.2. S3 支持的选择功能 复制链接链接已复制到粘贴板!
S3 选择支持以下功能:.Timestamp
- to_timestamp(string)
- 描述
- 将字符串转换为时间戳基本类型。在字符串格式中,任何缺少的 'time' 值都会填充零;对于缺少的月份和天值,1 是默认值。'timezone' 格式是 +/-H:mm 或 Z,其中字母 'Z' 表示协调通用时间(UTC)。timezone 的值可以包括 - 12:00 和 +14:00。
- 支持
目前,它可以将以下字符串格式转换为时间戳:
- YYYY-MM-DDTHH:mm:ss.SSSSSS+/-HH:mm
- YYYY-MM-DDTHH:mm:ss.SSSSSSZ
- YYYY-MM-DDTHH:mm:ss+/-HH:mm
- YYYY-MM-DDTHH:mm:ssZ
- YYYY-MM-DDTHH:mm+/-HH:mm
- YYYY-MM-DDTHH:mmZ
- YYYY-MM-DDT
- YYYYT
- to_string (timestamp, format_pattern)
- 描述
- 以给定的输入字符串格式返回输入时间戳的字符串。
- 参数
| 格式 | 示例 | 描述 |
|---|---|---|
| yy | 69 | 2 年数字. |
| y | 1969 | 4 年数字。 |
| YYYY | 1969 | 零添加的 4 位数年。 |
| M | 1 | 年月。 |
| MM | 01 | 每年的零添加月数. |
| MMM | 1 月 | 每年名称的缩写月。 |
| MMMM | 1 月 | 年全月名称。 |
| MMMMM | J |
年前一个字母的月。与 |
| d | 2 | 日期(1-31)。 |
| dd | 02 | 每月零添加一天(01-31)。 |
| a | AM | 每天的 AM 或 PM。 |
| h | 3 | 天中的小时(1-12)。 |
| hh | 03 | 零添加小时的一天(01-12)。 |
| H | 3 | 天中的小时(0-23)。 |
| HH | 03 | 每天的零添加小时(00-23)。 |
| m | 4 | 小时的分钟(0-59)。 |
| mm | 04 | 小时的零添加分钟(00-59)。 |
| s | 5 | 第二分钟(0-59)。 |
| ss | 05 | 分钟的零添加秒(00-59)。 |
| S | 1 | 第二部分(精度: 0.1,范围: 0.0-0.9)。 |
| SS | 12 | 第二部分(精度: 0.01,范围: 0.0-0.99)。 |
| SSS | 123 | 第二部分(精度: 0.01,范围: 0.0-0.999)。 |
| SSSS | 1234 | 第二部分(精度: 0.001,范围: 0.0-0.9999)。 |
| SSSSSS | 123456 | 第二部分(最大精度):1纳秒,范围: 0.0-0.999999)。 |
| n | 60000000 | second 的 nano。 |
| X | +07 或 Z | 如果偏移为 0,则以小时或"Z"表示偏移量。 |
| XX 或 XXXX | +0700 或 Z | 如果偏移为 0,则以小时和"Z"表示偏移量。 |
| XXX 或 XXXXX | +07:00 或 Z | 如果偏移为 0,则以小时和"Z"表示偏移量。 |
| x | 7 | 以小时为单位进行偏移。 |
| XX 或 xxxx | 700 | 以小时和分钟为单位的偏移。 |
| xxx 或 xxxxx | +07:00 | 以小时和分钟为单位的偏移。 |
- extract (date-part from timestamp)
- 描述
- 根据 date-part 从输入时间戳中提取的整数。
- 支持
- year, month, week, day, hour, minute, second, timezone_hour, timezone_minute.
- date_add(date-part ,integer,timestamp)
- 描述
- 返回时间戳,根据输入时间戳和日期部分的结果计算。
- 支持
- 年,月份、天、小时、分钟、秒。
- date_diff(date-part,timestamp,timestamp)
- 描述
- 返回整数,根据 date-part 在两个时间戳之间计算的结果。
- 支持
- 年,月份、天、小时、分钟、秒。
- utcnow()
- 描述
- 返回当前时间的时间戳。
聚合
- count()
- 描述
- 根据与条件匹配的行数返回整数(如果存在)。
- sum(expression)
- 描述
- 如果出现某个条件,则每行上返回表达式摘要。
- avg(expression)
- 描述
- 如果出现某个条件,每行中返回一个平均表达式。
- max(expression)
- 描述
- 如果出现某个条件,则返回与条件匹配的所有表达式的最大结果。
- min(expression)
- 描述
- 返回与条件匹配的所有表达式的最小结果。
字符串
- 子字符串(字符串、from、for)
- 描述
- 为输入返回字符串从输入字符串中提取的字符串。
- Char_length
- 描述
- 返回字符串中的多个字符。Character_length 也实现相同的目的。
- trim ([[leading | trailing | both remove_chars] from] string)
- 描述
- 从目标字符串中修剪前/尾部(或两者)字符。默认值为空白字符。
- Upper\lower
- 描述
- 将字符转换为大写或小写。
NULL
NULL 值缺失或未知,即 NULL 无法为任何算术操作生成一个值。这同样适用于算术比较,对 NULL 的任何比较都是未知的 NULL。
| A is NULL | result (NULL=UNKNOWN) |
|---|---|
| 不是 A |
|
| A 或 False |
|
| A or True |
|
| A 或 A |
|
| A 和 False |
|
| A 和 True |
|
| A 和 A |
|
3.5.3. S3 别名编程结构 复制链接链接已复制到粘贴板!
别名编程结构是 s3 选择语言的重要组成部分,因为它可以为包含许多列或复杂查询的对象启用更好的编程。当解析带有别名结构的声明时,它会将别名替换为对右投射列和查询执行的引用,该引用将象任何其他表达式一样评估。别名维护结果缓存,如果别名被多次使用,则不会评估相同的表达式,并返回相同的结果,因为使用了缓存的结果。目前,红帽支持列别名。
示例
select int(_1) as a1, int(_2) as a2 , (a1+a2) as a3 from s3object where a3>100 and a3<300;")
3.5.4. S3 解析解释 复制链接链接已复制到粘贴板!
S3 选择引擎具有所有三个文件格式的解析器:CSV、Brquet 和 JSON,后者将命令划分为更多可处理组件,然后附加到定义每个组件的标签中。
3.5.4.1. S3 CSV 解析 复制链接链接已复制到粘贴板!
带有输入序列化的 CSV 定义使用以下默认值:
-
将
{\n}'用于 row-delimiter。 -
使用
{"}括起内容。 -
使用
{\}转义字符。
csv-header-info 会在 AWS-CLI 中显示 USE 时解析,这是包含该模式的输入对象中的第一行。目前,不支持输出序列化和压缩类型。S3 选择引擎具有 CSV 解析器,它解析 S3-objects:
- 每行以 row-delimiter 结尾。
- field-separator 会分离相邻的列。
-
successive 字段分隔符定义
NULL列。 - quote-character 覆盖 field-separator;即,字段分隔符是引号之间的任何字符。
- 转义字符禁用除行分隔符之外的任何特殊字符。
以下是 CSV 解析规则的示例:
| 功能 | 描述 | 输入(Tokens) |
|---|---|---|
|
| successive 字段分隔符 |
|
|
| quote 字符覆盖字段分隔符。 |
|
|
| 转义字符覆盖了 元字符。 |
对象所有者 |
|
| 没有关闭的引号;行分隔符是右行。 |
|
|
| FileHeaderInfo 标签 | USE 值表示第一行中的每个令牌都是列名称;IGNORE 值意味着跳过第一行。 |
3.5.4.2. S3 Parquet 解析 复制链接链接已复制到粘贴板!
Apache Parquet 是一个开源列式数据文件格式,旨在高效数据存储和检索。
S3 选择引擎的 Parquet 解析器解析 S3-objects,如下所示:
示例
4-byte magic number "PAR1"
<Column 1 Chunk 1 + Column Metadata>
<Column 2 Chunk 1 + Column Metadata>
...
<Column N Chunk 1 + Column Metadata>
<Column 1 Chunk 2 + Column Metadata>
<Column 2 Chunk 2 + Column Metadata>
...
<Column N Chunk 2 + Column Metadata>
...
<Column 1 Chunk M + Column Metadata>
<Column 2 Chunk M + Column Metadata>
...
<Column N Chunk M + Column Metadata>
File Metadata
4-byte length in bytes of file metadata
4-byte magic number "PAR1"
- 在上例中,此表中有 N 列,被分成 M 行组。文件元数据包含所有列元数据启动位置。
- 元数据在数据后写入,以允许进行一次传递写入。
- 所有列块都可以在文件元数据中找到,之后应按顺序读取。
- 格式被明确设计为将元数据与数据分开。这允许将列分成多个文件,并有一个元数据文件引用多个 parquet 文件。
3.5.4.3. S3 JSON 解析 复制链接链接已复制到粘贴板!
JSON 文档启用在没有限制的情况下的对象或数组中嵌套值。在 S3 选择引擎的 JSON 文档中查询特定值时,该值的位置通过 SELECT 语句中的路径指定。
JSON 文档的通用结构没有 CSV 和 Parquet 等行和列结构。相反,SQL 语句本身是查询 JSON 文档时定义行和列的 SQL 语句本身。
S3 选择引擎的 JSON 解析器解析 S3-objects,如下所示:
-
SELECT语句中的FROM子句定义行边界。 - JSON 文档中的一行与如何为 CSV 对象定义行的分隔符类似,以及如何使用行组定义 Parquet 对象的行
考虑以下示例:
示例
{ "firstName": "Joe", "lastName": "Jackson", "gender": "male", "age": "twenty" }, { "firstName": "Joe_2", "lastName": "Jackson_2", "gender": "male", "age": 21 }, "phoneNumbers": [ { "type": "home1", "number": "734928_1","addr": 11 }, { "type": "home2", "number": "734928_2","addr": 22 } ], "key_after_array": "XXX", "description" : { "main_desc" : "value_1", "second_desc" : "value_2" } # the from-clause define a single row. # _1 points to root object level. # _1.age appears twice in Documnet-row, the last value is used for the operation. query = "select _1.firstname,_1.key_after_array,_1.age+4,_1.description.main_desc,_1.description.second_desc from s3object[*].aa.bb.cc;"; expected_result = Joe_2,XXX,25,value_1,value_2-
语句指示读取器搜索路径
aa.bb.cc,并根据此路径的发生定义行边界。 -
当读取器遇到路径时,行开始,当读取器退出路径的最顶层部分时结束,本例中为对象
cc。
-
语句指示读取器搜索路径
3.5.5. 将 Ceph 对象网关与 Trino 集成 复制链接链接已复制到粘贴板!
将 Ceph 对象网关与 Trino 集成,这是一个重要的实用程序,允许用户在 S3 对象上更快地运行 SQL 查询 9x。
以下是使用 Trino 的一些优点:
- Trino 是一个完整的 SQL 引擎。
- 推送 S3 选择 Trino 引擎中的请求标识在服务器端运行具有成本效益的 SQL 语句的一部分。
- 使用 Ceph/S3select 的优化规则来提高性能。
- 利用 Red Hat Ceph Storage 可扩展性,将原始对象划分为多个等部分,执行 S3 选择请求并合并请求。
如果 s3select 语法在查询 trino 时无法正常工作,请使用 SQL 语法。
先决条件
- 正在运行的 Red Hat Ceph Storage 集群安装有 Ceph 对象网关。
- 安装了 Docker 或 Podman。
- bucket 已创建。
- 对象已上传。
流程
部署 Trino 和 hive。
示例
[cephuser@host01 ~]$ git clone https://github.com/ceph/s3select.git [cephuser@host01 ~]$ cd s3select使用 S3 端点、访问密钥和 secret 密钥修改
hms_trino.yaml文件。示例
[cephuser@host01 s3select]$ cat container/trino/hms_trino.yaml version: '3' services: hms: image: galsl/hms:dev container_name: hms environment: # S3_ENDPOINT the CEPH/RGW end-point-url - S3_ENDPOINT=http://rgw_ip:port - S3_ACCESS_KEY=abc - S3_SECRET_KEY=abc # the container starts with booting the hive metastore command: sh -c '. ~/.bashrc; start_hive_metastore' ports: - 9083:9083 networks: - trino_hms trino: image: trinodb/trino:405 container_name: trino volumes: # the trino directory contains the necessary configuration - ./trino:/etc/trino ports: - 8080:8080 networks: - trino_hms networks: trino_hm使用 S3 端点、访问密钥和 secret 密钥修改
hive.properties文件。示例
[cephuser@host01 s3select]$ cat container/trino/trino/catalog/hive.properties connector.name=hive hive.metastore.uri=thrift://hms:9083 #hive.metastore.warehouse.dir=s3a://hive/ hive.allow-drop-table=true hive.allow-rename-table=true hive.allow-add-column=true hive.allow-drop-column=true hive.allow-rename-column=true hive.non-managed-table-writes-enabled=true hive.s3select-pushdown.enabled=true hive.s3.aws-access-key=abc hive.s3.aws-secret-key=abc # should modify per s3-endpoint-url hive.s3.endpoint=http://rgw_ip:port #hive.s3.max-connections=1 #hive.s3select-pushdown.max-connections=1 hive.s3.connect-timeout=100s hive.s3.socket-timeout=100s hive.max-splits-per-second=10000 hive.max-split-size=128MB启动 Trino 容器,以集成 Ceph 对象网关。
示例
[cephuser@host01 s3select]$ sudo docker compose -f ./container/trino/hms_trino.yaml up -d验证集成。
示例
[cephuser@host01 s3select]$ sudo docker exec -it trino /bin/bash trino@66f753905e82:/$ trino trino> create schema hive.csvbkt1schema; trino> create table hive.csvbkt1schema.polariondatacsv(c1 varchar,c2 varchar, c3 varchar, c4 varchar, c5 varchar, c6 varchar, c7 varchar, c8 varchar, c9 varchar) WITH ( external_location = 's3a://csvbkt1/',format = 'CSV'); trino> select * from hive.csvbkt1schema.polariondatacsv;注意外部位置必须指向存储桶名称或目录,而不是文件末尾。