diff --git a/core/src/main/conf/.secret.properties b/core/src/main/conf/.secret.properties index b807f8ad..43e1975b 100755 --- a/core/src/main/conf/.secret.properties +++ b/core/src/main/conf/.secret.properties @@ -6,4 +6,4 @@ current.publicKey= current.privateKey= current.service.username= current.service.password= - +db.encrypt.key= diff --git a/core/src/main/java/com/alibaba/datax/core/Engine.java b/core/src/main/java/com/alibaba/datax/core/Engine.java index 4ba9fc18..6942f484 100755 --- a/core/src/main/java/com/alibaba/datax/core/Engine.java +++ b/core/src/main/java/com/alibaba/datax/core/Engine.java @@ -9,10 +9,7 @@ import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.MessageSource; import com.alibaba.datax.core.job.JobContainer; import com.alibaba.datax.core.taskgroup.TaskGroupContainer; -import com.alibaba.datax.core.util.ConfigParser; -import com.alibaba.datax.core.util.ConfigurationValidate; -import com.alibaba.datax.core.util.ExceptionTracker; -import com.alibaba.datax.core.util.FrameworkErrorCode; +import com.alibaba.datax.core.util.*; import com.alibaba.datax.core.util.container.CoreConstant; import com.alibaba.datax.core.util.container.LoadUtil; import org.apache.commons.cli.BasicParser; @@ -50,11 +47,12 @@ public class Engine { boolean isJob = !("taskGroup".equalsIgnoreCase(allConf .getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL))); //JobContainer会在schedule后再行进行设置和调整值 - int channelNumber =0; + int channelNumber = 0; AbstractContainer container; long instanceId; int taskGroupId = -1; if (isJob) { + JobDataBasePwdDecryptUtil.decrypt(allConf); allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE); container = new JobContainer(allConf); instanceId = allConf.getLong( @@ -75,14 +73,14 @@ public class Engine { boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true); //standalone模式的 datax shell任务不进行汇报 - if(instanceId == -1){ + if (instanceId == -1) { perfReportEnable = false; } Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO); //初始化PerfTrace PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, traceEnable); - perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber); + perfTrace.setJobInfo(jobInfoConfig, perfReportEnable, channelNumber); container.start(); } @@ -96,12 +94,12 @@ public class Engine { filterSensitiveConfiguration(jobContent); - jobConfWithSetting.set("content",jobContent); + jobConfWithSetting.set("content", jobContent); return jobConfWithSetting.beautify(); } - public static Configuration filterSensitiveConfiguration(Configuration configuration){ + public static Configuration filterSensitiveConfiguration(Configuration configuration) { Set keys = configuration.getKeys(); for (final String key : keys) { boolean isSensitive = StringUtils.endsWithIgnoreCase(key, "password") @@ -171,8 +169,8 @@ public class Engine { /** * -1 表示未能解析到 jobId - * - * only for dsc & ds & datax 3 update + *

