diff --git a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java index aface56d2..62e739530 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java +++ b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java @@ -55,6 +55,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; public class JestElasticsearchClient implements ElasticsearchClient { @@ -89,6 +90,8 @@ public JestElasticsearchClient(String address) { try { JestClientFactory factory = new JestClientFactory(); factory.setHttpClientConfig(new HttpClientConfig.Builder(address) + .readTimeout(60000) // explicitly setting read timeout of 60s + .maxConnectionIdleTime(30000, TimeUnit.MILLISECONDS) // explicitly setting idle ttl of 30s .multiThreaded(true) .build() ); @@ -280,6 +283,12 @@ private Index toIndexRequest(IndexableRecord record) { return req.build(); } + private Boolean swallowableErrorType(final String errorType) { + return "mapper_parse_exception".equals(errorType) + || "mapper_parsing_exception".equals(errorType) + || "delete_failed_engine_exception".equals(errorType); + } + public BulkResponse executeBulk(BulkRequest bulk) throws IOException { final BulkResult result = client.execute(((JestBulkRequest) bulk).getBulk()); @@ -298,9 +307,15 @@ public BulkResponse executeBulk(BulkRequest bulk) throws IOException { final String errorType = parsedError.get("type").asText(""); if ("version_conflict_engine_exception".equals(errorType)) { versionConflicts.add(new Key(item.index, item.type, item.id)); - } else if ("mapper_parse_exception".equals(errorType)) { - retriable = false; - errors.add(item.error); + } else if (swallowableErrorType(errorType)) { + // swallow failed delete and parse operations + LOG.info( + "Swallowing error type {} for document ID {} and index {}", + errorType, + item.id, + item.index + ); + continue; } else { errors.add(item.error); }