mirror of
https://github.com/apache/sqoop.git
synced 2025-05-10 05:09:41 +08:00
SQOOP-895: Sqoop2: Do not serialize framework and connector configurations into mapreduce configuration object
(Jarek Jarcec Cecho via Kate Ting)
This commit is contained in:
parent
c4ddeb7ff7
commit
0a0a65a29f
@ -17,6 +17,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.sqoop.job;
|
package org.apache.sqoop.job;
|
||||||
|
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.sqoop.core.ConfigurationConstants;
|
import org.apache.sqoop.core.ConfigurationConstants;
|
||||||
|
|
||||||
public final class JobConstants extends Constants {
|
public final class JobConstants extends Constants {
|
||||||
@ -67,15 +68,27 @@ public final class JobConstants extends Constants {
|
|||||||
public static final String JOB_CONFIG_CONNECTOR_CONNECTION =
|
public static final String JOB_CONFIG_CONNECTOR_CONNECTION =
|
||||||
PREFIX_JOB_CONFIG + "config.connector.connection";
|
PREFIX_JOB_CONFIG + "config.connector.connection";
|
||||||
|
|
||||||
|
public static final Text JOB_CONFIG_CONNECTOR_CONNECTION_KEY =
|
||||||
|
new Text(JOB_CONFIG_CONNECTOR_CONNECTION);
|
||||||
|
|
||||||
public static final String JOB_CONFIG_CONNECTOR_JOB =
|
public static final String JOB_CONFIG_CONNECTOR_JOB =
|
||||||
PREFIX_JOB_CONFIG + "config.connector.job";
|
PREFIX_JOB_CONFIG + "config.connector.job";
|
||||||
|
|
||||||
|
public static final Text JOB_CONFIG_CONNECTOR_JOB_KEY =
|
||||||
|
new Text(JOB_CONFIG_CONNECTOR_JOB);
|
||||||
|
|
||||||
public static final String JOB_CONFIG_FRAMEWORK_CONNECTION =
|
public static final String JOB_CONFIG_FRAMEWORK_CONNECTION =
|
||||||
PREFIX_JOB_CONFIG + "config.framework.connection";
|
PREFIX_JOB_CONFIG + "config.framework.connection";
|
||||||
|
|
||||||
|
public static final Text JOB_CONFIG_FRAMEWORK_CONNECTION_KEY =
|
||||||
|
new Text(JOB_CONFIG_FRAMEWORK_CONNECTION);
|
||||||
|
|
||||||
public static final String JOB_CONFIG_FRAMEWORK_JOB =
|
public static final String JOB_CONFIG_FRAMEWORK_JOB =
|
||||||
PREFIX_JOB_CONFIG + "config.framework.job";
|
PREFIX_JOB_CONFIG + "config.framework.job";
|
||||||
|
|
||||||
|
public static final Text JOB_CONFIG_FRAMEWORK_JOB_KEY =
|
||||||
|
new Text(JOB_CONFIG_FRAMEWORK_JOB);
|
||||||
|
|
||||||
public static final String PREFIX_CONNECTOR_CONTEXT =
|
public static final String PREFIX_CONNECTOR_CONTEXT =
|
||||||
PREFIX_JOB_CONFIG + "connector.context.";
|
PREFIX_JOB_CONFIG + "connector.context.";
|
||||||
|
|
||||||
|
@ -18,6 +18,8 @@
|
|||||||
package org.apache.sqoop.job.mr;
|
package org.apache.sqoop.job.mr;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.sqoop.job.JobConstants;
|
import org.apache.sqoop.job.JobConstants;
|
||||||
import org.apache.sqoop.model.FormUtils;
|
import org.apache.sqoop.model.FormUtils;
|
||||||
import org.apache.sqoop.model.MJob;
|
import org.apache.sqoop.model.MJob;
|
||||||
@ -33,48 +35,49 @@ public static MJob.Type getJobType(Configuration configuration) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static Object getConnectorConnection(Configuration configuration) {
|
public static Object getConnectorConnection(Configuration configuration) {
|
||||||
return loadConfiguration(configuration,
|
return loadConfiguration((JobConf) configuration,
|
||||||
JobConstants.JOB_CONFIG_CLASS_CONNECTOR_CONNECTION,
|
JobConstants.JOB_CONFIG_CLASS_CONNECTOR_CONNECTION,
|
||||||
JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION);
|
JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION_KEY);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Object getConnectorJob(Configuration configuration) {
|
public static Object getConnectorJob(Configuration configuration) {
|
||||||
return loadConfiguration(configuration,
|
return loadConfiguration((JobConf) configuration,
|
||||||
JobConstants.JOB_CONFIG_CLASS_CONNECTOR_JOB,
|
JobConstants.JOB_CONFIG_CLASS_CONNECTOR_JOB,
|
||||||
JobConstants.JOB_CONFIG_CONNECTOR_JOB);
|
JobConstants.JOB_CONFIG_CONNECTOR_JOB_KEY);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Object getFrameworkConnection(Configuration configuration) {
|
public static Object getFrameworkConnection(Configuration configuration) {
|
||||||
return loadConfiguration(configuration,
|
return loadConfiguration((JobConf) configuration,
|
||||||
JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION,
|
JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION,
|
||||||
JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION);
|
JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION_KEY);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Object getFrameworkJob(Configuration configuration) {
|
public static Object getFrameworkJob(Configuration configuration) {
|
||||||
return loadConfiguration(configuration,
|
return loadConfiguration((JobConf) configuration,
|
||||||
JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_JOB,
|
JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_JOB,
|
||||||
JobConstants.JOB_CONFIG_FRAMEWORK_JOB);
|
JobConstants.JOB_CONFIG_FRAMEWORK_JOB_KEY);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Load configuration instance serialized in Hadoop configuration object
|
* Load configuration instance serialized in Hadoop credentials cache.
|
||||||
* @param configuration Hadoop configuration object associated with the job
|
*
|
||||||
|
* @param configuration JobConf object associated with the job
|
||||||
* @param classProperty Property with stored configuration class name
|
* @param classProperty Property with stored configuration class name
|
||||||
* @param valueProperty Property with stored JSON representation of the
|
* @param valueProperty Property with stored JSON representation of the
|
||||||
* configuration object
|
* configuration object
|
||||||
* @return New instance with loaded data
|
* @return New instance with loaded data
|
||||||
*/
|
*/
|
||||||
private static Object loadConfiguration(Configuration configuration,
|
private static Object loadConfiguration(JobConf configuration, String classProperty, Text valueProperty) {
|
||||||
String classProperty,
|
|
||||||
String valueProperty) {
|
|
||||||
// Create new instance of configuration class
|
// Create new instance of configuration class
|
||||||
Object object = ClassUtils.instantiate(configuration.get(classProperty));
|
Object object = ClassUtils.instantiate(configuration.get(classProperty));
|
||||||
if(object == null) {
|
if(object == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String json = new String(configuration.getCredentials().getSecretKey(valueProperty));
|
||||||
|
|
||||||
// Fill it with JSON data
|
// Fill it with JSON data
|
||||||
FormUtils.fillValues(configuration.get(valueProperty), object);
|
FormUtils.fillValues(json, object);
|
||||||
|
|
||||||
// And give it back
|
// And give it back
|
||||||
return object;
|
return object;
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapred.JobClient;
|
import org.apache.hadoop.mapred.JobClient;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapred.JobID;
|
import org.apache.hadoop.mapred.JobID;
|
||||||
@ -26,6 +27,7 @@
|
|||||||
import org.apache.hadoop.mapred.RunningJob;
|
import org.apache.hadoop.mapred.RunningJob;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.sqoop.common.MapContext;
|
import org.apache.sqoop.common.MapContext;
|
||||||
import org.apache.sqoop.common.SqoopException;
|
import org.apache.sqoop.common.SqoopException;
|
||||||
@ -181,16 +183,6 @@ public boolean submit(SubmissionRequest generalRequest) {
|
|||||||
configuration.set(JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_JOB,
|
configuration.set(JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_JOB,
|
||||||
request.getConfigFrameworkJob().getClass().getName());
|
request.getConfigFrameworkJob().getClass().getName());
|
||||||
|
|
||||||
// And finally configuration data
|
|
||||||
configuration.set(JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION,
|
|
||||||
FormUtils.toJson(request.getConfigConnectorConnection()));
|
|
||||||
configuration.set(JobConstants.JOB_CONFIG_CONNECTOR_JOB,
|
|
||||||
FormUtils.toJson(request.getConfigConnectorJob()));
|
|
||||||
configuration.set(JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION,
|
|
||||||
FormUtils.toJson(request.getConfigFrameworkConnection()));
|
|
||||||
configuration.set(JobConstants.JOB_CONFIG_FRAMEWORK_JOB,
|
|
||||||
FormUtils.toJson(request.getConfigFrameworkConnection()));
|
|
||||||
|
|
||||||
// Set up notification URL if it's available
|
// Set up notification URL if it's available
|
||||||
if(request.getNotificationUrl() != null) {
|
if(request.getNotificationUrl() != null) {
|
||||||
configuration.set("job.end.notification.url", request.getNotificationUrl());
|
configuration.set("job.end.notification.url", request.getNotificationUrl());
|
||||||
@ -217,6 +209,17 @@ public boolean submit(SubmissionRequest generalRequest) {
|
|||||||
try {
|
try {
|
||||||
Job job = new Job(configuration);
|
Job job = new Job(configuration);
|
||||||
|
|
||||||
|
// And finally put all configuration objects to credentials cache
|
||||||
|
Credentials credentials = job.getCredentials();
|
||||||
|
credentials.addSecretKey(JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION_KEY,
|
||||||
|
FormUtils.toJson(request.getConfigConnectorConnection()).getBytes());
|
||||||
|
credentials.addSecretKey(JobConstants.JOB_CONFIG_CONNECTOR_JOB_KEY,
|
||||||
|
FormUtils.toJson(request.getConfigConnectorJob()).getBytes());
|
||||||
|
credentials.addSecretKey(JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION_KEY,
|
||||||
|
FormUtils.toJson(request.getConfigFrameworkConnection()).getBytes());
|
||||||
|
credentials.addSecretKey(JobConstants.JOB_CONFIG_FRAMEWORK_JOB_KEY,
|
||||||
|
FormUtils.toJson(request.getConfigFrameworkConnection()).getBytes());
|
||||||
|
|
||||||
if(request.getJobName() != null) {
|
if(request.getJobName() != null) {
|
||||||
job.setJobName("Sqoop: " + request.getJobName());
|
job.setJobName("Sqoop: " + request.getJobName());
|
||||||
} else {
|
} else {
|
||||||
|
Loading…
Reference in New Issue
Block a user