+ * only for dsc & ds & datax 3 update */ private static long parseJobIdFromUrl(List patternStringList, String url) { long result = -1; diff --git a/core/src/main/java/com/alibaba/datax/core/util/JobDataBasePwdDecryptUtil.java b/core/src/main/java/com/alibaba/datax/core/util/JobDataBasePwdDecryptUtil.java new file mode 100644 index 00000000..c0a00c29 --- /dev/null +++ b/core/src/main/java/com/alibaba/datax/core/util/JobDataBasePwdDecryptUtil.java @@ -0,0 +1,37 @@ +package com.alibaba.datax.core.util; + +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.core.util.container.CoreConstant; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 数据库密码解密 + * + * @Author weizhao.dong + * @Date 2023/3/23 14:37 + * @Version 1.0 + */ +public class JobDataBasePwdDecryptUtil { + private static final Logger LOG = LoggerFactory.getLogger(JobDataBasePwdDecryptUtil.class); + + public static void decrypt(Configuration configuration) { + if (configuration.getBool(CoreConstant.DATAX_JOB_SETTING_PASSWD_ENCRYPT, false)) { + String readerPwd = configuration.getString(CoreConstant.DATA_JOB_READER_PARAMETER_PASSWORD); + String writePwd = configuration.getString(CoreConstant.DATA_JOB_WRITER_PARAMETER_PASSWORD); + //加密key + String key = SecretUtil.getSecurityProperties().getProperty(CoreConstant.DB_ENCRYPT_KEY); + if (StringUtils.isEmpty(key)) { + LOG.warn("{} is empty,use original password", CoreConstant.DB_ENCRYPT_KEY); + return; + } + if (StringUtils.isNotEmpty(readerPwd)) { + configuration.set(CoreConstant.DATA_JOB_READER_PARAMETER_PASSWORD, SecretUtil.decrypt3DES(readerPwd, key)); + } + if (StringUtils.isNotEmpty(writePwd)) { + configuration.set(CoreConstant.DATA_JOB_WRITER_PARAMETER_PASSWORD, SecretUtil.decrypt3DES(writePwd, key)); + } + } + } +} diff --git a/core/src/main/java/com/alibaba/datax/core/util/SecretUtil.java b/core/src/main/java/com/alibaba/datax/core/util/SecretUtil.java index 1a576aaa..e19cfbab 100755 --- a/core/src/main/java/com/alibaba/datax/core/util/SecretUtil.java +++ b/core/src/main/java/com/alibaba/datax/core/util/SecretUtil.java @@ -437,4 +437,13 @@ public class SecretUtil { } return versionKeyMap; } + + public static void main(String[] args) { + String key="1qaz2wsx"; + String passwd="BrPN#dEzqm"; + String encrypt= SecretUtil.encrypt3DES(passwd,key); + System.out.println("encrypt = " + encrypt); + System.out.println(SecretUtil.decrypt3DES(encrypt,key)); + + } } diff --git a/core/src/main/java/com/alibaba/datax/core/util/container/CoreConstant.java b/core/src/main/java/com/alibaba/datax/core/util/container/CoreConstant.java index a1ca164d..2f30dbb9 100755 --- a/core/src/main/java/com/alibaba/datax/core/util/container/CoreConstant.java +++ b/core/src/main/java/com/alibaba/datax/core/util/container/CoreConstant.java @@ -97,6 +97,8 @@ public class CoreConstant { public static final String DATAX_JOB_SETTING_DRYRUN = "job.setting.dryRun"; + public static final String DATAX_JOB_SETTING_PASSWD_ENCRYPT = "job.setting.passwdEncrypt"; + public static final String DATAX_JOB_PREHANDLER_PLUGINTYPE = "job.preHandler.pluginType"; public static final String DATAX_JOB_PREHANDLER_PLUGINNAME = "job.preHandler.pluginName"; @@ -104,6 +106,13 @@ public class CoreConstant { public static final String DATAX_JOB_POSTHANDLER_PLUGINTYPE = "job.postHandler.pluginType"; public static final String DATAX_JOB_POSTHANDLER_PLUGINNAME = "job.postHandler.pluginName"; + + public static final String DATA_JOB_READER_PARAMETER_PASSWORD="job.content[0].reader.parameter.password"; + + public static final String DATA_JOB_WRITER_PARAMETER_PASSWORD="job.content[0].writer.parameter.password"; + + + // ----------------------------- 局部使用的变量 public static final String JOB_WRITER = "writer"; @@ -149,6 +158,8 @@ public class CoreConstant { public static final String CURRENT_SERVICE_PASSWORD = "current.service.password"; + public static final String DB_ENCRYPT_KEY = "db.encrypt.key"; + // ----------------------------- 环境变量 --------------------------------- public static String DATAX_HOME = System.getProperty("datax.home"); diff --git a/core/src/test/java/ConfigurationTest.java b/core/src/test/java/ConfigurationTest.java new file mode 100644 index 00000000..150efdd5 --- /dev/null +++ b/core/src/test/java/ConfigurationTest.java @@ -0,0 +1,39 @@ +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.core.util.JobDataBasePwdDecryptUtil; +import com.alibaba.datax.core.util.SecretUtil; +import com.alibaba.datax.core.util.container.CoreConstant; +import com.alibaba.fastjson2.JSON; +import org.junit.Test; + +import java.io.File; + +/** + * @Author weizhao.dong + * @Date 2023/3/22 18:26 + * @Version 1.0 + */ +public class ConfigurationTest { + String key = "1qaz2wsx"; + String nativePassword = "12345678"; + + /** + * 测试密码解密 + */ + @Test + public void dbEncryptPasswrdTest() { + System.setProperty("datax.home",System.getProperty("user.dir")+"/src/test/resources"); + Configuration configuration = Configuration.from(new File("src/test/resources/dwd_g2park_inout_report_s.json")); + JobDataBasePwdDecryptUtil.decrypt(configuration); + assert configuration.getString(CoreConstant.DATA_JOB_READER_PARAMETER_PASSWORD).equals(nativePassword); + assert configuration.getString(CoreConstant.DATA_JOB_WRITER_PARAMETER_PASSWORD).equals(nativePassword); + } + + /** + * 生成加密密码 + */ + @Test + public void generateEncryptPassword() { + //密码为:ZnOZROJwLiMeI3FQluEhHg== + System.out.println(SecretUtil.encrypt3DES(nativePassword, key)); + } +} diff --git a/core/src/test/resources/conf/.secret.properties b/core/src/test/resources/conf/.secret.properties new file mode 100755 index 00000000..f1f2485f --- /dev/null +++ b/core/src/test/resources/conf/.secret.properties @@ -0,0 +1,9 @@ +#ds basicAuth config +auth.user= +auth.pass= +current.keyVersion= +current.publicKey= +current.privateKey= +current.service.username= +current.service.password= +db.encrypt.key=1qaz2wsx diff --git a/core/src/test/resources/dwd_g2park_inout_report_s.json b/core/src/test/resources/dwd_g2park_inout_report_s.json new file mode 100644 index 00000000..84b011ff --- /dev/null +++ b/core/src/test/resources/dwd_g2park_inout_report_s.json @@ -0,0 +1,55 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "mysqlreader", + "parameter": { + "column": [ + "1 AS inout_type","i.enter_time AS report_date","i.create_time as create_time","i.park_code AS park_code","i.inout_id as inout_id","i.enter_time as enter_time","i.depart_time as depart_time","CAST( YEAR ( i.enter_time ) AS SIGNED ) AS report_year","CAST( MONTH ( i.enter_time ) AS SIGNED ) AS report_month","CAST( HOUR ( i.enter_time ) AS SIGNED ) AS report_hour","(CASE WHEN CHAR_LENGTH(i.client_no) = 0 OR i.client_no IS NULL THEN '0' ELSE i.client_no END) AS client_no","IFNULL(i.in_gateway, '0') AS gateway_no","1 AS total_num","( CASE WHEN i.vehicle_category = 1 THEN 1 ELSE 0 END ) AS big_car_num","( CASE WHEN i.vehicle_category <> 1 THEN 1 ELSE 0 END ) AS small_car_num","( CASE WHEN (i.vehicle_no REGEXP '[京津沪渝冀豫云辽黑湘皖鲁新苏浙赣鄂桂甘晋蒙陕吉闽贵粤青藏川宁琼使领A-Z]{1}[A-Z]{1}(([0-9]{5}[DABCEFGHJK])|([DABCEFGHJK][A-HJ-NP-Z0-9][0-9]{4}))$' ) THEN 1 ELSE 0 END ) AS new_power_car_num","( CASE WHEN i.in_label = 2 OR i.in_label = 3 THEN 1 ELSE 0 END ) AS reserve_num","( CASE WHEN i.in_label = 1 THEN 1 ELSE 0 END ) AS white_list_num","( CASE WHEN i.vehicle_category = 1 AND ( i.in_label = 2 OR i.in_label = 3 ) THEN 1 ELSE 0 END ) AS big_car_reserve_num","( CASE WHEN i.vehicle_category = 1 AND i.in_label = 1 THEN 1 ELSE 0 END ) AS big_car_white_list_num","( CASE WHEN i.vehicle_category <> 1 AND ( i.in_label = 2 OR i.in_label = 3 ) THEN 1 ELSE 0 END ) AS small_car_reserve_num","( CASE WHEN i.vehicle_category <> 1 AND i.in_label = 1 THEN 1 ELSE 0 END ) AS small_car_white_list_num","( CASE WHEN ( i.in_label = 2 OR i.in_label = 3 ) AND ( i.vehicle_no REGEXP '[京津沪渝冀豫云辽黑湘皖鲁新苏浙赣鄂桂甘晋蒙陕吉闽贵粤青藏川宁琼使领A-Z]{1}[A-Z]{1}(([0-9]{5}[DABCEFGHJK])|([DABCEFGHJK][A-HJ-NP-Z0-9][0-9]{4}))$' ) THEN 1 ELSE 0 END ) AS new_power_reserve_num","( CASE WHEN i.in_label = 1 AND ( i.vehicle_no REGEXP '[京津沪渝冀豫云辽黑湘皖鲁新苏浙赣鄂桂甘晋蒙陕吉闽贵粤青藏川宁琼使领A-Z]{1}[A-Z]{1}(([0-9]{5}[DABCEFGHJK])|([DABCEFGHJK][A-HJ-NP-Z0-9][0-9]{4}))$' ) THEN 1 ELSE 0 END ) AS new_power_white_list_num"," i.deleted","CURRENT_TIMESTAMP AS update_time" + ], + "splitPk": "inout_id", + "connection": [ + { + "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test_db?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&rewriteBatchedStatements=true&useSSL=false&autoReconnect=true&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull&characterEncoding=utf-8&allowMultiQueries=true"], + "table": ["g2park_inout_record i"] + } + ], + "password": "ZnOZROJwLiMeI3FQluEhHg==", + "username": "root", + "where": "i.enter_time is not null and update_time > '${updateTime}' " + } + }, + "writer": { + "name": "mysqlwriter", + "parameter": { + "writeMode": "replace", + "column": [ + "inout_type","report_date","create_time","park_code","inout_id","enter_time","depart_time","report_year","report_month","report_hour","client_no","gateway_no","total_num","big_car_num","small_car_num","new_power_car_num","reserve_num","white_list_num","big_car_reserve_num","big_car_white_list_num","small_car_reserve_num","small_car_white_list_num","new_power_reserve_num","new_power_white_list_num","deleted","update_time" + ], + "connection": [ + { + "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test_db?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&rewriteBatchedStatements=true&useSSL=false&autoReconnect=true&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull&characterEncoding=utf-8&allowMultiQueries=true", + "table": ["dwd_g2park_inout_report_s"] + } + ], + "password": "ZnOZROJwLiMeI3FQluEhHg==", + "username": "root", + "preSql": [], + "session": [ + "set session sql_mode='ANSI'" + ] + + } + } + } + ], + "setting": { + "speed": { + "channel": "2" + }, + "passwdEncrypt":true + + } + } +}