diff --git a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java index aface56d2..f71498a3a 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java +++ b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java @@ -55,6 +55,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; public class JestElasticsearchClient implements ElasticsearchClient { @@ -89,6 +90,8 @@ public JestElasticsearchClient(String address) { try { JestClientFactory factory = new JestClientFactory(); factory.setHttpClientConfig(new HttpClientConfig.Builder(address) + .readTimeout(60000) // explicitly setting read timeout of 60s + .maxConnectionIdleTime(30000, TimeUnit.MILLISECONDS) // explicitly setting idle ttl of 30s .multiThreaded(true) .build() ); @@ -301,6 +304,15 @@ public BulkResponse executeBulk(BulkRequest bulk) throws IOException { } else if ("mapper_parse_exception".equals(errorType)) { retriable = false; errors.add(item.error); + } else if ("delete_failed_engine_exception".equals(errorType)) { + // swallow failed delete operations, assumed to be delete on non-existent ID + LOG.info( + "Swallowing error type {} for document ID {} and index {}", + errorType, + item.id, + item.index + ); + continue; } else { errors.add(item.error); }