diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java index c9cbb060..fabcd0aa 100644 --- a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java @@ -9,7 +9,7 @@ import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.RetryUtil; import com.alibaba.datax.plugin.writer.neo4jwriter.adapter.DateAdapter; import com.alibaba.datax.plugin.writer.neo4jwriter.adapter.ValueAdapter; -import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jField; +import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jProperty; import com.alibaba.datax.plugin.writer.neo4jwriter.exception.Neo4jErrorCode; import com.alibaba.fastjson2.JSON; import org.apache.commons.lang3.StringUtils; @@ -52,7 +52,6 @@ public class Neo4jClient { public void init() { String database = writeConfig.database; //neo4j 3.x 没有数据库 - //neo4j 3.x no database if (null != database && !"".equals(database)) { this.session = driver.session(SessionConfig.forDatabase(database)); } else { @@ -67,12 +66,12 @@ public class Neo4jClient { String database = config.getString(DATABASE.getKey()); String batchVariableName = config.getString(BATCH_DATA_VARIABLE_NAME.getKey(), BATCH_DATA_VARIABLE_NAME.getDefaultValue()); - List neo4jFields = JSON.parseArray(config.getString(NEO4J_FIELDS.getKey()), Neo4jField.class); + List neo4jProperties = JSON.parseArray(config.getString(NEO4J_PROPERTIES.getKey()), Neo4jProperty.class); int batchSize = config.getInt(BATCH_SIZE.getKey(), BATCH_SIZE.getDefaultValue()); int retryTimes = config.getInt(RETRY_TIMES.getKey(), RETRY_TIMES.getDefaultValue()); return new Neo4jClient(driver, - new WriteConfig(cypher, database, batchVariableName, neo4jFields, batchSize), + new WriteConfig(cypher, database, batchVariableName, neo4jProperties, batchSize), new RetryConfig(retryTimes, config.getLong(RETRY_SLEEP_MILLS.getKey(), RETRY_SLEEP_MILLS.getDefaultValue())), taskPluginCollector ); @@ -176,7 +175,6 @@ public class Neo4jClient { }, this.retryConfig.retryTimes, retryConfig.retrySleepMills, true, Collections.singletonList(Neo4jException.class)); } catch (Exception e) { - LOGGER.error("在写入数据库时发生了异常,原因是:{}", e.getMessage()); LOGGER.error("an exception occurred while writing to the database,message:{}", e.getMessage()); throw DataXException.asDataXException(DATABASE_ERROR, e.getMessage()); } @@ -200,26 +198,18 @@ public class Neo4jClient { private MapValue checkAndConvert(Record record) { int sourceColNum = record.getColumnNumber(); - List neo4jFields = writeConfig.neo4jFields; - if (sourceColNum < neo4jFields.size()) { - LOGGER.warn("接收到的数据列少于neo4jWriter企图消费的数据列,请注意风险,这可能导致数据不匹配"); - LOGGER.warn("Receive fewer data columns than neo4jWriter attempts to consume, " + - "be aware of the risk that this may result in data mismatch"); - LOGGER.warn("接收到的数据是:" + record); - LOGGER.warn("received data is:" + record); - } + List neo4JProperties = writeConfig.neo4jProperties; - int len = Math.min(sourceColNum, neo4jFields.size()); + int len = Math.min(sourceColNum, neo4JProperties.size()); Map data = new HashMap<>(len * 4 / 3); for (int i = 0; i < len; i++) { Column column = record.getColumn(i); - Neo4jField neo4jField = neo4jFields.get(i); + Neo4jProperty neo4jProperty = neo4JProperties.get(i); try { - Value value = ValueAdapter.column2Value(column, neo4jField); - data.put(neo4jField.getFieldName(), value); + Value value = ValueAdapter.column2Value(column, neo4jProperty); + data.put(neo4jProperty.getName(), value); } catch (Exception e) { - LOGGER.info("检测到一条脏数据:{},原因:{}", column, e.getMessage()); LOGGER.info("dirty record:{},message :{}", column, e.getMessage()); this.taskPluginCollector.collectDirtyRecord(record, e.getMessage()); } @@ -227,8 +217,8 @@ public class Neo4jClient { return new MapValue(data); } - public List getNeo4jFields() { - return this.writeConfig.neo4jFields; + public List getNeo4jFields() { + return this.writeConfig.neo4jProperties; } @@ -249,19 +239,19 @@ public class Neo4jClient { String batchVariableName; - List neo4jFields; + List neo4jProperties; int batchSize; public WriteConfig(String cypher, String database, String batchVariableName, - List neo4jFields, + List neo4jProperties, int batchSize) { this.cypher = cypher; this.database = database; this.batchVariableName = batchVariableName; - this.neo4jFields = neo4jFields; + this.neo4jProperties = neo4jProperties; this.batchSize = batchSize; } diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java index 9a7b62ee..a89f4674 100644 --- a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java @@ -17,7 +17,7 @@ public class Neo4jWriter extends Writer { private Configuration jobConf = null; @Override public void init() { - LOGGER.info("Neo4jWriter Job init Success"); + LOGGER.info("Neo4jWriter Job init success"); } @Override diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/adapter/DateAdapter.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/adapter/DateAdapter.java index c69c92fd..51b214bd 100644 --- a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/adapter/DateAdapter.java +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/adapter/DateAdapter.java @@ -1,7 +1,7 @@ package com.alibaba.datax.plugin.writer.neo4jwriter.adapter; -import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jField; +import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jProperty; import org.testcontainers.shaded.com.google.common.base.Supplier; import java.time.LocalDate; @@ -21,12 +21,12 @@ public class DateAdapter { private static final String DEFAULT_LOCAL_DATE_TIME_FORMATTER = "yyyy-MM-dd HH:mm:ss"; - public static LocalDate localDate(String text, Neo4jField neo4jField) { + public static LocalDate localDate(String text, Neo4jProperty neo4jProperty) { if (LOCAL_DATE_FORMATTER_MAP.get() != null) { return LocalDate.parse(text, LOCAL_DATE_FORMATTER_MAP.get()); } - String format = getOrDefault(neo4jField::getDateFormat, DEFAULT_LOCAL_DATE_FORMATTER); + String format = getOrDefault(neo4jProperty::getDateFormat, DEFAULT_LOCAL_DATE_FORMATTER); DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(format); LOCAL_DATE_FORMATTER_MAP.set(dateTimeFormatter); return LocalDate.parse(text, dateTimeFormatter); @@ -47,22 +47,22 @@ public class DateAdapter { LOCAL_DATE_TIME_FORMATTER_MAP.remove(); } - public static LocalTime localTime(String text, Neo4jField neo4jField) { + public static LocalTime localTime(String text, Neo4jProperty neo4JProperty) { if (LOCAL_TIME_FORMATTER_MAP.get() != null) { return LocalTime.parse(text, LOCAL_TIME_FORMATTER_MAP.get()); } - String format = getOrDefault(neo4jField::getDateFormat, DEFAULT_LOCAL_TIME_FORMATTER); + String format = getOrDefault(neo4JProperty::getDateFormat, DEFAULT_LOCAL_TIME_FORMATTER); DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(format); LOCAL_TIME_FORMATTER_MAP.set(dateTimeFormatter); return LocalTime.parse(text, dateTimeFormatter); } - public static LocalDateTime localDateTime(String text, Neo4jField neo4jField) { + public static LocalDateTime localDateTime(String text, Neo4jProperty neo4JProperty) { if (LOCAL_DATE_TIME_FORMATTER_MAP.get() != null){ return LocalDateTime.parse(text,LOCAL_DATE_TIME_FORMATTER_MAP.get()); } - String format = getOrDefault(neo4jField::getDateFormat, DEFAULT_LOCAL_DATE_TIME_FORMATTER); + String format = getOrDefault(neo4JProperty::getDateFormat, DEFAULT_LOCAL_DATE_TIME_FORMATTER); DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(format); LOCAL_DATE_TIME_FORMATTER_MAP.set(dateTimeFormatter); return LocalDateTime.parse(text, dateTimeFormatter); diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/Neo4jField.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/Neo4jField.java deleted file mode 100644 index 904a2f13..00000000 --- a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/Neo4jField.java +++ /dev/null @@ -1,108 +0,0 @@ -package com.alibaba.datax.plugin.writer.neo4jwriter.config; - -import com.alibaba.datax.plugin.writer.neo4jwriter.element.FieldType; - -import java.util.Arrays; -import java.util.List; - -/** - * 由于dataX并不能传输数据的元数据,所以只能在writer端定义每列数据的名字 - * datax does not support data metadata, - * only the name of each column of data can be defined on neo4j writer - * @author fuyouj - */ -public class Neo4jField { - public static final String DEFAULT_SPLIT = ","; - public static final List DEFAULT_ARRAY_TRIM = Arrays.asList('[',']'); - - /** - * name of neo4j field - */ - private String fieldName; - - /** - * neo4j type - * reference by org.neo4j.driver.Values - */ - private FieldType fieldType; - - /** - * for date - */ - private String dateFormat; - - /** - * for array type - */ - private String split; - /** - * such as [1,2,3,4,5] - * split is , - * arrayTrimChar is [ ] - */ - private List arrayTrimChars; - - public Neo4jField(){} - - public Neo4jField(String fieldName, FieldType fieldType, String format, String split, List arrayTrimChars) { - this.fieldName = fieldName; - this.fieldType = fieldType; - this.dateFormat = format; - this.split = split; - this.arrayTrimChars = arrayTrimChars; - } - - public String getFieldName() { - return fieldName; - } - - public void setFieldName(String fieldName) { - this.fieldName = fieldName; - } - - public FieldType getFieldType() { - return fieldType; - } - - public void setFieldType(FieldType fieldType) { - this.fieldType = fieldType; - } - - public String getDateFormat() { - return dateFormat; - } - - public void setDateFormat(String dateFormat) { - this.dateFormat = dateFormat; - } - - public String getSplit() { - return getSplitOrDefault(); - } - - public String getSplitOrDefault(){ - if (split == null || "".equals(split)){ - return DEFAULT_SPLIT; - } - return split; - } - - public void setSplit(String split) { - this.split = split; - } - - public List getArrayTrimChars() { - return getArrayTrimOrDefault(); - } - - public List getArrayTrimOrDefault(){ - if (arrayTrimChars == null || arrayTrimChars.isEmpty()){ - return DEFAULT_ARRAY_TRIM; - } - return arrayTrimChars; - } - - public void setArrayTrimChars(List arrayTrimChars) { - this.arrayTrimChars = arrayTrimChars; - } -} diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/Neo4jProperty.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/Neo4jProperty.java new file mode 100644 index 00000000..5c5867b3 --- /dev/null +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/Neo4jProperty.java @@ -0,0 +1,82 @@ +package com.alibaba.datax.plugin.writer.neo4jwriter.config; + +/** + * 由于dataX并不能传输数据的元数据,所以只能在writer端定义每列数据的名字 + * datax does not support data metadata, + * only the name of each column of data can be defined on neo4j writer + * + * @author fuyouj + */ +public class Neo4jProperty { + public static final String DEFAULT_SPLIT = ","; + + /** + * name of neo4j field + */ + private String name; + + /** + * neo4j type + * reference by org.neo4j.driver.Values + */ + private String type; + + /** + * for date + */ + private String dateFormat; + + /** + * for array type + */ + private String split; + + public Neo4jProperty() { + } + + public Neo4jProperty(String name, String type, String format, String split) { + this.name = name; + this.type = type; + this.dateFormat = format; + this.split = split; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getDateFormat() { + return dateFormat; + } + + public void setDateFormat(String dateFormat) { + this.dateFormat = dateFormat; + } + + public String getSplit() { + return getSplitOrDefault(); + } + + public String getSplitOrDefault() { + if (split == null || "".equals(split)) { + return DEFAULT_SPLIT; + } + return split; + } + + public void setSplit(String split) { + this.split = split; + } +} diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/element/FieldType.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/element/PropertyType.java similarity index 56% rename from neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/element/FieldType.java rename to neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/element/PropertyType.java index 3a4bdbdc..b3446de7 100644 --- a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/element/FieldType.java +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/element/PropertyType.java @@ -1,10 +1,12 @@ package com.alibaba.datax.plugin.writer.neo4jwriter.element; +import java.util.Arrays; + /** * @see org.neo4j.driver.Values * @author fuyouj */ -public enum FieldType { +public enum PropertyType { NULL, BOOLEAN, STRING, @@ -27,6 +29,12 @@ public enum FieldType { SHORT_ARRAY, DOUBLE_ARRAY, FLOAT_ARRAY, - Object_ARRAY + Object_ARRAY; + public static PropertyType fromStrIgnoreCase(String typeStr) { + return Arrays.stream(PropertyType.values()) + .filter(e -> e.name().equalsIgnoreCase(typeStr)) + .findFirst() + .orElse(PropertyType.STRING); + } } diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/exception/Neo4jErrorCode.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/exception/Neo4jErrorCode.java index 6e531528..d7df79ff 100644 --- a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/exception/Neo4jErrorCode.java +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/exception/Neo4jErrorCode.java @@ -9,7 +9,7 @@ public enum Neo4jErrorCode implements ErrorCode { * Invalid configuration * 配置校验异常 */ - CONFIG_INVALID("NEO4J_ERROR_01","Invalid configuration"), + CONFIG_INVALID("NEO4J_ERROR_01","invalid configuration"), /** * database error * 在执行写入到数据库时抛出的异常,可能是权限异常,也可能是连接超时,或者是配置到了从节点。 diff --git a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java index a132d4fd..1a2ca24c 100644 --- a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java +++ b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java @@ -7,7 +7,8 @@ import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.writer.mock.MockRecord; import com.alibaba.datax.plugin.writer.mock.MockUtil; import com.alibaba.datax.plugin.writer.neo4jwriter.Neo4jClient; -import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jField; +import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jProperty; +import com.alibaba.datax.plugin.writer.neo4jwriter.element.PropertyType; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -225,10 +226,10 @@ public class Neo4jWriterTest { } - private Record mockAllTypeFieldTestNode(List neo4jFields) { + private Record mockAllTypeFieldTestNode(List neo4JProperties) { Record mock = new MockRecord(); - for (Neo4jField field : neo4jFields) { - mock.addColumn(MockUtil.mockColumnByType(field.getFieldType())); + for (Neo4jProperty field : neo4JProperties) { + mock.addColumn(MockUtil.mockColumnByType(PropertyType.fromStrIgnoreCase(field.getType()))); } return mock; } diff --git a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/MockUtil.java b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/MockUtil.java index 5fd71135..8f05f1e8 100644 --- a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/MockUtil.java +++ b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/MockUtil.java @@ -2,7 +2,7 @@ package com.alibaba.datax.plugin.writer.mock; import com.alibaba.datax.common.element.*; -import com.alibaba.datax.plugin.writer.neo4jwriter.element.FieldType; +import com.alibaba.datax.plugin.writer.neo4jwriter.element.PropertyType; import com.alibaba.fastjson2.JSON; import java.time.LocalDate; @@ -13,7 +13,7 @@ import java.util.Random; public class MockUtil { - public static Column mockColumnByType(FieldType type) { + public static Column mockColumnByType(PropertyType type) { Random random = new Random(); switch (type) { case SHORT: