diff --git a/mongodbreader/doc/mongodbreader.md b/mongodbreader/doc/mongodbreader.md index 297e598c..ee803fcc 100644 --- a/mongodbreader/doc/mongodbreader.md +++ b/mongodbreader/doc/mongodbreader.md @@ -27,6 +27,12 @@ MongoDBReader通过Datax框架从MongoDB并行的读取数据,通过主控的J "userPassword": "", "dbName": "tag_per_data", "collectionName": "tag_data12", + "authDb": "authDb", + "sslMode": "two-way", + "trustStorePath": "/path/to/trustStore", + "trustStorePwd": "", + "keyStorePath": "/path/to/keyStore", + "keyStorePwd": "", "column": [ { "name": "unique_id", @@ -128,6 +134,12 @@ MongoDBReader通过Datax框架从MongoDB并行的读取数据,通过主控的J * userPassword: MongoDB的密码。【选填】 * authDb: MongoDB认证数据库【选填】 * collectionName: MonogoDB的集合名。【必填】 +* authDb: MongoDB用户的鉴权数据库。【选填】 +* sslMode: MongoDB ssl/tls 加密访问选项,支持 two-way(双向验证) | client-authentication(服务端确认客服端身份,客服端直接信赖服务端) | no-authentication(双方均不验证证书,直接新任) 【选填】 +* trustStorePath: 信赖的证书库路径,client-authentication时不需要。【选填】 +* trustStorePwd: 信赖的证书库密码。【选填】 +* keyStorePath: client端证书和密钥库。【选填】 +* keyStorePwd: client端证书和密钥库密码。【选填】 * column:MongoDB的文档列名。【必填】 * name:Column的名字。【必填】 * type:Column的类型。【选填】 diff --git a/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/KeyConstant.java b/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/KeyConstant.java index fbc83d51..1da3e7dd 100644 --- a/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/KeyConstant.java +++ b/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/KeyConstant.java @@ -94,4 +94,28 @@ public class KeyConstant { public static boolean isDocumentType(String type) { return type.startsWith(DOCUMENT_TYPE); } + + /** + * sslMode: two-way | client-authentication | no-authentication + */ + public static final String SSL_MODE = "sslMode"; + /** + * trustStore path + */ + public static final String TRUST_STORE_PATH = "trustStorePath"; + /** + * trustStore password + */ + public static final String TRUST_STORE_PWD = "trustStorePwd"; + /** + * keyStore path + */ + public static final String KEY_STORE_PATH = "keyStorePath"; + /** + * keyStore password + */ + public static final String KEY_STORE_PWD = "keyStorePwd"; + public static boolean isValueTrue(String value){ + return "true".equals(value); + } } diff --git a/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java b/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java index 4d129a5a..79d350d3 100644 --- a/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java +++ b/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java @@ -1,10 +1,6 @@ package com.alibaba.datax.plugin.reader.mongodbreader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.Iterator; -import java.util.List; +import java.util.*; import com.alibaba.datax.common.element.BoolColumn; import com.alibaba.datax.common.element.DateColumn; @@ -59,11 +55,37 @@ public class MongoDBReader extends Reader { this.password = originalConfig.getString(KeyConstant.MONGO_USER_PASSWORD, originalConfig.getString(KeyConstant.MONGO_PASSWORD)); String database = originalConfig.getString(KeyConstant.MONGO_DB_NAME, originalConfig.getString(KeyConstant.MONGO_DATABASE)); String authDb = originalConfig.getString(KeyConstant.MONGO_AUTHDB, database); - if(!Strings.isNullOrEmpty(this.userName) && !Strings.isNullOrEmpty(this.password)) { - this.mongoClient = MongoUtil.initCredentialMongoClient(originalConfig,userName,password,authDb); - } else { - this.mongoClient = MongoUtil.initMongoClient(originalConfig); + String sslMode = originalConfig.getString(KeyConstant.SSL_MODE); + String trustStorePath = originalConfig.getString(KeyConstant.TRUST_STORE_PATH); + String trustStorePwd = originalConfig.getString(KeyConstant.TRUST_STORE_PWD); + String keyStorePath = originalConfig.getString(KeyConstant.KEY_STORE_PATH); + String keyStorePwd = originalConfig.getString(KeyConstant.KEY_STORE_PWD); + Map map = new HashMap(); + map.put("two-way", 1); + map.put("client-authentication", 2); + map.put("no-authentication", 3); + switch (map.get(sslMode)) { + case 1: + this.mongoClient = MongoUtil.initCredentialSSLMongoClient(this.originalConfig, + userName,password,authDb,trustStorePath,trustStorePwd,keyStorePath,keyStorePwd); + break; + case 2: + this.mongoClient = MongoUtil.initCredentialClientAuthenticationMongoClient(this.originalConfig, + userName,password,authDb,keyStorePath,keyStorePwd); + break; + case 3: + this.mongoClient = MongoUtil.initCredentialNoAuthenticationMongoClient(this.originalConfig, + userName,password,authDb); + break; + default: + if(!Strings.isNullOrEmpty(this.userName) && !Strings.isNullOrEmpty(this.password)) { + this.mongoClient = MongoUtil.initCredentialMongoClient(originalConfig,userName,password,authDb); + } else { + this.mongoClient = MongoUtil.initMongoClient(originalConfig); + } + break; } + } @Override @@ -189,12 +211,36 @@ public class MongoDBReader extends Reader { this.password = readerSliceConfig.getString(KeyConstant.MONGO_USER_PASSWORD, readerSliceConfig.getString(KeyConstant.MONGO_PASSWORD)); this.database = readerSliceConfig.getString(KeyConstant.MONGO_DB_NAME, readerSliceConfig.getString(KeyConstant.MONGO_DATABASE)); this.authDb = readerSliceConfig.getString(KeyConstant.MONGO_AUTHDB, this.database); - if(!Strings.isNullOrEmpty(userName) && !Strings.isNullOrEmpty(password)) { - mongoClient = MongoUtil.initCredentialMongoClient(readerSliceConfig,userName,password,authDb); - } else { - mongoClient = MongoUtil.initMongoClient(readerSliceConfig); + String sslMode = readerSliceConfig.getString(KeyConstant.SSL_MODE); + String trustStorePath = readerSliceConfig.getString(KeyConstant.TRUST_STORE_PATH); + String trustStorePwd = readerSliceConfig.getString(KeyConstant.TRUST_STORE_PWD); + String keyStorePath = readerSliceConfig.getString(KeyConstant.KEY_STORE_PATH); + String keyStorePwd = readerSliceConfig.getString(KeyConstant.KEY_STORE_PWD); + Map map = new HashMap(); + map.put("two-way", 1); + map.put("client-authentication", 2); + map.put("no-authentication", 3); + switch (map.get(sslMode)) { + case 1: + mongoClient = MongoUtil.initCredentialSSLMongoClient(this.readerSliceConfig, + userName,password,authDb,trustStorePath,trustStorePwd,keyStorePath,keyStorePwd); + break; + case 2: + mongoClient = MongoUtil.initCredentialClientAuthenticationMongoClient(this.readerSliceConfig, + userName,password,authDb,keyStorePath,keyStorePwd); + break; + case 3: + mongoClient = MongoUtil.initCredentialNoAuthenticationMongoClient(this.readerSliceConfig, + userName,password,authDb); + break; + default: + if(!Strings.isNullOrEmpty(userName) && !Strings.isNullOrEmpty(password)) { + mongoClient = MongoUtil.initCredentialMongoClient(readerSliceConfig,userName,password,authDb); + } else { + mongoClient = MongoUtil.initMongoClient(readerSliceConfig); + } + break; } - this.collection = readerSliceConfig.getString(KeyConstant.MONGO_COLLECTION_NAME); this.query = readerSliceConfig.getString(KeyConstant.MONGO_QUERY); this.mongodbColumnMeta = JSON.parseArray(readerSliceConfig.getString(KeyConstant.MONGO_COLUMN)); diff --git a/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/util/MongoUtil.java b/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/util/MongoUtil.java index ae7a2dd3..b88588cf 100644 --- a/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/util/MongoUtil.java +++ b/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/util/MongoUtil.java @@ -1,6 +1,11 @@ package com.alibaba.datax.plugin.reader.mongodbreader.util; +import java.io.FileInputStream; import java.net.UnknownHostException; +import java.security.KeyStore; +import java.security.SecureRandom; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -11,9 +16,12 @@ import com.alibaba.datax.plugin.reader.mongodbreader.KeyConstant; import com.alibaba.datax.plugin.reader.mongodbreader.MongoDBReaderErrorCode; import com.mongodb.MongoClient; +import com.mongodb.MongoClientOptions; import com.mongodb.MongoCredential; import com.mongodb.ServerAddress; +import javax.net.ssl.*; + /** * Created by jianying.wcj on 2015/3/17 0017. * Modified by mingyan.zc on 2016/6/13. @@ -55,6 +63,143 @@ public class MongoUtil { throw DataXException.asDataXException(MongoDBReaderErrorCode.UNEXCEPT_EXCEPTION,"未知异常"); } } + + /** + * two-way 双向SSL验证, DO NOT Use System.setProperty to set javax.net.ssl.trustStore, javax.net.ssl.keyStore + * @param conf + * @param userName + * @param password + * @param database + * @param trustStorePath 信赖的证书库 + * @param trustStorePwd + * @param keyStorePath client端keystore,用于server端验证client身份 + * @param keyStorePwd + * @return + */ + public static MongoClient initCredentialSSLMongoClient(Configuration conf,String userName,String password, + String database,String trustStorePath,String trustStorePwd,String keyStorePath,String keyStorePwd) { + List addressList = conf.getList(KeyConstant.MONGO_ADDRESS); + if(!isHostPortPattern(addressList)) { + throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数"); + } + try { + SSLContext sslContext = SSLContext.getInstance("SSL"); + // set up a KeyManager for validation of server side if required + KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); // usually is jks + FileInputStream myKeyStore = new FileInputStream(keyStorePath); + keyStore.load(myKeyStore,keyStorePwd.toCharArray()); + myKeyStore.close(); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory + .getDefaultAlgorithm()); // default SunX509 + kmf.init(keyStore, keyStorePwd.toCharArray()); + // set up a TrustManager that trusts everything + KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); + TrustManagerFactory tmf = TrustManagerFactory + .getInstance(TrustManagerFactory.getDefaultAlgorithm()); + FileInputStream myTrustStore = new FileInputStream(trustStorePath); + trustStore.load(myTrustStore,trustStorePwd.toCharArray()); + myTrustStore.close(); + tmf.init(trustStore); + sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom()); + // set opts + MongoCredential credential = MongoCredential.createCredential(userName, database, password.toCharArray()); + MongoClientOptions opts = MongoClientOptions.builder(). + sslEnabled(true).socketFactory(sslContext.getSocketFactory()).sslInvalidHostNameAllowed(true).build(); + return new MongoClient(parseServerAddress(addressList), Arrays.asList(credential), opts); + } catch (UnknownHostException e) { + throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS,"不合法的地址"); + } catch (NumberFormatException e) { + throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数"); + } catch (Exception e) { + throw DataXException.asDataXException(MongoDBReaderErrorCode.UNEXCEPT_EXCEPTION,"未知异常"); + } + } + + /** + * client-authentication 单向验证,始终相信服务端 + * @param conf + * @param userName + * @param password + * @param database + * @param keyStorePath + * @param keyStorePwd + * @return + */ + public static MongoClient initCredentialClientAuthenticationMongoClient(Configuration conf,String userName, + String password,String database,String keyStorePath,String keyStorePwd) { + List addressList = conf.getList(KeyConstant.MONGO_ADDRESS); + if(!isHostPortPattern(addressList)) { + throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数"); + } + try { + SSLContext sslContext = SSLContext.getInstance("SSL"); + // set up a KeyManager for validation of server side if required + KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); // usually is jks + FileInputStream myKeyStore = new FileInputStream(keyStorePath); + keyStore.load(myKeyStore,keyStorePwd.toCharArray()); + myKeyStore.close(); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory + .getDefaultAlgorithm()); // default SunX509 + kmf.init(keyStore, keyStorePwd.toCharArray()); + // set up a TrustManager that trusts everything + sslContext.init(kmf.getKeyManagers(), new TrustManager[] { new X509TrustManager() { + @Override + public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {} + @Override + public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {} + @Override + public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } + }}, new SecureRandom()); + MongoCredential credential = MongoCredential.createCredential(userName, database, password.toCharArray()); + MongoClientOptions opts = MongoClientOptions.builder(). + sslEnabled(true).socketFactory(sslContext.getSocketFactory()).sslInvalidHostNameAllowed(true).build(); + return new MongoClient(parseServerAddress(addressList), Arrays.asList(credential), opts); + } catch (UnknownHostException e) { + throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS,"不合法的地址"); + } catch (NumberFormatException e) { + throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数"); + } catch (Exception e) { + throw DataXException.asDataXException(MongoDBReaderErrorCode.UNEXCEPT_EXCEPTION,"未知异常"); + } + } + + /** + * ssl通信,双向都不验证。不推荐。 + * @param conf + * @param userName + * @param password + * @param database + * @return + */ + public static MongoClient initCredentialNoAuthenticationMongoClient(Configuration conf,String userName,String password, + String database) { + List addressList = conf.getList(KeyConstant.MONGO_ADDRESS); + if(!isHostPortPattern(addressList)) { + throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数"); + } + try { + SSLContext sslContext = SSLContext.getInstance("SSL"); + sslContext.init(null, new TrustManager[] { new X509TrustManager() { + @Override + public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {} + @Override + public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {} + @Override + public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } + }}, new SecureRandom()); + MongoCredential credential = MongoCredential.createCredential(userName, database, password.toCharArray()); + MongoClientOptions opts = MongoClientOptions.builder(). + sslEnabled(true).socketFactory(sslContext.getSocketFactory()).sslInvalidHostNameAllowed(true).build(); + return new MongoClient(parseServerAddress(addressList), Arrays.asList(credential), opts); + } catch (UnknownHostException e) { + throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS,"不合法的地址"); + } catch (NumberFormatException e) { + throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数"); + } catch (Exception e) { + throw DataXException.asDataXException(MongoDBReaderErrorCode.UNEXCEPT_EXCEPTION,"未知异常"); + } + } + /** * 判断地址类型是否符合要求 * @param addressList diff --git a/mongodbwriter/doc/mongodbwriter.md b/mongodbwriter/doc/mongodbwriter.md index 93f50290..62976281 100644 --- a/mongodbwriter/doc/mongodbwriter.md +++ b/mongodbwriter/doc/mongodbwriter.md @@ -55,6 +55,12 @@ MongoDBWriter通过Datax框架获取Reader生成的数据,然后将Datax支持 "userPassword": "", "dbName": "tag_per_data", "collectionName": "tag_data", + "authDb": "authDb", + "sslMode": "two-way", + "trustStorePath": "/path/to/trustStore", + "trustStorePwd": "", + "keyStorePath": "/path/to/keyStore", + "keyStorePwd": "", "column": [ { "name": "unique_id", @@ -133,6 +139,12 @@ MongoDBWriter通过Datax框架获取Reader生成的数据,然后将Datax支持 * userName:MongoDB的用户名。【选填】 * userPassword: MongoDB的密码。【选填】 * collectionName: MonogoDB的集合名。【必填】 +* authDb: MongoDB用户的鉴权数据库。【选填】 +* sslMode: MongoDB ssl/tls 加密访问选项,支持 two-way(双向验证) | client-authentication(服务端确认客服端身份,客服端直接信赖服务端) | no-authentication(双方均不验证证书,直接新任) 【选填】 +* trustStorePath: 信赖的证书库路径,client-authentication时不需要。【选填】 +* trustStorePwd: 信赖的证书库密码。【选填】 +* keyStorePath: client端证书和密钥库。【选填】 +* keyStorePwd: client端证书和密钥库密码。【选填】 * column:MongoDB的文档列名。【必填】 * name:Column的名字。【必填】 * type:Column的类型。【必填】 diff --git a/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/KeyConstant.java b/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/KeyConstant.java index 40de3124..4b921e5b 100644 --- a/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/KeyConstant.java +++ b/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/KeyConstant.java @@ -25,6 +25,7 @@ public class KeyConstant { * mongodb 数据库名 */ public static final String MONGO_DB_NAME = "dbName"; + public static final String MONGO_AUTHDB = "authDb"; /** * mongodb 集合名 */ @@ -61,6 +62,26 @@ public class KeyConstant { * 指定用来判断是否覆盖的 业务主键 */ public static final String UNIQUE_KEY = "replaceKey"; + /** + * sslMode: two-way | client-authentication | no-authentication + */ + public static final String SSL_MODE = "sslMode"; + /** + * trustStore path + */ + public static final String TRUST_STORE_PATH = "trustStorePath"; + /** + * trustStore password + */ + public static final String TRUST_STORE_PWD = "trustStorePwd"; + /** + * keyStore path + */ + public static final String KEY_STORE_PATH = "keyStorePath"; + /** + * keyStore password + */ + public static final String KEY_STORE_PWD = "keyStorePwd"; /** * 判断是否为数组类型 * @param type 数据类型 diff --git a/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/MongoDBWriter.java b/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/MongoDBWriter.java index 76f35a40..b1343345 100644 --- a/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/MongoDBWriter.java +++ b/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/MongoDBWriter.java @@ -22,7 +22,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class MongoDBWriter extends Writer{ @@ -320,10 +322,40 @@ public class MongoDBWriter extends Writer{ this.userName = writerSliceConfig.getString(KeyConstant.MONGO_USER_NAME); this.password = writerSliceConfig.getString(KeyConstant.MONGO_USER_PASSWORD); this.database = writerSliceConfig.getString(KeyConstant.MONGO_DB_NAME); - if(!Strings.isNullOrEmpty(userName) && !Strings.isNullOrEmpty(password)) { - this.mongoClient = MongoUtil.initCredentialMongoClient(this.writerSliceConfig,userName,password,database); - } else { - this.mongoClient = MongoUtil.initMongoClient(this.writerSliceConfig); + String authDb = writerSliceConfig.getString(KeyConstant.MONGO_AUTHDB); + String sslMode = writerSliceConfig.getString(KeyConstant.SSL_MODE); + String trustStorePath = writerSliceConfig.getString(KeyConstant.TRUST_STORE_PATH); + String trustStorePwd = writerSliceConfig.getString(KeyConstant.TRUST_STORE_PWD); + String keyStorePath = writerSliceConfig.getString(KeyConstant.KEY_STORE_PATH); + String keyStorePwd = writerSliceConfig.getString(KeyConstant.KEY_STORE_PWD); + if(authDb == ""){ + authDb = this.database; + } + Map map = new HashMap(); + map.put("two-way", 1); + map.put("client-authentication", 2); + map.put("no-authentication", 3); + switch (map.get(sslMode)) { + case 1: + this.mongoClient = MongoUtil.initCredentialSSLMongoClient(this.writerSliceConfig, + userName,password,authDb,trustStorePath,trustStorePwd,keyStorePath,keyStorePwd); + break; + case 2: + this.mongoClient = MongoUtil.initCredentialClientAuthenticationMongoClient(this.writerSliceConfig, + userName,password,authDb,keyStorePath,keyStorePwd); + break; + case 3: + this.mongoClient = MongoUtil.initCredentialNoAuthenticationMongoClient(this.writerSliceConfig, + userName,password,authDb); + break; + default: + if(!Strings.isNullOrEmpty(userName) && !Strings.isNullOrEmpty(password)) { + this.mongoClient = MongoUtil.initCredentialMongoClient(this.writerSliceConfig,userName,password,authDb); + } else { + this.mongoClient = MongoUtil.initMongoClient(this.writerSliceConfig); + } + break; + } this.collection = writerSliceConfig.getString(KeyConstant.MONGO_COLLECTION_NAME); this.batchSize = BATCH_SIZE; diff --git a/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/util/MongoUtil.java b/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/util/MongoUtil.java index 17334be4..a8865335 100644 --- a/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/util/MongoUtil.java +++ b/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/util/MongoUtil.java @@ -5,10 +5,19 @@ import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.writer.mongodbwriter.KeyConstant; import com.alibaba.datax.plugin.writer.mongodbwriter.MongoDBWriterErrorCode; import com.mongodb.MongoClient; +import com.mongodb.MongoClientOptions; import com.mongodb.MongoCredential; import com.mongodb.ServerAddress; +import javax.net.ssl.*; +import java.io.FileInputStream; import java.net.UnknownHostException; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -50,6 +59,143 @@ public class MongoUtil { throw DataXException.asDataXException(MongoDBWriterErrorCode.UNEXCEPT_EXCEPTION,"未知异常"); } } + + /** + * two-way 双向SSL验证, DO NOT Use System.setProperty to set javax.net.ssl.trustStore, javax.net.ssl.keyStore + * @param conf + * @param userName + * @param password + * @param database + * @param trustStorePath 信赖的证书路径,.crt格式的证书需要转换为jdk支持的格式,如pkcs12 + * @param trustStorePwd + * @param keyStorePath client端keystore,用于server端验证client身份 + * @param keyStorePwd + * @return + */ + public static MongoClient initCredentialSSLMongoClient(Configuration conf,String userName,String password, + String database,String trustStorePath,String trustStorePwd,String keyStorePath,String keyStorePwd) { + List addressList = conf.getList(KeyConstant.MONGO_ADDRESS); + if(!isHostPortPattern(addressList)) { + throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_VALUE,"不合法参数"); + } + try { + SSLContext sslContext = SSLContext.getInstance("SSL"); + // set up a KeyManager for validation of server side if required + KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); // usually is jks + FileInputStream myKeyStore = new FileInputStream(keyStorePath); + keyStore.load(myKeyStore,keyStorePwd.toCharArray()); + myKeyStore.close(); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory + .getDefaultAlgorithm()); // default SunX509 + kmf.init(keyStore, keyStorePwd.toCharArray()); + // set up a TrustManager that trusts everything + KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); + TrustManagerFactory tmf = TrustManagerFactory + .getInstance(TrustManagerFactory.getDefaultAlgorithm()); // default PKIX + FileInputStream myTrustStore = new FileInputStream(trustStorePath); + trustStore.load(myTrustStore,trustStorePwd.toCharArray()); + myTrustStore.close(); + tmf.init(trustStore); + sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom()); + // set opts + MongoCredential credential = MongoCredential.createCredential(userName, database, password.toCharArray()); + MongoClientOptions opts = MongoClientOptions.builder(). + sslEnabled(true).socketFactory(sslContext.getSocketFactory()).sslInvalidHostNameAllowed(true).build(); + return new MongoClient(parseServerAddress(addressList), Arrays.asList(credential), opts); + } catch (UnknownHostException e) { + throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_ADDRESS,"不合法的地址"); + } catch (NumberFormatException e) { + throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_VALUE,"不合法参数"); + } catch (Exception e) { + throw DataXException.asDataXException(MongoDBWriterErrorCode.UNEXCEPT_EXCEPTION,"未知异常"); + } + } + + /** + * client-authentication 单向验证,始终相信服务端 + * @param conf + * @param userName + * @param password + * @param database + * @param keyStorePath + * @param keyStorePwd + * @return + */ + public static MongoClient initCredentialClientAuthenticationMongoClient(Configuration conf,String userName, + String password,String database,String keyStorePath,String keyStorePwd) { + List addressList = conf.getList(KeyConstant.MONGO_ADDRESS); + if(!isHostPortPattern(addressList)) { + throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_VALUE,"不合法参数"); + } + try { + SSLContext sslContext = SSLContext.getInstance("SSL"); + // set up a KeyManager for validation of server side if required + KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); // usually is jks + FileInputStream myKeyStore = new FileInputStream(keyStorePath); + keyStore.load(myKeyStore,keyStorePwd.toCharArray()); + myKeyStore.close(); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory + .getDefaultAlgorithm()); // default SunX509 + kmf.init(keyStore, keyStorePwd.toCharArray()); + // set up a TrustManager that trusts everything + sslContext.init(kmf.getKeyManagers(), new TrustManager[] { new X509TrustManager() { + @Override + public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {} + @Override + public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {} + @Override + public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } + }}, new SecureRandom()); + MongoCredential credential = MongoCredential.createCredential(userName, database, password.toCharArray()); + MongoClientOptions opts = MongoClientOptions.builder(). + sslEnabled(true).socketFactory(sslContext.getSocketFactory()).sslInvalidHostNameAllowed(true).build(); + return new MongoClient(parseServerAddress(addressList), Arrays.asList(credential), opts); + } catch (UnknownHostException e) { + throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_ADDRESS,"不合法的地址"); + } catch (NumberFormatException e) { + throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_VALUE,"不合法参数"); + } catch (Exception e) { + throw DataXException.asDataXException(MongoDBWriterErrorCode.UNEXCEPT_EXCEPTION,"未知异常"); + } + } + + /** + * ssl通信,双向都不验证。不推荐。 + * @param conf + * @param userName + * @param password + * @param database + * @return + */ + public static MongoClient initCredentialNoAuthenticationMongoClient(Configuration conf,String userName,String password, + String database) { + List addressList = conf.getList(KeyConstant.MONGO_ADDRESS); + if(!isHostPortPattern(addressList)) { + throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_VALUE,"不合法参数"); + } + try { + SSLContext sslContext = SSLContext.getInstance("SSL"); + sslContext.init(null, new TrustManager[] { new X509TrustManager() { + @Override + public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {} + @Override + public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {} + @Override + public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } + }}, new SecureRandom()); + MongoCredential credential = MongoCredential.createCredential(userName, database, password.toCharArray()); + MongoClientOptions opts = MongoClientOptions.builder(). + sslEnabled(true).socketFactory(sslContext.getSocketFactory()).sslInvalidHostNameAllowed(true).build(); + return new MongoClient(parseServerAddress(addressList), Arrays.asList(credential), opts); + } catch (UnknownHostException e) { + throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_ADDRESS,"不合法的地址"); + } catch (NumberFormatException e) { + throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_VALUE,"不合法参数"); + } catch (Exception e) { + throw DataXException.asDataXException(MongoDBWriterErrorCode.UNEXCEPT_EXCEPTION,"未知异常"); + } + } + /** * 判断地址类型是否符合要求 * @param addressList