As the title says, I have a problem connecting to Google BigQuery from an AWS Glue job. The idea is that I pick up data from a database and send it to Google BigQuery.
My script looks like this:
import os
import sys
import boto3
from botocore.exceptions import ClientError
from google.auth.transport.requests import Request
from google.auth import impersonated_credentials
from google.auth import load_credentials_from_file
from google.cloud import bigquery
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
# AWS Glue Job Arguments
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# AWS S3 setup for credentials
bucket_name = 'aws-glue-assets-123123123-eu-central-1'
credentials_file_key = 'credentials/service-account-impersonation.json'
local_credentials_path = '/tmp/service-account-impersonation.json'
project_id = 'nonprod-project_id'
# Function to download credentials file from S3
def download_credentials():
    print(f"Downloading credentials from bucket: {bucket_name}, key: {credentials_file_key}")
    s3 = boto3.client('s3')
    try:
        s3.head_object(Bucket=bucket_name, Key=credentials_file_key)
        s3.download_file(bucket_name, credentials_file_key, local_credentials_path)
        print(f"Credentials file downloaded to {local_credentials_path}")
    except ClientError as e:
        print(f"Error downloading credentials file: {e}")
        sys.exit(1)
# Download credentials
download_credentials()
credentials = load_credentials_from_file(local_credentials_path)
# Specify the target service account to impersonate
target_principal = '[email protected]'
target_scopes = ['https://www.googleapis.com/auth/cloud-platform']
# Create impersonated credentials
impersonated_creds = impersonated_credentials.Credentials(
    source_credentials=credentials,
    target_principal=target_principal,
    target_scopes=target_scopes
)
# Refresh the impersonated credentials
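# (as far as I can tell, refresh() first refreshes the source credentials,
# i.e. the AWS -> Google STS token exchange, and then calls
# generateAccessToken on the target service account)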
request = Request()
impersonated_creds.refresh(request)
# Initialize the BigQuery client
# client = bigquery.Client(credentials=impersonated_creds, project=project_id)
# # Define your DynamicFrames
# def create_dynamic_frame(table_name):
#     return glueContext.create_dynamic_frame.from_options(
#         connection_type="mysql",
#         connection_options={
#             "useConnectionProperties": "true",
#             "dbtable": table_name,
#             "connectionName": "myapp nonprod DB",
#         },
#         transformation_ctx=f"{table_name}_dynamic_frame"
#     )
# tables = ['changes', 'pull_requests', 'event_change', 'team_members']
# dynamic_frames = {table: create_dynamic_frame(table) for table in tables}
# # Drop duplicates
# def drop_duplicates(frame):
#     return DynamicFrame.fromDF(frame.toDF().dropDuplicates(), glueContext, f"{frame.transformation_ctx}_drop_duplicates")
# dynamic_frames = {table: drop_duplicates(df) for table, df in dynamic_frames.items()}
# # Drop fields for specific tables
# drop_fields_frames = {
#     'pull_requests': DropFields.apply(frame=dynamic_frames['pull_requests'], paths=["organization_name", "comment_id"], transformation_ctx="pull_requests_drop_fields")
# }
# # Function to load data to BigQuery
# def load_to_bigquery(df, table_id):
#     pandas_df = df.toDF().toPandas()
#     job_config = bigquery.LoadJobConfig(write_disposition=bigquery.WriteDisposition.WRITE_APPEND)
#     load_job = client.load_table_from_dataframe(pandas_df, table_id, job_config=job_config)
#     load_job.result()  # Wait for the job to complete
#     print(f"Loaded {load_job.output_rows} rows into {table_id}.")
# # Write data to BigQuery
# for table, df in dynamic_frames.items():
#     table_id = f'myapp.{table}'
#     load_to_bigquery(df, table_id)
# for table, df in drop_fields_frames.items():
#     table_id = f'myapp.{table}'
#     load_to_bigquery(df, table_id)
job.commit()
I have set up a Workload Identity Federation pool and added the service account to it. I'm trying to impersonate the service account. My credentials file looks like this:
{
  "type": "external_account",
  "audience": "//iam.googleapis.com/projects/123455678/locations/global/workloadIdentityPools/mynonprodpool/providers/aws-devendi-nonprod",
  "subject_token_type": "urn:ietf:params:aws:token-type:aws4_request",
  "service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]:generateAccessToken",
  "token_url": "https://sts.googleapis.com/v1/token",
  "credential_source": {
    "environment_id": "aws1",
    "region_url": "http://169.254.169.254/latest/meta-data/placement/availability-zone",
    "url": "http://169.254.169.254/latest/meta-data/iam/security-credentials",
    "regional_cred_verification_url": "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15"
  }
}
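For reference, this is how I understand an external_account config like this is meant to be consumed on the Python side (a minimal standalone sketch for sanity-checking the federation, reusing the /tmp path from my script; it is not part of the job itself):

from google.auth import load_credentials_from_file
from google.auth.transport.requests import Request

# load_credentials_from_file returns a (credentials, project_id) tuple;
# with service_account_impersonation_url set in the config, the returned
# credentials already impersonate the service account during refresh.
creds, project = load_credentials_from_file(
    '/tmp/service-account-impersonation.json',
    scopes=['https://www.googleapis.com/auth/cloud-platform'],
)
creds.refresh(Request())  # AWS metadata -> STS exchange -> access token
print('Token acquired:', bool(creds.token))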
But when I run the job, I get:
Error Category: INVALID_ARGUMENT_ERROR; AttributeError: 'tuple' object has no attribute 'token_state'
Also, I have a working solution that uses a service account key directly, but I need to replace it with Google Workload Identity Federation.
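For comparison, the working key-file version boils down to something like this (a minimal sketch; the key path is a placeholder, not my real value):

from google.oauth2 import service_account
from google.cloud import bigquery

# The flow I need to retire: credentials come from a downloaded key file.
sa_creds = service_account.Credentials.from_service_account_file(
    '/tmp/service-account-key.json',
    scopes=['https://www.googleapis.com/auth/cloud-platform'],
)
client = bigquery.Client(credentials=sa_creds, project=project_id)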
Has anybody experienced a similar issue?