diff --git a/chunjun-connectors/chunjun-connector-http/pom.xml b/chunjun-connectors/chunjun-connector-http/pom.xml
index 1408c7eb3c..d117b09da0 100644
--- a/chunjun-connectors/chunjun-connector-http/pom.xml
+++ b/chunjun-connectors/chunjun-connector-http/pom.xml
@@ -48,6 +48,30 @@
javacsv
2.0
+
+
+ com.jayway.jsonpath
+ json-path
+ 2.8.0
+
+
+ net.minidev
+ json-smart
+
+
+
+
+
+ net.minidev
+ json-smart
+ 2.4.10
+
+
+ slf4j-api
+ org.slf4j
+ 1.7.36
+ provided
+
@@ -56,6 +80,10 @@
org.apache.maven.plugins
maven-antrun-plugin
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/HttpClient.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/HttpClient.java
index 8cb9ad959d..51e182797c 100644
--- a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/HttpClient.java
+++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/HttpClient.java
@@ -26,6 +26,7 @@
import com.dtstack.chunjun.util.GsonUtil;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
@@ -41,9 +42,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import static com.dtstack.chunjun.connector.http.common.ConstantValue.CSV_DECODE;
-import static com.dtstack.chunjun.connector.http.common.ConstantValue.TEXT_DECODE;
-import static com.dtstack.chunjun.connector.http.common.ConstantValue.XML_DECODE;
+import static com.dtstack.chunjun.connector.http.common.ConstantValue.*;
@Slf4j
public class HttpClient {
@@ -134,6 +133,9 @@ public void initPosition(HttpRequestParam requestParam, String response) {
}
public void execute() {
+ if (restConfig.getLimitRequestTime() < restConfig.getRequestTime()) {
+ return;
+ }
if (!running) {
return;
@@ -182,6 +184,9 @@ public void execute() {
first = false;
requestRetryTime = 3;
requestNumber++;
+ // 子类和父类使用同一个对象,可以向上汇报请求次数进度,以便及时触发finish
+ Integer requestTime = restConfig.getRequestTime();
+ restConfig.setRequestTime(++requestTime);
}
public void doExecute(int retryTime) {
@@ -204,6 +209,21 @@ public void doExecute(int retryTime) {
String responseValue;
int responseStatus;
try {
+ Map requestParam = currentParam.getParam();
+ Map requestBody = currentParam.getBody();
+ if (StringUtils.isNotBlank(restConfig.getPageParamName())) {
+ Integer pagePosition =
+ restConfig.getStartIndex()
+ + restConfig.getStep() * restConfig.getRequestTime();
+ if (pagePosition > restConfig.getEndIndex()) {
+ return;
+ }
+ if ("get".equals(restConfig.getRequestMode())) {
+ requestParam.put(restConfig.getPageParamName(), pagePosition);
+ } else {
+ requestBody.put(restConfig.getPageParamName(), pagePosition);
+ }
+ }
HttpUriRequest request =
HttpUtil.getRequest(
@@ -221,7 +241,8 @@ public void doExecute(int retryTime) {
return;
}
- responseValue = EntityUtils.toString(httpResponse.getEntity());
+ // utf-8:支持中文
+ responseValue = EntityUtils.toString(httpResponse.getEntity(), "utf-8");
responseStatus = httpResponse.getStatusLine().getStatusCode();
} catch (Throwable e) {
// 只要本次请求中出现了异常 都会进行重试,如果重试次数达到了就真正结束任务
@@ -264,9 +285,16 @@ public void doExecute(int retryTime) {
}
}
- responseParse.parse(responseValue, responseStatus, HttpRequestParam.copy(currentParam));
- while (responseParse.hasNext()) {
- processData(responseParse.next());
+ if (StringUtils.isBlank(responseValue)) {
+ reachEnd = true;
+ running = false;
+ } else {
+ responseParse.parse(
+ responseValue, responseStatus, HttpRequestParam.copy(currentParam));
+ while (responseParse.hasNext()) {
+ // 一条一条数据的增加
+ processData(responseParse.next());
+ }
}
if (-1 != restConfig.getCycles() && requestNumber >= restConfig.getCycles()) {
@@ -342,6 +370,8 @@ protected ResponseParse getResponseParse(AbstractRowConverter converter) {
return new XmlResponseParse(restConfig, converter);
case TEXT_DECODE:
return new TextResponseParse(restConfig, converter);
+ case OFFLINE_JSON_DECODE:
+ return new OfflineJsonResponseParse(restConfig, converter);
default:
return new JsonResponseParse(restConfig, converter);
}
diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/OfflineJsonResponseParse.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/OfflineJsonResponseParse.java
new file mode 100644
index 0000000000..77ce92b4c1
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/OfflineJsonResponseParse.java
@@ -0,0 +1,81 @@
+package com.dtstack.chunjun.connector.http.client;
+
+import com.dtstack.chunjun.connector.http.common.HttpRestConfig;
+import com.dtstack.chunjun.converter.AbstractRowConverter;
+import com.dtstack.chunjun.util.GsonUtil;
+
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.jayway.jsonpath.JsonPath;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/** @Description 离线任务 @Author lianggao @Date 2023/6/1 下午5:50 */
+public class OfflineJsonResponseParse extends ResponseParse {
+
+ private String responseValue;
+ private HttpRequestParam requestParam;
+ private final Gson gson;
+ private Iterator