Merge pull request #635 from heljoyLiu/pulgin-gdb-update-to-set-property

gdbwriter: support set-property
Trafalgar 2020-04-23 11:48:37 +08:00 committed by GitHub
commit 3b3fa878db
19 changed files with 1264 additions and 922 deletions


@@ -41,6 +41,14 @@ GDBWriter obtains the protocol data generated by the Reader through the DataX framework and uses `g.addV/E(GD
                    {
                        "random": "60,64",
                        "type": "string"
                    },
                    {
                        "random": "100,1000",
                        "type": "long"
                    },
                    {
                        "random": "32,48",
                        "type": "string"
                    }
                ],
                "sliceRecordCount": 1000
@@ -70,6 +78,18 @@ GDBWriter obtains the protocol data generated by the Reader through the DataX framework and uses `g.addV/E(GD
"name": "vertex_propKey", "name": "vertex_propKey",
"value": "${2}", "value": "${2}",
"type": "string", "type": "string",
"columnType": "vertexSetProperty"
},
{
"name": "vertex_propKey",
"value": "${3}",
"type": "long",
"columnType": "vertexSetProperty"
},
{
"name": "vertex_propKey2",
"value": "${4}",
"type": "string",
"columnType": "vertexProperty" "columnType": "vertexProperty"
} }
] ]
@@ -290,6 +310,7 @@ GDBWriter obtains the protocol data generated by the Reader through the DataX framework and uses `g.addV/E(GD
* primaryKey: indicates that this field is the primary-key id
* Vertex enum values:
    * vertexProperty: when labelType is vertex, this field is an ordinary property of the vertex
    * vertexSetProperty: when labelType is vertex, this field is a SET property of the vertex, and value is one of the values inside that SET property (see the example after this list)
    * vertexJsonProperty: when labelType is vertex, this field is the vertex's JSON property; for the structure of value see the **json properties example** in the notes; at most one JSON property is allowed in a vertex configuration
* Edge enum values:
    * srcPrimaryKey: when labelType is edge, this field is the primary-key id of the source vertex
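
A minimal sketch of a column entry using the new `vertexSetProperty` column type (my own illustration, not taken verbatim from this diff; the property name `hobby` and the placeholder `${5}` are hypothetical). Each imported row contributes one more value to the same SET property of the vertex:

```json
{
    "name": "hobby",
    "value": "${5}",
    "type": "string",
    "columnType": "vertexSetProperty"
}
```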
@@ -305,6 +326,14 @@ GDBWriter obtains the protocol data generated by the Reader through the DataX framework and uses `g.addV/E(GD
> {"k":"age","t":"int","v":"20"}, > {"k":"age","t":"int","v":"20"},
> {"k":"sex","t":"string","v":"male"} > {"k":"sex","t":"string","v":"male"}
> ]} > ]}
>
> # json格式同样支持给点添加SET属性格式如下
> {"properties":[
> {"k":"name","t":"string","v":"tom","c":"set"},
> {"k":"name","t":"string","v":"jack","c":"set"},
> {"k":"age","t":"int","v":"20"},
> {"k":"sex","t":"string","v":"male"}
> ]}
> ``` > ```
## 4 Performance Report
@@ -367,4 +396,5 @@ DataX benchmark machine
- The GDBWriter plugin and the user's query DSL use the same GDB instance port, so an import may affect query performance
## FAQ
1. Using SET properties requires upgrading the GDB instance to version `1.0.20` or later.
2. Edges only support ordinary single-value properties; SET property data cannot be written to edges (see the sketch below).
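
For contrast with FAQ item 2, a minimal edge column sketch (my own illustration; the property name `weight` and the placeholder `${3}` are hypothetical): edge properties keep using the single-value `edgeProperty` column type, since `vertexSetProperty` is not accepted for edges.

```json
{
    "name": "weight",
    "value": "${3}",
    "type": "double",
    "columnType": "edgeProperty"
}
```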


@@ -1,10 +1,5 @@ package com.alibaba.datax.plugin.writer.gdbwriter;
package com.alibaba.datax.plugin.writer.gdbwriter;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
@@ -18,24 +13,33 @@ import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRule;
import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRuleFactory;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbGraph;

import groovy.lang.Tuple2;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class GdbWriter extends Writer {
    private static final Logger log = LoggerFactory.getLogger(GdbWriter.class);

    private static Function<Record, GdbElement> mapper = null;
    private static GdbGraph globalGraph = null;
    private static boolean session = false;

    /**
     * Methods in Job are executed only once; methods in Task are run by multiple Task threads started in parallel by the framework.
     * <p/>
     * The overall Writer execution flow is:
     *
     * <pre>
     * Job class: init --> prepare --> split
     *
@@ -46,17 +50,16 @@ public class GdbWriter extends Writer {
     * </pre>
     */
    public static class Job extends Writer.Job {
        private static final Logger LOG = LoggerFactory.getLogger(Job.class);

        private Configuration jobConfig = null;

        @Override
        public void init() {
            LOG.info("GDB datax plugin writer job init begin ...");
            this.jobConfig = getPluginJobConf();
            GdbWriterConfig.of(this.jobConfig);
            LOG.info("GDB datax plugin writer job init end.");

            /**
             * Note: this method is executed only once.
@@ -71,37 +74,37 @@ * Note: this method is executed only once.
             * Note: this method is executed only once.
             * Best practice: if the Job needs any processing before data synchronization, do it here; otherwise this method can simply be removed.
             */
            super.prepare();

            final MappingRule rule = MappingRuleFactory.getInstance().createV2(this.jobConfig);

            mapper = new DefaultGdbMapper(this.jobConfig).getMapper(rule);
            session = this.jobConfig.getBool(Key.SESSION_STATE, false);

            /**
             * client connect check before task
             */
            try {
                globalGraph = GdbGraphManager.instance().getGraph(this.jobConfig, false);
            } catch (final RuntimeException e) {
                throw DataXException.asDataXException(GdbWriterErrorCode.FAIL_CLIENT_CONNECT, e.getMessage());
            }
        }

        @Override
        public List<Configuration> split(final int mandatoryNumber) {
            /**
             * Note: this method is executed only once.
             * Best practice: a static utility class is usually used to split the Job configuration into multiple Task configurations.
             * Here mandatoryNumber is the number of splits that must be produced.
             */
            LOG.info("split begin...");
            final List<Configuration> configurationList = new ArrayList<Configuration>();
            for (int i = 0; i < mandatoryNumber; i++) {
                configurationList.add(this.jobConfig.clone());
            }
            LOG.info("split end...");
            return configurationList;
        }

        @Override
@@ -127,7 +130,7 @@ public class GdbWriter extends Writer {
    public static class Task extends Writer.Task {
        private Configuration taskConfig;

        private int failed = 0;
        private int batchRecords;

        private ExecutorService submitService = null;
@@ -139,24 +142,24 @@ public class GdbWriter extends Writer {
             * Note: this method is executed once per Task.
             * Best practice: read the taskConfig here to initialize resources in preparation for startWrite().
             */
            this.taskConfig = super.getPluginJobConf();
            this.batchRecords = this.taskConfig.getInt(Key.MAX_RECORDS_IN_BATCH, GdbWriterConfig.DEFAULT_RECORD_NUM_IN_BATCH);
            this.submitService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(),
                new DefaultThreadFactory("submit-dsl"));

            if (!session) {
                this.graph = globalGraph;
            } else {
                /**
                 * Create the session clients in batches, because of the performance limit of server-side groovy compilation.
                 */
                try {
                    Thread.sleep((getTaskId() / 10) * 10000);
                } catch (final Exception e) {
                    // ...
                }
                this.graph = GdbGraphManager.instance().getGraph(this.taskConfig, session);
            }
        }

        @Override
@@ -165,64 +168,69 @@ public class GdbWriter extends Writer {
             * Note: this method is executed once per Task.
             * Best practice: if the Task needs any processing before data synchronization, do it here; otherwise this method can simply be removed.
             */
            super.prepare();
        }

        @Override
        public void startWrite(final RecordReceiver recordReceiver) {
            /**
             * Note: this method is executed once per Task.
             * Best practice: wrap the work appropriately here so the data-writing logic stays concise and clear.
             */
            Record r;
            Future<Boolean> future = null;
            List<Tuple2<Record, GdbElement>> records = new ArrayList<>(this.batchRecords);

            while ((r = recordReceiver.getFromReader()) != null) {
                try {
                    records.add(new Tuple2<>(r, mapper.apply(r)));
                } catch (final Exception ex) {
                    getTaskPluginCollector().collectDirtyRecord(r, ex);
                    continue;
                }

                if (records.size() >= this.batchRecords) {
                    wait4Submit(future);

                    final List<Tuple2<Record, GdbElement>> batch = records;
                    future = this.submitService.submit(() -> batchCommitRecords(batch));

                    records = new ArrayList<>(this.batchRecords);
                }
            }

            wait4Submit(future);
            if (!records.isEmpty()) {
                final List<Tuple2<Record, GdbElement>> batch = records;
                future = this.submitService.submit(() -> batchCommitRecords(batch));

                wait4Submit(future);
            }
        }

        private void wait4Submit(final Future<Boolean> future) {
            if (future == null) {
                return;
            }

            try {
                future.get();
            } catch (final Exception e) {
                e.printStackTrace();
            }
        }

        private boolean batchCommitRecords(final List<Tuple2<Record, GdbElement>> records) {
            final TaskPluginCollector collector = getTaskPluginCollector();
            try {
                final List<Tuple2<Record, Exception>> errors = this.graph.add(records);
                errors.forEach(t -> collector.collectDirtyRecord(t.getFirst(), t.getSecond()));
                this.failed += errors.size();
            } catch (final Exception e) {
                records.forEach(t -> collector.collectDirtyRecord(t.getFirst(), e));
                this.failed += records.size();
            }

            records.clear();
            return true;
        }

        @Override
@@ -231,7 +239,7 @@ public class GdbWriter extends Writer {
             * Note: this method is executed once per Task.
             * Best practice: if the Task needs any post-processing after data synchronization, do it here.
             */
            log.info("Task done, dirty record count - {}", this.failed);
        }

        @Override
@@ -241,9 +249,9 @@
             * Best practice: usually used together with post() in the Task to release the Task's resources.
             */
            if (session) {
                this.graph.close();
            }
            this.submitService.shutdown();
        }
    }


@@ -27,7 +27,6 @@ public enum GdbWriterErrorCode implements ErrorCode {
    @Override
    public String toString() {
        return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
    }
}


@@ -6,136 +6,164 @@ public final class Key {
     * Configuration items used by the plugin that must be provided by the plugin user are declared here.
     */
    public final static String HOST = "host";
    public final static String PORT = "port";
    public final static String USERNAME = "username";
    public static final String PASSWORD = "password";

    /**
     * import type and mode
     */
    public static final String IMPORT_TYPE = "labelType";
    public static final String UPDATE_MODE = "writeMode";

    /**
     * label prefix issue
     */
    public static final String ID_TRANS_RULE = "idTransRule";
    public static final String SRC_ID_TRANS_RULE = "srcIdTransRule";
    public static final String DST_ID_TRANS_RULE = "dstIdTransRule";

    public static final String LABEL = "label";
    public static final String SRC_LABEL = "srcLabel";
    public static final String DST_LABEL = "dstLabel";

    public static final String MAPPING = "mapping";

    /**
     * column define in Gdb
     */
    public static final String COLUMN = "column";
    public static final String COLUMN_NAME = "name";
    public static final String COLUMN_VALUE = "value";
    public static final String COLUMN_TYPE = "type";
    public static final String COLUMN_NODE_TYPE = "columnType";

    /**
     * Gdb Vertex/Edge elements
     */
    public static final String ID = "id";
    public static final String FROM = "from";
    public static final String TO = "to";
    public static final String PROPERTIES = "properties";
    public static final String PROP_KEY = "name";
    public static final String PROP_VALUE = "value";
    public static final String PROP_TYPE = "type";

    public static final String PROPERTIES_JSON_STR = "propertiesJsonStr";
    public static final String MAX_PROPERTIES_BATCH_NUM = "maxPropertiesBatchNumber";

    /**
     * session less client configure for connect pool
     */
    public static final String MAX_IN_PROCESS_PER_CONNECTION = "maxInProcessPerConnection";
    public static final String MAX_CONNECTION_POOL_SIZE = "maxConnectionPoolSize";
    public static final String MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = "maxSimultaneousUsagePerConnection";

    public static final String MAX_RECORDS_IN_BATCH = "maxRecordsInBatch";
    public static final String SESSION_STATE = "session";

    /**
     * Request length limit, including the string length of gdb elements. GDB field length limit configuration; the
     * limit for each field can be configured separately, and records exceeding a limit are treated as dirty data.
     */
    public static final String MAX_GDB_STRING_LENGTH = "maxStringLengthLimit";
    public static final String MAX_GDB_ID_LENGTH = "maxIdStringLengthLimit";
    public static final String MAX_GDB_LABEL_LENGTH = "maxLabelStringLengthLimit";
    public static final String MAX_GDB_PROP_KEY_LENGTH = "maxPropKeyStringLengthLimit";
    public static final String MAX_GDB_PROP_VALUE_LENGTH = "maxPropValueStringLengthLimit";

    public static final String MAX_GDB_REQUEST_LENGTH = "maxRequestLengthLimit";

    public static enum ImportType {
        /**
         * Import vertices
         */
        VERTEX,
        /**
         * Import edges
         */
        EDGE;
    }

    public static enum UpdateMode {
        /**
         * Insert new records, fail if exists
         */
        INSERT,
        /**
         * Skip this record if exists
         */
        SKIP,
        /**
         * Update property of this record if exists
         */
        MERGE;
    }

    public static enum ColumnType {
        /**
         * vertex or edge id
         */
        primaryKey,

        /**
         * vertex property
         */
        vertexProperty,

        /**
         * vertex setProperty
         */
        vertexSetProperty,

        /**
         * start vertex id of edge
         */
        srcPrimaryKey,

        /**
         * end vertex id of edge
         */
        dstPrimaryKey,

        /**
         * edge property
         */
        edgeProperty,

        /**
         * vertex json style property
         */
        vertexJsonProperty,

        /**
         * edge json style property
         */
        edgeJsonProperty
    }

    public static enum IdTransRule {
        /**
         * vertex or edge id with 'label' prefix
         */
        labelPrefix,

        /**
         * vertex or edge id raw
         */
        none
    }

    public static enum PropertyType {
        /**
         * single Vertex Property
         */
        single,

        /**
         * set Vertex Property
         */
        set
    }
}


@@ -3,37 +3,37 @@
 */
package com.alibaba.datax.plugin.writer.gdbwriter.client;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbGraph;
import com.alibaba.datax.plugin.writer.gdbwriter.model.ScriptGdbGraph;

import java.util.ArrayList;
import java.util.List;

/**
 * @author jerrywang
 *
 */
public class GdbGraphManager implements AutoCloseable {
    private static final GdbGraphManager INSTANCE = new GdbGraphManager();

    private List<GdbGraph> graphs = new ArrayList<>();

    public static GdbGraphManager instance() {
        return INSTANCE;
    }

    public GdbGraph getGraph(final Configuration config, final boolean session) {
        final GdbGraph graph = new ScriptGdbGraph(config, session);
        this.graphs.add(graph);
        return graph;
    }

    @Override
    public void close() {
        for (final GdbGraph graph : this.graphs) {
            graph.close();
        }
        this.graphs.clear();
    }
}


@@ -3,39 +3,43 @@
 */
package com.alibaba.datax.plugin.writer.gdbwriter.client;

import static com.alibaba.datax.plugin.writer.gdbwriter.util.ConfigHelper.assertConfig;
import static com.alibaba.datax.plugin.writer.gdbwriter.util.ConfigHelper.assertHasContent;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;

/**
 * @author jerrywang
 *
 */
public class GdbWriterConfig {
    public static final int DEFAULT_MAX_IN_PROCESS_PER_CONNECTION = 4;
    public static final int DEFAULT_MAX_CONNECTION_POOL_SIZE = 8;
    public static final int DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = 8;
    public static final int DEFAULT_BATCH_PROPERTY_NUM = 30;
    public static final int DEFAULT_RECORD_NUM_IN_BATCH = 16;

    public static final int MAX_STRING_LENGTH = 10240;
    public static final int MAX_REQUEST_LENGTH = 65535 - 1000;

    private Configuration config;

    private GdbWriterConfig(final Configuration config) {
        this.config = config;

        validate();
    }

    public static GdbWriterConfig of(final Configuration config) {
        return new GdbWriterConfig(config);
    }

    private void validate() {
        assertHasContent(this.config, Key.HOST);
        assertConfig(Key.PORT, () -> this.config.getInt(Key.PORT) > 0);
        assertHasContent(this.config, Key.USERNAME);
        assertHasContent(this.config, Key.PASSWORD);
    }
}


@@ -3,6 +3,8 @@
 */
package com.alibaba.datax.plugin.writer.gdbwriter.mapping;

import static com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType.VERTEX;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -12,179 +14,191 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbEdge;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbVertex;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import lombok.extern.slf4j.Slf4j;

/**
 * @author jerrywang
 *
 */
@Slf4j
public class DefaultGdbMapper implements GdbMapper {
    private static final Pattern STR_DOLLAR_PATTERN = Pattern.compile("\\$\\{(\\d+)}");
    private static final Pattern NORMAL_DOLLAR_PATTERN = Pattern.compile("^\\$\\{(\\d+)}$");
    private static final Pattern STR_NUM_PATTERN = Pattern.compile("#\\{(\\d+)}");
    private static final Pattern NORMAL_NUM_PATTERN = Pattern.compile("^#\\{(\\d+)}$");

    public DefaultGdbMapper() {}

    public DefaultGdbMapper(final Configuration config) {
        MapperConfig.getInstance().updateConfig(config);
    }

    private static BiConsumer<Record, GdbElement> forElement(final MappingRule rule) {
        final boolean numPattern = rule.isNumPattern();
        final List<BiConsumer<Record, GdbElement>> properties = new ArrayList<>();
        for (final MappingRule.PropertyMappingRule propRule : rule.getProperties()) {
            final Function<Record, String> keyFunc = forStrColumn(numPattern, propRule.getKey());

            if (propRule.getValueType() == ValueType.STRING) {
                final Function<Record, String> valueFunc = forStrColumn(numPattern, propRule.getValue());
                properties.add((r, e) -> {
                    e.addProperty(keyFunc.apply(r), valueFunc.apply(r), propRule.getPType());
                });
            } else {
                final Function<Record, Object> valueFunc =
                    forObjColumn(numPattern, propRule.getValue(), propRule.getValueType());
                properties.add((r, e) -> {
                    e.addProperty(keyFunc.apply(r), valueFunc.apply(r), propRule.getPType());
                });
            }
        }

        if (rule.getPropertiesJsonStr() != null) {
            final Function<Record, String> jsonFunc = forStrColumn(numPattern, rule.getPropertiesJsonStr());
            properties.add((r, e) -> {
                final String propertiesStr = jsonFunc.apply(r);
                final JSONObject root = (JSONObject)JSONObject.parse(propertiesStr);
                final JSONArray propertiesList = root.getJSONArray("properties");

                for (final Object object : propertiesList) {
                    final JSONObject jsonObject = (JSONObject)object;
                    final String key = jsonObject.getString("k");
                    final String name = jsonObject.getString("v");
                    final String type = jsonObject.getString("t");
                    final String card = jsonObject.getString("c");

                    if (key == null || name == null) {
                        continue;
                    }
                    addToProperties(e, key, name, type, card);
                }
            });
        }

        final BiConsumer<Record, GdbElement> ret = (r, e) -> {
            final String label = forStrColumn(numPattern, rule.getLabel()).apply(r);
            String id = forStrColumn(numPattern, rule.getId()).apply(r);

            if (rule.getImportType() == Key.ImportType.EDGE) {
                final String to = forStrColumn(numPattern, rule.getTo()).apply(r);
                final String from = forStrColumn(numPattern, rule.getFrom()).apply(r);
                if (to == null || from == null) {
                    log.error("invalid record to: {} , from: {}", to, from);
                    throw new IllegalArgumentException("to or from missed in edge");
                }
                ((GdbEdge)e).setTo(to);
                ((GdbEdge)e).setFrom(from);

                // generate UUID for edge
                if (id == null) {
                    id = UUID.randomUUID().toString();
                }
            }

            if (id == null || label == null) {
                log.error("invalid record id: {} , label: {}", id, label);
                throw new IllegalArgumentException("id or label missed");
            }

            e.setId(id);
            e.setLabel(label);
            properties.forEach(p -> p.accept(r, e));
        };

        return ret;
    }

    private static Function<Record, Object> forObjColumn(final boolean numPattern, final String rule, final ValueType type) {
        final Pattern pattern = numPattern ? NORMAL_NUM_PATTERN : NORMAL_DOLLAR_PATTERN;
        final Matcher m = pattern.matcher(rule);
        if (m.matches()) {
            final int index = Integer.valueOf(m.group(1));
            return r -> type.applyColumn(r.getColumn(index));
        } else {
            return r -> type.fromStrFunc(rule);
        }
    }

    private static Function<Record, String> forStrColumn(final boolean numPattern, final String rule) {
        final List<BiConsumer<StringBuilder, Record>> list = new ArrayList<>();
        final Pattern pattern = numPattern ? STR_NUM_PATTERN : STR_DOLLAR_PATTERN;
        final Matcher m = pattern.matcher(rule);
        int last = 0;
        while (m.find()) {
            final String index = m.group(1);
            // as simple integer index.
            final int i = Integer.parseInt(index);

            final int tmp = last;
            final int start = m.start();
            list.add((sb, record) -> {
                sb.append(rule.subSequence(tmp, start));
                if (record.getColumn(i) != null && record.getColumn(i).getByteSize() > 0) {
                    sb.append(record.getColumn(i).asString());
                }
            });

            last = m.end();
        }

        final int tmp = last;
        list.add((sb, record) -> {
            sb.append(rule.subSequence(tmp, rule.length()));
        });

        return r -> {
            final StringBuilder sb = new StringBuilder();
            list.forEach(c -> c.accept(sb, r));
            final String res = sb.toString();
            return res.isEmpty() ? null : res;
        };
    }

    private static boolean addToProperties(final GdbElement e, final String key, final String value, final String type, final String card) {
        final Object pValue;
        final ValueType valueType = ValueType.fromShortName(type);

        if (valueType == ValueType.STRING) {
            pValue = value;
        } else if (valueType == ValueType.INT || valueType == ValueType.INTEGER) {
            pValue = Integer.valueOf(value);
        } else if (valueType == ValueType.LONG) {
            pValue = Long.valueOf(value);
        } else if (valueType == ValueType.DOUBLE) {
            pValue = Double.valueOf(value);
        } else if (valueType == ValueType.FLOAT) {
            pValue = Float.valueOf(value);
        } else if (valueType == ValueType.BOOLEAN) {
            pValue = Boolean.valueOf(value);
        } else {
            log.error("invalid property key {}, value {}, type {}", key, value, type);
            return false;
        }

        // apply vertexSetProperty
        if (Key.PropertyType.set.name().equals(card) && (e instanceof GdbVertex)) {
            e.addProperty(key, pValue, Key.PropertyType.set);
        } else {
            e.addProperty(key, pValue);
        }
        return true;
    }

    @Override
    public Function<Record, GdbElement> getMapper(final MappingRule rule) {
        return r -> {
            final GdbElement e = (rule.getImportType() == VERTEX) ? new GdbVertex() : new GdbEdge();
            forElement(rule).accept(r, e);
            return e;
        };
    }
}


@@ -13,5 +13,5 @@ import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement;
 *
 */
public interface GdbMapper {
    Function<Record, GdbElement> getMapper(MappingRule rule);
}


@@ -0,0 +1,68 @@
/*
 * (C) 2019-present Alibaba Group Holding Limited.
 *
 * This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public
 * License version 2 as published by the Free Software Foundation.
 */
package com.alibaba.datax.plugin.writer.gdbwriter.mapping;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig;

/**
 * @author : Liu Jianping
 * @date : 2019/10/15
 */
public class MapperConfig {
    private static MapperConfig instance = new MapperConfig();

    private int maxIdLength;
    private int maxLabelLength;
    private int maxPropKeyLength;
    private int maxPropValueLength;

    private MapperConfig() {
        this.maxIdLength = GdbWriterConfig.MAX_STRING_LENGTH;
        this.maxLabelLength = GdbWriterConfig.MAX_STRING_LENGTH;
        this.maxPropKeyLength = GdbWriterConfig.MAX_STRING_LENGTH;
        this.maxPropValueLength = GdbWriterConfig.MAX_STRING_LENGTH;
    }

    public static MapperConfig getInstance() {
        return instance;
    }

    public void updateConfig(final Configuration config) {
        final int length = config.getInt(Key.MAX_GDB_STRING_LENGTH, GdbWriterConfig.MAX_STRING_LENGTH);

        Integer sLength = config.getInt(Key.MAX_GDB_ID_LENGTH);
        this.maxIdLength = sLength == null ? length : sLength;
        sLength = config.getInt(Key.MAX_GDB_LABEL_LENGTH);
        this.maxLabelLength = sLength == null ? length : sLength;
        sLength = config.getInt(Key.MAX_GDB_PROP_KEY_LENGTH);
        this.maxPropKeyLength = sLength == null ? length : sLength;
        sLength = config.getInt(Key.MAX_GDB_PROP_VALUE_LENGTH);
        this.maxPropValueLength = sLength == null ? length : sLength;
    }

    public int getMaxIdLength() {
        return this.maxIdLength;
    }

    public int getMaxLabelLength() {
        return this.maxLabelLength;
    }

    public int getMaxPropKeyLength() {
        return this.maxPropKeyLength;
    }

    public int getMaxPropValueLength() {
        return this.maxPropValueLength;
    }
}
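
MapperConfig above takes its limits from the writer parameters. A minimal sketch of how those keys might appear in a job configuration (my own example; the numeric values are arbitrary, only the key names come from Key.java in this PR):

```json
"parameter": {
    "maxStringLengthLimit": 10240,
    "maxIdStringLengthLimit": 128,
    "maxLabelStringLengthLimit": 128,
    "maxPropKeyStringLengthLimit": 128,
    "maxPropValueStringLengthLimit": 4096
}
```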


@@ -7,6 +7,7 @@ import java.util.ArrayList;
import java.util.List;

import com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType;
import com.alibaba.datax.plugin.writer.gdbwriter.Key.PropertyType;

import lombok.Data;

@@ -16,26 +17,30 @@ import lombok.Data;
 */
@Data
public class MappingRule {
    private String id = null;

    private String label = null;

    private ImportType importType = null;

    private String from = null;

    private String to = null;

    private List<PropertyMappingRule> properties = new ArrayList<>();

    private String propertiesJsonStr = null;

    private boolean numPattern = false;

    @Data
    public static class PropertyMappingRule {
        private String key = null;

        private String value = null;

        private ValueType valueType = null;

        private PropertyType pType = PropertyType.single;
    }
}


@@ -3,18 +3,21 @@
 */
package com.alibaba.datax.plugin.writer.gdbwriter.mapping;

import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.GdbWriterErrorCode;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import com.alibaba.datax.plugin.writer.gdbwriter.Key.ColumnType;
import com.alibaba.datax.plugin.writer.gdbwriter.Key.IdTransRule;
import com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType;
import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRule.PropertyMappingRule;
import com.alibaba.datax.plugin.writer.gdbwriter.util.ConfigHelper;

import lombok.extern.slf4j.Slf4j;

/**
 * @author jerrywang
@@ -22,66 +25,94 @@ import java.util.List;
 */
@Slf4j
public class MappingRuleFactory {
    private static final MappingRuleFactory instance = new MappingRuleFactory();
    private static final Pattern STR_PATTERN = Pattern.compile("\\$\\{(\\d+)}");
    private static final Pattern STR_NUM_PATTERN = Pattern.compile("#\\{(\\d+)}");

    public static MappingRuleFactory getInstance() {
        return instance;
    }

    private static boolean isPattern(final String value, final MappingRule rule, final boolean checked) {
        if (checked) {
            return true;
        }

        if (value == null || value.isEmpty()) {
            return false;
        }

        Matcher m = STR_PATTERN.matcher(value);
        if (m.find()) {
            rule.setNumPattern(false);
            return true;
        }

        m = STR_NUM_PATTERN.matcher(value);
        if (m.find()) {
            rule.setNumPattern(true);
            return true;
        }

        return false;
    }

    @Deprecated
    public MappingRule create(final Configuration config, final ImportType type) {
        final MappingRule rule = new MappingRule();
        rule.setId(config.getString(Key.ID));
        rule.setLabel(config.getString(Key.LABEL));
        if (type == ImportType.EDGE) {
            rule.setFrom(config.getString(Key.FROM));
            rule.setTo(config.getString(Key.TO));
        }

        rule.setImportType(type);

        final List<Configuration> configurations = config.getListConfiguration(Key.PROPERTIES);
        if (configurations != null) {
            for (final Configuration prop : config.getListConfiguration(Key.PROPERTIES)) {
                final PropertyMappingRule propRule = new PropertyMappingRule();
                propRule.setKey(prop.getString(Key.PROP_KEY));
                propRule.setValue(prop.getString(Key.PROP_VALUE));
                propRule.setValueType(ValueType.fromShortName(prop.getString(Key.PROP_TYPE).toLowerCase()));
                rule.getProperties().add(propRule);
            }
        }

        final String propertiesJsonStr = config.getString(Key.PROPERTIES_JSON_STR, null);
        if (propertiesJsonStr != null) {
            rule.setPropertiesJsonStr(propertiesJsonStr);
        }

        return rule;
    }

    public MappingRule createV2(final Configuration config) {
        try {
            final ImportType type = ImportType.valueOf(config.getString(Key.IMPORT_TYPE));
            return createV2(config, type);
        } catch (final NullPointerException e) {
            throw DataXException.asDataXException(GdbWriterErrorCode.CONFIG_ITEM_MISS, Key.IMPORT_TYPE);
        } catch (final IllegalArgumentException e) {
            throw DataXException.asDataXException(GdbWriterErrorCode.BAD_CONFIG_VALUE, Key.IMPORT_TYPE);
        }
    }

    public MappingRule createV2(final Configuration config, final ImportType type) {
        final MappingRule rule = new MappingRule();
        boolean patternChecked = false;

        ConfigHelper.assertHasContent(config, Key.LABEL);
        rule.setLabel(config.getString(Key.LABEL));
        rule.setImportType(type);
        patternChecked = isPattern(rule.getLabel(), rule, patternChecked);

        IdTransRule srcTransRule = IdTransRule.none;
        IdTransRule dstTransRule = IdTransRule.none;
        if (type == ImportType.EDGE) {
            ConfigHelper.assertHasContent(config, Key.SRC_ID_TRANS_RULE);
            ConfigHelper.assertHasContent(config, Key.DST_ID_TRANS_RULE);

            srcTransRule = IdTransRule.valueOf(config.getString(Key.SRC_ID_TRANS_RULE));
            dstTransRule = IdTransRule.valueOf(config.getString(Key.DST_ID_TRANS_RULE));
@@ -94,88 +125,96 @@ public class MappingRuleFactory {
                ConfigHelper.assertHasContent(config, Key.DST_LABEL);
            }
        }

        ConfigHelper.assertHasContent(config, Key.ID_TRANS_RULE);
        final IdTransRule transRule = IdTransRule.valueOf(config.getString(Key.ID_TRANS_RULE));

        final List<Configuration> configurationList = config.getListConfiguration(Key.COLUMN);
        ConfigHelper.assertConfig(Key.COLUMN, () -> (configurationList != null && !configurationList.isEmpty()));
        for (final Configuration column : configurationList) {
            ConfigHelper.assertHasContent(column, Key.COLUMN_NAME);
            ConfigHelper.assertHasContent(column, Key.COLUMN_VALUE);
            ConfigHelper.assertHasContent(column, Key.COLUMN_TYPE);
            ConfigHelper.assertHasContent(column, Key.COLUMN_NODE_TYPE);

            final String columnValue = column.getString(Key.COLUMN_VALUE);
            final ColumnType columnType = ColumnType.valueOf(column.getString(Key.COLUMN_NODE_TYPE));
            if (columnValue == null || columnValue.isEmpty()) {
                // only allow edge empty id
                ConfigHelper.assertConfig("empty column value",
                    () -> (type == ImportType.EDGE && columnType == ColumnType.primaryKey));
            }
            patternChecked = isPattern(columnValue, rule, patternChecked);

            if (columnType == ColumnType.primaryKey) {
                final ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));
                ConfigHelper.assertConfig("only string is allowed in primary key",
                    () -> (propType == ValueType.STRING));

                if (transRule == IdTransRule.labelPrefix) {
                    rule.setId(config.getString(Key.LABEL) + columnValue);
                } else {
                    rule.setId(columnValue);
                }
            } else if (columnType == ColumnType.edgeJsonProperty || columnType == ColumnType.vertexJsonProperty) {
                // only support one json property in column
                ConfigHelper.assertConfig("multi JsonProperty", () -> (rule.getPropertiesJsonStr() == null));
                rule.setPropertiesJsonStr(columnValue);
            } else if (columnType == ColumnType.vertexProperty || columnType == ColumnType.edgeProperty
                || columnType == ColumnType.vertexSetProperty) {
                final PropertyMappingRule propertyMappingRule = new PropertyMappingRule();
                propertyMappingRule.setKey(column.getString(Key.COLUMN_NAME));
                propertyMappingRule.setValue(columnValue);
                final ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));
                ConfigHelper.assertConfig("unsupported property type", () -> propType != null);

                if (columnType == ColumnType.vertexSetProperty) {
                    propertyMappingRule.setPType(Key.PropertyType.set);
                }
                propertyMappingRule.setValueType(propType);
                rule.getProperties().add(propertyMappingRule);
            } else if (columnType == ColumnType.srcPrimaryKey) {
                if (type != ImportType.EDGE) {
                    continue;
                }

                final ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));
                ConfigHelper.assertConfig("only string is allowed in primary key",
                    () -> (propType == ValueType.STRING));

                if (srcTransRule == IdTransRule.labelPrefix) {
                    rule.setFrom(config.getString(Key.SRC_LABEL) + columnValue);
                } else {
                    rule.setFrom(columnValue);
                }
            } else if (columnType == ColumnType.dstPrimaryKey) {
                if (type != ImportType.EDGE) {
                    continue;
                }

                final ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));
                ConfigHelper.assertConfig("only string is allowed in primary key",
                    () -> (propType == ValueType.STRING));

                if (dstTransRule == IdTransRule.labelPrefix) {
                    rule.setTo(config.getString(Key.DST_LABEL) + columnValue);
                } else {
                    rule.setTo(columnValue);
                }
            }
        }

        if (rule.getImportType() == ImportType.EDGE) {
            if (rule.getId() == null) {
                rule.setId("");
                log.info("edge id is missed, uuid be default");
            }
            ConfigHelper.assertConfig("to needed in edge", () -> (rule.getTo() != null));
            ConfigHelper.assertConfig("from needed in edge", () -> (rule.getFrom() != null));
        }
        ConfigHelper.assertConfig("id needed", () -> (rule.getId() != null));

        return rule;
    }
}


@@ -8,6 +8,7 @@ import java.util.Map;
import java.util.function.Function;

import com.alibaba.datax.common.element.Column;

import lombok.extern.slf4j.Slf4j;

/**
@@ -16,56 +17,61 @@ import lombok.extern.slf4j.Slf4j;
 */
@Slf4j
public enum ValueType {
    /**
     * property value type
     */
    INT(Integer.class, "int", Column::asLong, Integer::valueOf),
    INTEGER(Integer.class, "integer", Column::asLong, Integer::valueOf),
    LONG(Long.class, "long", Column::asLong, Long::valueOf),
    DOUBLE(Double.class, "double", Column::asDouble, Double::valueOf),
    FLOAT(Float.class, "float", Column::asDouble, Float::valueOf),
    BOOLEAN(Boolean.class, "boolean", Column::asBoolean, Boolean::valueOf),
    STRING(String.class, "string", Column::asString, String::valueOf);

    private Class<?> type = null;
    private String shortName = null;
    private Function<Column, Object> columnFunc = null;
    private Function<String, Object> fromStrFunc = null;

    private ValueType(final Class<?> type, final String name, final Function<Column, Object> columnFunc,
        final Function<String, Object> fromStrFunc) {
        this.type = type;
        this.shortName = name;
        this.columnFunc = columnFunc;
        this.fromStrFunc = fromStrFunc;

        ValueTypeHolder.shortName2type.put(name, this);
    }

    public static ValueType fromShortName(final String name) {
        return ValueTypeHolder.shortName2type.get(name);
    }

    public Class<?> type() {
        return this.type;
    }

    public String shortName() {
        return this.shortName;
    }

    public Object applyColumn(final Column column) {
        try {
            if (column == null) {
                return null;
            }
            return this.columnFunc.apply(column);
        } catch (final Exception e) {
            log.error("applyColumn error {}, column {}", e.toString(), column);
            throw e;
        }
    }

    public Object fromStrFunc(final String str) {
        return this.fromStrFunc.apply(str);
    }

    private static class ValueTypeHolder {
        private static Map<String, ValueType> shortName2type = new HashMap<>();
    }
}


@@ -3,20 +3,24 @@
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import static com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig.DEFAULT_BATCH_PROPERTY_NUM;
import static com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig.MAX_REQUEST_LENGTH;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.RequestOptions;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig;

import lombok.extern.slf4j.Slf4j;

/**
 * @author jerrywang
@@ -24,128 +28,124 @@ import java.util.concurrent.TimeUnit;
 */
@Slf4j
public abstract class AbstractGdbGraph implements GdbGraph {
    private final static int DEFAULT_TIMEOUT = 30000;

    protected Client client = null;
    protected Key.UpdateMode updateMode = Key.UpdateMode.INSERT;
    protected int propertiesBatchNum = DEFAULT_BATCH_PROPERTY_NUM;
    protected boolean session = false;
    protected int maxRequestLength = GdbWriterConfig.MAX_REQUEST_LENGTH;

    protected AbstractGdbGraph() {}

    protected AbstractGdbGraph(final Configuration config, final boolean session) {
        initClient(config, session);
    }

    protected void initClient(final Configuration config, final boolean session) {
        this.updateMode = Key.UpdateMode.valueOf(config.getString(Key.UPDATE_MODE, "INSERT"));
        log.info("init graphdb client");
        final String host = config.getString(Key.HOST);
        final int port = config.getInt(Key.PORT);
        final String username = config.getString(Key.USERNAME);
        final String password = config.getString(Key.PASSWORD);
        int maxDepthPerConnection =
            config.getInt(Key.MAX_IN_PROCESS_PER_CONNECTION, GdbWriterConfig.DEFAULT_MAX_IN_PROCESS_PER_CONNECTION);
        int maxConnectionPoolSize =
            config.getInt(Key.MAX_CONNECTION_POOL_SIZE, GdbWriterConfig.DEFAULT_MAX_CONNECTION_POOL_SIZE);
        int maxSimultaneousUsagePerConnection = config.getInt(Key.MAX_SIMULTANEOUS_USAGE_PER_CONNECTION,
            GdbWriterConfig.DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION);

        this.session = session;
        if (this.session) {
            maxConnectionPoolSize = GdbWriterConfig.DEFAULT_MAX_CONNECTION_POOL_SIZE;
            maxDepthPerConnection = GdbWriterConfig.DEFAULT_MAX_IN_PROCESS_PER_CONNECTION;
            maxSimultaneousUsagePerConnection = GdbWriterConfig.DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION;
        }

        try {
            final Cluster cluster = Cluster.build(host).port(port).credentials(username, password)
                .serializer(Serializers.GRAPHBINARY_V1D0).maxContentLength(1048576)
                .maxInProcessPerConnection(maxDepthPerConnection).minInProcessPerConnection(0)
                .maxConnectionPoolSize(maxConnectionPoolSize).minConnectionPoolSize(maxConnectionPoolSize)
                .maxSimultaneousUsagePerConnection(maxSimultaneousUsagePerConnection).resultIterationBatchSize(64)
                .create();
            this.client = session ? cluster.connect(UUID.randomUUID().toString()).init() : cluster.connect().init();
            warmClient(maxConnectionPoolSize * maxDepthPerConnection);
        } catch (final RuntimeException e) {
            log.error("Failed to connect to GDB {}:{}, due to {}", host, port, e);
            throw e;
        }

        this.propertiesBatchNum = config.getInt(Key.MAX_PROPERTIES_BATCH_NUM, DEFAULT_BATCH_PROPERTY_NUM);
        this.maxRequestLength = config.getInt(Key.MAX_GDB_REQUEST_LENGTH, MAX_REQUEST_LENGTH);
    }

    /**
     * @param dsl
     * @param parameters
     */
    protected void runInternal(final String dsl, final Map<String, Object> parameters) throws Exception {
        final RequestOptions.Builder options = RequestOptions.build().timeout(DEFAULT_TIMEOUT);
        if (parameters != null && !parameters.isEmpty()) {
            parameters.forEach(options::addParameter);
        }

        final ResultSet results = this.client.submitAsync(dsl, options.create()).get(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
        results.all().get(DEFAULT_TIMEOUT + 1000, TimeUnit.MILLISECONDS);
    }

    void beginTx() {
        if (!this.session) {
            return;
        }

        final String dsl = "g.tx().open()";
        this.client.submit(dsl).all().join();
    }

    void doCommit() {
        if (!this.session) {
            return;
        }

        try {
            final String dsl = "g.tx().commit()";
            this.client.submit(dsl).all().join();
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }

    void doRollback() {
        if (!this.session) {
            return;
        }

        final String dsl = "g.tx().rollback()";
        this.client.submit(dsl).all().join();
    }

    private void warmClient(final int num) {
        try {
            beginTx();
            runInternal("g.V('test')", null);
            doCommit();
            log.info("warm graphdb client over");
        } catch (final Exception e) {
            log.error("warmClient error");
            doRollback();
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
        if (this.client != null) {
            log.info("close graphdb client");
            this.client.close();
        }
    }
}
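AbstractGdbGraph only wires up the Gremlin client, the optional session mode and the transaction helpers; the actual element writing lives in subclasses. A rough sketch of how a subclass could drive the beginTx/runInternal/doCommit/doRollback flow; the subclass name, the DSL string and the no-op add() below are invented for illustration, and the class sits in the same package so the package-private helpers are visible:

```java
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.util.Configuration;

import groovy.lang.Tuple2;

// Hypothetical subclass, only to illustrate the transaction flow in session mode.
public class DemoSessionGraph extends AbstractGdbGraph {
    public DemoSessionGraph(final Configuration config) {
        super(config, true); // session == true activates beginTx/doCommit/doRollback
    }

    public void addOneVertex(final String id, final String label) throws Exception {
        final Map<String, Object> params = new HashMap<>();
        params.put("GDB___id", id);
        params.put("GDB___label", label);

        beginTx();
        try {
            runInternal("g.addV(GDB___label).property(id, GDB___id)", params);
            doCommit();
        } catch (final Exception e) {
            doRollback();
            throw e;
        }
    }

    @Override
    public List<Tuple2<Record, Exception>> add(final List<Tuple2<Record, GdbElement>> records) {
        return new ArrayList<>(); // no-op; ScriptGdbGraph is the real implementation
    }
}
```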
View File
@@ -3,7 +3,8 @@
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MapperConfig;

import lombok.EqualsAndHashCode;
import lombok.ToString;
@@ -11,10 +12,33 @@ import lombok.ToString;
 * @author jerrywang
 *
 */
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class GdbEdge extends GdbElement {
    private String from = null;
    private String to = null;

    public String getFrom() {
        return this.from;
    }

    public void setFrom(final String from) {
        final int maxIdLength = MapperConfig.getInstance().getMaxIdLength();
        if (from.length() > maxIdLength) {
            throw new IllegalArgumentException("from length over limit(" + maxIdLength + ")");
        }
        this.from = from;
    }

    public String getTo() {
        return this.to;
    }

    public void setTo(final String to) {
        final int maxIdLength = MapperConfig.getInstance().getMaxIdLength();
        if (to.length() > maxIdLength) {
            throw new IllegalArgumentException("to length over limit(" + maxIdLength + ")");
        }
        this.to = to;
    }
}
View File
@@ -3,18 +3,107 @@
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import java.util.LinkedList;
import java.util.List;

import com.alibaba.datax.plugin.writer.gdbwriter.Key.PropertyType;
import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MapperConfig;

/**
 * @author jerrywang
 *
 */
public class GdbElement {
    private String id = null;
    private String label = null;
    private List<GdbProperty> properties = new LinkedList<>();

    public String getId() {
        return this.id;
    }

    public void setId(final String id) {
        final int maxIdLength = MapperConfig.getInstance().getMaxIdLength();
        if (id.length() > maxIdLength) {
            throw new IllegalArgumentException("id length over limit(" + maxIdLength + ")");
        }
        this.id = id;
    }

    public String getLabel() {
        return this.label;
    }

    public void setLabel(final String label) {
        final int maxLabelLength = MapperConfig.getInstance().getMaxLabelLength();
        if (label.length() > maxLabelLength) {
            throw new IllegalArgumentException("label length over limit(" + maxLabelLength + ")");
        }
        this.label = label;
    }

    public List<GdbProperty> getProperties() {
        return this.properties;
    }

    public void addProperty(final String propKey, final Object propValue, final PropertyType card) {
        if (propKey == null || propValue == null) {
            return;
        }

        final int maxPropKeyLength = MapperConfig.getInstance().getMaxPropKeyLength();
        if (propKey.length() > maxPropKeyLength) {
            throw new IllegalArgumentException("property key length over limit(" + maxPropKeyLength + ")");
        }
        if (propValue instanceof String) {
            final int maxPropValueLength = MapperConfig.getInstance().getMaxPropValueLength();
            if (((String)propValue).length() > maxPropValueLength) {
                throw new IllegalArgumentException("property value length over limit(" + maxPropValueLength + ")");
            }
        }

        this.properties.add(new GdbProperty(propKey, propValue, card));
    }

    public void addProperty(final String propKey, final Object propValue) {
        addProperty(propKey, propValue, PropertyType.single);
    }

    @Override
    public String toString() {
        final StringBuffer sb = new StringBuffer(this.id + "[" + this.label + "]{");
        this.properties.forEach(n -> {
            sb.append(n.cardinality.name());
            sb.append("[");
            sb.append(n.key);
            sb.append(" - ");
            sb.append(String.valueOf(n.value));
            sb.append("]");
        });
        return sb.toString();
    }

    public static class GdbProperty {
        private String key;
        private Object value;
        private PropertyType cardinality;

        private GdbProperty(final String key, final Object value, final PropertyType card) {
            this.key = key;
            this.value = value;
            this.cardinality = card;
        }

        public PropertyType getCardinality() {
            return this.cardinality;
        }

        public String getKey() {
            return this.key;
        }

        public Object getValue() {
            return this.value;
        }
    }
}
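GdbElement now keeps its properties as an ordered list instead of a map, so the same key can appear several times with SET cardinality. A minimal sketch of how the mapper side is expected to populate it; the ids and values are invented, and it assumes MapperConfig's default length limits are already initialized:

```java
import com.alibaba.datax.plugin.writer.gdbwriter.Key.PropertyType;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement;

// Sketch only: one SET-cardinality key with two values, plus a single-cardinality key.
public class SetPropertyDemo {
    public static void main(String[] args) {
        final GdbElement element = new GdbElement();
        element.setId("v-001");
        element.setLabel("person");

        // same key twice with SET cardinality -> both values are kept
        element.addProperty("name", "tom", PropertyType.set);
        element.addProperty("name", "jack", PropertyType.set);
        // two-argument overload defaults to single cardinality
        element.addProperty("age", 20L);

        // prints roughly: v-001[person]{set[name - tom]set[name - jack]single[age - 20]
        System.out.println(element);
    }
}
```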
View File
@@ -3,18 +3,19 @@
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import java.util.List;

import com.alibaba.datax.common.element.Record;

import groovy.lang.Tuple2;

/**
 * @author jerrywang
 *
 */
public interface GdbGraph extends AutoCloseable {
    List<Tuple2<Record, Exception>> add(List<Tuple2<Record, GdbElement>> records);

    @Override
    void close();
}
View File
@@ -3,15 +3,17 @@
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import com.alibaba.datax.plugin.writer.gdbwriter.util.GdbDuplicateIdException;

import groovy.lang.Tuple2;
import lombok.extern.slf4j.Slf4j;
@@ -21,176 +23,198 @@ import lombok.extern.slf4j.Slf4j;
 */
@Slf4j
public class ScriptGdbGraph extends AbstractGdbGraph {
    private static final String VAR_PREFIX = "GDB___";
    private static final String VAR_ID = VAR_PREFIX + "id";
    private static final String VAR_LABEL = VAR_PREFIX + "label";
    private static final String VAR_FROM = VAR_PREFIX + "from";
    private static final String VAR_TO = VAR_PREFIX + "to";
    private static final String VAR_PROP_KEY = VAR_PREFIX + "PK";
    private static final String VAR_PROP_VALUE = VAR_PREFIX + "PV";
    private static final String ADD_V_START = "g.addV(" + VAR_LABEL + ").property(id, " + VAR_ID + ")";
    private static final String ADD_E_START =
        "g.addE(" + VAR_LABEL + ").property(id, " + VAR_ID + ").from(V(" + VAR_FROM + ")).to(V(" + VAR_TO + "))";
    private static final String UPDATE_V_START = "g.V(" + VAR_ID + ")";
    private static final String UPDATE_E_START = "g.E(" + VAR_ID + ")";

    private Random random;

    public ScriptGdbGraph() {
        this.random = new Random();
    }

    public ScriptGdbGraph(final Configuration config, final boolean session) {
        super(config, session);

        this.random = new Random();
        log.info("Init as ScriptGdbGraph.");
    }

    /**
     * Apply list of {@link GdbElement} to GDB, return the failed records
     *
     * @param records
     *            list of element to apply
     * @return
     */
    @Override
    public List<Tuple2<Record, Exception>> add(final List<Tuple2<Record, GdbElement>> records) {
        final List<Tuple2<Record, Exception>> errors = new ArrayList<>();
        try {
            beginTx();
            for (final Tuple2<Record, GdbElement> elementTuple2 : records) {
                try {
                    addInternal(elementTuple2.getSecond());
                } catch (final Exception e) {
                    errors.add(new Tuple2<>(elementTuple2.getFirst(), e));
                }
            }
            doCommit();
        } catch (final Exception ex) {
            doRollback();
            throw new RuntimeException(ex);
        }
        return errors;
    }

    private void addInternal(final GdbElement element) {
        try {
            addInternal(element, false);
        } catch (final GdbDuplicateIdException e) {
            if (this.updateMode == Key.UpdateMode.SKIP) {
                log.debug("Skip duplicate id {}", element.getId());
            } else if (this.updateMode == Key.UpdateMode.INSERT) {
                throw new RuntimeException(e);
            } else if (this.updateMode == Key.UpdateMode.MERGE) {
                if (element.getProperties().isEmpty()) {
                    return;
                }

                try {
                    addInternal(element, true);
                } catch (final GdbDuplicateIdException e1) {
                    log.error("duplicate id {} while update...", element.getId());
                    throw new RuntimeException(e1);
                }
            }
        }
    }

    private void addInternal(final GdbElement element, final boolean update) throws GdbDuplicateIdException {
        boolean firstAdd = !update;
        final boolean isVertex = (element instanceof GdbVertex);
        final List<GdbElement.GdbProperty> params = element.getProperties();
        final List<GdbElement.GdbProperty> subParams = new ArrayList<>(this.propertiesBatchNum);

        final int idLength = element.getId().length();
        int attachLength = element.getLabel().length();
        if (element instanceof GdbEdge) {
            attachLength += ((GdbEdge)element).getFrom().length();
            attachLength += ((GdbEdge)element).getTo().length();
        }

        int requestLength = idLength;
        for (final GdbElement.GdbProperty entry : params) {
            final String propKey = entry.getKey();
            final Object propValue = entry.getValue();

            int appendLength = propKey.length();
            if (propValue instanceof String) {
                appendLength += ((String)propValue).length();
            }
            if (checkSplitDsl(firstAdd, requestLength, attachLength, appendLength, subParams.size())) {
                setGraphDbElement(element, subParams, isVertex, firstAdd);
                firstAdd = false;
                subParams.clear();
                requestLength = idLength;
            }

            requestLength += appendLength;
            subParams.add(entry);
        }
        if (!subParams.isEmpty() || firstAdd) {
            checkSplitDsl(firstAdd, requestLength, attachLength, 0, 0);
            setGraphDbElement(element, subParams, isVertex, firstAdd);
        }
    }

    private boolean checkSplitDsl(final boolean firstAdd, final int requestLength, final int attachLength, final int appendLength,
        final int propNum) {
        final int length = firstAdd ? requestLength + attachLength : requestLength;
        if (length > this.maxRequestLength) {
            throw new IllegalArgumentException("request length over limit(" + this.maxRequestLength + ")");
        }
        return length + appendLength > this.maxRequestLength || propNum >= this.propertiesBatchNum;
    }

    private Tuple2<String, Map<String, Object>> buildDsl(final GdbElement element, final List<GdbElement.GdbProperty> properties,
        final boolean isVertex, final boolean firstAdd) {
        final Map<String, Object> params = new HashMap<>();
        final StringBuilder sb = new StringBuilder();
        if (isVertex) {
            sb.append(firstAdd ? ADD_V_START : UPDATE_V_START);
        } else {
            sb.append(firstAdd ? ADD_E_START : UPDATE_E_START);
        }

        for (int i = 0; i < properties.size(); i++) {
            final GdbElement.GdbProperty prop = properties.get(i);

            sb.append(".property(");
            if (prop.getCardinality() == Key.PropertyType.set) {
                sb.append("set, ");
            }
            sb.append(VAR_PROP_KEY).append(i).append(", ").append(VAR_PROP_VALUE).append(i).append(")");
            params.put(VAR_PROP_KEY + i, prop.getKey());
            params.put(VAR_PROP_VALUE + i, prop.getValue());
        }

        if (firstAdd) {
            params.put(VAR_LABEL, element.getLabel());
            if (!isVertex) {
                params.put(VAR_FROM, ((GdbEdge)element).getFrom());
                params.put(VAR_TO, ((GdbEdge)element).getTo());
            }
        }
        params.put(VAR_ID, element.getId());

        return new Tuple2<>(sb.toString(), params);
    }

    private void setGraphDbElement(final GdbElement element, final List<GdbElement.GdbProperty> properties, final boolean isVertex,
        final boolean firstAdd) throws GdbDuplicateIdException {
        int retry = 10;
        int idleTime = this.random.nextInt(10) + 10;
        final Tuple2<String, Map<String, Object>> elementDsl = buildDsl(element, properties, isVertex, firstAdd);

        while (retry > 0) {
            try {
                runInternal(elementDsl.getFirst(), elementDsl.getSecond());
                log.debug("AddElement {}", element.getId());
                return;
            } catch (final Exception e) {
                final String cause = e.getCause() == null ? "" : e.getCause().toString();
                if (cause.contains("rejected from") || cause.contains("Timeout waiting to lock key")) {
                    retry--;
                    try {
                        Thread.sleep(idleTime);
                    } catch (final InterruptedException e1) {
                        // ...
                    }
                    idleTime = Math.min(idleTime * 2, 2000);
                    continue;
                } else if (firstAdd && cause.contains("GraphDB id exists")) {
                    throw new GdbDuplicateIdException(e);
                }
                log.error("Add Failed id {}, dsl {}, params {}, e {}", element.getId(), elementDsl.getFirst(),
                    elementDsl.getSecond(), e);
                throw new RuntimeException(e);
            }
        }
        log.error("Add Failed id {}, dsl {}, params {}", element.getId(), elementDsl.getFirst(),
            elementDsl.getSecond());
        throw new RuntimeException("failed to queue new element to server");
    }
}
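For a brand-new vertex, buildDsl emits a parameterized script and sends the actual values as Gremlin bindings; SET-cardinality properties get the `set` keyword inside `.property(...)`. A sketch of the shape of that script for one SET property and one single property; the ids and values below are made up for the example:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: reproduces the script shape built by ScriptGdbGraph for a new
// vertex (firstAdd == true); in the plugin the bindings are attached to the
// async request rather than printed.
public class DslShapeDemo {
    public static void main(String[] args) {
        final String dsl = "g.addV(GDB___label).property(id, GDB___id)"
            + ".property(set, GDB___PK0, GDB___PV0)" // SET cardinality adds the "set, " prefix
            + ".property(GDB___PK1, GDB___PV1)";     // single cardinality

        final Map<String, Object> bindings = new HashMap<>();
        bindings.put("GDB___label", "person");
        bindings.put("GDB___id", "v-001");
        bindings.put("GDB___PK0", "name");
        bindings.put("GDB___PV0", "tom");
        bindings.put("GDB___PK1", "age");
        bindings.put("GDB___PV1", 20L);

        System.out.println(dsl + " with bindings " + bindings);
    }
}
```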
View File
@@ -7,53 +7,57 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.function.Supplier;

import org.apache.commons.lang3.StringUtils;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.GdbWriterErrorCode;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

/**
 * @author jerrywang
 *
 */
public interface ConfigHelper {
    static void assertConfig(final String key, final Supplier<Boolean> f) {
        if (!f.get()) {
            throw DataXException.asDataXException(GdbWriterErrorCode.BAD_CONFIG_VALUE, key);
        }
    }

    static void assertHasContent(final Configuration config, final String key) {
        assertConfig(key, () -> StringUtils.isNotBlank(config.getString(key)));
    }

    /**
     * NOTE: {@code Configuration::get(String, Class<T>)} doesn't work.
     *
     * @param conf
     *            Configuration
     * @param key
     *            key path to configuration
     * @param cls
     *            Class of result type
     * @return the target configuration object of type T
     */
    static <T> T getConfig(final Configuration conf, final String key, final Class<T> cls) {
        final JSONObject j = (JSONObject)conf.get(key);
        return JSON.toJavaObject(j, cls);
    }

    /**
     * Create a configuration from the specified file on the classpath.
     *
     * @param name
     *            file name
     * @return Configuration instance.
     */
    static Configuration fromClasspath(final String name) {
        try (final InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(name)) {
            return Configuration.from(is);
        } catch (final IOException e) {
            throw new IllegalArgumentException("File not found: " + name);
        }
    }
}
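A small sketch of how the assert/getConfig helpers might be used together; the demo class, the POJO, the `mapping` key and the inline JSON are invented for illustration, and the ConfigHelper import path is assumed (fromClasspath is omitted because it needs a real file on the classpath):

```java
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.util.ConfigHelper;

// Hypothetical demo: fastjson maps the nested "mapping" object onto the POJO's setter.
public class ConfigHelperDemo {
    public static class MappingPojo {
        private String label;

        public String getLabel() {
            return this.label;
        }

        public void setLabel(final String label) {
            this.label = label;
        }
    }

    public static void main(String[] args) {
        final Configuration conf = Configuration.from("{\"mapping\":{\"label\":\"person\"}}");

        ConfigHelper.assertHasContent(conf, "mapping.label");
        final MappingPojo pojo = ConfigHelper.getConfig(conf, "mapping", MappingPojo.class);
        System.out.println(pojo.getLabel()); // person
    }
}
```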
View File
@ -1,9 +1,8 @@
/* /*
* (C) 2019-present Alibaba Group Holding Limited. * (C) 2019-present Alibaba Group Holding Limited.
* *
* This program is free software; you can redistribute it and/or modify * This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public
* it under the terms of the GNU General Public License version 2 as * License version 2 as published by the Free Software Foundation.
* published by the Free Software Foundation.
*/ */
package com.alibaba.datax.plugin.writer.gdbwriter.util; package com.alibaba.datax.plugin.writer.gdbwriter.util;
@ -13,11 +12,11 @@ package com.alibaba.datax.plugin.writer.gdbwriter.util;
*/ */
public class GdbDuplicateIdException extends Exception { public class GdbDuplicateIdException extends Exception {
public GdbDuplicateIdException(Exception e) { public GdbDuplicateIdException(Exception e) {
super(e); super(e);
} }
public GdbDuplicateIdException() { public GdbDuplicateIdException() {
super(); super();
} }
} }