Hi, I'm working on building a template pipeline using Databricks workflows and job compute. I have a requirement to add the job name and a few input parameters, like market name and region, as cluster tags on the job compute.
Is there any way to associate dynamic values with the cluster's custom tags, as in the scenario below?
I have one parameterized pipeline and one job cluster with the custom tags MarketName, Region, and Jobname. The job name is constant.
If I run the job for the India market, it should populate the tag values as below:
MarketName = India
Region = Asia Pacific
If I run the job for the UK market, it should populate the tag values as below:
MarketName = United Kingdom
Region = Europe
If I run the job for the Mexico market, it should populate the tag values as below:
MarketName = Mexico
Region = USA
Could anyone suggest something here?
You need to update the job settings of the pipeline every time; you cannot add those tags while the pipeline is running.
So, create a new notebook and a job that updates the original pipeline's settings and then triggers a run.
Below is the code for updating and triggering the original pipeline.
import requests
import json
dbutils.widgets.text("MarketName", "default")
dbutils.widgets.text("Region", "default")
dbutils.widgets.text("Jobid", "436291271922693")
market_name = dbutils.widgets.get("MarketName")
region = dbutils.widgets.get("Region")
job_id = dbutils.widgets.get("Jobid")
print(f"Market Name: {market_name}")
print(f"Region: {region}")
print(f"Region: {job_id}")
databricks_instance = "https://<host>.azuredatabricks.net"
# Prefer reading the token from a secret scope (dbutils.secrets.get) instead of hard-coding it
api_token = "dapi24........"
This configures the required inputs: the widget values, the workspace URL, and the API token.
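If you would rather derive the region from the market name instead of passing both as widgets, a small lookup works. The mapping below is only a sketch based on the examples in the question, so adjust it to your markets.
# Hypothetical market-to-region mapping taken from the question's examples
market_to_region = {
    "India": "Asia Pacific",
    "United Kingdom": "Europe",
    "Mexico": "USA"
}
# Fall back to the Region widget value if the market is not in the mapping
region = market_to_region.get(market_name, region)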
# Tags to apply to the job; they propagate to the job cluster on each run
new_tags = {
    "MarketName": market_name,
    "Region": region
}
payload = {
    "job_id": job_id,
    "new_settings": {
        "tags": new_tags
    }
}
headers = {
    "Authorization": f"Bearer {api_token}",
    "Content-Type": "application/json"
}
url = f"{databricks_instance}/api/2.1/jobs/update"
response = requests.post(url, headers=headers, data=json.dumps(payload))
if response.status_code == 200:
    print("Job updated with custom tags")
else:
    print(f"Failed to update job: {response.status_code}")
    print(response.json())
This updates the job. Tags set on the job are also propagated to the clusters that are created when the job runs.
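Optionally, you can read the job back through the Jobs API to confirm the tags were applied before triggering the run. A quick sketch, reusing the same host, token, and headers as above:
# Optional sanity check: fetch the job settings and print its tags
get_url = f"{databricks_instance}/api/2.1/jobs/get"
response = requests.get(get_url, headers=headers, params={"job_id": job_id})
if response.status_code == 200:
    print(response.json().get("settings", {}).get("tags"))
else:
    print(f"Failed to read job: {response.status_code}")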
# Trigger a run of the original job, which now carries the updated tags
payload = {
    "job_id": job_id
}
headers = {
    "Authorization": f"Bearer {api_token}",
    "Content-Type": "application/json"
}
url = f"{databricks_instance}/api/2.1/jobs/run-now"
response = requests.post(url, headers=headers, data=json.dumps(payload))
if response.status_code == 200:
    print("Job triggered")
    print(response.json())
else:
    print(f"Failed to trigger job: {response.status_code}")
    print(response.json())
Finally, trigger a new run.
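Since the main pipeline is parameterized, the run-now request can also forward the widget values instead of sending only the job_id. The sketch below assumes the main pipeline is a notebook task and that its widget names match; adjust the parameter names to your pipeline and post the payload to the same /jobs/run-now URL as above.
# Sketch: pass the market and region to the parameterized pipeline on run-now
payload = {
    "job_id": job_id,
    "notebook_params": {
        "MarketName": market_name,
        "Region": region
    }
}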
Next, below is the code in the job that gets triggered (your main pipeline):
# Read all tags applied to the job cluster, including the custom tags set above
tags = spark.conf.get("spark.databricks.clusterUsageTags.clusterAllTags")
print(tags)
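On Databricks this Spark conf is normally a JSON-encoded list of key/value pairs, so it can be parsed into a dictionary for easier lookup; a small sketch:
import json

# clusterAllTags is typically a JSON array of {"key": ..., "value": ...} entries
all_tags = {t["key"]: t["value"] for t in json.loads(tags)}
print(all_tags.get("MarketName"), all_tags.get("Region"))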
The output of the job-update pipeline confirms the tags are updated; make sure you pass the required parameters when running it.