diff --git a/mongodbwriter/pom.xml b/mongodbwriter/pom.xml index ac6d9394..7ee43940 100644 --- a/mongodbwriter/pom.xml +++ b/mongodbwriter/pom.xml @@ -51,6 +51,13 @@ plugin-rdbms-util ${datax-project-version} + + + junit + junit + 4.12 + test + diff --git a/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/MongoDBWriter.java b/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/MongoDBWriter.java index 76f35a40..cd5f856b 100644 --- a/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/MongoDBWriter.java +++ b/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/MongoDBWriter.java @@ -11,7 +11,8 @@ import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.google.common.base.Strings; -import com.mongodb.*; +import com.mongodb.BasicDBObject; +import com.mongodb.MongoClient; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.BulkWriteOptions; @@ -159,16 +160,16 @@ public class MongoDBWriter extends Writer{ //空记录处理 if (Strings.isNullOrEmpty(record.getColumn(i).asString())) { if (KeyConstant.isArrayType(type.toLowerCase())) { - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), new Object[0]); + MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), new Object[0]); } else { - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asString()); + MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asString()); } continue; } if (Column.Type.INT.name().equalsIgnoreCase(type)) { //int是特殊类型, 其他类型按照保存时Column的类型进行处理 try { - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), + MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), Integer.parseInt( String.valueOf(record.getColumn(i).getRawData()))); } catch (Exception e) { @@ -178,7 +179,7 @@ public class MongoDBWriter extends Writer{ //处理ObjectId和数组类型 try { if (KeyConstant.isObjectIdType(type.toLowerCase())) { - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), + MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), new ObjectId(record.getColumn(i).asString())); } else if (KeyConstant.isArrayType(type.toLowerCase())) { String splitter = columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_SPLITTER); @@ -195,43 +196,43 @@ public class MongoDBWriter extends Writer{ for (String s : item) { list.add(Double.parseDouble(s)); } - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Double[0])); + MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Double[0])); } else if (itemType.equalsIgnoreCase(Column.Type.INT.name())) { ArrayList list = new ArrayList(); for (String s : item) { list.add(Integer.parseInt(s)); } - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Integer[0])); + MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Integer[0])); } else if (itemType.equalsIgnoreCase(Column.Type.LONG.name())) { ArrayList list = new ArrayList(); for (String s : item) { list.add(Long.parseLong(s)); } - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Long[0])); + MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Long[0])); } else if (itemType.equalsIgnoreCase(Column.Type.BOOL.name())) { ArrayList list = new ArrayList(); for (String s : item) { list.add(Boolean.parseBoolean(s)); } - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Boolean[0])); + MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Boolean[0])); } else if (itemType.equalsIgnoreCase(Column.Type.BYTES.name())) { ArrayList list = new ArrayList(); for (String s : item) { list.add(Byte.parseByte(s)); } - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Byte[0])); + MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Byte[0])); } else { - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asString().split(splitter)); + MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asString().split(splitter)); } } else { - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asString().split(splitter)); + MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asString().split(splitter)); } } else if(type.toLowerCase().equalsIgnoreCase("json")) { //如果是json类型,将其进行转换 Object mode = com.mongodb.util.JSON.parse(record.getColumn(i).asString()); - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),JSON.toJSON(mode)); + MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),JSON.toJSON(mode)); } else { - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asString()); + MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asString()); } } catch (Exception e) { super.getTaskPluginCollector().collectDirtyRecord(record, e); @@ -239,7 +240,7 @@ public class MongoDBWriter extends Writer{ } else if(record.getColumn(i) instanceof LongColumn) { if (Column.Type.LONG.name().equalsIgnoreCase(type)) { - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),record.getColumn(i).asLong()); + MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),record.getColumn(i).asLong()); } else { super.getTaskPluginCollector().collectDirtyRecord(record, "record's [" + i + "] column's type should be: " + type); } @@ -247,7 +248,7 @@ public class MongoDBWriter extends Writer{ } else if(record.getColumn(i) instanceof DateColumn) { if (Column.Type.DATE.name().equalsIgnoreCase(type)) { - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), + MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asDate()); } else { super.getTaskPluginCollector().collectDirtyRecord(record, "record's [" + i + "] column's type should be: " + type); @@ -256,7 +257,7 @@ public class MongoDBWriter extends Writer{ } else if(record.getColumn(i) instanceof DoubleColumn) { if (Column.Type.DOUBLE.name().equalsIgnoreCase(type)) { - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), + MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asDouble()); } else { super.getTaskPluginCollector().collectDirtyRecord(record, "record's [" + i + "] column's type should be: " + type); @@ -265,7 +266,7 @@ public class MongoDBWriter extends Writer{ } else if(record.getColumn(i) instanceof BoolColumn) { if (Column.Type.BOOL.name().equalsIgnoreCase(type)) { - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), + MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asBoolean()); } else { super.getTaskPluginCollector().collectDirtyRecord(record, "record's [" + i + "] column's type should be: " + type); @@ -274,14 +275,14 @@ public class MongoDBWriter extends Writer{ } else if(record.getColumn(i) instanceof BytesColumn) { if (Column.Type.BYTES.name().equalsIgnoreCase(type)) { - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), + MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asBytes()); } else { super.getTaskPluginCollector().collectDirtyRecord(record, "record's [" + i + "] column's type should be: " + type); } } else { - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),record.getColumn(i).asString()); + MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),record.getColumn(i).asString()); } } dataList.add(data); diff --git a/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/util/MongoUtil.java b/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/util/MongoUtil.java index 17334be4..41ac1f7f 100644 --- a/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/util/MongoUtil.java +++ b/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/util/MongoUtil.java @@ -4,10 +4,12 @@ import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.writer.mongodbwriter.KeyConstant; import com.alibaba.datax.plugin.writer.mongodbwriter.MongoDBWriterErrorCode; +import com.mongodb.BasicDBObject; import com.mongodb.MongoClient; import com.mongodb.MongoCredential; import com.mongodb.ServerAddress; +import javax.annotation.Nonnull; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; @@ -83,6 +85,59 @@ public class MongoUtil { return addressList; } + + /** + * put mongo document with SubDocument support + * + * patch submit by Created by changhua.wch on 2018/11/5 + * + * @param recordData 最上层父节点的记录 + * @param fullFieldName 字段全名,包含父节点->自节点的全路径 例如: "aa.bb.cc" + * @param value 子节点的值 + */ + public static void putValueWithSubDocumentSupport( + @Nonnull BasicDBObject recordData, + @Nonnull String fullFieldName, + Object value + ) { + + if(fullFieldName.contains(".")) { + String[] fieldLayers = fullFieldName.split("\\."); + + if(fieldLayers.length > 1) { + + BasicDBObject currentLayerData = recordData; + + // check-exists and add all parent layer + for(int i =0; i< fieldLayers.length - 1; ++i) { + String layer = fieldLayers[ i ]; + if(currentLayerData.containsField(layer)) { + currentLayerData = (BasicDBObject)currentLayerData.get(layer); + } else { + BasicDBObject subLayerDoc = new BasicDBObject(); + currentLayerData.put(layer, subLayerDoc); + currentLayerData = subLayerDoc; + } + } + + // add last layer's value + String childFieldName = fieldLayers[ fieldLayers.length - 1 ]; + currentLayerData.put(childFieldName, value); + } else { + throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_VALUE, + MongoDBWriterErrorCode.ILLEGAL_VALUE.getDescription() + " column: " + fullFieldName); + } + + + } else { + + // simple behavior, directly add field-value pair + recordData.put(fullFieldName, value); + + } + + } + public static void main(String[] args) { try { ArrayList hostAddress = new ArrayList(); diff --git a/mongodbwriter/src/test/java/com/alibaba/datax/plugin/writer/mongodbwriter/util/MongoUtilTest.java b/mongodbwriter/src/test/java/com/alibaba/datax/plugin/writer/mongodbwriter/util/MongoUtilTest.java new file mode 100644 index 00000000..61dba155 --- /dev/null +++ b/mongodbwriter/src/test/java/com/alibaba/datax/plugin/writer/mongodbwriter/util/MongoUtilTest.java @@ -0,0 +1,51 @@ +package com.alibaba.datax.plugin.writer.mongodbwriter.util; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.mongodb.BasicDBObject; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by changhua.wch on 2018/11/5 + */ +public class MongoUtilTest { + + + Logger log = LoggerFactory.getLogger(MongoUtilTest.class); + + + @Test + public void testPutValueWithSubDocumentSupport() { + + BasicDBObject data = new BasicDBObject(); + + String fullFieldName = "key.parent.sub.field"; + + MongoUtil.putValueWithSubDocumentSupport( data, fullFieldName, "String1-Value"); + MongoUtil.putValueWithSubDocumentSupport( data, "key.parent.sub1.field", 22L); + MongoUtil.putValueWithSubDocumentSupport( data, "key.parent.sub1.fieldB", null); + + MongoUtil.putValueWithSubDocumentSupport( data, "key.sub12.field3", 56.04); + + String dumpJson = JSON.toJSONString(data, SerializerFeature.PrettyFormat); + log.info("data: {}", dumpJson); + + + BasicDBObject layer1 = (BasicDBObject)data.get("key"); + Assert.assertNotNull( layer1 ); + + BasicDBObject layer2 = (BasicDBObject)layer1.get("parent"); + Assert.assertNotNull( layer2 ); + + BasicDBObject layer3 = (BasicDBObject)layer2.get("sub"); + Assert.assertNotNull( layer3 ); + + String layer4 = layer3.getString("field"); + Assert.assertNotNull( layer4 ); + Assert.assertEquals( "String1-Value", layer4 ); + } + +} diff --git a/mongodbwriter/src/test/resources/log4j.properties b/mongodbwriter/src/test/resources/log4j.properties new file mode 100755 index 00000000..2e1fc3cf --- /dev/null +++ b/mongodbwriter/src/test/resources/log4j.properties @@ -0,0 +1,6 @@ +log4j.rootLogger=info,stdout +### output log to console ### +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n \ No newline at end of file diff --git a/mongodbwriter/src/test/resources/logback.xml b/mongodbwriter/src/test/resources/logback.xml new file mode 100644 index 00000000..c698127f --- /dev/null +++ b/mongodbwriter/src/test/resources/logback.xml @@ -0,0 +1,22 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + + + +