I’m trying to implement my custom expectation. I use: python, s3, spark, glue.
I described my custom expectation:
from great_expectations.expectations.expectation import TableExpectation
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from great_expectations.execution_engine import ExecutionEngine
class ExpectUniqueIdsWithNullEnd(TableExpectation):
library_metadata = {
"maturity": "production",
"package": "custom",
"tags": ["id uniqueness", "business logic"],
"contributors": ["Your Name <[email protected]>"],
}
def _validate(
self,
configuration: ExpectationConfiguration,
data: DataFrame,
runtime_configuration: dict = None,
execution_engine: ExecutionEngine = None,
metrics: dict = None,
):
column = configuration.kwargs.get("column", "id")
ended_at_column = configuration.kwargs.get("ended_at_column", "ended_at")
print("Hello")
print(column)
print(ended_at_column)
filtered_data = data.filter(F.col(ended_at_column).isNull())
id_counts = filtered_data.groupBy(column).count()
non_unique_ids = id_counts.filter(F.col("count") > 1).select(column)
if non_unique_ids.count() > 0:
return {
"success": False,
"unexpected_list": non_unique_ids.collect()
}
else:
return {
"success": True
}
from great_expectations.expectations.registry import register_expectation
register_expectation(ExpectUniqueIdsWithNullEnd)
from great_expectations.core.expectation_configuration import ExpectationConfiguration
expectation_config = ExpectationConfiguration(
expectation_type="expect_unique_ids_with_null_end",
kwargs={
"column": "id",
"ended_at_column": "ended_at"
}
)
Next, I try to call it using a batch request and a validator
batch_request = RuntimeBatchRequest(
datasource_name="spark_s3",
data_asset_name="data_asset_name",
batch_identifiers={"default_identifier_name": "default_identifier_name"},
data_connector_name="default_runtime_data_connector_name",
runtime_parameters={"path": "table_path"},
batch_spec_passthrough={"reader_method": "delta", "reader_options": {"header": True}},
)
validator = context_gx.get_validator(
batch_request=batch_request,
expectation_suite_name=expectation_suite_name
)
# Додавання до suite
expectation_suite_name = "haistruk_test_suite"
suite = context_gx.get_expectation_suite(expectation_suite_name)
suite.add_expectation(expectation_config)
context_gx.save_expectation_suite(suite, expectation_suite_name)
results = validator.validate()
print("Results of the custom expectation:", results)
but i am getting the following error:
line 1100, in metrics_validaten ] = self._validate(nTypeError: ExpectUniqueIdsWithNullEnd._validate() missing 1 required positional argument: ‘data’n”
Can’t find the problem tried everything please help
New contributor
Fenrir is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.