附录 H. 演示会话标签使用的代码示例
以下示例代码演示如何为角色、存储桶或对象添加标签,并在角色信任策略和角色权限策略中使用标签键。
注意
该示例假定 Keycloak 在 JSON Web Token(JWT)访问令牌中传递了标签 Department=Engineering。
-*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
import boto3
import json
from nose.tools import eq_ as eq
# Credentials and endpoint used by the administrative clients below.
access_key = 'TESTER'
secret_key = 'test123'
endpoint = 'http://s3.us-east.localhost:8000'

# The S3 client, the S3 resource and the IAM client all connect with the
# same settings, so the keyword arguments are spelled out only once.
_connection_settings = dict(
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
    endpoint_url=endpoint,
    region_name='',
)

s3client = boto3.client('s3', **_connection_settings)
s3res = boto3.resource('s3', **_connection_settings)
iam_client = boto3.client('iam', **_connection_settings)
# Create the bucket, then tag it with Department=Engineering.
bucket_name = 'test-bucket'
s3bucket = s3client.create_bucket(Bucket=bucket_name)

bucket_tagging = s3res.BucketTagging(bucket_name)
_department_tag = {'Key': 'Department', 'Value': 'Engineering'}
Set_Tag = bucket_tagging.put(Tagging={'TagSet': [_department_tag]})
# Register the OpenID Connect identity provider whose tokens will be used
# for AssumeRoleWithWebIdentity further down.
try:
    response = iam_client.create_open_id_connect_provider(
        Url='http://localhost:8080/auth/realms/quickstart',
        ClientIDList=[
            'app-profile-jsp',
            'app-jee-jsp',
        ],
        ThumbprintList=[
            'F7D7B3515DD0D319DD219A43A9EA727AD6065287',
        ],
    )
except iam_client.exceptions.ClientError:
    # ClientError is reached through the client's own ``exceptions``
    # namespace because the file never imports it from botocore; the bare
    # name would raise NameError exactly when an error occurs.
    # NOTE(review): any client error is reported as "already exists", as in
    # the original example; the error code is not inspected here.
    print ("Provider already exists")
# Trust policy for the role: the federated OIDC provider may call
# sts:AssumeRoleWithWebIdentity and sts:TagSession, on condition that the
# Department tag in the request equals the Department tag on the role.
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\"arn:aws:iam:::oidc-provider/localhost:8080/auth/realms/quickstart\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"${iam:ResourceTag/Department}\"}}}]}"

# Fetch the S3Access role, creating it (tagged Department=Engineering) when
# it does not exist yet. ``role_response`` ends up holding the get_role or
# create_role response, which later code reads the role ARN from.
role_response = ""
print ("\n Getting Role \n")
try:
    role_response = iam_client.get_role(
        RoleName='S3Access'
    )
    print (role_response)
except iam_client.exceptions.ClientError as e:
    # botocore nests the service error code under the 'Error' key of the
    # parsed response; a top-level 'Code' key does not exist.
    if e.response['Error']['Code'] == 'NoSuchEntity':
        print ("\n Creating Role \n")
        tags_list = [
            {'Key': 'Department', 'Value': 'Engineering'},
        ]
        role_response = iam_client.create_role(
            AssumeRolePolicyDocument=policy_document,
            Path='/',
            RoleName='S3Access',
            Tags=tags_list,
        )
        print (role_response)
    else:
        print("Unexpected error: %s" % e)
        # Without a role there is nothing for the remaining steps to use
        # (role_response is still the empty string), so surface the real
        # error instead of failing later on an unrelated lookup.
        raise
# Inline permission policy for the role: allow every S3 action on every S3
# resource, on condition that the resource's Department tag equals the
# Department tag of the calling principal.
role_policy = (
    '{"Version":"2012-10-17",'
    '"Statement":{'
    '"Effect":"Allow",'
    '"Action":"s3:*",'
    '"Resource":"arn:aws:s3:::*",'
    '"Condition":{"StringEquals":'
    '{"s3:ResourceTag/Department":["${aws:PrincipalTag/Department}"]}}'
    '}}'
)

# Attach the policy to the S3Access role under the name Policy1.
response = iam_client.put_role_policy(
    PolicyDocument=role_policy,
    PolicyName='Policy1',
    RoleName='S3Access',
)
# STS client used for the web-identity call.
# NOTE(review): 'abc'/'def' look like placeholder credentials; presumably the
# web identity token is what authenticates this call -- confirm.
sts_client = boto3.client(
    'sts',
    aws_access_key_id='abc',
    aws_secret_access_key='def',
    endpoint_url=endpoint,
    region_name='',
)

print ("\n Assuming Role with Web Identity\n")
# '<web-token>' is a placeholder: substitute the access token issued by the
# identity provider before running the example.
response = sts_client.assume_role_with_web_identity(
    RoleArn=role_response['Role']['Arn'],
    RoleSessionName='Bob',
    DurationSeconds=900,
    WebIdentityToken='<web-token>',
)

# Second S3 client, authenticated with the temporary credentials returned by
# the assume-role call. It reuses ``endpoint`` rather than repeating the URL
# literal, so every client in the example targets the same gateway.
temporary_credentials = response['Credentials']
s3client2 = boto3.client(
    's3',
    aws_access_key_id=temporary_credentials['AccessKeyId'],
    aws_secret_access_key=temporary_credentials['SecretAccessKey'],
    aws_session_token=temporary_credentials['SessionToken'],
    endpoint_url=endpoint,
    region_name='',
)
# Upload an object tagged Department=Engineering with the temporary
# credentials, then read it back; both calls must return HTTP 200.
bucket_body = 'this is a test file'
tags = 'Department=Engineering'
key = "test-1.txt"

s3_put_obj = s3client2.put_object(
    Bucket=bucket_name,
    Key=key,
    Body=bucket_body,
    Tagging=tags,
)
eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'], 200)

s3_get_obj = s3client2.get_object(
    Bucket=bucket_name,
    Key=key,
)
eq(s3_get_obj['ResponseMetadata']['HTTPStatusCode'], 200)