3.5. S3 选择操作
作为开发者,您可以运行 S3 选择来加快吞吐量。用户可以在没有介质器的情况下直接运行 S3 选择查询。
有三个 S3 选择工作流 - CSV、Apache Parquet (Parquet)和 JSON,为 CSV、Bquet 和 JSON 对象提供 S3 选择操作:
- CSV 文件以纯文本格式存储表格数据。文件的每一行都是数据记录。
- Parquet 是一个开源、面向列的数据文件格式,旨在高效数据存储和检索。它提供了高效率的数据压缩和编码方案,以提高性能,以批量处理复杂数据。Parquet 启用 S3 select-engine 跳过列和块,从而减少 IOPS (与 CSV 和 JSON 格式持续)。
- JSON 是格式结构。S3 选择引擎使用 JSON 格式输入数据顶部的 SQL 语句,从而可以扫描高度嵌套和复杂的 JSON 格式数据。
例如,带有数 GB 数据的 CSV、Parquet 或 JSON S3 对象允许用户提取单个列,使用以下查询过滤另一个列:
示例
select customerid from s3Object where age>30 and age<65;
目前,S3 对象必须通过 Ceph 对象网关从 Ceph OSD 检索数据,然后才能过滤和提取数据。当对象较大且查询更为具体时,性能会提高性能。比 CSV 更高效地处理 Parquet 格式。
先决条件
- 一个正在运行的 Red Hat Ceph Storage 集群。
- RESTful 客户端。
- 创建的用户访问权限的 S3 用户。
3.5.1. S3 从对象中选择内容
选择对象内容 API 通过结构化查询语言(SQL)过滤对象的内容。如需清单对象中应驻留的内容的说明,请参阅 AWS 系统管理器用户指南中的清单收集的元数据部分。清单内容会影响应针对该清单运行的查询类型。可能提供重要信息的 SQL 语句数量较大,但 S3 选择是类 SQL 的实用程序,因此不支持某些运算符,如 group-by
并加入。
仅对于 CSV,您必须将数据序列化格式指定为以逗号分隔的对象值,以检索指定的内容。Parquet 没有分隔符,因为它采用二进制格式。Amazon Web Services (AWS)命令行界面(CLI)选择对象内容使用 CSV 或 Parquet 格式将对象数据解析到记录中,仅返回查询中指定的记录。
您必须为响应指定数据序列化格式。这个操作必须具有 s3:GetObject
权限。
-
InputSerialization
元素描述了正在查询的对象中的数据格式。对象可以是 CSV 或 Parquet 格式。 -
OutputSerialization
元素是 AWS-CLI 用户客户端的一部分,并描述了如何格式化输出数据。Ceph 为 AWS-CLI 实施了服务器客户端,因此,根据当前仅 CSV 的OutputSerialization
提供相同的输出。 -
InputSerialization
的格式不需要与OutputSerialization
的格式匹配。例如,您可以在InputSerialization
中指定 Parquet,在OutputSerialization
中指定 CSV。
语法
POST /BUCKET/KEY?select&select-type=2 HTTP/1.1\r\n
示例
POST /testbucket/sample1csv?select&select-type=2 HTTP/1.1\r\n POST /testbucket/sample1parquet?select&select-type=2 HTTP/1.1\r\n
请求实体
Bucket
- 描述
- 要从中选择对象内容的存储桶。
- Type
- 字符串
- 必需
- 是
键
- 描述
- 对象键。
- 长度限制
- 最小长度为 1.
- Type
- 字符串
- 必需
- 是
SelectObjectContentRequest
- 描述
- 选择对象内容请求参数的根级别标签。
- Type
- 字符串
- 必需
- 是
表达式
- 描述
- 用于查询对象的表达式。
- Type
- 字符串
- 必需
- 是
ExpressionType
- 描述
- 示例 SQL 提供的表达式的类型。
- Type
- 字符串
- 有效值
- SQL
- 必需
- 是
InputSerialization
- 描述
- 描述正在查询的对象中数据的格式。
- Type
- 字符串
- 必需
- 是
OutputSerialization
- 描述
- 以逗号分隔符和新行返回的数据格式。
- Type
- 字符串
- 必需
- 是
响应实体
如果操作成功,服务会返回 HTTP 200
响应。服务以 XML 格式返回数据:
payload
- 描述
- 有效负载参数的根级别标签。
- Type
- 字符串
- 必需
- 是
Records
- 描述
- 记录事件。
- Type
- base64 编码的二进制数据对象
- 必需
- 否
Stats
- 描述
- stats 事件。
- Type
- Long
- 必需
- 否
Ceph 对象网关支持以下响应:
示例
{:event-type,records} {:content-type,application/octet-stream} {:message-type,event}
语法(对于 CSV)
aws --endpoint-URL http://localhost:80 s3api select-object-content --bucket BUCKET_NAME --expression-type 'SQL' --input-serialization '{"CSV": {"FieldDelimiter": "," , "QuoteCharacter": "\"" , "RecordDelimiter" : "\n" , "QuoteEscapeCharacter" : "\\" , "FileHeaderInfo": "USE" }, "CompressionType": "NONE"}' --output-serialization '{"CSV": {}}' --key OBJECT_NAME.csv --expression "select count(0) from s3object where int(_1)<10;" output.csv
示例(对于 CSV)
aws --endpoint-url http://localhost:80 s3api select-object-content --bucket testbucket --expression-type 'SQL' --input-serialization '{"CSV": {"FieldDelimiter": "," , "QuoteCharacter": "\"" , "RecordDelimiter" : "\n" , "QuoteEscapeCharacter" : "\\" , "FileHeaderInfo": "USE" }, "CompressionType": "NONE"}' --output-serialization '{"CSV": {}}' --key testobject.csv --expression "select count(0) from s3object where int(_1)<10;" output.csv
语法(针对 Parquet)
aws --endpoint-url http://localhost:80 s3api select-object-content --bucket BUCKET_NAME --expression-type 'SQL' --input-serialization '{"Parquet": {}, {"CompressionType": "NONE"}' --output-serialization '{"CSV": {}}' --key OBJECT_NAME.parquet --expression "select count(0) from s3object where int(_1)<10;" output.csv
示例(对于 Parquet)
aws --endpoint-url http://localhost:80 s3api select-object-content --bucket testbucket --expression-type 'SQL' --input-serialization '{"Parquet": {}, {"CompressionType": "NONE"}' --output-serialization '{"CSV": {}}' --key testobject.parquet --expression "select count(0) from s3object where int(_1)<10;" output.csv
语法(用于 JSON)
aws --endpoint-URL http://localhost:80 s3api select-object-content --bucket BUCKET_NAME --expression-type 'SQL' --input-serialization '{"JSON": {"CompressionType": "NONE"}' --output-serialization '{"CSV": {}}}' --key OBJECT_NAME.json --expression "select count(0) from s3object where int(_1)<10;" output.csv
示例(用于 JSON)
aws --endpoint-url http://localhost:80 s3api select-object-content --bucket testbucket --expression-type 'SQL' --input-serialization '{"JSON": {"CompressionType": "NONE"}' --output-serialization '{"CSV": {}}}' --key testobject.json --expression "select count(0) from s3object where int(_1)<10;" output.csv
示例(用于 BOTO3)
import pprint import boto3 from botocore.exceptions import ClientError def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',esc_char='\\',csv_header_info="NONE"): s3 = boto3.client('s3', endpoint_url=endpoint, aws_access_key_id=access_key, region_name=region_name, aws_secret_access_key=secret_key) result = "" try: r = s3.select_object_content( Bucket=bucket, Key=key, ExpressionType='SQL', InputSerialization = {"CSV": {"RecordDelimiter" : row_delim, "FieldDelimiter" : column_delim,"QuoteEscapeCharacter": esc_char, "QuoteCharacter": quot_char, "FileHeaderInfo": csv_header_info}, "CompressionType": "NONE"}, OutputSerialization = {"CSV": {}}, Expression=query, RequestProgress = {"Enabled": progress}) except ClientError as c: result += str(c) return result for event in r['Payload']: if 'Records' in event: result = "" records = event['Records']['Payload'].decode('utf-8') result += records if 'Progress' in event: print("progress") pprint.pprint(event['Progress'],width=1) if 'Stats' in event: print("Stats") pprint.pprint(event['Stats'],width=1) if 'End' in event: print("End") pprint.pprint(event['End'],width=1) return result run_s3select( "my_bucket", "my_csv_object", "select int(_1) as a1, int(_2) as a2 , (a1+a2) as a3 from s3object where a3>100 and a3<300;")
支持的功能
目前只支持 AWS s3 选择命令的一部分:
其它资源
- 如需了解更多详细信息,请参阅 Amazon 的 S3 Select Object Content API。
3.5.2. S3 支持的选择功能
S3 选择支持以下功能:.Timestamp
- to_timestamp(string)
- 描述
- 将字符串转换为时间戳基本类型。在字符串格式中,任何缺少的 'time' 值都会填充零;对于缺少的月份和天值,1 是默认值。'timezone' 格式是 +/-H:mm 或 Z,其中字母 'Z' 表示协调通用时间(UTC)。timezone 的值可以包括 - 12:00 和 +14:00。
- 支持
目前,它可以将以下字符串格式转换为时间戳:
- YYYY-MM-DDTHH:mm:ss.SSSSSS+/-HH:mm
- YYYY-MM-DDTHH:mm:ss.SSSSSSZ
- YYYY-MM-DDTHH:mm:ss+/-HH:mm
- YYYY-MM-DDTHH:mm:ssZ
- YYYY-MM-DDTHH:mm+/-HH:mm
- YYYY-MM-DDTHH:mmZ
- YYYY-MM-DDT
- YYYYT
- to_string (timestamp, format_pattern)
- 描述
- 以给定的输入字符串格式返回输入时间戳的字符串。
- 参数
格式 | 示例 | 描述 |
---|---|---|
yy | 69 | 2 年数字. |
y | 1969 | 4 年数字。 |
YYYY | 1969 | 零添加的 4 位数年。 |
M | 1 | 年月。 |
MM | 01 | 每年的零添加月数. |
MMM | 1 月 | 每年名称的缩写月。 |
MMMM | 1 月 | 年全月名称。 |
MMMMM | J |
年前一个字母的月。与 |
d | 2 | 日期(1-31)。 |
dd | 02 | 每月零添加一天(01-31)。 |
a | AM | 每天的 AM 或 PM。 |
h | 3 | 天中的小时(1-12)。 |
hh | 03 | 零添加小时的一天(01-12)。 |
H | 3 | 天中的小时(0-23)。 |
HH | 03 | 每天的零添加小时(00-23)。 |
m | 4 | 小时的分钟(0-59)。 |
mm | 04 | 小时的零添加分钟(00-59)。 |
s | 5 | 第二分钟(0-59)。 |
ss | 05 | 分钟的零添加秒(00-59)。 |
S | 1 | 第二部分(精度: 0.1,范围: 0.0-0.9)。 |
SS | 12 | 第二部分(精度: 0.01,范围: 0.0-0.99)。 |
SSS | 123 | 第二部分(精度: 0.01,范围: 0.0-0.999)。 |
SSSS | 1234 | 第二部分(精度: 0.001,范围: 0.0-0.9999)。 |
SSSSSS | 123456 | 第二部分(最大精度):1纳秒,范围: 0.0-0.999999)。 |
n | 60000000 | second 的 nano。 |
X | +07 或 Z | 如果偏移为 0,则以小时或"Z"表示偏移量。 |
XX 或 XXXX | +0700 或 Z | 如果偏移为 0,则以小时和"Z"表示偏移量。 |
XXX 或 XXXXX | +07:00 或 Z | 如果偏移为 0,则以小时和"Z"表示偏移量。 |
x | 7 | 以小时为单位进行偏移。 |
XX 或 xxxx | 700 | 以小时和分钟为单位的偏移。 |
xxx 或 xxxxx | +07:00 | 以小时和分钟为单位的偏移。 |
- extract (date-part from timestamp)
- 描述
- 根据 date-part 从输入时间戳中提取的整数。
- 支持
- year, month, week, day, hour, minute, second, timezone_hour, timezone_minute.
- date_add(date-part ,integer,timestamp)
- 描述
- 返回时间戳,根据输入时间戳和日期部分的结果计算。
- 支持
- 年,月份、天、小时、分钟、秒。
- date_diff(date-part,timestamp,timestamp)
- 描述
- 返回整数,根据 date-part 在两个时间戳之间计算的结果。
- 支持
- 年,月份、天、小时、分钟、秒。
- utcnow()
- 描述
- 返回当前时间的时间戳。
聚合
- count()
- 描述
- 根据与条件匹配的行数返回整数(如果存在)。
- sum(expression)
- 描述
- 如果出现某个条件,则每行上返回表达式摘要。
- avg(expression)
- 描述
- 如果出现某个条件,每行中返回一个平均表达式。
- max(expression)
- 描述
- 如果出现某个条件,则返回与条件匹配的所有表达式的最大结果。
- min(expression)
- 描述
- 返回与条件匹配的所有表达式的最小结果。
字符串
- 子字符串(字符串、from、for)
- 描述
- 为输入返回字符串从输入字符串中提取的字符串。
- Char_length
- 描述
- 返回字符串中的多个字符。Character_length 也实现相同的目的。
- trim ([[leading | trailing | both remove_chars] from] string)
- 描述
- 从目标字符串中修剪前/尾部(或两者)字符。默认值为空白字符。
- Upper\lower
- 描述
- 将字符转换为大写或小写。
NULL
NULL
值缺失或未知,即 NULL
无法为任何算术操作生成一个值。这同样适用于算术比较,对 NULL
的任何比较都是未知的 NULL
。
A is NULL | result (NULL=UNKNOWN) |
---|---|
不是 A |
|
A 或 False |
|
A or True |
|
A 或 A |
|
A 和 False |
|
A 和 True |
|
A 和 A |
|
其它资源
- 如需了解更多详细信息,请参阅 Amazon 的 S3 Select Object Content API。
3.5.3. S3 别名编程结构
别名编程结构是 s3 选择语言的重要组成部分,因为它可以为包含许多列或复杂查询的对象启用更好的编程。当解析带有别名结构的声明时,它会将别名替换为对右投射列和查询执行的引用,该引用将象任何其他表达式一样评估。别名维护结果缓存,如果别名被多次使用,则不会评估相同的表达式,并返回相同的结果,因为使用了缓存的结果。目前,红帽支持列别名。
示例
select int(_1) as a1, int(_2) as a2 , (a1+a2) as a3 from s3object where a3>100 and a3<300;")
3.5.4. S3 解析解释
S3 选择引擎具有所有三个文件格式的解析器:CSV、Brquet 和 JSON,后者将命令划分为更多可处理组件,然后附加到定义每个组件的标签中。
3.5.4.1. S3 CSV 解析
带有输入序列化的 CSV 定义使用以下默认值:
-
将
{\n}'
用于 row-delimiter。 -
使用
{"}
括起内容。 -
使用
{\}
转义字符。
csv-header-info
会在 AWS-CLI 中显示 USE
时解析,这是包含该模式的输入对象中的第一行。目前,不支持输出序列化和压缩类型。S3 选择引擎具有 CSV 解析器,它解析 S3-objects:
- 每行以 row-delimiter 结尾。
- field-separator 会分离相邻的列。
-
successive 字段分隔符定义
NULL
列。 - quote-character 覆盖 field-separator;即,字段分隔符是引号之间的任何字符。
- 转义字符禁用除行分隔符之外的任何特殊字符。
以下是 CSV 解析规则的示例:
功能 | 描述 | 输入(Tokens) |
---|---|---|
| successive 字段分隔符 |
|
| quote 字符覆盖字段分隔符。 |
|
| 转义字符覆盖了 元字符。 |
对象所有者 |
| 没有关闭的引号;行分隔符是右行。 |
|
| FileHeaderInfo 标签 | USE 值表示第一行中的每个令牌都是列名称;IGNORE 值意味着跳过第一行。 |
其它资源
- 如需了解更多详细信息,请参阅 Amazon 的 S3 Select Object Content API。
3.5.4.2. S3 Parquet 解析
Apache Parquet 是一个开源列式数据文件格式,旨在高效数据存储和检索。
S3 选择引擎的 Parquet 解析器解析 S3-objects,如下所示:
示例
4-byte magic number "PAR1" <Column 1 Chunk 1 + Column Metadata> <Column 2 Chunk 1 + Column Metadata> ... <Column N Chunk 1 + Column Metadata> <Column 1 Chunk 2 + Column Metadata> <Column 2 Chunk 2 + Column Metadata> ... <Column N Chunk 2 + Column Metadata> ... <Column 1 Chunk M + Column Metadata> <Column 2 Chunk M + Column Metadata> ... <Column N Chunk M + Column Metadata> File Metadata 4-byte length in bytes of file metadata 4-byte magic number "PAR1"
- 在上例中,此表中有 N 列,被分成 M 行组。文件元数据包含所有列元数据启动位置。
- 元数据在数据后写入,以允许进行一次传递写入。
- 所有列块都可以在文件元数据中找到,之后应按顺序读取。
- 格式被明确设计为将元数据与数据分开。这允许将列分成多个文件,并有一个元数据文件引用多个 parquet 文件。
3.5.4.3. S3 JSON 解析
JSON 文档启用在没有限制的情况下的对象或数组中嵌套值。在 S3 选择引擎的 JSON 文档中查询特定值时,该值的位置通过 SELECT
语句中的路径指定。
JSON 文档的通用结构没有 CSV 和 Parquet 等行和列结构。相反,SQL 语句本身是查询 JSON 文档时定义行和列的 SQL 语句本身。
S3 选择引擎的 JSON 解析器解析 S3-objects,如下所示:
-
SELECT
语句中的FROM
子句定义行边界。 - JSON 文档中的一行与如何为 CSV 对象定义行的分隔符类似,以及如何使用行组定义 Parquet 对象的行
考虑以下示例:
示例
{ "firstName": "Joe", "lastName": "Jackson", "gender": "male", "age": "twenty" }, { "firstName": "Joe_2", "lastName": "Jackson_2", "gender": "male", "age": 21 }, "phoneNumbers": [ { "type": "home1", "number": "734928_1","addr": 11 }, { "type": "home2", "number": "734928_2","addr": 22 } ], "key_after_array": "XXX", "description" : { "main_desc" : "value_1", "second_desc" : "value_2" } # the from-clause define a single row. # _1 points to root object level. # _1.age appears twice in Documnet-row, the last value is used for the operation. query = "select _1.firstname,_1.key_after_array,_1.age+4,_1.description.main_desc,_1.description.second_desc from s3object[*].aa.bb.cc;"; expected_result = Joe_2,XXX,25,value_1,value_2
-
语句指示读取器搜索路径
aa.bb.cc
,并根据此路径的发生定义行边界。 -
当读取器遇到路径时,行开始,当读取器退出路径的最顶层部分时结束,本例中为对象
cc
。
-
语句指示读取器搜索路径
3.5.5. 将 Ceph 对象网关与 Trino 集成
将 Ceph 对象网关与 Trino 集成,这是一个重要的实用程序,允许用户在 S3 对象上更快地运行 SQL 查询 9x。
以下是使用 Trino 的一些优点:
- Trino 是一个完整的 SQL 引擎。
- 推送 S3 选择 Trino 引擎中的请求标识在服务器端运行具有成本效益的 SQL 语句的一部分。
- 使用 Ceph/S3select 的优化规则来提高性能。
- 利用 Red Hat Ceph Storage 可扩展性,将原始对象划分为多个等部分,执行 S3 选择请求并合并请求。
如果 s3select
语法在查询 trino 时无法正常工作,请使用 SQL 语法。
先决条件
- 正在运行的 Red Hat Ceph Storage 集群安装有 Ceph 对象网关。
- 安装了 Docker 或 Podman。
- bucket 已创建。
- 对象已上传。
流程
部署 Trino 和 hive。
示例
[cephuser@host01 ~]$ git clone https://github.com/ceph/s3select.git [cephuser@host01 ~]$ cd s3select
使用 S3 端点、访问密钥和 secret 密钥修改
hms_trino.yaml
文件。示例
[cephuser@host01 s3select]$ cat container/trino/hms_trino.yaml version: '3' services: hms: image: galsl/hms:dev container_name: hms environment: # S3_ENDPOINT the CEPH/RGW end-point-url - S3_ENDPOINT=http://rgw_ip:port - S3_ACCESS_KEY=abc - S3_SECRET_KEY=abc # the container starts with booting the hive metastore command: sh -c '. ~/.bashrc; start_hive_metastore' ports: - 9083:9083 networks: - trino_hms trino: image: trinodb/trino:405 container_name: trino volumes: # the trino directory contains the necessary configuration - ./trino:/etc/trino ports: - 8080:8080 networks: - trino_hms networks: trino_hm
使用 S3 端点、访问密钥和 secret 密钥修改
hive.properties
文件。示例
[cephuser@host01 s3select]$ cat container/trino/trino/catalog/hive.properties connector.name=hive hive.metastore.uri=thrift://hms:9083 #hive.metastore.warehouse.dir=s3a://hive/ hive.allow-drop-table=true hive.allow-rename-table=true hive.allow-add-column=true hive.allow-drop-column=true hive.allow-rename-column=true hive.non-managed-table-writes-enabled=true hive.s3select-pushdown.enabled=true hive.s3.aws-access-key=abc hive.s3.aws-secret-key=abc # should modify per s3-endpoint-url hive.s3.endpoint=http://rgw_ip:port #hive.s3.max-connections=1 #hive.s3select-pushdown.max-connections=1 hive.s3.connect-timeout=100s hive.s3.socket-timeout=100s hive.max-splits-per-second=10000 hive.max-split-size=128MB
启动 Trino 容器,以集成 Ceph 对象网关。
示例
[cephuser@host01 s3select]$ sudo docker compose -f ./container/trino/hms_trino.yaml up -d
验证集成。
示例
[cephuser@host01 s3select]$ sudo docker exec -it trino /bin/bash trino@66f753905e82:/$ trino trino> create schema hive.csvbkt1schema; trino> create table hive.csvbkt1schema.polariondatacsv(c1 varchar,c2 varchar, c3 varchar, c4 varchar, c5 varchar, c6 varchar, c7 varchar, c8 varchar, c9 varchar) WITH ( external_location = 's3a://csvbkt1/',format = 'CSV'); trino> select * from hive.csvbkt1schema.polariondatacsv;
注意外部位置必须指向存储桶名称或目录,而不是文件末尾。