mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 17:22:15 +08:00
Merge b7390ab288
into 0824b45c5e
This commit is contained in:
commit
fa2cb024ea
@ -7,15 +7,24 @@ import com.alibaba.datax.common.spi.ErrorCode;
|
|||||||
*/
|
*/
|
||||||
public enum MongoDBReaderErrorCode implements ErrorCode {
|
public enum MongoDBReaderErrorCode implements ErrorCode {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 参数不合法
|
||||||
|
*/
|
||||||
ILLEGAL_VALUE("ILLEGAL_PARAMETER_VALUE","参数不合法"),
|
ILLEGAL_VALUE("ILLEGAL_PARAMETER_VALUE","参数不合法"),
|
||||||
|
/**
|
||||||
|
* 不合法的Mongo地址
|
||||||
|
*/
|
||||||
ILLEGAL_ADDRESS("ILLEGAL_ADDRESS","不合法的Mongo地址"),
|
ILLEGAL_ADDRESS("ILLEGAL_ADDRESS","不合法的Mongo地址"),
|
||||||
UNEXCEPT_EXCEPTION("UNEXCEPT_EXCEPTION","未知异常");
|
/**
|
||||||
|
* 未知异常
|
||||||
|
*/
|
||||||
|
UNEXPECTED_EXCEPTION("UNEXPECTED_EXCEPTION","未知异常");
|
||||||
|
|
||||||
private final String code;
|
private final String code;
|
||||||
|
|
||||||
private final String description;
|
private final String description;
|
||||||
|
|
||||||
private MongoDBReaderErrorCode(String code,String description) {
|
MongoDBReaderErrorCode(String code,String description) {
|
||||||
this.code = code;
|
this.code = code;
|
||||||
this.description = description;
|
this.description = description;
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,7 @@ package com.alibaba.datax.plugin.reader.mongodbreader.util;
|
|||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import com.alibaba.datax.common.exception.DataXException;
|
import com.alibaba.datax.common.exception.DataXException;
|
||||||
@ -33,7 +34,7 @@ public class MongoUtil {
|
|||||||
} catch (NumberFormatException e) {
|
} catch (NumberFormatException e) {
|
||||||
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数");
|
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw DataXException.asDataXException(MongoDBReaderErrorCode.UNEXCEPT_EXCEPTION,"未知异常");
|
throw DataXException.asDataXException(MongoDBReaderErrorCode.UNEXPECTED_EXCEPTION,"未知异常");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -45,14 +46,14 @@ public class MongoUtil {
|
|||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
MongoCredential credential = MongoCredential.createCredential(userName, database, password.toCharArray());
|
MongoCredential credential = MongoCredential.createCredential(userName, database, password.toCharArray());
|
||||||
return new MongoClient(parseServerAddress(addressList), Arrays.asList(credential));
|
return new MongoClient(parseServerAddress(addressList), Collections.singletonList(credential));
|
||||||
|
|
||||||
} catch (UnknownHostException e) {
|
} catch (UnknownHostException e) {
|
||||||
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS,"不合法的地址");
|
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS,"不合法的地址");
|
||||||
} catch (NumberFormatException e) {
|
} catch (NumberFormatException e) {
|
||||||
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数");
|
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw DataXException.asDataXException(MongoDBReaderErrorCode.UNEXCEPT_EXCEPTION,"未知异常");
|
throw DataXException.asDataXException(MongoDBReaderErrorCode.UNEXPECTED_EXCEPTION,"未知异常");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
@ -79,7 +80,7 @@ public class MongoUtil {
|
|||||||
for(Object address : rawAddressList) {
|
for(Object address : rawAddressList) {
|
||||||
String[] tempAddress = ((String)address).split(":");
|
String[] tempAddress = ((String)address).split(":");
|
||||||
try {
|
try {
|
||||||
ServerAddress sa = new ServerAddress(tempAddress[0],Integer.valueOf(tempAddress[1]));
|
ServerAddress sa = new ServerAddress(tempAddress[0],Integer.parseInt(tempAddress[1]));
|
||||||
addressList.add(sa);
|
addressList.add(sa);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new UnknownHostException();
|
throw new UnknownHostException();
|
||||||
|
@ -132,6 +132,8 @@ MongoDBWriter通过Datax框架获取Reader生成的数据,然后将Datax支持
|
|||||||
* address: MongoDB的数据地址信息,因为MonogDB可能是个集群,则ip端口信息需要以Json数组的形式给出。【必填】
|
* address: MongoDB的数据地址信息,因为MonogDB可能是个集群,则ip端口信息需要以Json数组的形式给出。【必填】
|
||||||
* userName:MongoDB的用户名。【选填】
|
* userName:MongoDB的用户名。【选填】
|
||||||
* userPassword: MongoDB的密码。【选填】
|
* userPassword: MongoDB的密码。【选填】
|
||||||
|
* dbName: MongoDB数据库【选填】
|
||||||
|
* authDb: MongoDB认证数据库【选填】
|
||||||
* collectionName: MonogoDB的集合名。【必填】
|
* collectionName: MonogoDB的集合名。【必填】
|
||||||
* column:MongoDB的文档列名。【必填】
|
* column:MongoDB的文档列名。【必填】
|
||||||
* name:Column的名字。【必填】
|
* name:Column的名字。【必填】
|
||||||
|
@ -25,6 +25,10 @@ public class KeyConstant {
|
|||||||
* mongodb 数据库名
|
* mongodb 数据库名
|
||||||
*/
|
*/
|
||||||
public static final String MONGO_DB_NAME = "dbName";
|
public static final String MONGO_DB_NAME = "dbName";
|
||||||
|
/**
|
||||||
|
* mongodb 认证数据库名
|
||||||
|
*/
|
||||||
|
public static final String MONGO_AUTHDB = "authDb";
|
||||||
/**
|
/**
|
||||||
* mongodb 集合名
|
* mongodb 集合名
|
||||||
*/
|
*/
|
||||||
|
@ -66,11 +66,12 @@ public class MongoDBWriter extends Writer{
|
|||||||
private String password = null;
|
private String password = null;
|
||||||
|
|
||||||
private String database = null;
|
private String database = null;
|
||||||
|
private String authDb = null;
|
||||||
private String collection = null;
|
private String collection = null;
|
||||||
private Integer batchSize = null;
|
private Integer batchSize = null;
|
||||||
private JSONArray mongodbColumnMeta = null;
|
private JSONArray mongodbColumnMeta = null;
|
||||||
private JSONObject writeMode = null;
|
private JSONObject writeMode = null;
|
||||||
private static int BATCH_SIZE = 1000;
|
private static final int BATCH_SIZE = 1000;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void prepare() {
|
public void prepare() {
|
||||||
@ -320,8 +321,11 @@ public class MongoDBWriter extends Writer{
|
|||||||
this.userName = writerSliceConfig.getString(KeyConstant.MONGO_USER_NAME);
|
this.userName = writerSliceConfig.getString(KeyConstant.MONGO_USER_NAME);
|
||||||
this.password = writerSliceConfig.getString(KeyConstant.MONGO_USER_PASSWORD);
|
this.password = writerSliceConfig.getString(KeyConstant.MONGO_USER_PASSWORD);
|
||||||
this.database = writerSliceConfig.getString(KeyConstant.MONGO_DB_NAME);
|
this.database = writerSliceConfig.getString(KeyConstant.MONGO_DB_NAME);
|
||||||
|
this.authDb = writerSliceConfig.getString(KeyConstant.MONGO_AUTHDB);
|
||||||
|
// 认证源
|
||||||
|
String authSource = Strings.isNullOrEmpty(authDb) ? database : authDb;
|
||||||
if(!Strings.isNullOrEmpty(userName) && !Strings.isNullOrEmpty(password)) {
|
if(!Strings.isNullOrEmpty(userName) && !Strings.isNullOrEmpty(password)) {
|
||||||
this.mongoClient = MongoUtil.initCredentialMongoClient(this.writerSliceConfig,userName,password,database);
|
this.mongoClient = MongoUtil.initCredentialMongoClient(this.writerSliceConfig,userName,password,authSource);
|
||||||
} else {
|
} else {
|
||||||
this.mongoClient = MongoUtil.initMongoClient(this.writerSliceConfig);
|
this.mongoClient = MongoUtil.initMongoClient(this.writerSliceConfig);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user