This commit is contained in:
changhua wu 2025-04-10 16:24:59 +08:00 committed by GitHub
commit 37365d9b44
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 162 additions and 20 deletions

View File

@ -51,6 +51,13 @@
<artifactId>plugin-rdbms-util</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -11,7 +11,8 @@ import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.google.common.base.Strings;
import com.mongodb.*;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.BulkWriteOptions;
@ -159,16 +160,16 @@ public class MongoDBWriter extends Writer{
//空记录处理
if (Strings.isNullOrEmpty(record.getColumn(i).asString())) {
if (KeyConstant.isArrayType(type.toLowerCase())) {
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), new Object[0]);
MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), new Object[0]);
} else {
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asString());
MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asString());
}
continue;
}
if (Column.Type.INT.name().equalsIgnoreCase(type)) {
//int是特殊类型, 其他类型按照保存时Column的类型进行处理
try {
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),
MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),
Integer.parseInt(
String.valueOf(record.getColumn(i).getRawData())));
} catch (Exception e) {
@ -178,7 +179,7 @@ public class MongoDBWriter extends Writer{
//处理ObjectId和数组类型
try {
if (KeyConstant.isObjectIdType(type.toLowerCase())) {
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),
MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),
new ObjectId(record.getColumn(i).asString()));
} else if (KeyConstant.isArrayType(type.toLowerCase())) {
String splitter = columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_SPLITTER);
@ -195,43 +196,43 @@ public class MongoDBWriter extends Writer{
for (String s : item) {
list.add(Double.parseDouble(s));
}
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Double[0]));
MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Double[0]));
} else if (itemType.equalsIgnoreCase(Column.Type.INT.name())) {
ArrayList<Integer> list = new ArrayList<Integer>();
for (String s : item) {
list.add(Integer.parseInt(s));
}
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Integer[0]));
MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Integer[0]));
} else if (itemType.equalsIgnoreCase(Column.Type.LONG.name())) {
ArrayList<Long> list = new ArrayList<Long>();
for (String s : item) {
list.add(Long.parseLong(s));
}
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Long[0]));
MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Long[0]));
} else if (itemType.equalsIgnoreCase(Column.Type.BOOL.name())) {
ArrayList<Boolean> list = new ArrayList<Boolean>();
for (String s : item) {
list.add(Boolean.parseBoolean(s));
}
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Boolean[0]));
MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Boolean[0]));
} else if (itemType.equalsIgnoreCase(Column.Type.BYTES.name())) {
ArrayList<Byte> list = new ArrayList<Byte>();
for (String s : item) {
list.add(Byte.parseByte(s));
}
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Byte[0]));
MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Byte[0]));
} else {
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asString().split(splitter));
MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asString().split(splitter));
}
} else {
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asString().split(splitter));
MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asString().split(splitter));
}
} else if(type.toLowerCase().equalsIgnoreCase("json")) {
//如果是json类型,将其进行转换
Object mode = com.mongodb.util.JSON.parse(record.getColumn(i).asString());
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),JSON.toJSON(mode));
MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),JSON.toJSON(mode));
} else {
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asString());
MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asString());
}
} catch (Exception e) {
super.getTaskPluginCollector().collectDirtyRecord(record, e);
@ -239,7 +240,7 @@ public class MongoDBWriter extends Writer{
} else if(record.getColumn(i) instanceof LongColumn) {
if (Column.Type.LONG.name().equalsIgnoreCase(type)) {
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),record.getColumn(i).asLong());
MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),record.getColumn(i).asLong());
} else {
super.getTaskPluginCollector().collectDirtyRecord(record, "record's [" + i + "] column's type should be: " + type);
}
@ -247,7 +248,7 @@ public class MongoDBWriter extends Writer{
} else if(record.getColumn(i) instanceof DateColumn) {
if (Column.Type.DATE.name().equalsIgnoreCase(type)) {
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),
MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),
record.getColumn(i).asDate());
} else {
super.getTaskPluginCollector().collectDirtyRecord(record, "record's [" + i + "] column's type should be: " + type);
@ -256,7 +257,7 @@ public class MongoDBWriter extends Writer{
} else if(record.getColumn(i) instanceof DoubleColumn) {
if (Column.Type.DOUBLE.name().equalsIgnoreCase(type)) {
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),
MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),
record.getColumn(i).asDouble());
} else {
super.getTaskPluginCollector().collectDirtyRecord(record, "record's [" + i + "] column's type should be: " + type);
@ -265,7 +266,7 @@ public class MongoDBWriter extends Writer{
} else if(record.getColumn(i) instanceof BoolColumn) {
if (Column.Type.BOOL.name().equalsIgnoreCase(type)) {
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),
MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),
record.getColumn(i).asBoolean());
} else {
super.getTaskPluginCollector().collectDirtyRecord(record, "record's [" + i + "] column's type should be: " + type);
@ -274,14 +275,14 @@ public class MongoDBWriter extends Writer{
} else if(record.getColumn(i) instanceof BytesColumn) {
if (Column.Type.BYTES.name().equalsIgnoreCase(type)) {
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),
MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),
record.getColumn(i).asBytes());
} else {
super.getTaskPluginCollector().collectDirtyRecord(record, "record's [" + i + "] column's type should be: " + type);
}
} else {
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),record.getColumn(i).asString());
MongoUtil.putValueWithSubDocumentSupport( data, columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME),record.getColumn(i).asString());
}
}
dataList.add(data);

View File

@ -4,10 +4,12 @@ import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.mongodbwriter.KeyConstant;
import com.alibaba.datax.plugin.writer.mongodbwriter.MongoDBWriterErrorCode;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import javax.annotation.Nonnull;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
@ -83,6 +85,59 @@ public class MongoUtil {
return addressList;
}
/**
 * Puts a value into a Mongo document, creating nested sub-documents when the
 * field name is a dotted path.
 *
 * <p>A name without a dot is stored directly on {@code recordData}. A dotted name
 * such as {@code "aa.bb.cc"} is split on {@code '.'}; every segment except the last
 * is treated as a sub-document (reused when it already exists, created otherwise)
 * and the last segment receives {@code value}.
 *
 * <p>patch submit by Created by changhua.wch on 2018/11/5
 *
 * @param recordData    the top-level (root) document of the record; modified in place
 * @param fullFieldName the full field path from the root to the leaf, e.g. {@code "aa.bb.cc"}
 * @param value         the value to store at the leaf; may be {@code null}
 * @throws DataXException with {@link MongoDBWriterErrorCode#ILLEGAL_VALUE} when the dotted
 *                        name does not yield at least two segments (for example {@code "aa."}),
 *                        or when an intermediate segment already holds something that is
 *                        not a sub-document
 */
