Context
I have a job that generates a CSV from data in my company's data lake. The job is triggered once a day with some predefined configuration; it is implemented with Spark and Python and executed in an Airflow pipeline.
The CSV is later uploaded to a particular customer.
Case
Now we want to onboard additional customers (for now just one new customer, but ideally there will be multiple in the future).
I was thinking about keeping each customer's configuration and properties in a YAML file and loading the right one in the job based on an input parameter that defines the customer name.
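Roughly what I had in mind (a minimal sketch, assuming PyYAML and a configs/ directory; the file layout and function name are just illustrative):

```python
import yaml

def load_customer_config(customer_name: str) -> dict:
    # customer_name would come from the job / Airflow parameter and
    # resolve to e.g. configs/customer-A.yml, configs/customer-B.yml, ...
    with open(f"configs/{customer_name}.yml") as f:
        return yaml.safe_load(f)

config = load_customer_config("customer-A")
```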
Problem
To generate some columns in the final CSV, the Python code needs to resolve some conditions to get the proper value for each customer (the values are defined in each customer's configuration file).
However, it seems that Spark UDFs only accept column arguments, so I can't pass complex structures to them.
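To illustrate the constraint, a plain column-based UDF looks roughly like this (names are illustrative):

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def calculate_value(raw_value: str) -> str:
    # only this row's column values are available here; there is no obvious
    # place to hand in a whole customer-configuration object
    return raw_value.upper()
```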
Example
Let’s imagine that I have the following:
- Configuration file. For example, `customer-A.yml`:

```yaml
x_enabled: True
start_date_x: '01-01-2024'
y_enabled: False
start_date_y: '01-01-2022'
values:
  x: 'value-x'
  y: 'value-y'
```

and `customer-B.yml` should contain the same attributes, but different values.
- Model. Ideally I would like to do something like the following (simple pseudocode example of the model):
```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

class Customer:
    @udf(returnType=StringType())
    def calculate_value(self) -> str:
        # conditions and logic using the properties and values
        # defined in the subclasses
        ...

class CustomerA(Customer):
    x_enabled: ...
    start_date_x: ...
    y_enabled: ...
    start_date_y: ...
    values: ...
```
but it turns out I can't, as UDFs need to be static methods.
Question
One option is to pass all the parameters to the UDF converted with lit(), but I find this quite ugly and not scalable (especially if my conditions get more complex). Is there a better approach?
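For reference, this is roughly what the lit() option looks like (a minimal sketch, assuming `config` is the parsed per-customer YAML from the sketch above and `df` is the DataFrame being exported; column and function names are illustrative):

```python
from pyspark.sql.functions import udf, lit
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def calculate_value(x_enabled: bool, start_date_x: str,
                    y_enabled: bool, start_date_y: str,
                    value_x: str, value_y: str) -> str:
    # per-customer conditions and logic, flattened into scalar arguments
    if x_enabled:
        return value_x
    if y_enabled:
        return value_y
    return ""

# every configuration attribute has to be wrapped in lit() individually
df = df.withColumn(
    "value",
    calculate_value(
        lit(config["x_enabled"]), lit(config["start_date_x"]),
        lit(config["y_enabled"]), lit(config["start_date_y"]),
        lit(config["values"]["x"]), lit(config["values"]["y"]),
    ),
)
```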