From 56fe3d66a282225769a42433a4bb9c5414485e7f Mon Sep 17 00:00:00 2001 From: Eric Douglas Date: Wed, 9 May 2018 12:07:53 -0700 Subject: [PATCH 1/5] attempting to swallow failed deletes, setting explicit timeout of 30s on jest client --- .../connect/elasticsearch/jest/JestElasticsearchClient.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java index aface56d2..31bb27c6d 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java +++ b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java @@ -89,6 +89,7 @@ public JestElasticsearchClient(String address) { try { JestClientFactory factory = new JestClientFactory(); factory.setHttpClientConfig(new HttpClientConfig.Builder(address) + .readTimeout(30000) // explicitly setting timeout of 30s .multiThreaded(true) .build() ); @@ -301,6 +302,10 @@ public BulkResponse executeBulk(BulkRequest bulk) throws IOException { } else if ("mapper_parse_exception".equals(errorType)) { retriable = false; errors.add(item.error); + } else if ("delete_failed_engine_exception".equals(errorType)) { + // swallow failed delete operations, assumed to be delete on non-existent ID + LOG.info("Swallowing error type {} for document ID {} and index {}", errorType, item.id, item.index); + continue; } else { errors.add(item.error); } From 903cc981def8a998236ba69686f91f1da127cc7a Mon Sep 17 00:00:00 2001 From: Adam Vondersaar Date: Wed, 9 May 2018 14:34:15 -0700 Subject: [PATCH 2/5] Updated formatting to pass checktype --- .../elasticsearch/jest/JestElasticsearchClient.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java index 31bb27c6d..47aa9f2e5 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java +++ b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java @@ -304,7 +304,12 @@ public BulkResponse executeBulk(BulkRequest bulk) throws IOException { errors.add(item.error); } else if ("delete_failed_engine_exception".equals(errorType)) { // swallow failed delete operations, assumed to be delete on non-existent ID - LOG.info("Swallowing error type {} for document ID {} and index {}", errorType, item.id, item.index); + LOG.info( + "Swallowing error type {} for document ID {} and index {}", + errorType, + item.id, + item.index + ); continue; } else { errors.add(item.error); From 0bc4cc7b28439dd521b1660335076de051f6157c Mon Sep 17 00:00:00 2001 From: Eric Douglas Date: Wed, 13 Jun 2018 14:55:58 -0700 Subject: [PATCH 3/5] updated read timeout on jest client from 30s to 60s --- .../connect/elasticsearch/jest/JestElasticsearchClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java index 47aa9f2e5..1ac4a3596 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java +++ b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java @@ -89,7 +89,7 @@ public JestElasticsearchClient(String address) { try { JestClientFactory factory = new JestClientFactory(); factory.setHttpClientConfig(new HttpClientConfig.Builder(address) - .readTimeout(30000) // explicitly setting timeout of 30s + .readTimeout(60000) // explicitly setting timeout of 60s .multiThreaded(true) .build() ); From 945d6829f4982063f6cbf5b6e5558a3b79c8e34d Mon Sep 17 00:00:00 2001 From: Eric Douglas Date: Wed, 13 Jun 2018 15:10:26 -0700 Subject: [PATCH 4/5] added max connection idle timeout of 30s to jest client --- .../connect/elasticsearch/jest/JestElasticsearchClient.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java index 1ac4a3596..e4c0ce3cc 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java +++ b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java @@ -89,7 +89,8 @@ public JestElasticsearchClient(String address) { try { JestClientFactory factory = new JestClientFactory(); factory.setHttpClientConfig(new HttpClientConfig.Builder(address) - .readTimeout(60000) // explicitly setting timeout of 60s + .readTimeout(60000) // explicitly setting read timeout of 60s + .maxConnectionIdleTime(30000) // explicitly setting idle ttl of 30s .multiThreaded(true) .build() ); From 246b6fd63b6bc0755f4663d850132805486c3378 Mon Sep 17 00:00:00 2001 From: Adam Vondersaar Date: Wed, 13 Jun 2018 15:20:51 -0700 Subject: [PATCH 5/5] fixed time setting --- .../connect/elasticsearch/jest/JestElasticsearchClient.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java index e4c0ce3cc..f71498a3a 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java +++ b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java @@ -55,6 +55,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; public class JestElasticsearchClient implements ElasticsearchClient { @@ -90,7 +91,7 @@ public JestElasticsearchClient(String address) { JestClientFactory factory = new JestClientFactory(); factory.setHttpClientConfig(new HttpClientConfig.Builder(address) .readTimeout(60000) // explicitly setting read timeout of 60s - .maxConnectionIdleTime(30000) // explicitly setting idle ttl of 30s + .maxConnectionIdleTime(30000, TimeUnit.MILLISECONDS) // explicitly setting idle ttl of 30s .multiThreaded(true) .build() );