Merge pull request #5 from alibaba/master

同步master
This commit is contained in:
Mr-KIDBK 2020-12-11 17:17:15 +08:00 committed by GitHub
commit 96120c721e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 355 additions and 103 deletions

View File

@ -1,11 +1,14 @@
![Datax-logo](https://github.com/alibaba/DataX/blob/master/images/DataX-logo.jpg) ![Datax-logo](https://github.com/alibaba/DataX/blob/master/images/DataX-logo.jpg)
# DataX # DataX
DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。
# DataX 商业版本
阿里云DataWorks数据集成是DataX团队在阿里云上的商业化产品，致力于提供复杂网络环境下、丰富的异构数据源之间高速稳定的数据移动能力，以及繁杂业务背景下的数据同步解决方案。目前已经支持云上近3000家客户，单日同步数据超过3万亿条。DataWorks数据集成目前支持离线50+种数据源，可以进行整库迁移、批量上云、增量同步、分库分表等各类同步解决方案。2020年更新实时同步能力，支持10+种数据源的读写任意组合。提供MySQL、Oracle等多种数据源到阿里云MaxCompute、Hologres等大数据引擎的一键全增量同步解决方案。
https://www.aliyun.com/product/bigdata/ide
# Features # Features

Binary file not shown.

After

Width:  |  Height:  |  Size: 40 KiB

View File

@ -0,0 +1,143 @@
# datax-kudu-plugins
datax kudu的writer插件
eg:
```json
{
"name": "kuduwriter",
"parameter": {
"kuduConfig": {
"kudu.master_addresses": "***",
"timeout": 60000,
"sessionTimeout": 60000
},
"table": "",
"replicaCount": 3,
"truncate": false,
"writeMode": "upsert",
"partition": {
"range": {
"column1": [
{
"lower": "2020-08-25",
"upper": "2020-08-26"
},
{
"lower": "2020-08-26",
"upper": "2020-08-27"
},
{
"lower": "2020-08-27",
"upper": "2020-08-28"
}
]
},
"hash": {
"column": [
"column1"
],
"number": 3
}
},
"column": [
{
"index": 0,
"name": "c1",
"type": "string",
"primaryKey": true
},
{
"index": 1,
"name": "c2",
"type": "string",
"compress": "DEFAULT_COMPRESSION",
"encoding": "AUTO_ENCODING",
"comment": "注解xxxx"
}
],
"batchSize": 1024,
"bufferSize": 2048,
"skipFail": false,
"encoding": "UTF-8"
}
}
```
必须参数:
```json
"writer": {
"name": "kuduwriter",
"parameter": {
"kuduConfig": {
"kudu.master_addresses": "***"
},
"table": "***",
"column": [
{
"name": "c1",
"type": "string",
"primaryKey": true
},
{
"name": "c2",
"type": "string"
},
{
"name": "c3",
"type": "string"
},
{
"name": "c4",
"type": "string"
}
]
}
}
```
主键列请写到最前面
![image-20200901193148188](./image-20200901193148188.png)
##### 配置列表
| name | default | description | 是否必须 |
| -------------- | ------------------- | ------------------------------------------------------------ | -------- |
| kuduConfig | | kudu配置 kudu.master_addresses等 | 是 |
| table | | 导入目标表名 | 是 |
| partition | | 分区 | 否 |
| column | | 列 | 是 |
| name | | 列名 | 是 |
| type | string | 列的类型现支持INT, FLOAT, STRING, BIGINT, DOUBLE, BOOLEAN, LONG。 | 否 |
| index | 升序排列 | 列索引位置(要么全部列都写,要么都不写)。如reader中取到的某一字段在第二位置(eg: name, id, age),但kudu目标表结构不同(eg: id, name, age),此时就需要将index赋值为(1, 0, 2),默认顺序为(0, 1, 2) | 否 |
| primaryKey | false | 是否为主键(请将所有的主键列写在前面),不表明主键将不会检查过滤脏数据 | 否 |
| compress | DEFAULT_COMPRESSION | 压缩格式 | 否 |
| encoding | AUTO_ENCODING | 编码 | 否 |
| replicaCount | 3 | 保留副本个数 | 否 |
| hash | | hash分区 | 否 |
| number | 3 | hash分区个数 | 否 |
| range | | range分区 | 否 |
| lower | | range分区下限 (eg: sql建表partition value='haha' 对应 "lower": "haha", "upper": "haha\000") | 否 |
| upper | | range分区上限 (eg: sql建表partition "10" <= VALUES < "20" 对应 "lower": "10", "upper": "20") | 否 |
| truncate | false | 是否清空表,本质上是删表重建 | 否 |
| writeMode | upsert | upsert, insert, update | 否 |
| batchSize | 512 | 每xx行数据flush一次结果(最好不要超过1024) | 否 |
| bufferSize | 3072 | 缓冲区大小 | 否 |
| skipFail | false | 是否跳过插入不成功的数据 | 否 |
| timeout | 60000 | client超时时间,如创建表删除表操作的超时时间。单位ms | 否 |
| sessionTimeout | 60000 | session超时时间 单位ms | 否 |

View File

@ -14,21 +14,21 @@
<include>plugin.json</include> <include>plugin.json</include>
<include>plugin_job_template.json</include> <include>plugin_job_template.json</include>
</includes> </includes>
<outputDirectory>plugin/writer/kudu11xwriter</outputDirectory> <outputDirectory>plugin/writer/kuduwriter</outputDirectory>
</fileSet> </fileSet>
<fileSet> <fileSet>
<directory>target/</directory> <directory>target/</directory>
<includes> <includes>
<include>kudu11xwriter-0.0.1-SNAPSHOT.jar</include> <include>kuduwriter-0.0.1-SNAPSHOT.jar</include>
</includes> </includes>
<outputDirectory>plugin/writer/kudu11xwriter</outputDirectory> <outputDirectory>plugin/writer/kuduwriter</outputDirectory>
</fileSet> </fileSet>
</fileSets> </fileSets>
<dependencySets> <dependencySets>
<dependencySet> <dependencySet>
<useProjectArtifact>false</useProjectArtifact> <useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/kudu11xwriter/libs</outputDirectory> <outputDirectory>plugin/writer/kuduwriter/libs</outputDirectory>
<scope>runtime</scope> <scope>runtime</scope>
</dependencySet> </dependencySet>
</dependencySets> </dependencySets>

View File

@ -10,11 +10,17 @@ import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema; import org.apache.kudu.Schema;
import org.apache.kudu.Type; import org.apache.kudu.Type;
import org.apache.kudu.client.*; import org.apache.kudu.client.*;
import org.apache.kudu.shaded.org.checkerframework.checker.units.qual.K;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import sun.rmi.runtime.Log;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.*; import java.util.*;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
@ -47,10 +53,10 @@ public class Kudu11xHelper {
Map<String, Object> conf = Kudu11xHelper.getKuduConfiguration(kuduConfig); Map<String, Object> conf = Kudu11xHelper.getKuduConfiguration(kuduConfig);
KuduClient kuduClient = null; KuduClient kuduClient = null;
try { try {
String masterAddress = (String)conf.get(Key.KUDU_MASTER); String masterAddress = (String) conf.get(Key.KUDU_MASTER);
kuduClient = new KuduClient.KuduClientBuilder(masterAddress) kuduClient = new KuduClient.KuduClientBuilder(masterAddress)
.defaultAdminOperationTimeoutMs((Long) conf.get(Key.KUDU_ADMIN_TIMEOUT)) .defaultAdminOperationTimeoutMs((Long) conf.get(Key.KUDU_ADMIN_TIMEOUT))
.defaultOperationTimeoutMs((Long)conf.get(Key.KUDU_SESSION_TIMEOUT)) .defaultOperationTimeoutMs((Long) conf.get(Key.KUDU_SESSION_TIMEOUT))
.build(); .build();
} catch (Exception e) { } catch (Exception e) {
throw DataXException.asDataXException(Kudu11xWriterErrorcode.GET_KUDU_CONNECTION_ERROR, e); throw DataXException.asDataXException(Kudu11xWriterErrorcode.GET_KUDU_CONNECTION_ERROR, e);
@ -105,20 +111,20 @@ public class Kudu11xHelper {
} catch (Exception e) { } catch (Exception e) {
throw DataXException.asDataXException(Kudu11xWriterErrorcode.GREATE_KUDU_TABLE_ERROR, e); throw DataXException.asDataXException(Kudu11xWriterErrorcode.GREATE_KUDU_TABLE_ERROR, e);
} finally { } finally {
AtomicInteger i = new AtomicInteger(5); AtomicInteger i = new AtomicInteger(10);
while (i.get()>0) { while (i.get() > 0) {
try { try {
if (kuduClient.isCreateTableDone(tableName)){ if (kuduClient.isCreateTableDone(tableName)) {
Kudu11xHelper.closeClient(kuduClient); Kudu11xHelper.closeClient(kuduClient);
LOG.info("Table "+ tableName +" is created!"); LOG.info("Table " + tableName + " is created!");
break; break;
} }
i.decrementAndGet(); i.decrementAndGet();
LOG.error("timeout!"); LOG.error("timeout!");
} catch (KuduException e) { } catch (KuduException e) {
LOG.info("Wait for the table to be created..... "+i); LOG.info("Wait for the table to be created..... " + i);
try { try {
Thread.sleep(1000L); Thread.sleep(100L);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
ex.printStackTrace(); ex.printStackTrace();
} }
@ -135,6 +141,44 @@ public class Kudu11xHelper {
} }
} }
/**
 * Builds the thread pool used to fill the columns of a row in parallel.
 *
 * <p>The pool has {@code coreSize} core threads and the same maximum, a
 * 60 second keep-alive, and a {@link SynchronousQueue} as its work queue, so
 * tasks are handed directly to a worker instead of being buffered. Rejected
 * tasks are handled by {@link ThreadPoolExecutor.CallerRunsPolicy}, i.e. they
 * run on the submitting thread.
 *
 * <p>Worker threads are named {@code pool-kudu_rows_add-thread-N} (N starts
 * at 1), are non-daemon and run at {@link Thread#NORM_PRIORITY}.
 *
 * @param coreSize number of core (and maximum) threads in the pool
 * @return a newly created executor; the caller is responsible for shutting it down
 */
public static ThreadPoolExecutor createRowAddThreadPool(int coreSize) {
    // Use the security manager's thread group when one is installed,
    // otherwise the group of the thread that creates the pool.
    SecurityManager securityManager = System.getSecurityManager();
    final ThreadGroup workerGroup = securityManager != null
            ? securityManager.getThreadGroup()
            : Thread.currentThread().getThreadGroup();
    final AtomicInteger nextThreadId = new AtomicInteger(1);

    ThreadFactory workerFactory = task -> {
        String workerName = "pool-kudu_rows_add-thread-" + nextThreadId.getAndIncrement();
        Thread worker = new Thread(workerGroup, task, workerName, 0);
        // A new thread inherits daemon status and priority from its creator; normalise both.
        if (worker.isDaemon()) {
            worker.setDaemon(false);
        }
        if (worker.getPriority() != Thread.NORM_PRIORITY) {
            worker.setPriority(Thread.NORM_PRIORITY);
        }
        return worker;
    };

    return new ThreadPoolExecutor(
            coreSize,
            coreSize,
            60L,
            TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            workerFactory,
            new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
 * Partitions the column configurations into consecutive groups.
 *
 * <p>The number of groups is {@code (columns.size() - 1) / 8 + 1}, so there
 * is always at least one group (an empty input yields a single empty group).
 * Every group except the last holds {@code columns.size() / groupCount}
 * columns; the last group takes all remaining columns. Each group is an
 * independent copy, not a view of {@code columns}.
 *
 * @param columns column configurations in their configured order
 * @return the groups, in the original column order
 */
public static List<List<Configuration>> getColumnLists(List<Configuration> columns) {
    final int columnsPerGroupHint = 8;
    final int total = columns.size();
    final int groupCount = (total - 1) / columnsPerGroupHint + 1;
    final int step = total / groupCount;

    List<List<Configuration>> groups = new ArrayList<>(groupCount);
    int start = 0;
    // All groups but the last have exactly `step` columns.
    for (int g = 1; g < groupCount; g++) {
        int end = start + step;
        groups.add(new ArrayList<>(columns.subList(start, end)));
        start = end;
    }
    // The last group absorbs whatever is left after the even split.
    groups.add(new ArrayList<>(columns.subList(start, total)));
    return groups;
}
public static boolean isTableExists(Configuration configuration) { public static boolean isTableExists(Configuration configuration) {
String tableName = configuration.getString(Key.TABLE); String tableName = configuration.getString(Key.TABLE);
String kuduConfig = configuration.getString(Key.KUDU_CONFIG); String kuduConfig = configuration.getString(Key.KUDU_CONFIG);
@ -154,7 +198,7 @@ public class Kudu11xHelper {
kuduClient.close(); kuduClient.close();
} }
} catch (KuduException e) { } catch (KuduException e) {
LOG.warn("kudu client is not gracefully closed !"); LOG.warn("The \"kudu client\" was not stopped gracefully. !");
} }
@ -172,8 +216,8 @@ public class Kudu11xHelper {
String type = "BIGINT".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase()) || String type = "BIGINT".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase()) ||
"LONG".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase()) ? "LONG".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase()) ?
"INT64" : "INT".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase())? "INT64" : "INT".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase()) ?
"INT32":column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase(); "INT32" : column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase();
String name = column.getNecessaryValue(Key.NAME, Kudu11xWriterErrorcode.REQUIRED_VALUE); String name = column.getNecessaryValue(Key.NAME, Kudu11xWriterErrorcode.REQUIRED_VALUE);
Boolean key = column.getBool(Key.PRIMARYKEY, false); Boolean key = column.getBool(Key.PRIMARYKEY, false);
String encoding = column.getString(Key.ENCODING, Constant.ENCODING).toUpperCase(); String encoding = column.getString(Key.ENCODING, Constant.ENCODING).toUpperCase();
@ -194,9 +238,9 @@ public class Kudu11xHelper {
return schema; return schema;
} }
public static Integer getPrimaryKeyIndexUntil(List<Configuration> columns){ public static Integer getPrimaryKeyIndexUntil(List<Configuration> columns) {
int i = 0; int i = 0;
while ( i < columns.size() ) { while (i < columns.size()) {
Configuration col = columns.get(i); Configuration col = columns.get(i);
if (!col.getBool(Key.PRIMARYKEY, false)) { if (!col.getBool(Key.PRIMARYKEY, false)) {
break; break;
@ -244,6 +288,7 @@ public class Kudu11xHelper {
} }
public static void validateParameter(Configuration configuration) { public static void validateParameter(Configuration configuration) {
LOG.info("Start validating parameters");
configuration.getNecessaryValue(Key.KUDU_CONFIG, Kudu11xWriterErrorcode.REQUIRED_VALUE); configuration.getNecessaryValue(Key.KUDU_CONFIG, Kudu11xWriterErrorcode.REQUIRED_VALUE);
configuration.getNecessaryValue(Key.TABLE, Kudu11xWriterErrorcode.REQUIRED_VALUE); configuration.getNecessaryValue(Key.TABLE, Kudu11xWriterErrorcode.REQUIRED_VALUE);
String encoding = configuration.getString(Key.ENCODING, Constant.DEFAULT_ENCODING); String encoding = configuration.getString(Key.ENCODING, Constant.DEFAULT_ENCODING);
@ -268,7 +313,39 @@ public class Kudu11xHelper {
Boolean isSkipFail = configuration.getBool(Key.SKIP_FAIL, false); Boolean isSkipFail = configuration.getBool(Key.SKIP_FAIL, false);
configuration.set(Key.SKIP_FAIL, isSkipFail); configuration.set(Key.SKIP_FAIL, isSkipFail);
LOG.info("==validate parameter complete"); List<Configuration> columns = configuration.getListConfiguration(Key.COLUMN);
List<Configuration> goalColumns = new ArrayList<>();
//column参数验证
int indexFlag = 0;
boolean primaryKey = true;
int primaryKeyFlag = 0;
for (int i = 0; i < columns.size(); i++) {
Configuration col = columns.get(i);
String index = col.getString(Key.INDEX);
if (index == null) {
index = String.valueOf(i);
col.set(Key.INDEX, index);
indexFlag++;
}
if(primaryKey != col.getBool(Key.PRIMARYKEY, false)){
primaryKey = col.getBool(Key.PRIMARYKEY, false);
primaryKeyFlag++;
}
goalColumns.add(col);
}
if (indexFlag != 0 && indexFlag != columns.size()) {
throw DataXException.asDataXException(Kudu11xWriterErrorcode.ILLEGAL_VALUE,
"\"index\" either has values for all of them, or all of them are null!");
}
if (primaryKeyFlag > 1){
throw DataXException.asDataXException(Kudu11xWriterErrorcode.ILLEGAL_VALUE,
"\"primaryKey\" must be written in the front");
}
configuration.set(Key.COLUMN, goalColumns);
// LOG.info("------------------------------------");
// LOG.info(configuration.toString());
// LOG.info("------------------------------------");
LOG.info("validate parameter complete");
} }
public static void truncateTable(Configuration configuration) { public static void truncateTable(Configuration configuration) {

View File

@ -38,7 +38,7 @@ public class Kudu11xWriter extends Writer {
@Override @Override
public List<Configuration> split(int i) { public List<Configuration> split(int i) {
List<Configuration> splitResultConfigs = new ArrayList<Configuration>(); List<Configuration> splitResultConfigs = new ArrayList<>();
for (int j = 0; j < i; j++) { for (int j = 0; j < i; j++) {
splitResultConfigs.add(config.clone()); splitResultConfigs.add(config.clone());
} }
@ -76,7 +76,7 @@ public class Kudu11xWriter extends Writer {
kuduTaskProxy.session.close(); kuduTaskProxy.session.close();
} }
}catch (Exception e){ }catch (Exception e){
LOG.warn("kudu session is not gracefully closed !"); LOG.warn("The \"kudu session\" was not stopped gracefully !");
} }
Kudu11xHelper.closeClient(kuduTaskProxy.kuduClient); Kudu11xHelper.closeClient(kuduTaskProxy.kuduClient);

View File

@ -12,12 +12,13 @@ import org.apache.kudu.client.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.*;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
/** /**
* @author daizihao * @author daizihao
@ -26,28 +27,30 @@ import java.util.concurrent.atomic.AtomicLong;
public class KuduWriterTask { public class KuduWriterTask {
private final static Logger LOG = LoggerFactory.getLogger(KuduWriterTask.class); private final static Logger LOG = LoggerFactory.getLogger(KuduWriterTask.class);
public List<Configuration> columns; private List<Configuration> columns;
public String encoding; private List<List<Configuration>> columnLists;
public String insertMode; private ThreadPoolExecutor pool;
public Double batchSize; private String encoding;
public long mutationBufferSpace; private Double batchSize;
public Boolean isUpsert; private Boolean isUpsert;
public Boolean isSkipFail; private Boolean isSkipFail;
public KuduClient kuduClient; public KuduClient kuduClient;
public KuduTable table;
public KuduSession session; public KuduSession session;
private KuduTable table;
private Integer primaryKeyIndexUntil; private Integer primaryKeyIndexUntil;
private final Object lock = new Object();
public KuduWriterTask(Configuration configuration) { public KuduWriterTask(Configuration configuration) {
this.columns = configuration.getListConfiguration(Key.COLUMN); columns = configuration.getListConfiguration(Key.COLUMN);
columnLists = Kudu11xHelper.getColumnLists(columns);
pool = Kudu11xHelper.createRowAddThreadPool(columnLists.size());
this.encoding = configuration.getString(Key.ENCODING); this.encoding = configuration.getString(Key.ENCODING);
this.insertMode = configuration.getString(Key.INSERT_MODE);
this.batchSize = configuration.getDouble(Key.WRITE_BATCH_SIZE); this.batchSize = configuration.getDouble(Key.WRITE_BATCH_SIZE);
this.mutationBufferSpace = configuration.getLong(Key.MUTATION_BUFFER_SPACE); this.isUpsert = !configuration.getString(Key.INSERT_MODE).equalsIgnoreCase("insert");
this.isUpsert = !configuration.getString(Key.INSERT_MODE).equals("insert");
this.isSkipFail = configuration.getBool(Key.SKIP_FAIL); this.isSkipFail = configuration.getBool(Key.SKIP_FAIL);
long mutationBufferSpace = configuration.getLong(Key.MUTATION_BUFFER_SPACE);
this.kuduClient = Kudu11xHelper.getKuduClient(configuration.getString(Key.KUDU_CONFIG)); this.kuduClient = Kudu11xHelper.getKuduClient(configuration.getString(Key.KUDU_CONFIG));
this.table = Kudu11xHelper.getKuduTable(configuration, kuduClient); this.table = Kudu11xHelper.getKuduTable(configuration, kuduClient);
@ -59,9 +62,9 @@ public class KuduWriterTask {
} }
public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector) { public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector) {
LOG.info("==kuduwriter began to write!"); LOG.info("kuduwriter began to write!");
Record record; Record record;
AtomicLong counter = new AtomicLong(0L); LongAdder counter = new LongAdder();
try { try {
while ((record = lineReceiver.getFromReader()) != null) { while ((record = lineReceiver.getFromReader()) != null) {
if (record.getColumnNumber() != columns.size()) { if (record.getColumnNumber() != columns.size()) {
@ -70,7 +73,7 @@ public class KuduWriterTask {
boolean isDirtyRecord = false; boolean isDirtyRecord = false;
for (int i = 0; i <= primaryKeyIndexUntil && !isDirtyRecord; i++) { for (int i = 0; i < primaryKeyIndexUntil && !isDirtyRecord; i++) {
Column column = record.getColumn(i); Column column = record.getColumn(i);
isDirtyRecord = StringUtils.isBlank(column.asString()); isDirtyRecord = StringUtils.isBlank(column.asString());
} }
@ -80,51 +83,79 @@ public class KuduWriterTask {
continue; continue;
} }
CountDownLatch countDownLatch = new CountDownLatch(columnLists.size());
Upsert upsert = table.newUpsert(); Upsert upsert = table.newUpsert();
Insert insert = table.newInsert(); Insert insert = table.newInsert();
PartialRow row;
for (int i = 0; i < columns.size(); i++) { if (isUpsert) {
PartialRow row; //覆盖更新
if (isUpsert) { row = upsert.getRow();
//覆盖更新 } else {
row = upsert.getRow(); //增量更新
} else { row = insert.getRow();
//增量更新 }
row = insert.getRow(); List<Future<?>> futures = new ArrayList<>();
} for (List<Configuration> columnList : columnLists) {
Configuration col = columns.get(i); Record finalRecord = record;
String name = col.getString(Key.NAME); Future<?> future = pool.submit(() -> {
ColumnType type = ColumnType.getByTypeName(col.getString(Key.TYPE)); try {
Column column = record.getColumn(col.getInt(Key.INDEX, i)); for (Configuration col : columnList) {
Object rawData = column.getRawData(); String name = col.getString(Key.NAME);
if (rawData == null) { ColumnType type = ColumnType.getByTypeName(col.getString(Key.TYPE, "string"));
row.setNull(name); Column column = finalRecord.getColumn(col.getInt(Key.INDEX));
continue; String rawData = column.asString();
} if (rawData == null) {
switch (type) { synchronized (lock) {
case INT: row.setNull(name);
row.addInt(name, Integer.parseInt(rawData.toString())); }
break; continue;
case LONG: }
case BIGINT: switch (type) {
row.addLong(name, Long.parseLong(rawData.toString())); case INT:
break; synchronized (lock) {
case FLOAT: row.addInt(name, Integer.parseInt(rawData));
row.addFloat(name, Float.parseFloat(rawData.toString())); }
break; break;
case DOUBLE: case LONG:
row.addDouble(name, Double.parseDouble(rawData.toString())); case BIGINT:
break; synchronized (lock) {
case BOOLEAN: row.addLong(name, Long.parseLong(rawData));
row.addBoolean(name, Boolean.getBoolean(rawData.toString())); }
break; break;
case STRING: case FLOAT:
default: synchronized (lock) {
row.addString(name, rawData.toString()); row.addFloat(name, Float.parseFloat(rawData));
} }
break;
case DOUBLE:
synchronized (lock) {
row.addDouble(name, Double.parseDouble(rawData));
}
break;
case BOOLEAN:
synchronized (lock) {
row.addBoolean(name, Boolean.getBoolean(rawData));
}
break;
case STRING:
default:
synchronized (lock) {
row.addString(name, rawData);
}
}
}
} finally {
countDownLatch.countDown();
}
});
futures.add(future);
}
countDownLatch.await();
for (Future<?> future : futures) {
future.get();
} }
try { try {
RetryUtil.executeWithRetry(()->{ RetryUtil.executeWithRetry(() -> {
if (isUpsert) { if (isUpsert) {
//覆盖更新 //覆盖更新
session.apply(upsert); session.apply(upsert);
@ -132,26 +163,27 @@ public class KuduWriterTask {
//增量更新 //增量更新
session.apply(insert); session.apply(insert);
} }
//提前写数据阈值可自定义 //flush
if (counter.incrementAndGet() > batchSize * 0.75) { if (counter.longValue() > (batchSize * 0.8)) {
session.flush(); session.flush();
counter.set(0L); counter.reset();
} }
counter.increment();
return true; return true;
},5,1000L,true); }, 5, 500L, true);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Data write failed!", e); LOG.error("Record Write Failure!", e);
if (isSkipFail) { if (isSkipFail) {
LOG.warn("Because you have configured skipFail is true,this data will be skipped!"); LOG.warn("Since you have configured \"skipFail\" to be true, this record will be skipped !");
taskPluginCollector.collectDirtyRecord(record, e.getMessage()); taskPluginCollector.collectDirtyRecord(record, e.getMessage());
}else { } else {
throw e; throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e.getMessage());
} }
} }
} }
} catch (Exception e) { } catch (Exception e) {
LOG.error("write failed! the task will exit!"); LOG.error("write failure! the task will exit!");
throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e.getMessage()); throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e.getMessage());
} }
AtomicInteger i = new AtomicInteger(10); AtomicInteger i = new AtomicInteger(10);
@ -161,23 +193,20 @@ public class KuduWriterTask {
session.flush(); session.flush();
break; break;
} }
Thread.sleep(1000L); Thread.sleep(20L);
i.decrementAndGet(); i.decrementAndGet();
} }
} catch (Exception e) { } catch (Exception e) {
LOG.info("Waiting for data to be inserted...... " + i + "s"); LOG.info("Waiting for data to be written to kudu...... " + i + "s");
try {
Thread.sleep(1000L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
i.decrementAndGet();
} finally { } finally {
try { try {
pool.shutdown();
//强制刷写
session.flush(); session.flush();
} catch (KuduException e) { } catch (KuduException e) {
LOG.error("==kuduwriter flush error! the results may not be complete"); LOG.error("kuduwriter flush error! The results may be incomplete");
e.printStackTrace(); throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e.getMessage());
} }
} }

View File

@ -1,5 +1,5 @@
{ {
"name": "kudu11xwriter", "name": "kuduwriter",
"class": "com.q1.datax.plugin.writer.kudu11xwriter.Kudu11xWriter", "class": "com.q1.datax.plugin.writer.kudu11xwriter.Kudu11xWriter",
"description": "use put: prod. mechanism: use kudu java api put data.", "description": "use put: prod. mechanism: use kudu java api put data.",
"developer": "com.q1.daizihao" "developer": "com.q1.daizihao"

View File

@ -1,5 +1,5 @@
{ {
"name": "kudu11xwriter", "name": "kuduwriter",
"parameter": { "parameter": {
"kuduConfig": { "kuduConfig": {
"kudu.master_addresses": "***", "kudu.master_addresses": "***",