diff --git a/gdbwriter/doc/gdbwriter.md b/gdbwriter/doc/gdbwriter.md
index 82cdd899..70524a59 100644
--- a/gdbwriter/doc/gdbwriter.md
+++ b/gdbwriter/doc/gdbwriter.md
@@ -41,6 +41,14 @@ GDBWriter通过DataX框架获取Reader生成的协议数据,使用`g.addV/E(GD
                     {
                       "random": "60,64",
                       "type": "string"
+                    },
+                    {
+                      "random": "100,1000",
+                      "type": "long"
+                    },
+                    {
+                      "random": "32,48",
+                      "type": "string"
                     }
                   ],
                   "sliceRecordCount": 1000
@@ -70,6 +78,18 @@ GDBWriter通过DataX框架获取Reader生成的协议数据,使用`g.addV/E(GD
                   "name": "vertex_propKey",
                   "value": "${2}",
                   "type": "string",
+                  "columnType": "vertexSetProperty"
+                },
+                {
+                  "name": "vertex_propKey",
+                  "value": "${3}",
+                  "type": "long",
+                  "columnType": "vertexSetProperty"
+                },
+                {
+                  "name": "vertex_propKey2",
+                  "value": "${4}",
+                  "type": "string",
                   "columnType": "vertexProperty"
                 }
               ]
@@ -290,6 +310,7 @@ GDBWriter通过DataX框架获取Reader生成的协议数据,使用`g.addV/E(GD
   * primaryKey:表示该字段是主键id
   * 点枚举值:
     * vertexProperty:labelType为点时,表示该字段是点的普通属性
+    * vertexSetProperty:labelType为点时,表示该字段是点的SET属性,value是SET属性中的一个属性值
     * vertexJsonProperty:labelType为点时,表示是点json属性,value结构请见备注**json properties示例**,点配置最多只允许出现一个json属性;
   * 边枚举值:
     * srcPrimaryKey:labelType为边时,表示该字段是起点主键id
@@ -305,6 +326,14 @@ GDBWriter通过DataX框架获取Reader生成的协议数据,使用`g.addV/E(GD
 >   {"k":"age","t":"int","v":"20"},
 >   {"k":"sex","t":"string","v":"male"}
 > ]}
+>
+> # json格式同样支持给点添加SET属性,格式如下
+> {"properties":[
+>   {"k":"name","t":"string","v":"tom","c":"set"},
+>   {"k":"name","t":"string","v":"jack","c":"set"},
+>   {"k":"age","t":"int","v":"20"},
+>   {"k":"sex","t":"string","v":"male"}
+> ]}
 > ```

 ## 4 性能报告
@@ -367,4 +396,5 @@ DataX压测机器
 - GDBWriter插件与用户查询DSL使用相同的GDB实例端口,导入时可能会影响查询性能

 ## FAQ
-无
+1. 使用SET属性需要升级GDB实例到`1.0.20`版本及以上。
+2. 边只支持普通单值属性,不能给边写SET属性数据。
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriter.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriter.java
index 753d89fc..6470e9e6 100644
--- a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriter.java
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriter.java
@@ -1,10 +1,5 @@
 package com.alibaba.datax.plugin.writer.gdbwriter;

-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.*;
-import java.util.function.Function;
-
 import com.alibaba.datax.common.element.Record;
 import com.alibaba.datax.common.exception.DataXException;
 import com.alibaba.datax.common.plugin.RecordReceiver;
@@ -18,24 +13,33 @@ import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRule;
 import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRuleFactory;
 import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement;
 import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbGraph;
-
 import groovy.lang.Tuple2;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import lombok.extern.slf4j.Slf4j;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-public class GdbWriter extends Writer {
-    private static final Logger log = LoggerFactory.getLogger(GdbWriter.class);
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;

-    private static Function<Record, GdbElement> mapper = null;
-    private static GdbGraph globalGraph = null;
-    private static boolean session = false;
+public class GdbWriter extends Writer {
+    private static final Logger log = LoggerFactory.getLogger(GdbWriter.class);
+
+    private static Function<Record, GdbElement> mapper = null;
+    private static GdbGraph globalGraph = null;
+    private static boolean session = false;

     /**
      * Job 中的方法仅执行一次,Task 中方法会由框架启动多个 Task 线程并行执行。
      *

      * 整个 Writer 执行流程是:
+     *
      * Job类init-->prepare-->split
      *
@@ -46,17 +50,16 @@ public class GdbWriter extends Writer {
      *
*/ public static class Job extends Writer.Job { - private static final Logger LOG = LoggerFactory - .getLogger(Job.class); + private static final Logger LOG = LoggerFactory.getLogger(Job.class); private Configuration jobConfig = null; - + @Override public void init() { - LOG.info("GDB datax plugin writer job init begin ..."); - this.jobConfig = getPluginJobConf(); - GdbWriterConfig.of(this.jobConfig); - LOG.info("GDB datax plugin writer job init end."); + LOG.info("GDB datax plugin writer job init begin ..."); + this.jobConfig = getPluginJobConf(); + GdbWriterConfig.of(this.jobConfig); + LOG.info("GDB datax plugin writer job init end."); /** * 注意:此方法仅执行一次。 @@ -71,37 +74,37 @@ public class GdbWriter extends Writer { * 注意:此方法仅执行一次。 * 最佳实践:如果 Job 中有需要进行数据同步之前的处理,可以在此处完成,如果没有必要则可以直接去掉。 */ - super.prepare(); + super.prepare(); - MappingRule rule = MappingRuleFactory.getInstance().createV2(jobConfig); + final MappingRule rule = MappingRuleFactory.getInstance().createV2(this.jobConfig); - mapper = new DefaultGdbMapper().getMapper(rule); - session = jobConfig.getBool(Key.SESSION_STATE, false); + mapper = new DefaultGdbMapper(this.jobConfig).getMapper(rule); + session = this.jobConfig.getBool(Key.SESSION_STATE, false); /** * client connect check before task */ try { - globalGraph = GdbGraphManager.instance().getGraph(jobConfig, false); - } catch (RuntimeException e) { + globalGraph = GdbGraphManager.instance().getGraph(this.jobConfig, false); + } catch (final RuntimeException e) { throw DataXException.asDataXException(GdbWriterErrorCode.FAIL_CLIENT_CONNECT, e.getMessage()); } } @Override - public List split(int mandatoryNumber) { + public List split(final int mandatoryNumber) { /** * 注意:此方法仅执行一次。 * 最佳实践:通常采用工具静态类完成把 Job 配置切分成多个 Task 配置的工作。 * 这里的 mandatoryNumber 是强制必须切分的份数。 */ - LOG.info("split begin..."); - List configurationList = new ArrayList(); - for (int i = 0; i < mandatoryNumber; i++) { - configurationList.add(this.jobConfig.clone()); - } - LOG.info("split end..."); - return configurationList; + LOG.info("split begin..."); + final List configurationList = new ArrayList(); + for (int i = 0; i < mandatoryNumber; i++) { + configurationList.add(this.jobConfig.clone()); + } + LOG.info("split end..."); + return configurationList; } @Override @@ -127,7 +130,7 @@ public class GdbWriter extends Writer { public static class Task extends Writer.Task { private Configuration taskConfig; - + private int failed = 0; private int batchRecords; private ExecutorService submitService = null; @@ -139,24 +142,24 @@ public class GdbWriter extends Writer { * 注意:此方法每个 Task 都会执行一次。 * 最佳实践:此处通过对 taskConfig 配置的读取,进而初始化一些资源为 startWrite()做准备。 */ - this.taskConfig = super.getPluginJobConf(); - batchRecords = taskConfig.getInt(Key.MAX_RECORDS_IN_BATCH, GdbWriterConfig.DEFAULT_RECORD_NUM_IN_BATCH); - submitService = new ThreadPoolExecutor(1, 1, 0L, - TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(), new DefaultThreadFactory("submit-dsl")); + this.taskConfig = super.getPluginJobConf(); + this.batchRecords = this.taskConfig.getInt(Key.MAX_RECORDS_IN_BATCH, GdbWriterConfig.DEFAULT_RECORD_NUM_IN_BATCH); + this.submitService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(), + new DefaultThreadFactory("submit-dsl")); - if (!session) { - graph = globalGraph; - } else { - /** - * 分批创建session client,由于服务端groovy编译性能的限制 - */ - try { - Thread.sleep((getTaskId()/10)*10000); - } catch (Exception e) { - // ... 
- } - graph = GdbGraphManager.instance().getGraph(taskConfig, session); - } + if (!session) { + this.graph = globalGraph; + } else { + /** + * 分批创建session client,由于服务端groovy编译性能的限制 + */ + try { + Thread.sleep((getTaskId() / 10) * 10000); + } catch (final Exception e) { + // ... + } + this.graph = GdbGraphManager.instance().getGraph(this.taskConfig, session); + } } @Override @@ -165,64 +168,69 @@ public class GdbWriter extends Writer { * 注意:此方法每个 Task 都会执行一次。 * 最佳实践:如果 Task 中有需要进行数据同步之前的处理,可以在此处完成,如果没有必要则可以直接去掉。 */ - super.prepare(); + super.prepare(); } @Override - public void startWrite(RecordReceiver recordReceiver) { + public void startWrite(final RecordReceiver recordReceiver) { /** * 注意:此方法每个 Task 都会执行一次。 * 最佳实践:此处适当封装确保简洁清晰完成数据写入工作。 */ - Record r; - Future future = null; - List> records = new ArrayList<>(batchRecords); + Record r; + Future future = null; + List> records = new ArrayList<>(this.batchRecords); - while ((r = recordReceiver.getFromReader()) != null) { - records.add(new Tuple2<>(r, mapper.apply(r))); + while ((r = recordReceiver.getFromReader()) != null) { + try { + records.add(new Tuple2<>(r, mapper.apply(r))); + } catch (final Exception ex) { + getTaskPluginCollector().collectDirtyRecord(r, ex); + continue; + } - if (records.size() >= batchRecords) { - wait4Submit(future); + if (records.size() >= this.batchRecords) { + wait4Submit(future); - final List> batch = records; - future = submitService.submit(() -> batchCommitRecords(batch)); - records = new ArrayList<>(batchRecords); - } - } + final List> batch = records; + future = this.submitService.submit(() -> batchCommitRecords(batch)); + records = new ArrayList<>(this.batchRecords); + } + } - wait4Submit(future); - if (!records.isEmpty()) { - final List> batch = records; - future = submitService.submit(() -> batchCommitRecords(batch)); - wait4Submit(future); - } + wait4Submit(future); + if (!records.isEmpty()) { + final List> batch = records; + future = this.submitService.submit(() -> batchCommitRecords(batch)); + wait4Submit(future); + } } - private void wait4Submit(Future future) { - if (future == null) { - return; - } + private void wait4Submit(final Future future) { + if (future == null) { + return; + } - try { - future.get(); - } catch (Exception e) { - e.printStackTrace(); - } + try { + future.get(); + } catch (final Exception e) { + e.printStackTrace(); + } } private boolean batchCommitRecords(final List> records) { - TaskPluginCollector collector = getTaskPluginCollector(); - try { - List> errors = graph.add(records); - errors.forEach(t -> collector.collectDirtyRecord(t.getFirst(), t.getSecond())); - failed += errors.size(); - } catch (Exception e) { - records.forEach(t -> collector.collectDirtyRecord(t.getFirst(), e)); - failed += records.size(); - } + final TaskPluginCollector collector = getTaskPluginCollector(); + try { + final List> errors = this.graph.add(records); + errors.forEach(t -> collector.collectDirtyRecord(t.getFirst(), t.getSecond())); + this.failed += errors.size(); + } catch (final Exception e) { + records.forEach(t -> collector.collectDirtyRecord(t.getFirst(), e)); + this.failed += records.size(); + } - records.clear(); - return true; + records.clear(); + return true; } @Override @@ -231,7 +239,7 @@ public class GdbWriter extends Writer { * 注意:此方法每个 Task 都会执行一次。 * 最佳实践:如果 Task 中有需要进行数据同步之后的后续处理,可以在此处完成。 */ - log.info("Task done, dirty record count - {}", failed); + log.info("Task done, dirty record count - {}", this.failed); } @Override @@ -241,9 +249,9 @@ public class GdbWriter extends Writer { 
* 最佳实践:通常配合Task 中的 post() 方法一起完成 Task 的资源释放。 */ if (session) { - graph.close(); + this.graph.close(); } - submitService.shutdown(); + this.submitService.shutdown(); } } diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriterErrorCode.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriterErrorCode.java index a6f506ef..e1c9080b 100644 --- a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriterErrorCode.java +++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriterErrorCode.java @@ -27,7 +27,6 @@ public enum GdbWriterErrorCode implements ErrorCode { @Override public String toString() { - return String.format("Code:[%s], Description:[%s]. ", this.code, - this.description); + return String.format("Code:[%s], Description:[%s]. ", this.code, this.description); } } \ No newline at end of file diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/Key.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/Key.java index f2e37005..afa58239 100644 --- a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/Key.java +++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/Key.java @@ -6,136 +6,164 @@ public final class Key { * 此处声明插件用到的需要插件使用者提供的配置项 */ - public final static String HOST = "host"; - public final static String PORT = "port"; + public final static String HOST = "host"; + public final static String PORT = "port"; public final static String USERNAME = "username"; - public static final String PASSWORD = "password"; + public static final String PASSWORD = "password"; - /** - * import type and mode - */ - public static final String IMPORT_TYPE = "labelType"; - public static final String UPDATE_MODE = "writeMode"; + /** + * import type and mode + */ + public static final String IMPORT_TYPE = "labelType"; + public static final String UPDATE_MODE = "writeMode"; - /** - * label prefix issue - */ - public static final String ID_TRANS_RULE = "idTransRule"; - public static final String SRC_ID_TRANS_RULE = "srcIdTransRule"; - public static final String DST_ID_TRANS_RULE = "dstIdTransRule"; + /** + * label prefix issue + */ + public static final String ID_TRANS_RULE = "idTransRule"; + public static final String SRC_ID_TRANS_RULE = "srcIdTransRule"; + public static final String DST_ID_TRANS_RULE = "dstIdTransRule"; - public static final String LABEL = "label"; - public static final String SRC_LABEL = "srcLabel"; - public static final String DST_LABEL = "dstLabel"; + public static final String LABEL = "label"; + public static final String SRC_LABEL = "srcLabel"; + public static final String DST_LABEL = "dstLabel"; - public static final String MAPPING = "mapping"; + public static final String MAPPING = "mapping"; - /** - * column define in Gdb - */ - public static final String COLUMN = "column"; - public static final String COLUMN_NAME = "name"; - public static final String COLUMN_VALUE = "value"; - public static final String COLUMN_TYPE = "type"; - public static final String COLUMN_NODE_TYPE = "columnType"; + /** + * column define in Gdb + */ + public static final String COLUMN = "column"; + public static final String COLUMN_NAME = "name"; + public static final String COLUMN_VALUE = "value"; + public static final String COLUMN_TYPE = "type"; + public static final String COLUMN_NODE_TYPE = "columnType"; - /** - * Gdb Vertex/Edge elements - */ - public static final String ID = "id"; - public static final String FROM = "from"; - 
public static final String TO = "to"; - public static final String PROPERTIES = "properties"; - public static final String PROP_KEY = "name"; - public static final String PROP_VALUE = "value"; - public static final String PROP_TYPE = "type"; + /** + * Gdb Vertex/Edge elements + */ + public static final String ID = "id"; + public static final String FROM = "from"; + public static final String TO = "to"; + public static final String PROPERTIES = "properties"; + public static final String PROP_KEY = "name"; + public static final String PROP_VALUE = "value"; + public static final String PROP_TYPE = "type"; - public static final String PROPERTIES_JSON_STR = "propertiesJsonStr"; - public static final String MAX_PROPERTIES_BATCH_NUM = "maxPropertiesBatchNumber"; + public static final String PROPERTIES_JSON_STR = "propertiesJsonStr"; + public static final String MAX_PROPERTIES_BATCH_NUM = "maxPropertiesBatchNumber"; - /** - * session less client configure for connect pool - */ - public static final String MAX_IN_PROCESS_PER_CONNECTION = "maxInProcessPerConnection"; - public static final String MAX_CONNECTION_POOL_SIZE = "maxConnectionPoolSize"; - public static final String MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = "maxSimultaneousUsagePerConnection"; + /** + * session less client configure for connect pool + */ + public static final String MAX_IN_PROCESS_PER_CONNECTION = "maxInProcessPerConnection"; + public static final String MAX_CONNECTION_POOL_SIZE = "maxConnectionPoolSize"; + public static final String MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = "maxSimultaneousUsagePerConnection"; - public static final String MAX_RECORDS_IN_BATCH = "maxRecordsInBatch"; - public static final String SESSION_STATE = "session"; + public static final String MAX_RECORDS_IN_BATCH = "maxRecordsInBatch"; + public static final String SESSION_STATE = "session"; - public static enum ImportType { - /** - * Import vertices - */ - VERTEX, - /** - * Import edges - */ - EDGE; - } - - public static enum UpdateMode { - /** - * Insert new records, fail if exists - */ - INSERT, - /** - * Skip this record if exists - */ - SKIP, - /** - * Update property of this record if exists - */ - MERGE; - } + /** + * request length limit, include gdb element string length GDB字段长度限制配置,可分别配置各字段的限制,超过限制的记录会当脏数据处理 + */ + public static final String MAX_GDB_STRING_LENGTH = "maxStringLengthLimit"; + public static final String MAX_GDB_ID_LENGTH = "maxIdStringLengthLimit"; + public static final String MAX_GDB_LABEL_LENGTH = "maxLabelStringLengthLimit"; + public static final String MAX_GDB_PROP_KEY_LENGTH = "maxPropKeyStringLengthLimit"; + public static final String MAX_GDB_PROP_VALUE_LENGTH = "maxPropValueStringLengthLimit"; - public static enum ColumnType { - /** - * vertex or edge id - */ - primaryKey, + public static final String MAX_GDB_REQUEST_LENGTH = "maxRequestLengthLimit"; - /** - * vertex property - */ - vertexProperty, + public static enum ImportType { + /** + * Import vertices + */ + VERTEX, + /** + * Import edges + */ + EDGE; + } - /** - * start vertex id of edge - */ - srcPrimaryKey, + public static enum UpdateMode { + /** + * Insert new records, fail if exists + */ + INSERT, + /** + * Skip this record if exists + */ + SKIP, + /** + * Update property of this record if exists + */ + MERGE; + } - /** - * end vertex id of edge - */ - dstPrimaryKey, + public static enum ColumnType { + /** + * vertex or edge id + */ + primaryKey, - /** - * edge property - */ - edgeProperty, + /** + * vertex property + */ + vertexProperty, - /** - * vertex json 
style property - */ - vertexJsonProperty, + /** + * vertex setProperty + */ + vertexSetProperty, - /** - * edge json style property - */ - edgeJsonProperty - } + /** + * start vertex id of edge + */ + srcPrimaryKey, - public static enum IdTransRule { - /** - * vertex or edge id with 'label' prefix - */ - labelPrefix, + /** + * end vertex id of edge + */ + dstPrimaryKey, - /** - * vertex or edge id raw - */ - none - } + /** + * edge property + */ + edgeProperty, + + /** + * vertex json style property + */ + vertexJsonProperty, + + /** + * edge json style property + */ + edgeJsonProperty + } + + public static enum IdTransRule { + /** + * vertex or edge id with 'label' prefix + */ + labelPrefix, + + /** + * vertex or edge id raw + */ + none + } + + public static enum PropertyType { + /** + * single Vertex Property + */ + single, + + /** + * set Vertex Property + */ + set + } } diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbGraphManager.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbGraphManager.java index ac06013c..53668127 100644 --- a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbGraphManager.java +++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbGraphManager.java @@ -3,37 +3,37 @@ */ package com.alibaba.datax.plugin.writer.gdbwriter.client; +import java.util.ArrayList; +import java.util.List; + import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbGraph; import com.alibaba.datax.plugin.writer.gdbwriter.model.ScriptGdbGraph; -import java.util.ArrayList; -import java.util.List; - /** * @author jerrywang * */ public class GdbGraphManager implements AutoCloseable { - private static final GdbGraphManager instance = new GdbGraphManager(); - - private List graphs = new ArrayList<>(); - - public static GdbGraphManager instance() { - return instance; - } + private static final GdbGraphManager INSTANCE = new GdbGraphManager(); - public GdbGraph getGraph(Configuration config, boolean session) { - GdbGraph graph = new ScriptGdbGraph(config, session); - graphs.add(graph); - return graph; - } + private List graphs = new ArrayList<>(); - @Override - public void close() { - for(GdbGraph graph : graphs) { - graph.close(); - } - graphs.clear(); - } + public static GdbGraphManager instance() { + return INSTANCE; + } + + public GdbGraph getGraph(final Configuration config, final boolean session) { + final GdbGraph graph = new ScriptGdbGraph(config, session); + this.graphs.add(graph); + return graph; + } + + @Override + public void close() { + for (final GdbGraph graph : this.graphs) { + graph.close(); + } + this.graphs.clear(); + } } diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbWriterConfig.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbWriterConfig.java index 0266a010..dbc68b90 100644 --- a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbWriterConfig.java +++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbWriterConfig.java @@ -3,39 +3,43 @@ */ package com.alibaba.datax.plugin.writer.gdbwriter.client; +import static com.alibaba.datax.plugin.writer.gdbwriter.util.ConfigHelper.assertConfig; +import static com.alibaba.datax.plugin.writer.gdbwriter.util.ConfigHelper.assertHasContent; + import com.alibaba.datax.common.util.Configuration; import 
com.alibaba.datax.plugin.writer.gdbwriter.Key; -import static com.alibaba.datax.plugin.writer.gdbwriter.util.ConfigHelper.*; - /** * @author jerrywang * */ public class GdbWriterConfig { - public static final int DEFAULT_MAX_IN_PROCESS_PER_CONNECTION = 4; - public static final int DEFAULT_MAX_CONNECTION_POOL_SIZE = 8; - public static final int DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = 8; - public static final int DEFAULT_BATCH_PROPERTY_NUM = 30; - public static final int DEFAULT_RECORD_NUM_IN_BATCH = 16; + public static final int DEFAULT_MAX_IN_PROCESS_PER_CONNECTION = 4; + public static final int DEFAULT_MAX_CONNECTION_POOL_SIZE = 8; + public static final int DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = 8; + public static final int DEFAULT_BATCH_PROPERTY_NUM = 30; + public static final int DEFAULT_RECORD_NUM_IN_BATCH = 16; - private Configuration config; + public static final int MAX_STRING_LENGTH = 10240; + public static final int MAX_REQUEST_LENGTH = 65535 - 1000; - private GdbWriterConfig(Configuration config) { - this.config = config; + private Configuration config; - validate(); - } + private GdbWriterConfig(final Configuration config) { + this.config = config; - private void validate() { - assertHasContent(config, Key.HOST); - assertConfig(Key.PORT, () -> config.getInt(Key.PORT) > 0); + validate(); + } - assertHasContent(config, Key.USERNAME); - assertHasContent(config, Key.PASSWORD); - } - - public static GdbWriterConfig of(Configuration config) { - return new GdbWriterConfig(config); - } + public static GdbWriterConfig of(final Configuration config) { + return new GdbWriterConfig(config); + } + + private void validate() { + assertHasContent(this.config, Key.HOST); + assertConfig(Key.PORT, () -> this.config.getInt(Key.PORT) > 0); + + assertHasContent(this.config, Key.USERNAME); + assertHasContent(this.config, Key.PASSWORD); + } } diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/DefaultGdbMapper.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/DefaultGdbMapper.java index f5957295..73a94cf5 100644 --- a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/DefaultGdbMapper.java +++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/DefaultGdbMapper.java @@ -3,6 +3,8 @@ */ package com.alibaba.datax.plugin.writer.gdbwriter.mapping; +import static com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType.VERTEX; + import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -12,179 +14,191 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import com.alibaba.datax.common.element.Record; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; +import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.writer.gdbwriter.Key; import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbEdge; import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement; import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbVertex; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; -import static com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType.VERTEX; - /** * @author jerrywang * */ @Slf4j public class DefaultGdbMapper implements GdbMapper { - private static final Pattern STR_PATTERN = Pattern.compile("\\$\\{(\\d+)}"); - private static final Pattern NORMAL_PATTERN = Pattern.compile("^\\$\\{(\\d+)}$"); + private static final Pattern 
STR_DOLLAR_PATTERN = Pattern.compile("\\$\\{(\\d+)}"); + private static final Pattern NORMAL_DOLLAR_PATTERN = Pattern.compile("^\\$\\{(\\d+)}$"); - @Override - public Function getMapper(MappingRule rule) { - return r -> { - GdbElement e = (rule.getImportType() == VERTEX) ? new GdbVertex() : new GdbEdge(); - forElement(rule).accept(r, e); - return e; + private static final Pattern STR_NUM_PATTERN = Pattern.compile("#\\{(\\d+)}"); + private static final Pattern NORMAL_NUM_PATTERN = Pattern.compile("^#\\{(\\d+)}$"); + + public DefaultGdbMapper() {} + + public DefaultGdbMapper(final Configuration config) { + MapperConfig.getInstance().updateConfig(config); + } + + private static BiConsumer forElement(final MappingRule rule) { + final boolean numPattern = rule.isNumPattern(); + final List> properties = new ArrayList<>(); + for (final MappingRule.PropertyMappingRule propRule : rule.getProperties()) { + final Function keyFunc = forStrColumn(numPattern, propRule.getKey()); + + if (propRule.getValueType() == ValueType.STRING) { + final Function valueFunc = forStrColumn(numPattern, propRule.getValue()); + properties.add((r, e) -> { + e.addProperty(keyFunc.apply(r), valueFunc.apply(r), propRule.getPType()); + }); + } else { + final Function valueFunc = + forObjColumn(numPattern, propRule.getValue(), propRule.getValueType()); + properties.add((r, e) -> { + e.addProperty(keyFunc.apply(r), valueFunc.apply(r), propRule.getPType()); + }); + } + } + + if (rule.getPropertiesJsonStr() != null) { + final Function jsonFunc = forStrColumn(numPattern, rule.getPropertiesJsonStr()); + properties.add((r, e) -> { + final String propertiesStr = jsonFunc.apply(r); + final JSONObject root = (JSONObject)JSONObject.parse(propertiesStr); + final JSONArray propertiesList = root.getJSONArray("properties"); + + for (final Object object : propertiesList) { + final JSONObject jsonObject = (JSONObject)object; + final String key = jsonObject.getString("k"); + final String name = jsonObject.getString("v"); + final String type = jsonObject.getString("t"); + final String card = jsonObject.getString("c"); + + if (key == null || name == null) { + continue; + } + addToProperties(e, key, name, type, card); + } + }); + } + + final BiConsumer ret = (r, e) -> { + final String label = forStrColumn(numPattern, rule.getLabel()).apply(r); + String id = forStrColumn(numPattern, rule.getId()).apply(r); + + if (rule.getImportType() == Key.ImportType.EDGE) { + final String to = forStrColumn(numPattern, rule.getTo()).apply(r); + final String from = forStrColumn(numPattern, rule.getFrom()).apply(r); + if (to == null || from == null) { + log.error("invalid record to: {} , from: {}", to, from); + throw new IllegalArgumentException("to or from missed in edge"); + } + ((GdbEdge)e).setTo(to); + ((GdbEdge)e).setFrom(from); + + // generate UUID for edge + if (id == null) { + id = UUID.randomUUID().toString(); + } + } + + if (id == null || label == null) { + log.error("invalid record id: {} , label: {}", id, label); + throw new IllegalArgumentException("id or label missed"); + } + + e.setId(id); + e.setLabel(label); + + properties.forEach(p -> p.accept(r, e)); }; - } + return ret; + } - private static BiConsumer forElement(MappingRule rule) { - List> properties = new ArrayList<>(); - for (MappingRule.PropertyMappingRule propRule : rule.getProperties()) { - Function keyFunc = forStrColumn(propRule.getKey()); + private static Function forObjColumn(final boolean numPattern, final String rule, final ValueType type) { + final Pattern pattern = numPattern ? 
NORMAL_NUM_PATTERN : NORMAL_DOLLAR_PATTERN; + final Matcher m = pattern.matcher(rule); + if (m.matches()) { + final int index = Integer.valueOf(m.group(1)); + return r -> type.applyColumn(r.getColumn(index)); + } else { + return r -> type.fromStrFunc(rule); + } + } - if (propRule.getValueType() == ValueType.STRING) { - final Function valueFunc = forStrColumn(propRule.getValue()); - properties.add((r, e) -> { - String k = keyFunc.apply(r); - String v = valueFunc.apply(r); - if (k != null && v != null) { - e.getProperties().put(k, v); - } - }); - } else { - final Function valueFunc = forObjColumn(propRule.getValue(), propRule.getValueType()); - properties.add((r, e) -> { - String k = keyFunc.apply(r); - Object v = valueFunc.apply(r); - if (k != null && v != null) { - e.getProperties().put(k, v); - } - }); - } - } + private static Function forStrColumn(final boolean numPattern, final String rule) { + final List> list = new ArrayList<>(); + final Pattern pattern = numPattern ? STR_NUM_PATTERN : STR_DOLLAR_PATTERN; + final Matcher m = pattern.matcher(rule); + int last = 0; + while (m.find()) { + final String index = m.group(1); + // as simple integer index. + final int i = Integer.parseInt(index); - if (rule.getPropertiesJsonStr() != null) { - Function jsonFunc = forStrColumn(rule.getPropertiesJsonStr()); - properties.add((r, e) -> { - String propertiesStr = jsonFunc.apply(r); - JSONObject root = (JSONObject)JSONObject.parse(propertiesStr); - JSONArray propertiesList = root.getJSONArray("properties"); + final int tmp = last; + final int start = m.start(); + list.add((sb, record) -> { + sb.append(rule.subSequence(tmp, start)); + if (record.getColumn(i) != null && record.getColumn(i).getByteSize() > 0) { + sb.append(record.getColumn(i).asString()); + } + }); - for (Object object : propertiesList) { - JSONObject jsonObject = (JSONObject)object; - String key = jsonObject.getString("k"); - String name = jsonObject.getString("v"); - String type = jsonObject.getString("t"); + last = m.end(); + } - if (key == null || name == null) { - continue; - } - addToProperties(e, key, name, type); - } - }); - } + final int tmp = last; + list.add((sb, record) -> { + sb.append(rule.subSequence(tmp, rule.length())); + }); - BiConsumer ret = (r, e) -> { - String label = forStrColumn(rule.getLabel()).apply(r); - String id = forStrColumn(rule.getId()).apply(r); + return r -> { + final StringBuilder sb = new StringBuilder(); + list.forEach(c -> c.accept(sb, r)); + final String res = sb.toString(); + return res.isEmpty() ? 
null : res; + }; + } - if (rule.getImportType() == Key.ImportType.EDGE) { - String to = forStrColumn(rule.getTo()).apply(r); - String from = forStrColumn(rule.getFrom()).apply(r); - if (to == null || from == null) { - log.error("invalid record to: {} , from: {}", to, from); - throw new IllegalArgumentException("to or from missed in edge"); - } - ((GdbEdge)e).setTo(to); - ((GdbEdge)e).setFrom(from); + private static boolean addToProperties(final GdbElement e, final String key, final String value, final String type, final String card) { + final Object pValue; + final ValueType valueType = ValueType.fromShortName(type); - // generate UUID for edge - if (id == null) { - id = UUID.randomUUID().toString(); - } - } + if (valueType == ValueType.STRING) { + pValue = value; + } else if (valueType == ValueType.INT || valueType == ValueType.INTEGER) { + pValue = Integer.valueOf(value); + } else if (valueType == ValueType.LONG) { + pValue = Long.valueOf(value); + } else if (valueType == ValueType.DOUBLE) { + pValue = Double.valueOf(value); + } else if (valueType == ValueType.FLOAT) { + pValue = Float.valueOf(value); + } else if (valueType == ValueType.BOOLEAN) { + pValue = Boolean.valueOf(value); + } else { + log.error("invalid property key {}, value {}, type {}", key, value, type); + return false; + } - if (id == null || label == null) { - log.error("invalid record id: {} , label: {}", id, label); - throw new IllegalArgumentException("id or label missed"); - } + // apply vertexSetProperty + if (Key.PropertyType.set.name().equals(card) && (e instanceof GdbVertex)) { + e.addProperty(key, pValue, Key.PropertyType.set); + } else { + e.addProperty(key, pValue); + } + return true; + } - e.setId(id); - e.setLabel(label); - - properties.forEach(p -> p.accept(r, e)); - }; - return ret; - } - - static Function forObjColumn(String rule, ValueType type) { - Matcher m = NORMAL_PATTERN.matcher(rule); - if (m.matches()) { - int index = Integer.valueOf(m.group(1)); - return r -> type.applyColumn(r.getColumn(index)); - } else { - return r -> type.fromStrFunc(rule); - } - } - - static Function forStrColumn(String rule) { - List> list = new ArrayList<>(); - Matcher m = STR_PATTERN.matcher(rule); - int last = 0; - while (m.find()) { - String index = m.group(1); - // as simple integer index. - int i = Integer.parseInt(index); - - final int tmp = last; - final int start = m.start(); - list.add((sb, record) -> { - sb.append(rule.subSequence(tmp, start)); - if(record.getColumn(i) != null && record.getColumn(i).getByteSize() > 0) { - sb.append(record.getColumn(i).asString()); - } - }); - - last = m.end(); - } - - final int tmp = last; - list.add((sb, record) -> { - sb.append(rule.subSequence(tmp, rule.length())); - }); - - return r -> { - StringBuilder sb = new StringBuilder(); - list.forEach(c -> c.accept(sb, r)); - String res = sb.toString(); - return res.isEmpty() ? 
null : res; - }; - } - - static boolean addToProperties(GdbElement e, String key, String value, String type) { - ValueType valueType = ValueType.fromShortName(type); - - if(valueType == ValueType.STRING) { - e.getProperties().put(key, value); - } else if (valueType == ValueType.INT) { - e.getProperties().put(key, Integer.valueOf(value)); - } else if (valueType == ValueType.LONG) { - e.getProperties().put(key, Long.valueOf(value)); - } else if (valueType == ValueType.DOUBLE) { - e.getProperties().put(key, Double.valueOf(value)); - } else if (valueType == ValueType.FLOAT) { - e.getProperties().put(key, Float.valueOf(value)); - } else if (valueType == ValueType.BOOLEAN) { - e.getProperties().put(key, Boolean.valueOf(value)); - } else { - log.error("invalid property key {}, value {}, type {}", key, value, type); - return false; - } - - return true; - } + @Override + public Function getMapper(final MappingRule rule) { + return r -> { + final GdbElement e = (rule.getImportType() == VERTEX) ? new GdbVertex() : new GdbEdge(); + forElement(rule).accept(r, e); + return e; + }; + } } diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/GdbMapper.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/GdbMapper.java index 3282f203..6a717a95 100644 --- a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/GdbMapper.java +++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/GdbMapper.java @@ -13,5 +13,5 @@ import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement; * */ public interface GdbMapper { - Function getMapper(MappingRule rule); + Function getMapper(MappingRule rule); } diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MapperConfig.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MapperConfig.java new file mode 100644 index 00000000..241cd31a --- /dev/null +++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MapperConfig.java @@ -0,0 +1,68 @@ +/* + * (C) 2019-present Alibaba Group Holding Limited. + * + * This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public + * License version 2 as published by the Free Software Foundation. + */ +package com.alibaba.datax.plugin.writer.gdbwriter.mapping; + +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.writer.gdbwriter.Key; +import com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig; + +/** + * @author : Liu Jianping + * @date : 2019/10/15 + */ + +public class MapperConfig { + private static MapperConfig instance = new MapperConfig(); + private int maxIdLength; + private int maxLabelLength; + private int maxPropKeyLength; + private int maxPropValueLength; + + private MapperConfig() { + this.maxIdLength = GdbWriterConfig.MAX_STRING_LENGTH; + this.maxLabelLength = GdbWriterConfig.MAX_STRING_LENGTH; + this.maxPropKeyLength = GdbWriterConfig.MAX_STRING_LENGTH; + this.maxPropValueLength = GdbWriterConfig.MAX_STRING_LENGTH; + } + + public static MapperConfig getInstance() { + return instance; + } + + public void updateConfig(final Configuration config) { + final int length = config.getInt(Key.MAX_GDB_STRING_LENGTH, GdbWriterConfig.MAX_STRING_LENGTH); + + Integer sLength = config.getInt(Key.MAX_GDB_ID_LENGTH); + this.maxIdLength = sLength == null ? 
length : sLength; + + sLength = config.getInt(Key.MAX_GDB_LABEL_LENGTH); + this.maxLabelLength = sLength == null ? length : sLength; + + sLength = config.getInt(Key.MAX_GDB_PROP_KEY_LENGTH); + this.maxPropKeyLength = sLength == null ? length : sLength; + + sLength = config.getInt(Key.MAX_GDB_PROP_VALUE_LENGTH); + this.maxPropValueLength = sLength == null ? length : sLength; + } + + public int getMaxIdLength() { + return this.maxIdLength; + } + + public int getMaxLabelLength() { + return this.maxLabelLength; + } + + public int getMaxPropKeyLength() { + return this.maxPropKeyLength; + } + + public int getMaxPropValueLength() { + return this.maxPropValueLength; + } + +} diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRule.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRule.java index c0c58d88..971fd6da 100644 --- a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRule.java +++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRule.java @@ -7,6 +7,7 @@ import java.util.ArrayList; import java.util.List; import com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType; +import com.alibaba.datax.plugin.writer.gdbwriter.Key.PropertyType; import lombok.Data; @@ -16,26 +17,30 @@ import lombok.Data; */ @Data public class MappingRule { - private String id = null; + private String id = null; - private String label = null; - - private ImportType importType = null; - - private String from = null; + private String label = null; - private String to = null; + private ImportType importType = null; - private List properties = new ArrayList<>(); + private String from = null; - private String propertiesJsonStr = null; + private String to = null; - @Data - public static class PropertyMappingRule { - private String key = null; - - private String value = null; - - private ValueType valueType = null; - } + private List properties = new ArrayList<>(); + + private String propertiesJsonStr = null; + + private boolean numPattern = false; + + @Data + public static class PropertyMappingRule { + private String key = null; + + private String value = null; + + private ValueType valueType = null; + + private PropertyType pType = PropertyType.single; + } } diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRuleFactory.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRuleFactory.java index 0738ac17..3e3a2afe 100644 --- a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRuleFactory.java +++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRuleFactory.java @@ -3,18 +3,21 @@ */ package com.alibaba.datax.plugin.writer.gdbwriter.mapping; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.writer.gdbwriter.GdbWriterErrorCode; import com.alibaba.datax.plugin.writer.gdbwriter.Key; -import com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType; -import com.alibaba.datax.plugin.writer.gdbwriter.Key.IdTransRule; import com.alibaba.datax.plugin.writer.gdbwriter.Key.ColumnType; +import com.alibaba.datax.plugin.writer.gdbwriter.Key.IdTransRule; +import com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType; import 
com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRule.PropertyMappingRule; import com.alibaba.datax.plugin.writer.gdbwriter.util.ConfigHelper; -import lombok.extern.slf4j.Slf4j; -import java.util.List; +import lombok.extern.slf4j.Slf4j; /** * @author jerrywang @@ -22,66 +25,94 @@ import java.util.List; */ @Slf4j public class MappingRuleFactory { - private static final MappingRuleFactory instance = new MappingRuleFactory(); - - public static final MappingRuleFactory getInstance() { - return instance; - } + private static final MappingRuleFactory instance = new MappingRuleFactory(); + private static final Pattern STR_PATTERN = Pattern.compile("\\$\\{(\\d+)}"); + private static final Pattern STR_NUM_PATTERN = Pattern.compile("#\\{(\\d+)}"); - @Deprecated - public MappingRule create(Configuration config, ImportType type) { - MappingRule rule = new MappingRule(); - rule.setId(config.getString(Key.ID)); - rule.setLabel(config.getString(Key.LABEL)); - if (type == ImportType.EDGE) { - rule.setFrom(config.getString(Key.FROM)); - rule.setTo(config.getString(Key.TO)); - } - - rule.setImportType(type); - - List configurations = config.getListConfiguration(Key.PROPERTIES); - if (configurations != null) { - for (Configuration prop : config.getListConfiguration(Key.PROPERTIES)) { - PropertyMappingRule propRule = new PropertyMappingRule(); - propRule.setKey(prop.getString(Key.PROP_KEY)); - propRule.setValue(prop.getString(Key.PROP_VALUE)); - propRule.setValueType(ValueType.fromShortName(prop.getString(Key.PROP_TYPE).toLowerCase())); - rule.getProperties().add(propRule); - } - } - - String propertiesJsonStr = config.getString(Key.PROPERTIES_JSON_STR, null); - if (propertiesJsonStr != null) { - rule.setPropertiesJsonStr(propertiesJsonStr); - } - - return rule; - } - - public MappingRule createV2(Configuration config) { - try { - ImportType type = ImportType.valueOf(config.getString(Key.IMPORT_TYPE)); - return createV2(config, type); - } catch (NullPointerException e) { - throw DataXException.asDataXException(GdbWriterErrorCode.CONFIG_ITEM_MISS, Key.IMPORT_TYPE); - } catch (IllegalArgumentException e) { - throw DataXException.asDataXException(GdbWriterErrorCode.BAD_CONFIG_VALUE, Key.IMPORT_TYPE); - } + public static MappingRuleFactory getInstance() { + return instance; } - public MappingRule createV2(Configuration config, ImportType type) { - MappingRule rule = new MappingRule(); + private static boolean isPattern(final String value, final MappingRule rule, final boolean checked) { + if (checked) { + return true; + } - ConfigHelper.assertHasContent(config, Key.LABEL); - rule.setLabel(config.getString(Key.LABEL)); - rule.setImportType(type); + if (value == null || value.isEmpty()) { + return false; + } - IdTransRule srcTransRule = IdTransRule.none; + Matcher m = STR_PATTERN.matcher(value); + if (m.find()) { + rule.setNumPattern(false); + return true; + } + + m = STR_NUM_PATTERN.matcher(value); + if (m.find()) { + rule.setNumPattern(true); + return true; + } + + return false; + } + + @Deprecated + public MappingRule create(final Configuration config, final ImportType type) { + final MappingRule rule = new MappingRule(); + rule.setId(config.getString(Key.ID)); + rule.setLabel(config.getString(Key.LABEL)); + if (type == ImportType.EDGE) { + rule.setFrom(config.getString(Key.FROM)); + rule.setTo(config.getString(Key.TO)); + } + + rule.setImportType(type); + + final List configurations = config.getListConfiguration(Key.PROPERTIES); + if (configurations != null) { + for (final Configuration prop : 
config.getListConfiguration(Key.PROPERTIES)) { + final PropertyMappingRule propRule = new PropertyMappingRule(); + propRule.setKey(prop.getString(Key.PROP_KEY)); + propRule.setValue(prop.getString(Key.PROP_VALUE)); + propRule.setValueType(ValueType.fromShortName(prop.getString(Key.PROP_TYPE).toLowerCase())); + rule.getProperties().add(propRule); + } + } + + final String propertiesJsonStr = config.getString(Key.PROPERTIES_JSON_STR, null); + if (propertiesJsonStr != null) { + rule.setPropertiesJsonStr(propertiesJsonStr); + } + + return rule; + } + + public MappingRule createV2(final Configuration config) { + try { + final ImportType type = ImportType.valueOf(config.getString(Key.IMPORT_TYPE)); + return createV2(config, type); + } catch (final NullPointerException e) { + throw DataXException.asDataXException(GdbWriterErrorCode.CONFIG_ITEM_MISS, Key.IMPORT_TYPE); + } catch (final IllegalArgumentException e) { + throw DataXException.asDataXException(GdbWriterErrorCode.BAD_CONFIG_VALUE, Key.IMPORT_TYPE); + } + } + + public MappingRule createV2(final Configuration config, final ImportType type) { + final MappingRule rule = new MappingRule(); + boolean patternChecked = false; + + ConfigHelper.assertHasContent(config, Key.LABEL); + rule.setLabel(config.getString(Key.LABEL)); + rule.setImportType(type); + patternChecked = isPattern(rule.getLabel(), rule, patternChecked); + + IdTransRule srcTransRule = IdTransRule.none; IdTransRule dstTransRule = IdTransRule.none; if (type == ImportType.EDGE) { - ConfigHelper.assertHasContent(config, Key.SRC_ID_TRANS_RULE); - ConfigHelper.assertHasContent(config, Key.DST_ID_TRANS_RULE); + ConfigHelper.assertHasContent(config, Key.SRC_ID_TRANS_RULE); + ConfigHelper.assertHasContent(config, Key.DST_ID_TRANS_RULE); srcTransRule = IdTransRule.valueOf(config.getString(Key.SRC_ID_TRANS_RULE)); dstTransRule = IdTransRule.valueOf(config.getString(Key.DST_ID_TRANS_RULE)); @@ -94,88 +125,96 @@ public class MappingRuleFactory { ConfigHelper.assertHasContent(config, Key.DST_LABEL); } } - ConfigHelper.assertHasContent(config, Key.ID_TRANS_RULE); - IdTransRule transRule = IdTransRule.valueOf(config.getString(Key.ID_TRANS_RULE)); + ConfigHelper.assertHasContent(config, Key.ID_TRANS_RULE); + final IdTransRule transRule = IdTransRule.valueOf(config.getString(Key.ID_TRANS_RULE)); - List configurationList = config.getListConfiguration(Key.COLUMN); - ConfigHelper.assertConfig(Key.COLUMN, () -> (configurationList != null && !configurationList.isEmpty())); - for (Configuration column : configurationList) { - ConfigHelper.assertHasContent(column, Key.COLUMN_NAME); - ConfigHelper.assertHasContent(column, Key.COLUMN_VALUE); - ConfigHelper.assertHasContent(column, Key.COLUMN_TYPE); - ConfigHelper.assertHasContent(column, Key.COLUMN_NODE_TYPE); + final List configurationList = config.getListConfiguration(Key.COLUMN); + ConfigHelper.assertConfig(Key.COLUMN, () -> (configurationList != null && !configurationList.isEmpty())); + for (final Configuration column : configurationList) { + ConfigHelper.assertHasContent(column, Key.COLUMN_NAME); + ConfigHelper.assertHasContent(column, Key.COLUMN_VALUE); + ConfigHelper.assertHasContent(column, Key.COLUMN_TYPE); + ConfigHelper.assertHasContent(column, Key.COLUMN_NODE_TYPE); - String columnValue = column.getString(Key.COLUMN_VALUE); - ColumnType columnType = ColumnType.valueOf(column.getString(Key.COLUMN_NODE_TYPE)); - if (columnValue == null || columnValue.isEmpty()) { - // only allow edge empty id - ConfigHelper.assertConfig("empty column value", - () -> 
(type == ImportType.EDGE && columnType == ColumnType.primaryKey)); - } + final String columnValue = column.getString(Key.COLUMN_VALUE); + final ColumnType columnType = ColumnType.valueOf(column.getString(Key.COLUMN_NODE_TYPE)); + if (columnValue == null || columnValue.isEmpty()) { + // only allow edge empty id + ConfigHelper.assertConfig("empty column value", + () -> (type == ImportType.EDGE && columnType == ColumnType.primaryKey)); + } + patternChecked = isPattern(columnValue, rule, patternChecked); - if (columnType == ColumnType.primaryKey) { - ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE)); - ConfigHelper.assertConfig("only string is allowed in primary key", () -> (propType == ValueType.STRING)); + if (columnType == ColumnType.primaryKey) { + final ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE)); + ConfigHelper.assertConfig("only string is allowed in primary key", + () -> (propType == ValueType.STRING)); - if (transRule == IdTransRule.labelPrefix) { - rule.setId(config.getString(Key.LABEL) + columnValue); - } else { - rule.setId(columnValue); - } - } else if (columnType == ColumnType.edgeJsonProperty || columnType == ColumnType.vertexJsonProperty) { - // only support one json property in column - ConfigHelper.assertConfig("multi JsonProperty", () -> (rule.getPropertiesJsonStr() == null)); - - rule.setPropertiesJsonStr(columnValue); - } else if (columnType == ColumnType.vertexProperty || columnType == ColumnType.edgeProperty) { - PropertyMappingRule propertyMappingRule = new PropertyMappingRule(); - - propertyMappingRule.setKey(column.getString(Key.COLUMN_NAME)); - propertyMappingRule.setValue(columnValue); - ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE)); - ConfigHelper.assertConfig("unsupported property type", () -> propType != null); - - propertyMappingRule.setValueType(propType); - rule.getProperties().add(propertyMappingRule); - } else if (columnType == ColumnType.srcPrimaryKey) { - if (type != ImportType.EDGE) { - continue; - } - - ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE)); - ConfigHelper.assertConfig("only string is allowed in primary key", () -> (propType == ValueType.STRING)); - - if (srcTransRule == IdTransRule.labelPrefix) { - rule.setFrom(config.getString(Key.SRC_LABEL) + columnValue); + if (transRule == IdTransRule.labelPrefix) { + rule.setId(config.getString(Key.LABEL) + columnValue); } else { - rule.setFrom(columnValue); + rule.setId(columnValue); } - } else if (columnType == ColumnType.dstPrimaryKey) { + } else if (columnType == ColumnType.edgeJsonProperty || columnType == ColumnType.vertexJsonProperty) { + // only support one json property in column + ConfigHelper.assertConfig("multi JsonProperty", () -> (rule.getPropertiesJsonStr() == null)); + + rule.setPropertiesJsonStr(columnValue); + } else if (columnType == ColumnType.vertexProperty || columnType == ColumnType.edgeProperty + || columnType == ColumnType.vertexSetProperty) { + final PropertyMappingRule propertyMappingRule = new PropertyMappingRule(); + + propertyMappingRule.setKey(column.getString(Key.COLUMN_NAME)); + propertyMappingRule.setValue(columnValue); + final ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE)); + ConfigHelper.assertConfig("unsupported property type", () -> propType != null); + + if (columnType == ColumnType.vertexSetProperty) { + propertyMappingRule.setPType(Key.PropertyType.set); + } + propertyMappingRule.setValueType(propType); 
+ rule.getProperties().add(propertyMappingRule); + } else if (columnType == ColumnType.srcPrimaryKey) { if (type != ImportType.EDGE) { continue; } - ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE)); - ConfigHelper.assertConfig("only string is allowed in primary key", () -> (propType == ValueType.STRING)); + final ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE)); + ConfigHelper.assertConfig("only string is allowed in primary key", + () -> (propType == ValueType.STRING)); + + if (srcTransRule == IdTransRule.labelPrefix) { + rule.setFrom(config.getString(Key.SRC_LABEL) + columnValue); + } else { + rule.setFrom(columnValue); + } + } else if (columnType == ColumnType.dstPrimaryKey) { + if (type != ImportType.EDGE) { + continue; + } + + final ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE)); + ConfigHelper.assertConfig("only string is allowed in primary key", + () -> (propType == ValueType.STRING)); if (dstTransRule == IdTransRule.labelPrefix) { rule.setTo(config.getString(Key.DST_LABEL) + columnValue); } else { rule.setTo(columnValue); } - } - } + } + } - if (rule.getImportType() == ImportType.EDGE) { - if (rule.getId() == null) { - rule.setId(""); - log.info("edge id is missed, uuid be default"); - } - ConfigHelper.assertConfig("to needed in edge", () -> (rule.getTo() != null)); - ConfigHelper.assertConfig("from needed in edge", () -> (rule.getFrom() != null)); - } - ConfigHelper.assertConfig("id needed", () -> (rule.getId() != null)); + if (rule.getImportType() == ImportType.EDGE) { + if (rule.getId() == null) { + rule.setId(""); + log.info("edge id is missed, uuid be default"); + } + ConfigHelper.assertConfig("to needed in edge", () -> (rule.getTo() != null)); + ConfigHelper.assertConfig("from needed in edge", () -> (rule.getFrom() != null)); + } + ConfigHelper.assertConfig("id needed", () -> (rule.getId() != null)); - return rule; - } + return rule; + } } diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/ValueType.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/ValueType.java index 9ad8bd8d..969fda3b 100644 --- a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/ValueType.java +++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/ValueType.java @@ -8,6 +8,7 @@ import java.util.Map; import java.util.function.Function; import com.alibaba.datax.common.element.Column; + import lombok.extern.slf4j.Slf4j; /** @@ -16,56 +17,61 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j public enum ValueType { - INT(Integer.class, "int", Column::asLong, Integer::valueOf), - LONG(Long.class, "long", Column::asLong, Long::valueOf), - DOUBLE(Double.class, "double", Column::asDouble, Double::valueOf), - FLOAT(Float.class, "float", Column::asDouble, Float::valueOf), - BOOLEAN(Boolean.class, "boolean", Column::asBoolean, Boolean::valueOf), - STRING(String.class, "string", Column::asString, String::valueOf); + /** + * property value type + */ + INT(Integer.class, "int", Column::asLong, Integer::valueOf), + INTEGER(Integer.class, "integer", Column::asLong, Integer::valueOf), + LONG(Long.class, "long", Column::asLong, Long::valueOf), + DOUBLE(Double.class, "double", Column::asDouble, Double::valueOf), + FLOAT(Float.class, "float", Column::asDouble, Float::valueOf), + BOOLEAN(Boolean.class, "boolean", Column::asBoolean, Boolean::valueOf), + STRING(String.class, "string", Column::asString, String::valueOf); - 
private Class type = null; - private String shortName = null; - private Function columnFunc = null; - private Function fromStrFunc = null; + private Class type = null; + private String shortName = null; + private Function columnFunc = null; + private Function fromStrFunc = null; - private ValueType(Class type, String name, Function columnFunc, Function fromStrFunc) { - this.type = type; - this.shortName = name; - this.columnFunc = columnFunc; - this.fromStrFunc = fromStrFunc; - - ValueTypeHolder.shortName2type.put(name, this); - } - - public static ValueType fromShortName(String name) { - return ValueTypeHolder.shortName2type.get(name); - } + private ValueType(final Class type, final String name, final Function columnFunc, + final Function fromStrFunc) { + this.type = type; + this.shortName = name; + this.columnFunc = columnFunc; + this.fromStrFunc = fromStrFunc; - public Class type() { - return this.type; - } - - public String shortName() { - return this.shortName; - } - - public Object applyColumn(Column column) { - try { - if (column == null) { - return null; - } - return columnFunc.apply(column); - } catch (Exception e) { - log.error("applyColumn error {}, column {}", e.toString(), column); - throw e; - } - } - - public Object fromStrFunc(String str) { - return fromStrFunc.apply(str); - } + ValueTypeHolder.shortName2type.put(name, this); + } - private static class ValueTypeHolder { - private static Map shortName2type = new HashMap<>(); - } + public static ValueType fromShortName(final String name) { + return ValueTypeHolder.shortName2type.get(name); + } + + public Class type() { + return this.type; + } + + public String shortName() { + return this.shortName; + } + + public Object applyColumn(final Column column) { + try { + if (column == null) { + return null; + } + return this.columnFunc.apply(column); + } catch (final Exception e) { + log.error("applyColumn error {}, column {}", e.toString(), column); + throw e; + } + } + + public Object fromStrFunc(final String str) { + return this.fromStrFunc.apply(str); + } + + private static class ValueTypeHolder { + private static Map shortName2type = new HashMap<>(); + } } diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/AbstractGdbGraph.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/AbstractGdbGraph.java index 0c31c644..038663ac 100644 --- a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/AbstractGdbGraph.java +++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/AbstractGdbGraph.java @@ -3,20 +3,24 @@ */ package com.alibaba.datax.plugin.writer.gdbwriter.model; -import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.plugin.writer.gdbwriter.Key; -import com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig; +import static com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig.DEFAULT_BATCH_PROPERTY_NUM; +import static com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig.MAX_REQUEST_LENGTH; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; -import lombok.extern.slf4j.Slf4j; import org.apache.tinkerpop.gremlin.driver.Client; import org.apache.tinkerpop.gremlin.driver.Cluster; import org.apache.tinkerpop.gremlin.driver.RequestOptions; import org.apache.tinkerpop.gremlin.driver.ResultSet; import org.apache.tinkerpop.gremlin.driver.ser.Serializers; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.TimeUnit; +import 
com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.writer.gdbwriter.Key; +import com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig; + +import lombok.extern.slf4j.Slf4j; /** * @author jerrywang @@ -24,128 +28,124 @@ import java.util.concurrent.TimeUnit; */ @Slf4j public abstract class AbstractGdbGraph implements GdbGraph { - private final static int DEFAULT_TIMEOUT = 30000; + private final static int DEFAULT_TIMEOUT = 30000; - protected Client client = null; - protected Key.UpdateMode updateMode = Key.UpdateMode.INSERT; - protected int propertiesBatchNum = GdbWriterConfig.DEFAULT_BATCH_PROPERTY_NUM; - protected boolean session = false; + protected Client client = null; + protected Key.UpdateMode updateMode = Key.UpdateMode.INSERT; + protected int propertiesBatchNum = DEFAULT_BATCH_PROPERTY_NUM; + protected boolean session = false; + protected int maxRequestLength = GdbWriterConfig.MAX_REQUEST_LENGTH; + protected AbstractGdbGraph() {} - protected AbstractGdbGraph() {} + protected AbstractGdbGraph(final Configuration config, final boolean session) { + initClient(config, session); + } - protected AbstractGdbGraph(Configuration config, boolean session) { - initClient(config, session); - } + protected void initClient(final Configuration config, final boolean session) { + this.updateMode = Key.UpdateMode.valueOf(config.getString(Key.UPDATE_MODE, "INSERT")); + log.info("init graphdb client"); + final String host = config.getString(Key.HOST); + final int port = config.getInt(Key.PORT); + final String username = config.getString(Key.USERNAME); + final String password = config.getString(Key.PASSWORD); + int maxDepthPerConnection = + config.getInt(Key.MAX_IN_PROCESS_PER_CONNECTION, GdbWriterConfig.DEFAULT_MAX_IN_PROCESS_PER_CONNECTION); - protected void initClient(Configuration config, boolean session) { - updateMode = Key.UpdateMode.valueOf(config.getString(Key.UPDATE_MODE, "INSERT")); - log.info("init graphdb client"); - String host = config.getString(Key.HOST); - int port = config.getInt(Key.PORT); - String username = config.getString(Key.USERNAME); - String password = config.getString(Key.PASSWORD); - int maxDepthPerConnection = config.getInt(Key.MAX_IN_PROCESS_PER_CONNECTION, - GdbWriterConfig.DEFAULT_MAX_IN_PROCESS_PER_CONNECTION); + int maxConnectionPoolSize = + config.getInt(Key.MAX_CONNECTION_POOL_SIZE, GdbWriterConfig.DEFAULT_MAX_CONNECTION_POOL_SIZE); - int maxConnectionPoolSize = config.getInt(Key.MAX_CONNECTION_POOL_SIZE, - GdbWriterConfig.DEFAULT_MAX_CONNECTION_POOL_SIZE); + int maxSimultaneousUsagePerConnection = config.getInt(Key.MAX_SIMULTANEOUS_USAGE_PER_CONNECTION, + GdbWriterConfig.DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION); - int maxSimultaneousUsagePerConnection = config.getInt(Key.MAX_SIMULTANEOUS_USAGE_PER_CONNECTION, - GdbWriterConfig.DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION); + this.session = session; + if (this.session) { + maxConnectionPoolSize = GdbWriterConfig.DEFAULT_MAX_CONNECTION_POOL_SIZE; + maxDepthPerConnection = GdbWriterConfig.DEFAULT_MAX_IN_PROCESS_PER_CONNECTION; + maxSimultaneousUsagePerConnection = GdbWriterConfig.DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION; + } - this.session = session; - if (this.session) { - maxConnectionPoolSize = GdbWriterConfig.DEFAULT_MAX_CONNECTION_POOL_SIZE; - maxDepthPerConnection = GdbWriterConfig.DEFAULT_MAX_IN_PROCESS_PER_CONNECTION; - maxSimultaneousUsagePerConnection = GdbWriterConfig.DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION; - } + try { + final Cluster cluster = 
Cluster.build(host).port(port).credentials(username, password) + .serializer(Serializers.GRAPHBINARY_V1D0).maxContentLength(1048576) + .maxInProcessPerConnection(maxDepthPerConnection).minInProcessPerConnection(0) + .maxConnectionPoolSize(maxConnectionPoolSize).minConnectionPoolSize(maxConnectionPoolSize) + .maxSimultaneousUsagePerConnection(maxSimultaneousUsagePerConnection).resultIterationBatchSize(64) + .create(); + this.client = session ? cluster.connect(UUID.randomUUID().toString()).init() : cluster.connect().init(); + warmClient(maxConnectionPoolSize * maxDepthPerConnection); + } catch (final RuntimeException e) { + log.error("Failed to connect to GDB {}:{}, due to {}", host, port, e); + throw e; + } - try { - Cluster cluster = Cluster.build(host).port(port).credentials(username, password) - .serializer(Serializers.GRAPHBINARY_V1D0) - .maxContentLength(1048576) - .maxInProcessPerConnection(maxDepthPerConnection) - .minInProcessPerConnection(0) - .maxConnectionPoolSize(maxConnectionPoolSize) - .minConnectionPoolSize(maxConnectionPoolSize) - .maxSimultaneousUsagePerConnection(maxSimultaneousUsagePerConnection) - .resultIterationBatchSize(64) - .create(); - client = session ? cluster.connect(UUID.randomUUID().toString()).init() : cluster.connect().init(); - warmClient(maxConnectionPoolSize*maxDepthPerConnection); - } catch (RuntimeException e) { - log.error("Failed to connect to GDB {}:{}, due to {}", host, port, e); - throw e; - } + this.propertiesBatchNum = config.getInt(Key.MAX_PROPERTIES_BATCH_NUM, DEFAULT_BATCH_PROPERTY_NUM); + this.maxRequestLength = config.getInt(Key.MAX_GDB_REQUEST_LENGTH, MAX_REQUEST_LENGTH); + } - propertiesBatchNum = config.getInt(Key.MAX_PROPERTIES_BATCH_NUM, GdbWriterConfig.DEFAULT_BATCH_PROPERTY_NUM); - } + /** + * @param dsl + * @param parameters + */ + protected void runInternal(final String dsl, final Map parameters) throws Exception { + final RequestOptions.Builder options = RequestOptions.build().timeout(DEFAULT_TIMEOUT); + if (parameters != null && !parameters.isEmpty()) { + parameters.forEach(options::addParameter); + } + final ResultSet results = this.client.submitAsync(dsl, options.create()).get(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS); + results.all().get(DEFAULT_TIMEOUT + 1000, TimeUnit.MILLISECONDS); + } - /** - * @param dsl - * @param parameters - */ - protected void runInternal(String dsl, final Map parameters) throws Exception { - RequestOptions.Builder options = RequestOptions.build().timeout(DEFAULT_TIMEOUT); - if (parameters != null && !parameters.isEmpty()) { - parameters.forEach(options::addParameter); - } + void beginTx() { + if (!this.session) { + return; + } - ResultSet results = client.submitAsync(dsl, options.create()).get(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS); - results.all().get(DEFAULT_TIMEOUT + 1000, TimeUnit.MILLISECONDS); - } + final String dsl = "g.tx().open()"; + this.client.submit(dsl).all().join(); + } - void beginTx() { - if (!session) { - return; - } + void doCommit() { + if (!this.session) { + return; + } - String dsl = "g.tx().open()"; - client.submit(dsl).all().join(); - } + try { + final String dsl = "g.tx().commit()"; + this.client.submit(dsl).all().join(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } - void doCommit() { - if (!session) { - return; - } + void doRollback() { + if (!this.session) { + return; + } - try { - String dsl = "g.tx().commit()"; - client.submit(dsl).all().join(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } + final String dsl = 
"g.tx().rollback()"; + this.client.submit(dsl).all().join(); + } - void doRollback() { - if (!session) { - return; - } + private void warmClient(final int num) { + try { + beginTx(); + runInternal("g.V('test')", null); + doCommit(); + log.info("warm graphdb client over"); + } catch (final Exception e) { + log.error("warmClient error"); + doRollback(); + throw new RuntimeException(e); + } + } - String dsl = "g.tx().rollback()"; - client.submit(dsl).all().join(); - } - - private void warmClient(int num) { - try { - beginTx(); - runInternal("g.V('test')", null); - doCommit(); - log.info("warm graphdb client over"); - } catch (Exception e) { - log.error("warmClient error"); - doRollback(); - throw new RuntimeException(e); - } - } - - @Override - public void close() { - if (client != null) { - log.info("close graphdb client"); - client.close(); - } - } + @Override + public void close() { + if (this.client != null) { + log.info("close graphdb client"); + this.client.close(); + } + } } diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbEdge.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbEdge.java index d42c9182..0bd42057 100644 --- a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbEdge.java +++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbEdge.java @@ -3,7 +3,8 @@ */ package com.alibaba.datax.plugin.writer.gdbwriter.model; -import lombok.Data; +import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MapperConfig; + import lombok.EqualsAndHashCode; import lombok.ToString; @@ -11,10 +12,33 @@ import lombok.ToString; * @author jerrywang * */ -@Data @EqualsAndHashCode(callSuper = true) @ToString(callSuper = true) public class GdbEdge extends GdbElement { - private String from = null; - private String to = null; + private String from = null; + private String to = null; + + public String getFrom() { + return this.from; + } + + public void setFrom(final String from) { + final int maxIdLength = MapperConfig.getInstance().getMaxIdLength(); + if (from.length() > maxIdLength) { + throw new IllegalArgumentException("from length over limit(" + maxIdLength + ")"); + } + this.from = from; + } + + public String getTo() { + return this.to; + } + + public void setTo(final String to) { + final int maxIdLength = MapperConfig.getInstance().getMaxIdLength(); + if (to.length() > maxIdLength) { + throw new IllegalArgumentException("to length over limit(" + maxIdLength + ")"); + } + this.to = to; + } } diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbElement.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbElement.java index af3c7090..3d513a6a 100644 --- a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbElement.java +++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbElement.java @@ -3,18 +3,107 @@ */ package com.alibaba.datax.plugin.writer.gdbwriter.model; -import java.util.HashMap; -import java.util.Map; +import java.util.LinkedList; +import java.util.List; -import lombok.Data; +import com.alibaba.datax.plugin.writer.gdbwriter.Key.PropertyType; +import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MapperConfig; /** * @author jerrywang * */ -@Data public class GdbElement { - String id = null; - String label = null; - Map properties = new HashMap<>(); + private String id = null; + private String label = null; + private List properties = new LinkedList<>(); + + 
public String getId() { + return this.id; + } + + public void setId(final String id) { + final int maxIdLength = MapperConfig.getInstance().getMaxIdLength(); + if (id.length() > maxIdLength) { + throw new IllegalArgumentException("id length over limit(" + maxIdLength + ")"); + } + this.id = id; + } + + public String getLabel() { + return this.label; + } + + public void setLabel(final String label) { + final int maxLabelLength = MapperConfig.getInstance().getMaxLabelLength(); + if (label.length() > maxLabelLength) { + throw new IllegalArgumentException("label length over limit(" + maxLabelLength + ")"); + } + this.label = label; + } + + public List getProperties() { + return this.properties; + } + + public void addProperty(final String propKey, final Object propValue, final PropertyType card) { + if (propKey == null || propValue == null) { + return; + } + + final int maxPropKeyLength = MapperConfig.getInstance().getMaxPropKeyLength(); + if (propKey.length() > maxPropKeyLength) { + throw new IllegalArgumentException("property key length over limit(" + maxPropKeyLength + ")"); + } + if (propValue instanceof String) { + final int maxPropValueLength = MapperConfig.getInstance().getMaxPropValueLength(); + if (((String)propValue).length() > maxPropValueLength) { + throw new IllegalArgumentException("property value length over limit(" + maxPropValueLength + ")"); + } + } + + this.properties.add(new GdbProperty(propKey, propValue, card)); + } + + public void addProperty(final String propKey, final Object propValue) { + addProperty(propKey, propValue, PropertyType.single); + } + + @Override + public String toString() { + final StringBuffer sb = new StringBuffer(this.id + "[" + this.label + "]{"); + this.properties.forEach(n -> { + sb.append(n.cardinality.name()); + sb.append("["); + sb.append(n.key); + sb.append(" - "); + sb.append(String.valueOf(n.value)); + sb.append("]"); + }); + return sb.append("}").toString(); + } + + public static class GdbProperty { + private String key; + private Object value; + private PropertyType cardinality; + + private GdbProperty(final String key, final Object value, final PropertyType card) { + this.key = key; + this.value = value; + this.cardinality = card; + } + + public PropertyType getCardinality() { + return this.cardinality; + } + + public String getKey() { + return this.key; + } + + public Object getValue() { + return this.value; + } + } } diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbGraph.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbGraph.java index 5b98c502..5d9b4508 100644 --- a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbGraph.java +++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbGraph.java @@ -3,18 +3,19 @@ */ package com.alibaba.datax.plugin.writer.gdbwriter.model; -import com.alibaba.datax.common.element.Record; -import groovy.lang.Tuple2; - import java.util.List; +import com.alibaba.datax.common.element.Record; + +import groovy.lang.Tuple2; + /** * @author jerrywang * */ public interface GdbGraph extends AutoCloseable { - List> add(List> records); + List> add(List> records); - @Override - void close(); + @Override + void close(); } diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/ScriptGdbGraph.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/ScriptGdbGraph.java index 7f898431..9ecee8ab 100644 ---
a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/ScriptGdbGraph.java +++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/ScriptGdbGraph.java @@ -3,15 +3,17 @@ */ package com.alibaba.datax.plugin.writer.gdbwriter.model; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.util.Configuration; - import com.alibaba.datax.plugin.writer.gdbwriter.Key; import com.alibaba.datax.plugin.writer.gdbwriter.util.GdbDuplicateIdException; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; + import groovy.lang.Tuple2; import lombok.extern.slf4j.Slf4j; @@ -21,176 +23,198 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j public class ScriptGdbGraph extends AbstractGdbGraph { - private static final String VAR_PREFIX = "GDB___"; - private static final String VAR_ID = VAR_PREFIX + "id"; - private static final String VAR_LABEL = VAR_PREFIX + "label"; - private static final String VAR_FROM = VAR_PREFIX + "from"; - private static final String VAR_TO = VAR_PREFIX + "to"; - private static final String VAR_PROP_KEY = VAR_PREFIX + "PK"; - private static final String VAR_PROP_VALUE = VAR_PREFIX + "PV"; - private static final String ADD_V_START = "g.addV(" + VAR_LABEL + ").property(id, " + VAR_ID + ")"; - private static final String ADD_E_START = "g.addE(" + VAR_LABEL + ").property(id, " + VAR_ID + ").from(V(" - + VAR_FROM + ")).to(V(" + VAR_TO + "))"; + private static final String VAR_PREFIX = "GDB___"; + private static final String VAR_ID = VAR_PREFIX + "id"; + private static final String VAR_LABEL = VAR_PREFIX + "label"; + private static final String VAR_FROM = VAR_PREFIX + "from"; + private static final String VAR_TO = VAR_PREFIX + "to"; + private static final String VAR_PROP_KEY = VAR_PREFIX + "PK"; + private static final String VAR_PROP_VALUE = VAR_PREFIX + "PV"; + private static final String ADD_V_START = "g.addV(" + VAR_LABEL + ").property(id, " + VAR_ID + ")"; + private static final String ADD_E_START = + "g.addE(" + VAR_LABEL + ").property(id, " + VAR_ID + ").from(V(" + VAR_FROM + ")).to(V(" + VAR_TO + "))"; - private static final String UPDATE_V_START = "g.V("+VAR_ID+")"; - private static final String UPDATE_E_START = "g.E("+VAR_ID+")"; + private static final String UPDATE_V_START = "g.V(" + VAR_ID + ")"; + private static final String UPDATE_E_START = "g.E(" + VAR_ID + ")"; - private Cache propertyCache; - private Random random; + private Random random; - public ScriptGdbGraph() { - propertyCache = Caffeine.newBuilder().maximumSize(1024).build(); - random = new Random(); - } + public ScriptGdbGraph() { + this.random = new Random(); + } - public ScriptGdbGraph(Configuration config, boolean session) { - super(config, session); + public ScriptGdbGraph(final Configuration config, final boolean session) { + super(config, session); - propertyCache = Caffeine.newBuilder().maximumSize(1024).build(); - random = new Random(); + this.random = new Random(); + log.info("Init as ScriptGdbGraph."); + } - log.info("Init as ScriptGdbGraph."); - } + /** + * Apply list of {@link GdbElement} to GDB, return the failed records + * + * @param records + * list of element to apply + * @return + */ + @Override + public List> add(final List> records) { + final List> errors = new ArrayList<>(); + try { + beginTx(); + for (final Tuple2 elementTuple2 : records) { + 
try { + addInternal(elementTuple2.getSecond()); + } catch (final Exception e) { + errors.add(new Tuple2<>(elementTuple2.getFirst(), e)); + } + } + doCommit(); + } catch (final Exception ex) { + doRollback(); + throw new RuntimeException(ex); + } + return errors; + } - /** - * Apply list of {@link GdbElement} to GDB, return the failed records - * @param records list of element to apply - * @return - */ - @Override - public List> add(List> records) { - List> errors = new ArrayList<>(); - try { - beginTx(); - for (Tuple2 elementTuple2 : records) { - try { - addInternal(elementTuple2.getSecond()); - } catch (Exception e) { - errors.add(new Tuple2<>(elementTuple2.getFirst(), e)); - } - } - doCommit(); - } catch (Exception ex) { - doRollback(); - throw new RuntimeException(ex); - } - return errors; - } + private void addInternal(final GdbElement element) { + try { + addInternal(element, false); + } catch (final GdbDuplicateIdException e) { + if (this.updateMode == Key.UpdateMode.SKIP) { + log.debug("Skip duplicate id {}", element.getId()); + } else if (this.updateMode == Key.UpdateMode.INSERT) { + throw new RuntimeException(e); + } else if (this.updateMode == Key.UpdateMode.MERGE) { + if (element.getProperties().isEmpty()) { + return; + } - private void addInternal(GdbElement element) { - try { - addInternal(element, false); - } catch (GdbDuplicateIdException e) { - if (updateMode == Key.UpdateMode.SKIP) { - log.debug("Skip duplicate id {}", element.getId()); - } else if (updateMode == Key.UpdateMode.INSERT) { - throw new RuntimeException(e); - } else if (updateMode == Key.UpdateMode.MERGE) { - if (element.getProperties().isEmpty()) { - return; - } + try { + addInternal(element, true); + } catch (final GdbDuplicateIdException e1) { + log.error("duplicate id {} while update...", element.getId()); + throw new RuntimeException(e1); + } + } + } + } - try { - addInternal(element, true); - } catch (GdbDuplicateIdException e1) { - log.error("duplicate id {} while update...", element.getId()); - throw new RuntimeException(e1); - } - } - } - } + private void addInternal(final GdbElement element, final boolean update) throws GdbDuplicateIdException { + boolean firstAdd = !update; + final boolean isVertex = (element instanceof GdbVertex); + final List params = element.getProperties(); + final List subParams = new ArrayList<>(this.propertiesBatchNum); - private void addInternal(GdbElement element, boolean update) throws GdbDuplicateIdException { - Map params = element.getProperties(); - Map subParams = new HashMap<>(propertiesBatchNum); - boolean firstAdd = !update; - boolean isVertex = (element instanceof GdbVertex); + final int idLength = element.getId().length(); + int attachLength = element.getLabel().length(); + if (element instanceof GdbEdge) { + attachLength += ((GdbEdge)element).getFrom().length(); + attachLength += ((GdbEdge)element).getTo().length(); + } - for (Map.Entry entry : params.entrySet()) { - subParams.put(entry.getKey(), entry.getValue()); - if (subParams.size() >= propertiesBatchNum) { - setGraphDbElement(element, subParams, isVertex, firstAdd); - firstAdd = false; - subParams.clear(); - } - } - if (!subParams.isEmpty() || firstAdd) { - setGraphDbElement(element, subParams, isVertex, firstAdd); - } - } + int requestLength = idLength; + for (final GdbElement.GdbProperty entry : params) { + final String propKey = entry.getKey(); + final Object propValue = entry.getValue(); - private Tuple2> buildDsl(GdbElement element, - Map properties, - boolean isVertex, boolean firstAdd) { - Map params 
= new HashMap<>(); + int appendLength = propKey.length(); + if (propValue instanceof String) { + appendLength += ((String)propValue).length(); + } - String dslPropertyPart = propertyCache.get(properties.size(), keys -> { - final StringBuilder sb = new StringBuilder(); - for (int i = 0; i < keys; i++) { - sb.append(".property(").append(VAR_PROP_KEY).append(i) - .append(", ").append(VAR_PROP_VALUE).append(i).append(")"); - } - return sb.toString(); - }); + if (checkSplitDsl(firstAdd, requestLength, attachLength, appendLength, subParams.size())) { + setGraphDbElement(element, subParams, isVertex, firstAdd); + firstAdd = false; + subParams.clear(); + requestLength = idLength; + } - String dsl; - if (isVertex) { - dsl = (firstAdd ? ADD_V_START : UPDATE_V_START) + dslPropertyPart; - } else { - dsl = (firstAdd ? ADD_E_START : UPDATE_E_START) + dslPropertyPart; - if (firstAdd) { - params.put(VAR_FROM, ((GdbEdge)element).getFrom()); - params.put(VAR_TO, ((GdbEdge)element).getTo()); - } - } + requestLength += appendLength; + subParams.add(entry); + } + if (!subParams.isEmpty() || firstAdd) { + checkSplitDsl(firstAdd, requestLength, attachLength, 0, 0); + setGraphDbElement(element, subParams, isVertex, firstAdd); + } + } - int index = 0; - for (Map.Entry entry : properties.entrySet()) { - params.put(VAR_PROP_KEY+index, entry.getKey()); - params.put(VAR_PROP_VALUE+index, entry.getValue()); - index++; - } + private boolean checkSplitDsl(final boolean firstAdd, final int requestLength, final int attachLength, final int appendLength, + final int propNum) { + final int length = firstAdd ? requestLength + attachLength : requestLength; + if (length > this.maxRequestLength) { + throw new IllegalArgumentException("request length over limit(" + this.maxRequestLength + ")"); + } + return length + appendLength > this.maxRequestLength || propNum >= this.propertiesBatchNum; + } - if (firstAdd) { - params.put(VAR_LABEL, element.getLabel()); - } - params.put(VAR_ID, element.getId()); + private Tuple2> buildDsl(final GdbElement element, final List properties, + final boolean isVertex, final boolean firstAdd) { + final Map params = new HashMap<>(); + final StringBuilder sb = new StringBuilder(); + if (isVertex) { + sb.append(firstAdd ? ADD_V_START : UPDATE_V_START); + } else { + sb.append(firstAdd ? ADD_E_START : UPDATE_E_START); + } - return new Tuple2<>(dsl, params); - } + for (int i = 0; i < properties.size(); i++) { + final GdbElement.GdbProperty prop = properties.get(i); - private void setGraphDbElement(GdbElement element, Map properties, - boolean isVertex, boolean firstAdd) throws GdbDuplicateIdException { - int retry = 10; - int idleTime = random.nextInt(10) + 10; - Tuple2> elementDsl = buildDsl(element, properties, isVertex, firstAdd); + sb.append(".property("); + if (prop.getCardinality() == Key.PropertyType.set) { + sb.append("set, "); + } + sb.append(VAR_PROP_KEY).append(i).append(", ").append(VAR_PROP_VALUE).append(i).append(")"); - while (retry > 0) { - try { - runInternal(elementDsl.getFirst(), elementDsl.getSecond()); - log.debug("AddElement {}", element.getId()); - return; - } catch (Exception e) { - String cause = e.getCause() == null ? "" : e.getCause().toString(); - if (cause.contains("rejected from")) { - retry--; - try { - Thread.sleep(idleTime); - } catch (InterruptedException e1) { - // ... 
- } - idleTime = Math.min(idleTime * 2, 2000); - continue; - } else if (firstAdd && cause.contains("GraphDB id exists")) { - throw new GdbDuplicateIdException(e); - } - log.error("Add Failed id {}, dsl {}, params {}, e {}", element.getId(), - elementDsl.getFirst(), elementDsl.getSecond(), e); - throw new RuntimeException(e); - } - } - log.error("Add Failed id {}, dsl {}, params {}", element.getId(), - elementDsl.getFirst(), elementDsl.getSecond()); - throw new RuntimeException("failed to queue new element to server"); - } + params.put(VAR_PROP_KEY + i, prop.getKey()); + params.put(VAR_PROP_VALUE + i, prop.getValue()); + } + + if (firstAdd) { + params.put(VAR_LABEL, element.getLabel()); + if (!isVertex) { + params.put(VAR_FROM, ((GdbEdge)element).getFrom()); + params.put(VAR_TO, ((GdbEdge)element).getTo()); + } + } + params.put(VAR_ID, element.getId()); + + return new Tuple2<>(sb.toString(), params); + } + + private void setGraphDbElement(final GdbElement element, final List properties, final boolean isVertex, + final boolean firstAdd) throws GdbDuplicateIdException { + int retry = 10; + int idleTime = this.random.nextInt(10) + 10; + final Tuple2> elementDsl = buildDsl(element, properties, isVertex, firstAdd); + + while (retry > 0) { + try { + runInternal(elementDsl.getFirst(), elementDsl.getSecond()); + log.debug("AddElement {}", element.getId()); + return; + } catch (final Exception e) { + final String cause = e.getCause() == null ? "" : e.getCause().toString(); + if (cause.contains("rejected from") || cause.contains("Timeout waiting to lock key")) { + retry--; + try { + Thread.sleep(idleTime); + } catch (final InterruptedException e1) { + // ... + } + idleTime = Math.min(idleTime * 2, 2000); + continue; + } else if (firstAdd && cause.contains("GraphDB id exists")) { + throw new GdbDuplicateIdException(e); + } + log.error("Add Failed id {}, dsl {}, params {}, e {}", element.getId(), elementDsl.getFirst(), + elementDsl.getSecond(), e); + throw new RuntimeException(e); + } + } + log.error("Add Failed id {}, dsl {}, params {}", element.getId(), elementDsl.getFirst(), + elementDsl.getSecond()); + throw new RuntimeException("failed to queue new element to server"); + } } diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/ConfigHelper.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/ConfigHelper.java index 77175197..178b5e7c 100644 --- a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/ConfigHelper.java +++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/ConfigHelper.java @@ -7,53 +7,57 @@ import java.io.IOException; import java.io.InputStream; import java.util.function.Supplier; +import org.apache.commons.lang3.StringUtils; + import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.writer.gdbwriter.GdbWriterErrorCode; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; -import org.apache.commons.lang3.StringUtils; - /** * @author jerrywang * */ public interface ConfigHelper { - static void assertConfig(String key, Supplier f) { - if (!f.get()) { - throw DataXException.asDataXException(GdbWriterErrorCode.BAD_CONFIG_VALUE, key); - } - } + static void assertConfig(final String key, final Supplier f) { + if (!f.get()) { + throw DataXException.asDataXException(GdbWriterErrorCode.BAD_CONFIG_VALUE, key); + } + } - static void assertHasContent(Configuration config, String key) { - 
assertConfig(key, () -> StringUtils.isNotBlank(config.getString(key))); - } + static void assertHasContent(final Configuration config, final String key) { + assertConfig(key, () -> StringUtils.isNotBlank(config.getString(key))); + } - /** - * NOTE: {@code Configuration::get(String, Class)} doesn't work. - * - * @param conf Configuration - * @param key key path to configuration - * @param cls Class of result type - * @return the target configuration object of type T - */ - static T getConfig(Configuration conf, String key, Class cls) { - JSONObject j = (JSONObject) conf.get(key); - return JSON.toJavaObject(j, cls); - } - - /** - * Create a configuration from the specified file on the classpath. - * - * @param name file name - * @return Configuration instance. - */ - static Configuration fromClasspath(String name) { - try (InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(name)) { - return Configuration.from(is); - } catch (IOException e) { - throw new IllegalArgumentException("File not found: " + name); - } - } + /** + * NOTE: {@code Configuration::get(String, Class)} doesn't work. + * + * @param conf + * Configuration + * @param key + * key path to configuration + * @param cls + * Class of result type + * @return the target configuration object of type T + */ + static T getConfig(final Configuration conf, final String key, final Class cls) { + final JSONObject j = (JSONObject)conf.get(key); + return JSON.toJavaObject(j, cls); + } + + /** + * Create a configuration from the specified file on the classpath. + * + * @param name + * file name + * @return Configuration instance. + */ + static Configuration fromClasspath(final String name) { + try (final InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(name)) { + return Configuration.from(is); + } catch (final IOException e) { + throw new IllegalArgumentException("File not found: " + name); + } + } } diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/GdbDuplicateIdException.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/GdbDuplicateIdException.java index e531d51b..dba641b0 100644 --- a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/GdbDuplicateIdException.java +++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/GdbDuplicateIdException.java @@ -1,9 +1,8 @@ /* - * (C) 2019-present Alibaba Group Holding Limited. + * (C) 2019-present Alibaba Group Holding Limited. * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. + * This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public + * License version 2 as published by the Free Software Foundation. */ package com.alibaba.datax.plugin.writer.gdbwriter.util; @@ -13,11 +12,11 @@ package com.alibaba.datax.plugin.writer.gdbwriter.util; */ public class GdbDuplicateIdException extends Exception { - public GdbDuplicateIdException(Exception e) { - super(e); - } + public GdbDuplicateIdException(Exception e) { + super(e); + } - public GdbDuplicateIdException() { - super(); - } + public GdbDuplicateIdException() { + super(); + } }
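The submit path in ScriptGdbGraph.setGraphDbElement retries transient failures (causes containing "rejected from" or "Timeout waiting to lock key") with a capped exponential backoff, and converts a server-side "GraphDB id exists" error into GdbDuplicateIdException so the caller can resolve it according to updateMode (INSERT, SKIP or MERGE). Below is a standalone sketch of that retry policy, assuming a generic SubmitAction callback in place of the real Gremlin client call; it is not the plugin's actual code.

```java
import java.util.concurrent.ThreadLocalRandom;

// Standalone sketch of the retry policy applied when submitting a DSL to the server.
// SubmitAction and the message checks are illustrative stand-ins for the Gremlin client calls.
public class RetrySketch {
    static class DuplicateIdException extends Exception {
        DuplicateIdException(final Throwable cause) {
            super(cause);
        }
    }

    @FunctionalInterface
    interface SubmitAction {
        void run() throws Exception;
    }

    static void submitWithRetry(final SubmitAction action) throws DuplicateIdException {
        int retry = 10;
        long idleTime = ThreadLocalRandom.current().nextInt(10) + 10L;

        while (retry > 0) {
            try {
                action.run();
                return;
            } catch (final Exception e) {
                final String cause = e.getCause() == null ? "" : e.getCause().toString();
                if (cause.contains("rejected from") || cause.contains("Timeout waiting to lock key")) {
                    // transient overload: back off and retry, doubling the wait up to a 2s cap
                    retry--;
                    try {
                        Thread.sleep(idleTime);
                    } catch (final InterruptedException ignored) {
                        Thread.currentThread().interrupt();
                    }
                    idleTime = Math.min(idleTime * 2, 2000);
                    continue;
                }
                if (cause.contains("GraphDB id exists")) {
                    // the plugin raises this only on the initial insert, then applies updateMode
                    throw new DuplicateIdException(e);
                }
                throw new RuntimeException(e);
            }
        }
        throw new RuntimeException("failed to queue new element to server");
    }

    public static void main(final String[] args) throws Exception {
        // Succeeds immediately; swap in a failing action to exercise the retry path.
        submitWithRetry(() -> System.out.println("submitted"));
    }
}
```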