3.5. S3 选择操作
作为开发者,您可以运行 S3 选择以加快吞吐量。用户可以在没有介质器的情况下直接运行 S3 选择查询。
有三个 S3 选择工作流 - CSV、Apache Parquet (Parquet)和 JSON,它们为 CSV、Parquet 和 JSON 对象提供 S3 选择操作:
- CSV 文件以纯文本格式存储表格数据。文件的每一行都是数据记录。
- Parquet 是一个开源、面向列的数据文件格式,旨在有效数据存储和检索。它提供高效的数据压缩和编码方案,提高性能以批量处理复杂数据。Parquet 允许 S3 select-engine 跳过列和块,从而显著减少 IOPS (与 CSV 和 JSON 格式保持一致)。
- JSON 是一种格式结构。S3 选择引擎利用 JSON 读取器在 JSON 格式输入数据上使用 SQL 语句,从而扫描高度嵌套和复杂的 JSON 格式的数据。
例如,带有几GB 数据的 CSV、Parquet 或 JSON S3 对象,用户可以使用以下查询提取单个列,该列由另一列过滤:
Example
select customerid from s3Object where age>30 and age<65;
目前,S3 对象必须通过 Ceph 对象网关从 Ceph OSD 检索数据,然后才能过滤和提取数据。当对象较大且查询更为具体时,会提高性能。与 CSV 相比,Parquet 格式可以更有效地处理。
先决条件
- 一个正在运行的 Red Hat Ceph Storage 集群。
- RESTful 客户端。
- 创建的用户具有访问权限的 S3 用户。
3.5.1. S3 从对象中选择内容
select 对象内容 API 通过结构化查询语言(SQL)过滤对象的内容。如需清单对象中应驻留的内容的说明,请参阅 AWS 系统管理器用户指南中的清单收集的元数据部分。清单内容会影响应针对该清单运行的查询类型。可能提供必要信息的 SQL 语句数量较大,但 S3 选择是一个类似于 SQL 的实用程序,因此不支持一些运算符,如 group-by
和 join
。
对于 CSV,您必须将数据序列化格式指定为以逗号分隔的对象值,才能检索指定的内容。Parquet 没有分隔符,因为它采用二进制格式。Amazon Web Services (AWS)命令行界面(CLI)选择对象内容使用 CSV 或 Parquet 格式将对象数据解析到记录中,仅返回查询中指定的记录。
您必须为响应指定数据序列化格式。这个操作必须具有 s3:GetObject
权限。
-
InputSerialization
元素描述正在查询的对象中数据的格式。对象可以是 CSV 或 Parquet 格式。 -
OutputSerialization
元素是 AWS-CLI 用户客户端的一部分,描述了如何格式化输出数据。Ceph 为 AWS-CLI 实施服务器客户端,因此根据OutputSerialization
(当前仅为 CSV)提供相同的输出。 -
InputSerialization
的格式不需要与OutputSerialization
的格式匹配。例如,您可以在InputSerialization
中指定 Parquet,在OutputSerialization
中指定 CSV。
语法
POST /BUCKET/KEY?select&select-type=2 HTTP/1.1\r\n
示例
POST /testbucket/sample1csv?select&select-type=2 HTTP/1.1\r\n POST /testbucket/sample1parquet?select&select-type=2 HTTP/1.1\r\n
请求实体
Bucket
- 描述
- 要从中选择对象内容的存储桶。
- Type
- 字符串
- 必需
- 是
键
- 描述
- 对象密钥。
- 长度限制
- 最小长度为 1.
- Type
- 字符串
- 必需
- 是
SelectObjectContentRequest
- 描述
- 选择对象内容请求参数的根级别标签。
- Type
- 字符串
- 必需
- 是
表达式
- 描述
- 用于查询对象的表达式。
- Type
- 字符串
- 必需
- 是
ExpressionType
- 描述
- 示例 SQL 提供的表达式的类型。
- Type
- 字符串
- 有效值
- SQL
- 必需
- 是
InputSerialization
- 描述
- 描述正在查询的对象中数据的格式。
- Type
- 字符串
- 必需
- 是
OutputSerialization
- 描述
- 以逗号分隔符和换行符返回的数据格式。
- Type
- 字符串
- 必需
- 是
响应实体
如果操作成功,服务会返回 HTTP 200
响应。服务以 XML 格式返回数据:
payload
- 描述
- 有效负载参数的根级别标签。
- Type
- 字符串
- 必需
- 是
Records
- 描述
- 记录事件。
- Type
- base64 编码的二进制数据对象
- 必需
- 否
Stats
- 描述
- stats 事件。
- Type
- Long
- 必需
- 否
Ceph 对象网关支持以下响应:
示例
{:event-type,records} {:content-type,application/octet-stream} {:message-type,event}
语法(用于 CSV)
aws --endpoint-URL http://localhost:80 s3api select-object-content --bucket BUCKET_NAME --expression-type 'SQL' --input-serialization '{"CSV": {"FieldDelimiter": "," , "QuoteCharacter": "\"" , "RecordDelimiter" : "\n" , "QuoteEscapeCharacter" : "\\" , "FileHeaderInfo": "USE" }, "CompressionType": "NONE"}' --output-serialization '{"CSV": {}}' --key OBJECT_NAME.csv --expression "select count(0) from s3object where int(_1)<10;" output.csv
示例(用于 CSV)
aws --endpoint-url http://localhost:80 s3api select-object-content --bucket testbucket --expression-type 'SQL' --input-serialization '{"CSV": {"FieldDelimiter": "," , "QuoteCharacter": "\"" , "RecordDelimiter" : "\n" , "QuoteEscapeCharacter" : "\\" , "FileHeaderInfo": "USE" }, "CompressionType": "NONE"}' --output-serialization '{"CSV": {}}' --key testobject.csv --expression "select count(0) from s3object where int(_1)<10;" output.csv
语法(用于 Parquet)
aws --endpoint-url http://localhost:80 s3api select-object-content --bucket BUCKET_NAME --expression-type 'SQL' --input-serialization '{"Parquet": {}, {"CompressionType": "NONE"}' --output-serialization '{"CSV": {}}' --key OBJECT_NAME.parquet --expression "select count(0) from s3object where int(_1)<10;" output.csv
示例(用于 Parquet)
aws --endpoint-url http://localhost:80 s3api select-object-content --bucket testbucket --expression-type 'SQL' --input-serialization '{"Parquet": {}, {"CompressionType": "NONE"}' --output-serialization '{"CSV": {}}' --key testobject.parquet --expression "select count(0) from s3object where int(_1)<10;" output.csv
语法(用于 JSON)
aws --endpoint-URL http://localhost:80 s3api select-object-content --bucket BUCKET_NAME --expression-type 'SQL' --input-serialization '{"JSON": {"CompressionType": "NONE"}' --output-serialization '{"CSV": {}}}' --key OBJECT_NAME.json --expression "select count(0) from s3object where int(_1)<10;" output.csv
示例(用于 JSON)
aws --endpoint-url http://localhost:80 s3api select-object-content --bucket testbucket --expression-type 'SQL' --input-serialization '{"JSON": {"CompressionType": "NONE"}' --output-serialization '{"CSV": {}}}' --key testobject.json --expression "select count(0) from s3object where int(_1)<10;" output.csv
示例(如 BOTO3)
import pprint import boto3 from botocore.exceptions import ClientError def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',esc_char='\\',csv_header_info="NONE"): s3 = boto3.client('s3', endpoint_url=endpoint, aws_access_key_id=access_key, region_name=region_name, aws_secret_access_key=secret_key) result = "" try: r = s3.select_object_content( Bucket=bucket, Key=key, ExpressionType='SQL', InputSerialization = {"CSV": {"RecordDelimiter" : row_delim, "FieldDelimiter" : column_delim,"QuoteEscapeCharacter": esc_char, "QuoteCharacter": quot_char, "FileHeaderInfo": csv_header_info}, "CompressionType": "NONE"}, OutputSerialization = {"CSV": {}}, Expression=query, RequestProgress = {"Enabled": progress}) except ClientError as c: result += str(c) return result for event in r['Payload']: if 'Records' in event: result = "" records = event['Records']['Payload'].decode('utf-8') result += records if 'Progress' in event: print("progress") pprint.pprint(event['Progress'],width=1) if 'Stats' in event: print("Stats") pprint.pprint(event['Stats'],width=1) if 'End' in event: print("End") pprint.pprint(event['End'],width=1) return result run_s3select( "my_bucket", "my_csv_object", "select int(_1) as a1, int(_2) as a2 , (a1+a2) as a3 from s3object where a3>100 and a3<300;")
支持的功能
目前只支持 AWS s3 选择命令的一部分:
其它资源
- 如需了解更多详细信息,请参阅 Amazon 的 S3 Select Object Content API。
3.5.2. S3 支持的选择功能
S3 选择支持以下功能:.Timestamp
- to_timestamp(string)
- 描述
- 将字符串转换为时间戳基本类型。在字符串格式中,任何缺少的 'time' 值都会填充零;对于缺失的月和天值,1 是默认值。'timezone' 采用 +/-H:mm 或 Z 格式,其中字母 'Z' 表示协调通用时间(UTC)。时区值可以在 - 12:00 和 +14:00 之间范围。
- 支持
目前,它可将以下字符串格式转换为时间戳:
- YYYY-MM-DDTHH:mm:ss.SSSSSS+/-HH:mm
- YYYY-MM-DDTHH:mm:ss.SSSSSSZ
- YYYY-MM-DDTHH:mm:ss+/-HH:mm
- YYYY-MM-DDTHH:mm:ssZ
- YYYY-MM-DDTHH:mm+/-HH:mm
- YYYY-MM-DDTHH:mmZ
- YYYY-MM-DDT
- YYYYT
- to_string (timestamp, format_pattern)
- 描述
- 以给定输入字符串格式返回输入时间戳的字符串表示。
- 参数
格式 | Example | 描述 |
---|---|---|
yy | 69 | 2 年位. |
y | 1969 | 4 年位. |
YYYY | 1969 | 零添加的 4 位。 |
M | 1 | 年月。 |
MM | 01 | 一年的零添加月。 |
MMM | 1 月 | 年名称的缩写月份。 |
MMMM | 1 月 | 年名的全月。 |
MMMMM | J |
首字母年月.对 |
d | 2 | 月份的日期(1-31)。 |
dd | 02 | 月份的零添加日(01-31)。 |
a | AM | 每天的早上或 PM. |
h | 3 | 天小时(1-12)。 |
hh | 03 | 零添加小时(01-12)。 |
H | 3 | 一天的小时(0-23)。 |
HH | 03 | 天(00-23)的零添加小时。 |
m | 4 | 小时的分钟(0-59)。 |
mm | 04 | 小时(00-59)的零添加分钟。 |
s | 5 | 第二分钟(0-59)。 |
ss | 05 | 分钟的零添加第二分钟(00-59)。 |
S | 1 | 第二部分(precision: 0.1, range: 0.0-0.9)。 |
SS | 12 | 第二部分(precision: 0.01, range: 0.0-0.99)。 |
SSS | 123 | 第二部分(precision: 0.01, range: 0.0-0.999)。 |
SSSS | 1234 | 第二部分(precision: 0.001, range: 0.0-0.9999)。 |
SSSSSS | 123456 | 第二部分(最大精度值:1 nanosecond, range: 0.0-0.999999)。 |
n | 60000000 | 纳纳秒。 |
X | +07 或 Z | 如果偏移量为 0,以小时或 "Z" 为单位的偏移量。 |
XX 或 XXXX | +0700 或 Z | 如果偏移量为 0,以小时、分钟或"Z"为单位的偏移量。 |
XXX 或 XXXXX | +07:00 或 Z | 如果偏移量为 0,以小时、分钟或"Z"为单位的偏移量。 |
x | 7 | 以小时为单位的偏移量。 |
XX 或 xxxx | 700 | 以小时和分钟为单位的偏移量。 |
xxx 或 xxxxx | +07:00 | 以小时和分钟为单位的偏移量。 |
- extract (date-part from timestamp)
- 描述
- 根据 date-part 从输入时间戳中提取的整数。
- 支持
- year, month, week, day, hour, minute, second, timezone_hour, timezone_minute.
- date_add(date-part ,integer,timestamp)
- 描述
- 根据输入时间戳和 date-part 的结果返回时间戳,计算结果。
- 支持
- year, month, day, hour, minute, second.
- date_diff(date-part,timestamp,timestamp)
- 描述
- 返回整数,根据 date-part 在两个时间戳之间计算的结果。
- 支持
- year, month, day, hour, minute, second.
- utcnow()
- 描述
- 返回当前时间的时间戳。
聚合
- count()
- 描述
- 根据与条件匹配的行数返回整数。
- sum(expression)
- 描述
- 如果出现某个条件,则每行上返回表达式摘要。
- avg(expression)
- 描述
- 如果出现条件,则每行中返回一个平均表达式。
- max(expression)
- 描述
- 如果出现某个条件,则返回与条件匹配的所有表达式的最大结果。
- min(expression)
- 描述
- 如果出现一个条件,则返回与条件匹配的所有表达式的最小结果。
字符串
- 子字符串(字符串,from,for)
- 描述
- 根据 from,返回从输入字符串中提取的字符串,以用于输入。
- Char_length
- 描述
- 返回字符串中的多个字符。Character_length 也实现相同的目的。
- trim ([[leading | trailing | both remove_chars] from] string)
- 描述
- 从目标字符串中修剪前导/结尾(或两者)字符。默认值为空白字符。
- Upper\lower
- 描述
- 将字符转换为大写或小写。
NULL
NULL
值缺失或未知,即 NULL
无法为任何算术操作生成一个值。这同样适用于算术比较,任何与 NULL
的比较都是未知的 NULL
。
A is NULL | 结果(NULL=UNKNOWN) |
---|---|
非 A |
|
A 或 False |
|
A or True |
|
A 或 A |
|
A 和 False |
|
A 和 True |
|
A 和 A |
|
其它资源
- 如需了解更多详细信息,请参阅 Amazon 的 S3 Select Object Content API。
3.5.3. S3 别名编程结构
别名编程结构是 s3 选择语言的重要组成部分,因为它可以为包含许多列或复杂查询的对象启用更好的编程。当解析带有别名构造的语句时,它会将 alias 替换为对右 projection 列的引用,并在查询执行时对其进行评估,与任何其他表达式一样进行评估。alias 维护结果缓存,如果多次使用别名,则不会评估同一表达式,并返回相同的结果,因为使用了缓存的结果。目前,红帽支持列别名。
Example
select int(_1) as a1, int(_2) as a2 , (a1+a2) as a3 from s3object where a3>100 and a3<300;")
3.5.4. S3 解析解释
S3 选择引擎具有所有三种文件格式的解析器 - CSV、Parquet 和 JSON,将命令划分为更可处理的组件,然后附加到定义各个组件的标记。
3.5.4.1. S3 CSV 解析
带有输入序列化的 CSV 定义使用以下默认值:
-
将
{\n}'
用于 row-delimiter。 -
使用
{"}
括起内容。 -
使用
{\}
转义字符。
csv-header-info
在 AWS-CLI 中显示的 USE
上解析;这是包含 schema 的输入对象中的第一行。目前,不支持输出序列化和 compression-type。S3 选择引擎具有 CSV 解析器,它解析 S3-objects:
- 每行以 row-delimiter 结尾。
- field-separator 会分离相邻的列。
-
successive 字段分隔符定义
NULL
列。 - quote-character 覆盖 field-separator;也就是说,字段分隔符是引号之间的任何字符。
- 转义字符禁用除行分隔符之外的任何特殊字符。
以下是 CSV 解析规则的示例:
功能 | 描述 | 输入(Tokens) |
---|---|---|
| successive 字段分隔符 |
|
| quote 字符覆盖字段分隔符。 |
|
| 转义字符会覆盖分隔符。 |
对象所有者 |
| 没有关闭的引号;行分隔符是关闭行。 |
|
| FileHeaderInfo 标签 | USE 值表示第一行上的每个令牌都是列名称;IGNORE 值意味着跳过第一行。 |
其它资源
- 如需了解更多详细信息,请参阅 Amazon 的 S3 Select Object Content API。
3.5.4.2. S3 Parquet 解析
Apache Parquet 是一个开源、列式数据文件格式,旨在有效数据存储和检索。
S3 选择引擎的 Parquet parser 解析 S3-objects,如下所示:
Example
4-byte magic number "PAR1" <Column 1 Chunk 1 + Column Metadata> <Column 2 Chunk 1 + Column Metadata> ... <Column N Chunk 1 + Column Metadata> <Column 1 Chunk 2 + Column Metadata> <Column 2 Chunk 2 + Column Metadata> ... <Column N Chunk 2 + Column Metadata> ... <Column 1 Chunk M + Column Metadata> <Column 2 Chunk M + Column Metadata> ... <Column N Chunk M + Column Metadata> File Metadata 4-byte length in bytes of file metadata 4-byte magic number "PAR1"
- 在上例中,此表中有 N 列,被分成 M 行组。文件元数据包含所有列元数据开始位置的位置。
- 元数据在数据后写入,以允许进行单个传递写入。
- 所有列块都可以在文件元数据中找到,之后应该按顺序读取。
- 格式被明确设计为将元数据与数据分开。这允许将列分成多个文件,并让单个元数据文件引用多个 parquet 文件。
3.5.4.3. S3 JSON 解析
JSON 文档可在对象或数组内启用嵌套值。在 S3 选择引擎的 JSON 文档中查询特定值时,该值的位置将通过 SELECT
语句中的路径来指定。
JSON 文档的通用结构没有 CSV 和 Parquet 等一行和列结构。相反,它是 SQL 语句本身,它在查询 JSON 文档时定义行和列。
S3 选择引擎的 JSON 解析器解析 S3-objects,如下所示:
-
SELECT
语句中的FROM
子句定义行边界。 - JSON 文档中的一行与使用行分隔符定义 CSV 对象行的方式类似,以及如何使用行组为 Parquet 对象定义行
考虑以下示例:
Example
{ "firstName": "Joe", "lastName": "Jackson", "gender": "male", "age": "twenty" }, { "firstName": "Joe_2", "lastName": "Jackson_2", "gender": "male", "age": 21 }, "phoneNumbers": [ { "type": "home1", "number": "734928_1","addr": 11 }, { "type": "home2", "number": "734928_2","addr": 22 } ], "key_after_array": "XXX", "description" : { "main_desc" : "value_1", "second_desc" : "value_2" } # the from-clause define a single row. # _1 points to root object level. # _1.age appears twice in Documnet-row, the last value is used for the operation. query = "select _1.firstname,_1.key_after_array,_1.age+4,_1.description.main_desc,_1.description.second_desc from s3object[*].aa.bb.cc;"; expected_result = Joe_2,XXX,25,value_1,value_2
-
语句指示读者搜索路径
aa.bb.cc
,并根据此路径发生定义行边界。 -
当读者遇到路径时,一行将开始,当读取器退出路径的最内部部分时,该路径的最内部部分是对象
cc
。
-
语句指示读者搜索路径
3.5.5. 将 Ceph 对象网关与 Trino 集成
将 Ceph 对象网关与 Trino 集成,这是可让用户在 S3 对象上更快地运行 SQL 查询 9x 的重要实用程序。
以下是使用 Trino 的一些优点:
- Trino 是一个完整的 SQL 引擎。
- 推送 S3 选择请求,其中 Trino 引擎标识 SQL 语句的一部分,该语句在服务器端运行效率更高。
- 使用 Ceph/S3select 的优化规则来提高性能。
- 利用 Red Hat Ceph Storage 可扩展性并将原始对象划分为多个相等部分,执行 S3 选择请求,并合并请求。
如果在通过 trino 查询时 s3select
语法不起作用,请使用 SQL 语法。
先决条件
- 正在运行的 Red Hat Ceph Storage 集群安装有 Ceph 对象网关。
- 安装了 Docker 或 Podman。
- 已创建存储桶。
- 对象被上传。
流程
部署 Trino 和 hive.
Example
[cephuser@host01 ~]$ git clone https://github.com/ceph/s3select.git [cephuser@host01 ~]$ cd s3select
使用 S3 端点、访问密钥和 secret 密钥修改
hms_trino.yaml
文件。Example
[cephuser@host01 s3select]$ cat container/trino/hms_trino.yaml version: '3' services: hms: image: galsl/hms:dev container_name: hms environment: # S3_ENDPOINT the CEPH/RGW end-point-url - S3_ENDPOINT=http://rgw_ip:port - S3_ACCESS_KEY=abc - S3_SECRET_KEY=abc # the container starts with booting the hive metastore command: sh -c '. ~/.bashrc; start_hive_metastore' ports: - 9083:9083 networks: - trino_hms trino: image: trinodb/trino:405 container_name: trino volumes: # the trino directory contains the necessary configuration - ./trino:/etc/trino ports: - 8080:8080 networks: - trino_hms networks: trino_hm
使用 S3 端点、访问密钥和 secret 密钥修改
hive.properties
文件。Example
[cephuser@host01 s3select]$ cat container/trino/trino/catalog/hive.properties connector.name=hive hive.metastore.uri=thrift://hms:9083 #hive.metastore.warehouse.dir=s3a://hive/ hive.allow-drop-table=true hive.allow-rename-table=true hive.allow-add-column=true hive.allow-drop-column=true hive.allow-rename-column=true hive.non-managed-table-writes-enabled=true hive.s3select-pushdown.enabled=true hive.s3.aws-access-key=abc hive.s3.aws-secret-key=abc # should modify per s3-endpoint-url hive.s3.endpoint=http://rgw_ip:port #hive.s3.max-connections=1 #hive.s3select-pushdown.max-connections=1 hive.s3.connect-timeout=100s hive.s3.socket-timeout=100s hive.max-splits-per-second=10000 hive.max-split-size=128MB
启动 Trino 容器,以集成 Ceph 对象网关。
Example
[cephuser@host01 s3select]$ sudo docker compose -f ./container/trino/hms_trino.yaml up -d
验证集成。
Example
[cephuser@host01 s3select]$ sudo docker exec -it trino /bin/bash trino@66f753905e82:/$ trino trino> create schema hive.csvbkt1schema; trino> create table hive.csvbkt1schema.polariondatacsv(c1 varchar,c2 varchar, c3 varchar, c4 varchar, c5 varchar, c6 varchar, c7 varchar, c8 varchar, c9 varchar) WITH ( external_location = 's3a://csvbkt1/',format = 'CSV'); trino> select * from hive.csvbkt1schema.polariondatacsv;
注意外部位置必须指向存储桶名称或目录,而不是文件的末尾。