配置field改名properties符合图数据库概念,日志打印精简

This commit is contained in:
FuYouJ 2023-07-06 22:14:48 +08:00
parent 4c260c4e12
commit 8b9c51cada
9 changed files with 121 additions and 148 deletions

View File

@ -9,7 +9,7 @@ import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.RetryUtil; import com.alibaba.datax.common.util.RetryUtil;
import com.alibaba.datax.plugin.writer.neo4jwriter.adapter.DateAdapter; import com.alibaba.datax.plugin.writer.neo4jwriter.adapter.DateAdapter;
import com.alibaba.datax.plugin.writer.neo4jwriter.adapter.ValueAdapter; import com.alibaba.datax.plugin.writer.neo4jwriter.adapter.ValueAdapter;
import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jField; import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jProperty;
import com.alibaba.datax.plugin.writer.neo4jwriter.exception.Neo4jErrorCode; import com.alibaba.datax.plugin.writer.neo4jwriter.exception.Neo4jErrorCode;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -52,7 +52,6 @@ public class Neo4jClient {
public void init() { public void init() {
String database = writeConfig.database; String database = writeConfig.database;
//neo4j 3.x 没有数据库 //neo4j 3.x 没有数据库
//neo4j 3.x no database
if (null != database && !"".equals(database)) { if (null != database && !"".equals(database)) {
this.session = driver.session(SessionConfig.forDatabase(database)); this.session = driver.session(SessionConfig.forDatabase(database));
} else { } else {
@ -67,12 +66,12 @@ public class Neo4jClient {
String database = config.getString(DATABASE.getKey()); String database = config.getString(DATABASE.getKey());
String batchVariableName = config.getString(BATCH_DATA_VARIABLE_NAME.getKey(), String batchVariableName = config.getString(BATCH_DATA_VARIABLE_NAME.getKey(),
BATCH_DATA_VARIABLE_NAME.getDefaultValue()); BATCH_DATA_VARIABLE_NAME.getDefaultValue());
List<Neo4jField> neo4jFields = JSON.parseArray(config.getString(NEO4J_FIELDS.getKey()), Neo4jField.class); List<Neo4jProperty> neo4jProperties = JSON.parseArray(config.getString(NEO4J_PROPERTIES.getKey()), Neo4jProperty.class);
int batchSize = config.getInt(BATCH_SIZE.getKey(), BATCH_SIZE.getDefaultValue()); int batchSize = config.getInt(BATCH_SIZE.getKey(), BATCH_SIZE.getDefaultValue());
int retryTimes = config.getInt(RETRY_TIMES.getKey(), RETRY_TIMES.getDefaultValue()); int retryTimes = config.getInt(RETRY_TIMES.getKey(), RETRY_TIMES.getDefaultValue());
return new Neo4jClient(driver, return new Neo4jClient(driver,
new WriteConfig(cypher, database, batchVariableName, neo4jFields, batchSize), new WriteConfig(cypher, database, batchVariableName, neo4jProperties, batchSize),
new RetryConfig(retryTimes, config.getLong(RETRY_SLEEP_MILLS.getKey(), RETRY_SLEEP_MILLS.getDefaultValue())), new RetryConfig(retryTimes, config.getLong(RETRY_SLEEP_MILLS.getKey(), RETRY_SLEEP_MILLS.getDefaultValue())),
taskPluginCollector taskPluginCollector
); );
@ -176,7 +175,6 @@ public class Neo4jClient {
}, this.retryConfig.retryTimes, retryConfig.retrySleepMills, true, }, this.retryConfig.retryTimes, retryConfig.retrySleepMills, true,
Collections.singletonList(Neo4jException.class)); Collections.singletonList(Neo4jException.class));
} catch (Exception e) { } catch (Exception e) {
LOGGER.error("在写入数据库时发生了异常,原因是:{}", e.getMessage());
LOGGER.error("an exception occurred while writing to the database,message:{}", e.getMessage()); LOGGER.error("an exception occurred while writing to the database,message:{}", e.getMessage());
throw DataXException.asDataXException(DATABASE_ERROR, e.getMessage()); throw DataXException.asDataXException(DATABASE_ERROR, e.getMessage());
} }
@ -200,26 +198,18 @@ public class Neo4jClient {
private MapValue checkAndConvert(Record record) { private MapValue checkAndConvert(Record record) {
int sourceColNum = record.getColumnNumber(); int sourceColNum = record.getColumnNumber();
List<Neo4jField> neo4jFields = writeConfig.neo4jFields; List<Neo4jProperty> neo4JProperties = writeConfig.neo4jProperties;
if (sourceColNum < neo4jFields.size()) {
LOGGER.warn("接收到的数据列少于neo4jWriter企图消费的数据列,请注意风险,这可能导致数据不匹配");
LOGGER.warn("Receive fewer data columns than neo4jWriter attempts to consume, " +
"be aware of the risk that this may result in data mismatch");
LOGGER.warn("接收到的数据是:" + record);
LOGGER.warn("received data is" + record);
}
int len = Math.min(sourceColNum, neo4jFields.size()); int len = Math.min(sourceColNum, neo4JProperties.size());
Map<String, Value> data = new HashMap<>(len * 4 / 3); Map<String, Value> data = new HashMap<>(len * 4 / 3);
for (int i = 0; i < len; i++) { for (int i = 0; i < len; i++) {
Column column = record.getColumn(i); Column column = record.getColumn(i);
Neo4jField neo4jField = neo4jFields.get(i); Neo4jProperty neo4jProperty = neo4JProperties.get(i);
try { try {
Value value = ValueAdapter.column2Value(column, neo4jField); Value value = ValueAdapter.column2Value(column, neo4jProperty);
data.put(neo4jField.getFieldName(), value); data.put(neo4jProperty.getName(), value);
} catch (Exception e) { } catch (Exception e) {
LOGGER.info("检测到一条脏数据:{},原因:{}", column, e.getMessage());
LOGGER.info("dirty record{},message :{}", column, e.getMessage()); LOGGER.info("dirty record{},message :{}", column, e.getMessage());
this.taskPluginCollector.collectDirtyRecord(record, e.getMessage()); this.taskPluginCollector.collectDirtyRecord(record, e.getMessage());
} }
@ -227,8 +217,8 @@ public class Neo4jClient {
return new MapValue(data); return new MapValue(data);
} }
public List<Neo4jField> getNeo4jFields() { public List<Neo4jProperty> getNeo4jFields() {
return this.writeConfig.neo4jFields; return this.writeConfig.neo4jProperties;
} }
@ -249,19 +239,19 @@ public class Neo4jClient {
String batchVariableName; String batchVariableName;
List<Neo4jField> neo4jFields; List<Neo4jProperty> neo4jProperties;
int batchSize; int batchSize;
public WriteConfig(String cypher, public WriteConfig(String cypher,
String database, String database,
String batchVariableName, String batchVariableName,
List<Neo4jField> neo4jFields, List<Neo4jProperty> neo4jProperties,
int batchSize) { int batchSize) {
this.cypher = cypher; this.cypher = cypher;
this.database = database; this.database = database;
this.batchVariableName = batchVariableName; this.batchVariableName = batchVariableName;
this.neo4jFields = neo4jFields; this.neo4jProperties = neo4jProperties;
this.batchSize = batchSize; this.batchSize = batchSize;
} }

View File

@ -17,7 +17,7 @@ public class Neo4jWriter extends Writer {
private Configuration jobConf = null; private Configuration jobConf = null;
@Override @Override
public void init() { public void init() {
LOGGER.info("Neo4jWriter Job init Success"); LOGGER.info("Neo4jWriter Job init success");
} }
@Override @Override

View File

@ -1,7 +1,7 @@
package com.alibaba.datax.plugin.writer.neo4jwriter.adapter; package com.alibaba.datax.plugin.writer.neo4jwriter.adapter;
import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jField; import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jProperty;
import org.testcontainers.shaded.com.google.common.base.Supplier; import org.testcontainers.shaded.com.google.common.base.Supplier;
import java.time.LocalDate; import java.time.LocalDate;
@ -21,12 +21,12 @@ public class DateAdapter {
private static final String DEFAULT_LOCAL_DATE_TIME_FORMATTER = "yyyy-MM-dd HH:mm:ss"; private static final String DEFAULT_LOCAL_DATE_TIME_FORMATTER = "yyyy-MM-dd HH:mm:ss";
public static LocalDate localDate(String text, Neo4jField neo4jField) { public static LocalDate localDate(String text, Neo4jProperty neo4jProperty) {
if (LOCAL_DATE_FORMATTER_MAP.get() != null) { if (LOCAL_DATE_FORMATTER_MAP.get() != null) {
return LocalDate.parse(text, LOCAL_DATE_FORMATTER_MAP.get()); return LocalDate.parse(text, LOCAL_DATE_FORMATTER_MAP.get());
} }
String format = getOrDefault(neo4jField::getDateFormat, DEFAULT_LOCAL_DATE_FORMATTER); String format = getOrDefault(neo4jProperty::getDateFormat, DEFAULT_LOCAL_DATE_FORMATTER);
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(format); DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(format);
LOCAL_DATE_FORMATTER_MAP.set(dateTimeFormatter); LOCAL_DATE_FORMATTER_MAP.set(dateTimeFormatter);
return LocalDate.parse(text, dateTimeFormatter); return LocalDate.parse(text, dateTimeFormatter);
@ -47,22 +47,22 @@ public class DateAdapter {
LOCAL_DATE_TIME_FORMATTER_MAP.remove(); LOCAL_DATE_TIME_FORMATTER_MAP.remove();
} }
public static LocalTime localTime(String text, Neo4jField neo4jField) { public static LocalTime localTime(String text, Neo4jProperty neo4JProperty) {
if (LOCAL_TIME_FORMATTER_MAP.get() != null) { if (LOCAL_TIME_FORMATTER_MAP.get() != null) {
return LocalTime.parse(text, LOCAL_TIME_FORMATTER_MAP.get()); return LocalTime.parse(text, LOCAL_TIME_FORMATTER_MAP.get());
} }
String format = getOrDefault(neo4jField::getDateFormat, DEFAULT_LOCAL_TIME_FORMATTER); String format = getOrDefault(neo4JProperty::getDateFormat, DEFAULT_LOCAL_TIME_FORMATTER);
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(format); DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(format);
LOCAL_TIME_FORMATTER_MAP.set(dateTimeFormatter); LOCAL_TIME_FORMATTER_MAP.set(dateTimeFormatter);
return LocalTime.parse(text, dateTimeFormatter); return LocalTime.parse(text, dateTimeFormatter);
} }
public static LocalDateTime localDateTime(String text, Neo4jField neo4jField) { public static LocalDateTime localDateTime(String text, Neo4jProperty neo4JProperty) {
if (LOCAL_DATE_TIME_FORMATTER_MAP.get() != null){ if (LOCAL_DATE_TIME_FORMATTER_MAP.get() != null){
return LocalDateTime.parse(text,LOCAL_DATE_TIME_FORMATTER_MAP.get()); return LocalDateTime.parse(text,LOCAL_DATE_TIME_FORMATTER_MAP.get());
} }
String format = getOrDefault(neo4jField::getDateFormat, DEFAULT_LOCAL_DATE_TIME_FORMATTER); String format = getOrDefault(neo4JProperty::getDateFormat, DEFAULT_LOCAL_DATE_TIME_FORMATTER);
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(format); DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(format);
LOCAL_DATE_TIME_FORMATTER_MAP.set(dateTimeFormatter); LOCAL_DATE_TIME_FORMATTER_MAP.set(dateTimeFormatter);
return LocalDateTime.parse(text, dateTimeFormatter); return LocalDateTime.parse(text, dateTimeFormatter);

View File

@ -1,108 +0,0 @@
package com.alibaba.datax.plugin.writer.neo4jwriter.config;
import com.alibaba.datax.plugin.writer.neo4jwriter.element.FieldType;
import java.util.Arrays;
import java.util.List;
/**
* 由于dataX并不能传输数据的元数据所以只能在writer端定义每列数据的名字
* datax does not support data metadata,
* only the name of each column of data can be defined on neo4j writer
* @author fuyouj
*/
public class Neo4jField {
public static final String DEFAULT_SPLIT = ",";
public static final List<Character> DEFAULT_ARRAY_TRIM = Arrays.asList('[',']');
/**
* name of neo4j field
*/
private String fieldName;
/**
* neo4j type
* reference by org.neo4j.driver.Values
*/
private FieldType fieldType;
/**
* for date
*/
private String dateFormat;
/**
* for array type
*/
private String split;
/**
* such as [1,2,3,4,5]
* split is ,
* arrayTrimChar is [ ]
*/
private List<Character> arrayTrimChars;
public Neo4jField(){}
public Neo4jField(String fieldName, FieldType fieldType, String format, String split, List<Character> arrayTrimChars) {
this.fieldName = fieldName;
this.fieldType = fieldType;
this.dateFormat = format;
this.split = split;
this.arrayTrimChars = arrayTrimChars;
}
public String getFieldName() {
return fieldName;
}
public void setFieldName(String fieldName) {
this.fieldName = fieldName;
}
public FieldType getFieldType() {
return fieldType;
}
public void setFieldType(FieldType fieldType) {
this.fieldType = fieldType;
}
public String getDateFormat() {
return dateFormat;
}
public void setDateFormat(String dateFormat) {
this.dateFormat = dateFormat;
}
public String getSplit() {
return getSplitOrDefault();
}
public String getSplitOrDefault(){
if (split == null || "".equals(split)){
return DEFAULT_SPLIT;
}
return split;
}
public void setSplit(String split) {
this.split = split;
}
public List<Character> getArrayTrimChars() {
return getArrayTrimOrDefault();
}
public List<Character> getArrayTrimOrDefault(){
if (arrayTrimChars == null || arrayTrimChars.isEmpty()){
return DEFAULT_ARRAY_TRIM;
}
return arrayTrimChars;
}
public void setArrayTrimChars(List<Character> arrayTrimChars) {
this.arrayTrimChars = arrayTrimChars;
}
}

View File

@ -0,0 +1,82 @@
package com.alibaba.datax.plugin.writer.neo4jwriter.config;
/**
* 由于dataX并不能传输数据的元数据所以只能在writer端定义每列数据的名字
* datax does not support data metadata,
* only the name of each column of data can be defined on neo4j writer
*
* @author fuyouj
*/
public class Neo4jProperty {
public static final String DEFAULT_SPLIT = ",";
/**
* name of neo4j field
*/
private String name;
/**
* neo4j type
* reference by org.neo4j.driver.Values
*/
private String type;
/**
* for date
*/
private String dateFormat;
/**
* for array type
*/
private String split;
public Neo4jProperty() {
}
public Neo4jProperty(String name, String type, String format, String split) {
this.name = name;
this.type = type;
this.dateFormat = format;
this.split = split;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getDateFormat() {
return dateFormat;
}
public void setDateFormat(String dateFormat) {
this.dateFormat = dateFormat;
}
public String getSplit() {
return getSplitOrDefault();
}
public String getSplitOrDefault() {
if (split == null || "".equals(split)) {
return DEFAULT_SPLIT;
}
return split;
}
public void setSplit(String split) {
this.split = split;
}
}

View File

@ -1,10 +1,12 @@
package com.alibaba.datax.plugin.writer.neo4jwriter.element; package com.alibaba.datax.plugin.writer.neo4jwriter.element;
import java.util.Arrays;
/** /**
* @see org.neo4j.driver.Values * @see org.neo4j.driver.Values
* @author fuyouj * @author fuyouj
*/ */
public enum FieldType { public enum PropertyType {
NULL, NULL,
BOOLEAN, BOOLEAN,
STRING, STRING,
@ -27,6 +29,12 @@ public enum FieldType {
SHORT_ARRAY, SHORT_ARRAY,
DOUBLE_ARRAY, DOUBLE_ARRAY,
FLOAT_ARRAY, FLOAT_ARRAY,
Object_ARRAY Object_ARRAY;
public static PropertyType fromStrIgnoreCase(String typeStr) {
return Arrays.stream(PropertyType.values())
.filter(e -> e.name().equalsIgnoreCase(typeStr))
.findFirst()
.orElse(PropertyType.STRING);
}
} }

View File

@ -9,7 +9,7 @@ public enum Neo4jErrorCode implements ErrorCode {
* Invalid configuration * Invalid configuration
* 配置校验异常 * 配置校验异常
*/ */
CONFIG_INVALID("NEO4J_ERROR_01","Invalid configuration"), CONFIG_INVALID("NEO4J_ERROR_01","invalid configuration"),
/** /**
* database error * database error
* 在执行写入到数据库时抛出的异常可能是权限异常也可能是连接超时或者是配置到了从节点 * 在执行写入到数据库时抛出的异常可能是权限异常也可能是连接超时或者是配置到了从节点

View File

@ -7,7 +7,8 @@ import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.mock.MockRecord; import com.alibaba.datax.plugin.writer.mock.MockRecord;
import com.alibaba.datax.plugin.writer.mock.MockUtil; import com.alibaba.datax.plugin.writer.mock.MockUtil;
import com.alibaba.datax.plugin.writer.neo4jwriter.Neo4jClient; import com.alibaba.datax.plugin.writer.neo4jwriter.Neo4jClient;
import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jField; import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jProperty;
import com.alibaba.datax.plugin.writer.neo4jwriter.element.PropertyType;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -225,10 +226,10 @@ public class Neo4jWriterTest {
} }
private Record mockAllTypeFieldTestNode(List<Neo4jField> neo4jFields) { private Record mockAllTypeFieldTestNode(List<Neo4jProperty> neo4JProperties) {
Record mock = new MockRecord(); Record mock = new MockRecord();
for (Neo4jField field : neo4jFields) { for (Neo4jProperty field : neo4JProperties) {
mock.addColumn(MockUtil.mockColumnByType(field.getFieldType())); mock.addColumn(MockUtil.mockColumnByType(PropertyType.fromStrIgnoreCase(field.getType())));
} }
return mock; return mock;
} }

View File

@ -2,7 +2,7 @@ package com.alibaba.datax.plugin.writer.mock;
import com.alibaba.datax.common.element.*; import com.alibaba.datax.common.element.*;
import com.alibaba.datax.plugin.writer.neo4jwriter.element.FieldType; import com.alibaba.datax.plugin.writer.neo4jwriter.element.PropertyType;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import java.time.LocalDate; import java.time.LocalDate;
@ -13,7 +13,7 @@ import java.util.Random;
public class MockUtil { public class MockUtil {
public static Column mockColumnByType(FieldType type) { public static Column mockColumnByType(PropertyType type) {
Random random = new Random(); Random random = new Random();
switch (type) { switch (type) {
case SHORT: case SHORT: