I am trying to get all job data from my Databricks workspace. Basically, I need to put all job data into a DataFrame. There are more than 3000 jobs, so I need to use the page_token to traverse all the pages.
Here is the code:
from databricks.sdk import WorkspaceClient

# Replace 'your-databricks-host' and 'your-databricks-token' with your actual Databricks host and token
w = WorkspaceClient(
    host='some host',
    token='some token'
)

all_jobs = []
page_token = None

while True:
    response = w.jobs.list(expand_tasks=False, page_token=page_token)
    all_jobs.extend(response['value'])
    page_token = response.get('next_page_token')
    if not page_token:
        break

jobs_df = spark.createDataFrame(all_jobs)
display(jobs_df)
I am getting this error:
TypeError: 'generator' object is not subscriptable
I have tried other approaches but keep getting different errors.
Basically, the job data looks like the JSON described here: https://docs.databricks.com/api/workspace/jobs/list
I ended up using the Databricks REST API. Actually, two APIs:
'/api/2.1/jobs/list': to get the job IDs.
'/api/2.1/jobs/get': to get the job details for a given job_id.
'/api/2.1/jobs/list' can also return job details, but the reason not to use it alone is that it can't return the task elements, which contain the notebook paths etc. Anyway, I tried, but failed. That's why I also use '/api/2.1/jobs/get'.
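As a side note on the original SDK attempt: w.jobs.list() returns a generator of job objects, not a JSON dict, which is why subscripting it with ['value'] fails. Iterating it directly, roughly like the sketch below, would probably also work for the listing part (I'm assuming the SDK follows next_page_token for you and that each job object exposes as_dict(); I haven't verified this end to end), but I still needed '/api/2.1/jobs/get' for the task details, so I went with the REST API.

from databricks.sdk import WorkspaceClient

w = WorkspaceClient(host='some host', token='some token')

# Iterate the generator directly; the SDK appears to handle pagination internally,
# so no manual page_token loop is needed here.
all_jobs = [job.as_dict() for job in w.jobs.list(expand_tasks=False)]

# createDataFrame may need an explicit schema for the nested fields
jobs_df = spark.createDataFrame(all_jobs)
display(jobs_df)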
The code:
Get job IDs: team_ids below is a group of creator emails I am interested in. The filter can be removed to return all job IDs.
import requests
# import pandas as pd
import math
import datetime
import json
from pyspark.sql.functions import col

base_uri = 'databricks uri'
token = 'databricks token'
endpoint = '/api/2.1/jobs/list'
headers = {'Authorization': f'Bearer {token}'}
params = {
    "page_token": None
}

# team_ids: the creator emails I am interested in (placeholder values)
team_ids = {'someone@example.com', 'someone.else@example.com'}

all_job_ids = []

while True:
    response = requests.get(base_uri + endpoint, headers=headers, params=params)
    response_json = response.json()

    # Collect the job_ids created by the people in team_ids
    data = []
    for job in response_json["jobs"]:
        if job.get("creator_user_name") in team_ids:
            data.append(job["job_id"])
    all_job_ids.extend(data)

    # Keep paging until has_more is false
    if response_json.get("has_more"):
        params['page_token'] = response_json.get("next_page_token")
    else:
        break

print(len(all_job_ids))
Get job details by job_id:
import requests
import json
from pyspark.sql.functions import from_json, col, explode, schema_of_json, lit, from_unixtime

def get_job_details(job_id):
    baseURI = 'databricks uri'
    apiToken = 'databricks token'
    params = {
        "job_id": job_id
    }
    endpoint = '/api/2.1/jobs/get'
    headers = {'Authorization': f'Bearer {apiToken}'}

    response = requests.get(baseURI + endpoint, headers=headers, params=params)

    # Check if the response is successful and not empty
    if response.status_code == 200 and response.content:
        response_json = response.json()
    else:
        raise ValueError(f"Failed to get job details: {response.status_code}, {response.text}")

    # Convert the response_json to a JSON string
    response_json_str = json.dumps(response_json)

    # Infer the schema from the JSON string
    schema = schema_of_json(lit(response_json_str))

    # Create a DataFrame from the JSON string
    try:
        result_df = (
            spark.createDataFrame([response_json_str], "string")
            .select(from_json(col("value"), schema).alias("data"))
            .select(
                "data.job_id",
                col("data.settings.name").alias("job_name"),
                "data.creator_user_name",
                "data.created_time",
                explode("data.settings.tasks").alias("tasks")
            )
            .select(
                "job_id",
                "job_name",
                "creator_user_name",
                from_unixtime(col("created_time") / 1000).alias("created_time"),  # created_time is epoch milliseconds
                col("tasks.task_key").alias("tasks"),
                col("tasks.notebook_task.notebook_path").alias("notebook")
            )
        )
    except Exception as error:
        print(str(error))
        # The select above fails when the task has no notebook_task field; the error message
        # lists the available struct fields, so check whether it is a JAR type task instead
        if 'spark_jar_task' in str(error):
            result_df = (
                spark.createDataFrame([response_json_str], "string")
                .select(from_json(col("value"), schema).alias("data"))
                .select(
                    "data.job_id",
                    col("data.settings.name").alias("job_name"),
                    "data.creator_user_name",
                    "data.created_time",
                    explode("data.settings.tasks").alias("tasks")
                )
                .select(
                    "job_id",
                    "job_name",
                    "creator_user_name",
                    from_unixtime(col("created_time") / 1000).alias("created_time"),  # created_time is epoch milliseconds
                    col("tasks.task_key").alias("tasks"),
                    col("tasks.spark_jar_task.main_class_name").alias("notebook")
                )
            )
    return result_df

# get_job_details(15770568).display()
For each job ID, get the job details:
from functools import reduce
from pyspark.sql import DataFrame

dfs = []
for jobid in all_job_ids:
    print(jobid)
    dfs.append(get_job_details(jobid))

final_df = reduce(DataFrame.unionAll, dfs)

final_df.display()
final_df.count()