mirror of
https://github.com/apache/sqoop.git
synced 2025-05-19 02:10:54 +08:00
SQOOP-1321: Add ability to serialize SqoopOption into JobConf
(Jarek Jarcec Cecho via Venkat Ranganathan)
This commit is contained in:
parent
72e3bfdd66
commit
18f5b2a77e
@ -127,6 +127,20 @@ under the License.
|
|||||||
</property>
|
</property>
|
||||||
-->
|
-->
|
||||||
|
|
||||||
|
<!--
|
||||||
|
Enabling this option will instruct Sqoop to put all options that
|
||||||
|
were used in the invocation into created mapreduce job(s). This
|
||||||
|
become handy when one needs to investigate what exact options were
|
||||||
|
used in the Sqoop invocation.
|
||||||
|
-->
|
||||||
|
<!--
|
||||||
|
<property>
|
||||||
|
<name>sqoop.jobbase.serialize.sqoopoptions</name>
|
||||||
|
<value>true</value>
|
||||||
|
<description>If true, then all options will be serialized into job.xml
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
-->
|
||||||
|
|
||||||
<!--
|
<!--
|
||||||
SERVER CONFIGURATION: If you plan to run a Sqoop metastore on this machine,
|
SERVER CONFIGURATION: If you plan to run a Sqoop metastore on this machine,
|
||||||
|
@ -379,7 +379,7 @@ public void runExport() throws ExportException, IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Job job = new Job(conf);
|
Job job = createJob(conf);
|
||||||
try {
|
try {
|
||||||
// Set the external jar to use for the job.
|
// Set the external jar to use for the job.
|
||||||
job.getConfiguration().set("mapred.jar", ormJarFile);
|
job.getConfiguration().set("mapred.jar", ormJarFile);
|
||||||
|
@ -227,7 +227,7 @@ public void runImport(String tableName, String ormJarFile, String splitByCol,
|
|||||||
|
|
||||||
loadJars(conf, ormJarFile, tableClassName);
|
loadJars(conf, ormJarFile, tableClassName);
|
||||||
|
|
||||||
Job job = new Job(conf);
|
Job job = createJob(conf);
|
||||||
try {
|
try {
|
||||||
// Set the external jar to use for the job.
|
// Set the external jar to use for the job.
|
||||||
job.getConfiguration().set("mapred.jar", ormJarFile);
|
job.getConfiguration().set("mapred.jar", ormJarFile);
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
@ -48,6 +49,9 @@ public class JobBase {
|
|||||||
|
|
||||||
public static final Log LOG = LogFactory.getLog(JobBase.class.getName());
|
public static final Log LOG = LogFactory.getLog(JobBase.class.getName());
|
||||||
|
|
||||||
|
public static final String SERIALIZE_SQOOPOPTIONS = "sqoop.jobbase.serialize.sqoopoptions";
|
||||||
|
public static final boolean SERIALIZE_SQOOPOPTIONS_DEFAULT = false;
|
||||||
|
|
||||||
protected SqoopOptions options;
|
protected SqoopOptions options;
|
||||||
protected Class<? extends Mapper> mapperClass;
|
protected Class<? extends Mapper> mapperClass;
|
||||||
protected Class<? extends InputFormat> inputFormatClass;
|
protected Class<? extends InputFormat> inputFormatClass;
|
||||||
@ -308,6 +312,39 @@ public Job getJob() {
|
|||||||
return mrJob;
|
return mrJob;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create new Job object in unified way for all types of jobs.
|
||||||
|
*
|
||||||
|
* @param configuration Hadoop configuration that should be used
|
||||||
|
* @return New job object, created object won't be persisted in the instance
|
||||||
|
*/
|
||||||
|
public Job createJob(Configuration configuration) throws IOException {
|
||||||
|
// Put the SqoopOptions into job if requested
|
||||||
|
if(configuration.getBoolean(SERIALIZE_SQOOPOPTIONS, SERIALIZE_SQOOPOPTIONS_DEFAULT)) {
|
||||||
|
putSqoopOptionsToConfiguration(options, configuration);
|
||||||
|
}
|
||||||
|
|
||||||
|
return new Job(configuration);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Iterates over serialized form of SqoopOptions and put them into Configuration
|
||||||
|
* object.
|
||||||
|
*
|
||||||
|
* @param opts SqoopOptions that should be serialized
|
||||||
|
* @param configuration Target configuration object
|
||||||
|
*/
|
||||||
|
public void putSqoopOptionsToConfiguration(SqoopOptions opts, Configuration configuration) {
|
||||||
|
for(Map.Entry<Object, Object> e : opts.writeProperties().entrySet()) {
|
||||||
|
String key = (String)e.getKey();
|
||||||
|
String value = (String)e.getValue();
|
||||||
|
|
||||||
|
// We don't need to do if(value is empty) because that is already done
|
||||||
|
// for us by the SqoopOptions.writeProperties() method.
|
||||||
|
configuration.set("sqoop.opt." + key, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Actually run the MapReduce job.
|
* Actually run the MapReduce job.
|
||||||
*/
|
*/
|
||||||
|
@ -59,7 +59,7 @@ public MergeJob(final SqoopOptions opts) {
|
|||||||
|
|
||||||
public boolean runMergeJob() throws IOException {
|
public boolean runMergeJob() throws IOException {
|
||||||
Configuration conf = options.getConf();
|
Configuration conf = options.getConf();
|
||||||
Job job = new Job(conf);
|
Job job = createJob(conf);
|
||||||
|
|
||||||
String userClassName = options.getClassName();
|
String userClassName = options.getClassName();
|
||||||
if (null == userClassName) {
|
if (null == userClassName) {
|
||||||
|
Loading…
Reference in New Issue
Block a user