I created a class to extend DataFrame functionality; you could say it is a wrapper around the DataFrame. This may raise some questions, but allow me to get to them below.
```python
from pyspark.sql import DataFrame, SparkSession
from delta.tables import DeltaTable

class TurboDF(DataFrame):
    def __init__(self, *args):
        df = self._init_args(*args)
        DataFrame.__init__(self, df._jdf, df.sql_ctx)
        self._df = df
        self._turbo_df()

    def _init_args(self, *args):
        # Wrap an existing DataFrame as-is.
        if isinstance(args[0], DataFrame):
            return args[0]
        assert isinstance(args[0], SparkSession), \
            "First argument must be DataFrame or SparkSession"
        a_spark = args[0]
        # Given a session and a path, load the Delta table at that path.
        if DeltaTable.isDeltaTable(a_spark, args[1]):
            return a_spark.read.format('delta').load(args[1])
        # ... and other ways to read tables
```
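For context, I call it along these lines (the path and table name here are just illustrative):

```python
spark = SparkSession.builder.getOrCreate()
tdf = TurboDF(spark, "/mnt/data/events")   # build from a Delta table path
tdf2 = TurboDF(spark.table("events"))      # or wrap an existing DataFrame
```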
There are two issues here:

- With newer `pyspark` versions, some of the elements that I used are no longer valid, so I would like to correct them.
- The second issue may be more philosophical: I may be "reinventing the wheel", in which case I could add this functionality in some other way.
Elements that have changed with newer `pyspark` versions:

- `df.sql_ctx` changed to `df.sparkSession` (or perhaps it was the other way around). This was the first change, and I caught it with a `try...except` block.
- `isinstance(args[0], SparkSession)` changed, because I used to import the class from `pyspark.sql` and then had to import it from `pyspark.sql.connect.session`.
- `df._jdf` is the newest error; the message says something like "the attribute is not supported in Spark Connect."
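For the first two items, something like the following version-tolerant helpers seems to paper over the differences. This is only a sketch; `SESSION_TYPES` and `_session_of` are names I made up for illustration, not part of my class:

```python
from pyspark.sql import DataFrame, SparkSession

try:
    # Spark Connect ships its own session class (pyspark >= 3.4).
    from pyspark.sql.connect.session import SparkSession as ConnectSparkSession
    SESSION_TYPES = (SparkSession, ConnectSparkSession)
except ImportError:
    # Older pyspark, or an install without the Spark Connect extras.
    SESSION_TYPES = (SparkSession,)

def _session_of(df: DataFrame):
    # df.sparkSession is the current accessor; df.sql_ctx.sparkSession is the
    # legacy route for releases where sparkSession did not exist yet.
    try:
        return df.sparkSession
    except AttributeError:
        return df.sql_ctx.sparkSession
```

With these, the type check becomes `isinstance(args[0], SESSION_TYPES)`.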
And as they say, the third time's the charm. So most likely I don't need to keep maintaining this extra functionality, and instead I'd be better off moving on to the second issue.
What is the right way to go about this extra functionality? Working on it has taught me a few things about Spark, and it feels like I may be able to collaborate with the Spark project itself instead of creating my own tiny project.
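One option I can see is dropping the subclass entirely and writing each `turbo` helper as a plain function chained through the standard `DataFrame.transform`, which never touches `_jdf` and so should also work under Spark Connect. A sketch, with `filters_all` as a made-up stand-in for my `filter_turbo` logic:

```python
from functools import reduce
from operator import and_
from pyspark.sql import Column, DataFrame

def filters_all(filters: list[Column]):
    """Combine all predicates with AND and apply them in a single filter."""
    def _apply(df: DataFrame) -> DataFrame:
        return df.filter(reduce(and_, filters))
    return _apply

# Chaining stays fluent without subclassing DataFrame:
# result = df.transform(filters_all([F.col("a") > 0, F.col("b") < 10]))
```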
Let me take a step back and list the extra functionality that I added to my `TurboDF`:
- Build a DataFrame from any of: a pandas DataFrame, a Delta table path, or a Delta table in the catalog.
- Allow `select_turbo` to take a list of columns, or a dictionary of `{name: column}`.
- Allow `filter_turbo` to take a list of filters and optimize them by combining with `and`, instead of applying them one by one.
- Allow `with_column_turbo` to take a dictionary of `{name: column}`, or a list of such dictionaries when the columns depend on one another.
- Allow `with_column_renamed_turbo` to rename several columns at once.
- Allow `join_turbo` to use suffixes for the new common columns, and to deal with the common non-join ones.
- Allow `save_as_file` (in Databricks) for when the DataFrame is not very long; this is clearly a convenience feature.
- Similar functionality in `GroupedData` with its corresponding `TurboGrouped`.
In addition, after reading the newer documentation I found that some of these features have already been implemented natively; see the sketch below. And this is why I'm reaching out to the community: to get involved in a better way.
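To illustrate, here is roughly how three of the items above map onto the native API, if I read the docs correctly (`withColumns` appeared in pyspark 3.3, and `withColumnsRenamed` with a dictionary in 3.4):

```python
from functools import reduce
from operator import and_
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.range(10).withColumn("a", F.col("id") * 2)

# with_column_turbo -> withColumns takes a {name: column} dictionary.
df = df.withColumns({"b": F.col("a") + 1, "c": F.lit("x")})

# with_column_renamed_turbo -> withColumnsRenamed renames several at once.
df = df.withColumnsRenamed({"a": "alpha", "b": "beta"})

# filter_turbo -> fold the predicates with AND and filter once.
df = df.filter(reduce(and_, [F.col("alpha") > 0, F.col("beta") < 100]))
```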
Thank you for your support.