diff --git a/gdbreader/doc/gdbreader.md b/gdbreader/doc/gdbreader.md new file mode 100644 index 00000000..e883f20d --- /dev/null +++ b/gdbreader/doc/gdbreader.md @@ -0,0 +1,260 @@ + +# DataX GDBReader + +## 1. 快速介绍 + +GDBReader插件实现读取GDB实例数据的功能,通过`Gremlin Client`连接远程GDB实例,按配置提供的`label`生成查询DSL,遍历点或边数据,包括属性数据,并将数据写入到Record中给到Writer使用。 + +## 2. 实现原理 + +GDBReader使用`Gremlin Client`连接GDB实例,按`label`分不同Task取点或边数据。 +单个Task中按`label`遍历点或边的id,再切分范围分多次请求查询点或边和属性数据,最后将点或边数据根据配置转换成指定格式记录发送给下游写插件。 + +GDBReader按`label`切分多个Task并发,同一个`label`的数据批量异步获取来加快读取速度。如果配置读取的`label`列表为空,任务启动前会从GDB查询所有`label`再切分Task。 + +## 3. 功能说明 + +GDB中点和边不同,读取需要区分点和边点配置。 + +### 3.1 点配置样例 + +``` +{ + "job": { + "setting": { + "speed": { + "channel": 1 + } + "errorLimit": { + "record": 1 + } + }, + + "content": [ + { + "reader": { + "name": "gdbreader", + "parameter": { + "host": "10.218.145.24", + "port": 8182, + "username": "***", + "password": "***", + "fetchBatchSize": 100, + "rangeSplitSize": 1000, + "labelType": "VERTEX", + "labels": ["label1", "label2"], + "column": [ + { + "name": "id", + "type": "string", + "columnType": "primaryKey" + }, + { + "name": "label", + "type": "string", + "columnType": "primaryLabel" + }, + { + "name": "age", + "type": "int", + "columnType": "vertexProperty" + } + ] + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "print": true + } + } + } + ] + } +} +``` + +### 3.2 边配置样例 + +``` +{ + "job": { + "setting": { + "speed": { + "channel": 1 + }, + "errorLimit": { + "record": 1 + } + }, + + "content": [ + { + "reader": { + "name": "gdbreader", + "parameter": { + "host": "10.218.145.24", + "port": 8182, + "username": "***", + "password": "***", + "fetchBatchSize": 100, + "rangeSplitSize": 1000, + "labelType": "EDGE", + "labels": ["label1", "label2"], + "column": [ + { + "name": "id", + "type": "string", + "columnType": "primaryKey" + }, + { + "name": "label", + "type": "string", + "columnType": "primaryLabel" + }, + { + "name": "srcId", + "type": "string", + 
"columnType": "srcPrimaryKey" + }, + { + "name": "srcLabel", + "type": "string", + "columnType": "srcPrimaryLabel" + }, + { + "name": "dstId", + "type": "string", + "columnType": "srcPrimaryKey" + }, + { + "name": "dstLabel", + "type": "string", + "columnType": "srcPrimaryLabel" + }, + { + "name": "name", + "type": "string", + "columnType": "edgeProperty" + }, + { + "name": "weight", + "type": "double", + "columnType": "edgeProperty" + } + ] + } + }, + + "writer": { + "name": "streamwriter", + "parameter": { + "print": true + } + } + } + ] + } +} +``` + +### 3.3 参数说明 + +* **host** + * 描述:GDB实例连接地址,对应'实例管理'->'基本信息'页面的网络地址 + * 必选:是 + * 默认值:无 + +* **port** + * 描述:GDB实例连接地址对应的端口 + * 必选:是 + * 默认值:8182 + +* **username** + * 描述:GDB实例账号名 + * 必选:是 + * 默认值:无 + +* **password** + * 描述:GDB实例账号名对应的密码 + * 必选:是 + * 默认值:无 + +* **fetchBatchSize** + * 描述:一次GDB请求读取点或边的数量,响应包含点或边以及属性 + * 必选:是 + * 默认值:100 + +* **rangeSplitSize** + * 描述:id遍历,一次遍历请求扫描的id个数 + * 必选:是 + * 默认值:10 \* fetchBatchSize + +* **labels** + * 描述:标签数组,即需要导出的点或边标签,支持读取多个标签,用数组表示。如果留空([]),表示GDB中所有点或边标签 + * 必选:是 + * 默认值:无 + +* **labelType** + * 描述:数据标签类型,支持点、边两种枚举值 + * VERTEX:表示点 + * EDGE:表示边 + * 必选:是 + * 默认值:无 + +* **column** + * 描述:点或边字段映射关系配置 + * 必选:是 + * 默认值:无 + +* **column -> name** + * 描述:点或边映射关系的字段名,指定属性时表示读取的属性名,读取其他字段时会被忽略 + * 必选:是 + * 默认值:无 + +* **column -> type** + * 描述:点或边映射关系的字段类型 + * id, label在GDB中都是string类型,配置非string类型时可能会转换失败 + * 普通属性支持基础类型,包括int, long, float, double, boolean, string + * GDBReader尽量将读取到的数据转换成配置要求的类型,但转换失败会导致该条记录错误 + * 必选:是 + * 默认值:无 + +* **column -> columnType** + * 描述:GDB点或边数据到列数据的映射关系,支持以下枚举值: + * primaryKey: 表示该字段是点或边的id + * primaryLabel: 表示该字段是点或边的label + * srcPrimaryKey: 表示该字段是边关联的起点id,只在读取边时使用 + * srcPrimaryLabel: 表示该字段是边关联的起点label,只在读取边时使用 + * dstPrimaryKey: 表示该字段是边关联的终点id,只在读取边时使用 + * dstPrimaryLabel: 表示该字段是边关联的终点label,只在读取边时使用 + * vertexProperty: 表示该字段是点的属性,只在读取点时使用,应用到SET属性时只读取其中的一个属性值 + * vertexJsonProperty: 表示该字段是点的属性集合,只在读取点时使用。属性集合使用JSON格式输出,包含所有的属性,不能与其他vertexProperty配置一起使用 
+ * edgeProperty: 表示该字段是边的属性,只在读取边时使用 + * edgeJsonProperty: 表示该字段是边的属性集合,只在读取边时使用。属性集合使用JSON格式输出,包含所有的属性,不能与其他edgeProperty配置一起使用 + * 必选:是 + * 默认值:无 + * vertexJsonProperty格式示例,新增`c`字段区分SET属性,但是SET属性只包含单个属性值时会标记成普通属性 + ``` + {"properties":[ + {"k":"name","t","string","v":"Jack","c":"set"}, + {"k":"name","t","string","v":"Luck","c":"set"}, + {"k":"age","t","int","v":"20","c":"single"} + ]} + ``` + * edgeJsonProperty格式示例,边不支持多值属性 + ``` + {"properties":[ + {"k":"created_at","t","long","v":"153498653"}, + {"k":"weight","t","double","v":"3.14"} + ]} + +## 4 性能报告 +(TODO) + +## 5 使用约束 +无 + +## 6 FAQ +无 + diff --git a/gdbreader/pom.xml b/gdbreader/pom.xml new file mode 100644 index 00000000..a226a21f --- /dev/null +++ b/gdbreader/pom.xml @@ -0,0 +1,125 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + gdbreader + + com.alibaba.datax + 0.0.1-SNAPSHOT + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + com.alibaba.datax + datax-core + ${datax-project-version} + test + + + slf4j-log4j12 + org.slf4j + + + + + + org.slf4j + slf4j-api + + + ch.qos.logback + logback-classic + + + + org.apache.tinkerpop + gremlin-driver + 3.4.1 + + + org.projectlombok + lombok + 1.18.8 + + + org.junit.jupiter + junit-jupiter-api + 5.4.0 + test + + + org.junit.jupiter + junit-jupiter-engine + 5.4.0 + test + + + + + + + + + maven-compiler-plugin + + 1.6 + 1.6 + ${project-sourceEncoding} + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.22.0 + + + **/*Test*.class + + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + + diff --git a/gdbreader/src/main/assembly/package.xml b/gdbreader/src/main/assembly/package.xml new file mode 100644 index 00000000..c834c2f2 --- /dev/null +++ b/gdbreader/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + 
src/main/resources + + plugin.json + plugin_job_template.json + + plugin/reader/gdbreader + + + target/ + + gdbreader-0.0.1-SNAPSHOT.jar + + plugin/reader/gdbreader + + + + + + false + plugin/reader/gdbreader/libs + runtime + + + diff --git a/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/GdbReader.java b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/GdbReader.java new file mode 100644 index 00000000..025e7b51 --- /dev/null +++ b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/GdbReader.java @@ -0,0 +1,231 @@ +package com.alibaba.datax.plugin.reader.gdbreader; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.spi.Reader; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.reader.gdbreader.mapping.DefaultGdbMapper; +import com.alibaba.datax.plugin.reader.gdbreader.mapping.MappingRule; +import com.alibaba.datax.plugin.reader.gdbreader.mapping.MappingRuleFactory; +import com.alibaba.datax.plugin.reader.gdbreader.model.GdbElement; +import com.alibaba.datax.plugin.reader.gdbreader.model.GdbGraph; +import com.alibaba.datax.plugin.reader.gdbreader.model.ScriptGdbGraph; +import com.alibaba.datax.plugin.reader.gdbreader.util.ConfigHelper; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; + +public class GdbReader extends Reader { + private final static int DEFAULT_FETCH_BATCH_SIZE = 200; + private static GdbGraph graph; + private static Key.ExportType exportType; + + /** + * Job 中的方法仅执行一次,Task 中方法会由框架启动多个 Task 线程并行执行。 + *

+ * 整个 Reader 执行流程是: + *

+     * Job类init-->prepare-->split
+     *
+     *                            Task类init-->prepare-->startRead-->post-->destroy
+     *                            Task类init-->prepare-->startRead-->post-->destroy
+     *
+     *                                                                             Job类post-->destroy
+     * 
+ */ + public static class Job extends Reader.Job { + private static final Logger LOG = LoggerFactory.getLogger(Job.class); + + private Configuration jobConfig = null; + + @Override + public void init() { + this.jobConfig = super.getPluginJobConf(); + + /** + * 注意:此方法仅执行一次。 + * 最佳实践:通常在这里对用户的配置进行校验:是否缺失必填项?有无错误值?有没有无关配置项?... + * 并给出清晰的报错/警告提示。校验通常建议采用静态工具类进行,以保证本类结构清晰。 + */ + + ConfigHelper.assertGdbClient(jobConfig); + ConfigHelper.assertLabels(jobConfig); + try { + exportType = Key.ExportType.valueOf(jobConfig.getString(Key.EXPORT_TYPE)); + } catch (NullPointerException | IllegalArgumentException e) { + throw DataXException.asDataXException(GdbReaderErrorCode.BAD_CONFIG_VALUE, Key.EXPORT_TYPE); + } + } + + @Override + public void prepare() { + /** + * 注意:此方法仅执行一次。 + * 最佳实践:如果 Job 中有需要进行数据同步之前的处理,可以在此处完成,如果没有必要则可以直接去掉。 + */ + + try { + graph = new ScriptGdbGraph(jobConfig, exportType); + } catch (RuntimeException e) { + throw DataXException.asDataXException(GdbReaderErrorCode.FAIL_CLIENT_CONNECT, e.getMessage()); + } + } + + @Override + public List split(int adviceNumber) { + /** + * 注意:此方法仅执行一次。 + * 最佳实践:通常采用工具静态类完成把 Job 配置切分成多个 Task 配置的工作。 + * 这里的 adviceNumber 是框架根据用户的同步速度的要求建议的切分份数,仅供参考,不是强制必须切分的份数。 + */ + List labels = ConfigHelper.assertLabels(jobConfig); + + /** + * 配置label列表为空时,尝试查询GDB中所有label,添加到读取列表 + */ + if (labels.isEmpty()) { + try { + labels.addAll(graph.getLabels().keySet()); + } catch (RuntimeException ex) { + throw DataXException.asDataXException(GdbReaderErrorCode.FAIL_FETCH_LABELS, ex.getMessage()); + } + } + + if (labels.isEmpty()) { + throw DataXException.asDataXException(GdbReaderErrorCode.FAIL_FETCH_LABELS, "none labels to read"); + } + + return ConfigHelper.splitConfig(jobConfig, labels); + } + + @Override + public void post() { + /** + * 注意:此方法仅执行一次。 + * 最佳实践:如果 Job 中有需要进行数据同步之后的后续处理,可以在此处完成。 + */ + } + + @Override + public void destroy() { + /** + * 注意:此方法仅执行一次。 + * 最佳实践:通常配合 Job 中的 post() 方法一起完成 Job 的资源释放。 + */ + try { + graph.close(); + 
} catch (Exception ex) { + LOG.error("Failed to close client : {}", ex); + } + } + + } + + public static class Task extends Reader.Task { + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + private static MappingRule rule; + private Configuration taskConfig; + private String fetchLabel = null; + + private int rangeSplitSize; + private int fetchBatchSize; + + @Override + public void init() { + this.taskConfig = super.getPluginJobConf(); + + /** + * 注意:此方法每个 Task 都会执行一次。 + * 最佳实践:此处通过对 taskConfig 配置的读取,进而初始化一些资源为 startRead()做准备。 + */ + fetchLabel = taskConfig.getString(Key.LABEL); + fetchBatchSize = taskConfig.getInt(Key.FETCH_BATCH_SIZE, DEFAULT_FETCH_BATCH_SIZE); + rangeSplitSize = taskConfig.getInt(Key.RANGE_SPLIT_SIZE, fetchBatchSize * 10); + rule = MappingRuleFactory.getInstance().create(taskConfig, exportType); + } + + @Override + public void prepare() { + /** + * 注意:此方法仅执行一次。 + * 最佳实践:如果 Job 中有需要进行数据同步之后的处理,可以在此处完成,如果没有必要则可以直接去掉。 + */ + } + + @Override + public void startRead(RecordSender recordSender) { + /** + * 注意:此方法每个 Task 都会执行一次。 + * 最佳实践:此处适当封装确保简洁清晰完成数据读取工作。 + */ + + String start = ""; + while (true) { + List ids; + try { + ids = graph.fetchIds(fetchLabel, start, rangeSplitSize); + if (ids.isEmpty()) { + break; + } + start = ids.get(ids.size() - 1); + } catch (Exception ex) { + throw DataXException.asDataXException(GdbReaderErrorCode.FAIL_FETCH_IDS, ex.getMessage()); + } + + // send range fetch async + int count = ids.size(); + List resultSets = new LinkedList<>(); + for (int pos = 0; pos < count; pos += fetchBatchSize) { + int rangeSize = Math.min(fetchBatchSize, count - pos); + String endId = ids.get(pos + rangeSize - 1); + String beginId = ids.get(pos); + + List propNames = rule.isHasProperty() ? 
rule.getPropertyNames() : null; + try { + resultSets.add(graph.fetchElementsAsync(fetchLabel, beginId, endId, propNames)); + } catch (Exception ex) { + // just print error logs and continues + LOG.error("failed to request label: {}, start: {}, end: {}, e: {}", fetchLabel, beginId, endId, ex); + } + } + + // get range fetch dsl results + resultSets.forEach(results -> { + try { + List elements = graph.getElement(results); + elements.forEach(element -> { + Record record = recordSender.createRecord(); + DefaultGdbMapper.getMapper(rule).accept(element, record); + recordSender.sendToWriter(record); + }); + recordSender.flush(); + } catch (Exception ex) { + LOG.error("failed to send records e {}", ex); + } + }); + } + } + + @Override + public void post() { + /** + * 注意:此方法每个 Task 都会执行一次。 + * 最佳实践:如果 Task 中有需要进行数据同步之后的后续处理,可以在此处完成。 + */ + } + + @Override + public void destroy() { + /** + * 注意:此方法每个 Task 都会执行一次。 + * 最佳实践:通常配合Task 中的 post() 方法一起完成 Task 的资源释放。 + */ + } + + } + +} \ No newline at end of file diff --git a/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/GdbReaderErrorCode.java b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/GdbReaderErrorCode.java new file mode 100644 index 00000000..1d320bbd --- /dev/null +++ b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/GdbReaderErrorCode.java @@ -0,0 +1,39 @@ +package com.alibaba.datax.plugin.reader.gdbreader; + +import com.alibaba.datax.common.spi.ErrorCode; + +public enum GdbReaderErrorCode implements ErrorCode { + /** + * + */ + BAD_CONFIG_VALUE("GdbReader-00", "The value you configured is invalid."), + FAIL_CLIENT_CONNECT("GdbReader-02", "GDB connection is abnormal."), + UNSUPPORTED_TYPE("GdbReader-03", "Unsupported data type conversion."), + FAIL_FETCH_LABELS("GdbReader-04", "Error pulling all labels, it is recommended to configure the specified label pull."), + FAIL_FETCH_IDS("GdbReader-05", "Pull range id error."), + ; + + private final String code; + private 
final String description; + + private GdbReaderErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s]. ", this.code, + this.description); + } +} \ No newline at end of file diff --git a/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/Key.java b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/Key.java new file mode 100644 index 00000000..31d5e631 --- /dev/null +++ b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/Key.java @@ -0,0 +1,86 @@ +package com.alibaba.datax.plugin.reader.gdbreader; + +public final class Key { + + /** + * 此处声明插件用到的需要插件使用者提供的配置项 + */ + public final static String HOST = "host"; + public final static String PORT = "port"; + public final static String USERNAME = "username"; + public static final String PASSWORD = "password"; + + public static final String LABEL = "labels"; + public static final String EXPORT_TYPE = "labelType"; + + public static final String RANGE_SPLIT_SIZE = "rangeSplitSize"; + public static final String FETCH_BATCH_SIZE = "fetchBatchSize"; + + public static final String COLUMN = "column"; + public static final String COLUMN_NAME = "name"; + public static final String COLUMN_TYPE = "type"; + public static final String COLUMN_NODE_TYPE = "columnType"; + + public enum ExportType { + /** + * Import vertices + */ + VERTEX, + /** + * Import edges + */ + EDGE + } + + public enum ColumnType { + /** + * vertex or edge id + */ + primaryKey, + + /** + * vertex or edge label + */ + primaryLabel, + + /** + * vertex property + */ + vertexProperty, + + /** + * collects all vertex property to Json list + */ + vertexJsonProperty, + + /** + * start vertex id of edge + */ + srcPrimaryKey, + + /** + * start 
vertex label of edge + */ + srcPrimaryLabel, + + /** + * end vertex id of edge + */ + dstPrimaryKey, + + /** + * end vertex label of edge + */ + dstPrimaryLabel, + + /** + * edge property + */ + edgeProperty, + + /** + * collects all edge property to Json list + */ + edgeJsonProperty, + } +} diff --git a/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/mapping/DefaultGdbMapper.java b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/mapping/DefaultGdbMapper.java new file mode 100644 index 00000000..d874cf36 --- /dev/null +++ b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/mapping/DefaultGdbMapper.java @@ -0,0 +1,150 @@ +/* + * (C) 2019-present Alibaba Group Holding Limited. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + */ +package com.alibaba.datax.plugin.reader.gdbreader.mapping; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.plugin.reader.gdbreader.model.GdbElement; +import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceProperty; +import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertexProperty; + +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * @author : Liu Jianping + * @date : 2019/9/6 + */ + +public class DefaultGdbMapper { + + public static BiConsumer getMapper(MappingRule rule) { + return (gdbElement, record) -> rule.getColumns().forEach(columnMappingRule -> { + Object value = null; + ValueType type = columnMappingRule.getValueType(); + String name = columnMappingRule.getName(); + Map props = gdbElement.getProperties(); + + switch (columnMappingRule.getColumnType()) { + case dstPrimaryKey: + value = gdbElement.getTo(); + break; + case srcPrimaryKey: + value = 
gdbElement.getFrom(); + break; + case primaryKey: + value = gdbElement.getId(); + break; + case primaryLabel: + value = gdbElement.getLabel(); + break; + case dstPrimaryLabel: + value = gdbElement.getToLabel(); + break; + case srcPrimaryLabel: + value = gdbElement.getFromLabel(); + break; + case vertexProperty: + value = forVertexOnePropertyValue().apply(props.get(name)); + break; + case edgeProperty: + value = forEdgePropertyValue().apply(props.get(name)); + break; + case edgeJsonProperty: + value = forEdgeJsonProperties().apply(props); + break; + case vertexJsonProperty: + value = forVertexJsonProperties().apply(props); + break; + default: + break; + } + record.addColumn(type.applyObject(value)); + }); + } + + + /** + * parser ReferenceProperty value for edge + * + * @return property value + */ + private static Function forEdgePropertyValue() { + return prop -> { + if (prop instanceof ReferenceProperty) { + return ((ReferenceProperty) prop).value(); + } + return null; + }; + } + + /** + * parser ReferenceVertexProperty value for vertex + * + * @return the first property value in list + */ + private static Function forVertexOnePropertyValue() { + return props -> { + if (props instanceof List) { + // get the first one property if more than one + Object o = ((List) props).get(0); + if (o instanceof ReferenceVertexProperty) { + return ((ReferenceVertexProperty) o).value(); + } + } + return null; + }; + } + + /** + * parser all edge properties to json string + * + * @return json string + */ + private static Function, String> forEdgeJsonProperties() { + return props -> "{\"properties\":[" + + props.entrySet().stream().filter(p -> p.getValue() instanceof ReferenceProperty) + .map(p -> "{\"k\":\"" + ((ReferenceProperty) p.getValue()).key() + "\"," + + "\"t\":\"" + ((ReferenceProperty) p.getValue()).value().getClass().getSimpleName().toLowerCase() + "\"," + + "\"v\":\"" + String.valueOf(((ReferenceProperty) p.getValue()).value()) + "\"}") + 
.collect(Collectors.joining(",")) + + "]}"; + } + + /** + * parser all vertex properties to json string, include set-property + * + * @return json string + */ + private static Function, String> forVertexJsonProperties() { + return props -> "{\"properties\":[" + + props.entrySet().stream().filter(p -> p.getValue() instanceof List) + .map(p -> forVertexPropertyStr().apply((List) p.getValue())) + .collect(Collectors.joining(",")) + + "]}"; + } + + /** + * parser one vertex property to json string item, set 'cardinality' + * + * @return json string item + */ + private static Function, String> forVertexPropertyStr() { + return vp -> { + final String setFlag = vp.size() > 1 ? "set" : "single"; + return vp.stream().filter(p -> p instanceof ReferenceVertexProperty) + .map(p -> "{\"k\":\"" + ((ReferenceVertexProperty) p).key() + "\"," + + "\"t\":\"" + ((ReferenceVertexProperty) p).value().getClass().getSimpleName().toLowerCase() + "\"," + + "\"v\":\"" + String.valueOf(((ReferenceVertexProperty) p).value()) + "\"," + + "\"c\":\"" + setFlag + "\"}") + .collect(Collectors.joining(",")); + }; + } +} diff --git a/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/mapping/MappingRule.java b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/mapping/MappingRule.java new file mode 100644 index 00000000..7baed498 --- /dev/null +++ b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/mapping/MappingRule.java @@ -0,0 +1,79 @@ +/* + * (C) 2019-present Alibaba Group Holding Limited. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. 
+ */ +package com.alibaba.datax.plugin.reader.gdbreader.mapping; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.plugin.reader.gdbreader.GdbReaderErrorCode; +import com.alibaba.datax.plugin.reader.gdbreader.Key.ColumnType; +import com.alibaba.datax.plugin.reader.gdbreader.Key.ExportType; +import lombok.Data; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author : Liu Jianping + * @date : 2019/9/6 + */ + +@Data +public class MappingRule { + private boolean hasRelation = false; + private boolean hasProperty = false; + private ExportType type = ExportType.VERTEX; + + /** + * property names for property key-value + */ + private List propertyNames = new ArrayList<>(); + + private List columns = new ArrayList<>(); + + void addColumn(ColumnType columnType, ValueType type, String name) { + ColumnMappingRule rule = new ColumnMappingRule(); + rule.setColumnType(columnType); + rule.setName(name); + rule.setValueType(type); + + if (columnType == ColumnType.vertexProperty || columnType == ColumnType.edgeProperty) { + propertyNames.add(name); + hasProperty = true; + } + + boolean hasTo = columnType == ColumnType.dstPrimaryKey || columnType == ColumnType.dstPrimaryLabel; + boolean hasFrom = columnType == ColumnType.srcPrimaryKey || columnType == ColumnType.srcPrimaryLabel; + if (hasTo || hasFrom) { + hasRelation = true; + } + + columns.add(rule); + } + + void addJsonColumn(ColumnType columnType) { + ColumnMappingRule rule = new ColumnMappingRule(); + rule.setColumnType(columnType); + rule.setName("json"); + rule.setValueType(ValueType.STRING); + + if (!propertyNames.isEmpty()) { + throw DataXException.asDataXException(GdbReaderErrorCode.BAD_CONFIG_VALUE, "JsonProperties should be only property"); + } + + columns.add(rule); + hasProperty = true; + } + + @Data + protected static class ColumnMappingRule { + private String name = null; + + private ValueType valueType = null; + + private ColumnType columnType = null; + } +} 
diff --git a/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/mapping/MappingRuleFactory.java b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/mapping/MappingRuleFactory.java new file mode 100644 index 00000000..c71a19ac --- /dev/null +++ b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/mapping/MappingRuleFactory.java @@ -0,0 +1,76 @@ +/* + * (C) 2019-present Alibaba Group Holding Limited. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + */ +package com.alibaba.datax.plugin.reader.gdbreader.mapping; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.reader.gdbreader.GdbReaderErrorCode; +import com.alibaba.datax.plugin.reader.gdbreader.Key; +import com.alibaba.datax.plugin.reader.gdbreader.Key.ColumnType; +import com.alibaba.datax.plugin.reader.gdbreader.Key.ExportType; +import com.alibaba.datax.plugin.reader.gdbreader.util.ConfigHelper; + +import java.util.List; + +/** + * @author : Liu Jianping + * @date : 2019/9/20 + */ + +public class MappingRuleFactory { + private static final MappingRuleFactory instance = new MappingRuleFactory(); + + public static MappingRuleFactory getInstance() { + return instance; + } + + public MappingRule create(Configuration config, ExportType exportType) { + MappingRule rule = new MappingRule(); + + rule.setType(exportType); + List configurationList = config.getListConfiguration(Key.COLUMN); + for (Configuration column : configurationList) { + ColumnType columnType; + try { + columnType = ColumnType.valueOf(column.getString(Key.COLUMN_NODE_TYPE)); + } catch (NullPointerException | IllegalArgumentException e) { + throw DataXException.asDataXException(GdbReaderErrorCode.BAD_CONFIG_VALUE, Key.COLUMN_NODE_TYPE); + } + + if (exportType == 
ExportType.VERTEX) { + // only id/label/property column allow when vertex + ConfigHelper.assertConfig(Key.COLUMN_NODE_TYPE, () -> + columnType == ColumnType.primaryKey || columnType == ColumnType.primaryLabel + || columnType == ColumnType.vertexProperty || columnType == ColumnType.vertexJsonProperty); + } else if (exportType == ExportType.EDGE) { + // edge + ConfigHelper.assertConfig(Key.COLUMN_NODE_TYPE, () -> + columnType == ColumnType.primaryKey || columnType == ColumnType.primaryLabel + || columnType == ColumnType.srcPrimaryKey || columnType == ColumnType.srcPrimaryLabel + || columnType == ColumnType.dstPrimaryKey || columnType == ColumnType.dstPrimaryLabel + || columnType == ColumnType.edgeProperty || columnType == ColumnType.edgeJsonProperty); + } + + if (columnType == ColumnType.edgeProperty || columnType == ColumnType.vertexProperty) { + String name = column.getString(Key.COLUMN_NAME); + ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE)); + + ConfigHelper.assertConfig(Key.COLUMN_NAME, () -> name != null); + if (propType == null) { + throw DataXException.asDataXException(GdbReaderErrorCode.UNSUPPORTED_TYPE, Key.COLUMN_TYPE); + } + rule.addColumn(columnType, propType, name); + } else if (columnType == ColumnType.vertexJsonProperty || columnType == ColumnType.edgeJsonProperty) { + rule.addJsonColumn(columnType); + } else { + rule.addColumn(columnType, ValueType.STRING, null); + } + } + return rule; + } +} diff --git a/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/mapping/ValueType.java b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/mapping/ValueType.java new file mode 100644 index 00000000..826e0493 --- /dev/null +++ b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/mapping/ValueType.java @@ -0,0 +1,128 @@ +/* + * (C) 2019-present Alibaba Group Holding Limited. 
+ * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + */ +package com.alibaba.datax.plugin.reader.gdbreader.mapping; + +import com.alibaba.datax.common.element.BoolColumn; +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.DoubleColumn; +import com.alibaba.datax.common.element.LongColumn; +import com.alibaba.datax.common.element.StringColumn; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; + +/** + * @author : Liu Jianping + * @date : 2019/9/6 + */ + +public enum ValueType { + /** + * transfer gdb element object value to DataX Column data + *

+ * int, long -> LongColumn + * float, double -> DoubleColumn + * bool -> BooleanColumn + * string -> StringColumn + */ + INT(Integer.class, "int", ValueTypeHolder::longColumnMapper), + INTEGER(Integer.class, "integer", ValueTypeHolder::longColumnMapper), + LONG(Long.class, "long", ValueTypeHolder::longColumnMapper), + DOUBLE(Double.class, "double", ValueTypeHolder::doubleColumnMapper), + FLOAT(Float.class, "float", ValueTypeHolder::doubleColumnMapper), + BOOLEAN(Boolean.class, "boolean", ValueTypeHolder::boolColumnMapper), + STRING(String.class, "string", ValueTypeHolder::stringColumnMapper), + ; + + private Class type = null; + private String shortName = null; + private Function columnFunc = null; + + ValueType(Class type, String name, Function columnFunc) { + this.type = type; + this.shortName = name; + this.columnFunc = columnFunc; + + ValueTypeHolder.shortName2type.put(shortName, this); + } + + public static ValueType fromShortName(String name) { + return ValueTypeHolder.shortName2type.get(name); + } + + public Column applyObject(Object value) { + if (value == null) { + return null; + } + return columnFunc.apply(value); + } + + private static class ValueTypeHolder { + private static Map shortName2type = new HashMap<>(); + + private static LongColumn longColumnMapper(Object o) { + long v; + if (o instanceof Integer) { + v = (int) o; + } else if (o instanceof Long) { + v = (long) o; + } else if (o instanceof String) { + v = Long.valueOf((String) o); + } else { + throw new RuntimeException("Failed to cast " + o.getClass() + " to Long"); + } + + return new LongColumn(v); + } + + private static DoubleColumn doubleColumnMapper(Object o) { + double v; + if (o instanceof Integer) { + v = (double) (int) o; + } else if (o instanceof Long) { + v = (double) (long) o; + } else if (o instanceof Float) { + v = (double) (float) o; + } else if (o instanceof Double) { + v = (double) o; + } else if (o instanceof String) { + v = Double.valueOf((String) o); + } else { + throw new 
RuntimeException("Failed to cast " + o.getClass() + " to Double"); + } + + return new DoubleColumn(v); + } + + private static BoolColumn boolColumnMapper(Object o) { + boolean v; + if (o instanceof Integer) { + v = ((int) o != 0); + } else if (o instanceof Long) { + v = ((long) o != 0); + } else if (o instanceof Boolean) { + v = (boolean) o; + } else if (o instanceof String) { + v = Boolean.valueOf((String) o); + } else { + throw new RuntimeException("Failed to cast " + o.getClass() + " to Boolean"); + } + + return new BoolColumn(v); + } + + private static StringColumn stringColumnMapper(Object o) { + if (o instanceof String) { + return new StringColumn((String) o); + } else { + return new StringColumn(String.valueOf(o)); + } + } + } +} diff --git a/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/model/AbstractGdbGraph.java b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/model/AbstractGdbGraph.java new file mode 100644 index 00000000..4eda2eed --- /dev/null +++ b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/model/AbstractGdbGraph.java @@ -0,0 +1,89 @@ +/* + * (C) 2019-present Alibaba Group Holding Limited. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. 
+ */ +package com.alibaba.datax.plugin.reader.gdbreader.model; + +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.reader.gdbreader.Key; +import org.apache.tinkerpop.gremlin.driver.Client; +import org.apache.tinkerpop.gremlin.driver.Cluster; +import org.apache.tinkerpop.gremlin.driver.RequestOptions; +import org.apache.tinkerpop.gremlin.driver.Result; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.ser.Serializers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * @author : Liu Jianping + * @date : 2019/9/6 + */ + +public abstract class AbstractGdbGraph implements GdbGraph { + final static int DEFAULT_TIMEOUT = 30000; + private static final Logger log = LoggerFactory.getLogger(AbstractGdbGraph.class); + private Client client; + + AbstractGdbGraph() { + } + + AbstractGdbGraph(Configuration config) { + log.info("init graphdb client"); + String host = config.getString(Key.HOST); + int port = config.getInt(Key.PORT); + String username = config.getString(Key.USERNAME); + String password = config.getString(Key.PASSWORD); + + try { + Cluster cluster = Cluster.build(host).port(port).credentials(username, password) + .serializer(Serializers.GRAPHBINARY_V1D0) + .maxContentLength(1024 * 1024) + .resultIterationBatchSize(64) + .create(); + client = cluster.connect().init(); + + warmClient(); + } catch (RuntimeException e) { + log.error("Failed to connect to GDB {}:{}, due to {}", host, port, e); + throw e; + } + } + + protected List runInternal(String dsl, Map params) throws Exception { + return runInternalAsync(dsl, params).all().get(DEFAULT_TIMEOUT + 1000, TimeUnit.MILLISECONDS); + } + + protected ResultSet runInternalAsync(String dsl, Map params) throws Exception { + RequestOptions.Builder options = RequestOptions.build().timeout(DEFAULT_TIMEOUT); + if (params != null && 
!params.isEmpty()) { + params.forEach(options::addParameter); + } + return client.submitAsync(dsl, options.create()).get(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS); + } + + private void warmClient() { + try { + runInternal("g.V('test')", null); + log.info("warm graphdb client over"); + } catch (Exception e) { + log.error("warmClient error"); + throw new RuntimeException(e); + } + } + + @Override + public void close() throws Exception { + if (client != null) { + log.info("close graphdb client"); + client.close(); + } + } +} diff --git a/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/model/GdbElement.java b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/model/GdbElement.java new file mode 100644 index 00000000..79619ad0 --- /dev/null +++ b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/model/GdbElement.java @@ -0,0 +1,39 @@ +/* + * (C) 2019-present Alibaba Group Holding Limited. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. 
+ */ +package com.alibaba.datax.plugin.reader.gdbreader.model; + +import lombok.Data; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author : Liu Jianping + * @date : 2019/9/6 + */ + +@Data +public class GdbElement { + String id = null; + String label = null; + String to = null; + String from = null; + String toLabel = null; + String fromLabel = null; + + Map properties = new HashMap<>(); + + public GdbElement() { + } + + public GdbElement(String id, String label) { + this.id = id; + this.label = label; + } + +} diff --git a/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/model/GdbGraph.java b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/model/GdbGraph.java new file mode 100644 index 00000000..e6651293 --- /dev/null +++ b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/model/GdbGraph.java @@ -0,0 +1,65 @@ +/* + * (C) 2019-present Alibaba Group Holding Limited. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. 
+ */ +package com.alibaba.datax.plugin.reader.gdbreader.model; + +import org.apache.tinkerpop.gremlin.driver.ResultSet; + +import java.util.List; +import java.util.Map; + +/** + * @author : Liu Jianping + * @date : 2019/9/6 + */ + +public interface GdbGraph extends AutoCloseable { + + /** + * Get All labels of GraphDB + * + * @return labels map included numbers + */ + Map getLabels(); + + /** + * Get the Ids list of special 'label', size up to 'limit' + * + * @param label is Label of Vertex or Edge + * @param start of Ids range to get + * @param limit size of Ids list + * @return Ids list + */ + List fetchIds(String label, String start, long limit); + + /** + * Fetch element in async mode, just send query dsl to server + * + * @param label node label to filter + * @param start range begin(included) + * @param end range end(included) + * @param propNames propKey list to fetch + * @return future to get result later + */ + ResultSet fetchElementsAsync(String label, String start, String end, List propNames); + + /** + * Get get element from Response @{ResultSet} + * + * @param results Response of Server + * @return element sets + */ + List getElement(ResultSet results); + + /** + * close graph client + * + * @throws Exception if fails + */ + @Override + void close() throws Exception; +} diff --git a/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/model/ScriptGdbGraph.java b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/model/ScriptGdbGraph.java new file mode 100644 index 00000000..8c08b819 --- /dev/null +++ b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/model/ScriptGdbGraph.java @@ -0,0 +1,192 @@ +/* + * (C) 2019-present Alibaba Group Holding Limited. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. 
+ */ +package com.alibaba.datax.plugin.reader.gdbreader.model; + +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.reader.gdbreader.Key.ExportType; +import org.apache.tinkerpop.gremlin.driver.Result; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceEdge; +import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * @author : Liu Jianping + * @date : 2019/9/6 + */ + +public class ScriptGdbGraph extends AbstractGdbGraph { + private static final Logger log = LoggerFactory.getLogger(ScriptGdbGraph.class); + + private final static String LABEL = "GDB___LABEL"; + private final static String START_ID = "GDB___ID"; + private final static String END_ID = "GDB___ID_END"; + private final static String LIMIT = "GDB___LIMIT"; + + private final static String FETCH_VERTEX_IDS_DSL = "g.V().hasLabel(" + LABEL + ").has(id, gt(" + START_ID + ")).limit(" + LIMIT + ").id()"; + private final static String FETCH_EDGE_IDS_DSL = "g.E().hasLabel(" + LABEL + ").has(id, gt(" + START_ID + ")).limit(" + LIMIT + ").id()"; + + private final static String FETCH_VERTEX_LABELS_DSL = "g.V().groupCount().by(label)"; + private final static String FETCH_EDGE_LABELS_DSL = "g.E().groupCount().by(label)"; + + /** + * fetch node range [START_ID, END_ID] + */ + private final static String FETCH_RANGE_VERTEX_DSL = "g.V().hasLabel(" + LABEL + ").has(id, gte(" + START_ID + ")).has(id, lte(" + END_ID + "))"; + private final static String FETCH_RANGE_EDGE_DSL = "g.E().hasLabel(" + LABEL + ").has(id, gte(" + START_ID + ")).has(id, lte(" + END_ID + "))"; + private final static String PART_WITH_PROP_DSL = ".as('a').project('node', 
'props').by(select('a')).by(select('a').propertyMap("; + + private final ExportType exportType; + + public ScriptGdbGraph(ExportType exportType) { + super(); + this.exportType = exportType; + } + + public ScriptGdbGraph(Configuration config, ExportType exportType) { + super(config); + this.exportType = exportType; + } + + @Override + public List fetchIds(final String label, final String start, long limit) { + Map params = new HashMap(3) {{ + put(LABEL, label); + put(START_ID, start); + put(LIMIT, limit); + }}; + String fetchDsl = exportType == ExportType.VERTEX ? FETCH_VERTEX_IDS_DSL : FETCH_EDGE_IDS_DSL; + + List ids = new ArrayList<>(); + try { + List results = runInternal(fetchDsl, params); + + // transfer result to id string + results.forEach(id -> ids.add(id.getString())); + } catch (Exception e) { + log.error("fetch range node failed, label {}, start {}", label, start); + throw new RuntimeException(e); + } + return ids; + } + + @Override + public ResultSet fetchElementsAsync(final String label, final String start, final String end, final List propNames) { + Map params = new HashMap<>(3); + params.put(LABEL, label); + params.put(START_ID, start); + params.put(END_ID, end); + + String prefixDsl = exportType == ExportType.VERTEX ? 
FETCH_RANGE_VERTEX_DSL : FETCH_RANGE_EDGE_DSL; + StringBuilder fetchDsl = new StringBuilder(prefixDsl); + if (propNames != null) { + fetchDsl.append(PART_WITH_PROP_DSL); + for (int i = 0; i < propNames.size(); i++) { + String propName = "GDB___PK" + String.valueOf(i); + params.put(propName, propNames.get(i)); + + fetchDsl.append(propName); + if (i != propNames.size() - 1) { + fetchDsl.append(", "); + } + } + fetchDsl.append("))"); + } + + try { + return runInternalAsync(fetchDsl.toString(), params); + } catch (Exception e) { + log.error("Failed to fetch range node startId {}, end {} , e {}", start, end, e); + throw new RuntimeException(e); + } + } + + @Override + @SuppressWarnings("unchecked") + public List getElement(ResultSet results) { + List elements = new LinkedList<>(); + try { + List resultList = results.all().get(DEFAULT_TIMEOUT + 1000, TimeUnit.MILLISECONDS); + + resultList.forEach(n -> { + Object o = n.getObject(); + GdbElement element = new GdbElement(); + if (o instanceof Map) { + // project response + Object node = ((Map) o).get("node"); + Object props = ((Map) o).get("props"); + + mapNodeToElement(node, element); + mapPropToElement((Map) props, element); + } else { + // range node response + mapNodeToElement(n.getObject(), element); + } + if (element.getId() != null) { + elements.add(element); + } + }); + } catch (Exception e) { + log.error("Failed to get node: {}", e); + throw new RuntimeException(e); + } + return elements; + } + + private void mapNodeToElement(Object node, GdbElement element) { + if (node instanceof ReferenceVertex) { + ReferenceVertex v = (ReferenceVertex) node; + + element.setId((String) v.id()); + element.setLabel(v.label()); + } else if (node instanceof ReferenceEdge) { + ReferenceEdge e = (ReferenceEdge) node; + + element.setId((String) e.id()); + element.setLabel(e.label()); + element.setTo((String) e.inVertex().id()); + element.setToLabel(e.inVertex().label()); + element.setFrom((String) e.outVertex().id()); + 
element.setFromLabel(e.outVertex().label()); + } + } + + private void mapPropToElement(Map props, GdbElement element) { + element.setProperties(props); + } + + @Override + public Map getLabels() { + String dsl = exportType == ExportType.VERTEX ? FETCH_VERTEX_LABELS_DSL : FETCH_EDGE_LABELS_DSL; + + try { + List results = runInternal(dsl, null); + Map labelMap = new HashMap<>(2); + + Map labels = results.get(0).get(Map.class); + labels.forEach((k, v) -> { + String label = (String) k; + Long count = (Long) v; + labelMap.put(label, count); + }); + + return labelMap; + } catch (Exception e) { + log.error("Failed to fetch label list, please give special labels and run again, e {}", e); + throw new RuntimeException(e); + } + } +} diff --git a/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/util/ConfigHelper.java b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/util/ConfigHelper.java new file mode 100644 index 00000000..2ec9d153 --- /dev/null +++ b/gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/util/ConfigHelper.java @@ -0,0 +1,77 @@ +/* + * (C) 2019-present Alibaba Group Holding Limited. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. 
+ */ +package com.alibaba.datax.plugin.reader.gdbreader.util; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.reader.gdbreader.GdbReaderErrorCode; +import com.alibaba.datax.plugin.reader.gdbreader.Key; +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; + +/** + * @author : Liu Jianping + * @date : 2019/9/6 + */ + +public interface ConfigHelper { + static void assertConfig(String key, Supplier f) { + if (!f.get()) { + throw DataXException.asDataXException(GdbReaderErrorCode.BAD_CONFIG_VALUE, key); + } + } + + static void assertHasContent(Configuration config, String key) { + assertConfig(key, () -> StringUtils.isNotBlank(config.getString(key))); + } + + static void assertGdbClient(Configuration config) { + assertHasContent(config, Key.HOST); + assertConfig(Key.PORT, () -> config.getInt(Key.PORT) > 0); + + assertHasContent(config, Key.USERNAME); + assertHasContent(config, Key.PASSWORD); + } + + static List assertLabels(Configuration config) { + Object labels = config.get(Key.LABEL); + if (!(labels instanceof List)) { + throw DataXException.asDataXException(GdbReaderErrorCode.BAD_CONFIG_VALUE, "labels should be List"); + } + + List list = (List) labels; + List configLabels = new ArrayList<>(0); + list.forEach(n -> configLabels.add(String.valueOf(n))); + + return configLabels; + } + + static List splitConfig(Configuration config, List labels) { + List configs = new ArrayList<>(); + for (String label : labels) { + Configuration conf = config.clone(); + conf.set(Key.LABEL, label); + + configs.add(conf); + } + return configs; + } + + static Configuration fromClasspath(String name) { + try (InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(name)) { + return Configuration.from(is); + } catch 
(IOException e) { + throw new IllegalArgumentException("File not found: " + name); + } + } +} diff --git a/gdbreader/src/main/resources/plugin.json b/gdbreader/src/main/resources/plugin.json new file mode 100644 index 00000000..4fedfa24 --- /dev/null +++ b/gdbreader/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "gdbreader", + "class": "com.alibaba.datax.plugin.reader.gdbreader.GdbReader", + "description": "useScene: prod. mechanism: connect GDB with gremlin-client, execute 'g.V().propertyMap() or g.E().propertyMap()' to get record", + "developer": "alibaba" +} \ No newline at end of file diff --git a/gdbreader/src/main/resources/plugin_job_template.json b/gdbreader/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..bfca7780 --- /dev/null +++ b/gdbreader/src/main/resources/plugin_job_template.json @@ -0,0 +1,77 @@ +{ + "job": { + "setting": { + "speed": { + "channel": 1 + }, + "errorLimit": { + "record": 1 + } + }, + + "content": [ + { + "reader": { + "name": "gdbreader", + "parameter": { + "host": "10.218.145.24", + "port": 8182, + "username": "***", + "password": "***", + "labelType": "EDGE", + "labels": ["label1", "label2"], + "column": [ + { + "name": "id", + "type": "string", + "columnType": "primaryKey" + }, + { + "name": "label", + "type": "string", + "columnType": "primaryLabel" + }, + { + "name": "srcId", + "type": "string", + "columnType": "srcPrimaryKey" + }, + { + "name": "srcLabel", + "type": "string", + "columnType": "srcPrimaryLabel" + }, + { + "name": "dstId", + "type": "string", + "columnType": "dstPrimaryKey" + }, + { + "name": "dstLabel", + "type": "string", + "columnType": "dstPrimaryLabel" + }, + { + "name": "name", + "type": "string", + "columnType": "edgeProperty" + }, + { + "name": "weight", + "type": "double", + "columnType": "edgeProperty" + } + ] + } + }, + + "writer": { + "name": "streamwriter", + "parameter": { + "print": true + } + } + } + ] + } +} \ No newline at end of file diff --git 
a/pom.xml b/pom.xml index d720533a..a52ed8e1 100755 --- a/pom.xml +++ b/pom.xml @@ -66,6 +66,7 @@ tsdbreader opentsdbreader cassandrareader + gdbreader mysqlwriter