feat(RestWriter): add restfull api writer

add restfull api writer

[skip ci]
This commit is contained in:
zhangyongxiang 2024-09-04 19:09:19 +08:00
parent 3614c2633e
commit 964cbc5f55
36 changed files with 2326 additions and 0 deletions

View File

@ -0,0 +1,242 @@
# DataX REstWriter 说明
------------
## 1 快速介绍
RestWriter提供了Restful接口写入数据的能力。RestWriter服务的用户主要在于DataX开发、测试同学。
## 2 功能与限制
RestWriter实现了向Restful接口同步数据的通用能力RestWriter如下几个方面约定:
1. 通过请求题写入数据支持post、put、patch3种http方法
2. 通过返回的http状态码判断写入成功或者失败
3. 支持失败重试与限流
我们不能做到:
1. 不支持嵌套对象。
## 3 功能说明
### 3.1 配置样例
```json
{
"setting": {},
"job": {
"setting": {
"speed": {
"channel": 2
}
},
"content": [
{
"reader": {
"name": "txtfilereader",
"parameter": {
"path": [
"/home/haiwei.luo/case00/data"
],
"encoding": "UTF-8",
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "boolean"
},
{
"index": 2,
"type": "double"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "date",
"format": "yyyy.MM.dd"
}
],
"fieldDelimiter": ","
}
},
"writer": {
"name": "restwriter",
"parameter": {
"url": "http://localhost:8080/echo",
"method": "post",
"ssl": false,
"headers": {
"aaa": "bbbb"
},
"query": {
"test": "test"
},
"maxRetries": 3,
"batch": false,
"batchSize": 1000,
"fields": [
{
"name": "id"
},
{
"name": "jobGroup"
},
{
"name": "jobId"
},
{
"name": "executorAddress"
},
{
"name": "executorHandler"
},
{
"name": "executorParam"
},
{
"name": "executorShardingParam"
},
{
"name": "executorFailRetryCount"
},
{
"name": "triggerTime",
"type": "java.time.LocalDateTime"
},
{
"name": "triggerCode"
},
{
"name": "triggerMsg"
},
{
"name": "handleTime",
"type": "java.time.LocalDateTime"
},
{
"name": "handleCode"
},
{
"name": "handleMsg"
},
{
"name": "alarmStatus"
}
],
"print": true,
"failFast": false,
"rate-per-task": 10
}
}
}
]
}
}
```
### 3.2 参数说明
* **url**
* 描述restful API URL经过转义后的URLRestWriter不负责转义特殊字符比如空格等。 <br />
* 必选:是 <br />
* 默认值:无 <br />
* **method**
* 描述http method <br />
* 必选:是 <br />
* 默认值:无 <br />
* **ssl**
* 描述restful api是https/http如果在url中给出protocol则以url中为准<br />
* 必选:否 <br />
* 默认值false <br />
* **headers**
* 描述http请求头 <br />
* 必选:否 <br />
* 默认值:无 <br />
* **query**
* 描述:查询参数。 <br />
* 必选:否 <br />
* 默认值:无 <br />
* **maxRetries**
* 描述:最大失败重试次数。<br />
* 必选:否 <br />
* 默认值3 <br />
* **batch**
* 描述:是否批量处理<br />
* 必选:否 <br />
* 默认值false <br />
* **batchSize**
* 描述:批量处理最大条数<br />
* 必选:否 <br />
* 默认值100 <br />
* **fields**
* 描述字段信息<br />
* 必选:是 <br />
* 默认值:无 <br />
* **print**
* 描述是否打印debug信息。<br />
* 必选:否 <br />
* 默认值false <br />
### 3.3 类型转换
## 4 性能报告
## 5 约束限制
## 6 FAQ

99
restwriter/pom.xml Executable file
View File

@ -0,0 +1,99 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>restwriter</artifactId>
<name>restwriter</name>
<description>RestWriter提供了通过HTTP请求写入的功能</description>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.28</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.2-jre</version>
</dependency>
<dependency>
<groupId>com.konghq</groupId>
<artifactId>unirest-java</artifactId>
<version>3.14.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-expression</artifactId>
<version>5.3.30</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,35 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/restwriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>restwriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/restwriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/restwriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,47 @@
package com.alibaba.datax.plugin.writer.restwriter;
/**
* @author: zhangyongxiang
* @date 2023/8/24 11:38
**/
public final class Key {
private Key() {}
public static final String URL = "url";
public static final String HTTP_METHOD = "method";
public static final String HTTP_SSL = "ssl";
public static final String HTTP_HEADERS = "headers";
public static final String HTTP_QUERY = "query";
public static final String MAX_RETRIES = "maxRetries";
public static final String BATCH_MODE = "batch";
public static final String BATCH_SIZE = "batchSize";
public static final String TASK_INDEX = "taskIndex";
public static final String FIELDS = "fields";
public static final String DEBUG = "debug";
public static final String FAIL_FAST = "failFast";
public static final String RATE_PER_TASK = "rate-per-task";
public static final String CLIENT = "client";
public static final String PREPROCESS = "preprocess";
public static final String POSTPROCESS = "postprocess";
public static final String ADDITIONAL_CONCURRENT = "concurrent";
public static final String ADDITIONAL_OPERATIONS = "operations";
}

View File

