mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-03 01:49:34 +08:00
bug fix and write speed optimization
This commit is contained in:
parent
3fd9c0a79f
commit
80860c224d
BIN
kuduwriter/doc/image-20200901193148188.png
Normal file
BIN
kuduwriter/doc/image-20200901193148188.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 40 KiB |
143
kuduwriter/doc/kuduwirter.md
Normal file
143
kuduwriter/doc/kuduwirter.md
Normal file
@ -0,0 +1,143 @@
|
|||||||
|
# datax-kudu-plugins
|
||||||
|
datax kudu的writer插件
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
eg:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"name": "kuduwriter",
|
||||||
|
"parameter": {
|
||||||
|
"kuduConfig": {
|
||||||
|
"kudu.master_addresses": "***",
|
||||||
|
"timeout": 60000,
|
||||||
|
"sessionTimeout": 60000
|
||||||
|
|
||||||
|
},
|
||||||
|
"table": "",
|
||||||
|
"replicaCount": 3,
|
||||||
|
"truncate": false,
|
||||||
|
"writeMode": "upsert",
|
||||||
|
"partition": {
|
||||||
|
"range": {
|
||||||
|
"column1": [
|
||||||
|
{
|
||||||
|
"lower": "2020-08-25",
|
||||||
|
"upper": "2020-08-26"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"lower": "2020-08-26",
|
||||||
|
"upper": "2020-08-27"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"lower": "2020-08-27",
|
||||||
|
"upper": "2020-08-28"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"hash": {
|
||||||
|
"column": [
|
||||||
|
"column1"
|
||||||
|
],
|
||||||
|
"number": 3
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"column": [
|
||||||
|
{
|
||||||
|
"index": 0,
|
||||||
|
"name": "c1",
|
||||||
|
"type": "string",
|
||||||
|
"primaryKey": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"index": 1,
|
||||||
|
"name": "c2",
|
||||||
|
"type": "string",
|
||||||
|
"compress": "DEFAULT_COMPRESSION",
|
||||||
|
"encoding": "AUTO_ENCODING",
|
||||||
|
"comment": "注解xxxx"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"batchSize": 1024,
|
||||||
|
"bufferSize": 2048,
|
||||||
|
"skipFail": false,
|
||||||
|
"encoding": "UTF-8"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
必须参数:
|
||||||
|
|
||||||
|
```json
|
||||||
|
"writer": {
|
||||||
|
"name": "kuduwriter",
|
||||||
|
"parameter": {
|
||||||
|
"kuduConfig": {
|
||||||
|
"kudu.master_addresses": "***"
|
||||||
|
},
|
||||||
|
"table": "***",
|
||||||
|
"column": [
|
||||||
|
{
|
||||||
|
"name": "c1",
|
||||||
|
"type": "string",
|
||||||
|
"primaryKey": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "c2",
|
||||||
|
"type": "string",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "c3",
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "c4",
|
||||||
|
"type": "string"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
主键列请写到最前面
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
##### 配置列表
|
||||||
|
|
||||||
|
| name | default | description | 是否必须 |
|
||||||
|
| -------------- | ------------------- | ------------------------------------------------------------ | -------- |
|
||||||
|
| kuduConfig | | kudu配置 (kudu.master_addresses等) | 是 |
|
||||||
|
| table | | 导入目标表名 | 是 |
|
||||||
|
| partition | | 分区 | 否 |
|
||||||
|
| column | | 列 | 是 |
|
||||||
|
| name | | 列名 | 是 |
|
||||||
|
| type | string | 列的类型,现支持INT, FLOAT, STRING, BIGINT, DOUBLE, BOOLEAN, LONG。 | 否 |
|
||||||
|
| index | 升序排列 | 列索引位置(要么全部列都写,要么都不写),如reader中取到的某一字段在第二位置(eg: name, id, age)但kudu目标表结构不同(eg:id,name, age),此时就需要将index赋值为(1,0,2),默认顺序(0,1,2) | 否 |
|
||||||
|
| primaryKey | false | 是否为主键(请将所有的主键列写在前面),不表明主键将不会检查过滤脏数据 | 否 |
|
||||||
|
| compress | DEFAULT_COMPRESSION | 压缩格式 | 否 |
|
||||||
|
| encoding | AUTO_ENCODING | 编码 | 否 |
|
||||||
|
| replicaCount | 3 | 保留副本个数 | 否 |
|
||||||
|
| hash | | hash分区 | 否 |
|
||||||
|
| number | 3 | hash分区个数 | 否 |
|
||||||
|
| range | | range分区 | 否 |
|
||||||
|
| lower | | range分区下限 (eg: sql建表:partition value='haha' 对应:“lower”:“haha”,“upper”:“haha\000”) | 否 |
|
||||||
|
| upper | | range分区上限(eg: sql建表:partition "10" <= VALUES < "20" 对应:“lower”:“10”,“upper”:“20”) | 否 |
|
||||||
|
| truncate | false | 是否清空表,本质上是删表重建 | 否 |
|
||||||
|
| writeMode | upsert | upsert,insert,update | 否 |
|
||||||
|
| batchSize | 512 | 每xx行数据flush一次结果(最好不要超过1024) | 否 |
|
||||||
|
| bufferSize | 3072 | 缓冲区大小 | 否 |
|
||||||
|
| skipFail | false | 是否跳过插入不成功的数据 | 否 |
|
||||||
|
| timeout | 60000 | client超时时间,如创建表,删除表操作的超时时间。单位:ms | 否 |
|
||||||
|
| sessionTimeout | 60000 | session超时时间 单位:ms | 否 |
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -14,21 +14,21 @@
|
|||||||
<include>plugin.json</include>
|
<include>plugin.json</include>
|
||||||
<include>plugin_job_template.json</include>
|
<include>plugin_job_template.json</include>
|
||||||
</includes>
|
</includes>
|
||||||
<outputDirectory>plugin/writer/kudu11xwriter</outputDirectory>
|
<outputDirectory>plugin/writer/kuduwriter</outputDirectory>
|
||||||
</fileSet>
|
</fileSet>
|
||||||
<fileSet>
|
<fileSet>
|
||||||
<directory>target/</directory>
|
<directory>target/</directory>
|
||||||
<includes>
|
<includes>
|
||||||
<include>kudu11xwriter-0.0.1-SNAPSHOT.jar</include>
|
<include>kuduwriter-0.0.1-SNAPSHOT.jar</include>
|
||||||
</includes>
|
</includes>
|
||||||
<outputDirectory>plugin/writer/kudu11xwriter</outputDirectory>
|
<outputDirectory>plugin/writer/kuduwriter</outputDirectory>
|
||||||
</fileSet>
|
</fileSet>
|
||||||
</fileSets>
|
</fileSets>
|
||||||
|
|
||||||
<dependencySets>
|
<dependencySets>
|
||||||
<dependencySet>
|
<dependencySet>
|
||||||
<useProjectArtifact>false</useProjectArtifact>
|
<useProjectArtifact>false</useProjectArtifact>
|
||||||
<outputDirectory>plugin/writer/kudu11xwriter/libs</outputDirectory>
|
<outputDirectory>plugin/writer/kuduwriter/libs</outputDirectory>
|
||||||
<scope>runtime</scope>
|
<scope>runtime</scope>
|
||||||
</dependencySet>
|
</dependencySet>
|
||||||
</dependencySets>
|
</dependencySets>
|
||||||
|
@ -10,11 +10,17 @@ import org.apache.kudu.ColumnSchema;
|
|||||||
import org.apache.kudu.Schema;
|
import org.apache.kudu.Schema;
|
||||||
import org.apache.kudu.Type;
|
import org.apache.kudu.Type;
|
||||||
import org.apache.kudu.client.*;
|
import org.apache.kudu.client.*;
|
||||||
|
import org.apache.kudu.shaded.org.checkerframework.checker.units.qual.K;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import sun.rmi.runtime.Log;
|
||||||
|
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
import java.util.concurrent.SynchronousQueue;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -135,6 +141,44 @@ public class Kudu11xHelper {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static ThreadPoolExecutor createRowAddThreadPool(int coreSize) {
|
||||||
|
return new ThreadPoolExecutor(coreSize,
|
||||||
|
coreSize,
|
||||||
|
60L,
|
||||||
|
TimeUnit.SECONDS,
|
||||||
|
new SynchronousQueue<Runnable>(),
|
||||||
|
new ThreadFactory() {
|
||||||
|
private final ThreadGroup group = System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup();
|
||||||
|
private final AtomicInteger threadNumber = new AtomicInteger(1);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Thread newThread(Runnable r) {
|
||||||
|
Thread t = new Thread(group, r,
|
||||||
|
"pool-kudu_rows_add-thread-" + threadNumber.getAndIncrement(),
|
||||||
|
0);
|
||||||
|
if (t.isDaemon())
|
||||||
|
t.setDaemon(false);
|
||||||
|
if (t.getPriority() != Thread.NORM_PRIORITY)
|
||||||
|
t.setPriority(Thread.NORM_PRIORITY);
|
||||||
|
return t;
|
||||||
|
}
|
||||||
|
}, new ThreadPoolExecutor.CallerRunsPolicy());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static List<List<Configuration>> getColumnLists(List<Configuration> columns) {
|
||||||
|
int quota = 8;
|
||||||
|
int num = (columns.size() - 1) / quota + 1;
|
||||||
|
int gap = columns.size() / num;
|
||||||
|
List<List<Configuration>> columnLists = new ArrayList<>(num);
|
||||||
|
for (int j = 0; j < num - 1; j++) {
|
||||||
|
List<Configuration> destList = new ArrayList<>(columns.subList(j * gap, (j + 1) * gap));
|
||||||
|
columnLists.add(destList);
|
||||||
|
}
|
||||||
|
List<Configuration> destList = new ArrayList<>(columns.subList(gap * (num - 1), columns.size()));
|
||||||
|
columnLists.add(destList);
|
||||||
|
return columnLists;
|
||||||
|
}
|
||||||
|
|
||||||
public static boolean isTableExists(Configuration configuration) {
|
public static boolean isTableExists(Configuration configuration) {
|
||||||
String tableName = configuration.getString(Key.TABLE);
|
String tableName = configuration.getString(Key.TABLE);
|
||||||
String kuduConfig = configuration.getString(Key.KUDU_CONFIG);
|
String kuduConfig = configuration.getString(Key.KUDU_CONFIG);
|
||||||
@ -154,7 +198,7 @@ public class Kudu11xHelper {
|
|||||||
kuduClient.close();
|
kuduClient.close();
|
||||||
}
|
}
|
||||||
} catch (KuduException e) {
|
} catch (KuduException e) {
|
||||||
LOG.warn("kudu client is not gracefully closed !");
|
LOG.warn("The \"kudu client\" was not stopped gracefully. !");
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -244,6 +288,7 @@ public class Kudu11xHelper {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static void validateParameter(Configuration configuration) {
|
public static void validateParameter(Configuration configuration) {
|
||||||
|
LOG.info("Start validating parameters!");
|
||||||
configuration.getNecessaryValue(Key.KUDU_CONFIG, Kudu11xWriterErrorcode.REQUIRED_VALUE);
|
configuration.getNecessaryValue(Key.KUDU_CONFIG, Kudu11xWriterErrorcode.REQUIRED_VALUE);
|
||||||
configuration.getNecessaryValue(Key.TABLE, Kudu11xWriterErrorcode.REQUIRED_VALUE);
|
configuration.getNecessaryValue(Key.TABLE, Kudu11xWriterErrorcode.REQUIRED_VALUE);
|
||||||
String encoding = configuration.getString(Key.ENCODING, Constant.DEFAULT_ENCODING);
|
String encoding = configuration.getString(Key.ENCODING, Constant.DEFAULT_ENCODING);
|
||||||
@ -268,7 +313,39 @@ public class Kudu11xHelper {
|
|||||||
|
|
||||||
Boolean isSkipFail = configuration.getBool(Key.SKIP_FAIL, false);
|
Boolean isSkipFail = configuration.getBool(Key.SKIP_FAIL, false);
|
||||||
configuration.set(Key.SKIP_FAIL, isSkipFail);
|
configuration.set(Key.SKIP_FAIL, isSkipFail);
|
||||||
LOG.info("==validate parameter complete!");
|
List<Configuration> columns = configuration.getListConfiguration(Key.COLUMN);
|
||||||
|
List<Configuration> goalColumns = new ArrayList<>();
|
||||||
|
//column参数验证
|
||||||
|
int indexFlag = 0;
|
||||||
|
boolean primaryKey = true;
|
||||||
|
int primaryKeyFlag = 0;
|
||||||
|
for (int i = 0; i < columns.size(); i++) {
|
||||||
|
Configuration col = columns.get(i);
|
||||||
|
String index = col.getString(Key.INDEX);
|
||||||
|
if (index == null) {
|
||||||
|
index = String.valueOf(i);
|
||||||
|
col.set(Key.INDEX, index);
|
||||||
|
indexFlag++;
|
||||||
|
}
|
||||||
|
if(primaryKey != col.getBool(Key.PRIMARYKEY, false)){
|
||||||
|
primaryKey = col.getBool(Key.PRIMARYKEY, false);
|
||||||
|
primaryKeyFlag++;
|
||||||
|
}
|
||||||
|
goalColumns.add(col);
|
||||||
|
}
|
||||||
|
if (indexFlag != 0 && indexFlag != columns.size()) {
|
||||||
|
throw DataXException.asDataXException(Kudu11xWriterErrorcode.ILLEGAL_VALUE,
|
||||||
|
"\"index\" either has values for all of them, or all of them are null!");
|
||||||
|
}
|
||||||
|
if (primaryKeyFlag > 1){
|
||||||
|
throw DataXException.asDataXException(Kudu11xWriterErrorcode.ILLEGAL_VALUE,
|
||||||
|
"\"primaryKey\" must be written in the front!");
|
||||||
|
}
|
||||||
|
configuration.set(Key.COLUMN, goalColumns);
|
||||||
|
// LOG.info("------------------------------------");
|
||||||
|
// LOG.info(configuration.toString());
|
||||||
|
// LOG.info("------------------------------------");
|
||||||
|
LOG.info("validate parameter complete!");
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void truncateTable(Configuration configuration) {
|
public static void truncateTable(Configuration configuration) {
|
||||||
|
@ -38,7 +38,7 @@ public class Kudu11xWriter extends Writer {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Configuration> split(int i) {
|
public List<Configuration> split(int i) {
|
||||||
List<Configuration> splitResultConfigs = new ArrayList<Configuration>();
|
List<Configuration> splitResultConfigs = new ArrayList<>();
|
||||||
for (int j = 0; j < i; j++) {
|
for (int j = 0; j < i; j++) {
|
||||||
splitResultConfigs.add(config.clone());
|
splitResultConfigs.add(config.clone());
|
||||||
}
|
}
|
||||||
@ -76,7 +76,7 @@ public class Kudu11xWriter extends Writer {
|
|||||||
kuduTaskProxy.session.close();
|
kuduTaskProxy.session.close();
|
||||||
}
|
}
|
||||||
}catch (Exception e){
|
}catch (Exception e){
|
||||||
LOG.warn("kudu session is not gracefully closed !");
|
LOG.warn("The \"kudu session\" was not stopped gracefully !");
|
||||||
}
|
}
|
||||||
Kudu11xHelper.closeClient(kuduTaskProxy.kuduClient);
|
Kudu11xHelper.closeClient(kuduTaskProxy.kuduClient);
|
||||||
|
|
||||||
|
@ -12,12 +12,13 @@ import org.apache.kudu.client.*;
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author daizihao
|
* @author daizihao
|
||||||
@ -26,28 +27,30 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||||||
public class KuduWriterTask {
|
public class KuduWriterTask {
|
||||||
private final static Logger LOG = LoggerFactory.getLogger(KuduWriterTask.class);
|
private final static Logger LOG = LoggerFactory.getLogger(KuduWriterTask.class);
|
||||||
|
|
||||||
public List<Configuration> columns;
|
private List<Configuration> columns;
|
||||||
public String encoding;
|
private List<List<Configuration>> columnLists;
|
||||||
public String insertMode;
|
private ThreadPoolExecutor pool;
|
||||||
public Double batchSize;
|
private String encoding;
|
||||||
public long mutationBufferSpace;
|
private Double batchSize;
|
||||||
public Boolean isUpsert;
|
private Boolean isUpsert;
|
||||||
public Boolean isSkipFail;
|
private Boolean isSkipFail;
|
||||||
|
|
||||||
public KuduClient kuduClient;
|
public KuduClient kuduClient;
|
||||||
public KuduTable table;
|
|
||||||
public KuduSession session;
|
public KuduSession session;
|
||||||
|
private KuduTable table;
|
||||||
private Integer primaryKeyIndexUntil;
|
private Integer primaryKeyIndexUntil;
|
||||||
|
|
||||||
|
private final Object lock = new Object();
|
||||||
|
|
||||||
public KuduWriterTask(Configuration configuration) {
|
public KuduWriterTask(Configuration configuration) {
|
||||||
this.columns = configuration.getListConfiguration(Key.COLUMN);
|
columns = configuration.getListConfiguration(Key.COLUMN);
|
||||||
|
columnLists = Kudu11xHelper.getColumnLists(columns);
|
||||||
|
pool = Kudu11xHelper.createRowAddThreadPool(columnLists.size());
|
||||||
|
|
||||||
this.encoding = configuration.getString(Key.ENCODING);
|
this.encoding = configuration.getString(Key.ENCODING);
|
||||||
this.insertMode = configuration.getString(Key.INSERT_MODE);
|
|
||||||
this.batchSize = configuration.getDouble(Key.WRITE_BATCH_SIZE);
|
this.batchSize = configuration.getDouble(Key.WRITE_BATCH_SIZE);
|
||||||
this.mutationBufferSpace = configuration.getLong(Key.MUTATION_BUFFER_SPACE);
|
this.isUpsert = !configuration.getString(Key.INSERT_MODE).equalsIgnoreCase("insert");
|
||||||
this.isUpsert = !configuration.getString(Key.INSERT_MODE).equals("insert");
|
|
||||||
this.isSkipFail = configuration.getBool(Key.SKIP_FAIL);
|
this.isSkipFail = configuration.getBool(Key.SKIP_FAIL);
|
||||||
|
long mutationBufferSpace = configuration.getLong(Key.MUTATION_BUFFER_SPACE);
|
||||||
|
|
||||||
this.kuduClient = Kudu11xHelper.getKuduClient(configuration.getString(Key.KUDU_CONFIG));
|
this.kuduClient = Kudu11xHelper.getKuduClient(configuration.getString(Key.KUDU_CONFIG));
|
||||||
this.table = Kudu11xHelper.getKuduTable(configuration, kuduClient);
|
this.table = Kudu11xHelper.getKuduTable(configuration, kuduClient);
|
||||||
@ -59,9 +62,9 @@ public class KuduWriterTask {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector) {
|
public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector) {
|
||||||
LOG.info("==kuduwriter began to write!");
|
LOG.info("kuduwriter began to write!");
|
||||||
Record record;
|
Record record;
|
||||||
AtomicLong counter = new AtomicLong(0L);
|
LongAdder counter = new LongAdder();
|
||||||
try {
|
try {
|
||||||
while ((record = lineReceiver.getFromReader()) != null) {
|
while ((record = lineReceiver.getFromReader()) != null) {
|
||||||
if (record.getColumnNumber() != columns.size()) {
|
if (record.getColumnNumber() != columns.size()) {
|
||||||
@ -70,7 +73,7 @@ public class KuduWriterTask {
|
|||||||
boolean isDirtyRecord = false;
|
boolean isDirtyRecord = false;
|
||||||
|
|
||||||
|
|
||||||
for (int i = 0; i <= primaryKeyIndexUntil && !isDirtyRecord; i++) {
|
for (int i = 0; i < primaryKeyIndexUntil && !isDirtyRecord; i++) {
|
||||||
Column column = record.getColumn(i);
|
Column column = record.getColumn(i);
|
||||||
isDirtyRecord = StringUtils.isBlank(column.asString());
|
isDirtyRecord = StringUtils.isBlank(column.asString());
|
||||||
}
|
}
|
||||||
@ -80,10 +83,9 @@ public class KuduWriterTask {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
CountDownLatch countDownLatch = new CountDownLatch(columnLists.size());
|
||||||
Upsert upsert = table.newUpsert();
|
Upsert upsert = table.newUpsert();
|
||||||
Insert insert = table.newInsert();
|
Insert insert = table.newInsert();
|
||||||
|
|
||||||
for (int i = 0; i < columns.size(); i++) {
|
|
||||||
PartialRow row;
|
PartialRow row;
|
||||||
if (isUpsert) {
|
if (isUpsert) {
|
||||||
//覆盖更新
|
//覆盖更新
|
||||||
@ -92,37 +94,61 @@ public class KuduWriterTask {
|
|||||||
//增量更新
|
//增量更新
|
||||||
row = insert.getRow();
|
row = insert.getRow();
|
||||||
}
|
}
|
||||||
Configuration col = columns.get(i);
|
|
||||||
|
for (List<Configuration> columnList : columnLists) {
|
||||||
|
Record finalRecord = record;
|
||||||
|
pool.submit(()->{
|
||||||
|
|
||||||
|
for (Configuration col : columnList) {
|
||||||
|
|
||||||
String name = col.getString(Key.NAME);
|
String name = col.getString(Key.NAME);
|
||||||
ColumnType type = ColumnType.getByTypeName(col.getString(Key.TYPE));
|
ColumnType type = ColumnType.getByTypeName(col.getString(Key.TYPE, "string"));
|
||||||
Column column = record.getColumn(col.getInt(Key.INDEX, i));
|
Column column = finalRecord.getColumn(col.getInt(Key.INDEX));
|
||||||
Object rawData = column.getRawData();
|
String rawData = column.asString();
|
||||||
if (rawData == null) {
|
if (rawData == null) {
|
||||||
|
synchronized (lock) {
|
||||||
row.setNull(name);
|
row.setNull(name);
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case INT:
|
case INT:
|
||||||
row.addInt(name, Integer.parseInt(rawData.toString()));
|
synchronized (lock) {
|
||||||
|
row.addInt(name, Integer.parseInt(rawData));
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case LONG:
|
case LONG:
|
||||||
case BIGINT:
|
case BIGINT:
|
||||||
row.addLong(name, Long.parseLong(rawData.toString()));
|
synchronized (lock) {
|
||||||
|
row.addLong(name, Long.parseLong(rawData));
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case FLOAT:
|
case FLOAT:
|
||||||
row.addFloat(name, Float.parseFloat(rawData.toString()));
|
synchronized (lock) {
|
||||||
|
row.addFloat(name, Float.parseFloat(rawData));
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case DOUBLE:
|
case DOUBLE:
|
||||||
row.addDouble(name, Double.parseDouble(rawData.toString()));
|
synchronized (lock) {
|
||||||
|
row.addDouble(name, Double.parseDouble(rawData));
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case BOOLEAN:
|
case BOOLEAN:
|
||||||
row.addBoolean(name, Boolean.getBoolean(rawData.toString()));
|
synchronized (lock) {
|
||||||
|
row.addBoolean(name, Boolean.getBoolean(rawData));
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case STRING:
|
case STRING:
|
||||||
default:
|
default:
|
||||||
row.addString(name, rawData.toString());
|
synchronized (lock) {
|
||||||
|
row.addString(name, rawData);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
countDownLatch.countDown();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
countDownLatch.await();
|
||||||
try {
|
try {
|
||||||
RetryUtil.executeWithRetry(() -> {
|
RetryUtil.executeWithRetry(() -> {
|
||||||
if (isUpsert) {
|
if (isUpsert) {
|
||||||
@ -132,18 +158,19 @@ public class KuduWriterTask {
|
|||||||
//增量更新
|
//增量更新
|
||||||
session.apply(insert);
|
session.apply(insert);
|
||||||
}
|
}
|
||||||
//提前写数据,阈值可自定义
|
//flush
|
||||||
if (counter.incrementAndGet() > batchSize * 0.75) {
|
if (counter.longValue() > (batchSize * 0.8)) {
|
||||||
session.flush();
|
session.flush();
|
||||||
counter.set(0L);
|
counter.reset();
|
||||||
}
|
}
|
||||||
|
counter.increment();
|
||||||
return true;
|
return true;
|
||||||
},5,1000L,true);
|
}, 5, 500L, true);
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Data write failed!", e);
|
LOG.error("Record Write Failure!", e);
|
||||||
if (isSkipFail) {
|
if (isSkipFail) {
|
||||||
LOG.warn("Because you have configured skipFail is true,this data will be skipped!");
|
LOG.warn("Since you have configured \"skipFail\" to be true, this record will be skipped !");
|
||||||
taskPluginCollector.collectDirtyRecord(record, e.getMessage());
|
taskPluginCollector.collectDirtyRecord(record, e.getMessage());
|
||||||
} else {
|
} else {
|
||||||
throw e;
|
throw e;
|
||||||
@ -151,7 +178,7 @@ public class KuduWriterTask {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("write failed! the task will exit!");
|
LOG.error("write failure! the task will exit!");
|
||||||
throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e.getMessage());
|
throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e.getMessage());
|
||||||
}
|
}
|
||||||
AtomicInteger i = new AtomicInteger(10);
|
AtomicInteger i = new AtomicInteger(10);
|
||||||
@ -161,23 +188,20 @@ public class KuduWriterTask {
|
|||||||
session.flush();
|
session.flush();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Thread.sleep(1000L);
|
Thread.sleep(20L);
|
||||||
i.decrementAndGet();
|
i.decrementAndGet();
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.info("Waiting for data to be inserted...... " + i + "s");
|
LOG.info("Waiting for data to be written to kudu...... " + i + "s");
|
||||||
try {
|
|
||||||
Thread.sleep(1000L);
|
|
||||||
} catch (InterruptedException ex) {
|
|
||||||
ex.printStackTrace();
|
|
||||||
}
|
|
||||||
i.decrementAndGet();
|
|
||||||
} finally {
|
} finally {
|
||||||
try {
|
try {
|
||||||
|
pool.shutdown();
|
||||||
|
//强制刷写
|
||||||
session.flush();
|
session.flush();
|
||||||
} catch (KuduException e) {
|
} catch (KuduException e) {
|
||||||
LOG.error("==kuduwriter flush error! the results may not be complete!");
|
LOG.error("kuduwriter flush error! The results may be incomplete!");
|
||||||
e.printStackTrace();
|
throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
{
|
{
|
||||||
"name": "kudu11xwriter",
|
"name": "kuduwriter",
|
||||||
"class": "com.q1.datax.plugin.writer.kudu11xwriter.Kudu11xWriter",
|
"class": "com.q1.datax.plugin.writer.kudu11xwriter.Kudu11xWriter",
|
||||||
"description": "use put: prod. mechanism: use kudu java api put data.",
|
"description": "use put: prod. mechanism: use kudu java api put data.",
|
||||||
"developer": "com.q1.daizihao"
|
"developer": "com.q1.daizihao"
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
{
|
{
|
||||||
"name": "kudu11xwriter",
|
"name": "kuduwriter",
|
||||||
"parameter": {
|
"parameter": {
|
||||||
"kuduConfig": {
|
"kuduConfig": {
|
||||||
"kudu.master_addresses": "***",
|
"kudu.master_addresses": "***",
|
||||||
|
Loading…
Reference in New Issue
Block a user