I am running the Confluent BigQuery (BQ) sink connector in two environments: in AWS MSK, and in Docker with Kafka Connect running in distributed mode.
In distributed mode on my machine, the connector runs fine: it creates a new table for each topic without any problems and also writes to existing tables.
But when I run the exact same connector (same config) in AWS MSK, I get an error:
[Worker-038a2275014ff9b9b] org.apache.kafka.connect.errors.RetriableException: Failed to retrieve information for table GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=dataset_name, tableId=new-topic-name}}
[Worker-038a2275014ff9b9b] at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.retrieveTable(BigQuerySinkTask.java:405)
[Worker-038a2275014ff9b9b] at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1134)
[Worker-038a2275014ff9b9b] at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.retrieveCachedTable(BigQuerySinkTask.java:390)
[Worker-038a2275014ff9b9b] at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.getRecordTable(BigQuerySinkTask.java:235)
[Worker-038a2275014ff9b9b] at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.writeSinkRecords(BigQuerySinkTask.java:268)
[Worker-038a2275014ff9b9b] at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:321)
[Worker-038a2275014ff9b9b] at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
[Worker-038a2275014ff9b9b] at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
[Worker-038a2275014ff9b9b] at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
[Worker-038a2275014ff9b9b] at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
[Worker-038a2275014ff9b9b] at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
[Worker-038a2275014ff9b9b] at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
[Worker-038a2275014ff9b9b] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[Worker-038a2275014ff9b9b] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[Worker-038a2275014ff9b9b] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[Worker-038a2275014ff9b9b] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[Worker-038a2275014ff9b9b] at java.base/java.lang.Thread.run(Thread.java:829)
[Worker-038a2275014ff9b9b] Caused by: com.google.cloud.bigquery.BigQueryException: Error getting access token for service account: connect timed out
[Worker-038a2275014ff9b9b] at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:115)
[Worker-038a2275014ff9b9b] at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getTable(HttpBigQueryRpc.java:286)
[Worker-038a2275014ff9b9b] at com.google.cloud.bigquery.BigQueryImpl$18.call(BigQueryImpl.java:761)
[Worker-038a2275014ff9b9b] at com.google.cloud.bigquery.BigQueryImpl$18.call(BigQueryImpl.java:758)
[Worker-038a2275014ff9b9b] at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:103)
[Worker-038a2275014ff9b9b] at com.google.cloud.RetryHelper.run(RetryHelper.java:76)
[Worker-038a2275014ff9b9b] at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
[Worker-038a2275014ff9b9b] at com.google.cloud.bigquery.BigQueryImpl.getTable(BigQueryImpl.java:757)
[Worker-038a2275014ff9b9b] at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.retrieveTable(BigQuerySinkTask.java:395)
[Worker-038a2275014ff9b9b] ... 16 more
[Worker-038a2275014ff9b9b] Caused by: java.io.IOException: Error getting access token for service account: connect timed out
[Worker-038a2275014ff9b9b] at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:444)
[Worker-038a2275014ff9b9b] at com.google.auth.oauth2.OAuth2Credentials.refresh(OAuth2Credentials.java:157)
[Worker-038a2275014ff9b9b] at com.google.auth.oauth2.OAuth2Credentials.getRequestMetadata(OAuth2Credentials.java:145)
[Worker-038a2275014ff9b9b] at com.google.auth.oauth2.ServiceAccountCredentials.getRequestMetadata(ServiceAccountCredentials.java:603)
[Worker-038a2275014ff9b9b] at com.google.auth.http.HttpCredentialsAdapter.initialize(HttpCredentialsAdapter.java:91)
[Worker-038a2275014ff9b9b] at com.google.cloud.http.HttpTransportOptions$1.initialize(HttpTransportOptions.java:159)
[Worker-038a2275014ff9b9b] at com.google.api.client.http.HttpRequestFactory.buildRequest(HttpRequestFactory.java:91)
[Worker-038a2275014ff9b9b] at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.buildHttpRequest(AbstractGoogleClientRequest.java:404)
[Worker-038a2275014ff9b9b] at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:514)
[Worker-038a2275014ff9b9b] at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:455)
[Worker-038a2275014ff9b9b] at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:565)
[Worker-038a2275014ff9b9b] at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getTable(HttpBigQueryRpc.java:284)
[Worker-038a2275014ff9b9b] ... 23 more
[Worker-038a2275014ff9b9b] Caused by: java.net.SocketTimeoutException: connect timed out
[Worker-038a2275014ff9b9b] at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
[Worker-038a2275014ff9b9b] at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
[Worker-038a2275014ff9b9b] at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
[Worker-038a2275014ff9b9b] at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
[Worker-038a2275014ff9b9b] at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
[Worker-038a2275014ff9b9b] at java.base/java.net.Socket.connect(Socket.java:609)
[Worker-038a2275014ff9b9b] at java.base/sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:305)
[Worker-038a2275014ff9b9b] at java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:177)
[Worker-038a2275014ff9b9b] at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:509)
[Worker-038a2275014ff9b9b] at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:604)
[Worker-038a2275014ff9b9b] at java.base/sun.net.www.protocol.https.HttpsClient.<init>(HttpsClient.java:266)
[Worker-038a2275014ff9b9b] at java.base/sun.net.www.protocol.https.HttpsClient.New(HttpsClient.java:373)
[Worker-038a2275014ff9b9b] at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(AbstractDelegateHttpsURLConnection.java:207)
[Worker-038a2275014ff9b9b] at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1187)
[Worker-038a2275014ff9b9b] at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1081)
[Worker-038a2275014ff9b9b] at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:193)
[Worker-038a2275014ff9b9b] at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1367)
[Worker-038a2275014ff9b9b] at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1342)
[Worker-038a2275014ff9b9b] at java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:246)
[Worker-038a2275014ff9b9b] at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:113)
[Worker-038a2275014ff9b9b] at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84)
[Worker-038a2275014ff9b9b] at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012)
[Worker-038a2275014ff9b9b] at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:441)
[Worker-038a2275014ff9b9b] ... 34 more
Here is my connector config:
{
"name": "connector-name",
"config": {
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"allowNewBigQueryFields": "true",
"allowBigQueryRequiredFieldRelaxation": "true",
"autoCreateTables": "true",
"tasks.max": "2",
"topics": "existing-topic-name, new-topic-name",
"project": "gcp-project-name",
"defaultDataset": "dataset_name",
"keyfile": "/etc/kafka/secrets/gcp-cred.key",
"schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "<my schema registry URL goes here>",
"value.converter.enhanced.avro.schema.support": "true",
"schema.registry.url": "<my schema registry URL goes here>",
"offsets.retention.minutes": 2880,
"auto.offset.reset": "earliest",
"enable.auto.commit": "true",
"isolation.level": "read_committed"
}
}