@ -0,0 +1,533 @@
package com.alibaba.datax.plugin.writer.restwriter;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.http.HttpException;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.restwriter.conf.ClientConfig;
import com.alibaba.datax.plugin.writer.restwriter.conf.Field;
import com.alibaba.datax.plugin.writer.restwriter.conf.Process;
import com.alibaba.datax.plugin.writer.restwriter.handler.ObjectRecordConverter;
import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandlerRegistry;
import com.alibaba.datax.plugin.writer.restwriter.process.ProcessExecutor;
import com.alibaba.datax.plugin.writer.restwriter.process.ProcessFactory;
import com.alibaba.datax.plugin.writer.restwriter.validator.ConfigurationValidator;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONException;
import com.google.common.collect.Lists;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeExecutor;
import dev.failsafe.RateLimiter;
import dev.failsafe.RetryPolicy;
import kong.unirest.HttpMethod;
import kong.unirest.HttpResponse;
import kong.unirest.HttpStatus;
import kong.unirest.JsonNode;
import kong.unirest.Unirest;
import kong.unirest.UnirestInstance;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import static com.alibaba.datax.plugin.writer.restwriter.Key.BATCH_MODE;
import static com.alibaba.datax.plugin.writer.restwriter.Key.BATCH_SIZE;
import static com.alibaba.datax.plugin.writer.restwriter.Key.CLIENT;
import static com.alibaba.datax.plugin.writer.restwriter.Key.DEBUG;
import static com.alibaba.datax.plugin.writer.restwriter.Key.FAIL_FAST;
import static com.alibaba.datax.plugin.writer.restwriter.Key.FIELDS;
import static com.alibaba.datax.plugin.writer.restwriter.Key.HTTP_HEADERS;
import static com.alibaba.datax.plugin.writer.restwriter.Key.HTTP_METHOD;
import static com.alibaba.datax.plugin.writer.restwriter.Key.HTTP_QUERY;
import static com.alibaba.datax.plugin.writer.restwriter.Key.HTTP_SSL;
import static com.alibaba.datax.plugin.writer.restwriter.Key.MAX_RETRIES;
import static com.alibaba.datax.plugin.writer.restwriter.Key.RATE_PER_TASK;
import static com.alibaba.datax.plugin.writer.restwriter.Key.TASK_INDEX;
import static com.alibaba.datax.plugin.writer.restwriter.Key.URL;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.HTTP_CLIENT_CONFIG_INVALID_EXCEPTION;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.RUNTIME_EXCEPTION;
import static com.alibaba.datax.plugin.writer.restwriter.process.ProcessCategory.POSTPROCESS;
import static com.alibaba.datax.plugin.writer.restwriter.process.ProcessCategory.PREPROCESS;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static kong.unirest.ContentType.APPLICATION_JSON;
import static kong.unirest.HeaderNames.CONTENT_TYPE;
import static org.apache.commons.collections4.CollectionUtils.isNotEmpty;
import static org.apache.commons.collections4.MapUtils.emptyIfNull;
import static org.apache.commons.lang3.StringUtils.prependIfMissing;
/**
* @author zhangyongxiang
* @date 2023-08-23
*/
@Slf4j
public class RestWriter extends Writer {
public static final int DEFAULT_MAX_RETRIES_VALUE = 3;
public static final int DEFAULT_BATCH_SIZE_VALUE = 100;
public static final boolean DEFAULT_BATCH_MODE_VALUE = false;
public static final boolean DEFAULT_DEBUG_VALUE = false;
public static final boolean DEFAULT_FAIL_FAST_VALUE = false;
public static final boolean DEFAULT_SSL_VALUE = false;
@Slf4j
@EqualsAndHashCode(callSuper = true)
public static class Job extends Writer.Job {
private Configuration originalConfig;
private long startTime;
private long endTime;
private Process preprocess;
private Process postprocess;
private ProcessFactory processFactory;
private ProcessExecutor processExecutor;
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
this.validateParameter();
this.processFactory = new ProcessFactory(this.originalConfig);
this.preprocess = this.processFactory.createProcess(PREPROCESS);
this.postprocess = this.processFactory.createProcess(POSTPROCESS);
this.processExecutor = new ProcessExecutor();
log.info(
"{} job initialized, desc: {}, developer: {}, job conf: {}, job preprocess: {}, job postprocess: {}",
this.getPluginName(), this.getDescription(),
this.getDeveloper(), this.getPluginJobConf(),
this.preprocess, this.postprocess);
}
private void validateParameter() {
try {
new ConfigurationValidator().validate(this.originalConfig,
null);
} catch (final Exception se) {
throw DataXException.asDataXException(RUNTIME_EXCEPTION,
"an exception has occurred when validating parameters",
se);
}
}
@Override
public void preCheck() {
log.info("job {} pre check will not be called",
this.getPluginName());
}
@Override
public void prepare() {
this.startTime = System.currentTimeMillis();
if (nonNull(this.preprocess)
&& isNotEmpty(this.preprocess.getOperations())) {
this.processExecutor.execute(this.preprocess);
log.info(
"{} job prepared successfully after preprocess, job conf: {}, preprocess: {}",
this.getPluginName(), this.originalConfig,
this.preprocess);
} else {
log.info(
"{} job prepared without need any of preprocess, job conf: {}",
this.getPluginName(), this.originalConfig);
}
}
@Override
public List<Configuration> split(final int mandatoryNumber) {
int finalMandatoryNumber = mandatoryNumber;
if (finalMandatoryNumber < 1) {
log.warn(
"mandatory number {} less than one, reset it to be one",
mandatoryNumber);
finalMandatoryNumber = 1;
}
final List<Configuration> configurations = Lists
.newArrayListWithExpectedSize(finalMandatoryNumber);
for (int index = 0; index < finalMandatoryNumber; index++) {
final Configuration taskConf = this.originalConfig.clone();
taskConf.set(TASK_INDEX, index);
configurations.add(taskConf);
log.info(
"{} job split into {} tasks, current task: {}, desc: {}, developer: {}, task conf: {}",
this.getPluginName(), finalMandatoryNumber, index,
this.getDescription(), this.getDeveloper(), taskConf);
}
return configurations;
}
@Override
public void post() {
this.endTime = System.currentTimeMillis();
if (nonNull(this.postprocess)
&& isNotEmpty(this.postprocess.getOperations())) {
this.processExecutor.execute(this.postprocess);
log.info(
"{} postprocess execute successfully, postprocess: {}",
this.getPluginName(), this.postprocess);
}
log.info(
"job {} execute to end, start from {}, end to {}, total time: {}",
this.getPluginName(),
Instant.ofEpochMilli(this.startTime)
.atZone(ZoneId.systemDefault()).toLocalDateTime(),
Instant.ofEpochMilli(this.endTime)
.atZone(ZoneId.systemDefault()).toLocalDateTime(),
Duration.ofMillis(this.endTime - this.startTime));
}
@Override
public void destroy() {
log.info("job {} destroy, nothing to clean up",
this.getPluginName());
}
}
@Slf4j
@EqualsAndHashCode(callSuper = true)
public static class Task extends Writer.Task {
private long startTime;
private long endTime;
private int successCount;
private int failCount;
private UnirestInstance unirest;
private ObjectRecordConverter converter;
private FailsafeExecutor<HttpResponse<JsonNode>> executor;
private Configuration writerSliceConfig;
private Integer taskIndex;
private String url;
private HttpMethod method;
private boolean ssl;
private Map<String, String> headers;
private Map<String, Object> query;
private Integer maxRetries;
private boolean batchMode;
private Integer batchSize;
private List<Field> fields;
private boolean debug;
private boolean failFast;
private Integer ratePerTask;
private Duration avgWriteTime;
private final ClientConfig clientConfig = new ClientConfig();
@Override
public void init() {
this.writerSliceConfig = this.getPluginJobConf();
this.taskIndex = this.writerSliceConfig.getInt(TASK_INDEX);
this.url = this.writerSliceConfig.getString(URL).trim();
this.method = HttpMethod
.valueOf(this.writerSliceConfig.getString(HTTP_METHOD));
this.ssl = this.writerSliceConfig.getBool(HTTP_SSL,
DEFAULT_SSL_VALUE);
this.headers = emptyIfNull(
this.writerSliceConfig.getMap(HTTP_HEADERS, String.class));
this.query = emptyIfNull(this.writerSliceConfig.getMap(HTTP_QUERY));
this.maxRetries = this.writerSliceConfig.getInt(MAX_RETRIES,
DEFAULT_MAX_RETRIES_VALUE);
this.batchMode = this.writerSliceConfig.getBool(BATCH_MODE,
DEFAULT_BATCH_MODE_VALUE);
this.batchSize = this.writerSliceConfig.getInt(BATCH_SIZE,
DEFAULT_BATCH_SIZE_VALUE);
this.fields = this.writerSliceConfig.getListWithJson(FIELDS,
Field.class);
this.debug = this.writerSliceConfig.getBool(DEBUG,
DEFAULT_DEBUG_VALUE);
this.failFast = this.writerSliceConfig.getBool(FAIL_FAST,
DEFAULT_FAIL_FAST_VALUE);
this.ratePerTask = this.writerSliceConfig.getInt(RATE_PER_TASK);
final Object client = this.writerSliceConfig.get(CLIENT);
if (nonNull(client)) {
try {
final ClientConfig config = JSON.parseObject(
JSON.toJSONString(client), ClientConfig.class);
if (nonNull(config)) {
if (config.getMaxPerRoute() > 0) {
this.clientConfig
.setMaxPerRoute(config.getMaxPerRoute());
}
if (config.getMaxTotal() > 0) {
this.clientConfig.setMaxTotal(config.getMaxTotal());
}
}
} catch (final JSONException e) {
throw DataXException.asDataXException(
HTTP_CLIENT_CONFIG_INVALID_EXCEPTION,
String.format("client config: %s",
JSON.toJSONString(client)));
}
}
log.info(
"{} task {} initialized, desc: {}, developer: {}, task conf: {}",
this.getPluginName(), this.taskIndex, this.getDescription(),
this.getDeveloper(), this.writerSliceConfig);
}
@Override
public void prepare() {
if (this.url.startsWith("http://")) {
this.ssl = false;
} else if (this.url.startsWith("https://")) {
this.ssl = true;
} else if (this.ssl) {
this.url = prependIfMissing(this.url, "https://");
} else {
this.url = prependIfMissing(this.url, "http://");
}
this.unirest = Unirest.spawnInstance();
if (!emptyIfNull(this.headers).isEmpty()) {
this.headers.forEach(this.unirest.config()::addDefaultHeader);
}
if (!this.headers.containsKey(CONTENT_TYPE)) {
this.unirest.config().addDefaultHeader(CONTENT_TYPE,
APPLICATION_JSON.getMimeType());
}
this.unirest.config().addShutdownHook(true);
this.unirest.config().defaultBaseUrl(this.url);
this.unirest.config().verifySsl(false);
this.unirest.config().automaticRetries(false);
this.unirest.config().concurrency(this.clientConfig.getMaxTotal(),
this.clientConfig.getMaxPerRoute());
this.converter = new ObjectRecordConverter(
new TypeHandlerRegistry(), this.fields);
final RetryPolicy<HttpResponse<JsonNode>> retryPolicy = RetryPolicy
.<HttpResponse<JsonNode>>builder()
.handleResultIf(response -> response
.getStatus() >= HttpStatus.BAD_REQUEST)
.onFailedAttempt(e -> log.error(
"write failed, attempt execution times: {},"
+ " possible result response code: {}, possible result response body: {}",
e.getAttemptCount() + 1,
Optional.ofNullable(e.getLastResult())
.map(HttpResponse::getStatusText)
.orElse(null),
Optional.ofNullable(e.getLastResult())
.map(HttpResponse::getBody).orElse(null),
e.getLastException()))
.onRetry(e -> log.warn("failure #{}th retrying.",
e.getAttemptCount()))
.onRetriesExceeded(e -> log.error(
"fail to write. max retries exceeded. cause: {}",
nonNull(e.getException())
? e.getException().getMessage()
: e.getResult().getStatusText(),
e.getException()))
.build();
if (isNull(this.ratePerTask) || this.ratePerTask <= 0) {
this.executor = Failsafe.with(retryPolicy);
} else {
this.executor = Failsafe
.with(RateLimiter
.<HttpResponse<JsonNode>>smoothBuilder(
this.ratePerTask, Duration.ofSeconds(1))
.withMaxWaitTime(Duration.ofDays(365)).build())
.compose(retryPolicy);
}
this.startTime = System.currentTimeMillis();
this.successCount = 0;
this.failCount = 0;
log.info(
"{} task {} prepared, desc: {}, developer: {}, task conf: {}",
this.getPluginName(), this.taskIndex, this.getDescription(),
this.getDeveloper(), this.writerSliceConfig);
log.info("http client config: {}", this.clientConfig);
}
@Override
public void preCheck() {
log.info(
"{} task {} check will not be called, desc: {}, developer: {}, task conf {}",
this.getPluginName(), this.taskIndex, this.getDescription(),
this.getDeveloper(), this.writerSliceConfig);
}
@Override
public void startWrite(final RecordReceiver lineReceiver) {
final List<Record> writerBuffer = new ArrayList<>(this.batchSize);
Record recordItem = null;
while ((recordItem = lineReceiver.getFromReader()) != null) {
if (this.batchMode) {
writerBuffer.add(recordItem);
if (writerBuffer.size() >= this.batchSize) {
this.doWrite(writerBuffer);
writerBuffer.clear();
}
} else {
this.doWrite(recordItem);
}
if (this.debug) {
final int bound = recordItem.getColumnNumber();
for (int index = 0; index < bound; index++) {
final Column column = recordItem.getColumn(index);
log.info(
"colum type: {}, column type class: {}, raw data: {}, raw data class: {}, byte size: {}",
column.getType(),
column.getType().getClass().getName(),
column.getRawData(),
Optional.ofNullable(column.getRawData())
.map(Object::getClass)
.map(Class::getName).orElse(null),
column.getByteSize());
}
}
}
if (this.batchMode && !writerBuffer.isEmpty()) {
this.doWrite(writerBuffer);
writerBuffer.clear();
}
}
@Override
public void post() {
this.endTime = System.currentTimeMillis();
this.unirest.close();
log.info(
"job {} task {} execute to end, start from {}, end to {}, total time: {}, avg write time: {}, "
+ "total count: {}, fail count: {}",
this.getPluginName(), this.taskIndex,
Instant.ofEpochMilli(this.startTime)
.atZone(ZoneId.systemDefault()).toLocalDateTime(),
Instant.ofEpochMilli(this.endTime)
.atZone(ZoneId.systemDefault()).toLocalDateTime(),
Duration.ofMillis(this.endTime - this.startTime),
this.avgWriteTime, this.successCount + this.failCount,
this.failCount);
if (this.failCount > 0) {
log.error("job {} task {} execute to end, fail count: {}",
this.getPluginName(), this.taskIndex, this.failCount);
}
}
@Override
public void destroy() {
log.info("job {} task {} destroy", this.getPluginName(),
this.taskIndex);
}
private void doWrite(final Record item) {
try {
final HttpResponse<JsonNode> resp = this.executor.get(ctx -> {
final Map<String, Object> body = this.converter
.convert(item);
return executeRequest(1, body);
});
if (resp.getStatus() >= HttpStatus.BAD_REQUEST) {
throw new HttpException(resp.getStatusText());
}
} catch (final Exception e) {
if (this.failFast) {
throw DataXException.asDataXException(RUNTIME_EXCEPTION,
e.getMessage(), e);
}
}
}
private void doWrite(final List<Record> records) {
try {
final HttpResponse<JsonNode> resp = this.executor.get(ctx -> {
final List<Map<String, Object>> body = records.stream()
.map(this.converter::convert)
.collect(Collectors.toList());
return executeRequest(records.size(), body);
});
if (resp.getStatus() >= HttpStatus.BAD_REQUEST) {
throw new HttpException(resp.getStatusText());
}
} catch (final Exception e) {
if (this.failFast) {
throw DataXException.asDataXException(RUNTIME_EXCEPTION,
e.getMessage(), e);
}
}
}
private HttpResponse<JsonNode> executeRequest(final int itemCount,
final Object body) {
final long writeStartTime = System.nanoTime();
return this.unirest.request(this.method.name(), "")
.queryString(this.query).body(body).asJson()
.ifSuccess(response -> {
this.successCount += itemCount;
final long writeEndTime = System.nanoTime();
log.info(
"the {}th record has been written successfully, consume time: {}",
this.successCount + this.failCount,
Duration.ofNanos(
writeEndTime - writeStartTime));
if (isNull(this.avgWriteTime)) {
this.avgWriteTime = Duration
.ofNanos(writeEndTime - writeStartTime);
} else {
this.avgWriteTime = Duration.ofNanos(
(this.avgWriteTime.toNanos() + writeEndTime
- writeStartTime) / 2);
}
}).ifFailure(response -> {
this.failCount += itemCount;
log.error(
"data write failed, http code: {}, message: {} , optional reason: {}, data info: {} ",
response.getStatus(), response.getStatusText(),
response.getBody(), body);
response.getParsingError().ifPresent(e -> {
log.error("original body: {}, parsing exception",
e.getOriginalBody(), e);
throw e;
});
});
}
}
}

