As per the source code of ConfigDef, it should accept a List object: https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L97
The SMT config is defined as:
    "transforms": "EqualityCheck",
    "transforms.EqualityCheck.type": "org.apache.kafka.connect.transforms.EqualityCheckOnFields$Value",
    "transforms.EqualityCheck.fields.notEquality": "field1, field2",
    "transforms.EqualityCheck.fieldValues.notEquality": [[9, 0, null, 4.8234, 9.9999],
                                                         ["pol", "POC", "", null]],
It works when I test it locally, but when I actually deploy the connector config, it throws the error below:
    {"error_code":500,"message":"Cannot deserialize value of type java.lang.String from Array value (token JsonToken.START_ARRAY)\n at [Source: REDACTED (StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION disabled); line: 35, column: 52] (through reference chain: java.util.LinkedHashMap[\"transforms.EqualityCheck.fields.notEquality\"])"}
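My reading of this message (an assumption on my part, not something I have confirmed in the Connect source) is that the deployed config is read into a map whose values must all be strings, so any JSON array value is rejected. A minimal sketch of a pre-deployment check along those lines; `ConfigValueCheck` and `nonStringKeys` are hypothetical names, not part of Kafka:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Kafka): reports which config keys carry
// a value that is not a plain String, such as a nested JSON array.
final class ConfigValueCheck {

    // Returns the keys whose values are not Strings. Null values are
    // reported too, because null is not an instance of String.
    static List<String> nonStringKeys(Map<String, ?> config) {
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, ?> entry : config.entrySet()) {
            if (!(entry.getValue() instanceof String)) {
                keys.add(entry.getKey());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        Map<String, Object> config = new LinkedHashMap<>();
        config.put("transforms", "EqualityCheck");
        config.put("transforms.EqualityCheck.fields.notEquality", "field1, field2");
        // Nested array value, as in the config shown in the question.
        config.put("transforms.EqualityCheck.fieldValues.notEquality",
                Arrays.asList(Arrays.asList(9, 0, null), Arrays.asList("pol", "POC", "", null)));

        System.out.println(nonStringKeys(config));
    }
}
```

Running this against the config above flags only the `fieldValues.notEquality` key, the one that holds an array rather than a string.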
The EqualityCheckOnFields class:
    public class EqualityCheckOnFields<R extends ConnectRecord<R>> implements Transformation<R> {
        private static final Logger LOG = LoggerFactory.getLogger(EqualityCheckOnFields.class);
        private static final String PURPOSE = "field validation";
        public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define("fields.notEquality", ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, "List of fields for not-equality check")
            .define("fieldValues.notEquality", ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, "List of lists containing invalid values
        ....
        @Override
        public void configure(Map<String, ?> props) {
            SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
            notEqualityFields = config.getList("fields.notEquality");
            notEqualityValues = parseValues(config.getList("fieldValues.notEquality"));
        …
        private List<List<?>> parseValues(List<?> values) {
            List<List<?>> parsed = new ArrayList<>();
            for (Object value : values) {
                if (value instanceof String) {
                    // Parse the String as a JSON array
                    try {
                        parsed.add(new ObjectMapper().readValue((String) value, List.class));
                    } catch (Exception e) {
                        throw new ConfigException("Invalid JSON format in list values: " + value, e);
                    }
                } else if (value instanceof List) {
                    // Already a List, so cast directly
                    parsed.add((List<?>) value);
                } else {
                    // Guard against null so the error message itself cannot throw an NPE
                    String typeName = (value == null) ? "null" : value.getClass().getName();
                    throw new ConfigException("Unexpected value type in list: " + typeName);
                }
            }
            return parsed;
        }
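To show what I think `getList` hands to `parseValues` when the value arrives as one string: as far as I can tell, ConfigDef splits a LIST value on commas (with optional surrounding whitespace). Below is a minimal stand-alone sketch of that behaviour. `ListSplitSketch` and `splitList` are hypothetical names, and the pattern is my assumption about ConfigDef's internals, not code copied from it.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

// Stand-alone emulation (an assumption, not Kafka code) of how a LIST-typed
// config value supplied as a single string is split on commas.
final class ListSplitSketch {

    // Assumed equivalent of the comma-with-optional-whitespace pattern in ConfigDef.
    private static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*");

    static List<String> splitList(String raw) {
        String trimmed = raw.trim();
        if (trimmed.isEmpty()) {
            return Collections.emptyList();
        }
        // Limit -1 keeps trailing empty elements instead of dropping them.
        return Arrays.asList(COMMA_WITH_WHITESPACE.split(trimmed, -1));
    }

    public static void main(String[] args) {
        // A flat list of field names survives the split intact.
        System.out.println(splitList("field1, field2"));
        // A nested list is cut at every comma, so the inner arrays are broken apart.
        System.out.println(splitList("[9, 0, null], [\"pol\", \"POC\"]"));
    }
}
```

If this assumption holds, the second call yields five fragments such as `[9` and `null]`, none of which is a valid JSON array on its own, so the `readValue` call in `parseValues` would fail on each of them.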