This commit is contained in:
felix.wang 2025-04-10 16:22:56 +08:00 committed by GitHub
commit b23c1b050d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 31 additions and 2 deletions

View File

@ -94,4 +94,9 @@ public class KeyConstant {
public static boolean isDocumentType(String type) {
return type.startsWith(DOCUMENT_TYPE);
}
/**
* mongodb url
*/
public static final String MONGO_URL="mongoUrl";
}

View File

@ -47,6 +47,8 @@ public class MongoDBReader extends Reader {
private String userName = null;
private String password = null;
private String mongoDBUrl = null;
@Override
public List<Configuration> split(int adviceNumber) {
return CollectionSplitUtil.doSplit(originalConfig,adviceNumber,mongoClient);
@ -57,9 +59,12 @@ public class MongoDBReader extends Reader {
this.originalConfig = super.getPluginJobConf();
this.userName = originalConfig.getString(KeyConstant.MONGO_USER_NAME, originalConfig.getString(KeyConstant.MONGO_USERNAME));
this.password = originalConfig.getString(KeyConstant.MONGO_USER_PASSWORD, originalConfig.getString(KeyConstant.MONGO_PASSWORD));
this.mongoDBUrl = originalConfig.getString(KeyConstant.MONGO_URL);
String database = originalConfig.getString(KeyConstant.MONGO_DB_NAME, originalConfig.getString(KeyConstant.MONGO_DATABASE));
String authDb = originalConfig.getString(KeyConstant.MONGO_AUTHDB, database);
if(!Strings.isNullOrEmpty(this.userName) && !Strings.isNullOrEmpty(this.password)) {
if (!Strings.isNullOrEmpty(mongoDBUrl)) {
this.mongoClient = MongoUtil.initMongoClientByURL(mongoDBUrl);
}else if(!Strings.isNullOrEmpty(this.userName) && !Strings.isNullOrEmpty(this.password)) {
this.mongoClient = MongoUtil.initCredentialMongoClient(originalConfig,userName,password,authDb);
} else {
this.mongoClient = MongoUtil.initMongoClient(originalConfig);
@ -93,6 +98,8 @@ public class MongoDBReader extends Reader {
private Object upperBound = null;
private boolean isObjectId = true;
private String mongoDBUrl = null;
@Override
public void startRead(RecordSender recordSender) {
@ -189,7 +196,10 @@ public class MongoDBReader extends Reader {
this.password = readerSliceConfig.getString(KeyConstant.MONGO_USER_PASSWORD, readerSliceConfig.getString(KeyConstant.MONGO_PASSWORD));
this.database = readerSliceConfig.getString(KeyConstant.MONGO_DB_NAME, readerSliceConfig.getString(KeyConstant.MONGO_DATABASE));
this.authDb = readerSliceConfig.getString(KeyConstant.MONGO_AUTHDB, this.database);
if(!Strings.isNullOrEmpty(userName) && !Strings.isNullOrEmpty(password)) {
this.mongoDBUrl = readerSliceConfig.getString(KeyConstant.MONGO_URL);
if (!Strings.isNullOrEmpty(mongoDBUrl)) {
this.mongoClient = MongoUtil.initMongoClientByURL(mongoDBUrl);
} else if(!Strings.isNullOrEmpty(userName) && !Strings.isNullOrEmpty(password)) {
mongoClient = MongoUtil.initCredentialMongoClient(readerSliceConfig,userName,password,authDb);
} else {
mongoClient = MongoUtil.initMongoClient(readerSliceConfig);

View File

@ -11,6 +11,7 @@ import com.alibaba.datax.plugin.reader.mongodbreader.KeyConstant;
import com.alibaba.datax.plugin.reader.mongodbreader.MongoDBReaderErrorCode;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
@ -87,4 +88,17 @@ public class MongoUtil {
}
return addressList;
}
public static MongoClient initMongoClientByURL(String mongoUrl) {
try {
return new MongoClient(new MongoClientURI(mongoUrl));
} catch (NumberFormatException e) {
e.printStackTrace();
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE, "不合法参数");
} catch (Exception e) {
e.printStackTrace();
throw DataXException.asDataXException(MongoDBReaderErrorCode.UNEXCEPT_EXCEPTION, "未知异常");
}
}
}