Skip to content
This repository was archived by the owner on Apr 29, 2024. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class JestElasticsearchClient implements ElasticsearchClient {

Expand Down Expand Up @@ -89,6 +90,8 @@ public JestElasticsearchClient(String address) {
try {
JestClientFactory factory = new JestClientFactory();
factory.setHttpClientConfig(new HttpClientConfig.Builder(address)
.readTimeout(60000) // explicitly setting read timeout of 60s
.maxConnectionIdleTime(30000, TimeUnit.MILLISECONDS) // explicitly setting idle ttl of 30s
.multiThreaded(true)
.build()
);
Expand Down Expand Up @@ -280,6 +283,12 @@ private Index toIndexRequest(IndexableRecord record) {
return req.build();
}

private Boolean swallowableErrorType(final String errorType) {
return "mapper_parse_exception".equals(errorType)
|| "mapper_parsing_exception".equals(errorType)
|| "delete_failed_engine_exception".equals(errorType);
}

public BulkResponse executeBulk(BulkRequest bulk) throws IOException {
final BulkResult result = client.execute(((JestBulkRequest) bulk).getBulk());

Expand All @@ -298,9 +307,15 @@ public BulkResponse executeBulk(BulkRequest bulk) throws IOException {
final String errorType = parsedError.get("type").asText("");
if ("version_conflict_engine_exception".equals(errorType)) {
versionConflicts.add(new Key(item.index, item.type, item.id));
} else if ("mapper_parse_exception".equals(errorType)) {
retriable = false;
errors.add(item.error);
} else if (swallowableErrorType(errorType)) {
// swallow failed delete and parse operations
LOG.info(
"Swallowing error type {} for document ID {} and index {}",
errorType,
item.id,
item.index
);
continue;
} else {
errors.add(item.error);
}
Expand Down