Delta Table Write – Unit Testing with Pytest

Here’s hoping someone can help me with this. I have a module with a unit test for Delta Tables setup as below. The test succeeds when executed individually, however the same test fails when executed as part of a test suite where I’m creating new SparkSessions for testing other things.

I have a feeling, this is happening because the spark fixture isn’t getting cleaned up between runs, so this is more of an issue with Pytest. What am I doing wrong?


from mimesis import Field, Fieldset, Schema
from mimesis.enums import TimestampFormat
from mimesis.locales import Locale
from mimesis import Person
from mimesis import Address
from mimesis import Code
from mimesis import Numeric
from mimesis import Datetime

field = Field(Locale.EN, seed=0xFF)
fieldset = Fieldset(Locale.EN, seed=0xFF)
person = Person(Locale.EN)
address = Address(Locale.EN)
code = Code()
numeric = Numeric()
dt = Datetime()


@pytest.fixture(scope="module")
def spark():
    spark, log, config = start_spark(
        app_name="Test Write Delta",
        jar_packages=["io.delta:delta-spark_2.12:3.1.0"],
        spark_config={
            "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
            "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        },
    )
    yield spark
    spark.stop()

@pytest.fixture
def path():
    filename = "some_random_filename"
    yield f"/workspaces/tests/test_data/{file_name}.delta"

@pytest.fixture
    yield StructType(
        [
            StructField("first_name", StringType(), True),
            StructField("last_name", StringType(), True),
            StructField("created_at", StringType(), True),
        ]
    )

@pytest.fixture
def num_rows():
    num_rows = 100
    yield num_rows

@pytest.fixture
def data(num_rows):
    # Mimesis Schema Definition.
    def schema_definition():
        return tuple(
            [
                person.first_name(),
                person.last_name(),
                dt.timestamp(fmt=TimestampFormat.ISO_8601),
            ]
        )
    schema = Schema(schema=schema_definition, iterations=num_rows)
    yield schema.create()

@pytest.fixture
def row_count(data):
    yield len(data)


def test_write_delta(spark, path, data, row_count, schema):
    # Create the dataframe.
    df_delta = spark.createDataFrame(data, schema)

    # Write the dataframe as a delta.
    write_delta(spark, df_delta, path, mode="overwrite")


    # Read the delta file from disk.
    df_delta_read = spark.read.format("delta").load(path)

    # Assert dataframe count matches expected value.
    assert df_delta_read.count() == row_count

    # Assert datarame read back matches what was written to disk.
    assertDataFrameEqual(df_delta_read, df_delta)    

The error is:


answer = 'xro808', gateway_client = <py4j.clientserver.JavaClient object at 0x7f076ab65f10>, target_id = 'o807', name = 'save'

    def get_return_value(answer, gateway_client, target_id=None, name=None):
        """Converts an answer received from the Java gateway into a Python object.
    
        For example, string representation of integers are converted to Python
        integer, string representation of objects are converted to JavaObject
        instances, etc.
    
        :param answer: the string returned by the Java gateway
        :param gateway_client: the gateway client used to communicate with the Java
            Gateway. Only necessary if the answer is a reference (e.g., object,
            list, map)
        :param target_id: the name of the object from which the answer comes from
            (e.g., *object1* in `object1.hello()`). Optional.
        :param name: the name of the member from which the answer comes from
            (e.g., *hello* in `object1.hello()`). Optional.
        """
        if is_error(answer)[0]:
            if len(answer) > 1:
                type = answer[1]
                value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
                if answer[1] == REFERENCE_TYPE:
>                   raise Py4JJavaError(
                        "An error occurred while calling {0}{1}{2}.n".
                        format(target_id, ".", name), value)
E                   py4j.protocol.Py4JJavaError: An error occurred while calling o807.save.
E                   : org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: delta. Please find packages at `https://spark.apache.org/third-party-projects.html`.
E                       at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
E                       at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
E                       at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
E                       at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:863)
E                       at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:257)
E                       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
E                       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E                       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
E                       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E                       at java.base/java.lang.reflect.Method.invoke(Method.java:568)
E                       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E                       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
E                       at py4j.Gateway.invoke(Gateway.java:282)
E                       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E                       at py4j.commands.CallCommand.execute(CallCommand.java:79)
E                       at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
E                       at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
E                       at java.base/java.lang.Thread.run(Thread.java:840)
E                   Caused by: java.lang.ClassNotFoundException: delta.DefaultSource
E                       at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
E                       at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
E                       at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
E                       at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
E                       at scala.util.Try$.apply(Try.scala:213)
E                       at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
E                       at scala.util.Failure.orElse(Try.scala:224)
E                       at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
E                       ... 16 more

/opt/conda/envs/development/lib/python3.12/site-packages/py4j/protocol.py:326: Py4JJavaError

When I run the same test independently, I get the below output with pytest -s

==================================================== test session starts =====================================================
platform linux -- Python 3.12.3, pytest-8.0.1, pluggy-1.5.0
rootdir: /workspaces/cubie
plugins: mimesis-16.0.0
collected 1 item                                                                                                             

cubie/tests/connectors/writers/test_write_delta.py :: loading settings :: url = jar:file:/opt/conda/envs/development/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ebd7a4be-cd81-42f4-97c3-89c5d52c2c31;1.0
        confs: [default]
        found io.delta#delta-spark_2.12;3.1.0 in central
        found io.delta#delta-storage;3.1.0 in central
        found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 243ms :: artifacts dl 11ms
        :: modules in use:
        io.delta#delta-spark_2.12;3.1.0 from central in [default]
        io.delta#delta-storage;3.1.0 from central in [default]
        org.antlr#antlr4-runtime;4.9.3 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-ebd7a4be-cd81-42f4-97c3-89c5d52c2c31
        confs: [default]
        0 artifacts copied, 3 already retrieved (0kB/7ms)
24/05/26 15:15:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/26 15:15:24 WARN <Test Write Delta local-1716736523789>: no config file found
24/05/26 15:15:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/05/26 15:15:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/05/26 15:15:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/05/26 15:15:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/05/26 15:15:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/05/26 15:15:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/05/26 15:15:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/05/26 15:15:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/05/26 15:15:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/05/26 15:15:35 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
.                                                                               

===================================================== 1 passed in 26.65s =====================================================

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật