Here’s hoping someone can help me with this. I have a module with a unit test for Delta tables, set up as shown below. The test succeeds when executed on its own, but it fails when executed as part of a test suite in which I create new SparkSessions for testing other things. My feeling is that the spark fixture isn’t being cleaned up between runs, so this may be more of a pytest issue than a Spark one. What am I doing wrong?
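For context, my working assumption (and it is only an assumption, since I haven’t dug into start_spark’s internals) is that start_spark ends up calling SparkSession.builder.getOrCreate(), so if another test module has already created a plain session, my fixture just gets that one back and the Delta packages never make it in. Roughly what I mean:

# Rough sketch of the session reuse I suspect; start_spark's internals are assumed, not confirmed.
from pyspark.sql import SparkSession

# Some other test module builds a plain session first.
plain = SparkSession.builder.appName("Other Tests").getOrCreate()

# Later, the Delta test asks for a session with the Delta packages, but
# getOrCreate() hands back the session that is already running. Static
# settings like spark.jars.packages and spark.sql.extensions only apply
# when the session is created, so they are effectively ignored here.
delta = (
    SparkSession.builder
    .appName("Test Write Delta")
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.1.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)
assert delta is plain  # same session object comes back

Here is the full test module: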
import pytest

from mimesis import Field, Fieldset, Schema
from mimesis.enums import TimestampFormat
from mimesis.locales import Locale
from mimesis import Person
from mimesis import Address
from mimesis import Code
from mimesis import Numeric
from mimesis import Datetime

from pyspark.sql.types import StructType, StructField, StringType
from pyspark.testing import assertDataFrameEqual

# start_spark and write_delta are imported from the project's own helper modules
# (those imports are omitted here for brevity).

field = Field(Locale.EN, seed=0xFF)
fieldset = Fieldset(Locale.EN, seed=0xFF)
person = Person(Locale.EN)
address = Address(Locale.EN)
code = Code()
numeric = Numeric()
dt = Datetime()

@pytest.fixture(scope="module")
def spark():
    spark, log, config = start_spark(
        app_name="Test Write Delta",
        jar_packages=["io.delta:delta-spark_2.12:3.1.0"],
        spark_config={
            "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
            "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        },
    )
    yield spark
    spark.stop()

@pytest.fixture
def path():
    filename = "some_random_filename"
    yield f"/workspaces/tests/test_data/{filename}.delta"

@pytest.fixture
def schema():
    yield StructType(
        [
            StructField("first_name", StringType(), True),
            StructField("last_name", StringType(), True),
            StructField("created_at", StringType(), True),
        ]
    )

@pytest.fixture
def num_rows():
    num_rows = 100
    yield num_rows

@pytest.fixture
def data(num_rows):
    # Mimesis schema definition.
    def schema_definition():
        return tuple(
            [
                person.first_name(),
                person.last_name(),
                dt.timestamp(fmt=TimestampFormat.ISO_8601),
            ]
        )

    schema = Schema(schema=schema_definition, iterations=num_rows)
    yield schema.create()

@pytest.fixture
def row_count(data):
    yield len(data)

def test_write_delta(spark, path, data, row_count, schema):
    # Create the dataframe.
    df_delta = spark.createDataFrame(data, schema)
    # Write the dataframe as a Delta table.
    write_delta(spark, df_delta, path, mode="overwrite")
    # Read the Delta table back from disk.
    df_delta_read = spark.read.format("delta").load(path)
    # Assert the dataframe count matches the expected value.
    assert df_delta_read.count() == row_count
    # Assert the dataframe read back matches what was written to disk.
    assertDataFrameEqual(df_delta_read, df_delta)
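
For what it's worth, the kind of cleanup I've been considering is an autouse fixture in a shared conftest.py that stops whatever session a previous module left behind. This is my own sketch, not something that exists in the project yet, and I'm not sure it addresses the real cause:

# conftest.py -- hypothetical isolation sketch, naming is my own.
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="module", autouse=True)
def isolated_spark_session():
    # Stop any session left over from a previous test module so the next
    # module-scoped spark fixture really has to build its own.
    leftover = SparkSession.getActiveSession()
    if leftover is not None:
        leftover.stop()
    yield
    leftover = SparkSession.getActiveSession()
    if leftover is not None:
        leftover.stop()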
The error when the test runs as part of the full suite is:
answer = 'xro808', gateway_client = <py4j.clientserver.JavaClient object at 0x7f076ab65f10>, target_id = 'o807', name = 'save'
def get_return_value(answer, gateway_client, target_id=None, name=None):
"""Converts an answer received from the Java gateway into a Python object.
For example, string representation of integers are converted to Python
integer, string representation of objects are converted to JavaObject
instances, etc.
:param answer: the string returned by the Java gateway
:param gateway_client: the gateway client used to communicate with the Java
Gateway. Only necessary if the answer is a reference (e.g., object,
list, map)
:param target_id: the name of the object from which the answer comes from
(e.g., *object1* in `object1.hello()`). Optional.
:param name: the name of the member from which the answer comes from
(e.g., *hello* in `object1.hello()`). Optional.
"""
if is_error(answer)[0]:
if len(answer) > 1:
type = answer[1]
value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
if answer[1] == REFERENCE_TYPE:
> raise Py4JJavaError(
"An error occurred while calling {0}{1}{2}.n".
format(target_id, ".", name), value)
E py4j.protocol.Py4JJavaError: An error occurred while calling o807.save.
E : org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: delta. Please find packages at `https://spark.apache.org/third-party-projects.html`.
E at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
E at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
E at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
E at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:863)
E at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:257)
E at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
E at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
E at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E at java.base/java.lang.reflect.Method.invoke(Method.java:568)
E at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
E at py4j.Gateway.invoke(Gateway.java:282)
E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E at py4j.commands.CallCommand.execute(CallCommand.java:79)
E at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
E at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
E at java.base/java.lang.Thread.run(Thread.java:840)
E Caused by: java.lang.ClassNotFoundException: delta.DefaultSource
E at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
E at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
E at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
E at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
E at scala.util.Try$.apply(Try.scala:213)
E at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
E at scala.util.Failure.orElse(Try.scala:224)
E at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
E ... 16 more
/opt/conda/envs/development/lib/python3.12/site-packages/py4j/protocol.py:326: Py4JJavaError
When I run the same test on its own, I get the following output with pytest -s:
==================================================== test session starts =====================================================
platform linux -- Python 3.12.3, pytest-8.0.1, pluggy-1.5.0
rootdir: /workspaces/cubie
plugins: mimesis-16.0.0
collected 1 item
cubie/tests/connectors/writers/test_write_delta.py :: loading settings :: url = jar:file:/opt/conda/envs/development/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ebd7a4be-cd81-42f4-97c3-89c5d52c2c31;1.0
confs: [default]
found io.delta#delta-spark_2.12;3.1.0 in central
found io.delta#delta-storage;3.1.0 in central
found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 243ms :: artifacts dl 11ms
:: modules in use:
io.delta#delta-spark_2.12;3.1.0 from central in [default]
io.delta#delta-storage;3.1.0 from central in [default]
org.antlr#antlr4-runtime;4.9.3 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 3 | 0 | 0 | 0 || 3 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-ebd7a4be-cd81-42f4-97c3-89c5d52c2c31
confs: [default]
0 artifacts copied, 3 already retrieved (0kB/7ms)
24/05/26 15:15:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/26 15:15:24 WARN <Test Write Delta local-1716736523789>: no config file found
24/05/26 15:15:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/05/26 15:15:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/05/26 15:15:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/05/26 15:15:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/05/26 15:15:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/05/26 15:15:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/05/26 15:15:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/05/26 15:15:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/05/26 15:15:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/05/26 15:15:35 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
.
===================================================== 1 passed in 26.65s =====================================================
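
One thing I plan to check next is which session the test actually receives and whether the Delta config made it in, along these lines (hypothetical debugging code, not part of the module above):

def test_session_carries_delta_config(spark):
    # If a session from another module was reused, the Delta settings are typically absent.
    print(spark.sparkContext.applicationId)
    print(spark.conf.get("spark.sql.extensions", "NOT SET"))
    print(spark.sparkContext.getConf().get("spark.jars.packages", "NOT SET"))

If that shows the extensions missing only when the full suite runs, it would at least confirm that the fixture is handing back a reused session.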