I have an Airflow cluster on Kubernetes, maintained by the SRE team. The cluster uses the service account sa_project_a, created in project_a, to authenticate against GCP. I need to implement a Python script that ingests data from BigQuery in project_a and project_b, then writes the data to a MySQL database hosted on Cloud SQL in project_b.
The resource mapping:

project_a:
- sa_project_a

project_b:
- sa_project_b
- Cloud SQL Instance

project_c:
- Airflow Cluster on K8s
Both sa_project_a and sa_project_b have the following roles on project_b:

- BigQuery Data Viewer
- BigQuery Job User
- Cloud SQL Client
- Secret Manager Secret Accessor
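(As a sanity check, one way to confirm what the running principal can actually do on project_b is to call testIamPermissions from inside the Airflow pod. This is only a diagnostic sketch, not part of the pipeline; "project_b" stands in for the real project ID.)

from google.cloud import resourcemanager_v3

# Diagnostic sketch: ask GCP which of the listed permissions the currently
# authenticated principal (whatever ADC resolves to in the pod) holds on project_b.
client = resourcemanager_v3.ProjectsClient()
response = client.test_iam_permissions(
    request={
        "resource": "projects/project_b",
        "permissions": [
            "bigquery.jobs.create",
            "cloudsql.instances.connect",
            "secretmanager.versions.access",
        ],
    }
)
print(response.permissions)  # only the permissions the caller actually holds come back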
Ingesting data from BigQuery works without any permission issues:
from google.cloud import bigquery
import pandas as pd

def bigquery_to_df(self, query: str) -> None:
    """Query the data from BigQuery with SQL and convert it to a Pandas DataFrame.

    Args:
        query (str): SQL query string
    """
    # Uses Application Default Credentials, i.e. sa_project_a on the Airflow cluster
    client = bigquery.Client()
    query_job = client.query(query)
    rows = query_job.result()
    data = [dict(row) for row in rows]
    self.df = pd.DataFrame(data=data)
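For example, the method is called with a cross-project query roughly like this (`pipeline` is an instance of the class that owns these methods; the dataset and table names are placeholders):

# Illustrative usage only; dataset/table names are made up.
pipeline.bigquery_to_df(
    """
    SELECT a.id, a.value, b.label
    FROM `project_a.dataset_a.table_a` AS a
    JOIN `project_b.dataset_b.table_b` AS b USING (id)
    """
)
print(pipeline.df.head())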
To connect to Cloud SQL, I am using the cloud-sql-python-connector Python library:
import logging
import os

import pymysql
import sqlalchemy
from google.cloud.sql.connector import Connector, IPTypes

def create_connect(
    self,
    connection_name: str,
    db_user: str,
    db_pass: str,
    db_name: str,
    database: str,
    lib: str,
) -> None:
    """Create the connection to a database in Cloud SQL.

    Args:
        connection_name (str): Connection name of the Cloud SQL instance
        db_user (str): User id
        db_pass (str): Password
        db_name (str): Database name on the Cloud SQL instance
        database (str): Database software hosted on Cloud SQL (e.g. "mysql")
        lib (str): Database driver used to connect (e.g. "pymysql")
    """
    ip_type = IPTypes.PRIVATE if os.environ.get("PRIVATE_IP") else IPTypes.PUBLIC
    connector = Connector(ip_type)

    def _get_conn() -> pymysql.connections.Connection:
        """Create the pymysql connection to the database on Cloud SQL.

        Returns:
            pymysql.connections.Connection: the pymysql connection
        """
        conn = connector.connect(
            instance_connection_string=connection_name,
            driver=lib,
            user=db_user,
            password=db_pass,
            db=db_name,
        )
        return conn

    self.db_name = db_name
    self._pool = sqlalchemy.create_engine(
        f"{database}+{lib}://",
        creator=_get_conn,
    ).connect()
    logging.info(" --> Start the connection!")
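For reference, it is called roughly like this; the connection name follows Cloud SQL's "project:region:instance" format, and all values below are placeholders:

# Illustrative call; every value here is a placeholder.
pipeline.create_connect(
    connection_name="project_b:asia-east1:my-sql-instance",
    db_user="pipeline_user",
    db_pass="********",
    db_name="analytics",
    database="mysql",
    lib="pymysql",
)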
When I attempted to create the connection, I encountered the following error:
aiohttp.client_exceptions.ClientResponseError: 403, message="Forbidden: Authenticated IAM principal does not seem authorized to make API request. Verify 'Cloud SQL Admin API' is enabled within your GCP project and 'Cloud SQL Client' role has been granted to IAM principal.", url='https://sqladmin.googleapis.com/sql/v1beta4/projects/{project_id}/instances/{instance_id}/connectSettings'
Afterward, I enabled the Cloud SQL Admin API and added the Cloud SQL Client role to sa_project_a. I then re-executed the pipeline, but the same error persisted.
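To double-check which identity the Connector picks up when no credentials are passed, this kind of ADC probe can be run inside the Airflow pod (a diagnostic sketch; service_account_email only exists on service-account-style credentials):

import google.auth

# Diagnostic sketch: show which principal and project Application Default
# Credentials resolve to inside the pod; this is what Connector() uses by default.
credentials, project = google.auth.default()
print("ADC project:", project)
print("Principal:", getattr(credentials, "service_account_email", "unknown"))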
To resolve the issue quickly, I generated a new service account, sa_project_b, within project_b (the same project as the Cloud SQL instance) with the same roles as sa_project_a. I stored its credentials in Secret Manager and retrieve them before creating the connection:
import json

from google.cloud import secretmanager
from google.oauth2 import service_account

ip_type = IPTypes.PRIVATE if os.environ.get("PRIVATE_IP") else IPTypes.PUBLIC
# Fetch the sa_project_b key from Secret Manager and build explicit credentials
client = secretmanager.SecretManagerServiceClient()
name = f"projects/{self.project_id}/secrets/{self.secret_id}/versions/latest"
request = client.access_secret_version(request={"name": name})
response = json.loads(request.payload.data.decode("UTF-8"))
credentials = service_account.Credentials.from_service_account_info(response)
connector = Connector(ip_type=ip_type, credentials=credentials)
With the sa_project_b credentials, the connection was created without any issues.
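(For completeness, the last step of the pipeline writes the DataFrame through that connection, roughly like this; the target table name is a placeholder.)

# Sketch of the final write step, reusing self.df and the SQLAlchemy connection
# created in create_connect; "target_table" is a placeholder name.
self.df.to_sql(
    name="target_table",
    con=self._pool,
    if_exists="append",
    index=False,
)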
I suspect that sa_project_a might not have permission to use the Cloud SQL Admin API: I used the exact same script and only changed the service account to one created in the same project as the Cloud SQL instance, and the connection worked. My guess is that sa_project_b can access the Cloud SQL Admin API automatically because it was created in the same project as the instance.