From 56fe3d66a282225769a42433a4bb9c5414485e7f Mon Sep 17 00:00:00 2001 From: Eric Douglas Date: Wed, 9 May 2018 12:07:53 -0700 Subject: [PATCH 1/6] attempting to swallow failed deletes, setting explicit timeout of 30s on jest client --- .../connect/elasticsearch/jest/JestElasticsearchClient.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java index aface56d2..31bb27c6d 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java +++ b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java @@ -89,6 +89,7 @@ public JestElasticsearchClient(String address) { try { JestClientFactory factory = new JestClientFactory(); factory.setHttpClientConfig(new HttpClientConfig.Builder(address) + .readTimeout(30000) // explicitly setting timeout of 30s .multiThreaded(true) .build() ); @@ -301,6 +302,10 @@ public BulkResponse executeBulk(BulkRequest bulk) throws IOException { } else if ("mapper_parse_exception".equals(errorType)) { retriable = false; errors.add(item.error); + } else if ("delete_failed_engine_exception".equals(errorType)) { + // swallow failed delete operations, assumed to be delete on non-existent ID + LOG.info("Swallowing error type {} for document ID {} and index {}", errorType, item.id, item.index); + continue; } else { errors.add(item.error); } From 903cc981def8a998236ba69686f91f1da127cc7a Mon Sep 17 00:00:00 2001 From: Adam Vondersaar Date: Wed, 9 May 2018 14:34:15 -0700 Subject: [PATCH 2/6] Updated formatting to pass checktype --- .../elasticsearch/jest/JestElasticsearchClient.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java index 31bb27c6d..47aa9f2e5 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java +++ b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java @@ -304,7 +304,12 @@ public BulkResponse executeBulk(BulkRequest bulk) throws IOException { errors.add(item.error); } else if ("delete_failed_engine_exception".equals(errorType)) { // swallow failed delete operations, assumed to be delete on non-existent ID - LOG.info("Swallowing error type {} for document ID {} and index {}", errorType, item.id, item.index); + LOG.info( + "Swallowing error type {} for document ID {} and index {}", + errorType, + item.id, + item.index + ); continue; } else { errors.add(item.error); From 0bc4cc7b28439dd521b1660335076de051f6157c Mon Sep 17 00:00:00 2001 From: Eric Douglas Date: Wed, 13 Jun 2018 14:55:58 -0700 Subject: [PATCH 3/6] updated read timeout on jest client from 30s to 60s --- .../connect/elasticsearch/jest/JestElasticsearchClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java index 47aa9f2e5..1ac4a3596 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java +++ b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java @@ -89,7 +89,7 @@ public JestElasticsearchClient(String address) { try { JestClientFactory factory = new JestClientFactory(); factory.setHttpClientConfig(new HttpClientConfig.Builder(address) - .readTimeout(30000) // explicitly setting timeout of 30s + .readTimeout(60000) // explicitly setting timeout of 60s .multiThreaded(true) .build() ); From 945d6829f4982063f6cbf5b6e5558a3b79c8e34d Mon Sep 17 00:00:00 2001 From: Eric Douglas Date: Wed, 13 Jun 2018 15:10:26 -0700 Subject: [PATCH 4/6] added max connection idle timeout of 30s to jest client --- .../connect/elasticsearch/jest/JestElasticsearchClient.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java index 1ac4a3596..e4c0ce3cc 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java +++ b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java @@ -89,7 +89,8 @@ public JestElasticsearchClient(String address) { try { JestClientFactory factory = new JestClientFactory(); factory.setHttpClientConfig(new HttpClientConfig.Builder(address) - .readTimeout(60000) // explicitly setting timeout of 60s + .readTimeout(60000) // explicitly setting read timeout of 60s + .maxConnectionIdleTime(30000) // explicitly setting idle ttl of 30s .multiThreaded(true) .build() ); From 246b6fd63b6bc0755f4663d850132805486c3378 Mon Sep 17 00:00:00 2001 From: Adam Vondersaar Date: Wed, 13 Jun 2018 15:20:51 -0700 Subject: [PATCH 5/6] fixed time setting --- .../connect/elasticsearch/jest/JestElasticsearchClient.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java index e4c0ce3cc..f71498a3a 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java +++ b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java @@ -55,6 +55,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; public class JestElasticsearchClient implements ElasticsearchClient { @@ -90,7 +91,7 @@ public JestElasticsearchClient(String address) { JestClientFactory factory = new JestClientFactory(); factory.setHttpClientConfig(new HttpClientConfig.Builder(address) .readTimeout(60000) // explicitly setting read timeout of 60s - .maxConnectionIdleTime(30000) // explicitly setting idle ttl of 30s + .maxConnectionIdleTime(30000, TimeUnit.MILLISECONDS) // explicitly setting idle ttl of 30s .multiThreaded(true) .build() ); From 9d721b9503f25cb3e48d2ca7efa82179185b80e8 Mon Sep 17 00:00:00 2001 From: Eric Douglas Date: Fri, 9 Nov 2018 11:23:58 -0800 Subject: [PATCH 6/6] allowing parse exceptions and failed deletes to be swallowed without crashing connector --- .../elasticsearch/jest/JestElasticsearchClient.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java index f71498a3a..62e739530 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java +++ b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java @@ -283,6 +283,12 @@ private Index toIndexRequest(IndexableRecord record) { return req.build(); } + private Boolean swallowableErrorType(final String errorType) { + return "mapper_parse_exception".equals(errorType) + || "mapper_parsing_exception".equals(errorType) + || "delete_failed_engine_exception".equals(errorType); + } + public BulkResponse executeBulk(BulkRequest bulk) throws IOException { final BulkResult result = client.execute(((JestBulkRequest) bulk).getBulk()); @@ -301,11 +307,8 @@ public BulkResponse executeBulk(BulkRequest bulk) throws IOException { final String errorType = parsedError.get("type").asText(""); if ("version_conflict_engine_exception".equals(errorType)) { versionConflicts.add(new Key(item.index, item.type, item.id)); - } else if ("mapper_parse_exception".equals(errorType)) { - retriable = false; - errors.add(item.error); - } else if ("delete_failed_engine_exception".equals(errorType)) { - // swallow failed delete operations, assumed to be delete on non-existent ID + } else if (swallowableErrorType(errorType)) { + // swallow failed delete and parse operations LOG.info( "Swallowing error type {} for document ID {} and index {}", errorType,