+ private static Function<Record, GdbElement> mapper = null;
+ private static GdbGraph globalGraph = null;
+ private static boolean session = false;
+
+ /**
+ * Methods on the Job class run exactly once; methods on the Task class are run
+ * in parallel by multiple Task threads started by the framework.
+ *
+ * The overall Writer execution flow is:
+ *
+ * Job: init --> prepare --> split
+ *
+ * Task: init --> prepare --> startWrite --> post --> destroy
+ * Task: init --> prepare --> startWrite --> post --> destroy
+ *
+ * Job: post --> destroy
+ *
+ */
+ public static class Job extends Writer.Job {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(Job.class);
+
+ private Configuration jobConfig = null;
+
+ @Override
+ public void init() {
+ LOG.info("GDB datax plugin writer job init begin ...");
+ this.jobConfig = getPluginJobConf();
+ GdbWriterConfig.of(this.jobConfig);
+ LOG.info("GDB datax plugin writer job init end.");
+
+ /**
+ * Note: this method runs exactly once.
+ * Best practice: validate the user configuration here (missing required items,
+ * invalid values, irrelevant settings) and report clear errors or warnings.
+ * Validation is best delegated to a static helper class to keep this class tidy.
+ */
+ }
+
+ @Override
+ public void prepare() {
+ /**
+ * Note: this method runs exactly once.
+ * Best practice: do any pre-sync preparation for the Job here; remove this
+ * method if nothing is needed.
+ */
+ super.prepare();
+
+ MappingRule rule = MappingRuleFactory.getInstance().createV2(jobConfig);
+
+ mapper = new DefaultGdbMapper().getMapper(rule);
+ session = jobConfig.getBool(Key.SESSION_STATE, false);
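+ // session mode creates per-task session clients with explicit transactions;
+ // otherwise every task shares the global sessionless client.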
+
+ /**
+ * Verify client connectivity before any task starts.
+ */
+ try {
+ globalGraph = GdbGraphManager.instance().getGraph(jobConfig, false);
+ } catch (RuntimeException e) {
+ throw DataXException.asDataXException(GdbWriterErrorCode.FAIL_CLIENT_CONNECT, e.getMessage());
+ }
+ }
+
+ @Override
+ public List<Configuration> split(int mandatoryNumber) {
+ /**
+ * Note: this method runs exactly once.
+ * Best practice: use a static helper class to split the Job configuration
+ * into multiple Task configurations; mandatoryNumber is the number of
+ * splits that must be produced.
+ */
+ LOG.info("split begin...");
+ List<Configuration> configurationList = new ArrayList<>();
+ for (int i = 0; i < mandatoryNumber; i++) {
+ configurationList.add(this.jobConfig.clone());
+ }
+ LOG.info("split end...");
+ return configurationList;
+ }
+
+ @Override
+ public void post() {
+ /**
+ * Note: this method runs exactly once.
+ * Best practice: do any post-sync follow-up work for the Job here.
+ */
+ globalGraph.close();
+ }
+
+ @Override
+ public void destroy() {
+ /**
+ * Note: this method runs exactly once.
+ * Best practice: release Job resources here, together with post().
+ */
+ }
+
+ }
+
+ @Slf4j
+ public static class Task extends Writer.Task {
+
+ private Configuration taskConfig;
+
+ private int failed = 0;
+ private int batchRecords;
+ private ExecutorService submitService = null;
+ private GdbGraph graph;
+
+ @Override
+ public void init() {
+ /**
+ * Note: this method runs once per Task.
+ * Best practice: read taskConfig here and initialize the resources that
+ * startWrite() will need.
+ */
+ this.taskConfig = super.getPluginJobConf();
+ batchRecords = taskConfig.getInt(Key.MAX_RECORDS_IN_BATCH, GdbWriterConfig.DEFAULT_RECORD_NUM_IN_BATCH);
+ submitService = new ThreadPoolExecutor(1, 1, 0L,
+ TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(), new DefaultThreadFactory("submit-dsl"));
+
+ if (!session) {
+ graph = globalGraph;
+ } else {
+ /**
+ * Create session clients in staggered batches, to work around the
+ * server-side Groovy compilation performance limit.
+ */
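+ // e.g. tasks 0-9 start immediately, tasks 10-19 wait 10s, tasks 20-29 wait 20s, ...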
+ try {
+ Thread.sleep((getTaskId() / 10) * 10000);
+ } catch (Exception e) {
+ // ...
+ }
+ graph = GdbGraphManager.instance().getGraph(taskConfig, session);
+ }
+ }
+
+ @Override
+ public void prepare() {
+ /**
+ * Note: this method runs once per Task.
+ * Best practice: do any pre-sync preparation for the Task here; remove this
+ * method if nothing is needed.
+ */
+ super.prepare();
+ }
+
+ @Override
+ public void startWrite(RecordReceiver recordReceiver) {
+ /**
+ * Note: this method runs once per Task.
+ * Best practice: factor the write logic into small helpers so the write loop
+ * stays concise and clear.
+ */
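+ // Double-buffering: fill the next batch while the previous one commits on the submit thread.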
+ Record r;
+ Future<Boolean> future = null;
+ List<Tuple2<Record, GdbElement>> records = new ArrayList<>(batchRecords);
+
+ while ((r = recordReceiver.getFromReader()) != null) {
+ records.add(new Tuple2<>(r, mapper.apply(r)));
+
+ if (records.size() >= batchRecords) {
+ wait4Submit(future);
+
+ final List<Tuple2<Record, GdbElement>> batch = records;
+ future = submitService.submit(() -> batchCommitRecords(batch));
+ records = new ArrayList<>(batchRecords);
+ }
+ }
+
+ wait4Submit(future);
+ if (!records.isEmpty()) {
+ final List<Tuple2<Record, GdbElement>> batch = records;
+ future = submitService.submit(() -> batchCommitRecords(batch));
+ wait4Submit(future);
+ }
+ }
+
+ private void wait4Submit(Future<Boolean> future) {
+ if (future == null) {
+ return;
+ }
+
+ try {
+ future.get();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
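+ /**
+ * Commit one batch to GDB on the submit thread. Records rejected by the
+ * server are reported to the dirty-record collector; if the whole batch
+ * fails, every record in it is reported as dirty.
+ */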
+ private boolean batchCommitRecords(final List<Tuple2<Record, GdbElement>> records) {
+ TaskPluginCollector collector = getTaskPluginCollector();
+ try {
+ List<Tuple2<Record, Exception>> errors = graph.add(records);
+ errors.forEach(t -> collector.collectDirtyRecord(t.getFirst(), t.getSecond()));
+ failed += errors.size();
+ } catch (Exception e) {
+ records.forEach(t -> collector.collectDirtyRecord(t.getFirst(), e));
+ failed += records.size();
+ }
+
+ records.clear();
+ return true;
+ }
+
+ @Override
+ public void post() {
+ /**
+ * Note: this method runs once per Task.
+ * Best practice: do any post-sync follow-up work for the Task here.
+ */
+ log.info("Task done, dirty record count - {}", failed);
+ }
+
+ @Override
+ public void destroy() {
+ /**
+ * Note: this method runs once per Task.
+ * Best practice: release Task resources here, together with post().
+ */
+ if (session) {
+ graph.close();
+ }
+ submitService.shutdown();
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriterErrorCode.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriterErrorCode.java
new file mode 100644
index 00000000..a6f506ef
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriterErrorCode.java
@@ -0,0 +1,33 @@
+package com.alibaba.datax.plugin.writer.gdbwriter;
+
+import com.alibaba.datax.common.spi.ErrorCode;
+
+public enum GdbWriterErrorCode implements ErrorCode {
+ BAD_CONFIG_VALUE("GdbWriter-00", "您配置的值不合法."),
+ CONFIG_ITEM_MISS("GdbWriter-01", "您配置项缺失."),
+ FAIL_CLIENT_CONNECT("GdbWriter-02", "GDB连接异常."),;
+
+ private final String code;
+ private final String description;
+
+ private GdbWriterErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return this.code;
+ }
+
+ @Override
+ public String getDescription() {
+ return this.description;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Code:[%s], Description:[%s]. ", this.code,
+ this.description);
+ }
+}
\ No newline at end of file
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/Key.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/Key.java
new file mode 100644
index 00000000..f2e37005
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/Key.java
@@ -0,0 +1,141 @@
+package com.alibaba.datax.plugin.writer.gdbwriter;
+
+public final class Key {
+
+ /**
+ * Configuration keys the plugin expects its users to provide.
+ */
+
+ public final static String HOST = "host";
+ public final static String PORT = "port";
+ public final static String USERNAME = "username";
+ public static final String PASSWORD = "password";
+
+ /**
+ * import type and mode
+ */
+ public static final String IMPORT_TYPE = "labelType";
+ public static final String UPDATE_MODE = "writeMode";
+
+ /**
+ * label prefix issue
+ */
+ public static final String ID_TRANS_RULE = "idTransRule";
+ public static final String SRC_ID_TRANS_RULE = "srcIdTransRule";
+ public static final String DST_ID_TRANS_RULE = "dstIdTransRule";
+
+ public static final String LABEL = "label";
+ public static final String SRC_LABEL = "srcLabel";
+ public static final String DST_LABEL = "dstLabel";
+
+ public static final String MAPPING = "mapping";
+
+ /**
+ * column define in Gdb
+ */
+ public static final String COLUMN = "column";
+ public static final String COLUMN_NAME = "name";
+ public static final String COLUMN_VALUE = "value";
+ public static final String COLUMN_TYPE = "type";
+ public static final String COLUMN_NODE_TYPE = "columnType";
+
+ /**
+ * Gdb Vertex/Edge elements
+ */
+ public static final String ID = "id";
+ public static final String FROM = "from";
+ public static final String TO = "to";
+ public static final String PROPERTIES = "properties";
+ public static final String PROP_KEY = "name";
+ public static final String PROP_VALUE = "value";
+ public static final String PROP_TYPE = "type";
+
+ public static final String PROPERTIES_JSON_STR = "propertiesJsonStr";
+ public static final String MAX_PROPERTIES_BATCH_NUM = "maxPropertiesBatchNumber";
+
+ /**
+ * session less client configure for connect pool
+ */
+ public static final String MAX_IN_PROCESS_PER_CONNECTION = "maxInProcessPerConnection";
+ public static final String MAX_CONNECTION_POOL_SIZE = "maxConnectionPoolSize";
+ public static final String MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = "maxSimultaneousUsagePerConnection";
+
+ public static final String MAX_RECORDS_IN_BATCH = "maxRecordsInBatch";
+ public static final String SESSION_STATE = "session";
+
+ public enum ImportType {
+ /**
+ * Import vertices
+ */
+ VERTEX,
+ /**
+ * Import edges
+ */
+ EDGE;
+ }
+
+ public enum UpdateMode {
+ /**
+ * Insert new records, fail if exists
+ */
+ INSERT,
+ /**
+ * Skip this record if exists
+ */
+ SKIP,
+ /**
+ * Update property of this record if exists
+ */
+ MERGE;
+ }
+
+ public enum ColumnType {
+ /**
+ * vertex or edge id
+ */
+ primaryKey,
+
+ /**
+ * vertex property
+ */
+ vertexProperty,
+
+ /**
+ * start vertex id of edge
+ */
+ srcPrimaryKey,
+
+ /**
+ * end vertex id of edge
+ */
+ dstPrimaryKey,
+
+ /**
+ * edge property
+ */
+ edgeProperty,
+
+ /**
+ * vertex json style property
+ */
+ vertexJsonProperty,
+
+ /**
+ * edge json style property
+ */
+ edgeJsonProperty
+ }
+
+ public enum IdTransRule {
+ /**
+ * vertex or edge id with 'label' prefix
+ */
+ labelPrefix,
+
+ /**
+ * vertex or edge id raw
+ */
+ none
+ }
+
+}
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbGraphManager.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbGraphManager.java
new file mode 100644
index 00000000..ac06013c
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbGraphManager.java
@@ -0,0 +1,39 @@
+/**
+ *
+ */
+package com.alibaba.datax.plugin.writer.gdbwriter.client;
+
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbGraph;
+import com.alibaba.datax.plugin.writer.gdbwriter.model.ScriptGdbGraph;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author jerrywang
+ *
+ */
+public class GdbGraphManager implements AutoCloseable {
+ private static final GdbGraphManager instance = new GdbGraphManager();
+
+ private List<GdbGraph> graphs = new ArrayList<>();
+
+ public static GdbGraphManager instance() {
+ return instance;
+ }
+
+ public GdbGraph getGraph(Configuration config, boolean session) {
+ GdbGraph graph = new ScriptGdbGraph(config, session);
+ graphs.add(graph);
+ return graph;
+ }
+
+ @Override
+ public void close() {
+ for (GdbGraph graph : graphs) {
+ graph.close();
+ }
+ graphs.clear();
+ }
+}
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbWriterConfig.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbWriterConfig.java
new file mode 100644
index 00000000..0266a010
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbWriterConfig.java
@@ -0,0 +1,41 @@
+/**
+ *
+ */
+package com.alibaba.datax.plugin.writer.gdbwriter.client;
+
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.writer.gdbwriter.Key;
+
+import static com.alibaba.datax.plugin.writer.gdbwriter.util.ConfigHelper.*;
+
+/**
+ * @author jerrywang
+ *
+ */
+public class GdbWriterConfig {
+ public static final int DEFAULT_MAX_IN_PROCESS_PER_CONNECTION = 4;
+ public static final int DEFAULT_MAX_CONNECTION_POOL_SIZE = 8;
+ public static final int DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = 8;
+ public static final int DEFAULT_BATCH_PROPERTY_NUM = 30;
+ public static final int DEFAULT_RECORD_NUM_IN_BATCH = 16;
+
+ private Configuration config;
+
+ private GdbWriterConfig(Configuration config) {
+ this.config = config;
+
+ validate();
+ }
+
+ private void validate() {
+ assertHasContent(config, Key.HOST);
+ assertConfig(Key.PORT, () -> config.getInt(Key.PORT) > 0);
+
+ assertHasContent(config, Key.USERNAME);
+ assertHasContent(config, Key.PASSWORD);
+ }
+
+ public static GdbWriterConfig of(Configuration config) {
+ return new GdbWriterConfig(config);
+ }
+}
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/DefaultGdbMapper.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/DefaultGdbMapper.java
new file mode 100644
index 00000000..f5957295
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/DefaultGdbMapper.java
@@ -0,0 +1,190 @@
+/**
+ *
+ */
+package com.alibaba.datax.plugin.writer.gdbwriter.mapping;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.datax.plugin.writer.gdbwriter.Key;
+import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbEdge;
+import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement;
+import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbVertex;
+
+import lombok.extern.slf4j.Slf4j;
+
+import static com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType.VERTEX;
+
+/**
+ * @author jerrywang
+ *
+ */
+@Slf4j
+public class DefaultGdbMapper implements GdbMapper {
+ private static final Pattern STR_PATTERN = Pattern.compile("\\$\\{(\\d+)}");
+ private static final Pattern NORMAL_PATTERN = Pattern.compile("^\\$\\{(\\d+)}$");
+
+ @Override
+ public Function<Record, GdbElement> getMapper(MappingRule rule) {
+ return r -> {
+ GdbElement e = (rule.getImportType() == VERTEX) ? new GdbVertex() : new GdbEdge();
+ forElement(rule).accept(r, e);
+ return e;
+ };
+ }
+
+ private static BiConsumer<Record, GdbElement> forElement(MappingRule rule) {
+ List<BiConsumer<Record, GdbElement>> properties = new ArrayList<>();
+ for (MappingRule.PropertyMappingRule propRule : rule.getProperties()) {
+ Function<Record, String> keyFunc = forStrColumn(propRule.getKey());
+
+ if (propRule.getValueType() == ValueType.STRING) {
+ final Function<Record, String> valueFunc = forStrColumn(propRule.getValue());
+ properties.add((r, e) -> {
+ String k = keyFunc.apply(r);
+ String v = valueFunc.apply(r);
+ if (k != null && v != null) {
+ e.getProperties().put(k, v);
+ }
+ });
+ } else {
+ final Function<Record, Object> valueFunc = forObjColumn(propRule.getValue(), propRule.getValueType());
+ properties.add((r, e) -> {
+ String k = keyFunc.apply(r);
+ Object v = valueFunc.apply(r);
+ if (k != null && v != null) {
+ e.getProperties().put(k, v);
+ }
+ });
+ }
+ }
+
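+ // The JSON property column is expected to carry a document like:
+ // {"properties": [ {"k": "name", "v": "Jack", "t": "string"}, ... ]}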
+ if (rule.getPropertiesJsonStr() != null) {
+ Function<Record, String> jsonFunc = forStrColumn(rule.getPropertiesJsonStr());
+ properties.add((r, e) -> {
+ String propertiesStr = jsonFunc.apply(r);
+ JSONObject root = (JSONObject)JSONObject.parse(propertiesStr);
+ JSONArray propertiesList = root.getJSONArray("properties");
+
+ for (Object object : propertiesList) {
+ JSONObject jsonObject = (JSONObject)object;
+ String key = jsonObject.getString("k");
+ String name = jsonObject.getString("v");
+ String type = jsonObject.getString("t");
+
+ if (key == null || name == null) {
+ continue;
+ }
+ addToProperties(e, key, name, type);
+ }
+ });
+ }
+
+ BiConsumer<Record, GdbElement> ret = (r, e) -> {
+ String label = forStrColumn(rule.getLabel()).apply(r);
+ String id = forStrColumn(rule.getId()).apply(r);
+
+ if (rule.getImportType() == Key.ImportType.EDGE) {
+ String to = forStrColumn(rule.getTo()).apply(r);
+ String from = forStrColumn(rule.getFrom()).apply(r);
+ if (to == null || from == null) {
+ log.error("invalid record to: {} , from: {}", to, from);
+ throw new IllegalArgumentException("to or from missed in edge");
+ }
+ ((GdbEdge)e).setTo(to);
+ ((GdbEdge)e).setFrom(from);
+
+ // generate UUID for edge
+ if (id == null) {
+ id = UUID.randomUUID().toString();
+ }
+ }
+
+ if (id == null || label == null) {
+ log.error("invalid record id: {} , label: {}", id, label);
+ throw new IllegalArgumentException("id or label missed");
+ }
+
+ e.setId(id);
+ e.setLabel(label);
+
+ properties.forEach(p -> p.accept(r, e));
+ };
+ return ret;
+ }
+
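+ /**
+ * If the rule is exactly one placeholder such as ${2}, the referenced column
+ * is converted through the target type; otherwise the rule itself is parsed
+ * as a constant of that type.
+ */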
+ static Function<Record, Object> forObjColumn(String rule, ValueType type) {
+ Matcher m = NORMAL_PATTERN.matcher(rule);
+ if (m.matches()) {
+ int index = Integer.valueOf(m.group(1));
+ return r -> type.applyColumn(r.getColumn(index));
+ } else {
+ return r -> type.fromStrFunc(rule);
+ }
+ }
+
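+ /**
+ * Build a string template mapper. For example, the rule "prefix-${0}-${1}"
+ * applied to a record with columns ["a", "b"] yields "prefix-a-b"; an empty
+ * result maps to null.
+ */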
+ static Function<Record, String> forStrColumn(String rule) {
+ List<BiConsumer<StringBuilder, Record>> list = new ArrayList<>();
+ Matcher m = STR_PATTERN.matcher(rule);
+ int last = 0;
+ while (m.find()) {
+ String index = m.group(1);
+ // as simple integer index.
+ int i = Integer.parseInt(index);
+
+ final int tmp = last;
+ final int start = m.start();
+ list.add((sb, record) -> {
+ sb.append(rule.subSequence(tmp, start));
+ if (record.getColumn(i) != null && record.getColumn(i).getByteSize() > 0) {
+ sb.append(record.getColumn(i).asString());
+ }
+ });
+
+ last = m.end();
+ }
+
+ final int tmp = last;
+ list.add((sb, record) -> {
+ sb.append(rule.subSequence(tmp, rule.length()));
+ });
+
+ return r -> {
+ StringBuilder sb = new StringBuilder();
+ list.forEach(c -> c.accept(sb, r));
+ String res = sb.toString();
+ return res.isEmpty() ? null : res;
+ };
+ }
+
+ static boolean addToProperties(GdbElement e, String key, String value, String type) {
+ ValueType valueType = ValueType.fromShortName(type);
+
+ if (valueType == ValueType.STRING) {
+ e.getProperties().put(key, value);
+ } else if (valueType == ValueType.INT) {
+ e.getProperties().put(key, Integer.valueOf(value));
+ } else if (valueType == ValueType.LONG) {
+ e.getProperties().put(key, Long.valueOf(value));
+ } else if (valueType == ValueType.DOUBLE) {
+ e.getProperties().put(key, Double.valueOf(value));
+ } else if (valueType == ValueType.FLOAT) {
+ e.getProperties().put(key, Float.valueOf(value));
+ } else if (valueType == ValueType.BOOLEAN) {
+ e.getProperties().put(key, Boolean.valueOf(value));
+ } else {
+ log.error("invalid property key {}, value {}, type {}", key, value, type);
+ return false;
+ }
+
+ return true;
+ }
+}
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/GdbMapper.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/GdbMapper.java
new file mode 100644
index 00000000..3282f203
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/GdbMapper.java
@@ -0,0 +1,17 @@
+/**
+ *
+ */
+package com.alibaba.datax.plugin.writer.gdbwriter.mapping;
+
+import java.util.function.Function;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement;
+
+/**
+ * @author jerrywang
+ *
+ */
+public interface GdbMapper {
+ Function<Record, GdbElement> getMapper(MappingRule rule);
+}
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRule.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRule.java
new file mode 100644
index 00000000..c0c58d88
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRule.java
@@ -0,0 +1,41 @@
+/**
+ *
+ */
+package com.alibaba.datax.plugin.writer.gdbwriter.mapping;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType;
+
+import lombok.Data;
+
+/**
+ * @author jerrywang
+ *
+ */
+@Data
+public class MappingRule {
+ private String id = null;
+
+ private String label = null;
+
+ private ImportType importType = null;
+
+ private String from = null;
+
+ private String to = null;
+
+ private List<PropertyMappingRule> properties = new ArrayList<>();
+
+ private String propertiesJsonStr = null;
+
+ @Data
+ public static class PropertyMappingRule {
+ private String key = null;
+
+ private String value = null;
+
+ private ValueType valueType = null;
+ }
+}
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRuleFactory.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRuleFactory.java
new file mode 100644
index 00000000..0738ac17
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRuleFactory.java
@@ -0,0 +1,181 @@
+/**
+ *
+ */
+package com.alibaba.datax.plugin.writer.gdbwriter.mapping;
+
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.writer.gdbwriter.GdbWriterErrorCode;
+import com.alibaba.datax.plugin.writer.gdbwriter.Key;
+import com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType;
+import com.alibaba.datax.plugin.writer.gdbwriter.Key.IdTransRule;
+import com.alibaba.datax.plugin.writer.gdbwriter.Key.ColumnType;
+import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRule.PropertyMappingRule;
+import com.alibaba.datax.plugin.writer.gdbwriter.util.ConfigHelper;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+
+/**
+ * @author jerrywang
+ *
+ */
+@Slf4j
+public class MappingRuleFactory {
+ private static final MappingRuleFactory instance = new MappingRuleFactory();
+
+ public static final MappingRuleFactory getInstance() {
+ return instance;
+ }
+
+ @Deprecated
+ public MappingRule create(Configuration config, ImportType type) {
+ MappingRule rule = new MappingRule();
+ rule.setId(config.getString(Key.ID));
+ rule.setLabel(config.getString(Key.LABEL));
+ if (type == ImportType.EDGE) {
+ rule.setFrom(config.getString(Key.FROM));
+ rule.setTo(config.getString(Key.TO));
+ }
+
+ rule.setImportType(type);
+
+ List<Configuration> configurations = config.getListConfiguration(Key.PROPERTIES);
+ if (configurations != null) {
+ for (Configuration prop : configurations) {
+ PropertyMappingRule propRule = new PropertyMappingRule();
+ propRule.setKey(prop.getString(Key.PROP_KEY));
+ propRule.setValue(prop.getString(Key.PROP_VALUE));
+ propRule.setValueType(ValueType.fromShortName(prop.getString(Key.PROP_TYPE).toLowerCase()));
+ rule.getProperties().add(propRule);
+ }
+ }
+
+ String propertiesJsonStr = config.getString(Key.PROPERTIES_JSON_STR, null);
+ if (propertiesJsonStr != null) {
+ rule.setPropertiesJsonStr(propertiesJsonStr);
+ }
+
+ return rule;
+ }
+
+ public MappingRule createV2(Configuration config) {
+ try {
+ ImportType type = ImportType.valueOf(config.getString(Key.IMPORT_TYPE));
+ return createV2(config, type);
+ } catch (NullPointerException e) {
+ throw DataXException.asDataXException(GdbWriterErrorCode.CONFIG_ITEM_MISS, Key.IMPORT_TYPE);
+ } catch (IllegalArgumentException e) {
+ throw DataXException.asDataXException(GdbWriterErrorCode.BAD_CONFIG_VALUE, Key.IMPORT_TYPE);
+ }
+ }
+
+ public MappingRule createV2(Configuration config, ImportType type) {
+ MappingRule rule = new MappingRule();
+
+ ConfigHelper.assertHasContent(config, Key.LABEL);
+ rule.setLabel(config.getString(Key.LABEL));
+ rule.setImportType(type);
+
+ IdTransRule srcTransRule = IdTransRule.none;
+ IdTransRule dstTransRule = IdTransRule.none;
+ if (type == ImportType.EDGE) {
+ ConfigHelper.assertHasContent(config, Key.SRC_ID_TRANS_RULE);
+ ConfigHelper.assertHasContent(config, Key.DST_ID_TRANS_RULE);
+
+ srcTransRule = IdTransRule.valueOf(config.getString(Key.SRC_ID_TRANS_RULE));
+ dstTransRule = IdTransRule.valueOf(config.getString(Key.DST_ID_TRANS_RULE));
+
+ if (srcTransRule == IdTransRule.labelPrefix) {
+ ConfigHelper.assertHasContent(config, Key.SRC_LABEL);
+ }
+
+ if (dstTransRule == IdTransRule.labelPrefix) {
+ ConfigHelper.assertHasContent(config, Key.DST_LABEL);
+ }
+ }
+ ConfigHelper.assertHasContent(config, Key.ID_TRANS_RULE);
+ IdTransRule transRule = IdTransRule.valueOf(config.getString(Key.ID_TRANS_RULE));
+
+ List<Configuration> configurationList = config.getListConfiguration(Key.COLUMN);
+ ConfigHelper.assertConfig(Key.COLUMN, () -> (configurationList != null && !configurationList.isEmpty()));
+ for (Configuration column : configurationList) {
+ ConfigHelper.assertHasContent(column, Key.COLUMN_NAME);
+ ConfigHelper.assertHasContent(column, Key.COLUMN_VALUE);
+ ConfigHelper.assertHasContent(column, Key.COLUMN_TYPE);
+ ConfigHelper.assertHasContent(column, Key.COLUMN_NODE_TYPE);
+
+ String columnValue = column.getString(Key.COLUMN_VALUE);
+ ColumnType columnType = ColumnType.valueOf(column.getString(Key.COLUMN_NODE_TYPE));
+ if (columnValue == null || columnValue.isEmpty()) {
+ // only allow edge empty id
+ ConfigHelper.assertConfig("empty column value",
+ () -> (type == ImportType.EDGE && columnType == ColumnType.primaryKey));
+ }
+
+ if (columnType == ColumnType.primaryKey) {
+ ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));
+ ConfigHelper.assertConfig("only string is allowed in primary key", () -> (propType == ValueType.STRING));
+
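+ // labelPrefix prepends the label to the id value rule,
+ // e.g. label "person" and value "${0}" yield the id rule "person${0}".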
+ if (transRule == IdTransRule.labelPrefix) {
+ rule.setId(config.getString(Key.LABEL) + columnValue);
+ } else {
+ rule.setId(columnValue);
+ }
+ } else if (columnType == ColumnType.edgeJsonProperty || columnType == ColumnType.vertexJsonProperty) {
+ // only support one json property in column
+ ConfigHelper.assertConfig("multi JsonProperty", () -> (rule.getPropertiesJsonStr() == null));
+
+ rule.setPropertiesJsonStr(columnValue);
+ } else if (columnType == ColumnType.vertexProperty || columnType == ColumnType.edgeProperty) {
+ PropertyMappingRule propertyMappingRule = new PropertyMappingRule();
+
+ propertyMappingRule.setKey(column.getString(Key.COLUMN_NAME));
+ propertyMappingRule.setValue(columnValue);
+ ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));
+ ConfigHelper.assertConfig("unsupported property type", () -> propType != null);
+
+ propertyMappingRule.setValueType(propType);
+ rule.getProperties().add(propertyMappingRule);
+ } else if (columnType == ColumnType.srcPrimaryKey) {
+ if (type != ImportType.EDGE) {
+ continue;
+ }
+
+ ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));
+ ConfigHelper.assertConfig("only string is allowed in primary key", () -> (propType == ValueType.STRING));
+
+ if (srcTransRule == IdTransRule.labelPrefix) {
+ rule.setFrom(config.getString(Key.SRC_LABEL) + columnValue);
+ } else {
+ rule.setFrom(columnValue);
+ }
+ } else if (columnType == ColumnType.dstPrimaryKey) {
+ if (type != ImportType.EDGE) {
+ continue;
+ }
+
+ ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));
+ ConfigHelper.assertConfig("only string is allowed in primary key", () -> (propType == ValueType.STRING));
+
+ if (dstTransRule == IdTransRule.labelPrefix) {
+ rule.setTo(config.getString(Key.DST_LABEL) + columnValue);
+ } else {
+ rule.setTo(columnValue);
+ }
+ }
+ }
+
+ if (rule.getImportType() == ImportType.EDGE) {
+ if (rule.getId() == null) {
+ rule.setId("");
+ log.info("edge id is missed, uuid be default");
+ }
+ ConfigHelper.assertConfig("to needed in edge", () -> (rule.getTo() != null));
+ ConfigHelper.assertConfig("from needed in edge", () -> (rule.getFrom() != null));
+ }
+ ConfigHelper.assertConfig("id needed", () -> (rule.getId() != null));
+
+ return rule;
+ }
+}
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/ValueType.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/ValueType.java
new file mode 100644
index 00000000..9ad8bd8d
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/ValueType.java
@@ -0,0 +1,71 @@
+/**
+ *
+ */
+package com.alibaba.datax.plugin.writer.gdbwriter.mapping;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+import com.alibaba.datax.common.element.Column;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * @author jerrywang
+ *
+ */
+@Slf4j
+public enum ValueType {
+ INT(Integer.class, "int", Column::asLong, Integer::valueOf),
+ LONG(Long.class, "long", Column::asLong, Long::valueOf),
+ DOUBLE(Double.class, "double", Column::asDouble, Double::valueOf),
+ FLOAT(Float.class, "float", Column::asDouble, Float::valueOf),
+ BOOLEAN(Boolean.class, "boolean", Column::asBoolean, Boolean::valueOf),
+ STRING(String.class, "string", Column::asString, String::valueOf);
+
+ private Class<?> type = null;
+ private String shortName = null;
+ private Function<Column, Object> columnFunc = null;
+ private Function<String, Object> fromStrFunc = null;
+
+ private ValueType(Class<?> type, String name, Function<Column, Object> columnFunc, Function<String, Object> fromStrFunc) {
+ this.type = type;
+ this.shortName = name;
+ this.columnFunc = columnFunc;
+ this.fromStrFunc = fromStrFunc;
+
+ ValueTypeHolder.shortName2type.put(name, this);
+ }
+
+ public static ValueType fromShortName(String name) {
+ return ValueTypeHolder.shortName2type.get(name);
+ }
+
+ public Class<?> type() {
+ return this.type;
+ }
+
+ public String shortName() {
+ return this.shortName;
+ }
+
+ public Object applyColumn(Column column) {
+ try {
+ if (column == null) {
+ return null;
+ }
+ return columnFunc.apply(column);
+ } catch (Exception e) {
+ log.error("applyColumn error {}, column {}", e.toString(), column);
+ throw e;
+ }
+ }
+
+ public Object fromStrFunc(String str) {
+ return fromStrFunc.apply(str);
+ }
+
+ private static class ValueTypeHolder {
+ private static final Map<String, ValueType> shortName2type = new HashMap<>();
+ }
+}
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/AbstractGdbGraph.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/AbstractGdbGraph.java
new file mode 100644
index 00000000..0c31c644
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/AbstractGdbGraph.java
@@ -0,0 +1,151 @@
+/**
+ *
+ */
+package com.alibaba.datax.plugin.writer.gdbwriter.model;
+
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.writer.gdbwriter.Key;
+import com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.tinkerpop.gremlin.driver.Client;
+import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.RequestOptions;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author jerrywang
+ *
+ */
+@Slf4j
+public abstract class AbstractGdbGraph implements GdbGraph {
+ private static final int DEFAULT_TIMEOUT = 30000;
+
+ protected Client client = null;
+ protected Key.UpdateMode updateMode = Key.UpdateMode.INSERT;
+ protected int propertiesBatchNum = GdbWriterConfig.DEFAULT_BATCH_PROPERTY_NUM;
+ protected boolean session = false;
+
+
+ protected AbstractGdbGraph() {}
+
+ protected AbstractGdbGraph(Configuration config, boolean session) {
+ initClient(config, session);
+ }
+
+ protected void initClient(Configuration config, boolean session) {
+ updateMode = Key.UpdateMode.valueOf(config.getString(Key.UPDATE_MODE, "INSERT"));
+ log.info("init graphdb client");
+ String host = config.getString(Key.HOST);
+ int port = config.getInt(Key.PORT);
+ String username = config.getString(Key.USERNAME);
+ String password = config.getString(Key.PASSWORD);
+ int maxDepthPerConnection = config.getInt(Key.MAX_IN_PROCESS_PER_CONNECTION,
+ GdbWriterConfig.DEFAULT_MAX_IN_PROCESS_PER_CONNECTION);
+
+ int maxConnectionPoolSize = config.getInt(Key.MAX_CONNECTION_POOL_SIZE,
+ GdbWriterConfig.DEFAULT_MAX_CONNECTION_POOL_SIZE);
+
+ int maxSimultaneousUsagePerConnection = config.getInt(Key.MAX_SIMULTANEOUS_USAGE_PER_CONNECTION,
+ GdbWriterConfig.DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION);
+
+ this.session = session;
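+ // session mode ignores the user-supplied pool settings and uses the defaults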
+ if (this.session) {
+ maxConnectionPoolSize = GdbWriterConfig.DEFAULT_MAX_CONNECTION_POOL_SIZE;
+ maxDepthPerConnection = GdbWriterConfig.DEFAULT_MAX_IN_PROCESS_PER_CONNECTION;
+ maxSimultaneousUsagePerConnection = GdbWriterConfig.DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION;
+ }
+
+ try {
+ Cluster cluster = Cluster.build(host).port(port).credentials(username, password)
+ .serializer(Serializers.GRAPHBINARY_V1D0)
+ .maxContentLength(1048576)
+ .maxInProcessPerConnection(maxDepthPerConnection)
+ .minInProcessPerConnection(0)
+ .maxConnectionPoolSize(maxConnectionPoolSize)
+ .minConnectionPoolSize(maxConnectionPoolSize)
+ .maxSimultaneousUsagePerConnection(maxSimultaneousUsagePerConnection)
+ .resultIterationBatchSize(64)
+ .create();
+ client = session ? cluster.connect(UUID.randomUUID().toString()).init() : cluster.connect().init();
+ warmClient(maxConnectionPoolSize * maxDepthPerConnection);
+ } catch (RuntimeException e) {
+ log.error("Failed to connect to GDB {}:{}, due to {}", host, port, e);
+ throw e;
+ }
+
+ propertiesBatchNum = config.getInt(Key.MAX_PROPERTIES_BATCH_NUM, GdbWriterConfig.DEFAULT_BATCH_PROPERTY_NUM);
+ }
+
+
+ /**
+ * Submit a DSL script with bound parameters and wait for the results.
+ *
+ * @param dsl Gremlin DSL script to execute
+ * @param parameters parameter bindings for the script, may be null or empty
+ */
+ protected void runInternal(String dsl, final Map<String, Object> parameters) throws Exception {
+ RequestOptions.Builder options = RequestOptions.build().timeout(DEFAULT_TIMEOUT);
+ if (parameters != null && !parameters.isEmpty()) {
+ parameters.forEach(options::addParameter);
+ }
+
+ ResultSet results = client.submitAsync(dsl, options.create()).get(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
+ results.all().get(DEFAULT_TIMEOUT + 1000, TimeUnit.MILLISECONDS);
+ }
+
+ void beginTx() {
+ if (!session) {
+ return;
+ }
+
+ String dsl = "g.tx().open()";
+ client.submit(dsl).all().join();
+ }
+
+ void doCommit() {
+ if (!session) {
+ return;
+ }
+
+ try {
+ String dsl = "g.tx().commit()";
+ client.submit(dsl).all().join();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ void doRollback() {
+ if (!session) {
+ return;
+ }
+
+ String dsl = "g.tx().rollback()";
+ client.submit(dsl).all().join();
+ }
+
+ private void warmClient(int num) {
+ try {
+ beginTx();
+ runInternal("g.V('test')", null);
+ doCommit();
+ log.info("warm graphdb client over");
+ } catch (Exception e) {
+ log.error("warmClient error");
+ doRollback();
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (client != null) {
+ log.info("close graphdb client");
+ client.close();
+ }
+ }
+}
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbEdge.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbEdge.java
new file mode 100644
index 00000000..d42c9182
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbEdge.java
@@ -0,0 +1,20 @@
+/**
+ *
+ */
+package com.alibaba.datax.plugin.writer.gdbwriter.model;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * @author jerrywang
+ *
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
+public class GdbEdge extends GdbElement {
+ private String from = null;
+ private String to = null;
+}
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbElement.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbElement.java
new file mode 100644
index 00000000..af3c7090
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbElement.java
@@ -0,0 +1,20 @@
+/**
+ *
+ */
+package com.alibaba.datax.plugin.writer.gdbwriter.model;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import lombok.Data;
+
+/**
+ * @author jerrywang
+ *
+ */
+@Data
+public class GdbElement {
+ String id = null;
+ String label = null;
+ Map<String, Object> properties = new HashMap<>();
+}
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbGraph.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbGraph.java
new file mode 100644
index 00000000..5b98c502
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbGraph.java
@@ -0,0 +1,20 @@
+/**
+ *
+ */
+package com.alibaba.datax.plugin.writer.gdbwriter.model;
+
+import com.alibaba.datax.common.element.Record;
+import groovy.lang.Tuple2;
+
+import java.util.List;
+
+/**
+ * @author jerrywang
+ *
+ */
+public interface GdbGraph extends AutoCloseable {
+ List<Tuple2<Record, Exception>> add(List<Tuple2<Record, GdbElement>> records);
+
+ @Override
+ void close();
+}
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbVertex.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbVertex.java
new file mode 100644
index 00000000..0bc762bd
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbVertex.java
@@ -0,0 +1,17 @@
+/**
+ *
+ */
+package com.alibaba.datax.plugin.writer.gdbwriter.model;
+
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * @author jerrywang
+ *
+ */
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
+public class GdbVertex extends GdbElement {
+
+}
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/ScriptGdbGraph.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/ScriptGdbGraph.java
new file mode 100644
index 00000000..7f898431
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/ScriptGdbGraph.java
@@ -0,0 +1,196 @@
+/**
+ *
+ */
+package com.alibaba.datax.plugin.writer.gdbwriter.model;
+
+import java.util.*;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.util.Configuration;
+
+import com.alibaba.datax.plugin.writer.gdbwriter.Key;
+import com.alibaba.datax.plugin.writer.gdbwriter.util.GdbDuplicateIdException;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import groovy.lang.Tuple2;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * @author jerrywang
+ *
+ */
+@Slf4j
+public class ScriptGdbGraph extends AbstractGdbGraph {
+ private static final String VAR_PREFIX = "GDB___";
+ private static final String VAR_ID = VAR_PREFIX + "id";
+ private static final String VAR_LABEL = VAR_PREFIX + "label";
+ private static final String VAR_FROM = VAR_PREFIX + "from";
+ private static final String VAR_TO = VAR_PREFIX + "to";
+ private static final String VAR_PROP_KEY = VAR_PREFIX + "PK";
+ private static final String VAR_PROP_VALUE = VAR_PREFIX + "PV";
+ private static final String ADD_V_START = "g.addV(" + VAR_LABEL + ").property(id, " + VAR_ID + ")";
+ private static final String ADD_E_START = "g.addE(" + VAR_LABEL + ").property(id, " + VAR_ID + ").from(V("
+ + VAR_FROM + ")).to(V(" + VAR_TO + "))";
+
+ private static final String UPDATE_V_START = "g.V(" + VAR_ID + ")";
+ private static final String UPDATE_E_START = "g.E(" + VAR_ID + ")";
+
+ private Cache<Integer, String> propertyCache;
+ private Random random;
+
+ public ScriptGdbGraph() {
+ propertyCache = Caffeine.newBuilder().maximumSize(1024).build();
+ random = new Random();
+ }
+
+ public ScriptGdbGraph(Configuration config, boolean session) {
+ super(config, session);
+
+ propertyCache = Caffeine.newBuilder().maximumSize(1024).build();
+ random = new Random();
+
+ log.info("Init as ScriptGdbGraph.");
+ }
+
+ /**
+ * Apply a list of {@link GdbElement} to GDB and collect the failures.
+ * @param records list of elements to apply
+ * @return the records that failed, each paired with its exception
+ */
+ @Override
+ public List<Tuple2<Record, Exception>> add(List<Tuple2<Record, GdbElement>> records) {
+ List<Tuple2<Record, Exception>> errors = new ArrayList<>();
+ try {
+ beginTx();
+ for (Tuple2<Record, GdbElement> elementTuple2 : records) {
+ try {
+ addInternal(elementTuple2.getSecond());
+ } catch (Exception e) {
+ errors.add(new Tuple2<>(elementTuple2.getFirst(), e));
+ }
+ }
+ doCommit();
+ } catch (Exception ex) {
+ doRollback();
+ throw new RuntimeException(ex);
+ }
+ return errors;
+ }
+
+ private void addInternal(GdbElement element) {
+ try {
+ addInternal(element, false);
+ } catch (GdbDuplicateIdException e) {
+ if (updateMode == Key.UpdateMode.SKIP) {
+ log.debug("Skip duplicate id {}", element.getId());
+ } else if (updateMode == Key.UpdateMode.INSERT) {
+ throw new RuntimeException(e);
+ } else if (updateMode == Key.UpdateMode.MERGE) {
+ if (element.getProperties().isEmpty()) {
+ return;
+ }
+
+ try {
+ addInternal(element, true);
+ } catch (GdbDuplicateIdException e1) {
+ log.error("duplicate id {} while update...", element.getId());
+ throw new RuntimeException(e1);
+ }
+ }
+ }
+ }
+
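+ /**
+ * Write the element's properties in chunks of at most propertiesBatchNum per
+ * DSL call: the first call creates the element (or starts the update), and
+ * subsequent calls append the remaining properties.
+ */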
+ private void addInternal(GdbElement element, boolean update) throws GdbDuplicateIdException {
+ Map<String, Object> params = element.getProperties();
+ Map<String, Object> subParams = new HashMap<>(propertiesBatchNum);
+ boolean firstAdd = !update;
+ boolean isVertex = (element instanceof GdbVertex);
+
+ for (Map.Entry<String, Object> entry : params.entrySet()) {
+ subParams.put(entry.getKey(), entry.getValue());
+ if (subParams.size() >= propertiesBatchNum) {
+ setGraphDbElement(element, subParams, isVertex, firstAdd);
+ firstAdd = false;
+ subParams.clear();
+ }
+ }
+ if (!subParams.isEmpty() || firstAdd) {
+ setGraphDbElement(element, subParams, isVertex, firstAdd);
+ }
+ }
+
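+ /**
+ * Build the Gremlin DSL and its parameter bindings for one batch of properties.
+ * For example, a new vertex with two properties produces:
+ * g.addV(GDB___label).property(id, GDB___id).property(GDB___PK0, GDB___PV0).property(GDB___PK1, GDB___PV1)
+ */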
+ private Tuple2<String, Map<String, Object>> buildDsl(GdbElement element,
+ Map<String, Object> properties,
+ boolean isVertex, boolean firstAdd) {
+ Map<String, Object> params = new HashMap<>();
+
+ String dslPropertyPart = propertyCache.get(properties.size(), keys -> {
+ final StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < keys; i++) {
+ sb.append(".property(").append(VAR_PROP_KEY).append(i)
+ .append(", ").append(VAR_PROP_VALUE).append(i).append(")");
+ }
+ return sb.toString();
+ });
+
+ String dsl;
+ if (isVertex) {
+ dsl = (firstAdd ? ADD_V_START : UPDATE_V_START) + dslPropertyPart;
+ } else {
+ dsl = (firstAdd ? ADD_E_START : UPDATE_E_START) + dslPropertyPart;
+ if (firstAdd) {
+ params.put(VAR_FROM, ((GdbEdge)element).getFrom());
+ params.put(VAR_TO, ((GdbEdge)element).getTo());
+ }
+ }
+
+ int index = 0;
+ for (Map.Entry<String, Object> entry : properties.entrySet()) {
+ params.put(VAR_PROP_KEY + index, entry.getKey());
+ params.put(VAR_PROP_VALUE + index, entry.getValue());
+ index++;
+ }
+
+ if (firstAdd) {
+ params.put(VAR_LABEL, element.getLabel());
+ }
+ params.put(VAR_ID, element.getId());
+
+ return new Tuple2<>(dsl, params);
+ }
+
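+ /**
+ * Run the generated DSL with up to 10 retries and capped exponential backoff
+ * when the server rejects the request because its queue is full; a duplicate
+ * id on first add is surfaced as {@link GdbDuplicateIdException}.
+ */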
+ private void setGraphDbElement(GdbElement element, Map<String, Object> properties,
+ boolean isVertex, boolean firstAdd) throws GdbDuplicateIdException {
+ int retry = 10;
+ int idleTime = random.nextInt(10) + 10;
+ Tuple2<String, Map<String, Object>> elementDsl = buildDsl(element, properties, isVertex, firstAdd);
+
+ while (retry > 0) {
+ try {
+ runInternal(elementDsl.getFirst(), elementDsl.getSecond());
+ log.debug("AddElement {}", element.getId());
+ return;
+ } catch (Exception e) {
+ String cause = e.getCause() == null ? "" : e.getCause().toString();
+ if (cause.contains("rejected from")) {
+ retry--;
+ try {
+ Thread.sleep(idleTime);
+ } catch (InterruptedException e1) {
+ // ...
+ }
+ idleTime = Math.min(idleTime * 2, 2000);
+ continue;
+ } else if (firstAdd && cause.contains("GraphDB id exists")) {
+ throw new GdbDuplicateIdException(e);
+ }
+ log.error("Add Failed id {}, dsl {}, params {}, e {}", element.getId(),
+ elementDsl.getFirst(), elementDsl.getSecond(), e);
+ throw new RuntimeException(e);
+ }
+ }
+ log.error("Add Failed id {}, dsl {}, params {}", element.getId(),
+ elementDsl.getFirst(), elementDsl.getSecond());
+ throw new RuntimeException("failed to queue new element to server");
+ }
+}
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/ConfigHelper.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/ConfigHelper.java
new file mode 100644
index 00000000..77175197
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/ConfigHelper.java
@@ -0,0 +1,59 @@
+/**
+ *
+ */
+package com.alibaba.datax.plugin.writer.gdbwriter.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.function.Supplier;
+
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.writer.gdbwriter.GdbWriterErrorCode;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * @author jerrywang
+ *
+ */
+public interface ConfigHelper {
+ static void assertConfig(String key, Supplier f) {
+ if (!f.get()) {
+ throw DataXException.asDataXException(GdbWriterErrorCode.BAD_CONFIG_VALUE, key);
+ }
+ }
+
+ static void assertHasContent(Configuration config, String key) {
+ assertConfig(key, () -> StringUtils.isNotBlank(config.getString(key)));
+ }
+
+ /**
+ * NOTE: {@code Configuration::get(String, Class)} doesn't work.
+ *
+ * @param conf Configuration
+ * @param key key path to configuration
+ * @param cls Class of result type
+ * @return the target configuration object of type T
+ */
+ static <T> T getConfig(Configuration conf, String key, Class<T> cls) {
+ JSONObject j = (JSONObject) conf.get(key);
+ return JSON.toJavaObject(j, cls);
+ }
+
+ /**
+ * Create a configuration from the specified file on the classpath.
+ *
+ * @param name file name
+ * @return Configuration instance.
+ */
+ static Configuration fromClasspath(String name) {
+ try (InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(name)) {
+ return Configuration.from(is);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("File not found: " + name);
+ }
+ }
+}
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/GdbDuplicateIdException.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/GdbDuplicateIdException.java
new file mode 100644
index 00000000..e531d51b
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/GdbDuplicateIdException.java
@@ -0,0 +1,23 @@
+/*
+ * (C) 2019-present Alibaba Group Holding Limited.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ */
+package com.alibaba.datax.plugin.writer.gdbwriter.util;
+
+/**
+ * @author : Liu Jianping
+ * @date : 2019/8/3
+ */
+
+public class GdbDuplicateIdException extends Exception {
+ public GdbDuplicateIdException(Exception e) {
+ super(e);
+ }
+
+ public GdbDuplicateIdException() {
+ super();
+ }
+}
diff --git a/gdbwriter/src/main/resources/plugin.json b/gdbwriter/src/main/resources/plugin.json
new file mode 100644
index 00000000..52d1b11d
--- /dev/null
+++ b/gdbwriter/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+ "name": "gdbwriter",
+ "class": "com.alibaba.datax.plugin.writer.gdbwriter.GdbWriter",
+ "description": "useScene: prod. mechanism: connect GDB with gremlin-client, execute DSL as 'g.addV() or g.addE()' to write record",
+ "developer": "alibaba"
+}
diff --git a/gdbwriter/src/main/resources/plugin_job_template.json b/gdbwriter/src/main/resources/plugin_job_template.json
new file mode 100644
index 00000000..09a03142
--- /dev/null
+++ b/gdbwriter/src/main/resources/plugin_job_template.json
@@ -0,0 +1,74 @@
+{
+ "job": {
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ },
+
+ "content": [
+ {
+ "reader": {
+ "name": "odpsreader"
+ },
+
+ "writer": {
+ "name": "gdbwriter",
+ "parameter": {
+ "host": "localhost",
+ "port": 8182,
+ "username": "username",
+ "password": "password",
+ "label": "test-label",
+ "srcLabel": "test-srcLabel-",
+ "dstLabel": "test-dstLabel-",
+ "labelType": "EDGE",
+ "writeMode": "INSERT",
+ "idTransRule": "labelPrefix",
+ "srcIdTransRule": "labelPrefix",
+ "dstIdTransRule": "labelPrefix",
+
+ "column": [
+ {
+ "name": "id",
+ "value": "-test-${0}",
+ "type": "string",
+ "columnType": "primaryKey"
+ },
+ {
+ "name": "id",
+ "value": "from-id-${2}",
+ "type": "string",
+ "columnType": "srcPrimaryKey"
+ },
+ {
+ "name": "id",
+ "value": "to-id-${3}",
+ "type": "string",
+ "columnType": "dstPrimaryKey"
+ },
+ {
+ "name": "strValue-${2}-key",
+ "value": "strValue-${2}-value",
+ "type": "string",
+ "columnType": "edgeProperty"
+ },
+ {
+ "name": "intProp",
+ "value": "${3}",
+ "type": "int",
+ "columnType": "edgeProperty"
+ },
+ {
+ "name": "booleanProp",
+ "value": "${5}",
+ "type": "boolean",
+ "columnType": "edgeProperty"
+ }
+ ]
+ }
+ }
+ }
+ ]
+ }
+}
diff --git a/pom.xml b/pom.xml
index 79865473..50ee2080 100755
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
elasticsearchwriter
tsdbwriter
adbpgwriter
+ gdbwriter
plugin-rdbms-util