mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-03 07:10:53 +08:00
Merge 089609c628
into 0824b45c5e
This commit is contained in:
commit
68ae698b2e
@ -27,6 +27,12 @@ MongoDBReader通过Datax框架从MongoDB并行的读取数据,通过主控的J
|
|||||||
"userPassword": "",
|
"userPassword": "",
|
||||||
"dbName": "tag_per_data",
|
"dbName": "tag_per_data",
|
||||||
"collectionName": "tag_data12",
|
"collectionName": "tag_data12",
|
||||||
|
"authDb": "authDb",
|
||||||
|
"sslMode": "two-way",
|
||||||
|
"trustStorePath": "/path/to/trustStore",
|
||||||
|
"trustStorePwd": "",
|
||||||
|
"keyStorePath": "/path/to/keyStore",
|
||||||
|
"keyStorePwd": "",
|
||||||
"column": [
|
"column": [
|
||||||
{
|
{
|
||||||
"name": "unique_id",
|
"name": "unique_id",
|
||||||
@ -128,6 +134,12 @@ MongoDBReader通过Datax框架从MongoDB并行的读取数据,通过主控的J
|
|||||||
* userPassword: MongoDB的密码。【选填】
|
* userPassword: MongoDB的密码。【选填】
|
||||||
* authDb: MongoDB认证数据库【选填】
|
* authDb: MongoDB认证数据库【选填】
|
||||||
* collectionName: MonogoDB的集合名。【必填】
|
* collectionName: MonogoDB的集合名。【必填】
|
||||||
|
* authDb: MongoDB用户的鉴权数据库。【选填】
|
||||||
|
* sslMode: MongoDB ssl/tls 加密访问选项,支持 two-way(双向验证) | client-authentication(服务端确认客服端身份,客服端直接信赖服务端) | no-authentication(双方均不验证证书,直接新任) 【选填】
|
||||||
|
* trustStorePath: 信赖的证书库路径,client-authentication时不需要。【选填】
|
||||||
|
* trustStorePwd: 信赖的证书库密码。【选填】
|
||||||
|
* keyStorePath: client端证书和密钥库。【选填】
|
||||||
|
* keyStorePwd: client端证书和密钥库密码。【选填】
|
||||||
* column:MongoDB的文档列名。【必填】
|
* column:MongoDB的文档列名。【必填】
|
||||||
* name:Column的名字。【必填】
|
* name:Column的名字。【必填】
|
||||||
* type:Column的类型。【选填】
|
* type:Column的类型。【选填】
|
||||||
|
@ -94,4 +94,28 @@ public class KeyConstant {
|
|||||||
public static boolean isDocumentType(String type) {
|
public static boolean isDocumentType(String type) {
|
||||||
return type.startsWith(DOCUMENT_TYPE);
|
return type.startsWith(DOCUMENT_TYPE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* sslMode: two-way | client-authentication | no-authentication
|
||||||
|
*/
|
||||||
|
public static final String SSL_MODE = "sslMode";
|
||||||
|
/**
|
||||||
|
* trustStore path
|
||||||
|
*/
|
||||||
|
public static final String TRUST_STORE_PATH = "trustStorePath";
|
||||||
|
/**
|
||||||
|
* trustStore password
|
||||||
|
*/
|
||||||
|
public static final String TRUST_STORE_PWD = "trustStorePwd";
|
||||||
|
/**
|
||||||
|
* keyStore path
|
||||||
|
*/
|
||||||
|
public static final String KEY_STORE_PATH = "keyStorePath";
|
||||||
|
/**
|
||||||
|
* keyStore password
|
||||||
|
*/
|
||||||
|
public static final String KEY_STORE_PWD = "keyStorePwd";
|
||||||
|
public static boolean isValueTrue(String value){
|
||||||
|
return "true".equals(value);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,10 +1,6 @@
|
|||||||
package com.alibaba.datax.plugin.reader.mongodbreader;
|
package com.alibaba.datax.plugin.reader.mongodbreader;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Date;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import com.alibaba.datax.common.element.BoolColumn;
|
import com.alibaba.datax.common.element.BoolColumn;
|
||||||
import com.alibaba.datax.common.element.DateColumn;
|
import com.alibaba.datax.common.element.DateColumn;
|
||||||
@ -59,11 +55,37 @@ public class MongoDBReader extends Reader {
|
|||||||
this.password = originalConfig.getString(KeyConstant.MONGO_USER_PASSWORD, originalConfig.getString(KeyConstant.MONGO_PASSWORD));
|
this.password = originalConfig.getString(KeyConstant.MONGO_USER_PASSWORD, originalConfig.getString(KeyConstant.MONGO_PASSWORD));
|
||||||
String database = originalConfig.getString(KeyConstant.MONGO_DB_NAME, originalConfig.getString(KeyConstant.MONGO_DATABASE));
|
String database = originalConfig.getString(KeyConstant.MONGO_DB_NAME, originalConfig.getString(KeyConstant.MONGO_DATABASE));
|
||||||
String authDb = originalConfig.getString(KeyConstant.MONGO_AUTHDB, database);
|
String authDb = originalConfig.getString(KeyConstant.MONGO_AUTHDB, database);
|
||||||
|
String sslMode = originalConfig.getString(KeyConstant.SSL_MODE);
|
||||||
|
String trustStorePath = originalConfig.getString(KeyConstant.TRUST_STORE_PATH);
|
||||||
|
String trustStorePwd = originalConfig.getString(KeyConstant.TRUST_STORE_PWD);
|
||||||
|
String keyStorePath = originalConfig.getString(KeyConstant.KEY_STORE_PATH);
|
||||||
|
String keyStorePwd = originalConfig.getString(KeyConstant.KEY_STORE_PWD);
|
||||||
|
Map<String,Integer> map = new HashMap<String, Integer>();
|
||||||
|
map.put("two-way", 1);
|
||||||
|
map.put("client-authentication", 2);
|
||||||
|
map.put("no-authentication", 3);
|
||||||
|
switch (map.get(sslMode)) {
|
||||||
|
case 1:
|
||||||
|
this.mongoClient = MongoUtil.initCredentialSSLMongoClient(this.originalConfig,
|
||||||
|
userName,password,authDb,trustStorePath,trustStorePwd,keyStorePath,keyStorePwd);
|
||||||
|
break;
|
||||||
|
case 2:
|
||||||
|
this.mongoClient = MongoUtil.initCredentialClientAuthenticationMongoClient(this.originalConfig,
|
||||||
|
userName,password,authDb,keyStorePath,keyStorePwd);
|
||||||
|
break;
|
||||||
|
case 3:
|
||||||
|
this.mongoClient = MongoUtil.initCredentialNoAuthenticationMongoClient(this.originalConfig,
|
||||||
|
userName,password,authDb);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
if(!Strings.isNullOrEmpty(this.userName) && !Strings.isNullOrEmpty(this.password)) {
|
if(!Strings.isNullOrEmpty(this.userName) && !Strings.isNullOrEmpty(this.password)) {
|
||||||
this.mongoClient = MongoUtil.initCredentialMongoClient(originalConfig,userName,password,authDb);
|
this.mongoClient = MongoUtil.initCredentialMongoClient(originalConfig,userName,password,authDb);
|
||||||
} else {
|
} else {
|
||||||
this.mongoClient = MongoUtil.initMongoClient(originalConfig);
|
this.mongoClient = MongoUtil.initMongoClient(originalConfig);
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -189,12 +211,36 @@ public class MongoDBReader extends Reader {
|
|||||||
this.password = readerSliceConfig.getString(KeyConstant.MONGO_USER_PASSWORD, readerSliceConfig.getString(KeyConstant.MONGO_PASSWORD));
|
this.password = readerSliceConfig.getString(KeyConstant.MONGO_USER_PASSWORD, readerSliceConfig.getString(KeyConstant.MONGO_PASSWORD));
|
||||||
this.database = readerSliceConfig.getString(KeyConstant.MONGO_DB_NAME, readerSliceConfig.getString(KeyConstant.MONGO_DATABASE));
|
this.database = readerSliceConfig.getString(KeyConstant.MONGO_DB_NAME, readerSliceConfig.getString(KeyConstant.MONGO_DATABASE));
|
||||||
this.authDb = readerSliceConfig.getString(KeyConstant.MONGO_AUTHDB, this.database);
|
this.authDb = readerSliceConfig.getString(KeyConstant.MONGO_AUTHDB, this.database);
|
||||||
|
String sslMode = readerSliceConfig.getString(KeyConstant.SSL_MODE);
|
||||||
|
String trustStorePath = readerSliceConfig.getString(KeyConstant.TRUST_STORE_PATH);
|
||||||
|
String trustStorePwd = readerSliceConfig.getString(KeyConstant.TRUST_STORE_PWD);
|
||||||
|
String keyStorePath = readerSliceConfig.getString(KeyConstant.KEY_STORE_PATH);
|
||||||
|
String keyStorePwd = readerSliceConfig.getString(KeyConstant.KEY_STORE_PWD);
|
||||||
|
Map<String,Integer> map = new HashMap<String, Integer>();
|
||||||
|
map.put("two-way", 1);
|
||||||
|
map.put("client-authentication", 2);
|
||||||
|
map.put("no-authentication", 3);
|
||||||
|
switch (map.get(sslMode)) {
|
||||||
|
case 1:
|
||||||
|
mongoClient = MongoUtil.initCredentialSSLMongoClient(this.readerSliceConfig,
|
||||||
|
userName,password,authDb,trustStorePath,trustStorePwd,keyStorePath,keyStorePwd);
|
||||||
|
break;
|
||||||
|
case 2:
|
||||||
|
mongoClient = MongoUtil.initCredentialClientAuthenticationMongoClient(this.readerSliceConfig,
|
||||||
|
userName,password,authDb,keyStorePath,keyStorePwd);
|
||||||
|
break;
|
||||||
|
case 3:
|
||||||
|
mongoClient = MongoUtil.initCredentialNoAuthenticationMongoClient(this.readerSliceConfig,
|
||||||
|
userName,password,authDb);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
if(!Strings.isNullOrEmpty(userName) && !Strings.isNullOrEmpty(password)) {
|
if(!Strings.isNullOrEmpty(userName) && !Strings.isNullOrEmpty(password)) {
|
||||||
mongoClient = MongoUtil.initCredentialMongoClient(readerSliceConfig,userName,password,authDb);
|
mongoClient = MongoUtil.initCredentialMongoClient(readerSliceConfig,userName,password,authDb);
|
||||||
} else {
|
} else {
|
||||||
mongoClient = MongoUtil.initMongoClient(readerSliceConfig);
|
mongoClient = MongoUtil.initMongoClient(readerSliceConfig);
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
this.collection = readerSliceConfig.getString(KeyConstant.MONGO_COLLECTION_NAME);
|
this.collection = readerSliceConfig.getString(KeyConstant.MONGO_COLLECTION_NAME);
|
||||||
this.query = readerSliceConfig.getString(KeyConstant.MONGO_QUERY);
|
this.query = readerSliceConfig.getString(KeyConstant.MONGO_QUERY);
|
||||||
this.mongodbColumnMeta = JSON.parseArray(readerSliceConfig.getString(KeyConstant.MONGO_COLUMN));
|
this.mongodbColumnMeta = JSON.parseArray(readerSliceConfig.getString(KeyConstant.MONGO_COLUMN));
|
||||||
|
@ -1,6 +1,11 @@
|
|||||||
package com.alibaba.datax.plugin.reader.mongodbreader.util;
|
package com.alibaba.datax.plugin.reader.mongodbreader.util;
|
||||||
|
|
||||||
|
import java.io.FileInputStream;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
|
import java.security.KeyStore;
|
||||||
|
import java.security.SecureRandom;
|
||||||
|
import java.security.cert.CertificateException;
|
||||||
|
import java.security.cert.X509Certificate;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -11,9 +16,12 @@ import com.alibaba.datax.plugin.reader.mongodbreader.KeyConstant;
|
|||||||
import com.alibaba.datax.plugin.reader.mongodbreader.MongoDBReaderErrorCode;
|
import com.alibaba.datax.plugin.reader.mongodbreader.MongoDBReaderErrorCode;
|
||||||
|
|
||||||
import com.mongodb.MongoClient;
|
import com.mongodb.MongoClient;
|
||||||
|
import com.mongodb.MongoClientOptions;
|
||||||
import com.mongodb.MongoCredential;
|
import com.mongodb.MongoCredential;
|
||||||
import com.mongodb.ServerAddress;
|
import com.mongodb.ServerAddress;
|
||||||
|
|
||||||
|
import javax.net.ssl.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by jianying.wcj on 2015/3/17 0017.
|
* Created by jianying.wcj on 2015/3/17 0017.
|
||||||
* Modified by mingyan.zc on 2016/6/13.
|
* Modified by mingyan.zc on 2016/6/13.
|
||||||
@ -55,6 +63,143 @@ public class MongoUtil {
|
|||||||
throw DataXException.asDataXException(MongoDBReaderErrorCode.UNEXCEPT_EXCEPTION,"未知异常");
|
throw DataXException.asDataXException(MongoDBReaderErrorCode.UNEXCEPT_EXCEPTION,"未知异常");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* two-way 双向SSL验证, DO NOT Use System.setProperty to set javax.net.ssl.trustStore, javax.net.ssl.keyStore
|
||||||
|
* @param conf
|
||||||
|
* @param userName
|
||||||
|
* @param password
|
||||||
|
* @param database
|
||||||
|
* @param trustStorePath 信赖的证书库
|
||||||
|
* @param trustStorePwd
|
||||||
|
* @param keyStorePath client端keystore,用于server端验证client身份
|
||||||
|
* @param keyStorePwd
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public static MongoClient initCredentialSSLMongoClient(Configuration conf,String userName,String password,
|
||||||
|
String database,String trustStorePath,String trustStorePwd,String keyStorePath,String keyStorePwd) {
|
||||||
|
List<Object> addressList = conf.getList(KeyConstant.MONGO_ADDRESS);
|
||||||
|
if(!isHostPortPattern(addressList)) {
|
||||||
|
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
SSLContext sslContext = SSLContext.getInstance("SSL");
|
||||||
|
// set up a KeyManager for validation of server side if required
|
||||||
|
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); // usually is jks
|
||||||
|
FileInputStream myKeyStore = new FileInputStream(keyStorePath);
|
||||||
|
keyStore.load(myKeyStore,keyStorePwd.toCharArray());
|
||||||
|
myKeyStore.close();
|
||||||
|
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
|
||||||
|
.getDefaultAlgorithm()); // default SunX509
|
||||||
|
kmf.init(keyStore, keyStorePwd.toCharArray());
|
||||||
|
// set up a TrustManager that trusts everything
|
||||||
|
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
|
||||||
|
TrustManagerFactory tmf = TrustManagerFactory
|
||||||
|
.getInstance(TrustManagerFactory.getDefaultAlgorithm());
|
||||||
|
FileInputStream myTrustStore = new FileInputStream(trustStorePath);
|
||||||
|
trustStore.load(myTrustStore,trustStorePwd.toCharArray());
|
||||||
|
myTrustStore.close();
|
||||||
|
tmf.init(trustStore);
|
||||||
|
sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
|
||||||
|
// set opts
|
||||||
|
MongoCredential credential = MongoCredential.createCredential(userName, database, password.toCharArray());
|
||||||
|
MongoClientOptions opts = MongoClientOptions.builder().
|
||||||
|
sslEnabled(true).socketFactory(sslContext.getSocketFactory()).sslInvalidHostNameAllowed(true).build();
|
||||||
|
return new MongoClient(parseServerAddress(addressList), Arrays.asList(credential), opts);
|
||||||
|
} catch (UnknownHostException e) {
|
||||||
|
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS,"不合法的地址");
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数");
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw DataXException.asDataXException(MongoDBReaderErrorCode.UNEXCEPT_EXCEPTION,"未知异常");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* client-authentication 单向验证,始终相信服务端
|
||||||
|
* @param conf
|
||||||
|
* @param userName
|
||||||
|
* @param password
|
||||||
|
* @param database
|
||||||
|
* @param keyStorePath
|
||||||
|
* @param keyStorePwd
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public static MongoClient initCredentialClientAuthenticationMongoClient(Configuration conf,String userName,
|
||||||
|
String password,String database,String keyStorePath,String keyStorePwd) {
|
||||||
|
List<Object> addressList = conf.getList(KeyConstant.MONGO_ADDRESS);
|
||||||
|
if(!isHostPortPattern(addressList)) {
|
||||||
|
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
SSLContext sslContext = SSLContext.getInstance("SSL");
|
||||||
|
// set up a KeyManager for validation of server side if required
|
||||||
|
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); // usually is jks
|
||||||
|
FileInputStream myKeyStore = new FileInputStream(keyStorePath);
|
||||||
|
keyStore.load(myKeyStore,keyStorePwd.toCharArray());
|
||||||
|
myKeyStore.close();
|
||||||
|
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
|
||||||
|
.getDefaultAlgorithm()); // default SunX509
|
||||||
|
kmf.init(keyStore, keyStorePwd.toCharArray());
|
||||||
|
// set up a TrustManager that trusts everything
|
||||||
|
sslContext.init(kmf.getKeyManagers(), new TrustManager[] { new X509TrustManager() {
|
||||||
|
@Override
|
||||||
|
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {}
|
||||||
|
@Override
|
||||||
|
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {}
|
||||||
|
@Override
|
||||||
|
public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; }
|
||||||
|
}}, new SecureRandom());
|
||||||
|
MongoCredential credential = MongoCredential.createCredential(userName, database, password.toCharArray());
|
||||||
|
MongoClientOptions opts = MongoClientOptions.builder().
|
||||||
|
sslEnabled(true).socketFactory(sslContext.getSocketFactory()).sslInvalidHostNameAllowed(true).build();
|
||||||
|
return new MongoClient(parseServerAddress(addressList), Arrays.asList(credential), opts);
|
||||||
|
} catch (UnknownHostException e) {
|
||||||
|
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS,"不合法的地址");
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数");
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw DataXException.asDataXException(MongoDBReaderErrorCode.UNEXCEPT_EXCEPTION,"未知异常");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ssl通信,双向都不验证。不推荐。
|
||||||
|
* @param conf
|
||||||
|
* @param userName
|
||||||
|
* @param password
|
||||||
|
* @param database
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public static MongoClient initCredentialNoAuthenticationMongoClient(Configuration conf,String userName,String password,
|
||||||
|
String database) {
|
||||||
|
List<Object> addressList = conf.getList(KeyConstant.MONGO_ADDRESS);
|
||||||
|
if(!isHostPortPattern(addressList)) {
|
||||||
|
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
SSLContext sslContext = SSLContext.getInstance("SSL");
|
||||||
|
sslContext.init(null, new TrustManager[] { new X509TrustManager() {
|
||||||
|
@Override
|
||||||
|
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {}
|
||||||
|
@Override
|
||||||
|
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {}
|
||||||
|
@Override
|
||||||
|
public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; }
|
||||||
|
}}, new SecureRandom());
|
||||||
|
MongoCredential credential = MongoCredential.createCredential(userName, database, password.toCharArray());
|
||||||
|
MongoClientOptions opts = MongoClientOptions.builder().
|
||||||
|
sslEnabled(true).socketFactory(sslContext.getSocketFactory()).sslInvalidHostNameAllowed(true).build();
|
||||||
|
return new MongoClient(parseServerAddress(addressList), Arrays.asList(credential), opts);
|
||||||
|
} catch (UnknownHostException e) {
|
||||||
|
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS,"不合法的地址");
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数");
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw DataXException.asDataXException(MongoDBReaderErrorCode.UNEXCEPT_EXCEPTION,"未知异常");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 判断地址类型是否符合要求
|
* 判断地址类型是否符合要求
|
||||||
* @param addressList
|
* @param addressList
|
||||||
|
@ -55,6 +55,12 @@ MongoDBWriter通过Datax框架获取Reader生成的数据,然后将Datax支持
|
|||||||
"userPassword": "",
|
"userPassword": "",
|
||||||
"dbName": "tag_per_data",
|
"dbName": "tag_per_data",
|
||||||
"collectionName": "tag_data",
|
"collectionName": "tag_data",
|
||||||
|
"authDb": "authDb",
|
||||||
|
"sslMode": "two-way",
|
||||||
|
"trustStorePath": "/path/to/trustStore",
|
||||||
|
"trustStorePwd": "",
|
||||||
|
"keyStorePath": "/path/to/keyStore",
|
||||||
|
"keyStorePwd": "",
|
||||||
"column": [
|
"column": [
|
||||||
{
|
{
|
||||||
"name": "unique_id",
|
"name": "unique_id",
|
||||||
@ -133,6 +139,12 @@ MongoDBWriter通过Datax框架获取Reader生成的数据,然后将Datax支持
|
|||||||
* userName:MongoDB的用户名。【选填】
|
* userName:MongoDB的用户名。【选填】
|
||||||
* userPassword: MongoDB的密码。【选填】
|
* userPassword: MongoDB的密码。【选填】
|
||||||
* collectionName: MonogoDB的集合名。【必填】
|
* collectionName: MonogoDB的集合名。【必填】
|
||||||
|
* authDb: MongoDB用户的鉴权数据库。【选填】
|
||||||
|
* sslMode: MongoDB ssl/tls 加密访问选项,支持 two-way(双向验证) | client-authentication(服务端确认客服端身份,客服端直接信赖服务端) | no-authentication(双方均不验证证书,直接新任) 【选填】
|
||||||
|
* trustStorePath: 信赖的证书库路径,client-authentication时不需要。【选填】
|
||||||
|
* trustStorePwd: 信赖的证书库密码。【选填】
|
||||||
|
* keyStorePath: client端证书和密钥库。【选填】
|
||||||
|
* keyStorePwd: client端证书和密钥库密码。【选填】
|
||||||
* column:MongoDB的文档列名。【必填】
|
* column:MongoDB的文档列名。【必填】
|
||||||
* name:Column的名字。【必填】
|
* name:Column的名字。【必填】
|
||||||
* type:Column的类型。【必填】
|
* type:Column的类型。【必填】
|
||||||
|
@ -25,6 +25,7 @@ public class KeyConstant {
|
|||||||
* mongodb 数据库名
|
* mongodb 数据库名
|
||||||
*/
|
*/
|
||||||
public static final String MONGO_DB_NAME = "dbName";
|
public static final String MONGO_DB_NAME = "dbName";
|
||||||
|
public static final String MONGO_AUTHDB = "authDb";
|
||||||
/**
|
/**
|
||||||
* mongodb 集合名
|
* mongodb 集合名
|
||||||
*/
|
*/
|
||||||
@ -61,6 +62,26 @@ public class KeyConstant {
|
|||||||
* 指定用来判断是否覆盖的 业务主键
|
* 指定用来判断是否覆盖的 业务主键
|
||||||
*/
|
*/
|
||||||
public static final String UNIQUE_KEY = "replaceKey";
|
public static final String UNIQUE_KEY = "replaceKey";
|
||||||
|
/**
|
||||||
|
* sslMode: two-way | client-authentication | no-authentication
|
||||||
|
*/
|
||||||
|
public static final String SSL_MODE = "sslMode";
|
||||||
|
/**
|
||||||
|
* trustStore path
|
||||||
|
*/
|
||||||
|
public static final String TRUST_STORE_PATH = "trustStorePath";
|
||||||
|
/**
|
||||||
|
* trustStore password
|
||||||
|
*/
|
||||||
|
public static final String TRUST_STORE_PWD = "trustStorePwd";
|
||||||
|
/**
|
||||||
|
* keyStore path
|
||||||
|
*/
|
||||||
|
public static final String KEY_STORE_PATH = "keyStorePath";
|
||||||
|
/**
|
||||||
|
* keyStore password
|
||||||
|
*/
|
||||||
|
public static final String KEY_STORE_PWD = "keyStorePwd";
|
||||||
/**
|
/**
|
||||||
* 判断是否为数组类型
|
* 判断是否为数组类型
|
||||||
* @param type 数据类型
|
* @param type 数据类型
|
||||||
|
@ -22,7 +22,9 @@ import org.slf4j.Logger;
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
public class MongoDBWriter extends Writer{
|
public class MongoDBWriter extends Writer{
|
||||||
|
|
||||||
@ -320,11 +322,41 @@ public class MongoDBWriter extends Writer{
|
|||||||
this.userName = writerSliceConfig.getString(KeyConstant.MONGO_USER_NAME);
|
this.userName = writerSliceConfig.getString(KeyConstant.MONGO_USER_NAME);
|
||||||
this.password = writerSliceConfig.getString(KeyConstant.MONGO_USER_PASSWORD);
|
this.password = writerSliceConfig.getString(KeyConstant.MONGO_USER_PASSWORD);
|
||||||
this.database = writerSliceConfig.getString(KeyConstant.MONGO_DB_NAME);
|
this.database = writerSliceConfig.getString(KeyConstant.MONGO_DB_NAME);
|
||||||
|
String authDb = writerSliceConfig.getString(KeyConstant.MONGO_AUTHDB);
|
||||||
|
String sslMode = writerSliceConfig.getString(KeyConstant.SSL_MODE);
|
||||||
|
String trustStorePath = writerSliceConfig.getString(KeyConstant.TRUST_STORE_PATH);
|
||||||
|
String trustStorePwd = writerSliceConfig.getString(KeyConstant.TRUST_STORE_PWD);
|
||||||
|
String keyStorePath = writerSliceConfig.getString(KeyConstant.KEY_STORE_PATH);
|
||||||
|
String keyStorePwd = writerSliceConfig.getString(KeyConstant.KEY_STORE_PWD);
|
||||||
|
if(authDb == ""){
|
||||||
|
authDb = this.database;
|
||||||
|
}
|
||||||
|
Map<String,Integer> map = new HashMap<String, Integer>();
|
||||||
|
map.put("two-way", 1);
|
||||||
|
map.put("client-authentication", 2);
|
||||||
|
map.put("no-authentication", 3);
|
||||||
|
switch (map.get(sslMode)) {
|
||||||
|
case 1:
|
||||||
|
this.mongoClient = MongoUtil.initCredentialSSLMongoClient(this.writerSliceConfig,
|
||||||
|
userName,password,authDb,trustStorePath,trustStorePwd,keyStorePath,keyStorePwd);
|
||||||
|
break;
|
||||||
|
case 2:
|
||||||
|
this.mongoClient = MongoUtil.initCredentialClientAuthenticationMongoClient(this.writerSliceConfig,
|
||||||
|
userName,password,authDb,keyStorePath,keyStorePwd);
|
||||||
|
break;
|
||||||
|
case 3:
|
||||||
|
this.mongoClient = MongoUtil.initCredentialNoAuthenticationMongoClient(this.writerSliceConfig,
|
||||||
|
userName,password,authDb);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
if(!Strings.isNullOrEmpty(userName) && !Strings.isNullOrEmpty(password)) {
|
if(!Strings.isNullOrEmpty(userName) && !Strings.isNullOrEmpty(password)) {
|
||||||
this.mongoClient = MongoUtil.initCredentialMongoClient(this.writerSliceConfig,userName,password,database);
|
this.mongoClient = MongoUtil.initCredentialMongoClient(this.writerSliceConfig,userName,password,authDb);
|
||||||
} else {
|
} else {
|
||||||
this.mongoClient = MongoUtil.initMongoClient(this.writerSliceConfig);
|
this.mongoClient = MongoUtil.initMongoClient(this.writerSliceConfig);
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
}
|
||||||
this.collection = writerSliceConfig.getString(KeyConstant.MONGO_COLLECTION_NAME);
|
this.collection = writerSliceConfig.getString(KeyConstant.MONGO_COLLECTION_NAME);
|
||||||
this.batchSize = BATCH_SIZE;
|
this.batchSize = BATCH_SIZE;
|
||||||
this.mongodbColumnMeta = JSON.parseArray(writerSliceConfig.getString(KeyConstant.MONGO_COLUMN));
|
this.mongodbColumnMeta = JSON.parseArray(writerSliceConfig.getString(KeyConstant.MONGO_COLUMN));
|
||||||
|
@ -5,10 +5,19 @@ import com.alibaba.datax.common.util.Configuration;
|
|||||||
import com.alibaba.datax.plugin.writer.mongodbwriter.KeyConstant;
|
import com.alibaba.datax.plugin.writer.mongodbwriter.KeyConstant;
|
||||||
import com.alibaba.datax.plugin.writer.mongodbwriter.MongoDBWriterErrorCode;
|
import com.alibaba.datax.plugin.writer.mongodbwriter.MongoDBWriterErrorCode;
|
||||||
import com.mongodb.MongoClient;
|
import com.mongodb.MongoClient;
|
||||||
|
import com.mongodb.MongoClientOptions;
|
||||||
import com.mongodb.MongoCredential;
|
import com.mongodb.MongoCredential;
|
||||||
import com.mongodb.ServerAddress;
|
import com.mongodb.ServerAddress;
|
||||||
|
|
||||||
|
import javax.net.ssl.*;
|
||||||
|
import java.io.FileInputStream;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
|
import java.security.KeyManagementException;
|
||||||
|
import java.security.KeyStore;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
import java.security.SecureRandom;
|
||||||
|
import java.security.cert.CertificateException;
|
||||||
|
import java.security.cert.X509Certificate;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -50,6 +59,143 @@ public class MongoUtil {
|
|||||||
throw DataXException.asDataXException(MongoDBWriterErrorCode.UNEXCEPT_EXCEPTION,"未知异常");
|
throw DataXException.asDataXException(MongoDBWriterErrorCode.UNEXCEPT_EXCEPTION,"未知异常");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* two-way 双向SSL验证, DO NOT Use System.setProperty to set javax.net.ssl.trustStore, javax.net.ssl.keyStore
|
||||||
|
* @param conf
|
||||||
|
* @param userName
|
||||||
|
* @param password
|
||||||
|
* @param database
|
||||||
|
* @param trustStorePath 信赖的证书路径,.crt格式的证书需要转换为jdk支持的格式,如pkcs12
|
||||||
|
* @param trustStorePwd
|
||||||
|
* @param keyStorePath client端keystore,用于server端验证client身份
|
||||||
|
* @param keyStorePwd
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public static MongoClient initCredentialSSLMongoClient(Configuration conf,String userName,String password,
|
||||||
|
String database,String trustStorePath,String trustStorePwd,String keyStorePath,String keyStorePwd) {
|
||||||
|
List<Object> addressList = conf.getList(KeyConstant.MONGO_ADDRESS);
|
||||||
|
if(!isHostPortPattern(addressList)) {
|
||||||
|
throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_VALUE,"不合法参数");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
SSLContext sslContext = SSLContext.getInstance("SSL");
|
||||||
|
// set up a KeyManager for validation of server side if required
|
||||||
|
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); // usually is jks
|
||||||
|
FileInputStream myKeyStore = new FileInputStream(keyStorePath);
|
||||||
|
keyStore.load(myKeyStore,keyStorePwd.toCharArray());
|
||||||
|
myKeyStore.close();
|
||||||
|
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
|
||||||
|
.getDefaultAlgorithm()); // default SunX509
|
||||||
|
kmf.init(keyStore, keyStorePwd.toCharArray());
|
||||||
|
// set up a TrustManager that trusts everything
|
||||||
|
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
|
||||||
|
TrustManagerFactory tmf = TrustManagerFactory
|
||||||
|
.getInstance(TrustManagerFactory.getDefaultAlgorithm()); // default PKIX
|
||||||
|
FileInputStream myTrustStore = new FileInputStream(trustStorePath);
|
||||||
|
trustStore.load(myTrustStore,trustStorePwd.toCharArray());
|
||||||
|
myTrustStore.close();
|
||||||
|
tmf.init(trustStore);
|
||||||
|
sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
|
||||||
|
// set opts
|
||||||
|
MongoCredential credential = MongoCredential.createCredential(userName, database, password.toCharArray());
|
||||||
|
MongoClientOptions opts = MongoClientOptions.builder().
|
||||||
|
sslEnabled(true).socketFactory(sslContext.getSocketFactory()).sslInvalidHostNameAllowed(true).build();
|
||||||
|
return new MongoClient(parseServerAddress(addressList), Arrays.asList(credential), opts);
|
||||||
|
} catch (UnknownHostException e) {
|
||||||
|
throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_ADDRESS,"不合法的地址");
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_VALUE,"不合法参数");
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw DataXException.asDataXException(MongoDBWriterErrorCode.UNEXCEPT_EXCEPTION,"未知异常");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* client-authentication 单向验证,始终相信服务端
|
||||||
|
* @param conf
|
||||||
|
* @param userName
|
||||||
|
* @param password
|
||||||
|
* @param database
|
||||||
|
* @param keyStorePath
|
||||||
|
* @param keyStorePwd
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public static MongoClient initCredentialClientAuthenticationMongoClient(Configuration conf,String userName,
|
||||||
|
String password,String database,String keyStorePath,String keyStorePwd) {
|
||||||
|
List<Object> addressList = conf.getList(KeyConstant.MONGO_ADDRESS);
|
||||||
|
if(!isHostPortPattern(addressList)) {
|
||||||
|
throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_VALUE,"不合法参数");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
SSLContext sslContext = SSLContext.getInstance("SSL");
|
||||||
|
// set up a KeyManager for validation of server side if required
|
||||||
|
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); // usually is jks
|
||||||
|
FileInputStream myKeyStore = new FileInputStream(keyStorePath);
|
||||||
|
keyStore.load(myKeyStore,keyStorePwd.toCharArray());
|
||||||
|
myKeyStore.close();
|
||||||
|
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
|
||||||
|
.getDefaultAlgorithm()); // default SunX509
|
||||||
|
kmf.init(keyStore, keyStorePwd.toCharArray());
|
||||||
|
// set up a TrustManager that trusts everything
|
||||||
|
sslContext.init(kmf.getKeyManagers(), new TrustManager[] { new X509TrustManager() {
|
||||||
|
@Override
|
||||||
|
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {}
|
||||||
|
@Override
|
||||||
|
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {}
|
||||||
|
@Override
|
||||||
|
public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; }
|
||||||
|
}}, new SecureRandom());
|
||||||
|
MongoCredential credential = MongoCredential.createCredential(userName, database, password.toCharArray());
|
||||||
|
MongoClientOptions opts = MongoClientOptions.builder().
|
||||||
|
sslEnabled(true).socketFactory(sslContext.getSocketFactory()).sslInvalidHostNameAllowed(true).build();
|
||||||
|
return new MongoClient(parseServerAddress(addressList), Arrays.asList(credential), opts);
|
||||||
|
} catch (UnknownHostException e) {
|
||||||
|
throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_ADDRESS,"不合法的地址");
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_VALUE,"不合法参数");
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw DataXException.asDataXException(MongoDBWriterErrorCode.UNEXCEPT_EXCEPTION,"未知异常");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ssl通信,双向都不验证。不推荐。
|
||||||
|
* @param conf
|
||||||
|
* @param userName
|
||||||
|
* @param password
|
||||||
|
* @param database
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public static MongoClient initCredentialNoAuthenticationMongoClient(Configuration conf,String userName,String password,
|
||||||
|
String database) {
|
||||||
|
List<Object> addressList = conf.getList(KeyConstant.MONGO_ADDRESS);
|
||||||
|
if(!isHostPortPattern(addressList)) {
|
||||||
|
throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_VALUE,"不合法参数");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
SSLContext sslContext = SSLContext.getInstance("SSL");
|
||||||
|
sslContext.init(null, new TrustManager[] { new X509TrustManager() {
|
||||||
|
@Override
|
||||||
|
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {}
|
||||||
|
@Override
|
||||||
|
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {}
|
||||||
|
@Override
|
||||||
|
public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; }
|
||||||
|
}}, new SecureRandom());
|
||||||
|
MongoCredential credential = MongoCredential.createCredential(userName, database, password.toCharArray());
|
||||||
|
MongoClientOptions opts = MongoClientOptions.builder().
|
||||||
|
sslEnabled(true).socketFactory(sslContext.getSocketFactory()).sslInvalidHostNameAllowed(true).build();
|
||||||
|
return new MongoClient(parseServerAddress(addressList), Arrays.asList(credential), opts);
|
||||||
|
} catch (UnknownHostException e) {
|
||||||
|
throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_ADDRESS,"不合法的地址");
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_VALUE,"不合法参数");
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw DataXException.asDataXException(MongoDBWriterErrorCode.UNEXCEPT_EXCEPTION,"未知异常");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 判断地址类型是否符合要求
|
* 判断地址类型是否符合要求
|
||||||
* @param addressList
|
* @param addressList
|
||||||
|
Loading…
Reference in New Issue
Block a user