envProp = System.getenv();
- accessId = envProp.get(IdAndKeyRollingUtil.SKYNET_ACCESSID);
- String accessKey = null;
- if (StringUtils.isBlank(accessKey)) {
-        // The old version did not throw an exception; it just could not obtain the ak
- accessKey = IdAndKeyRollingUtil.parseAkFromSkynetAccessKey();
- }
-
- if (StringUtils.isNotBlank(accessKey)) {
-        // Confirmed: everything that uses this follows the accessId/accessKey naming convention
- originalConfig.set(IdAndKeyRollingUtil.ACCESS_ID, accessId);
- originalConfig.set(IdAndKeyRollingUtil.ACCESS_KEY, accessKey);
- }
- return accessKey;
- }
-}
diff --git a/core/src/main/java/com/alibaba/datax/core/Engine.java b/core/src/main/java/com/alibaba/datax/core/Engine.java
index 38342532..4ba9fc18 100755
--- a/core/src/main/java/com/alibaba/datax/core/Engine.java
+++ b/core/src/main/java/com/alibaba/datax/core/Engine.java
@@ -79,16 +79,9 @@ public class Engine {
perfReportEnable = false;
}
- int priority = 0;
- try {
- priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY"));
- }catch (NumberFormatException e){
- LOG.warn("prioriy set to 0, because NumberFormatException, the value is: "+System.getProperty("PROIORY"));
- }
-
Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
// Initialize PerfTrace
- PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
+ PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, traceEnable);
perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber);
container.start();
diff --git a/core/src/main/java/com/alibaba/datax/core/container/util/JobAssignUtil.java b/core/src/main/java/com/alibaba/datax/core/container/util/JobAssignUtil.java
index 31ba60a4..cbd0d2a1 100755
--- a/core/src/main/java/com/alibaba/datax/core/container/util/JobAssignUtil.java
+++ b/core/src/main/java/com/alibaba/datax/core/container/util/JobAssignUtil.java
@@ -114,7 +114,7 @@ public final class JobAssignUtil {
* The desired effect, illustrated by example:
*
* database a has tables: 0, 1, 2
- * database a has tables: 3, 4
+ * database b has tables: 3, 4
* database c has tables: 5, 6, 7
*
* given 4 taskGroups
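
The round-robin effect this comment describes can be sketched with a stand-alone snippet (illustrative only; this is not the actual JobAssignUtil implementation):

```java
import java.util.*;

public class RoundRobinSketch {
    public static void main(String[] args) {
        Map<String, List<Integer>> tablesByDb = new LinkedHashMap<>();
        tablesByDb.put("a", Arrays.asList(0, 1, 2));
        tablesByDb.put("b", Arrays.asList(3, 4));
        tablesByDb.put("c", Arrays.asList(5, 6, 7));

        // Interleave across databases: 0, 3, 5, 1, 4, 6, 2, 7
        List<Integer> interleaved = new ArrayList<>();
        boolean added = true;
        for (int round = 0; added; round++) {
            added = false;
            for (List<Integer> tables : tablesByDb.values()) {
                if (round < tables.size()) {
                    interleaved.add(tables.get(round));
                    added = true;
                }
            }
        }

        // Deal the interleaved tasks out to 4 task groups.
        List<List<Integer>> groups = new ArrayList<>();
        for (int i = 0; i < 4; i++) groups.add(new ArrayList<>());
        for (int i = 0; i < interleaved.size(); i++) {
            groups.get(i % 4).add(interleaved.get(i));
        }
        System.out.println(groups); // [[0, 4], [3, 6], [5, 2], [1, 7]]
    }
}
```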
diff --git a/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java b/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java
index 26b2989f..49f5a0a1 100755
--- a/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java
+++ b/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java
@@ -27,7 +27,7 @@ import com.alibaba.datax.core.util.container.ClassLoaderSwapper;
import com.alibaba.datax.core.util.container.CoreConstant;
import com.alibaba.datax.core.util.container.LoadUtil;
import com.alibaba.datax.dataxservice.face.domain.enums.ExecuteMode;
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.slf4j.Logger;
diff --git a/core/src/main/java/com/alibaba/datax/core/statistics/communication/CommunicationTool.java b/core/src/main/java/com/alibaba/datax/core/statistics/communication/CommunicationTool.java
index 51a601ae..1815ea02 100755
--- a/core/src/main/java/com/alibaba/datax/core/statistics/communication/CommunicationTool.java
+++ b/core/src/main/java/com/alibaba/datax/core/statistics/communication/CommunicationTool.java
@@ -2,7 +2,7 @@ package com.alibaba.datax.core.statistics.communication;
import com.alibaba.datax.common.statistics.PerfTrace;
import com.alibaba.datax.common.util.StrUtil;
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
import org.apache.commons.lang.Validate;
import java.text.DecimalFormat;
diff --git a/core/src/main/java/com/alibaba/datax/core/statistics/plugin/task/StdoutPluginCollector.java b/core/src/main/java/com/alibaba/datax/core/statistics/plugin/task/StdoutPluginCollector.java
index 8b2a8378..d88ad0a8 100755
--- a/core/src/main/java/com/alibaba/datax/core/statistics/plugin/task/StdoutPluginCollector.java
+++ b/core/src/main/java/com/alibaba/datax/core/statistics/plugin/task/StdoutPluginCollector.java
@@ -6,7 +6,7 @@ import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.util.container.CoreConstant;
import com.alibaba.datax.core.statistics.plugin.task.util.DirtyRecord;
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
diff --git a/core/src/main/java/com/alibaba/datax/core/statistics/plugin/task/util/DirtyRecord.java b/core/src/main/java/com/alibaba/datax/core/statistics/plugin/task/util/DirtyRecord.java
index 1b0d5238..caa4cb5b 100755
--- a/core/src/main/java/com/alibaba/datax/core/statistics/plugin/task/util/DirtyRecord.java
+++ b/core/src/main/java/com/alibaba/datax/core/statistics/plugin/task/util/DirtyRecord.java
@@ -4,7 +4,7 @@ import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.core.util.FrameworkErrorCode;
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
import java.math.BigDecimal;
import java.math.BigInteger;
diff --git a/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java b/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java
index c30c94d9..b4b45695 100755
--- a/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java
+++ b/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java
@@ -27,7 +27,7 @@ import com.alibaba.datax.core.util.TransformerUtil;
import com.alibaba.datax.core.util.container.CoreConstant;
import com.alibaba.datax.core.util.container.LoadUtil;
import com.alibaba.datax.dataxservice.face.domain.enums.State;
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/core/src/main/java/com/alibaba/datax/core/transport/record/DefaultRecord.java b/core/src/main/java/com/alibaba/datax/core/transport/record/DefaultRecord.java
index c78a2a87..1dfa02e8 100755
--- a/core/src/main/java/com/alibaba/datax/core/transport/record/DefaultRecord.java
+++ b/core/src/main/java/com/alibaba/datax/core/transport/record/DefaultRecord.java
@@ -5,7 +5,7 @@ import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.core.util.ClassSize;
import com.alibaba.datax.core.util.FrameworkErrorCode;
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
import java.util.ArrayList;
import java.util.HashMap;
diff --git a/databendwriter/doc/databendwriter-CN.md b/databendwriter/doc/databendwriter-CN.md
new file mode 100644
index 00000000..d6a8f1f3
--- /dev/null
+++ b/databendwriter/doc/databendwriter-CN.md
@@ -0,0 +1,171 @@
+# DataX DatabendWriter
+[简体中文](./databendwriter-CN.md) | [English](./databendwriter.md)
+
+## 1 Quick Introduction
+
+Databend Writer is a DataX plugin that writes data from DataX into Databend tables.
+It is built on the [databend JDBC driver](https://github.com/databendcloud/databend-jdbc), which uses the [RESTful http protocol](https://databend.rs/doc/integrations/api/rest)
+to execute queries on open source databend and [databend cloud](https://app.databend.com/).
+
+In each write batch, databend writer uploads the batch of data to an internal S3 stage and then executes the corresponding INSERT SQL to load the data into the databend table.
+
+For the best experience, if you are using the databend community edition, you should adopt [S3](https://aws.amazon.com/s3/)/[minio](https://min.io/)/[OSS](https://www.alibabacloud.com/product/object-storage-service) as its underlying storage layer, because
+they support presigned uploads; otherwise you may incur unnecessary data-transfer costs.
+
+You can find more details in the [documentation](https://databend.rs/doc/deploy/deploying-databend).
+
+## 2 Implementation
+
+Databend Writer uses DataX to fetch the records generated by a DataX Reader, and batch-inserts them into the specified columns of the databend table.
+
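+As a minimal stand-alone sketch (illustrative only, assuming the databend-jdbc driver is on the classpath and the sample table from section 3.1 exists; this is not the plugin's internal code path), such a batched write over plain JDBC looks like this:
+
+```java
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+
+public class DatabendBatchInsertDemo {
+    public static void main(String[] args) throws Exception {
+        try (Connection conn = DriverManager.getConnection(
+                "jdbc:databend://localhost:8000/datax", "databend", "databend");
+             PreparedStatement ps = conn.prepareStatement(
+                     "INSERT INTO datax.sample1 (a, b) VALUES (?, ?)")) {
+            for (int i = 0; i < 1000; i++) { // one batch of batchSize rows
+                ps.setString(1, "DataX");
+                ps.setLong(2, 19880808L);
+                ps.addBatch();
+            }
+            ps.executeBatch(); // the driver stages and loads the whole batch
+        }
+    }
+}
+```
+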
+## 3 Features
+
+### 3.1 Example Configuration
+
+* The following configuration reads some generated in-memory data and uploads it into a databend table
+
+#### Preparation
+```sql
+--- create table in databend
+drop table if exists datax.sample1;
+drop database if exists datax;
+create database if not exists datax;
+create table if not exists datax.sample1(a string, b int64, c date, d timestamp, e bool, f string, g variant);
+```
+
+#### Example Configuration
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "streamreader",
+ "parameter": {
+ "column" : [
+ {
+ "value": "DataX",
+ "type": "string"
+ },
+ {
+ "value": 19880808,
+ "type": "long"
+ },
+ {
+ "value": "1926-08-08 08:08:08",
+ "type": "date"
+ },
+ {
+ "value": "1988-08-08 08:08:08",
+ "type": "date"
+ },
+ {
+ "value": true,
+ "type": "bool"
+ },
+ {
+ "value": "test",
+ "type": "bytes"
+ },
+ {
+ "value": "{\"type\": \"variant\", \"value\": \"test\"}",
+ "type": "string"
+ }
+
+ ],
+ "sliceRecordCount": 10000
+ }
+ },
+ "writer": {
+ "name": "databendwriter",
+ "parameter": {
+ "username": "databend",
+ "password": "databend",
+ "column": ["a", "b", "c", "d", "e", "f", "g"],
+ "batchSize": 1000,
+ "preSql": [
+ ],
+ "postSql": [
+ ],
+ "connection": [
+ {
+ "jdbcUrl": "jdbc:databend://localhost:8000/datax",
+ "table": [
+ "sample1"
+ ]
+ }
+ ]
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
+```
+
+### 3.2 Parameter Description
+* jdbcUrl
+  * Description: JDBC data source URL. See the repository for the detailed [doc](https://github.com/databendcloud/databend-jdbc)
+  * Required: yes
+  * Default: none
+  * Example: jdbc:databend://localhost:8000/datax
+* username
+  * Description: JDBC data source user name
+  * Required: yes
+  * Default: none
+  * Example: databend
+* password
+  * Description: JDBC data source password
+  * Required: yes
+  * Default: none
+  * Example: databend
+* table
+  * Description: A list of table names; each table should contain all of the columns in the column parameter.
+  * Required: yes
+  * Default: none
+  * Example: ["sample1"]
+* column
+  * Description: A list of column names in the table; the field order should correspond to the column types in the reader's records
+  * Required: yes
+  * Default: none
+  * Example: ["a", "b", "c", "d", "e", "f", "g"]
+* batchSize
+  * Description: The number of records per batch
+  * Required: no
+  * Default: 1000
+  * Example: 1000
+* preSql
+  * Description: SQL statements executed before the write operation
+  * Required: no
+  * Default: none
+  * Example: ["delete from datax.sample1"]
+* postSql
+  * Description: SQL statements executed after the write operation
+  * Required: no
+  * Default: none
+  * Example: ["select count(*) from datax.sample1"]
+
+### 3.3 Type Conversion
+Data types in DataX can be converted to the corresponding data types in databend. The following table shows the correspondence between the two.
+
+| DataX internal type | Databend data type |
+|------------|-----------------------------------------------------------|
+| INT | TINYINT, INT8, SMALLINT, INT16, INT, INT32, BIGINT, INT64 |
+| LONG | TINYINT, INT8, SMALLINT, INT16, INT, INT32, BIGINT, INT64 |
+| STRING | STRING, VARCHAR |
+| DOUBLE | FLOAT, DOUBLE |
+| BOOL | BOOLEAN, BOOL |
+| DATE | DATE, TIMESTAMP |
+| BYTES | STRING, VARCHAR |
+
+## 4 Performance Test
+
+## 5 Restrictions
+Complex data type support is currently unstable. If you want to use complex data types such as tuple or array, please check future releases of databend and the jdbc driver.
+
+## FAQ
\ No newline at end of file
diff --git a/databendwriter/doc/databendwriter.md b/databendwriter/doc/databendwriter.md
new file mode 100644
index 00000000..0b57bf13
--- /dev/null
+++ b/databendwriter/doc/databendwriter.md
@@ -0,0 +1,166 @@
+# DataX DatabendWriter
+[简体中文](./databendwriter-CN.md) | [English](./databendwriter.md)
+
+## 1 Introduction
+Databend Writer is a DataX plugin that writes DataX records into Databend tables.
+The plugin is based on the [databend JDBC driver](https://github.com/databendcloud/databend-jdbc), which uses the [RESTful http protocol](https://databend.rs/doc/integrations/api/rest)
+to execute queries on open source databend and [databend cloud](https://app.databend.com/).
+
+During each write batch, databend writer uploads the batch data into an internal S3 stage and then executes the corresponding INSERT SQL to load the data into the databend table.
+
+For the best user experience, if you are using the databend community distribution, you should adopt [S3](https://aws.amazon.com/s3/)/[minio](https://min.io/)/[OSS](https://www.alibabacloud.com/product/object-storage-service) as its underlying storage layer, since
+they support presigned upload operations; otherwise you may incur unnecessary data-transfer costs.
+
+You can find more details in the [doc](https://databend.rs/doc/deploy/deploying-databend).
+
+## 2 Detailed Implementation
+Databend Writer uses DataX to fetch the records generated by a DataX Reader, and then batch-inserts them into the designated columns of your databend table.
+
+## 3 Features
+### 3.1 Example Configurations
+* The following configuration reads some generated in-memory data and uploads it into a databend table
+
+#### Preparation
+```sql
+--- create table in databend
+drop table if exists datax.sample1;
+drop database if exists datax;
+create database if not exists datax;
+create table if not exists datax.sample1(a string, b int64, c date, d timestamp, e bool, f string, g variant);
+```
+
+#### Configurations
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "streamreader",
+ "parameter": {
+ "column" : [
+ {
+ "value": "DataX",
+ "type": "string"
+ },
+ {
+ "value": 19880808,
+ "type": "long"
+ },
+ {
+ "value": "1926-08-08 08:08:08",
+ "type": "date"
+ },
+ {
+ "value": "1988-08-08 08:08:08",
+ "type": "date"
+ },
+ {
+ "value": true,
+ "type": "bool"
+ },
+ {
+ "value": "test",
+ "type": "bytes"
+ },
+ {
+ "value": "{\"type\": \"variant\", \"value\": \"test\"}",
+ "type": "string"
+ }
+
+ ],
+ "sliceRecordCount": 10000
+ }
+ },
+ "writer": {
+ "name": "databendwriter",
+ "parameter": {
+ "username": "databend",
+ "password": "databend",
+ "column": ["a", "b", "c", "d", "e", "f", "g"],
+ "batchSize": 1000,
+ "preSql": [
+ ],
+ "postSql": [
+ ],
+ "connection": [
+ {
+ "jdbcUrl": "jdbc:databend://localhost:8000/datax",
+ "table": [
+ "sample1"
+ ]
+ }
+ ]
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
+```
+
+### 3.2 Configuration Description
+* jdbcUrl
+  * Description: JDBC data source URL for Databend. See the repository for the detailed [doc](https://github.com/databendcloud/databend-jdbc)
+ * Required: yes
+ * Default: none
+ * Example: jdbc:databend://localhost:8000/datax
+* username
+ * Description: Databend user name
+ * Required: yes
+ * Default: none
+ * Example: databend
+* password
+ * Description: Databend user password
+ * Required: yes
+ * Default: none
+ * Example: databend
+* table
+  * Description: A list of table names; each table should contain all of the columns in the column parameter.
+ * Required: yes
+ * Default: none
+ * Example: ["sample1"]
+* column
+  * Description: A list of column field names that should be inserted into the table. If you want to insert all column fields, use `["*"]` instead.
+ * Required: yes
+ * Default: none
+ * Example: ["a", "b", "c", "d", "e", "f", "g"]
+* batchSize
+ * Description: The number of records to be inserted in each batch.
+ * Required: no
+ * Default: 1024
+* preSql
+ * Description: A list of SQL statements that will be executed before the write operation.
+ * Required: no
+ * Default: none
+* postSql
+ * Description: A list of SQL statements that will be executed after the write operation.
+ * Required: no
+ * Default: none
+
+### 3.3 Type Conversion
+Data types in DataX can be converted to the corresponding data types in databend. The following table shows the correspondence between the two types.
+
+| DataX Type | Databend Type |
+|------------|-----------------------------------------------------------|
+| INT | TINYINT, INT8, SMALLINT, INT16, INT, INT32, BIGINT, INT64 |
+| LONG | TINYINT, INT8, SMALLINT, INT16, INT, INT32, BIGINT, INT64 |
+| STRING | STRING, VARCHAR |
+| DOUBLE | FLOAT, DOUBLE |
+| BOOL | BOOLEAN, BOOL |
+| DATE | DATE, TIMESTAMP |
+| BYTES | STRING, VARCHAR |
+
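+A minimal sketch of how these conversions land on JDBC setters (illustrative only; column positions are hypothetical, and the real logic lives in the writer's fillPreparedStatementColumnType override):
+
+```java
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+public class TypeConvertDemo {
+    // A DataX DATE value can target either a Databend DATE or TIMESTAMP column.
+    static void bind(PreparedStatement ps, java.util.Date utilDate) throws SQLException {
+        ps.setDate(1, new java.sql.Date(utilDate.getTime()));       // DATE column
+        ps.setTimestamp(2, new Timestamp(utilDate.getTime()));      // TIMESTAMP column
+    }
+}
+```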
+
+## 4 Performance Test
+
+
+## 5 Restrictions
+Currently, complex data type support is not stable. If you want to use complex data types such as tuple or array, please check future releases of databend and the jdbc driver.
+
+## FAQ
diff --git a/databendwriter/pom.xml b/databendwriter/pom.xml
new file mode 100644
index 00000000..976ecd6a
--- /dev/null
+++ b/databendwriter/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>datax-all</artifactId>
+        <groupId>com.alibaba.datax</groupId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>databendwriter</artifactId>
+    <name>databendwriter</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.databend</groupId>
+            <artifactId>databend-jdbc</artifactId>
+            <version>0.0.5</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-core</artifactId>
+            <version>${datax-project-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-common</artifactId>
+            <version>${datax-project-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>plugin-rdbms-util</artifactId>
+            <version>${datax-project-version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/java</directory>
+                <includes>
+                    <include>**/*.properties</include>
+                </includes>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${jdk-version}</source>
+                    <target>${jdk-version}</target>
+                    <encoding>${project-sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/package.xml</descriptor>
+                    </descriptors>
+                    <finalName>datax</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>dwzip</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/databendwriter/src/main/assembly/package.xml b/databendwriter/src/main/assembly/package.xml
new file mode 100755
index 00000000..8a9ba1b2
--- /dev/null
+++ b/databendwriter/src/main/assembly/package.xml
@@ -0,0 +1,34 @@
+<assembly
+        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+    <id></id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>src/main/resources</directory>
+            <includes>
+                <include>plugin.json</include>
+                <include>plugin_job_template.json</include>
+            </includes>
+            <outputDirectory>plugin/writer/databendwriter</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>target/</directory>
+            <includes>
+                <include>databendwriter-0.0.1-SNAPSHOT.jar</include>
+            </includes>
+            <outputDirectory>plugin/writer/databendwriter</outputDirectory>
+        </fileSet>
+    </fileSets>
+
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>plugin/writer/databendwriter/libs</outputDirectory>
+            <scope>runtime</scope>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriter.java b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriter.java
new file mode 100644
index 00000000..a4222f08
--- /dev/null
+++ b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriter.java
@@ -0,0 +1,248 @@
+package com.alibaba.datax.plugin.writer.databendwriter;
+
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.StringColumn;
+import com.alibaba.datax.common.exception.CommonErrorCode;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
+import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
+import com.alibaba.datax.plugin.writer.databendwriter.util.DatabendWriterUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class DatabendWriter extends Writer
+{
+ private static final DataBaseType DATABASE_TYPE = DataBaseType.Databend;
+
+ public static class Job
+ extends Writer.Job
+ {
+ private static final Logger LOG = LoggerFactory.getLogger(Job.class);
+ private Configuration originalConfig;
+ private CommonRdbmsWriter.Job commonRdbmsWriterMaster;
+
+ @Override
+ public void init()
+ {
+ this.originalConfig = super.getPluginJobConf();
+ this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE);
+ this.commonRdbmsWriterMaster.init(this.originalConfig);
+ // placeholder currently not supported by databend driver, needs special treatment
+ DatabendWriterUtil.dealWriteMode(this.originalConfig);
+ }
+
+ @Override
+ public void preCheck()
+ {
+ this.init();
+ this.commonRdbmsWriterMaster.writerPreCheck(this.originalConfig, DATABASE_TYPE);
+ }
+
+ @Override
+ public void prepare() {
+ this.commonRdbmsWriterMaster.prepare(this.originalConfig);
+ }
+
+ @Override
+        public List<Configuration> split(int mandatoryNumber) {
+ return this.commonRdbmsWriterMaster.split(this.originalConfig, mandatoryNumber);
+ }
+
+ @Override
+ public void post() {
+ this.commonRdbmsWriterMaster.post(this.originalConfig);
+ }
+
+ @Override
+ public void destroy() {
+ this.commonRdbmsWriterMaster.destroy(this.originalConfig);
+ }
+ }
+
+
+ public static class Task extends Writer.Task
+ {
+ private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+ private Configuration writerSliceConfig;
+
+ private CommonRdbmsWriter.Task commonRdbmsWriterSlave;
+
+ @Override
+ public void init()
+ {
+ this.writerSliceConfig = super.getPluginJobConf();
+
+ this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DataBaseType.Databend){
+ @Override
+ protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, int columnSqltype, String typeName, Column column) throws SQLException {
+ try {
+ if (column.getRawData() == null) {
+ preparedStatement.setNull(columnIndex + 1, columnSqltype);
+ return preparedStatement;
+ }
+
+ java.util.Date utilDate;
+ switch (columnSqltype) {
+
+ case Types.TINYINT:
+ case Types.SMALLINT:
+ case Types.INTEGER:
+ preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue());
+ break;
+ case Types.BIGINT:
+ preparedStatement.setLong(columnIndex + 1, column.asLong());
+ break;
+ case Types.DECIMAL:
+ preparedStatement.setBigDecimal(columnIndex + 1, column.asBigDecimal());
+ break;
+ case Types.FLOAT:
+ case Types.REAL:
+ preparedStatement.setFloat(columnIndex + 1, column.asDouble().floatValue());
+ break;
+ case Types.DOUBLE:
+ preparedStatement.setDouble(columnIndex + 1, column.asDouble());
+ break;
+ case Types.DATE:
+ java.sql.Date sqlDate = null;
+ try {
+ utilDate = column.asDate();
+ } catch (DataXException e) {
+ throw new SQLException(String.format(
+ "Date type conversion error: [%s]", column));
+ }
+
+ if (null != utilDate) {
+ sqlDate = new java.sql.Date(utilDate.getTime());
+ }
+ preparedStatement.setDate(columnIndex + 1, sqlDate);
+ break;
+
+ case Types.TIME:
+ java.sql.Time sqlTime = null;
+ try {
+ utilDate = column.asDate();
+ } catch (DataXException e) {
+ throw new SQLException(String.format(
+ "Date type conversion error: [%s]", column));
+ }
+
+ if (null != utilDate) {
+ sqlTime = new java.sql.Time(utilDate.getTime());
+ }
+ preparedStatement.setTime(columnIndex + 1, sqlTime);
+ break;
+
+ case Types.TIMESTAMP:
+ Timestamp sqlTimestamp = null;
+ if (column instanceof StringColumn && column.asString() != null) {
+ String timeStampStr = column.asString();
+                                    // Java Timestamp arguments must be in the "2017-07-12 14:39:00.123566" format
+ String pattern = "^\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+";
+ boolean isMatch = Pattern.matches(pattern, timeStampStr);
+ if (isMatch) {
+ sqlTimestamp = Timestamp.valueOf(timeStampStr);
+ preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp);
+ break;
+ }
+ }
+ try {
+ utilDate = column.asDate();
+ } catch (DataXException e) {
+ throw new SQLException(String.format(
+ "Date type conversion error: [%s]", column));
+ }
+
+ if (null != utilDate) {
+ sqlTimestamp = new Timestamp(
+ utilDate.getTime());
+ }
+ preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp);
+ break;
+
+ case Types.BINARY:
+ case Types.VARBINARY:
+ case Types.BLOB:
+ case Types.LONGVARBINARY:
+ preparedStatement.setBytes(columnIndex + 1, column
+ .asBytes());
+ break;
+
+ case Types.BOOLEAN:
+
+                                // warn: bit(1) -> Types.BIT, setBoolean can be used
+                                // warn: bit(>1) -> Types.VARBINARY, setBytes can be used
+ case Types.BIT:
+ if (this.dataBaseType == DataBaseType.MySql) {
+ Boolean asBoolean = column.asBoolean();
+ if (asBoolean != null) {
+ preparedStatement.setBoolean(columnIndex + 1, asBoolean);
+ } else {
+ preparedStatement.setNull(columnIndex + 1, Types.BIT);
+ }
+ } else {
+ preparedStatement.setString(columnIndex + 1, column.asString());
+ }
+ break;
+
+ default:
+ // cast variant / array into string is fine.
+ preparedStatement.setString(columnIndex + 1, column.asString());
+ break;
+ }
+ return preparedStatement;
+ } catch (DataXException e) {
+                        // fix: when type conversion or overflow fails, report exactly which column caused it
+ if (e.getErrorCode() == CommonErrorCode.CONVERT_NOT_SUPPORT ||
+ e.getErrorCode() == CommonErrorCode.CONVERT_OVER_FLOW) {
+ throw DataXException
+ .asDataXException(
+ e.getErrorCode(),
+ String.format(
+ "type conversion error. columnName: [%s], columnType:[%d], columnJavaType: [%s]. please change the data type in given column field or do not sync on the column.",
+ this.resultSetMetaData.getLeft()
+ .get(columnIndex),
+ this.resultSetMetaData.getMiddle()
+ .get(columnIndex),
+ this.resultSetMetaData.getRight()
+ .get(columnIndex)));
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ };
+ this.commonRdbmsWriterSlave.init(this.writerSliceConfig);
+ }
+
+ @Override
+ public void destroy()
+ {
+ this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig);
+ }
+
+ @Override
+ public void prepare() {
+ this.commonRdbmsWriterSlave.prepare(this.writerSliceConfig);
+ }
+
+ @Override
+ public void post() {
+ this.commonRdbmsWriterSlave.post(this.writerSliceConfig);
+ }
+ @Override
+ public void startWrite(RecordReceiver lineReceiver)
+ {
+ this.commonRdbmsWriterSlave.startWrite(lineReceiver, this.writerSliceConfig, this.getTaskPluginCollector());
+ }
+
+ }
+}
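
The TIMESTAMP branch above takes a fast path for string columns that already look like JDBC timestamps. A quick stand-alone check of that path (illustrative values, same regex as the writer):

```java
import java.sql.Timestamp;
import java.util.regex.Pattern;

public class TimestampPathDemo {
    public static void main(String[] args) {
        String timeStampStr = "2017-07-12 14:39:00.123566";
        // date, space, time, fractional seconds
        boolean isMatch = Pattern.matches("^\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+", timeStampStr);
        if (isMatch) {
            // Timestamp.valueOf accepts "yyyy-[m]m-[d]d hh:mm:ss[.f...]"
            System.out.println(Timestamp.valueOf(timeStampStr)); // 2017-07-12 14:39:00.123566
        }
    }
}
```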
diff --git a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java
new file mode 100644
index 00000000..a862e920
--- /dev/null
+++ b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java
@@ -0,0 +1,40 @@
+package com.alibaba.datax.plugin.writer.databendwriter.util;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.rdbms.writer.Constant;
+import com.alibaba.datax.plugin.rdbms.writer.Key;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.StringJoiner;
+
+public final class DatabendWriterUtil
+{
+ private static final Logger LOG = LoggerFactory.getLogger(DatabendWriterUtil.class);
+
+ private DatabendWriterUtil() {}
+ public static void dealWriteMode(Configuration originalConfig)
+ {
+        List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
+
+        String jdbcUrl = originalConfig.getString(String.format("%s[0].%s",
+                Constant.CONN_MARK, Key.JDBC_URL));
+
+ String writeMode = originalConfig.getString(Key.WRITE_MODE, "INSERT");
+
+ StringBuilder writeDataSqlTemplate = new StringBuilder();
+ writeDataSqlTemplate.append("INSERT INTO %s");
+ StringJoiner columnString = new StringJoiner(",");
+
+ for (String column : columns) {
+ columnString.add(column);
+ }
+ writeDataSqlTemplate.append(String.format("(%s)", columnString));
+ writeDataSqlTemplate.append(" VALUES");
+
+ LOG.info("Write data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);
+
+        originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate.toString());
+ }
+}
\ No newline at end of file
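
For a concrete sense of what dealWriteMode produces: with column set to ["a", "b", "c"], the stored template is the string below, whose %s placeholder is later filled with the actual table name (quick illustration, not plugin code):

```java
// Value stored under Constant.INSERT_OR_REPLACE_TEMPLATE_MARK for columns a, b, c:
String template = "INSERT INTO %s(a,b,c) VALUES";
// Downstream, the common RDBMS writer substitutes the concrete table name:
System.out.println(String.format(template, "datax.sample1"));
// -> INSERT INTO datax.sample1(a,b,c) VALUES
```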
diff --git a/databendwriter/src/main/resources/plugin.json b/databendwriter/src/main/resources/plugin.json
new file mode 100644
index 00000000..bab0130d
--- /dev/null
+++ b/databendwriter/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+ "name": "databendwriter",
+ "class": "com.alibaba.datax.plugin.writer.databendwriter.DatabendWriter",
+ "description": "execute batch insert sql to write dataX data into databend",
+ "developer": "databend"
+}
\ No newline at end of file
diff --git a/databendwriter/src/main/resources/plugin_job_template.json b/databendwriter/src/main/resources/plugin_job_template.json
new file mode 100644
index 00000000..34d4b251
--- /dev/null
+++ b/databendwriter/src/main/resources/plugin_job_template.json
@@ -0,0 +1,19 @@
+{
+ "name": "databendwriter",
+ "parameter": {
+ "username": "username",
+ "password": "password",
+ "column": ["col1", "col2", "col3"],
+ "connection": [
+ {
+      "jdbcUrl": "jdbc:databend://<host>:<port>[/<database>]",
+ "table": "table1"
+ }
+ ],
+ "preSql": [],
+ "postSql": [],
+
+ "maxBatchRows": 65536,
+ "maxBatchSize": 134217728
+ }
+}
\ No newline at end of file
diff --git a/datahubreader/src/main/java/com/alibaba/datax/plugin/reader/datahubreader/DatahubClientHelper.java b/datahubreader/src/main/java/com/alibaba/datax/plugin/reader/datahubreader/DatahubClientHelper.java
index 6f601fb4..2b7bcec4 100644
--- a/datahubreader/src/main/java/com/alibaba/datax/plugin/reader/datahubreader/DatahubClientHelper.java
+++ b/datahubreader/src/main/java/com/alibaba/datax/plugin/reader/datahubreader/DatahubClientHelper.java
@@ -1,8 +1,8 @@
package com.alibaba.datax.plugin.reader.datahubreader;
import com.alibaba.datax.common.util.Configuration;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.TypeReference;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.DatahubClientBuilder;
import com.aliyun.datahub.client.auth.Account;
diff --git a/datahubwriter/src/main/java/com/alibaba/datax/plugin/writer/datahubwriter/DatahubClientHelper.java b/datahubwriter/src/main/java/com/alibaba/datax/plugin/writer/datahubwriter/DatahubClientHelper.java
index 2d94212c..c25d1210 100644
--- a/datahubwriter/src/main/java/com/alibaba/datax/plugin/writer/datahubwriter/DatahubClientHelper.java
+++ b/datahubwriter/src/main/java/com/alibaba/datax/plugin/writer/datahubwriter/DatahubClientHelper.java
@@ -3,8 +3,8 @@ package com.alibaba.datax.plugin.writer.datahubwriter;
import org.apache.commons.lang3.StringUtils;
import com.alibaba.datax.common.util.Configuration;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.TypeReference;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.DatahubClientBuilder;
import com.aliyun.datahub.client.auth.Account;
diff --git a/datahubwriter/src/main/java/com/alibaba/datax/plugin/writer/datahubwriter/DatahubWriter.java b/datahubwriter/src/main/java/com/alibaba/datax/plugin/writer/datahubwriter/DatahubWriter.java
index f6dc1105..cd414fc5 100644
--- a/datahubwriter/src/main/java/com/alibaba/datax/plugin/writer/datahubwriter/DatahubWriter.java
+++ b/datahubwriter/src/main/java/com/alibaba/datax/plugin/writer/datahubwriter/DatahubWriter.java
@@ -8,7 +8,7 @@ import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.DataXCaseEnvUtil;
import com.alibaba.datax.common.util.RetryUtil;
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.model.FieldType;
import com.aliyun.datahub.client.model.GetTopicResult;
diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
index e6c05733..68abd9eb 100644
--- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
+++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
@@ -1,7 +1,7 @@
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.element.Record;
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
import java.util.HashMap;
import java.util.List;
diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java
index efb3d9db..6f7e9a5a 100644
--- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java
+++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java
@@ -1,6 +1,6 @@
package com.alibaba.datax.plugin.writer.doriswriter;
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
diff --git a/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ElasticSearchClient.java b/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ElasticSearchClient.java
index 12ac3dd9..08486e1f 100644
--- a/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ElasticSearchClient.java
+++ b/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ElasticSearchClient.java
@@ -5,8 +5,8 @@ import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.elasticsearchwriter.jest.ClusterInfo;
import com.alibaba.datax.plugin.writer.elasticsearchwriter.jest.ClusterInfoResult;
import com.alibaba.datax.plugin.writer.elasticsearchwriter.jest.PutMapping7;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
@@ -53,6 +53,8 @@ public class ElasticSearchClient {
public ElasticSearchClient(Configuration conf) {
this.conf = conf;
String endpoint = Key.getEndpoint(conf);
+        // ES supports writing to a cluster; the endpoint may be a comma-separated list
+ String[] endpoints = endpoint.split(",");
String user = Key.getUsername(conf);
String passwd = Key.getPassword(conf);
boolean multiThread = Key.isMultiThread(conf);
@@ -63,7 +65,7 @@ public class ElasticSearchClient {
int totalConnection = this.conf.getInt("maxTotalConnection", 200);
JestClientFactory factory = new JestClientFactory();
Builder httpClientConfig = new HttpClientConfig
- .Builder(endpoint)
+ .Builder(Arrays.asList(endpoints))
// .setPreemptiveAuth(new HttpHost(endpoint))
.multiThreaded(multiThread)
.connTimeout(readTimeout)
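
With this change, a comma-separated endpoint value fans out to every node. A small sketch of the split (host names hypothetical):

```java
import java.util.Arrays;
import java.util.List;

public class EndpointSplitDemo {
    public static void main(String[] args) {
        String endpoint = "http://es-node1:9200,http://es-node2:9200";
        List<String> servers = Arrays.asList(endpoint.split(","));
        // Jest's HttpClientConfig.Builder(Collection<String>) distributes
        // requests across all of these server URIs.
        System.out.println(servers); // [http://es-node1:9200, http://es-node2:9200]
    }
}
```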
diff --git a/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ElasticSearchWriter.java b/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ElasticSearchWriter.java
index 6236e333..2c8ed2d0 100644
--- a/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ElasticSearchWriter.java
+++ b/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ElasticSearchWriter.java
@@ -9,11 +9,11 @@ import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.DataXCaseEnvUtil;
import com.alibaba.datax.common.util.RetryUtil;
import com.alibaba.datax.plugin.writer.elasticsearchwriter.Key.ActionType;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import com.alibaba.fastjson.TypeReference;
-import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
+import com.alibaba.fastjson2.JSONObject;
+import com.alibaba.fastjson2.TypeReference;
+import com.alibaba.fastjson2.JSONWriter;
import com.google.common.base.Joiner;
import io.searchbox.client.JestResult;
import io.searchbox.core.*;
@@ -927,9 +927,8 @@ public class ElasticSearchWriter extends Writer {
Index.Builder builder = null;
if (this.enableWriteNull) {
builder = new Index.Builder(
- JSONObject.toJSONString(data, SerializerFeature.WriteMapNullValue,
- SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField,
- SerializerFeature.WriteEnumUsingToString, SerializerFeature.SortField));
+ JSONObject.toJSONString(data, JSONWriter.Feature.WriteMapNullValue,
+ JSONWriter.Feature.WriteEnumUsingToString));
} else {
builder = new Index.Builder(JSONObject.toJSONString(data));
}
@@ -958,9 +957,8 @@ public class ElasticSearchWriter extends Writer {
if (this.enableWriteNull) {
// write: {a:"1",b:null}
update = new Update.Builder(
- JSONObject.toJSONString(updateDoc, SerializerFeature.WriteMapNullValue,
- SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField,
- SerializerFeature.WriteEnumUsingToString, SerializerFeature.SortField));
+ JSONObject.toJSONString(updateDoc, JSONWriter.Feature.WriteMapNullValue,
+ JSONWriter.Feature.WriteEnumUsingToString));
// On top of DEFAULT_GENERATE_FEATURE, this only adds SerializerFeature.WRITE_MAP_NULL_FEATURES
} else {
// write: {"a":"1"}
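
The fastjson to fastjson2 feature mapping used above can be sanity-checked in isolation (assuming fastjson2 is on the classpath; fastjson1 features such as QuoteFieldNames and SkipTransientField are already defaults in fastjson2):

```java
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONWriter;

import java.util.LinkedHashMap;
import java.util.Map;

public class Fastjson2FeatureDemo {
    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("a", "1");
        doc.put("b", null);
        // fastjson1 SerializerFeature.WriteMapNullValue -> fastjson2 JSONWriter.Feature.WriteMapNullValue
        System.out.println(JSON.toJSONString(doc, JSONWriter.Feature.WriteMapNullValue,
                JSONWriter.Feature.WriteEnumUsingToString));
        // -> {"a":"1","b":null}
    }
}
```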
diff --git a/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/JsonPathUtil.java b/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/JsonPathUtil.java
index 49703435..e7619e7c 100644
--- a/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/JsonPathUtil.java
+++ b/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/JsonPathUtil.java
@@ -2,7 +2,7 @@ package com.alibaba.datax.plugin.writer.elasticsearchwriter;
import java.util.List;
-import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson2.JSONObject;
public class JsonPathUtil {
diff --git a/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/JsonUtil.java b/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/JsonUtil.java
index e73c87be..ad6c01be 100644
--- a/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/JsonUtil.java
+++ b/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/JsonUtil.java
@@ -1,8 +1,8 @@
package com.alibaba.datax.plugin.writer.elasticsearchwriter;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONException;
-import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONException;
+import com.alibaba.fastjson2.JSONObject;
/**
* @author bozu
diff --git a/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/Key.java b/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/Key.java
index af197711..fcaac935 100644
--- a/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/Key.java
+++ b/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/Key.java
@@ -1,8 +1,8 @@
package com.alibaba.datax.plugin.writer.elasticsearchwriter;
import com.alibaba.datax.common.util.Configuration;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.TypeReference;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
import org.apache.commons.lang3.StringUtils;
diff --git a/ftpwriter/doc/ftpwriter.md b/ftpwriter/doc/ftpwriter.md
index 6b1b2687..a38a1052 100644
--- a/ftpwriter/doc/ftpwriter.md
+++ b/ftpwriter/doc/ftpwriter.md
@@ -24,7 +24,7 @@ FtpWriter converts the DataX protocol into FTP files; FTP files themselves are unstructured
What we cannot do:
-1. A single file cannot support concurrent writes.
+1. Concurrent writes to a single file.
## 3 Functional Description
diff --git a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/SftpHelperImpl.java b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/SftpHelperImpl.java
index e6d78629..e748f12c 100644
--- a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/SftpHelperImpl.java
+++ b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/SftpHelperImpl.java
@@ -14,8 +14,8 @@ import org.slf4j.LoggerFactory;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.plugin.writer.ftpwriter.FtpWriterErrorCode;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONWriter;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
@@ -251,7 +251,7 @@ public class SftpHelperImpl implements IFtpHelper {
@SuppressWarnings("rawtypes")
Vector allFiles = this.channelSftp.ls(dir);
LOG.debug(String.format("ls: %s", JSON.toJSONString(allFiles,
- SerializerFeature.UseSingleQuotes)));
+ JSONWriter.Feature.UseSingleQuotes)));
for (int i = 0; i < allFiles.size(); i++) {
LsEntry le = (LsEntry) allFiles.get(i);
String strName = le.getFilename();
diff --git a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/StandardFtpHelperImpl.java b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/StandardFtpHelperImpl.java
index 8999b0a8..d5b9a746 100644
--- a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/StandardFtpHelperImpl.java
+++ b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/StandardFtpHelperImpl.java
@@ -18,8 +18,8 @@ import org.slf4j.LoggerFactory;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.plugin.writer.ftpwriter.FtpWriterErrorCode;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONWriter;
public class StandardFtpHelperImpl implements IFtpHelper {
private static final Logger LOG = LoggerFactory
@@ -244,7 +244,7 @@ public class StandardFtpHelperImpl implements IFtpHelper {
FTPFile[] fs = this.ftpClient.listFiles(dir);
// LOG.debug(JSON.toJSONString(this.ftpClient.listNames(dir)));
LOG.debug(String.format("ls: %s",
- JSON.toJSONString(fs, SerializerFeature.UseSingleQuotes)));
+ JSON.toJSONString(fs, JSONWriter.Feature.UseSingleQuotes)));
for (FTPFile ff : fs) {
String strName = ff.getName();
if (strName.startsWith(prefixFileName)) {
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/DefaultGdbMapper.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/DefaultGdbMapper.java
index 73a94cf5..2c015879 100644
--- a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/DefaultGdbMapper.java
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/DefaultGdbMapper.java
@@ -19,8 +19,8 @@ import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbEdge;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbVertex;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson2.JSONArray;
+import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/ConfigHelper.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/ConfigHelper.java
index 178b5e7c..644f8898 100644
--- a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/ConfigHelper.java
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/ConfigHelper.java
@@ -12,8 +12,8 @@ import org.apache.commons.lang3.StringUtils;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.GdbWriterErrorCode;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
/**
* @author jerrywang
diff --git a/hbase094xreader/src/main/java/com/alibaba/datax/plugin/reader/hbase094xreader/Hbase094xHelper.java b/hbase094xreader/src/main/java/com/alibaba/datax/plugin/reader/hbase094xreader/Hbase094xHelper.java
index c3e2a212..b9f16b17 100644
--- a/hbase094xreader/src/main/java/com/alibaba/datax/plugin/reader/hbase094xreader/Hbase094xHelper.java
+++ b/hbase094xreader/src/main/java/com/alibaba/datax/plugin/reader/hbase094xreader/Hbase094xHelper.java
@@ -2,8 +2,8 @@ package com.alibaba.datax.plugin.reader.hbase094xreader;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.TypeReference;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.hadoop.fs.Path;
diff --git a/hbase094xwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase094xwriter/Hbase094xHelper.java b/hbase094xwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase094xwriter/Hbase094xHelper.java
index f671d31d..00b128f3 100644
--- a/hbase094xwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase094xwriter/Hbase094xHelper.java
+++ b/hbase094xwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase094xwriter/Hbase094xHelper.java
@@ -2,8 +2,8 @@ package com.alibaba.datax.plugin.writer.hbase094xwriter;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.TypeReference;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.hadoop.fs.Path;
diff --git a/hbase11xreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xreader/Hbase11xHelper.java b/hbase11xreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xreader/Hbase11xHelper.java
index 643072a9..82ad7122 100644
--- a/hbase11xreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xreader/Hbase11xHelper.java
+++ b/hbase11xreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xreader/Hbase11xHelper.java
@@ -2,8 +2,8 @@ package com.alibaba.datax.plugin.reader.hbase11xreader;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.TypeReference;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.hadoop.hbase.HBaseConfiguration;
diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java
index 5309d1d9..71665a6b 100644
--- a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java
+++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java
@@ -2,8 +2,8 @@ package com.alibaba.datax.plugin.reader.hbase11xsqlreader;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.TypeReference;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.InputSplit;
diff --git a/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/HbaseSQLHelper.java b/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/HbaseSQLHelper.java
index 41e57d4e..d1b23fdf 100644
--- a/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/HbaseSQLHelper.java
+++ b/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/HbaseSQLHelper.java
@@ -2,8 +2,8 @@ package com.alibaba.datax.plugin.writer.hbase11xsqlwriter;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.TypeReference;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.util.Pair;
diff --git a/hbase11xwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xwriter/Hbase11xHelper.java b/hbase11xwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xwriter/Hbase11xHelper.java
index 94b13b60..2889b647 100644
--- a/hbase11xwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xwriter/Hbase11xHelper.java
+++ b/hbase11xwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xwriter/Hbase11xHelper.java
@@ -2,8 +2,8 @@ package com.alibaba.datax.plugin.writer.hbase11xwriter;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.TypeReference;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.hadoop.hbase.HBaseConfiguration;
diff --git a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java
index c39d3847..5ba572e1 100644
--- a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java
+++ b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java
@@ -8,8 +8,8 @@ import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.unstructuredstorage.reader.ColumnEntry;
import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderErrorCode;
import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -331,26 +331,30 @@ public class DFSUtil {
//If the network disconnected, will retry 45 times, each time the retry interval for 20 seconds
//Each file as a split
//TODO multy threads
- InputSplit[] splits = in.getSplits(conf, 1);
+        // OrcInputFormat's getSplits ignores the numSplits argument; the number of splits equals the number of blocks
+ InputSplit[] splits = in.getSplits(conf, -1);
+ for (InputSplit split : splits) {
+ {
+ RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);
+ Object key = reader.createKey();
+ Object value = reader.createValue();
+                // Fetch the column information
+                List<? extends StructField> fields = inspector.getAllStructFieldRefs();
-            RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
-            Object key = reader.createKey();
-            Object value = reader.createValue();
-            // Fetch the column information
-            List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+ List