After I noticed the database connection URL was wrong, I updated it, but the Kafka Connect JDBC source connector keeps failing when it executes the query. I have about 6 million rows to send, and the connector fails every time on query execution.
My config now:
<code>{
  "name": "connector",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "topic.prefix": "audit",
  "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.extractTopic.type": "org.apache.kafka.connect.transforms.ExtractTopic$Value",
  "transforms": "header,createKey,extractKey,extractValue",
  "transforms.extractValue.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.header.type": "com.kafka.MoveValueFieldsToHeader",
  "transforms.createKey.fields": "message_key",
  "transforms.extractKey.field": "message_key",
  "transforms.extractValue.field": "message_value",
  "transforms.extractTopic.field": "message_key",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "tasks.max": "1",
  "value.converter.schema.registry.url": "<URL>",
  "mode": "incrementing",
  "incrementing.column.name": "id",
  "query": "SELECT * FROM (SELECT id, i_databases, message_key, message_value, message_headers AS headers, message_type FROM events_outbox) A",
  "poll.interval.ms": "60000",
  "table.poll.interval.ms": "50000",
  "connection.user": "<USER>",
  "connection.password": "<PASSWORD>",
  "connection.url": "<CONNECTION>",
  "batch.max.rows": "10"
}
</code>
I tried switching to bulk mode and changing the query, but in incrementing mode the connector still appends a WHERE clause with a bind variable to the end of the query.
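For context, this is roughly the statement the connector ends up executing in incrementing mode (my configured query becomes a derived table; the exact quoting of id depends on the database dialect):
<code>-- Approximate query executed by the JDBC source connector in
-- incrementing mode: it wraps the configured query and appends
-- its own bound offset condition (? = the last committed id).
SELECT *
FROM (
  SELECT id, i_databases, message_key, message_value,
         message_headers AS headers, message_type
  FROM events_outbox
) A
WHERE id > ?
ORDER BY id ASC
</code>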
How can I handle a query this slow? I already increased the resources on my pod because it was logging out-of-memory errors.
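One idea I have (just a sketch, assuming events_outbox does not already have an index on the incrementing column) is to index id, so the appended WHERE id > ? ORDER BY id ASC condition does not scan all 6 million rows on every poll:
<code>-- Hypothetical index on the incrementing column; without one,
-- each poll's WHERE id > ? ORDER BY id ASC can scan the whole table.
CREATE INDEX idx_events_outbox_id ON events_outbox (id);
</code>
Would that, combined with a larger batch.max.rows, be the right way to approach this?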