diff --git a/restwriter/doc/restwriter.md b/restwriter/doc/restwriter.md new file mode 100644 index 00000000..0722be3c --- /dev/null +++ b/restwriter/doc/restwriter.md @@ -0,0 +1,242 @@ +# DataX RestWriter 说明 + +------------ + +## 1 快速介绍 + +RestWriter提供了通过Restful接口写入数据的能力。RestWriter的用户主要是DataX开发、测试同学。 + +## 2 功能与限制 + +RestWriter实现了向Restful接口同步数据的通用能力,RestWriter有如下几个方面约定: + +1. 通过请求体写入数据,支持post、put、patch 3种HTTP方法 + +2. 通过返回的HTTP状态码判断写入成功或者失败 + +3. 支持失败重试与限流 + +目前的限制: + +1. 不支持嵌套对象。 + +## 3 功能说明 + +### 3.1 配置样例 + +```json +{ + "setting": {}, + "job": { + "setting": { + "speed": { + "channel": 2 + } + }, + "content": [ + { + "reader": { + "name": "txtfilereader", + "parameter": { + "path": [ + "/home/haiwei.luo/case00/data" + ], + "encoding": "UTF-8", + "column": [ + { + "index": 0, + "type": "long" + }, + { + "index": 1, + "type": "boolean" + }, + { + "index": 2, + "type": "double" + }, + { + "index": 3, + "type": "string" + }, + { + "index": 4, + "type": "date", + "format": "yyyy.MM.dd" + } + ], + "fieldDelimiter": "," + } + }, + "writer": { + "name": "restwriter", + "parameter": { + "url": "http://localhost:8080/echo", + "method": "post", + "ssl": false, + "headers": { + "aaa": "bbbb" + }, + "query": { + "test": "test" + }, + "maxRetries": 3, + "batch": false, + "batchSize": 1000, + "fields": [ + { + "name": "id" + }, + { + "name": "jobGroup" + }, + { + "name": "jobId" + }, + { + "name": "executorAddress" + }, + { + "name": "executorHandler" + }, + { + "name": "executorParam" + }, + { + "name": "executorShardingParam" + }, + { + "name": "executorFailRetryCount" + }, + { + "name": "triggerTime", + "type": "java.time.LocalDateTime" + }, + { + "name": "triggerCode" + }, + { + "name": "triggerMsg" + }, + { + "name": "handleTime", + "type": "java.time.LocalDateTime" + }, + { + "name": "handleCode" + }, + { + "name": "handleMsg" + }, + { + "name": "alarmStatus" + } + ], + "debug": true, + "failFast": false, + "rate-per-task": 10 + } + } + } + ] + } +} +``` + +### 3.2 
参数说明 + +* **url** + + * 描述:restful API URL,经过转义后的URL,RestWriter不负责转义特殊字符,比如空格等。
+ + * 必选:是
+ + * 默认值:无
+ +* **method** + + * 描述:HTTP方法,支持post、put、patch
+ + * 必选:是
+ + * 默认值:无
+ +* **ssl** + + * 描述:restful api是https/http,如果在url中给出protocol,则以url中为准
+ + * 必选:否
+ + * 默认值:false
+ +* **headers** + + * 描述:http请求头
+ + * 必选:否
+ + * 默认值:无
+ +* **query** + + * 描述:查询参数。
+ + * 必选:否
+ + * 默认值:无
+ +* **maxRetries** + + * 描述:最大失败重试次数。
+ + * 必选:否
+ + * 默认值:3
+ + +* **batch** + + * 描述:是否批量处理
+ + * 必选:否
+ + * 默认值:false
+ +* **batchSize** + + * 描述:批量处理最大条数
+ + * 必选:否
+ + * 默认值:100
+ +* **fields** + + * 描述:字段信息,按顺序对应record的各列;每项包含name,以及可选的type(目标Java类名,如java.time.LocalDateTime)
+ + * 必选:是
+ + * 默认值:无
+ +* **debug** + + * 描述:是否打印debug信息。
+ + * 必选:否
+ + * 默认值:false
+ +### 3.3 类型转换 + + +## 4 性能报告 + +## 5 约束限制 + +略 + +## 6 FAQ + +略 + + diff --git a/restwriter/pom.xml b/restwriter/pom.xml new file mode 100755 index 00000000..2a8f31b8 --- /dev/null +++ b/restwriter/pom.xml @@ -0,0 +1,99 @@ + + 4.0.0 + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + + restwriter + restwriter + RestWriter提供了通过HTTP请求写入的功能 + jar + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + org.slf4j + slf4j-api + + + ch.qos.logback + logback-classic + + + org.projectlombok + lombok + 1.18.28 + + + com.google.guava + guava + 32.1.2-jre + + + com.konghq + unirest-java + 3.14.5 + + + org.apache.commons + commons-collections4 + 4.4 + + + dev.failsafe + failsafe + 3.3.2 + + + org.springframework + spring-expression + 5.3.30 + + + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + diff --git a/restwriter/src/main/assembly/package.xml b/restwriter/src/main/assembly/package.xml new file mode 100755 index 00000000..ec0ce6a6 --- /dev/null +++ b/restwriter/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/restwriter + + + target/ + + restwriter-0.0.1-SNAPSHOT.jar + + plugin/writer/restwriter + + + + + + false + plugin/writer/restwriter/libs + runtime + + + diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/Key.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/Key.java new file mode 100644 index 00000000..1eaedd87 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/Key.java @@ -0,0 +1,47 @@ +package com.alibaba.datax.plugin.writer.restwriter; + +/** + * @author: zhangyongxiang + * @date 2023/8/24 11:38 + **/ +public final class Key { + + private 
Key() {} + + public static final String URL = "url"; + + public static final String HTTP_METHOD = "method"; + + public static final String HTTP_SSL = "ssl"; + + public static final String HTTP_HEADERS = "headers"; + + public static final String HTTP_QUERY = "query"; + + public static final String MAX_RETRIES = "maxRetries"; + + public static final String BATCH_MODE = "batch"; + + public static final String BATCH_SIZE = "batchSize"; + + public static final String TASK_INDEX = "taskIndex"; + + public static final String FIELDS = "fields"; + + public static final String DEBUG = "debug"; + + public static final String FAIL_FAST = "failFast"; + + public static final String RATE_PER_TASK = "rate-per-task"; + + public static final String CLIENT = "client"; + + public static final String PREPROCESS = "preprocess"; + + public static final String POSTPROCESS = "postprocess"; + + public static final String ADDITIONAL_CONCURRENT = "concurrent"; + + public static final String ADDITIONAL_OPERATIONS = "operations"; + +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/RestWriter.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/RestWriter.java new file mode 100755 index 00000000..a631ad83 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/RestWriter.java @@ -0,0 +1,533 @@ +package com.alibaba.datax.plugin.writer.restwriter; + +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.apache.http.HttpException; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import 
com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.writer.restwriter.conf.ClientConfig; +import com.alibaba.datax.plugin.writer.restwriter.conf.Field; +import com.alibaba.datax.plugin.writer.restwriter.conf.Process; +import com.alibaba.datax.plugin.writer.restwriter.handler.ObjectRecordConverter; +import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandlerRegistry; +import com.alibaba.datax.plugin.writer.restwriter.process.ProcessExecutor; +import com.alibaba.datax.plugin.writer.restwriter.process.ProcessFactory; +import com.alibaba.datax.plugin.writer.restwriter.validator.ConfigurationValidator; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONException; +import com.google.common.collect.Lists; + +import dev.failsafe.Failsafe; +import dev.failsafe.FailsafeExecutor; +import dev.failsafe.RateLimiter; +import dev.failsafe.RetryPolicy; +import kong.unirest.HttpMethod; +import kong.unirest.HttpResponse; +import kong.unirest.HttpStatus; +import kong.unirest.JsonNode; +import kong.unirest.Unirest; +import kong.unirest.UnirestInstance; +import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; + +import static com.alibaba.datax.plugin.writer.restwriter.Key.BATCH_MODE; +import static com.alibaba.datax.plugin.writer.restwriter.Key.BATCH_SIZE; +import static com.alibaba.datax.plugin.writer.restwriter.Key.CLIENT; +import static com.alibaba.datax.plugin.writer.restwriter.Key.DEBUG; +import static com.alibaba.datax.plugin.writer.restwriter.Key.FAIL_FAST; +import static com.alibaba.datax.plugin.writer.restwriter.Key.FIELDS; +import static com.alibaba.datax.plugin.writer.restwriter.Key.HTTP_HEADERS; +import static com.alibaba.datax.plugin.writer.restwriter.Key.HTTP_METHOD; +import static com.alibaba.datax.plugin.writer.restwriter.Key.HTTP_QUERY; +import static com.alibaba.datax.plugin.writer.restwriter.Key.HTTP_SSL; +import static com.alibaba.datax.plugin.writer.restwriter.Key.MAX_RETRIES; +import static 
com.alibaba.datax.plugin.writer.restwriter.Key.RATE_PER_TASK; +import static com.alibaba.datax.plugin.writer.restwriter.Key.TASK_INDEX; +import static com.alibaba.datax.plugin.writer.restwriter.Key.URL; +import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.HTTP_CLIENT_CONFIG_INVALID_EXCEPTION; +import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.RUNTIME_EXCEPTION; +import static com.alibaba.datax.plugin.writer.restwriter.process.ProcessCategory.POSTPROCESS; +import static com.alibaba.datax.plugin.writer.restwriter.process.ProcessCategory.PREPROCESS; +import static java.util.Objects.isNull; +import static java.util.Objects.nonNull; +import static kong.unirest.ContentType.APPLICATION_JSON; +import static kong.unirest.HeaderNames.CONTENT_TYPE; +import static org.apache.commons.collections4.CollectionUtils.isNotEmpty; +import static org.apache.commons.collections4.MapUtils.emptyIfNull; +import static org.apache.commons.lang3.StringUtils.prependIfMissing; + +/** + * @author zhangyongxiang + * @date 2023-08-23 + */ +@Slf4j +public class RestWriter extends Writer { + + public static final int DEFAULT_MAX_RETRIES_VALUE = 3; + + public static final int DEFAULT_BATCH_SIZE_VALUE = 100; + + public static final boolean DEFAULT_BATCH_MODE_VALUE = false; + + public static final boolean DEFAULT_DEBUG_VALUE = false; + + public static final boolean DEFAULT_FAIL_FAST_VALUE = false; + + public static final boolean DEFAULT_SSL_VALUE = false; + + @Slf4j + @EqualsAndHashCode(callSuper = true) + public static class Job extends Writer.Job { + + private Configuration originalConfig; + + private long startTime; + + private long endTime; + + private Process preprocess; + + private Process postprocess; + + private ProcessFactory processFactory; + + private ProcessExecutor processExecutor; + + @Override + public void init() { + this.originalConfig = super.getPluginJobConf(); + this.validateParameter(); + this.processFactory = new 
ProcessFactory(this.originalConfig); + this.preprocess = this.processFactory.createProcess(PREPROCESS); + this.postprocess = this.processFactory.createProcess(POSTPROCESS); + this.processExecutor = new ProcessExecutor(); + log.info( + "{} job initialized, desc: {}, developer: {}, job conf: {}, job preprocess: {}, job postprocess: {}", + this.getPluginName(), this.getDescription(), + this.getDeveloper(), this.getPluginJobConf(), + this.preprocess, this.postprocess); + } + + private void validateParameter() { + try { + new ConfigurationValidator().validate(this.originalConfig, + null); + } catch (final Exception se) { + throw DataXException.asDataXException(RUNTIME_EXCEPTION, + "an exception has occurred when validating parameters", + se); + } + } + + @Override + public void preCheck() { + log.info("job {} pre check will not be called", + this.getPluginName()); + } + + @Override + public void prepare() { + this.startTime = System.currentTimeMillis(); + + if (nonNull(this.preprocess) + && isNotEmpty(this.preprocess.getOperations())) { + this.processExecutor.execute(this.preprocess); + log.info( + "{} job prepared successfully after preprocess, job conf: {}, preprocess: {}", + this.getPluginName(), this.originalConfig, + this.preprocess); + } else { + log.info( + "{} job prepared without need any of preprocess, job conf: {}", + this.getPluginName(), this.originalConfig); + } + } + + @Override + public List split(final int mandatoryNumber) { + int finalMandatoryNumber = mandatoryNumber; + if (finalMandatoryNumber < 1) { + log.warn( + "mandatory number {} less than one, reset it to be one", + mandatoryNumber); + finalMandatoryNumber = 1; + } + final List configurations = Lists + .newArrayListWithExpectedSize(finalMandatoryNumber); + + for (int index = 0; index < finalMandatoryNumber; index++) { + final Configuration taskConf = this.originalConfig.clone(); + taskConf.set(TASK_INDEX, index); + configurations.add(taskConf); + log.info( + "{} job split into {} tasks, current 
task: {}, desc: {}, developer: {}, task conf: {}", + this.getPluginName(), finalMandatoryNumber, index, + this.getDescription(), this.getDeveloper(), taskConf); + } + return configurations; + } + + @Override + public void post() { + this.endTime = System.currentTimeMillis(); + + if (nonNull(this.postprocess) + && isNotEmpty(this.postprocess.getOperations())) { + this.processExecutor.execute(this.postprocess); + log.info( + "{} postprocess execute successfully, postprocess: {}", + this.getPluginName(), this.postprocess); + } + + log.info( + "job {} execute to end, start from {}, end to {}, total time: {}", + this.getPluginName(), + Instant.ofEpochMilli(this.startTime) + .atZone(ZoneId.systemDefault()).toLocalDateTime(), + Instant.ofEpochMilli(this.endTime) + .atZone(ZoneId.systemDefault()).toLocalDateTime(), + Duration.ofMillis(this.endTime - this.startTime)); + } + + @Override + public void destroy() { + log.info("job {} destroy, nothing to clean up", + this.getPluginName()); + } + } + + @Slf4j + @EqualsAndHashCode(callSuper = true) + public static class Task extends Writer.Task { + + private long startTime; + + private long endTime; + + private int successCount; + + private int failCount; + + private UnirestInstance unirest; + + private ObjectRecordConverter converter; + + private FailsafeExecutor> executor; + + private Configuration writerSliceConfig; + + private Integer taskIndex; + + private String url; + + private HttpMethod method; + + private boolean ssl; + + private Map headers; + + private Map query; + + private Integer maxRetries; + + private boolean batchMode; + + private Integer batchSize; + + private List fields; + + private boolean debug; + + private boolean failFast; + + private Integer ratePerTask; + + private Duration avgWriteTime; + + private final ClientConfig clientConfig = new ClientConfig(); + + @Override + public void init() { + this.writerSliceConfig = this.getPluginJobConf(); + this.taskIndex = this.writerSliceConfig.getInt(TASK_INDEX); + 
this.url = this.writerSliceConfig.getString(URL).trim(); + this.method = HttpMethod + .valueOf(this.writerSliceConfig.getString(HTTP_METHOD)); + this.ssl = this.writerSliceConfig.getBool(HTTP_SSL, + DEFAULT_SSL_VALUE); + this.headers = emptyIfNull( + this.writerSliceConfig.getMap(HTTP_HEADERS, String.class)); + this.query = emptyIfNull(this.writerSliceConfig.getMap(HTTP_QUERY)); + this.maxRetries = this.writerSliceConfig.getInt(MAX_RETRIES, + DEFAULT_MAX_RETRIES_VALUE); + this.batchMode = this.writerSliceConfig.getBool(BATCH_MODE, + DEFAULT_BATCH_MODE_VALUE); + this.batchSize = this.writerSliceConfig.getInt(BATCH_SIZE, + DEFAULT_BATCH_SIZE_VALUE); + this.fields = this.writerSliceConfig.getListWithJson(FIELDS, + Field.class); + this.debug = this.writerSliceConfig.getBool(DEBUG, + DEFAULT_DEBUG_VALUE); + this.failFast = this.writerSliceConfig.getBool(FAIL_FAST, + DEFAULT_FAIL_FAST_VALUE); + this.ratePerTask = this.writerSliceConfig.getInt(RATE_PER_TASK); + final Object client = this.writerSliceConfig.get(CLIENT); + if (nonNull(client)) { /* optional "client" section overrides the connection-pool defaults */ + try { + final ClientConfig config = JSON.parseObject( + JSON.toJSONString(client), ClientConfig.class); + if (nonNull(config)) { + if (config.getMaxPerRoute() > 0) { /* non-positive values keep the default */ + this.clientConfig + .setMaxPerRoute(config.getMaxPerRoute()); + } + if (config.getMaxTotal() > 0) { + this.clientConfig.setMaxTotal(config.getMaxTotal()); + } + } + } catch (final JSONException e) { /* keep the parse failure as the cause so its stack trace is not lost */ + throw DataXException.asDataXException( + HTTP_CLIENT_CONFIG_INVALID_EXCEPTION, + String.format("client config: %s", + JSON.toJSONString(client)), e); + } + } + log.info( + "{} task {} initialized, desc: {}, developer: {}, task conf: {}", + this.getPluginName(), this.taskIndex, this.getDescription(), + this.getDeveloper(), this.writerSliceConfig); + } + + @Override + public void prepare() { + if (this.url.startsWith("http://")) { /* a scheme in the url takes precedence over the ssl flag */ + this.ssl = false; + } else if (this.url.startsWith("https://")) { + this.ssl = true; + } else if (this.ssl) { + this.url = 
prependIfMissing(this.url, "https://"); + } else { + this.url = prependIfMissing(this.url, "http://"); + } + + this.unirest = Unirest.spawnInstance(); + if (!emptyIfNull(this.headers).isEmpty()) { + this.headers.forEach(this.unirest.config()::addDefaultHeader); + } + if (!this.headers.containsKey(CONTENT_TYPE)) { + this.unirest.config().addDefaultHeader(CONTENT_TYPE, + APPLICATION_JSON.getMimeType()); + } + this.unirest.config().addShutdownHook(true); + this.unirest.config().defaultBaseUrl(this.url); + this.unirest.config().verifySsl(false); + this.unirest.config().automaticRetries(false); + this.unirest.config().concurrency(this.clientConfig.getMaxTotal(), + this.clientConfig.getMaxPerRoute()); + + this.converter = new ObjectRecordConverter( + new TypeHandlerRegistry(), this.fields); + + final RetryPolicy> retryPolicy = RetryPolicy + .>builder() + .handleResultIf(response -> response + .getStatus() >= HttpStatus.BAD_REQUEST) + .onFailedAttempt(e -> log.error( + "write failed, attempt execution times: {}," + + " possible result response code: {}, possible result response body: {}", + e.getAttemptCount() + 1, + Optional.ofNullable(e.getLastResult()) + .map(HttpResponse::getStatusText) + .orElse(null), + Optional.ofNullable(e.getLastResult()) + .map(HttpResponse::getBody).orElse(null), + e.getLastException())) + .onRetry(e -> log.warn("failure #{}th retrying.", + e.getAttemptCount())) + .onRetriesExceeded(e -> log.error( + "fail to write. max retries exceeded. cause: {}", + nonNull(e.getException()) + ? 
e.getException().getMessage() + : e.getResult().getStatusText(), + e.getException())) + .build(); + if (isNull(this.ratePerTask) || this.ratePerTask <= 0) { + this.executor = Failsafe.with(retryPolicy); + } else { + this.executor = Failsafe + .with(RateLimiter + .>smoothBuilder( + this.ratePerTask, Duration.ofSeconds(1)) + .withMaxWaitTime(Duration.ofDays(365)).build()) + .compose(retryPolicy); + } + this.startTime = System.currentTimeMillis(); + this.successCount = 0; + this.failCount = 0; + log.info( + "{} task {} prepared, desc: {}, developer: {}, task conf: {}", + this.getPluginName(), this.taskIndex, this.getDescription(), + this.getDeveloper(), this.writerSliceConfig); + log.info("http client config: {}", this.clientConfig); + } + + @Override + public void preCheck() { + log.info( + "{} task {} check will not be called, desc: {}, developer: {}, task conf {}", + this.getPluginName(), this.taskIndex, this.getDescription(), + this.getDeveloper(), this.writerSliceConfig); + } + + @Override + public void startWrite(final RecordReceiver lineReceiver) { + final List writerBuffer = new ArrayList<>(this.batchSize); + Record recordItem = null; + while ((recordItem = lineReceiver.getFromReader()) != null) { + if (this.batchMode) { + writerBuffer.add(recordItem); + if (writerBuffer.size() >= this.batchSize) { + this.doWrite(writerBuffer); + writerBuffer.clear(); + } + + } else { + this.doWrite(recordItem); + } + if (this.debug) { + final int bound = recordItem.getColumnNumber(); + for (int index = 0; index < bound; index++) { + final Column column = recordItem.getColumn(index); + log.info( + "colum type: {}, column type class: {}, raw data: {}, raw data class: {}, byte size: {}", + column.getType(), + column.getType().getClass().getName(), + column.getRawData(), + Optional.ofNullable(column.getRawData()) + .map(Object::getClass) + .map(Class::getName).orElse(null), + column.getByteSize()); + } + } + } + if (this.batchMode && !writerBuffer.isEmpty()) { + 
this.doWrite(writerBuffer); + writerBuffer.clear(); + } + + } + + @Override + public void post() { + this.endTime = System.currentTimeMillis(); + this.unirest.close(); + log.info( + "job {} task {} execute to end, start from {}, end to {}, total time: {}, avg write time: {}, " + + "total count: {}, fail count: {}", + this.getPluginName(), this.taskIndex, + Instant.ofEpochMilli(this.startTime) + .atZone(ZoneId.systemDefault()).toLocalDateTime(), + Instant.ofEpochMilli(this.endTime) + .atZone(ZoneId.systemDefault()).toLocalDateTime(), + Duration.ofMillis(this.endTime - this.startTime), + this.avgWriteTime, this.successCount + this.failCount, + this.failCount); + if (this.failCount > 0) { + log.error("job {} task {} execute to end, fail count: {}", + this.getPluginName(), this.taskIndex, this.failCount); + } + } + + @Override + public void destroy() { + + log.info("job {} task {} destroy", this.getPluginName(), + this.taskIndex); + } + + private void doWrite(final Record item) { + try { + final HttpResponse resp = this.executor.get(ctx -> { + final Map body = this.converter + .convert(item); + return executeRequest(1, body); + }); + if (resp.getStatus() >= HttpStatus.BAD_REQUEST) { + throw new HttpException(resp.getStatusText()); + } + } catch (final Exception e) { + if (this.failFast) { + throw DataXException.asDataXException(RUNTIME_EXCEPTION, + e.getMessage(), e); + } + } + + } + + private void doWrite(final List records) { + try { + final HttpResponse resp = this.executor.get(ctx -> { + final List> body = records.stream() + .map(this.converter::convert) + .collect(Collectors.toList()); + return executeRequest(records.size(), body); + }); + if (resp.getStatus() >= HttpStatus.BAD_REQUEST) { + throw new HttpException(resp.getStatusText()); + } + } catch (final Exception e) { + if (this.failFast) { + throw DataXException.asDataXException(RUNTIME_EXCEPTION, + e.getMessage(), e); + } + } + } + + private HttpResponse executeRequest(final int itemCount, + final Object 
body) { + final long writeStartTime = System.nanoTime(); + return this.unirest.request(this.method.name(), "") + .queryString(this.query).body(body).asJson() + .ifSuccess(response -> { + this.successCount += itemCount; + final long writeEndTime = System.nanoTime(); + log.info( + "the {}th record has been written successfully, consume time: {}", + this.successCount + this.failCount, + Duration.ofNanos( + writeEndTime - writeStartTime)); + if (isNull(this.avgWriteTime)) { + this.avgWriteTime = Duration + .ofNanos(writeEndTime - writeStartTime); + } else { + this.avgWriteTime = Duration.ofNanos( + (this.avgWriteTime.toNanos() + writeEndTime + - writeStartTime) / 2); + } + }).ifFailure(response -> { + this.failCount += itemCount; + log.error( + "data write failed, http code: {}, message: {} , optional reason: {}, data info: {} ", + response.getStatus(), response.getStatusText(), + response.getBody(), body); + response.getParsingError().ifPresent(e -> { + log.error("original body: {}, parsing exception", + e.getOriginalBody(), e); + throw e; + }); + }); + } + } +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/RestWriterErrorCode.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/RestWriterErrorCode.java new file mode 100644 index 00000000..737dfd40 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/RestWriterErrorCode.java @@ -0,0 +1,68 @@ +package com.alibaba.datax.plugin.writer.restwriter; + +import com.alibaba.datax.common.spi.ErrorCode; + +/** + * @author: zhangyongxiang + * @date 2023/8/24 11:34 + **/ +public enum RestWriterErrorCode implements ErrorCode { + /** + * runtime exception + */ + RUNTIME_EXCEPTION("RestWriter-00", "运行时异常"), + + EMPTY_RECORD_EXCEPTION("RestWriter-01", "空record数据"), + + EMPTY_FIELD_EXCEPTION("RestWriter-02", "你需要配置至少一个field"), + + FIELD_MISMATCH_WITH_COLUMN_EXCEPTION("RestWriter-03", + "field数量与column数量不匹配"), + + 
FIELD_CLASS_NOT_FOUND_EXCEPTION("RestWriter-04", "配置的field class不存在"), + + URL_INVALID_EXCEPTION("RestWriter-05", "您填写的URL参数值不合法"), + + METHOD_INVALID_EXCEPTION("RestWriter-06", "您填写的method参数值不合法"), + + RATE_PER_TASK_INVALID_EXCEPTION("RestWriter-07", "您填写的rate-per-task参数值不合法"), + + BATCH_SIZE_INVALID_EXCEPTION("RestWriter-08", "您填写的batchSize参数值不合法"), + + MAX_RETRIES_INVALID_EXCEPTION("RestWriter-09", "您填写的maxRetries参数值不合法"), + + FIELDS_INVALID_EXCEPTION("RestWriter-10", "您填写的fields参数值不合法"), + + TYPE_HANDLER_NOT_FOUND_EXCEPTION("RestWriter-11", "没有找到合适的typehandler"), + + HTTP_CLIENT_CONFIG_INVALID_EXCEPTION("RestWriter-12", "http client配置不合法"), + + PREPROCESS_OPERATION_ERROR("RestWriter-13", "预处理失败"), + + POSTPROCESS_OPERATION_ERROR("RestWriter-14", "后处理失败"), + + CONCURRENT_INVALID_EXCEPTION("RestWriter-15", "并发参数不合法"), + + OPERATION_RESULT_ERROR_EXCEPTION("RestWriter-16", "返回结果错误"), + + EXPRESSION_EVALUATE_FAILED_EXCEPTION("RestWriter-17", "谓词表达式计算错误"); + + private final String code; + + private final String description; + + RestWriterErrorCode(final String code, final String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/conf/ClientConfig.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/conf/ClientConfig.java new file mode 100644 index 00000000..4e9bf7e8 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/conf/ClientConfig.java @@ -0,0 +1,19 @@ +package com.alibaba.datax.plugin.writer.restwriter.conf; + +import lombok.Data; + +import static kong.unirest.Config.DEFAULT_MAX_CONNECTIONS; +import static kong.unirest.Config.DEFAULT_MAX_PER_ROUTE; + +/** + * @name: zhangyongxiang + * @author: zhangyongxiang@baidu.com + **/ +@Data +public class ClientConfig 
{ + + private int maxTotal = DEFAULT_MAX_CONNECTIONS; + + private int maxPerRoute = DEFAULT_MAX_PER_ROUTE; + +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/conf/Field.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/conf/Field.java new file mode 100644 index 00000000..07e4bf9c --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/conf/Field.java @@ -0,0 +1,17 @@ +package com.alibaba.datax.plugin.writer.restwriter.conf; + +import lombok.Data; + +/** + * @author: zhangyongxiang + * @date 2023/8/24 21:58 + **/ +@Data +public class Field { + + private String name; + + private String type; + + private String format; +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/conf/Operation.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/conf/Operation.java new file mode 100644 index 00000000..e9508468 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/conf/Operation.java @@ -0,0 +1,32 @@ +package com.alibaba.datax.plugin.writer.restwriter.conf; + +import java.util.Map; + +import com.google.common.collect.Maps; + +import lombok.Data; + +/** + * @name: zhangyongxiang + * @author: zhangyongxiang@baidu.com + **/ +@Data +public class Operation { + + private String url; + + private String method; + + private Map headers = Maps.newHashMap(); + + private String body; + + private boolean base64; + + private boolean debug; + + private int maxRetries = 1; + + private String jsonExpression; + +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/conf/Process.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/conf/Process.java new file mode 100644 index 00000000..b9859737 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/conf/Process.java @@ -0,0 +1,29 @@ +package 
com.alibaba.datax.plugin.writer.restwriter.conf; + +import java.util.List; + +import com.alibaba.datax.plugin.writer.restwriter.process.ProcessCategory; +import com.google.common.collect.Lists; + +import lombok.Data; + +import static com.alibaba.datax.plugin.writer.restwriter.process.ProcessCategory.PREPROCESS; + +/** + * @name: zhangyongxiang + * @author: zhangyongxiang@baidu.com + **/ + +@Data +public class Process { + + private boolean concurrent; + + private ProcessCategory category = PREPROCESS; + + private List operations = Lists.newArrayList(); + + public Process(final ProcessCategory category) { + this.category = category; + } +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/ObjectRecordConverter.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/ObjectRecordConverter.java new file mode 100644 index 00000000..1c7538cc --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/ObjectRecordConverter.java @@ -0,0 +1,95 @@ +package com.alibaba.datax.plugin.writer.restwriter.handler; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; + +import org.apache.commons.lang3.ClassUtils; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.plugin.writer.restwriter.conf.Field; +import com.google.common.collect.Maps; + +import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.EMPTY_FIELD_EXCEPTION; +import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.EMPTY_RECORD_EXCEPTION; +import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.FIELD_CLASS_NOT_FOUND_EXCEPTION; +import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.FIELD_MISMATCH_WITH_COLUMN_EXCEPTION; +import static 
com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.TYPE_HANDLER_NOT_FOUND_EXCEPTION; +import static java.util.Objects.isNull; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +/** + * @author: zhangyongxiang + * @date 2023/8/24 14:25 + **/ +public class ObjectRecordConverter + implements RecordConverter> { + + private final TypeHandlerRegistry registry; + + private final List fields; + + private final Map> fieldClasses; + + public ObjectRecordConverter(final TypeHandlerRegistry registry, + final List fields) { + this.registry = registry; + this.fields = fields; + this.fieldClasses = new HashMap<>(); + if (!isNull(fields) && !fields.isEmpty()) { /* a missing list is reported like an empty one instead of failing with an NPE */ + fields.forEach(field -> { + if (isNotBlank(field.getType())) { + try { + this.fieldClasses.put(field.getName(), + ClassUtils.getClass(field.getType())); + } catch (final ClassNotFoundException e) { + throw DataXException.asDataXException( + FIELD_CLASS_NOT_FOUND_EXCEPTION, + String.format("field %s type %s not found", + field.getName(), field.getType()), + e); + } + } else { /* no explicit type: Void.class is the key under which default handlers are registered */ + this.fieldClasses.put(field.getName(), Void.class); + } + }); + } else { + throw DataXException.asDataXException(EMPTY_FIELD_EXCEPTION, + "you should configure at least one field"); + } + } + + @Override + public Map convert(final Record record) { + if (record.getColumnNumber() <= 0) { + throw DataXException.asDataXException(EMPTY_RECORD_EXCEPTION, + "record is empty"); + } + if (this.fields.size() > record.getColumnNumber()) { /* fields are matched to columns by index, so every field needs a column */ + throw DataXException.asDataXException( + FIELD_MISMATCH_WITH_COLUMN_EXCEPTION, + "number of fields is greater than number of columns of record"); + } + final Map m = Maps.newHashMap(); + IntStream.range(0, this.fields.size()).forEach(num -> { + final Column column = record.getColumn(num); + final Class clazz = this.fieldClasses + .get(this.fields.get(num).getName()); + final TypeHandler typeHandler = this.registry + .getTypeHandler(column.getType(), clazz); + if (isNull(typeHandler)) { + throw 
DataXException.asDataXException( + TYPE_HANDLER_NOT_FOUND_EXCEPTION, + String.format( + "type handler not found for source type %s and target class %s", + column.getType().name(), clazz.getName())); + } + m.put(this.fields.get(num).getName(), + typeHandler.convert(column.getRawData())); + }); + return m; + } +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/RecordConverter.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/RecordConverter.java new file mode 100644 index 00000000..74fa811f --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/RecordConverter.java @@ -0,0 +1,14 @@ +package com.alibaba.datax.plugin.writer.restwriter.handler; + +import com.alibaba.datax.common.element.Record; + +/** + * @author: zhangyongxiang + * @date 2023/8/24 14:24 + **/ +@FunctionalInterface +public interface RecordConverter { + + T convert(Record record); + +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/TypeHandler.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/TypeHandler.java new file mode 100644 index 00000000..536c8c75 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/TypeHandler.java @@ -0,0 +1,9 @@ +package com.alibaba.datax.plugin.writer.restwriter.handler; + +/** + * @author: zhangyongxiang + * @date 2023/8/24 21:03 + **/ +public interface TypeHandler { + T convert(Object object); +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/TypeHandlerRegistry.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/TypeHandlerRegistry.java new file mode 100644 index 00000000..f6d286b3 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/TypeHandlerRegistry.java @@ -0,0 +1,62 @@ +package 
com.alibaba.datax.plugin.writer.restwriter.handler; + +import java.time.LocalDateTime; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.plugin.writer.restwriter.handler.bool.BoolVoidTypeHandler; +import com.alibaba.datax.plugin.writer.restwriter.handler.bytes.BytesVoidTypeHandler; +import com.alibaba.datax.plugin.writer.restwriter.handler.date.DateLocalDateTimeTypeHandler; +import com.alibaba.datax.plugin.writer.restwriter.handler.date.DateVoidTypeHandler; +import com.alibaba.datax.plugin.writer.restwriter.handler.string.StringVoidTypeHandler; +import com.alibaba.datax.plugin.writer.restwriter.handler.typedouble.DoubleVoidTypeHandler; +import com.alibaba.datax.plugin.writer.restwriter.handler.typeint.IntVoidTypeHandler; +import com.alibaba.datax.plugin.writer.restwriter.handler.typelong.LongVoidTypeHandler; +import com.alibaba.datax.plugin.writer.restwriter.handler.typenull.NullVoidTypeHandler; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Table; + +/** + * @author: zhangyongxiang + * @date 2023/8/24 21:03 + **/ +public class TypeHandlerRegistry { + + private final Table, TypeHandler> handlers = HashBasedTable + .create(); + + public TypeHandlerRegistry() { + registerDefault(Column.Type.INT, new IntVoidTypeHandler()); + registerDefault(Column.Type.LONG, new LongVoidTypeHandler()); + registerDefault(Column.Type.NULL, new NullVoidTypeHandler()); + registerDefault(Column.Type.DOUBLE, new DoubleVoidTypeHandler()); + registerDefault(Column.Type.STRING, new StringVoidTypeHandler()); + registerDefault(Column.Type.BOOL, new BoolVoidTypeHandler()); + registerDefault(Column.Type.DATE, new DateVoidTypeHandler()); + registerDefault(Column.Type.BYTES, new BytesVoidTypeHandler()); + register(Column.Type.DATE, LocalDateTime.class, + new DateLocalDateTimeTypeHandler()); + } + + // BAD, NULL, INT, LONG, DOUBLE, STRING, BOOL, DATE, BYTES + + void register(final Column.Type type, final Class targetClass, + final 
TypeHandler typeHandler) { + this.handlers.put(type, targetClass, typeHandler); + } + + void registerDefault(final Column.Type type, + final TypeHandler typeHandler) { + this.handlers.put(type, Void.class, typeHandler); + } + + boolean hasTypeHandler(final Column.Type type, + final Class targetClass) { + return this.handlers.contains(type, targetClass); + } + + TypeHandler getTypeHandler(final Column.Type type, + final Class targetClass) { + return (TypeHandler) this.handlers.get(type, targetClass); + } + +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/bool/BoolVoidTypeHandler.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/bool/BoolVoidTypeHandler.java new file mode 100644 index 00000000..1be622c3 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/bool/BoolVoidTypeHandler.java @@ -0,0 +1,17 @@ +package com.alibaba.datax.plugin.writer.restwriter.handler.bool; + +import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandler; + +/** + * @author: zhangyongxiang + * @date 2023/8/24 21:46 + **/ +public class BoolVoidTypeHandler implements TypeHandler { + /** + * underlying type is Boolean + */ + @Override + public Object convert(final Object object) { + return object; + } +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/bytes/BytesVoidTypeHandler.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/bytes/BytesVoidTypeHandler.java new file mode 100644 index 00000000..e7f500f1 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/bytes/BytesVoidTypeHandler.java @@ -0,0 +1,19 @@ +package com.alibaba.datax.plugin.writer.restwriter.handler.bytes; + +import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandler; + +/** + * @author: zhangyongxiang + * @date 2023/8/24 21:48 + **/ + +public class BytesVoidTypeHandler implements 
TypeHandler { + + /** + * underlying type is byte[] + */ + @Override + public Object convert(final Object object) { + return object; + } +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/date/DateLocalDateTimeTypeHandler.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/date/DateLocalDateTimeTypeHandler.java new file mode 100644 index 00000000..01987d70 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/date/DateLocalDateTimeTypeHandler.java @@ -0,0 +1,29 @@ +package com.alibaba.datax.plugin.writer.restwriter.handler.date; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; + +import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandler; + +import static java.util.Objects.nonNull; + +/** + * @author: zhangyongxiang + * @date 2023/8/24 21:49 + **/ + +public class DateLocalDateTimeTypeHandler + implements TypeHandler { + /** + * underlying type is Long + */ + @Override + public LocalDateTime convert(final Object object) { + if (nonNull(object)) { + return Instant.ofEpochMilli((Long) object) + .atZone(ZoneId.systemDefault()).toLocalDateTime(); + } + return null; + } +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/date/DateVoidTypeHandler.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/date/DateVoidTypeHandler.java new file mode 100644 index 00000000..599471d0 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/date/DateVoidTypeHandler.java @@ -0,0 +1,18 @@ +package com.alibaba.datax.plugin.writer.restwriter.handler.date; + +import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandler; + +/** + * @author: zhangyongxiang + * @date 2023/8/24 21:47 + **/ + +public class DateVoidTypeHandler implements TypeHandler { + /** + * underlying type is Long + */ + @Override + public Object 
convert(final Object object) { + return object; + } +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/string/StringVoidTypeHandler.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/string/StringVoidTypeHandler.java new file mode 100644 index 00000000..508f6eee --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/string/StringVoidTypeHandler.java @@ -0,0 +1,18 @@ +package com.alibaba.datax.plugin.writer.restwriter.handler.string; + +import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandler; + +/** + * @author: zhangyongxiang + * @date 2023/8/24 21:46 + **/ +public class StringVoidTypeHandler implements TypeHandler { + + /** + * underlying type is String + */ + @Override + public Object convert(final Object object) { + return object; + } +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/typedouble/DoubleVoidTypeHandler.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/typedouble/DoubleVoidTypeHandler.java new file mode 100644 index 00000000..107b29aa --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/typedouble/DoubleVoidTypeHandler.java @@ -0,0 +1,18 @@ +package com.alibaba.datax.plugin.writer.restwriter.handler.typedouble; + +import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandler; + +/** + * @author: zhangyongxiang + * @date 2023/8/24 21:45 + **/ + +public class DoubleVoidTypeHandler implements TypeHandler { + /** + * underlying type is Double + */ + @Override + public Object convert(final Object object) { + return object; + } +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/typeint/IntVoidTypeHandler.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/typeint/IntVoidTypeHandler.java new file mode 100644 index 00000000..08e02624 --- 
/dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/typeint/IntVoidTypeHandler.java @@ -0,0 +1,20 @@ +package com.alibaba.datax.plugin.writer.restwriter.handler.typeint; + +import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandler; + +/** + * @author: zhangyongxiang + * @date 2023/8/24 21:22 + **/ + +public class IntVoidTypeHandler implements TypeHandler { + + /** + * underlying type is BigInteger + */ + + @Override + public Object convert(final Object object) { + return object; + } +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/typelong/LongVoidTypeHandler.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/typelong/LongVoidTypeHandler.java new file mode 100644 index 00000000..c44a6668 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/typelong/LongVoidTypeHandler.java @@ -0,0 +1,18 @@ +package com.alibaba.datax.plugin.writer.restwriter.handler.typelong; + +import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandler; + +/** + * @author: zhangyongxiang + * @date 2023/8/24 21:23 + **/ +public class LongVoidTypeHandler implements TypeHandler { + + /** + * underlying type is BigInteger + */ + @Override + public Object convert(final Object object) { + return object; + } +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/typenull/NullVoidTypeHandler.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/typenull/NullVoidTypeHandler.java new file mode 100644 index 00000000..2890ee63 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/handler/typenull/NullVoidTypeHandler.java @@ -0,0 +1,19 @@ +package com.alibaba.datax.plugin.writer.restwriter.handler.typenull; + +import com.alibaba.datax.plugin.writer.restwriter.handler.TypeHandler; + +/** + * @author: zhangyongxiang + * @date 2023/8/24 
21:20 + **/ + +public class NullVoidTypeHandler implements TypeHandler { + + /** + * unknown underlying type + */ + @Override + public Object convert(final Object object) { + return null; + } +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/process/MapAccessor.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/process/MapAccessor.java new file mode 100644 index 00000000..8745ef9e --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/process/MapAccessor.java @@ -0,0 +1,115 @@ +package com.alibaba.datax.plugin.writer.restwriter.process; + +import java.util.Map; + +import org.springframework.asm.MethodVisitor; +import org.springframework.expression.AccessException; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.TypedValue; +import org.springframework.expression.spel.CodeFlow; +import org.springframework.expression.spel.CompilablePropertyAccessor; +import org.springframework.lang.NonNull; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * @version 1.0 + * @name: zhangyongxiang + * @author: zhangyongxiang@baidu.com + * @date 2023/10/25 15:14 + * @description: + **/ +public class MapAccessor implements CompilablePropertyAccessor { + + @Override + public Class[] getSpecificTargetClasses() { + return new Class[] { Map.class }; + } + + @Override + public boolean canRead(@NonNull final EvaluationContext context, + @Nullable final Object target, @NonNull final String name) + throws AccessException { + return (target instanceof Map + && ((Map) target).containsKey(name)); + } + + @NonNull + @Override + public TypedValue read(@NonNull final EvaluationContext context, + @Nullable final Object target, @NonNull final String name) + throws AccessException { + Assert.state(target instanceof Map, "Target must be of type Map"); + final Map map = (Map) target; + final Object value = map.get(name); + if 
(value == null && !map.containsKey(name)) { + throw new MapAccessException(name); + } + return new TypedValue(value); + } + + @Override + public boolean canWrite(@NonNull final EvaluationContext context, + @Nullable final Object target, @NonNull final String name) + throws AccessException { + return true; + } + + @Override + @SuppressWarnings("unchecked") + public void write(@NonNull final EvaluationContext context, + @Nullable final Object target, @NonNull final String name, + @Nullable final Object newValue) throws AccessException { + + Assert.state(target instanceof Map, "Target must be a Map"); + final Map map = (Map) target; + map.put(name, newValue); + } + + @Override + public boolean isCompilable() { + return true; + } + + @NonNull + @Override + public Class getPropertyType() { + return Object.class; + } + + @Override + public void generateCode(@NonNull final String propertyName, + @NonNull final MethodVisitor mv, final CodeFlow cf) { + final String descriptor = cf.lastDescriptor(); + if (descriptor == null || !descriptor.equals("Ljava/util/Map")) { + if (descriptor == null) { + cf.loadTarget(mv); + } + CodeFlow.insertCheckCast(mv, "Ljava/util/Map"); + } + mv.visitLdcInsn(propertyName); + mv.visitMethodInsn(INVOKEINTERFACE, "java/util/Map", "get", + "(Ljava/lang/Object;)Ljava/lang/Object;", true); + } + + /** + * Exception thrown from {@code read} in order to reset a cached + * PropertyAccessor, allowing other accessors to have a try. 
+ */ + @SuppressWarnings("serial") + private static class MapAccessException extends AccessException { + + private final String key; + + public MapAccessException(final String key) { + super(""); + this.key = key; + } + + @Override + public String getMessage() { + return "Map does not contain a value for key '" + this.key + "'"; + } + } + +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/process/OperationExecutionFailException.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/process/OperationExecutionFailException.java new file mode 100644 index 00000000..8be9b0aa --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/process/OperationExecutionFailException.java @@ -0,0 +1,16 @@ +package com.alibaba.datax.plugin.writer.restwriter.process; + +/** + * @name: zhangyongxiang + * @author: zhangyongxiang@baidu.com + **/ + +public class OperationExecutionFailException extends RuntimeException { + + private static final long serialVersionUID = 2848134252562605007L; + + public OperationExecutionFailException(final String message, + final Throwable cause) { + super(message, cause); + } +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/process/ProcessCategory.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/process/ProcessCategory.java new file mode 100644 index 00000000..168cca12 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/process/ProcessCategory.java @@ -0,0 +1,29 @@ +package com.alibaba.datax.plugin.writer.restwriter.process; + +import com.alibaba.datax.plugin.writer.restwriter.Key; + +import lombok.Getter; + +/** + * @name: zhangyongxiang + * @author: zhangyongxiang@baidu.com + **/ + +@Getter +public enum ProcessCategory { + /** + * pre processing + */ + PREPROCESS(Key.PREPROCESS), + /** + * post processing + */ + POSTPROCESS(Key.POSTPROCESS); + + private String key; + + 
ProcessCategory(final String key) { + this.key = key; + } + +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/process/ProcessExecutor.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/process/ProcessExecutor.java new file mode 100644 index 00000000..c74408b8 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/process/ProcessExecutor.java @@ -0,0 +1,275 @@ +package com.alibaba.datax.plugin.writer.restwriter.process; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Base64; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; + +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.BooleanUtils; +import org.springframework.expression.ExpressionParser; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.expression.spel.support.StandardEvaluationContext; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.plugin.writer.restwriter.conf.Operation; +import com.alibaba.datax.plugin.writer.restwriter.conf.Process; + +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; +import kong.unirest.HttpMethod; +import kong.unirest.HttpRequest; +import kong.unirest.HttpRequestWithBody; +import kong.unirest.HttpResponse; +import kong.unirest.JsonObjectMapper; +import kong.unirest.ObjectMapper; +import kong.unirest.Unirest; +import kong.unirest.UnirestInstance; +import lombok.extern.slf4j.Slf4j; + +import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.EXPRESSION_EVALUATE_FAILED_EXCEPTION; +import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.OPERATION_RESULT_ERROR_EXCEPTION; +import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.POSTPROCESS_OPERATION_ERROR; +import 
static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.PREPROCESS_OPERATION_ERROR; +import static com.alibaba.datax.plugin.writer.restwriter.process.ProcessCategory.PREPROCESS; +import static java.util.Objects.nonNull; +import static java.util.Optional.ofNullable; +import static java.util.concurrent.ForkJoinPool.defaultForkJoinWorkerThreadFactory; +import static kong.unirest.ContentType.APPLICATION_JSON; +import static kong.unirest.HeaderNames.CONTENT_TYPE; +import static kong.unirest.HttpMethod.GET; +import static kong.unirest.HttpStatus.MULTIPLE_CHOICE; +import static org.apache.commons.collections4.CollectionUtils.isNotEmpty; +import static org.apache.commons.collections4.MapUtils.emptyIfNull; +import static org.apache.commons.lang3.StringUtils.EMPTY; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +/** + * @name: zhangyongxiang + * @author: zhangyongxiang@baidu.com + **/ +@Slf4j +public class ProcessExecutor { + + private final UnirestInstance unirest; + + private final Executor executor; + + private final ObjectMapper json; + + private final ExpressionParser expressionParser; + + public ProcessExecutor() { + this(new ForkJoinPool(Runtime.getRuntime().availableProcessors(), + defaultForkJoinWorkerThreadFactory, null, true)); + } + + public ProcessExecutor(final Executor executor) { + this.executor = executor; + this.json = new JsonObjectMapper(); + this.expressionParser = new SpelExpressionParser(); + this.unirest = Unirest.spawnInstance(); + this.unirest.config().addShutdownHook(true); + this.unirest.config().verifySsl(false); + this.unirest.config().automaticRetries(false); + this.unirest.config() + .connectTimeout((int) Duration.ofHours(1).toMillis()); + this.unirest.config() + .socketTimeout((int) Duration.ofHours(1).toMillis()); + } + + public void execute(final Process process) { + if (nonNull(process) && isNotEmpty(process.getOperations())) { + if 
(process.isConcurrent() && process.getOperations().size() > 1) { + CompletableFuture.allOf(process + .getOperations().stream().map( + operation -> CompletableFuture.runAsync( + () -> executeWithRetry(operation, + process.getCategory()), + this.executor)) + .toArray(CompletableFuture[]::new)).exceptionally(e -> { + if (process.getCategory() == PREPROCESS) { + throw DataXException.asDataXException( + PREPROCESS_OPERATION_ERROR, + e.getMessage(), e); + } else { + throw DataXException.asDataXException( + POSTPROCESS_OPERATION_ERROR, + e.getMessage(), e); + } + }).join(); + } else { + process.getOperations() + .forEach(operation -> executeWithRetry(operation, + process.getCategory())); + } + } + } + + public HttpResponse executeWithRetry(final Operation operation, + final ProcessCategory category) { + if (operation.getMaxRetries() > 1) { + final RetryPolicy> retryPolicy = RetryPolicy + .>builder() + .withMaxRetries(operation.getMaxRetries()) + .handleResultIf( + response -> response.getStatus() >= MULTIPLE_CHOICE) + .withBackoff(1, 5, ChronoUnit.SECONDS) + .onFailedAttempt(e -> log.error( + "{} operation execute failed, attempt execution times: {}," + + " possible result response code: {}, possible result response body: {}", + category, e.getAttemptCount(), + ofNullable(e.getLastResult()) + .map(HttpResponse::getStatusText) + .orElse(null), + ofNullable(e.getLastResult()) + .map(HttpResponse::getBody).orElse(null), + e.getLastException())) + .onRetry(e -> log.warn( + "{} operation failure #{}th retrying.", category, + e.getAttemptCount())) + .onRetriesExceeded(e -> log.error( + "fail to execute operation {}. max retries exceeded. cause: {}", + operation, + nonNull(e.getException()) + ? 
e.getException().getMessage() + : e.getResult().getStatusText(), + e.getException())) + .build(); + return Failsafe.with(retryPolicy).get( + () -> executeWithExpectedResponse(operation, category)); + } else { + return executeWithExpectedResponse(operation, category); + } + } + + public HttpResponse executeWithExpectedResponse( + final Operation operation, final ProcessCategory category) { + final HttpResponse response = execute(operation, category); + if (response.getHeaders().containsKey(CONTENT_TYPE) + && response.getHeaders().get(CONTENT_TYPE).stream() + .anyMatch(contentType -> contentType + .contains(APPLICATION_JSON.getMimeType())) + && isNotBlank(operation.getJsonExpression())) { + if (isBlank(response.getBody())) { + log.warn("response body is empty, operation: {}", operation); + throw DataXException.asDataXException( + EXPRESSION_EVALUATE_FAILED_EXCEPTION, + String.format("operation %s return empty response body", + operation.getUrl())); + } + try { + final Object bodyResponse = json.readValue(response.getBody(), + Map.class); + if (operation.isDebug()) { + log.info( + "operation {} return response body: {}, deserialized value {}, class {}", + operation.getUrl(), response.getBody(), /* the response body this message describes, not the request body */ + bodyResponse, bodyResponse.getClass().getName()); + } + final StandardEvaluationContext context = new StandardEvaluationContext( + bodyResponse); + context.addPropertyAccessor(new MapAccessor()); + if (!BooleanUtils.toBoolean(expressionParser + .parseExpression(operation.getJsonExpression()) + .getValue(context, boolean.class))) { + throw DataXException.asDataXException( + OPERATION_RESULT_ERROR_EXCEPTION, + String.format( + "operation return result is not right according the json expression," + + " result %s, expression: %s", + bodyResponse, + operation.getJsonExpression())); + } + } catch (final Exception e) { + log.error( + "body {} can't be deserialized or can't be evaluated against json expression {}, operation: {}", + response.getBody(), operation.getJsonExpression(), 
+ operation); + throw DataXException.asDataXException( + EXPRESSION_EVALUATE_FAILED_EXCEPTION, + String.format( + "operation result can't be deserialized or evaluated failed," + + " result %s, expression: %s", + response.getBody(), + operation.getJsonExpression()), + e); + } + } + return response; + } + + /** + * @param operation operations + * @param category operations category + * @return response + */ + public HttpResponse execute(final Operation operation, + final ProcessCategory category) { + HttpRequestWithBody requestBuilder = this.unirest + .request(operation.getMethod(), operation.getUrl()); + if (MapUtils.isNotEmpty(operation.getHeaders())) { + for (final String header : operation.getHeaders().keySet()) { + requestBuilder = requestBuilder.header(header, + operation.getHeaders().get(header)); + } + } + if (!emptyIfNull(operation.getHeaders()).containsKey(CONTENT_TYPE)) { + requestBuilder = requestBuilder.header(CONTENT_TYPE, + APPLICATION_JSON.getMimeType()); + } + HttpRequest request = requestBuilder; + if (HttpMethod.valueOf(operation.getMethod()) != GET + && isNotBlank(operation.getBody())) { + if (operation.isBase64()) { + request = requestBuilder + .body(Base64.getDecoder().decode(operation.getBody())); + } else { + request = requestBuilder.body(operation.getBody()); + } + if (operation.isDebug()) { + log.info( + "request {} method {} has body: {}, base64 encoded?{}, decoded body: {}", + operation.getUrl(), operation.getMethod(), + operation.getBody(), operation.isBase64(), + operation.isBase64() + ? 
Base64.getDecoder() + .decode(operation.getBody()) + : EMPTY); + } + } + final long startTime = System.nanoTime(); + return request.asString().ifSuccess(response -> log.info( + "operation {} category: {} execute successfully,response: {}, body: {}, consume time: {}", + operation.getUrl(), category, response.getStatusText(), + response.getBody(), + Duration.ofNanos(System.nanoTime() - startTime))) + .ifFailure(response -> { + response.getParsingError().ifPresent(e -> { + log.error( + "operation {} category: {} execute failed, original body: {}, parsing exception", + operation.getUrl(), + category.name().toLowerCase(), + e.getOriginalBody(), e); + throw new OperationExecutionFailException(String.format( + "operation %s category: %s execute failed", + operation.getUrl(), + category.name().toLowerCase()), e); + }); + log.error("operation {} category: {} execute failed, " + + "http code: {}, message: {} , optional reason: {}", + operation.getUrl(), category.name().toLowerCase(), + response.getStatus(), response.getStatusText(), + response.getBody()); + throw new OperationExecutionFailException(String.format( + "operation %s category: %s http execute failed, http code: %d, message: %s, optional reason: %s", + operation.getUrl(), category.name().toLowerCase(), + response.getStatus(), response.getStatusText(), + response.getBody()), null); + }); + } +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/process/ProcessFactory.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/process/ProcessFactory.java new file mode 100644 index 00000000..7e76fbc5 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/process/ProcessFactory.java @@ -0,0 +1,37 @@ +package com.alibaba.datax.plugin.writer.restwriter.process; + +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.writer.restwriter.conf.Operation; +import 
com.alibaba.datax.plugin.writer.restwriter.conf.Process; + +import lombok.extern.slf4j.Slf4j; + +import static com.alibaba.datax.plugin.writer.restwriter.Key.ADDITIONAL_CONCURRENT; +import static com.alibaba.datax.plugin.writer.restwriter.Key.ADDITIONAL_OPERATIONS; +import static java.util.Objects.nonNull; + +/** + * Created by zhangyongxiang on 2023/10/12 19:28 + **/ +@Slf4j +public class ProcessFactory { + + private final Configuration configuration; + + public ProcessFactory(Configuration configuration) { + this.configuration = configuration; + } + + public Process createProcess(final ProcessCategory category) { + final Process process = new Process(category); + final Configuration conf = this.configuration + .getConfiguration(category.getKey()); + log.info("job configuration key: {}, conf: {}", category, conf); + if (nonNull(conf)) { + process.setConcurrent(conf.getBool(ADDITIONAL_CONCURRENT, false)); + process.setOperations(conf.getListWithJson(ADDITIONAL_OPERATIONS, + Operation.class)); + } + return process; + } +} diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/ConfigurationValidator.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/ConfigurationValidator.java new file mode 100644 index 00000000..dd9bd1be --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/ConfigurationValidator.java @@ -0,0 +1,111 @@ +package com.alibaba.datax.plugin.writer.restwriter.validator; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.writer.restwriter.conf.Field; +import com.google.common.collect.Sets; + +import static com.alibaba.datax.plugin.writer.restwriter.Key.BATCH_SIZE; +import static com.alibaba.datax.plugin.writer.restwriter.Key.FIELDS; +import static 
com.alibaba.datax.plugin.writer.restwriter.Key.HTTP_HEADERS; +import static com.alibaba.datax.plugin.writer.restwriter.Key.HTTP_METHOD; +import static com.alibaba.datax.plugin.writer.restwriter.Key.MAX_RETRIES; +import static com.alibaba.datax.plugin.writer.restwriter.Key.POSTPROCESS; +import static com.alibaba.datax.plugin.writer.restwriter.Key.PREPROCESS; +import static com.alibaba.datax.plugin.writer.restwriter.Key.RATE_PER_TASK; +import static com.alibaba.datax.plugin.writer.restwriter.Key.URL; +import static com.alibaba.datax.plugin.writer.restwriter.RestWriter.DEFAULT_BATCH_SIZE_VALUE; +import static com.alibaba.datax.plugin.writer.restwriter.RestWriter.DEFAULT_MAX_RETRIES_VALUE; +import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.BATCH_SIZE_INVALID_EXCEPTION; +import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.EMPTY_FIELD_EXCEPTION; +import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.FIELDS_INVALID_EXCEPTION; +import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.MAX_RETRIES_INVALID_EXCEPTION; +import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.RATE_PER_TASK_INVALID_EXCEPTION; +import static java.util.Objects.nonNull; +import static org.apache.commons.collections4.CollectionUtils.isEmpty; +import static org.apache.commons.lang3.StringUtils.isBlank; + +/** + * @author: zhangyongxiang + * @date 2023/8/24 18:32 + **/ +public class ConfigurationValidator + implements ParameterValidator { + + private final ParameterValidator urlValidator; + + private final ParameterValidator methodValidator; + + private final ParameterValidator> headersValidator; + + private final ParameterValidator processValidator; + + public ConfigurationValidator() { + this.urlValidator = new UrlParameterValidator(); + this.methodValidator = new MethodParameterValidator(); + this.headersValidator = new HeadersParameterValidator(); + this.processValidator = new 
ProcessValidator(this.urlValidator, + this.methodValidator); + } + + @Override + public void validateImmediateValue(final Configuration parameter) { + this.urlValidator.validate(parameter, URL); + this.methodValidator.validate(parameter, HTTP_METHOD); + this.headersValidator.validate(parameter, HTTP_HEADERS); + + final Integer maxRetries = parameter.getInt(MAX_RETRIES, + DEFAULT_MAX_RETRIES_VALUE); + + if (maxRetries <= 0) { + throw DataXException.asDataXException(MAX_RETRIES_INVALID_EXCEPTION, + "maxRetries parameter must be greater than 0"); + } + final Integer batchSize = parameter.getInt(BATCH_SIZE, + DEFAULT_BATCH_SIZE_VALUE); + if (batchSize <= 0) { + throw DataXException.asDataXException(BATCH_SIZE_INVALID_EXCEPTION, + "batchSize parameter must be greater than 0"); + } + final Integer ratePerTask = parameter.getInt(RATE_PER_TASK); + if (nonNull(ratePerTask) && ratePerTask < 0) { + throw DataXException.asDataXException( + RATE_PER_TASK_INVALID_EXCEPTION, + "rate-per-task parameter must be greater than or equals 0"); + } + final List fields = parameter.getListWithJson(FIELDS, + Field.class); + if (isEmpty(fields)) { + throw DataXException.asDataXException(EMPTY_FIELD_EXCEPTION, + "fields parameter must not be empty"); + } + + final Set names = Sets.newHashSet(); + fields.forEach(field -> { + if (isBlank(field.getName())) { + throw DataXException.asDataXException(FIELDS_INVALID_EXCEPTION, + "field name must not be empty or blank"); + } + if (names.contains(field.getName())) { + throw DataXException.asDataXException(FIELDS_INVALID_EXCEPTION, + String.format("field name %s duplicate", + field.getName())); + } else { + names.add(field.getName()); + } + }); + + this.processValidator.validate(parameter, PREPROCESS); + this.processValidator.validate(parameter, POSTPROCESS); + + } + + @Override + public void validate(final Configuration config, final String path) { + validateImmediateValue(config); + } +} diff --git 
a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/HeadersParameterValidator.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/HeadersParameterValidator.java new file mode 100644 index 00000000..654ed236 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/HeadersParameterValidator.java @@ -0,0 +1,22 @@
package com.alibaba.datax.plugin.writer.restwriter.validator;

import java.util.Map;

import com.alibaba.datax.common.util.Configuration;

/**
 * Validator for the HTTP headers entry of the writer configuration.
 * No constraint is currently enforced on the headers map.
 *
 * @author zhangyongxiang (created 2023/8/25 8:09 PM)
 */
public class HeadersParameterValidator
        implements ParameterValidator<Map<String, Object>> {

    /**
     * Accepts any headers map; no checks are performed.
     *
     * @param parameter the headers map read from the configuration
     */
    @Override
    public void validateImmediateValue(final Map<String, Object> parameter) {
        // No rules are defined for headers.
    }

    /**
     * Reads the map stored at {@code path} and passes it to
     * {@link #validateImmediateValue(Map)}.
     *
     * @param config the configuration to read from
     * @param path   the key of the headers entry
     */
    @Override
    public void validate(final Configuration config, final String path) {
        validateImmediateValue(config.getMap(path));
    }
}
diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/MethodParameterValidator.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/MethodParameterValidator.java new file mode 100644 index 00000000..13e7f182 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/MethodParameterValidator.java @@ -0,0 +1,37 @@
package com.alibaba.datax.plugin.writer.restwriter.validator;

import java.util.List;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.google.common.collect.Lists;

import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.METHOD_INVALID_EXCEPTION;
import static org.apache.commons.lang3.StringUtils.isBlank;

/**
 * Validator for the HTTP method entry of the writer configuration.
 * A value is accepted when, compared case-insensitively, it is one of
 * get, post, put, patch or delete.
 *
 * @author zhangyongxiang (created 2023/8/24 18:39)
 */
public class MethodParameterValidator implements ParameterValidator<String> {

    /** Lower-case names of the accepted HTTP methods. */
    private final List<String> supportedMethods = Lists.newArrayList("get",
            "post", "put", "patch", "delete");

    /**
     * Checks that {@code parameter} names an accepted HTTP method.
     *
     * @param parameter the configured method name
     * @throws DataXException if the value is blank or not an accepted method
     */
    @Override
    public void validateImmediateValue(final String parameter) {
        if (isBlank(parameter)) {
            throw DataXException.asDataXException(METHOD_INVALID_EXCEPTION,
                    "需要填写method参数");
        }
        final String normalized = parameter.toLowerCase();
        if (!this.supportedMethods.contains(normalized)) {
            throw DataXException.asDataXException(METHOD_INVALID_EXCEPTION,
                    "method参数值不合法");
        }
    }

    /**
     * Reads the string stored at {@code path} and validates it.
     *
     * @param config the configuration to read from
     * @param path   the key of the method entry
     */
    @Override
    public void validate(final Configuration config, final String path) {
        validateImmediateValue(config.getString(path));
    }
}
diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/ParameterValidator.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/ParameterValidator.java new file mode 100644 index 00000000..54307904 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/ParameterValidator.java @@ -0,0 +1,14 @@
package com.alibaba.datax.plugin.writer.restwriter.validator;

import com.alibaba.datax.common.util.Configuration;

/**
 * Contract for validators of a single writer configuration entry.
 *
 * @param <T> the type of the value being validated
 * @author zhangyongxiang (created 2023/8/24 18:02)
 */
public interface ParameterValidator<T> {

    /**
     * Validates a value that has already been extracted from the
     * configuration.
     *
     * @param parameter the value to check
     */
    void validateImmediateValue(T parameter);

    /**
     * Validates the entry found at {@code path} inside {@code config}.
     *
     * @param config the configuration to read from
     * @param path   the key of the entry to check
     */
    void validate(Configuration config, String path);
}
diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/ProcessValidator.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/ProcessValidator.java new file mode 100644 index 00000000..6785ef74 --- /dev/null +++ b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/ProcessValidator.java @@ -0,0 +1,60 @@ +package com.alibaba.datax.plugin.writer.restwriter.validator; + +import java.util.List; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.writer.restwriter.conf.Operation; + +import static com.alibaba.datax.plugin.writer.restwriter.Key.ADDITIONAL_CONCURRENT; +import static 
com.alibaba.datax.plugin.writer.restwriter.Key.ADDITIONAL_OPERATIONS; +import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.CONCURRENT_INVALID_EXCEPTION; +import static java.util.Objects.nonNull; +import static org.apache.commons.collections4.ListUtils.emptyIfNull; +import static org.apache.commons.lang3.StringUtils.equalsIgnoreCase; +

/**
 * Validator for a preprocess/postprocess section of the writer
 * configuration: checks the {@code concurrent} flag and the url and method
 * of every listed operation.
 *
 * @author zhangyongxiang (zhangyongxiang@baidu.com)
 */
public class ProcessValidator implements ParameterValidator<Configuration> {

    private final ParameterValidator<String> urlValidator;

    private final ParameterValidator<String> methodValidator;

    /**
     * @param urlValidator    validator applied to each operation's url
     * @param methodValidator validator applied to each operation's method
     */
    public ProcessValidator(ParameterValidator<String> urlValidator,
            ParameterValidator<String> methodValidator) {
        this.urlValidator = urlValidator;
        this.methodValidator = methodValidator;
    }

    /**
     * Validates one process section. A {@code null} section is accepted
     * without any check.
     *
     * @param parameter the process section, may be {@code null}
     * @throws DataXException if the concurrent value is present but is
     *                        neither "true" nor "false" (case-insensitive);
     *                        the url and method validators may also throw
     */
    @Override
    public void validateImmediateValue(final Configuration parameter) {
        if (parameter == null) {
            return;
        }

        final String concurrent = parameter.getString(ADDITIONAL_CONCURRENT);
        final boolean booleanLiteral = equalsIgnoreCase(concurrent, "true")
                || equalsIgnoreCase(concurrent, "false");
        if (nonNull(concurrent) && !booleanLiteral) {
            throw DataXException.asDataXException(
                    CONCURRENT_INVALID_EXCEPTION,
                    String.format(
                            "parameter concurrent %s is invalid, allow values: true,false",
                            concurrent));
        }

        final List<Operation> operations = parameter
                .getListWithJson(ADDITIONAL_OPERATIONS, Operation.class);
        // A missing operations list is treated as empty.
        for (final Operation operation : emptyIfNull(operations)) {
            this.urlValidator.validateImmediateValue(operation.getUrl());
            this.methodValidator.validateImmediateValue(operation.getMethod());
        }
    }

    /**
     * Reads the sub-configuration stored at {@code path} and validates it.
     *
     * @param config the configuration to read from
     * @param path   the key of the process section
     */
    @Override
    public void validate(final Configuration config, final String path) {
        validateImmediateValue(config.getConfiguration(path));
    }
}
diff --git a/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/UrlParameterValidator.java b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/UrlParameterValidator.java new file mode 100644 index 00000000..c125eebc --- /dev/null +++ 
b/restwriter/src/main/java/com/alibaba/datax/plugin/writer/restwriter/validator/UrlParameterValidator.java @@ -0,0 +1,44 @@ +package com.alibaba.datax.plugin.writer.restwriter.validator; + +import java.util.regex.Pattern; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; + +import lombok.extern.slf4j.Slf4j; + +import static com.alibaba.datax.plugin.writer.restwriter.RestWriterErrorCode.URL_INVALID_EXCEPTION; +import static org.apache.commons.lang3.StringUtils.isBlank; + +/** + * @author: zhangyongxiang + * @date 2023/8/24 18:03 + **/ +@Slf4j +public class UrlParameterValidator implements ParameterValidator { + + private static final String URL_EXP = "^(https?|ftp|file)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-a-zA-Z0-9+&@#/%=~_|]"; + + private final Pattern urlPattern; + + public UrlParameterValidator() { + this.urlPattern = Pattern.compile(URL_EXP); + } + + @Override + public void validate(final Configuration config, final String path) { + validateImmediateValue(config.getString(path)); + } + + @Override + public void validateImmediateValue(final String parameter) { + if (isBlank(parameter)) { + throw DataXException.asDataXException(URL_INVALID_EXCEPTION, + "需要填写url参数"); + } + if (!urlPattern.matcher(parameter).find()) { + throw DataXException.asDataXException(URL_INVALID_EXCEPTION, + "url参数值不合法"); + } + } +} diff --git a/restwriter/src/main/resources/plugin.json b/restwriter/src/main/resources/plugin.json new file mode 100755 index 00000000..3b96ece7 --- /dev/null +++ b/restwriter/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "restwriter", + "class": "com.alibaba.datax.plugin.writer.restwriter.RestWriter", + "description": "useScene: prod. mechanism: use datax framework to transport data to http sink endpoint. 
warn: The more you know about the data, the fewer problems you encounter.", + "developer": "zhangyongxiang" +} \ No newline at end of file diff --git a/restwriter/src/main/resources/plugin_job_template.json b/restwriter/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..ae74eed4 --- /dev/null +++ b/restwriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,83 @@ +{ + "name": "restwriter", + "parameter": { + "url": "http://localhost:8080/echo", + "method": "post", + "headers": { + "aaa": "bbbb" + }, + "query": { + "test": "test" + }, + "maxRetries": 3, + "batch": true, + "batchSize": 1000, + "fields": [ + { + "name": "a" + }, + { + "name": "b" + }, + { + "name": "c", + "type": "java.time.LocalDateTime" + }, + { + "name": "d" + }, + { + "name": "e" + } + ], + "debug": true, + "failFast": false, + "rate-per-task": 10, + "preprocess": { + "concurrent": true, + "operations": [ + { + "url": "http://localhost:8080/echo", + "method": "post", + "headers": { + "aaa": "bbbb" + }, + "body": "", + "debug": false + }, + { + "url": "http://localhost:8080/echo", + "method": "post", + "headers": { + "aaa": "bbbb" + }, + "body": "", + "debug": false + } + ] + }, + "postprocess": { + "concurrent": true, + "operations": [ + { + "url": "http://localhost:8080/echo", + "method": "post", + "headers": { + "aaa": "bbbb" + }, + "body": "", + "debug": false + }, + { + "url": "http://localhost:8080/echo", + "method": "post", + "headers": { + "aaa": "bbbb" + }, + "body": "", + "debug": false + } + ] + } + } +} \ No newline at end of file