Appendix H. Sample code demonstrating usage of session tags
The following sample code tags a role, a bucket, or an object, and uses tag keys in a role trust policy and a role permission policy.
Note
The example assumes that Keycloak passes the tag Department=Engineering
in the JSON Web Token (JWT) access token.
# -*- coding: utf-8 -*-
"""Sample: tag a bucket, a role and an object, and use the tags in policies.

The script
  1. creates a bucket and tags it with Department=Engineering,
  2. registers an OpenID Connect provider,
  3. fetches the role ``S3Access`` (creating it, tagged
     Department=Engineering, when it does not exist yet) with a trust policy
     that compares ``aws:RequestTag/Department`` against
     ``iam:ResourceTag/Department``,
  4. attaches a permission policy that compares
     ``s3:ResourceTag/Department`` against ``aws:PrincipalTag/Department``,
  5. assumes the role with a web identity token and uses the temporary
     credentials to put and get a tagged object.
"""

import json

import boto3
# botocore is installed together with boto3; ClientError is the exception
# every boto3 client raises for service-side errors.
from botocore.exceptions import ClientError
from nose.tools import eq_ as eq

access_key = 'TESTER'
secret_key = 'test123'
endpoint = 'http://s3.us-east.localhost:8000'

s3client = boto3.client(
    's3',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
    endpoint_url=endpoint,
    region_name='',
)

s3res = boto3.resource(
    's3',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
    endpoint_url=endpoint,
    region_name='',
)

iam_client = boto3.client(
    'iam',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
    endpoint_url=endpoint,
    region_name='',
)

# Create the bucket and tag it so that the role permission policy below
# (s3:ResourceTag/Department) can match it.
bucket_name = 'test-bucket'
s3client.create_bucket(Bucket=bucket_name)
bucket_tagging = s3res.BucketTagging(bucket_name)
bucket_tagging.put(
    Tagging={'TagSet': [{'Key': 'Department', 'Value': 'Engineering'}]}
)

# Register the OIDC provider. The sample treats any service error here as
# "already registered" so that it can be re-run.
try:
    iam_client.create_open_id_connect_provider(
        Url='http://localhost:8080/auth/realms/quickstart',
        ClientIDList=[
            'app-profile-jsp',
            'app-jee-jsp',
        ],
        ThumbprintList=[
            'F7D7B3515DD0D319DD219A43A9EA727AD6065287',
        ],
    )
except ClientError:
    print("Provider already exists")

# Role trust policy: the Department tag passed with the request must equal
# the Department tag attached to the role. Compact separators keep the
# serialized document free of extra whitespace.
policy_document = json.dumps(
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Federated": [
                        "arn:aws:iam:::oidc-provider/localhost:8080/auth/realms/quickstart"
                    ]
                },
                "Action": ["sts:AssumeRoleWithWebIdentity", "sts:TagSession"],
                "Condition": {
                    "StringEquals": {
                        "aws:RequestTag/Department": "${iam:ResourceTag/Department}"
                    }
                },
            }
        ],
    },
    separators=(',', ':'),
)

role_response = ""

print("\n Getting Role \n")

try:
    role_response = iam_client.get_role(RoleName='S3Access')
    print(role_response)
except ClientError as e:
    # botocore nests the service error code under the 'Error' key.
    if e.response['Error']['Code'] == 'NoSuchEntity':
        print("\n Creating Role \n")
        tags_list = [
            {'Key': 'Department', 'Value': 'Engineering'},
        ]
        role_response = iam_client.create_role(
            AssumeRolePolicyDocument=policy_document,
            Path='/',
            RoleName='S3Access',
            Tags=tags_list,
        )
        print(role_response)
    else:
        print("Unexpected error: %s" % e)
        # Without a role there is nothing to assume below; stop here instead
        # of failing later on role_response['Role']['Arn'].
        raise

# Role permission policy: allow S3 actions only on resources whose Department
# tag equals the Department tag of the assumed-role session.
role_policy = json.dumps(
    {
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Allow",
            "Action": "s3:*",
            "Resource": "arn:aws:s3:::*",
            "Condition": {
                "StringEquals": {
                    "s3:ResourceTag/Department": ["${aws:PrincipalTag/Department}"]
                }
            },
        },
    },
    separators=(',', ':'),
)

response = iam_client.put_role_policy(
    RoleName='S3Access',
    PolicyName='Policy1',
    PolicyDocument=role_policy,
)

# NOTE(review): 'abc'/'def' look like placeholder keys -- presumably the call
# is authorized by the web identity token rather than by these credentials.
sts_client = boto3.client(
    'sts',
    aws_access_key_id='abc',
    aws_secret_access_key='def',
    endpoint_url=endpoint,
    region_name='',
)

print("\n Assuming Role with Web Identity\n")
response = sts_client.assume_role_with_web_identity(
    RoleArn=role_response['Role']['Arn'],
    RoleSessionName='Bob',
    DurationSeconds=900,
    WebIdentityToken='<web-token>',  # replace with the JWT issued by the IdP
)

# S3 client that runs with the temporary credentials of the assumed role.
credentials = response['Credentials']
s3client2 = boto3.client(
    's3',
    aws_access_key_id=credentials['AccessKeyId'],
    aws_secret_access_key=credentials['SecretAccessKey'],
    aws_session_token=credentials['SessionToken'],
    endpoint_url=endpoint,
    region_name='',
)

bucket_body = 'this is a test file'
tags = 'Department=Engineering'
key = "test-1.txt"

# Upload an object carrying the Department tag, then read it back.
s3_put_obj = s3client2.put_object(
    Body=bucket_body, Bucket=bucket_name, Key=key, Tagging=tags
)
eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'], 200)

s3_get_obj = s3client2.get_object(Bucket=bucket_name, Key=key)
eq(s3_get_obj['ResponseMetadata']['HTTPStatusCode'], 200)