From 964cbc5f555bbfa42ec43aa71ae5349186d78449 Mon Sep 17 00:00:00 2001
From: zhangyongxiang <1306835795@qq.com>
Date: Wed, 4 Sep 2024 19:09:19 +0800
Subject: [PATCH] feat(RestWriter): add restfull api writer
add restfull api writer
[skip ci]
---
restwriter/doc/restwriter.md | 242 ++++++++
restwriter/pom.xml | 99 ++++
restwriter/src/main/assembly/package.xml | 35 ++
.../datax/plugin/writer/restwriter/Key.java | 47 ++
.../plugin/writer/restwriter/RestWriter.java | 533 ++++++++++++++++++
.../restwriter/RestWriterErrorCode.java | 68 +++
.../writer/restwriter/conf/ClientConfig.java | 19 +
.../plugin/writer/restwriter/conf/Field.java | 17 +
.../writer/restwriter/conf/Operation.java | 32 ++
.../writer/restwriter/conf/Process.java | 29 +
.../handler/ObjectRecordConverter.java | 95 ++++
.../restwriter/handler/RecordConverter.java | 14 +
.../restwriter/handler/TypeHandler.java | 9 +
.../handler/TypeHandlerRegistry.java | 62 ++
.../handler/bool/BoolVoidTypeHandler.java | 17 +
.../handler/bytes/BytesVoidTypeHandler.java | 19 +
.../date/DateLocalDateTimeTypeHandler.java | 29 +
.../handler/date/DateVoidTypeHandler.java | 18 +
.../handler/string/StringVoidTypeHandler.java | 18 +
.../typedouble/DoubleVoidTypeHandler.java | 18 +
.../handler/typeint/IntVoidTypeHandler.java | 20 +
.../handler/typelong/LongVoidTypeHandler.java | 18 +
.../handler/typenull/NullVoidTypeHandler.java | 19 +
.../restwriter/process/MapAccessor.java | 115 ++++
.../OperationExecutionFailException.java | 16 +
.../restwriter/process/ProcessCategory.java | 29 +
.../restwriter/process/ProcessExecutor.java | 275 +++++++++
.../restwriter/process/ProcessFactory.java | 37 ++
.../validator/ConfigurationValidator.java | 111 ++++
.../validator/HeadersParameterValidator.java | 22 +
.../validator/MethodParameterValidator.java | 37 ++
.../validator/ParameterValidator.java | 14 +
.../validator/ProcessValidator.java | 60 ++
.../validator/UrlParameterValidator.java | 44 ++
restwriter/src/main/resources/plugin.json | 6 +
.../main/resources/plugin_job_template.json | 83 +++
36 files changed, 2326 insertions(+)
create mode 100644 restwriter/doc/restwriter.md
create mode 100755 restwriter/pom.xml
create mode 100755 restwriter/src/main/assembly/package.xml
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/Key.java
create mode 100755 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/RestWriter.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/RestWriterErrorCode.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/conf/ClientConfig.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/conf/Field.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/conf/Operation.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/conf/Process.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/ObjectRecordConverter.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/RecordConverter.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/TypeHandler.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/TypeHandlerRegistry.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/bool/BoolVoidTypeHandler.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/bytes/BytesVoidTypeHandler.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/date/DateLocalDateTimeTypeHandler.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/date/DateVoidTypeHandler.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/string/StringVoidTypeHandler.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/typedouble/DoubleVoidTypeHandler.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/typeint/IntVoidTypeHandler.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/typelong/LongVoidTypeHandler.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/typenull/NullVoidTypeHandler.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/process/MapAccessor.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/process/OperationExecutionFailException.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/process/ProcessCategory.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/process/ProcessExecutor.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/process/ProcessFactory.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/ConfigurationValidator.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/HeadersParameterValidator.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/MethodParameterValidator.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/ParameterValidator.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/ProcessValidator.java
create mode 100644 restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/UrlParameterValidator.java
create mode 100755 restwriter/src/main/resources/plugin.json
create mode 100644 restwriter/src/main/resources/plugin_job_template.json
diff --git a/restwriter/doc/restwriter.md b/restwriter/doc/restwriter.md
new file mode 100644
index 00000000..0722be3c
--- /dev/null
+++ b/restwriter/doc/restwriter.md
@@ -0,0 +1,242 @@
+# DataX REstWriter 说明
+
+------------
+
+## 1 快速介绍
+
+RestWriter提供了Restful接口写入数据的能力。RestWriter服务的用户主要在于DataX开发、测试同学。
+
+## 2 功能与限制
+
+RestWriter实现了向Restful接口同步数据的通用能力,RestWriter如下几个方面约定:
+
+1. 通过请求题写入数据,支持post、put、patch3种http方法
+
+2. 通过返回的http状态码判断写入成功或者失败
+
+3. 支持失败重试与限流
+
+我们不能做到:
+
+1. 不支持嵌套对象。
+
+## 3 功能说明
+
+### 3.1 配置样例
+
+```json
+{
+ "setting": {},
+ "job": {
+ "setting": {
+ "speed": {
+ "channel": 2
+ }
+ },
+ "content": [
+ {
+ "reader": {
+ "name": "txtfilereader",
+ "parameter": {
+ "path": [
+ "/home/haiwei.luo/case00/data"
+ ],
+ "encoding": "UTF-8",
+ "column": [
+ {
+ "index": 0,
+ "type": "long"
+ },
+ {
+ "index": 1,
+ "type": "boolean"
+ },
+ {
+ "index": 2,
+ "type": "double"
+ },
+ {
+ "index": 3,
+ "type": "string"
+ },
+ {
+ "index": 4,
+ "type": "date",
+ "format": "yyyy.MM.dd"
+ }
+ ],
+ "fieldDelimiter": ","
+ }
+ },
+ "writer": {
+ "name": "restwriter",
+ "parameter": {
+ "url": "http://localhost:8080/echo",
+ "method": "post",
+ "ssl": false,
+ "headers": {
+ "aaa": "bbbb"
+ },
+ "query": {
+ "test": "test"
+ },
+ "maxRetries": 3,
+ "batch": false,
+ "batchSize": 1000,
+ "fields": [
+ {
+ "name": "id"
+ },
+ {
+ "name": "jobGroup"
+ },
+ {
+ "name": "jobId"
+ },
+ {
+ "name": "executorAddress"
+ },
+ {
+ "name": "executorHandler"
+ },
+ {
+ "name": "executorParam"
+ },
+ {
+ "name": "executorShardingParam"
+ },
+ {
+ "name": "executorFailRetryCount"
+ },
+ {
+ "name": "triggerTime",
+ "type": "java.time.LocalDateTime"
+ },
+ {
+ "name": "triggerCode"
+ },
+ {
+ "name": "triggerMsg"
+ },
+ {
+ "name": "handleTime",
+ "type": "java.time.LocalDateTime"
+ },
+ {
+ "name": "handleCode"
+ },
+ {
+ "name": "handleMsg"
+ },
+ {
+ "name": "alarmStatus"
+ }
+ ],
+ "print": true,
+ "failFast": false,
+ "rate-per-task": 10
+ }
+ }
+ }
+ ]
+ }
+}
+```
+
+### 3.2 参数说明
+
+* **url**
+
+ * 描述:restful API URL,经过转义后的URL,RestWriter不负责转义特殊字符,比如空格等。
+
+ * 必选:是
+
+ * 默认值:无
+
+* **method**
+
+ * 描述:http method
+
+ * 必选:是
+
+ * 默认值:无
+
+* **ssl**
+
+ * 描述:restful api是https/http,如果在url中给出protocol,则以url中为准
+
+ * 必选:否
+
+ * 默认值:false
+
+* **headers**
+
+ * 描述:http请求头
+
+ * 必选:否
+
+ * 默认值:无
+
+* **query**
+
+ * 描述:查询参数。
+
+ * 必选:否
+
+ * 默认值:无
+
+* **maxRetries**
+
+ * 描述:最大失败重试次数。
+
+ * 必选:否
+
+ * 默认值:3
+
+
+* **batch**
+
+ * 描述:是否批量处理
+
+ * 必选:否
+
+ * 默认值:false
+
+* **batchSize**
+
+ * 描述:批量处理最大条数
+
+ * 必选:否
+
+ * 默认值:100
+
+* **fields**
+
+ * 描述字段信息
+
+ * 必选:是
+
+ * 默认值:无
+
+* **print**
+
+ * 描述:是否打印debug信息。
+
+ * 必选:否
+
+ * 默认值:false
+
+### 3.3 类型转换
+
+
+## 4 性能报告
+
+## 5 约束限制
+
+略
+
+## 6 FAQ
+
+略
+
+
diff --git a/restwriter/pom.xml b/restwriter/pom.xml
new file mode 100755
index 00000000..2a8f31b8
--- /dev/null
+++ b/restwriter/pom.xml
@@ -0,0 +1,99 @@
+
+ 4.0.0
+
+ com.alibaba.datax
+ datax-all
+ 0.0.1-SNAPSHOT
+
+
+ restwriter
+ restwriter
+ RestWriter提供了通过HTTP请求写入的功能
+ jar
+
+
+
+ com.alibaba.datax
+ datax-common
+ ${datax-project-version}
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+ ch.qos.logback
+ logback-classic
+
+
+ org.projectlombok
+ lombok
+ 1.18.28
+
+
+ com.google.guava
+ guava
+ 32.1.2-jre
+
+
+ com.konghq
+ unirest-java
+ 3.14.5
+
+
+ org.apache.commons
+ commons-collections4
+ 4.4
+
+
+ dev.failsafe
+ failsafe
+ 3.3.2
+
+
+ org.springframework
+ spring-expression
+ 5.3.30
+
+
+
+
+
+
+
+
+ maven-compiler-plugin
+
+ ${jdk-version}
+ ${jdk-version}
+ ${project-sourceEncoding}
+
+
+
+ maven-assembly-plugin
+
+
+ src/main/assembly/package.xml
+
+ datax
+
+
+
+ dwzip
+ package
+
+ single
+
+
+
+
+
+
+
diff --git a/restwriter/src/main/assembly/package.xml b/restwriter/src/main/assembly/package.xml
new file mode 100755
index 00000000..ec0ce6a6
--- /dev/null
+++ b/restwriter/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+
+
+
+ dir
+
+ false
+
+
+ src/main/resources
+
+ plugin.json
+ plugin_job_template.json
+
+ plugin/writer/restwriter
+
+
+ target/
+
+ restwriter-0.0.1-SNAPSHOT.jar
+
+ plugin/writer/restwriter
+
+
+
+
+
+ false
+ plugin/writer/restwriter/libs
+ runtime
+
+
+
diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/Key.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/Key.java
new file mode 100644
index 00000000..1eaedd87
--- /dev/null
+++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/Key.java
@@ -0,0 +1,47 @@
+package com.alibaba.datax.plugin.writer.restwriter;
+
+/**
+ * @author: zhangyongxiang
+ * @date 2023/8/24 11:38
+ **/
+public final class Key {
+
+ private Key() {}
+
+ public static final String URL = "url";
+
+ public static final String HTTP_METHOD = "method";
+
+ public static final String HTTP_SSL = "ssl";
+
+ public static final String HTTP_HEADERS = "headers";
+
+ public static final String HTTP_QUERY = "query";
+
+ public static final String MAX_RETRIES = "maxRetries";
+
+ public static final String BATCH_MODE = "batch";
+
+ public static final String BATCH_SIZE = "batchSize";
+
+ public static final String TASK_INDEX = "taskIndex";
+
+ public static final String FIELDS = "fields";
+
+ public static final String DEBUG = "debug";
+
+ public static final String FAIL_FAST = "failFast";
+
+ public static final String RATE_PER_TASK = "rate-per-task";
+
+ public static final String CLIENT = "client";
+
+ public static final String PREPROCESS = "preprocess";
+
+ public static final String POSTPROCESS = "postprocess";
+
+ public static final String ADDITIONAL_CONCURRENT = "concurrent";
+
+ public static final String ADDITIONAL_OPERATIONS = "operations";
+
+}
diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/RestWriter.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/RestWriter.java
new file mode 100755
index 00000000..a631ad83
--- /dev/null
+++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/RestWriter.java
@@ -0,0 +1,533 @@
+package com.alibaba.datax.plugin.writer.restwriter;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.http.HttpException;
+
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.writer.restwriter.conf.ClientConfig;
+import com.alibaba.datax.plugin.writer.restwriter.conf.Field;
+import com.alibaba.datax.plugin.writer.restwriter.conf.Process;
+import com.alibaba.datax.plugin.writer.restwriter.handler.ObjectRecordConverter;
+import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandlerRegistry;
+import com.alibaba.datax.plugin.writer.restwriter.process.ProcessExecutor;
+import com.alibaba.datax.plugin.writer.restwriter.process.ProcessFactory;
+import com.alibaba.datax.plugin.writer.restwriter.validator.ConfigurationValidator;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONException;
+import com.google.common.collect.Lists;
+
+import dev.failsafe.Failsafe;
+import dev.failsafe.FailsafeExecutor;
+import dev.failsafe.RateLimiter;
+import dev.failsafe.RetryPolicy;
+import kong.unirest.HttpMethod;
+import kong.unirest.HttpResponse;
+import kong.unirest.HttpStatus;
+import kong.unirest.JsonNode;
+import kong.unirest.Unirest;
+import kong.unirest.UnirestInstance;
+import lombok.EqualsAndHashCode;
+import lombok.extern.slf4j.Slf4j;
+
+import static com.alibaba.datax.plugin.writer.restwriter.Key.BATCH_MODE;
+import static com.alibaba.datax.plugin.writer.restwriter.Key.BATCH_SIZE;
+import static com.alibaba.datax.plugin.writer.restwriter.Key.CLIENT;
+import static com.alibaba.datax.plugin.writer.restwriter.Key.DEBUG;
+import static com.alibaba.datax.plugin.writer.restwriter.Key.FAIL_FAST;
+import static com.alibaba.datax.plugin.writer.restwriter.Key.FIELDS;
+import static com.alibaba.datax.plugin.writer.restwriter.Key.HTTP_HEADERS;
+import static com.alibaba.datax.plugin.writer.restwriter.Key.HTTP_METHOD;
+import static com.alibaba.datax.plugin.writer.restwriter.Key.HTTP_QUERY;
+import static com.alibaba.datax.plugin.writer.restwriter.Key.HTTP_SSL;
+import static com.alibaba.datax.plugin.writer.restwriter.Key.MAX_RETRIES;
+import static com.alibaba.datax.plugin.writer.restwriter.Key.RATE_PER_TASK;
+import static com.alibaba.datax.plugin.writer.restwriter.Key.TASK_INDEX;
+import static com.alibaba.datax.plugin.writer.restwriter.Key.URL;
+import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.HTTP_CLIENT_CONFIG_INVALID_EXCEPTION;
+import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.RUNTIME_EXCEPTION;
+import static com.alibaba.datax.plugin.writer.restwriter.process.ProcessCategory.POSTPROCESS;
+import static com.alibaba.datax.plugin.writer.restwriter.process.ProcessCategory.PREPROCESS;
+import static java.util.Objects.isNull;
+import static java.util.Objects.nonNull;
+import static kong.unirest.ContentType.APPLICATION_JSON;
+import static kong.unirest.HeaderNames.CONTENT_TYPE;
+import static org.apache.commons.collections4.CollectionUtils.isNotEmpty;
+import static org.apache.commons.collections4.MapUtils.emptyIfNull;
+import static org.apache.commons.lang3.StringUtils.prependIfMissing;
+
+/**
+ * @author zhangyongxiang
+ * @date 2023-08-23
+ */
+@Slf4j
+public class RestWriter extends Writer {
+
+ public static final int DEFAULT_MAX_RETRIES_VALUE = 3;
+
+ public static final int DEFAULT_BATCH_SIZE_VALUE = 100;
+
+ public static final boolean DEFAULT_BATCH_MODE_VALUE = false;
+
+ public static final boolean DEFAULT_DEBUG_VALUE = false;
+
+ public static final boolean DEFAULT_FAIL_FAST_VALUE = false;
+
+ public static final boolean DEFAULT_SSL_VALUE = false;
+
+ @Slf4j
+ @EqualsAndHashCode(callSuper = true)
+ public static class Job extends Writer.Job {
+
+ private Configuration originalConfig;
+
+ private long startTime;
+
+ private long endTime;
+
+ private Process preprocess;
+
+ private Process postprocess;
+
+ private ProcessFactory processFactory;
+
+ private ProcessExecutor processExecutor;
+
+ @Override
+ public void init() {
+ this.originalConfig = super.getPluginJobConf();
+ this.validateParameter();
+ this.processFactory = new ProcessFactory(this.originalConfig);
+ this.preprocess = this.processFactory.createProcess(PREPROCESS);
+ this.postprocess = this.processFactory.createProcess(POSTPROCESS);
+ this.processExecutor = new ProcessExecutor();
+ log.info(
+ "{} job initialized, desc: {}, developer: {}, job conf: {}, job preprocess: {}, job postprocess: {}",
+ this.getPluginName(), this.getDescription(),
+ this.getDeveloper(), this.getPluginJobConf(),
+ this.preprocess, this.postprocess);
+ }
+
+ private void validateParameter() {
+ try {
+ new ConfigurationValidator().validate(this.originalConfig,
+ null);
+ } catch (final Exception se) {
+ throw DataXException.asDataXException(RUNTIME_EXCEPTION,
+ "an exception has occurred when validating parameters",
+ se);
+ }
+ }
+
+ @Override
+ public void preCheck() {
+ log.info("job {} pre check will not be called",
+ this.getPluginName());
+ }
+
+ @Override
+ public void prepare() {
+ this.startTime = System.currentTimeMillis();
+
+ if (nonNull(this.preprocess)
+ && isNotEmpty(this.preprocess.getOperations())) {
+ this.processExecutor.execute(this.preprocess);
+ log.info(
+ "{} job prepared successfully after preprocess, job conf: {}, preprocess: {}",
+ this.getPluginName(), this.originalConfig,
+ this.preprocess);
+ } else {
+ log.info(
+ "{} job prepared without need any of preprocess, job conf: {}",
+ this.getPluginName(), this.originalConfig);
+ }
+ }
+
+ @Override
+ public List split(final int mandatoryNumber) {
+ int finalMandatoryNumber = mandatoryNumber;
+ if (finalMandatoryNumber < 1) {
+ log.warn(
+ "mandatory number {} less than one, reset it to be one",
+ mandatoryNumber);
+ finalMandatoryNumber = 1;
+ }
+ final List configurations = Lists
+ .newArrayListWithExpectedSize(finalMandatoryNumber);
+
+ for (int index = 0; index < finalMandatoryNumber; index++) {
+ final Configuration taskConf = this.originalConfig.clone();
+ taskConf.set(TASK_INDEX, index);
+ configurations.add(taskConf);
+ log.info(
+ "{} job split into {} tasks, current task: {}, desc: {}, developer: {}, task conf: {}",
+ this.getPluginName(), finalMandatoryNumber, index,
+ this.getDescription(), this.getDeveloper(), taskConf);
+ }
+ return configurations;
+ }
+
+ @Override
+ public void post() {
+ this.endTime = System.currentTimeMillis();
+
+ if (nonNull(this.postprocess)
+ && isNotEmpty(this.postprocess.getOperations())) {
+ this.processExecutor.execute(this.postprocess);
+ log.info(
+ "{} postprocess execute successfully, postprocess: {}",
+ this.getPluginName(), this.postprocess);
+ }
+
+ log.info(
+ "job {} execute to end, start from {}, end to {}, total time: {}",
+ this.getPluginName(),
+ Instant.ofEpochMilli(this.startTime)
+ .atZone(ZoneId.systemDefault()).toLocalDateTime(),
+ Instant.ofEpochMilli(this.endTime)
+ .atZone(ZoneId.systemDefault()).toLocalDateTime(),
+ Duration.ofMillis(this.endTime - this.startTime));
+ }
+
+ @Override
+ public void destroy() {
+ log.info("job {} destroy, nothing to clean up",
+ this.getPluginName());
+ }
+ }
+
+ @Slf4j
+ @EqualsAndHashCode(callSuper = true)
+ public static class Task extends Writer.Task {
+
+ private long startTime;
+
+ private long endTime;
+
+ private int successCount;
+
+ private int failCount;
+
+ private UnirestInstance unirest;
+
+ private ObjectRecordConverter converter;
+
+ private FailsafeExecutor> executor;
+
+ private Configuration writerSliceConfig;
+
+ private Integer taskIndex;
+
+ private String url;
+
+ private HttpMethod method;
+
+ private boolean ssl;
+
+ private Map headers;
+
+ private Map query;
+
+ private Integer maxRetries;
+
+ private boolean batchMode;
+
+ private Integer batchSize;
+
+ private List fields;
+
+ private boolean debug;
+
+ private boolean failFast;
+
+ private Integer ratePerTask;
+
+ private Duration avgWriteTime;
+
+ private final ClientConfig clientConfig = new ClientConfig();
+
+ @Override
+ public void init() {
+ this.writerSliceConfig = this.getPluginJobConf();
+ this.taskIndex = this.writerSliceConfig.getInt(TASK_INDEX);
+ this.url = this.writerSliceConfig.getString(URL).trim();
+ this.method = HttpMethod
+ .valueOf(this.writerSliceConfig.getString(HTTP_METHOD));
+ this.ssl = this.writerSliceConfig.getBool(HTTP_SSL,
+ DEFAULT_SSL_VALUE);
+ this.headers = emptyIfNull(
+ this.writerSliceConfig.getMap(HTTP_HEADERS, String.class));
+ this.query = emptyIfNull(this.writerSliceConfig.getMap(HTTP_QUERY));
+ this.maxRetries = this.writerSliceConfig.getInt(MAX_RETRIES,
+ DEFAULT_MAX_RETRIES_VALUE);
+ this.batchMode = this.writerSliceConfig.getBool(BATCH_MODE,
+ DEFAULT_BATCH_MODE_VALUE);
+ this.batchSize = this.writerSliceConfig.getInt(BATCH_SIZE,
+ DEFAULT_BATCH_SIZE_VALUE);
+ this.fields = this.writerSliceConfig.getListWithJson(FIELDS,
+ Field.class);
+ this.debug = this.writerSliceConfig.getBool(DEBUG,
+ DEFAULT_DEBUG_VALUE);
+ this.failFast = this.writerSliceConfig.getBool(FAIL_FAST,
+ DEFAULT_FAIL_FAST_VALUE);
+ this.ratePerTask = this.writerSliceConfig.getInt(RATE_PER_TASK);
+ final Object client = this.writerSliceConfig.get(CLIENT);
+ if (nonNull(client)) {
+ try {
+ final ClientConfig config = JSON.parseObject(
+ JSON.toJSONString(client), ClientConfig.class);
+ if (nonNull(config)) {
+ if (config.getMaxPerRoute() > 0) {
+ this.clientConfig
+ .setMaxPerRoute(config.getMaxPerRoute());
+ }
+ if (config.getMaxTotal() > 0) {
+ this.clientConfig.setMaxTotal(config.getMaxTotal());
+ }
+ }
+ } catch (final JSONException e) {
+ throw DataXException.asDataXException(
+ HTTP_CLIENT_CONFIG_INVALID_EXCEPTION,
+ String.format("client config: %s",
+ JSON.toJSONString(client)));
+ }
+ }
+ log.info(
+ "{} task {} initialized, desc: {}, developer: {}, task conf: {}",
+ this.getPluginName(), this.taskIndex, this.getDescription(),
+ this.getDeveloper(), this.writerSliceConfig);
+ }
+
+ @Override
+ public void prepare() {
+ if (this.url.startsWith("http://")) {
+ this.ssl = false;
+ } else if (this.url.startsWith("https://")) {
+ this.ssl = true;
+ } else if (this.ssl) {
+ this.url = prependIfMissing(this.url, "https://");
+ } else {
+ this.url = prependIfMissing(this.url, "http://");
+ }
+
+ this.unirest = Unirest.spawnInstance();
+ if (!emptyIfNull(this.headers).isEmpty()) {
+ this.headers.forEach(this.unirest.config()::addDefaultHeader);
+ }
+ if (!this.headers.containsKey(CONTENT_TYPE)) {
+ this.unirest.config().addDefaultHeader(CONTENT_TYPE,
+ APPLICATION_JSON.getMimeType());
+ }
+ this.unirest.config().addShutdownHook(true);
+ this.unirest.config().defaultBaseUrl(this.url);
+ this.unirest.config().verifySsl(false);
+ this.unirest.config().automaticRetries(false);
+ this.unirest.config().concurrency(this.clientConfig.getMaxTotal(),
+ this.clientConfig.getMaxPerRoute());
+
+ this.converter = new ObjectRecordConverter(
+ new TypeHandlerRegistry(), this.fields);
+
+ final RetryPolicy> retryPolicy = RetryPolicy
+ .>builder()
+ .handleResultIf(response -> response
+ .getStatus() >= HttpStatus.BAD_REQUEST)
+ .onFailedAttempt(e -> log.error(
+ "write failed, attempt execution times: {},"
+ + " possible result response code: {}, possible result response body: {}",
+ e.getAttemptCount() + 1,
+ Optional.ofNullable(e.getLastResult())
+ .map(HttpResponse::getStatusText)
+ .orElse(null),
+ Optional.ofNullable(e.getLastResult())
+ .map(HttpResponse::getBody).orElse(null),
+ e.getLastException()))
+ .onRetry(e -> log.warn("failure #{}th retrying.",
+ e.getAttemptCount()))
+ .onRetriesExceeded(e -> log.error(
+ "fail to write. max retries exceeded. cause: {}",
+ nonNull(e.getException())
+ ? e.getException().getMessage()
+ : e.getResult().getStatusText(),
+ e.getException()))
+ .build();
+ if (isNull(this.ratePerTask) || this.ratePerTask <= 0) {
+ this.executor = Failsafe.with(retryPolicy);
+ } else {
+ this.executor = Failsafe
+ .with(RateLimiter
+ .>smoothBuilder(
+ this.ratePerTask, Duration.ofSeconds(1))
+ .withMaxWaitTime(Duration.ofDays(365)).build())
+ .compose(retryPolicy);
+ }
+ this.startTime = System.currentTimeMillis();
+ this.successCount = 0;
+ this.failCount = 0;
+ log.info(
+ "{} task {} prepared, desc: {}, developer: {}, task conf: {}",
+ this.getPluginName(), this.taskIndex, this.getDescription(),
+ this.getDeveloper(), this.writerSliceConfig);
+ log.info("http client config: {}", this.clientConfig);
+ }
+
+ @Override
+ public void preCheck() {
+ log.info(
+ "{} task {} check will not be called, desc: {}, developer: {}, task conf {}",
+ this.getPluginName(), this.taskIndex, this.getDescription(),
+ this.getDeveloper(), this.writerSliceConfig);
+ }
+
+ @Override
+ public void startWrite(final RecordReceiver lineReceiver) {
+ final List writerBuffer = new ArrayList<>(this.batchSize);
+ Record recordItem = null;
+ while ((recordItem = lineReceiver.getFromReader()) != null) {
+ if (this.batchMode) {
+ writerBuffer.add(recordItem);
+ if (writerBuffer.size() >= this.batchSize) {
+ this.doWrite(writerBuffer);
+ writerBuffer.clear();
+ }
+
+ } else {
+ this.doWrite(recordItem);
+ }
+ if (this.debug) {
+ final int bound = recordItem.getColumnNumber();
+ for (int index = 0; index < bound; index++) {
+ final Column column = recordItem.getColumn(index);
+ log.info(
+ "colum type: {}, column type class: {}, raw data: {}, raw data class: {}, byte size: {}",
+ column.getType(),
+ column.getType().getClass().getName(),
+ column.getRawData(),
+ Optional.ofNullable(column.getRawData())
+ .map(Object::getClass)
+ .map(Class::getName).orElse(null),
+ column.getByteSize());
+ }
+ }
+ }
+ if (this.batchMode && !writerBuffer.isEmpty()) {
+ this.doWrite(writerBuffer);
+ writerBuffer.clear();
+ }
+
+ }
+
+ @Override
+ public void post() {
+ this.endTime = System.currentTimeMillis();
+ this.unirest.close();
+ log.info(
+ "job {} task {} execute to end, start from {}, end to {}, total time: {}, avg write time: {}, "
+ + "total count: {}, fail count: {}",
+ this.getPluginName(), this.taskIndex,
+ Instant.ofEpochMilli(this.startTime)
+ .atZone(ZoneId.systemDefault()).toLocalDateTime(),
+ Instant.ofEpochMilli(this.endTime)
+ .atZone(ZoneId.systemDefault()).toLocalDateTime(),
+ Duration.ofMillis(this.endTime - this.startTime),
+ this.avgWriteTime, this.successCount + this.failCount,
+ this.failCount);
+ if (this.failCount > 0) {
+ log.error("job {} task {} execute to end, fail count: {}",
+ this.getPluginName(), this.taskIndex, this.failCount);
+ }
+ }
+
+ @Override
+ public void destroy() {
+
+ log.info("job {} task {} destroy", this.getPluginName(),
+ this.taskIndex);
+ }
+
+ private void doWrite(final Record item) {
+ try {
+ final HttpResponse resp = this.executor.get(ctx -> {
+ final Map body = this.converter
+ .convert(item);
+ return executeRequest(1, body);
+ });
+ if (resp.getStatus() >= HttpStatus.BAD_REQUEST) {
+ throw new HttpException(resp.getStatusText());
+ }
+ } catch (final Exception e) {
+ if (this.failFast) {
+ throw DataXException.asDataXException(RUNTIME_EXCEPTION,
+ e.getMessage(), e);
+ }
+ }
+
+ }
+
+ private void doWrite(final List records) {
+ try {
+ final HttpResponse resp = this.executor.get(ctx -> {
+ final List