View File

@ -0,0 +1,68 @@
package com.alibaba.datax.plugin.writer.restwriter;
import com.alibaba.datax.common.spi.ErrorCode;
/**
* @author: zhangyongxiang
* @date 2023/8/24 11:34
**/
public enum RestWriterErrorCode implements ErrorCode {
/**
* runtime exception
*/
RUNTIME_EXCEPTION("RestWriter-00", "运行时异常"),
EMPTY_RECORD_EXCEPTION("RestWriter-01", "空record数据"),
EMPTY_FIELD_EXCEPTION("RestWriter-02", "你需要配置至少一个field"),
FIELD_MISMATCH_WITH_COLUMN_EXCEPTION("RestWriter-03",
"field数量与column数量不匹配"),
FIELD_CLASS_NOT_FOUND_EXCEPTION("RestWriter-04", "配置的field class不存在"),
URL_INVALID_EXCEPTION("RestWriter-05", "您填写的URL参数值不合法"),
METHOD_INVALID_EXCEPTION("RestWriter-06", "您填写的method参数值不合法"),
RATE_PER_TASK_INVALID_EXCEPTION("RestWriter-07", "您填写的rate-per-task参数值不合法"),
BATCH_SIZE_INVALID_EXCEPTION("RestWriter-08", "您填写的batchSize参数值不合法"),
MAX_RETRIES_INVALID_EXCEPTION("RestWriter-09", "您填写的maxRetries参数值不合法"),
FIELDS_INVALID_EXCEPTION("RestWriter-10", "您填写的fields参数值不合法"),
TYPE_HANDLER_NOT_FOUND_EXCEPTION("RestWriter-11", "没有找到合适的typehandler"),
HTTP_CLIENT_CONFIG_INVALID_EXCEPTION("RestWriter-12", "http client配置不合法"),
PREPROCESS_OPERATION_ERROR("RestWriter-13", "预处理失败"),
POSTPROCESS_OPERATION_ERROR("RestWriter-14", "后处理失败"),
CONCURRENT_INVALID_EXCEPTION("RestWriter-15", "并发参数不合法"),
OPERATION_RESULT_ERROR_EXCEPTION("RestWriter-16", "返回结果错误"),
EXPRESSION_EVALUATE_FAILED_EXCEPTION("RestWriter-17", "谓词表达式计算错误");
private final String code;
private final String description;
RestWriterErrorCode(final String code, final String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
}

View File

@ -0,0 +1,19 @@
package com.alibaba.datax.plugin.writer.restwriter.conf;
import lombok.Data;
import static kong.unirest.Config.DEFAULT_MAX_CONNECTIONS;
import static kong.unirest.Config.DEFAULT_MAX_PER_ROUTE;
/**
* @name: zhangyongxiang
* @author: zhangyongxiang@baidu.com
**/
@Data
public class ClientConfig {
private int maxTotal = DEFAULT_MAX_CONNECTIONS;
private int maxPerRoute = DEFAULT_MAX_PER_ROUTE;
}

View File

@ -0,0 +1,17 @@
package com.alibaba.datax.plugin.writer.restwriter.conf;
import lombok.Data;
/**
* @author: zhangyongxiang
* @date 2023/8/24 21:58
**/
@Data
public class Field {
private String name;
private String type;
private String format;
}

View File

@ -0,0 +1,32 @@
package com.alibaba.datax.plugin.writer.restwriter.conf;
import java.util.Map;
import com.google.common.collect.Maps;
import lombok.Data;
/**
* @name: zhangyongxiang
* @author: zhangyongxiang@baidu.com
**/
@Data
public class Operation {
private String url;
private String method;
private Map<String, String> headers = Maps.newHashMap();
private String body;
private boolean base64;
private boolean debug;
private int maxRetries = 1;
private String jsonExpression;
}

View File

@ -0,0 +1,29 @@
package com.alibaba.datax.plugin.writer.restwriter.conf;
import java.util.List;
import com.alibaba.datax.plugin.writer.restwriter.process.ProcessCategory;
import com.google.common.collect.Lists;
import lombok.Data;
import static com.alibaba.datax.plugin.writer.restwriter.process.ProcessCategory.PREPROCESS;
/**
* @name: zhangyongxiang
* @author: zhangyongxiang@baidu.com
**/
@Data
public class Process {
private boolean concurrent;
private ProcessCategory category = PREPROCESS;
private List<Operation> operations = Lists.newArrayList();
public Process(final ProcessCategory category) {
this.category = category;
}
}

View File

@ -0,0 +1,95 @@
package com.alibaba.datax.plugin.writer.restwriter.handler;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import org.apache.commons.lang3.ClassUtils;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.plugin.writer.restwriter.conf.Field;
import com.google.common.collect.Maps;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.EMPTY_FIELD_EXCEPTION;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.EMPTY_RECORD_EXCEPTION;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.FIELD_CLASS_NOT_FOUND_EXCEPTION;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.FIELD_MISMATCH_WITH_COLUMN_EXCEPTION;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.TYPE_HANDLER_NOT_FOUND_EXCEPTION;
import static java.util.Objects.isNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
/**
* @author: zhangyongxiang
* @date 2023/8/24 14:25
**/
public class ObjectRecordConverter
implements RecordConverter<Map<String, Object>> {
private final TypeHandlerRegistry registry;
private final List<Field> fields;
private final Map<String, Class<?>> fieldClasses;
public ObjectRecordConverter(final TypeHandlerRegistry registry,
final List<Field> fields) {
this.registry = registry;
this.fields = fields;
this.fieldClasses = new HashMap<>();
if (!fields.isEmpty()) {
fields.forEach(field -> {
if (isNotBlank(field.getType())) {
try {
this.fieldClasses.put(field.getName(),
ClassUtils.getClass(field.getType()));
} catch (final ClassNotFoundException e) {
throw DataXException.asDataXException(
FIELD_CLASS_NOT_FOUND_EXCEPTION,
String.format("field %s type %s not found",
field.getName(), field.getType()),
e);
}
} else {
this.fieldClasses.put(field.getName(), Void.class);
}
});
} else {
throw DataXException.asDataXException(EMPTY_FIELD_EXCEPTION,
"you should configure at least one field");
}
}
@Override
public Map<String, Object> convert(final Record record) {
if (record.getColumnNumber() <= 0) {
throw DataXException.asDataXException(EMPTY_RECORD_EXCEPTION,
"record is empty");
}
if (this.fields.size() > record.getColumnNumber()) {
throw DataXException.asDataXException(
FIELD_MISMATCH_WITH_COLUMN_EXCEPTION,
"number of fields is less than number of columns of record");
}
final Map<String, Object> m = Maps.newHashMap();
IntStream.range(0, this.fields.size()).forEach(num -> {
final Column column = record.getColumn(num);
final Class<?> clazz = this.fieldClasses
.get(this.fields.get(num).getName());
final TypeHandler<?> typeHandler = this.registry
.getTypeHandler(column.getType(), clazz);
if (isNull(typeHandler)) {
throw DataXException.asDataXException(
TYPE_HANDLER_NOT_FOUND_EXCEPTION,
String.format(
"type handler not found for source type %s and target class %s",
column.getType().name(), clazz.getName()));
}
m.put(this.fields.get(num).getName(),
typeHandler.convert(column.getRawData()));
});
return m;
}
}

View File

@ -0,0 +1,14 @@
package com.alibaba.datax.plugin.writer.restwriter.handler;
import com.alibaba.datax.common.element.Record;
/**
* @author: zhangyongxiang
* @date 2023/8/24 14:24
**/
@FunctionalInterface
public interface RecordConverter<T> {
T convert(Record record);
}

View File

@ -0,0 +1,9 @@
package com.alibaba.datax.plugin.writer.restwriter.handler;
/**
* @author: zhangyongxiang
* @date 2023/8/24 21:03
**/
public interface TypeHandler<T> {
T convert(Object object);
}

View File

@ -0,0 +1,62 @@
package com.alibaba.datax.plugin.writer.restwriter.handler;
import java.time.LocalDateTime;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.plugin.writer.restwriter.handler.bool.BoolVoidTypeHandler;
import com.alibaba.datax.plugin.writer.restwriter.handler.bytes.BytesVoidTypeHandler;
import com.alibaba.datax.plugin.writer.restwriter.handler.date.DateLocalDateTimeTypeHandler;
import com.alibaba.datax.plugin.writer.restwriter.handler.date.DateVoidTypeHandler;
import com.alibaba.datax.plugin.writer.restwriter.handler.string.StringVoidTypeHandler;
import com.alibaba.datax.plugin.writer.restwriter.handler.typedouble.DoubleVoidTypeHandler;
import com.alibaba.datax.plugin.writer.restwriter.handler.typeint.IntVoidTypeHandler;
import com.alibaba.datax.plugin.writer.restwriter.handler.typelong.LongVoidTypeHandler;
import com.alibaba.datax.plugin.writer.restwriter.handler.typenull.NullVoidTypeHandler;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
/**
* @author: zhangyongxiang
* @date 2023/8/24 21:03
**/
public class TypeHandlerRegistry {
private final Table<Column.Type, Class<?>, TypeHandler<?>> handlers = HashBasedTable
.create();
public TypeHandlerRegistry() {
registerDefault(Column.Type.INT, new IntVoidTypeHandler());
registerDefault(Column.Type.LONG, new LongVoidTypeHandler());
registerDefault(Column.Type.NULL, new NullVoidTypeHandler());
registerDefault(Column.Type.DOUBLE, new DoubleVoidTypeHandler());
registerDefault(Column.Type.STRING, new StringVoidTypeHandler());
registerDefault(Column.Type.BOOL, new BoolVoidTypeHandler());
registerDefault(Column.Type.DATE, new DateVoidTypeHandler());
registerDefault(Column.Type.BYTES, new BytesVoidTypeHandler());
register(Column.Type.DATE, LocalDateTime.class,
new DateLocalDateTimeTypeHandler());
}
// BAD, NULL, INT, LONG, DOUBLE, STRING, BOOL, DATE, BYTES
<T> void register(final Column.Type type, final Class<T> targetClass,
final TypeHandler<T> typeHandler) {
this.handlers.put(type, targetClass, typeHandler);
}
void registerDefault(final Column.Type type,
final TypeHandler<Object> typeHandler) {
this.handlers.put(type, Void.class, typeHandler);
}
<T> boolean hasTypeHandler(final Column.Type type,
final Class<T> targetClass) {
return this.handlers.contains(type, targetClass);
}
<T> TypeHandler<T> getTypeHandler(final Column.Type type,
final Class<T> targetClass) {
return (TypeHandler<T>) this.handlers.get(type, targetClass);
}
}

View File

@ -0,0 +1,17 @@
package com.alibaba.datax.plugin.writer.restwriter.handler.bool;
import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandler;
/**
* @author: zhangyongxiang
* @date 2023/8/24 21:46
**/
public class BoolVoidTypeHandler implements TypeHandler<Object> {
/**
* underlying type is Boolean
*/
@Override
public Object convert(final Object object) {
return object;
}
}

View File

@ -0,0 +1,19 @@
package com.alibaba.datax.plugin.writer.restwriter.handler.bytes;
import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandler;
/**
* @author: zhangyongxiang
* @date 2023/8/24 21:48
**/
public class BytesVoidTypeHandler implements TypeHandler<Object> {
/**
* underlying type is byte[]
*/
@Override
public Object convert(final Object object) {
return object;
}
}

View File

@ -0,0 +1,29 @@
package com.alibaba.datax.plugin.writer.restwriter.handler.date;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandler;
import static java.util.Objects.nonNull;
/**
* @author: zhangyongxiang
* @date 2023/8/24 21:49
**/
public class DateLocalDateTimeTypeHandler
implements TypeHandler<LocalDateTime> {
/**
* underlying type is Long
*/
@Override
public LocalDateTime convert(final Object object) {
if (nonNull(object)) {
return Instant.ofEpochMilli((Long) object)
.atZone(ZoneId.systemDefault()).toLocalDateTime();
}
return null;
}
}

View File

@ -0,0 +1,18 @@
package com.alibaba.datax.plugin.writer.restwriter.handler.date;
import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandler;
/**
* @author: zhangyongxiang
* @date 2023/8/24 21:47
**/
public class DateVoidTypeHandler implements TypeHandler<Object> {
/**
* underlying type is Long
*/
@Override
public Object convert(final Object object) {
return object;
}
}

View File

@ -0,0 +1,18 @@
package com.alibaba.datax.plugin.writer.restwriter.handler.string;
import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandler;
/**
* @author: zhangyongxiang
* @date 2023/8/24 21:46
**/
public class StringVoidTypeHandler implements TypeHandler<Object> {
/**
* underlying type is String
*/
@Override
public Object convert(final Object object) {
return object;
}
}

View File

@ -0,0 +1,18 @@
package com.alibaba.datax.plugin.writer.restwriter.handler.typedouble;
import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandler;
/**
* @author: zhangyongxiang
* @date 2023/8/24 21:45
**/
public class DoubleVoidTypeHandler implements TypeHandler<Object> {
/**
* underlying type is Double
*/
@Override
public Object convert(final Object object) {
return object;
}
}

View File

@ -0,0 +1,20 @@
package com.alibaba.datax.plugin.writer.restwriter.handler.typeint;
import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandler;
/**
* @author: zhangyongxiang
* @date 2023/8/24 21:22
**/
public class IntVoidTypeHandler implements TypeHandler<Object> {
/**
* underlying type is BigInteger
*/
@Override
public Object convert(final Object object) {
return object;
}
}

View File

@ -0,0 +1,18 @@
package com.alibaba.datax.plugin.writer.restwriter.handler.typelong;
import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandler;
/**
* @author: zhangyongxiang
* @date 2023/8/24 21:23
**/
public class LongVoidTypeHandler implements TypeHandler<Object> {
/**
* underlying type is BigInteger
*/
@Override
public Object convert(final Object object) {
return object;
}
}

View File

@ -0,0 +1,19 @@
package com.alibaba.datax.plugin.writer.restwriter.handler.typenull;
import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandler;
/**
* @author: zhangyongxiang
* @date 2023/8/24 21:20
**/
public class NullVoidTypeHandler implements TypeHandler<Object> {
/**
* unknown underlying type
*/
@Override
public Object convert(final Object object) {
return null;
}
}

View File

@ -0,0 +1,115 @@
package com.alibaba.datax.plugin.writer.restwriter.process;
import java.util.Map;
import org.springframework.asm.MethodVisitor;
import org.springframework.expression.AccessException;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.TypedValue;
import org.springframework.expression.spel.CodeFlow;
import org.springframework.expression.spel.CompilablePropertyAccessor;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* @version 1.0
* @name: zhangyongxiang
* @author: zhangyongxiang@baidu.com
* @date 2023/10/25 15:14
* @description:
**/
public class MapAccessor implements CompilablePropertyAccessor {
@Override
public Class<?>[] getSpecificTargetClasses() {
return new Class<?>[] { Map.class };
}
@Override
public boolean canRead(@NonNull final EvaluationContext context,
@Nullable final Object target, @NonNull final String name)
throws AccessException {
return (target instanceof Map
&& ((Map<?, ?>) target).containsKey(name));
}
@NonNull
@Override
public TypedValue read(@NonNull final EvaluationContext context,
@Nullable final Object target, @NonNull final String name)
throws AccessException {
Assert.state(target instanceof Map, "Target must be of type Map");
final Map<?, ?> map = (Map<?, ?>) target;
final Object value = map.get(name);
if (value == null && !map.containsKey(name)) {
throw new MapAccessException(name);
}
return new TypedValue(value);
}
@Override
public boolean canWrite(@NonNull final EvaluationContext context,
@Nullable final Object target, @NonNull final String name)
throws AccessException {
return true;
}
@Override
@SuppressWarnings("unchecked")
public void write(@NonNull final EvaluationContext context,
@Nullable final Object target, @NonNull final String name,
@Nullable final Object newValue) throws AccessException {
Assert.state(target instanceof Map, "Target must be a Map");
final Map<Object, Object> map = (Map<Object, Object>) target;
map.put(name, newValue);
}
@Override
public boolean isCompilable() {
return true;
}
@NonNull
@Override
public Class<?> getPropertyType() {
return Object.class;
}
@Override
public void generateCode(@NonNull final String propertyName,
@NonNull final MethodVisitor mv, final CodeFlow cf) {
final String descriptor = cf.lastDescriptor();
if (descriptor == null || !descriptor.equals("Ljava/util/Map")) {
if (descriptor == null) {
cf.loadTarget(mv);
}
CodeFlow.insertCheckCast(mv, "Ljava/util/Map");
}
mv.visitLdcInsn(propertyName);
mv.visitMethodInsn(INVOKEINTERFACE, "java/util/Map", "get",
"(Ljava/lang/Object;)Ljava/lang/Object;", true);
}
/**
* Exception thrown from {@code read} in order to reset a cached
* PropertyAccessor, allowing other accessors to have a try.
*/
@SuppressWarnings("serial")
private static class MapAccessException extends AccessException {
private final String key;
public MapAccessException(final String key) {
super("");
this.key = key;
}
@Override
public String getMessage() {
return "Map does not contain a value for key '" + this.key + "'";
}
}
}

View File

@ -0,0 +1,16 @@
package com.alibaba.datax.plugin.writer.restwriter.process;
/**
* @name: zhangyongxiang
* @author: zhangyongxiang@baidu.com
**/
public class OperationExecutionFailException extends RuntimeException {
private static final long serialVersionUID = 2848134252562605007L;
public OperationExecutionFailException(final String message,
final Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,29 @@
package com.alibaba.datax.plugin.writer.restwriter.process;
import com.alibaba.datax.plugin.writer.restwriter.Key;
import lombok.Getter;
/**
* @name: zhangyongxiang
* @author: zhangyongxiang@baidu.com
**/
@Getter
public enum ProcessCategory {
/**
* pre processing
*/
PREPROCESS(Key.PREPROCESS),
/**
* post processing
*/
POSTPROCESS(Key.POSTPROCESS);
private String key;
ProcessCategory(final String key) {
this.key = key;
}
}

View File

@ -0,0 +1,275 @@
package com.alibaba.datax.plugin.writer.restwriter.process;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.plugin.writer.restwriter.conf.Operation;
import com.alibaba.datax.plugin.writer.restwriter.conf.Process;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import kong.unirest.HttpMethod;
import kong.unirest.HttpRequest;
import kong.unirest.HttpRequestWithBody;
import kong.unirest.HttpResponse;
import kong.unirest.JsonObjectMapper;
import kong.unirest.ObjectMapper;
import kong.unirest.Unirest;
import kong.unirest.UnirestInstance;
import lombok.extern.slf4j.Slf4j;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.EXPRESSION_EVALUATE_FAILED_EXCEPTION;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.OPERATION_RESULT_ERROR_EXCEPTION;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.POSTPROCESS_OPERATION_ERROR;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.PREPROCESS_OPERATION_ERROR;
import static com.alibaba.datax.plugin.writer.restwriter.process.ProcessCategory.PREPROCESS;
import static java.util.Objects.nonNull;
import static java.util.Optional.ofNullable;
import static java.util.concurrent.ForkJoinPool.defaultForkJoinWorkerThreadFactory;
import static kong.unirest.ContentType.APPLICATION_JSON;
import static kong.unirest.HeaderNames.CONTENT_TYPE;
import static kong.unirest.HttpMethod.GET;
import static kong.unirest.HttpStatus.MULTIPLE_CHOICE;
import static org.apache.commons.collections4.CollectionUtils.isNotEmpty;
import static org.apache.commons.collections4.MapUtils.emptyIfNull;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
/**
* @name: zhangyongxiang
* @author: zhangyongxiang@baidu.com
**/
@Slf4j
public class ProcessExecutor {
private final UnirestInstance unirest;
private final Executor executor;
private final ObjectMapper json;
private final ExpressionParser expressionParser;
public ProcessExecutor() {
this(new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
defaultForkJoinWorkerThreadFactory, null, true));
}
public ProcessExecutor(final Executor executor) {
this.executor = executor;
this.json = new JsonObjectMapper();
this.expressionParser = new SpelExpressionParser();
this.unirest = Unirest.spawnInstance();
this.unirest.config().addShutdownHook(true);
this.unirest.config().verifySsl(false);
this.unirest.config().automaticRetries(false);
this.unirest.config()
.connectTimeout((int) Duration.ofHours(1).toMillis());
this.unirest.config()
.socketTimeout((int) Duration.ofHours(1).toMillis());
}
public void execute(final Process process) {
if (nonNull(process) && isNotEmpty(process.getOperations())) {
if (process.isConcurrent() && process.getOperations().size() > 1) {
CompletableFuture.allOf(process
.getOperations().stream().map(
operation -> CompletableFuture.runAsync(
() -> executeWithRetry(operation,
process.getCategory()),
this.executor))
.toArray(CompletableFuture[]::new)).exceptionally(e -> {
if (process.getCategory() == PREPROCESS) {
throw DataXException.asDataXException(
PREPROCESS_OPERATION_ERROR,
e.getMessage(), e);
} else {
throw DataXException.asDataXException(
POSTPROCESS_OPERATION_ERROR,
e.getMessage(), e);
}
}).join();
} else {
process.getOperations()
.forEach(operation -> executeWithRetry(operation,
process.getCategory()));
}
}
}
public HttpResponse<String> executeWithRetry(final Operation operation,
final ProcessCategory category) {
if (operation.getMaxRetries() > 1) {
final RetryPolicy<HttpResponse<String>> retryPolicy = RetryPolicy
.<HttpResponse<String>>builder()
.withMaxRetries(operation.getMaxRetries())
.handleResultIf(
response -> response.getStatus() >= MULTIPLE_CHOICE)
.withBackoff(1, 5, ChronoUnit.SECONDS)
.onFailedAttempt(e -> log.error(
"{} operation execute failed, attempt execution times: {},"
+ " possible result response code: {}, possible result response body: {}",
category, e.getAttemptCount(),
ofNullable(e.getLastResult())
.map(HttpResponse::getStatusText)
.orElse(null),
ofNullable(e.getLastResult())
.map(HttpResponse::getBody).orElse(null),
e.getLastException()))
.onRetry(e -> log.warn(
"{} operation failure #{}th retrying.", category,
e.getAttemptCount()))
.onRetriesExceeded(e -> log.error(
"fail to execute operation {}. max retries exceeded. cause: {}",
operation,
nonNull(e.getException())
? e.getException().getMessage()
: e.getResult().getStatusText(),
e.getException()))
.build();
return Failsafe.with(retryPolicy).get(
() -> executeWithExpectedResponse(operation, category));
} else {
return executeWithExpectedResponse(operation, category);
}
}
public HttpResponse<String> executeWithExpectedResponse(
final Operation operation, final ProcessCategory category) {
final HttpResponse<String> response = execute(operation, category);
if (response.getHeaders().containsKey(CONTENT_TYPE)
&& response.getHeaders().get(CONTENT_TYPE).stream()
.anyMatch(contentType -> contentType
.contains(APPLICATION_JSON.getMimeType()))
&& isNotBlank(operation.getJsonExpression())) {
if (isBlank(response.getBody())) {
log.warn("response body is empty, operation: {}", operation);
throw DataXException.asDataXException(
EXPRESSION_EVALUATE_FAILED_EXCEPTION,
String.format("operation %s return empty response body",
operation.getUrl()));
}
try {
final Object bodyResponse = json.readValue(response.getBody(),
Map.class);
if (operation.isDebug()) {
log.info(
"operation {} return response body: {}, deserialized value {}, class {}",
operation.getUrl(), operation.getBody(),
bodyResponse, bodyResponse.getClass().getName());
}
final StandardEvaluationContext context = new StandardEvaluationContext(
bodyResponse);
context.addPropertyAccessor(new MapAccessor());
if (!BooleanUtils.toBoolean(expressionParser
.parseExpression(operation.getJsonExpression())
.getValue(context, boolean.class))) {
throw DataXException.asDataXException(
OPERATION_RESULT_ERROR_EXCEPTION,
String.format(
"operation return result is not right according the json expression,"
+ " result %s, expression: %s",
bodyResponse,
operation.getJsonExpression()));
}
} catch (final Exception e) {
log.error(
"body {} can't be deserialized or can't be evaluated against json expression {}, operation: {}",
response.getBody(), operation.getJsonExpression(),
operation);
throw DataXException.asDataXException(
EXPRESSION_EVALUATE_FAILED_EXCEPTION,
String.format(
"operation result can't be deserialized or evaluated failed,"
+ " result %s, expression: %s",
response.getBody(),
operation.getJsonExpression()),
e);
}
}
return response;
}
/**
* @param operation operations
* @param category operations category
* @return response
*/
public HttpResponse<String> execute(final Operation operation,
final ProcessCategory category) {
HttpRequestWithBody requestBuilder = this.unirest
.request(operation.getMethod(), operation.getUrl());
if (MapUtils.isNotEmpty(operation.getHeaders())) {
for (final String header : operation.getHeaders().keySet()) {
requestBuilder = requestBuilder.header(header,
operation.getHeaders().get(header));
}
}
if (!emptyIfNull(operation.getHeaders()).containsKey(CONTENT_TYPE)) {
requestBuilder = requestBuilder.header(CONTENT_TYPE,
APPLICATION_JSON.getMimeType());
}
HttpRequest<?> request = requestBuilder;
if (HttpMethod.valueOf(operation.getMethod()) != GET
&& isNotBlank(operation.getBody())) {
if (operation.isBase64()) {
request = requestBuilder
.body(Base64.getDecoder().decode(operation.getBody()));
} else {
request = requestBuilder.body(operation.getBody());
}
if (operation.isDebug()) {
log.info(
"request {} method {} has body: {}, base64 encoded?{}, decoded body: {}",
operation.getUrl(), operation.getMethod(),
operation.getBody(), operation.isBase64(),
operation.isBase64()
? Base64.getDecoder()
.decode(operation.getBody())
: EMPTY);
}
}
final long startTime = System.nanoTime();
return request.asString().ifSuccess(response -> log.info(
"operation {} category: {} execute successfully,response: {}, body: {}, consume time: {}",
operation.getUrl(), category, response.getStatusText(),
response.getBody(),
Duration.ofNanos(System.nanoTime() - startTime)))
.ifFailure(response -> {
response.getParsingError().ifPresent(e -> {
log.error(
"operation {} category: {} execute failed, original body: {}, parsing exception",
operation.getUrl(),
category.name().toLowerCase(),
e.getOriginalBody(), e);
throw new OperationExecutionFailException(String.format(
"operation %s category: %s execute failed",
operation.getUrl(),
category.name().toLowerCase()), e);
});
log.error("operation {} category: {} execute failed, "
+ "http code: {}, message: {} , optional reason: {}",
operation.getUrl(), category.name().toLowerCase(),
response.getStatus(), response.getStatusText(),
response.getBody());
throw new OperationExecutionFailException(String.format(
"operation %s category: %s http execute failed, http code: %d, message: %s, optional reason: %s",
operation.getUrl(), category.name().toLowerCase(),
response.getStatus(), response.getStatusText(),
response.getBody()), null);
});
}
}

View File

@ -0,0 +1,37 @@
package com.alibaba.datax.plugin.writer.restwriter.process;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.restwriter.conf.Operation;
import com.alibaba.datax.plugin.writer.restwriter.conf.Process;
import lombok.extern.slf4j.Slf4j;
import static com.alibaba.datax.plugin.writer.restwriter.Key.ADDITIONAL_CONCURRENT;
import static com.alibaba.datax.plugin.writer.restwriter.Key.ADDITIONAL_OPERATIONS;
import static java.util.Objects.nonNull;
/**
* Created by zhangyongxiang on 2023/10/12 19:28
**/
@Slf4j
public class ProcessFactory {
private final Configuration configuration;
public ProcessFactory(Configuration configuration) {
this.configuration = configuration;
}
public Process createProcess(final ProcessCategory category) {
final Process process = new Process(category);
final Configuration conf = this.configuration
.getConfiguration(category.getKey());
log.info("job configuration key: {}, conf: {}", category, conf);
if (nonNull(conf)) {
process.setConcurrent(conf.getBool(ADDITIONAL_CONCURRENT, false));
process.setOperations(conf.getListWithJson(ADDITIONAL_OPERATIONS,
Operation.class));
}
return process;
}
}

View File

@ -0,0 +1,111 @@
package com.alibaba.datax.plugin.writer.restwriter.validator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.restwriter.conf.Field;
import com.google.common.collect.Sets;
import static com.alibaba.datax.plugin.writer.restwriter.Key.BATCH_SIZE;
import static com.alibaba.datax.plugin.writer.restwriter.Key.FIELDS;
import static com.alibaba.datax.plugin.writer.restwriter.Key.HTTP_HEADERS;
import static com.alibaba.datax.plugin.writer.restwriter.Key.HTTP_METHOD;
import static com.alibaba.datax.plugin.writer.restwriter.Key.MAX_RETRIES;
import static com.alibaba.datax.plugin.writer.restwriter.Key.POSTPROCESS;
import static com.alibaba.datax.plugin.writer.restwriter.Key.PREPROCESS;
import static com.alibaba.datax.plugin.writer.restwriter.Key.RATE_PER_TASK;
import static com.alibaba.datax.plugin.writer.restwriter.Key.URL;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriter.DEFAULT_BATCH_SIZE_VALUE;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriter.DEFAULT_MAX_RETRIES_VALUE;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.BATCH_SIZE_INVALID_EXCEPTION;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.EMPTY_FIELD_EXCEPTION;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.FIELDS_INVALID_EXCEPTION;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.MAX_RETRIES_INVALID_EXCEPTION;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.RATE_PER_TASK_INVALID_EXCEPTION;
import static java.util.Objects.nonNull;
import static org.apache.commons.collections4.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isBlank;
/**
* @author: zhangyongxiang
* @date 2023/8/24 18:32
**/
public class ConfigurationValidator
implements ParameterValidator<Configuration> {
private final ParameterValidator<String> urlValidator;
private final ParameterValidator<String> methodValidator;
private final ParameterValidator<Map<String, Object>> headersValidator;
private final ParameterValidator<Configuration> processValidator;
public ConfigurationValidator() {
this.urlValidator = new UrlParameterValidator();
this.methodValidator = new MethodParameterValidator();
this.headersValidator = new HeadersParameterValidator();
this.processValidator = new ProcessValidator(this.urlValidator,
this.methodValidator);
}
@Override
public void validateImmediateValue(final Configuration parameter) {
this.urlValidator.validate(parameter, URL);
this.methodValidator.validate(parameter, HTTP_METHOD);
this.headersValidator.validate(parameter, HTTP_HEADERS);
final Integer maxRetries = parameter.getInt(MAX_RETRIES,
DEFAULT_MAX_RETRIES_VALUE);
if (maxRetries <= 0) {
throw DataXException.asDataXException(MAX_RETRIES_INVALID_EXCEPTION,
"maxRetries parameter must be greater than 0");
}
final Integer batchSize = parameter.getInt(BATCH_SIZE,
DEFAULT_BATCH_SIZE_VALUE);
if (batchSize <= 0) {
throw DataXException.asDataXException(BATCH_SIZE_INVALID_EXCEPTION,
"batchSize parameter must be greater than 0");
}
final Integer ratePerTask = parameter.getInt(RATE_PER_TASK);
if (nonNull(ratePerTask) && ratePerTask < 0) {
throw DataXException.asDataXException(
RATE_PER_TASK_INVALID_EXCEPTION,
"rate-per-task parameter must be greater than or equals 0");
}
final List<Field> fields = parameter.getListWithJson(FIELDS,
Field.class);
if (isEmpty(fields)) {
throw DataXException.asDataXException(EMPTY_FIELD_EXCEPTION,
"fields parameter must not be empty");
}
final Set<String> names = Sets.newHashSet();
fields.forEach(field -> {
if (isBlank(field.getName())) {
throw DataXException.asDataXException(FIELDS_INVALID_EXCEPTION,
"field name must not be empty or blank");
}
if (names.contains(field.getName())) {
throw DataXException.asDataXException(FIELDS_INVALID_EXCEPTION,
String.format("field name %s duplicate",
field.getName()));
} else {
names.add(field.getName());
}
});
this.processValidator.validate(parameter, PREPROCESS);
this.processValidator.validate(parameter, POSTPROCESS);
}
@Override
public void validate(final Configuration config, final String path) {
validateImmediateValue(config);
}
}

View File

@ -0,0 +1,22 @@
package com.alibaba.datax.plugin.writer.restwriter.validator;
import java.util.Map;
import com.alibaba.datax.common.util.Configuration;
/**
* Created by zhangyongxiang on 2023/8/25 8:09 PM
**/
public class HeadersParameterValidator
implements ParameterValidator<Map<String, Object>> {
@Override
public void validateImmediateValue(final Map<String, Object> parameter) {
}
@Override
public void validate(final Configuration config, final String path) {
validateImmediateValue(config.getMap(path));
}
}

View File

@ -0,0 +1,37 @@
package com.alibaba.datax.plugin.writer.restwriter.validator;
import java.util.List;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.google.common.collect.Lists;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.METHOD_INVALID_EXCEPTION;
import static org.apache.commons.lang3.StringUtils.isBlank;
/**
* @author: zhangyongxiang
* @date 2023/8/24 18:39
**/
public class MethodParameterValidator implements ParameterValidator<String> {
private final List<String> method = Lists.newArrayList("get", "post", "put",
"patch", "delete");
@Override
public void validateImmediateValue(final String parameter) {
if (isBlank(parameter)) {
throw DataXException.asDataXException(METHOD_INVALID_EXCEPTION,
"需要填写method参数");
}
if (!this.method.contains(parameter.toLowerCase())) {
throw DataXException.asDataXException(METHOD_INVALID_EXCEPTION,
"method参数值不合法");
}
}
@Override
public void validate(final Configuration config, final String path) {
validateImmediateValue(config.getString(path));
}
}

View File

@ -0,0 +1,14 @@
package com.alibaba.datax.plugin.writer.restwriter.validator;
import com.alibaba.datax.common.util.Configuration;
/**
* @author: zhangyongxiang
* @date 2023/8/24 18:02
**/
public interface ParameterValidator<T> {
void validateImmediateValue(T parameter);
void validate(Configuration config, String path);
}

View File

@ -0,0 +1,60 @@
package com.alibaba.datax.plugin.writer.restwriter.validator;
import java.util.List;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.restwriter.conf.Operation;
import static com.alibaba.datax.plugin.writer.restwriter.Key.ADDITIONAL_CONCURRENT;
import static com.alibaba.datax.plugin.writer.restwriter.Key.ADDITIONAL_OPERATIONS;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.CONCURRENT_INVALID_EXCEPTION;
import static java.util.Objects.nonNull;
import static org.apache.commons.collections4.ListUtils.emptyIfNull;
import static org.apache.commons.lang3.StringUtils.equalsIgnoreCase;
/**
* @name: zhangyongxiang
* @author: zhangyongxiang@baidu.com
**/
public class ProcessValidator implements ParameterValidator<Configuration> {
private final ParameterValidator<String> urlValidator;
private final ParameterValidator<String> methodValidator;
public ProcessValidator(ParameterValidator<String> urlValidator,
ParameterValidator<String> methodValidator) {
this.urlValidator = urlValidator;
this.methodValidator = methodValidator;
}
@Override
public void validateImmediateValue(final Configuration parameter) {
if (nonNull(parameter)) {
final String concurrent = parameter
.getString(ADDITIONAL_CONCURRENT);
if (nonNull(concurrent) && !equalsIgnoreCase(concurrent, "true")
&& !equalsIgnoreCase(concurrent, "false")) {
throw DataXException.asDataXException(
CONCURRENT_INVALID_EXCEPTION,
String.format(
"parameter concurrent %s is invalid, allow values: true,false",
concurrent));
}
List<Operation> operations = parameter
.getListWithJson(ADDITIONAL_OPERATIONS, Operation.class);
emptyIfNull(operations).forEach(operation -> {
this.urlValidator.validateImmediateValue(operation.getUrl());
this.methodValidator
.validateImmediateValue(operation.getMethod());
});
}
}
@Override
public void validate(final Configuration config, final String path) {
validateImmediateValue(config.getConfiguration(path));
}
}

View File

@ -0,0 +1,44 @@
package com.alibaba.datax.plugin.writer.restwriter.validator;
import java.util.regex.Pattern;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import lombok.extern.slf4j.Slf4j;
import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.URL_INVALID_EXCEPTION;
import static org.apache.commons.lang3.StringUtils.isBlank;
/**
* @author: zhangyongxiang
* @date 2023/8/24 18:03
**/
@Slf4j
public class UrlParameterValidator implements ParameterValidator<String> {
private static final String URL_EXP = "^(https?|ftp|file)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-a-zA-Z0-9+&@#/%=~_|]";
private final Pattern urlPattern;
public UrlParameterValidator() {
this.urlPattern = Pattern.compile(URL_EXP);
}
@Override
public void validate(final Configuration config, final String path) {
validateImmediateValue(config.getString(path));
}
@Override
public void validateImmediateValue(final String parameter) {
if (isBlank(parameter)) {
throw DataXException.asDataXException(URL_INVALID_EXCEPTION,
"需要填写url参数");
}
if (!urlPattern.matcher(parameter).find()) {
throw DataXException.asDataXException(URL_INVALID_EXCEPTION,
"url参数值不合法");
}
}
}

View File

@ -0,0 +1,6 @@
{
"name": "restwriter",
"class": "com.alibaba.datax.plugin.writer.restwriter.RestWriter",
"description": "useScene: prod. mechanism: use datax framework to transport data to http sink endpoint. warn: The more you know about the data, the less problems you encounter.",
"developer": "zhangyongxiang"
}

View File

@ -0,0 +1,83 @@
{
"name": "restwriter",
"parameter": {
"url": "http://localhost:8080/echo",
"method": "post",
"headers": {
"aaa": "bbbb"
},
"query": {
"test": "test"
},
"maxRetries": 3,
"batch": true,
"batchSize": 1000,
"fields": [
{
"name": "a"
},
{
"name": "b"
},
{
"name": "c",
"type": "java.time.LocalDatetime"
},
{
"name": "d"
},
{
"name": "e"
}
],
"debug": true,
"failFast": false,
"rate-per-task": 10,
"preprocess": {
"concurrent": true,
"operations": [
{
"url": "http://localhost:8080/echo",
"method": "post",
"headers": {
"aaa": "bbbb"
},
"body": "",
"debug": false
},
{
"url": "http://localhost:8080/echo",
"method": "post",
"headers": {
"aaa": "bbbb"
},
"body": "",
"debug": false
}
]
},
"postprocess": {
"concurrent": true,
"operations": [
{
"url": "http://localhost:8080/echo",
"method": "post",
"headers": {
"aaa": "bbbb"
},
"body": "",
"debug": false
},
{
"url": "http://localhost:8080/echo",
"method": "post",
"headers": {
"aaa": "bbbb"
},
"body": "",
"debug": false
}
]
}
}
}