I have a UDF in a Spark job that calls an API for each (city, state) pair to fetch the postcode, and as a result the job takes forever to complete. Please suggest an alternative to this code.
Code:
import requests
from pyspark.sql.functions import col, broadcast
from pyspark.sql.types import StringType

# Function to fetch the zip code for a city/state pair
def fetch(city, state):
    url = f"http://api.zippopotam.us/us/{state}/{city.replace(' ', '%20')}"
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        return data['places'][0]['post code'] if 'places' in data else None
    return None

# Register the UDF
fetchUDF = spark.udf.register("fetch", fetch, StringType())

# Load the dataframes
citiesDF = spark.read.parquet(citiesPath).select("city_id", "city", "state_abv")
trxDF = spark.read.parquet(trxPath)

# Perform the join and transformation
finalDF = (trxDF.join(broadcast(citiesDF), "city_id")
                .filter(col("state_abv").isNotNull())
                .withColumn("zip_code", fetchUDF(col("city"), col("state_abv"))))

# Write the result to disk partitioned by zip_code
finalDF.write.mode("overwrite").partitionBy("zip_code").parquet(finalPath)
I want to optimize this code to improve its performance.
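One idea I had is to drop the per-row UDF and instead call the API once per city with mapPartitions over citiesDF, reusing a single HTTP session per partition (a sketch; fetch_partition and zip_schema are names I made up for illustration, and I'm assuming city_id is a string):

import requests
from pyspark.sql.types import StructType, StructField, StringType

def fetch_partition(rows):
    # One requests.Session per partition reuses TCP connections
    # instead of opening a new one for every row
    session = requests.Session()
    for row in rows:
        url = f"http://api.zippopotam.us/us/{row['state_abv']}/{row['city'].replace(' ', '%20')}"
        zip_code = None
        try:
            response = session.get(url, timeout=5)
            if response.status_code == 200:
                data = response.json()
                if data.get('places'):
                    zip_code = data['places'][0]['post code']
        except requests.RequestException:
            pass  # leave zip_code as None on network errors
        yield (row['city_id'], zip_code)

# Assuming city_id is a string; adjust the type if it is an integer
zip_schema = StructType([
    StructField("city_id", StringType()),
    StructField("zip_code", StringType()),
])
zipsDF = spark.createDataFrame(citiesDF.rdd.mapPartitions(fetch_partition), zip_schema)

Since citiesDF has one row per city, this hits the API once per city instead of once per transaction row; zipsDF could then replace the UDF call, e.g. trxDF.join(broadcast(citiesDF), "city_id").join(broadcast(zipsDF), "city_id").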
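Another idea: since the result only depends on the (city, state) pair, I could collect the distinct pairs, fetch them concurrently on the driver, and join the lookup table back (a sketch, assuming the distinct pair count is small enough to collect to the driver; fetch is the function defined above, fetch_pair is a helper I made up, and max_workers=16 is an arbitrary starting point):

from concurrent.futures import ThreadPoolExecutor

# Only the distinct (city, state) pairs need an API call
pairs = (citiesDF.select("city", "state_abv")
                 .where(col("state_abv").isNotNull())
                 .distinct()
                 .collect())

def fetch_pair(row):
    return (row['city'], row['state_abv'], fetch(row['city'], row['state_abv']))

# Fetch all distinct pairs concurrently on the driver
with ThreadPoolExecutor(max_workers=16) as pool:
    results = list(pool.map(fetch_pair, pairs))

zipsDF = spark.createDataFrame(results, ["city", "state_abv", "zip_code"])

# Join the lookup table back instead of calling the UDF per row
finalDF = (trxDF.join(broadcast(citiesDF), "city_id")
                .filter(col("state_abv").isNotNull())
                .join(broadcast(zipsDF), ["city", "state_abv"], "left"))

Would either of these be the right direction, or is there a better pattern for enriching a DataFrame from an external API?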