public static void putValueWithSubDocumentSupport(
        @Nonnull BasicDBObject recordData,
        @Nonnull String fullFieldName,
        Object value
) {
    // simple behavior: no dot means a plain top-level field
    if (!fullFieldName.contains(".")) {
        recordData.put(fullFieldName, value);
        return;
    }

    // String.split drops trailing empty segments, so names like "aa." or "." end up
    // with fewer than two layers and are rejected here.
    String[] fieldLayers = fullFieldName.split("\\.");
    if (fieldLayers.length <= 1) {
        throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_VALUE,
                MongoDBWriterErrorCode.ILLEGAL_VALUE.getDescription() + " column: " + fullFieldName);
    }

    int leafIndex = fieldLayers.length - 1;
    BasicDBObject currentLayerData = recordData;

    // walk (and create where missing) every parent layer above the leaf
    for (int i = 0; i < leafIndex; ++i) {
        String layer = fieldLayers[i];
        if (currentLayerData.containsField(layer)) {
            Object existing = currentLayerData.get(layer);
            // A previous column may have stored a scalar, array or null under this name;
            // report the conflict explicitly instead of failing with a
            // ClassCastException / NullPointerException.
            if (!(existing instanceof BasicDBObject)) {
                throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_VALUE,
                        MongoDBWriterErrorCode.ILLEGAL_VALUE.getDescription() + " column: " + fullFieldName
                                + ", layer [" + layer + "] already holds a value that is not a sub-document");
            }
            currentLayerData = (BasicDBObject) existing;
        } else {
            BasicDBObject subLayerDoc = new BasicDBObject();
            currentLayerData.put(layer, subLayerDoc);
            currentLayerData = subLayerDoc;
        }
    }

    // add last layer's value
    currentLayerData.put(fieldLayers[leafIndex], value);
}
public static void main(String[] args) {
try {
ArrayList hostAddress = new ArrayList();

View File

@ -0,0 +1,51 @@
package com.alibaba.datax.plugin.writer.mongodbwriter.util;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.mongodb.BasicDBObject;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Unit tests for {@link MongoUtil}.
 *
 * Created by changhua.wch on 2018/11/5
 */
public class MongoUtilTest {

    private static final Logger log = LoggerFactory.getLogger(MongoUtilTest.class);

    /**
     * Writes several dotted field names that share parent layers into one document
     * and verifies that each value lands in the expected nested sub-document,
     * including a {@code null} leaf and a second branch under the same root.
     */
    @Test
    public void testPutValueWithSubDocumentSupport() {
        BasicDBObject data = new BasicDBObject();
        String fullFieldName = "key.parent.sub.field";

        MongoUtil.putValueWithSubDocumentSupport(data, fullFieldName, "String1-Value");
        MongoUtil.putValueWithSubDocumentSupport(data, "key.parent.sub1.field", 22L);
        MongoUtil.putValueWithSubDocumentSupport(data, "key.parent.sub1.fieldB", null);
        MongoUtil.putValueWithSubDocumentSupport(data, "key.sub12.field3", 56.04);

        String dumpJson = JSON.toJSONString(data, SerializerFeature.PrettyFormat);
        log.info("data: {}", dumpJson);

        // key.parent.sub.field
        BasicDBObject layer1 = (BasicDBObject) data.get("key");
        Assert.assertNotNull(layer1);

        BasicDBObject layer2 = (BasicDBObject) layer1.get("parent");
        Assert.assertNotNull(layer2);

        BasicDBObject layer3 = (BasicDBObject) layer2.get("sub");
        Assert.assertNotNull(layer3);

        String layer4 = layer3.getString("field");
        Assert.assertNotNull(layer4);
        Assert.assertEquals("String1-Value", layer4);

        // key.parent.sub1.* : sibling of "sub", must reuse the existing "key.parent" layers
        BasicDBObject sub1 = (BasicDBObject) layer2.get("sub1");
        Assert.assertNotNull(sub1);
        Assert.assertEquals(Long.valueOf(22L), sub1.get("field"));
        // a null value must still create the field rather than being dropped
        Assert.assertTrue(sub1.containsField("fieldB"));
        Assert.assertNull(sub1.get("fieldB"));

        // key.sub12.field3 : second branch directly under the root layer
        BasicDBObject sub12 = (BasicDBObject) layer1.get("sub12");
        Assert.assertNotNull(sub12);
        Assert.assertEquals(Double.valueOf(56.04), sub12.get("field3"));
    }
}

View File

@ -0,0 +1,6 @@
# log4j configuration: root logger at "info" level, attached to the appender named "stdout".
log4j.rootLogger=info,stdout
### output log to console ###
# "stdout" is a ConsoleAppender whose target is System.out.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
# Each event is formatted by a PatternLayout using the conversion pattern below.
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

View File

@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Logback configuration: one console appender, DEBUG for com.alibaba.datax, WARN for the root logger. -->
<configuration>
<!-- Console appender named "consoleLog"; the encoder pattern below defines the line layout. -->
<appender name="consoleLog" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- Loggers under com.alibaba.datax are set to DEBUG. -->
<logger name="com.alibaba.datax">
<level value="DEBUG"/>
</logger>
<!-- The root logger is set to WARN and writes to the "consoleLog" appender. -->
<root level="WARN">
<appender-ref ref="consoleLog" />
</root>
</configuration>