From 06e054bc8d5350f1f15d88a0c6e9b6b236820327 Mon Sep 17 00:00:00 2001 From: Bilung Lee Date: Tue, 6 Nov 2012 12:43:03 -0800 Subject: [PATCH] SQOOP-666 Introduce execution engine (Jarek Jarcec Cecho) --- core/pom.xml | 14 +- .../sqoop/framework/ExecutionEngine.java | 72 +++++++++ .../sqoop/framework/FrameworkConstants.java | 9 ++ .../sqoop/framework/FrameworkError.java | 4 + .../sqoop/framework/FrameworkManager.java | 138 ++++++++++++------ .../sqoop/framework/SubmissionEngine.java | 9 ++ .../sqoop/framework/SubmissionRequest.java | 60 +------- dist/src/main/server/conf/sqoop.properties | 5 + execution/mapreduce/pom.xml | 67 +++++++++ .../mapreduce/MRSubmissionRequest.java | 110 ++++++++++++++ .../mapreduce/MapreduceExecutionEngine.java | 74 ++++++++++ .../org/apache/sqoop/job/JobConstants.java | 0 .../org/apache/sqoop/job/PrefixContext.java | 0 .../job/etl/HdfsSequenceImportLoader.java | 0 .../sqoop/job/etl/HdfsTextImportLoader.java | 0 .../java/org/apache/sqoop/job/io/Data.java | 0 .../org/apache/sqoop/job/io/FieldTypes.java | 0 .../sqoop/job/mr/ConfigurationUtils.java | 0 .../sqoop/job/mr/SqoopFileOutputFormat.java | 0 .../apache/sqoop/job/mr/SqoopInputFormat.java | 0 .../org/apache/sqoop/job/mr/SqoopMapper.java | 0 .../sqoop/job/mr/SqoopNullOutputFormat.java | 0 .../job/mr/SqoopOutputFormatLoadExecutor.java | 0 .../org/apache/sqoop/job/mr/SqoopReducer.java | 0 .../org/apache/sqoop/job/mr/SqoopSplit.java | 0 .../java/org/apache/sqoop/job/FileUtils.java | 0 .../java/org/apache/sqoop/job/JobUtils.java | 0 .../org/apache/sqoop/job/TestHdfsLoad.java | 0 .../org/apache/sqoop/job/TestJobEngine.java | 0 .../org/apache/sqoop/job/TestMapReduce.java | 0 .../org/apache/sqoop/job}/io/TestData.java | 3 +- execution/pom.xml | 36 +++++ pom.xml | 1 + submission/mapreduce/pom.xml | 6 + .../mapreduce/MapreduceSubmissionEngine.java | 20 ++- 35 files changed, 515 insertions(+), 113 deletions(-) create mode 100644 core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java 
create mode 100644 execution/mapreduce/pom.xml create mode 100644 execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java create mode 100644 execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java rename {core => execution/mapreduce}/src/main/java/org/apache/sqoop/job/JobConstants.java (100%) rename {core => execution/mapreduce}/src/main/java/org/apache/sqoop/job/PrefixContext.java (100%) rename {core => execution/mapreduce}/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java (100%) rename {core => execution/mapreduce}/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java (100%) rename {core => execution/mapreduce}/src/main/java/org/apache/sqoop/job/io/Data.java (100%) rename {core => execution/mapreduce}/src/main/java/org/apache/sqoop/job/io/FieldTypes.java (100%) rename {core => execution/mapreduce}/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java (100%) rename {core => execution/mapreduce}/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java (100%) rename {core => execution/mapreduce}/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java (100%) rename {core => execution/mapreduce}/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java (100%) rename {core => execution/mapreduce}/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java (100%) rename {core => execution/mapreduce}/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java (100%) rename {core => execution/mapreduce}/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java (100%) rename {core => execution/mapreduce}/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java (100%) rename {core => execution/mapreduce}/src/test/java/org/apache/sqoop/job/FileUtils.java (100%) rename {core => execution/mapreduce}/src/test/java/org/apache/sqoop/job/JobUtils.java (100%) rename {core => 
execution/mapreduce}/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java (100%) rename {core => execution/mapreduce}/src/test/java/org/apache/sqoop/job/TestJobEngine.java (100%) rename {core => execution/mapreduce}/src/test/java/org/apache/sqoop/job/TestMapReduce.java (100%) rename {core/src/test/java/org/apache/sqoop => execution/mapreduce/src/test/java/org/apache/sqoop/job}/io/TestData.java (97%) create mode 100644 execution/pom.xml diff --git a/core/pom.xml b/core/pom.xml index 028c2406..0732b2cc 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -36,29 +36,23 @@ limitations under the License. org.apache.sqoop sqoop-spi + org.apache.sqoop sqoop-common + commons-dbcp commons-dbcp - - org.apache.hadoop - hadoop-common - provided - - - org.apache.hadoop - hadoop-mapreduce-client-jobclient - provided - + junit junit test + diff --git a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java new file mode 100644 index 00000000..e1ccdf6a --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.framework; + +import org.apache.sqoop.common.ImmutableContext; +import org.apache.sqoop.connector.spi.SqoopConnector; +import org.apache.sqoop.model.MSubmission; + +/** + * Execution engine drives execution of sqoop submission (job). It's responsible + * for executing all defined steps in the import/export workflow. + */ +public abstract class ExecutionEngine { + + /** + * Initialize execution engine + * + * @param context Configuration context + */ + public void initialize(ImmutableContext context, String prefix) { + } + + /** + * Destroy execution engine when stopping server + */ + public void destroy() { + } + + /** + * Return new SubmissionRequest class or any subclass if it's needed by + * execution and submission engine combination. + * + * @param summary Submission summary + * @param connector Appropriate connector structure + * @param connectorConnection Connector connection configuration + * @param connectorJob Connector job configuration + * @param frameworkConnection Framework connection configuration + * @param frameworkJob Framework job configuration + * @return New Submission request object + */ + public SubmissionRequest createSubmissionRequest(MSubmission summary, + SqoopConnector connector, + Object connectorConnection, + Object connectorJob, + Object frameworkConnection, + Object frameworkJob) { + return new SubmissionRequest(summary, connector, + connectorConnection, connectorJob, frameworkConnection, frameworkJob); + } + + /** + * Prepare given submission request for import submission. 
+ * + * @param request Submission request + */ + public abstract void prepareImportSubmission(SubmissionRequest request); +} diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java index d6e70ca7..32da4e84 100644 --- a/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java +++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java @@ -29,6 +29,9 @@ public final class FrameworkConstants { public static final String PREFIX_SUBMISSION_CONFIG = ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "submission."; + public static final String PREFIX_EXECUTION_CONFIG = + ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "execution."; + public static final String SYSCFG_SUBMISSION_ENGINE = PREFIX_SUBMISSION_CONFIG + "engine"; @@ -50,6 +53,12 @@ public final class FrameworkConstants { public static final String SYSCFG_SUBMISSION_UPDATE_SLEEP = PREFIX_SUBMISSION_UPDATE_CONFIG + "sleep"; + public static final String SYSCFG_EXECUTION_ENGINE = + PREFIX_EXECUTION_CONFIG + "engine"; + + public static final String PREFIX_EXECUTION_ENGINE_CONFIG = + SYSCFG_EXECUTION_ENGINE + "."; + // Connection/Job Configuration forms public static final String FORM_SECURITY = diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java index 19d0d875..4277311c 100644 --- a/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java +++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java @@ -38,6 +38,10 @@ public enum FrameworkError implements ErrorCode { FRAMEWORK_0006("Can't bootstrap job"), + FRAMEWORK_0007("Invalid execution engine"), + + FRAMEWORK_0008("Invalid combination of submission and execution engines"), + ; private final String message; diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java 
b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java index 604d403d..7e10ddc3 100644 --- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java +++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java @@ -19,7 +19,6 @@ import org.apache.log4j.Logger; import org.apache.sqoop.common.MapContext; -import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.ConnectorManager; import org.apache.sqoop.connector.spi.SqoopConnector; @@ -27,11 +26,8 @@ import org.apache.sqoop.framework.configuration.ConnectionConfiguration; import org.apache.sqoop.framework.configuration.ExportJobConfiguration; import org.apache.sqoop.framework.configuration.ImportJobConfiguration; -import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.etl.CallbackBase; import org.apache.sqoop.job.etl.Destroyer; -import org.apache.sqoop.job.etl.HdfsTextImportLoader; -import org.apache.sqoop.job.etl.Importer; import org.apache.sqoop.job.etl.Initializer; import org.apache.sqoop.model.FormUtils; import org.apache.sqoop.model.MConnection; @@ -57,47 +53,93 @@ /** * Manager for Sqoop framework itself. * - * All Sqoop internals (job execution engine, metadata) should be handled - * within this manager. + * All Sqoop internals are handled in this class: + * * Submission engine + * * Execution engine + * * Framework metadata * * Current implementation of entire submission engine is using repository - * for keep of current track, so that server might be restarted at any time - * without any affect on running jobs. This approach however might not be the - * fastest way and we might want to introduce internal structures with running - * jobs in case that this approach will be too slow. + * for keeping track of running submissions. Thus, server might be restarted at + * any time without any affect on running jobs. 
This approach however might not + * be the fastest way and we might want to introduce internal structures for + * running jobs in case that this approach will be too slow. */ public final class FrameworkManager { private static final Logger LOG = Logger.getLogger(FrameworkManager.class); + /** + * Default interval for purging old submissions from repository. + */ private static final long DEFAULT_PURGE_THRESHOLD = 24*60*60*1000; + /** + * Default sleep interval for purge thread. + */ private static final long DEFAULT_PURGE_SLEEP = 24*60*60*1000; + /** + * Default interval for update thread. + */ private static final long DEFAULT_UPDATE_SLEEP = 60*5*1000; + /** + * Framework metadata structures in MForm format + */ private static MFramework mFramework; + /** + * Validator instance + */ private static final Validator validator; + /** + * Configured submission engine instance + */ private static SubmissionEngine submissionEngine; + /** + * Configured execution engine instance + */ + private static ExecutionEngine executionEngine; + + /** + * Purge thread that will periodically remove old submissions from repository. + */ private static PurgeThread purgeThread = null; + /** + * Update thread that will periodically check status of running submissions. + */ private static UpdateThread updateThread = null; + /** + * Synchronization variable between threads. + */ private static boolean running = true; + /** + * Specifies how old submissions should be removed from repository. + */ private static long purgeThreshold; + /** + * Number of milliseconds for purge thread to sleep. + */ private static long purgeSleep; + /** + * Number of milliseconds for update thread to sleep. + */ private static long updateSleep; + /** + * Mutex for creating new submissions. We're not allowing more than one + * running submission for one job. 
+ */ private static final Object submissionMutex = new Object(); static { - MConnectionForms connectionForms = new MConnectionForms( FormUtils.toForms(getConnectionConfigurationClass()) ); @@ -123,23 +165,32 @@ public static synchronized void initialize() { String submissionEngineClassName = context.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE); - Class submissionEngineClass = - ClassUtils.loadClass(submissionEngineClassName); - - if (submissionEngineClass == null) { + submissionEngine = (SubmissionEngine) ClassUtils.instantiate(submissionEngineClassName); + if(submissionEngine == null) { throw new SqoopException(FrameworkError.FRAMEWORK_0001, - submissionEngineClassName); - } - - try { - submissionEngine = (SubmissionEngine)submissionEngineClass.newInstance(); - } catch (Exception ex) { - throw new SqoopException(FrameworkError.FRAMEWORK_0001, - submissionEngineClassName, ex); + submissionEngineClassName); } submissionEngine.initialize(context, FrameworkConstants.PREFIX_SUBMISSION_ENGINE_CONFIG); + // Execution engine + String executionEngineClassName = + context.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE); + + executionEngine = (ExecutionEngine) ClassUtils.instantiate(executionEngineClassName); + if(executionEngine == null) { + throw new SqoopException(FrameworkError.FRAMEWORK_0007, + executionEngineClassName); + } + + // We need to make sure that user has configured compatible combination of + // submission engine and execution engine + if(! 
submissionEngine.isExecutionEngineSupported(executionEngine.getClass())) { + throw new SqoopException(FrameworkError.FRAMEWORK_0008); + } + + executionEngine.initialize(context, FrameworkConstants.PREFIX_EXECUTION_ENGINE_CONFIG); + // Set up worker threads purgeThreshold = context.getLong( FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD, @@ -161,7 +212,6 @@ public static synchronized void initialize() { updateThread = new UpdateThread(); updateThread.start(); - LOG.info("Submission manager initialized: OK"); } @@ -189,6 +239,10 @@ public static synchronized void destroy() { if(submissionEngine != null) { submissionEngine.destroy(); } + + if(executionEngine != null) { + executionEngine.destroy(); + } } public static Validator getValidator() { @@ -253,22 +307,26 @@ public static MSubmission submit(long jobId) { // Create request object MSubmission summary = new MSubmission(jobId); - SubmissionRequest request = new SubmissionRequest(summary, connector, - connectorConnection, connectorJob, frameworkConnection, frameworkJob); + SubmissionRequest request = executionEngine.createSubmissionRequest( + summary, connector, + connectorConnection, connectorJob, + frameworkConnection, frameworkJob); request.setJobName(job.getName()); // Let's register all important jars // sqoop-common - request.addJar(ClassUtils.jarForClass(MapContext.class)); + request.addJarForClass(MapContext.class); // sqoop-core - request.addJar(ClassUtils.jarForClass(FrameworkManager.class)); + request.addJarForClass(FrameworkManager.class); // sqoop-spi - request.addJar(ClassUtils.jarForClass(SqoopConnector.class)); - // particular connector in use - request.addJar(ClassUtils.jarForClass(connector.getClass())); + request.addJarForClass(SqoopConnector.class); + // Execution engine jar + request.addJarForClass(executionEngine.getClass()); + // Connector in use + request.addJarForClass(connector.getClass()); // Extra libraries that Sqoop code requires - 
request.addJar(ClassUtils.jarForClass(JSONValue.class)); + request.addJarForClass(JSONValue.class); switch (job.getType()) { case IMPORT: @@ -308,7 +366,7 @@ public static MSubmission submit(long jobId) { // Bootstrap job from framework perspective switch (job.getType()) { case IMPORT: - bootstrapImportSubmission(request); + prepareImportSubmission(request); break; case EXPORT: // TODO(jarcec): Implement export path @@ -342,22 +400,14 @@ public static MSubmission submit(long jobId) { return summary; } - private static void bootstrapImportSubmission(SubmissionRequest request) { - Importer importer = (Importer)request.getConnectorCallbacks(); + private static void prepareImportSubmission(SubmissionRequest request) { ImportJobConfiguration jobConfiguration = (ImportJobConfiguration) request.getConfigFrameworkJob(); // Initialize the map-reduce part (all sort of required classes, ...) request.setOutputDirectory(jobConfiguration.outputDirectory); - // Defaults for classes are mostly fine for now. 
- - - // Set up framework context - MutableMapContext context = request.getFrameworkContext(); - context.setString(JobConstants.JOB_ETL_PARTITIONER, importer.getPartitioner().getName()); - context.setString(JobConstants.JOB_ETL_EXTRACTOR, importer.getExtractor().getName()); - context.setString(JobConstants.JOB_ETL_DESTROYER, importer.getDestroyer().getName()); - context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName()); + // Delegate rest of the job to execution engine + executionEngine.prepareImportSubmission(request); } /** diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java index f4ad3f5a..71e4ec97 100644 --- a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java +++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java @@ -41,6 +41,15 @@ public void initialize(MapContext context, String prefix) { public void destroy() { } + /** + * Callback to verify that configured submission engine and execution engine + * are compatible. + * + * @param executionEngineClass Configured execution class. + * @return True if such execution engine is supported + */ + public abstract boolean isExecutionEngineSupported(Class executionEngineClass); + /** * Submit new job to remote (hadoop) cluster. 
This method *must* fill * submission.getSummary.setExternalId(), otherwise Sqoop framework won't diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java index 27b0566c..c70a5cc5 100644 --- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java +++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java @@ -17,15 +17,11 @@ */ package org.apache.sqoop.framework; -import org.apache.hadoop.io.NullWritable; import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.job.etl.CallbackBase; -import org.apache.sqoop.job.io.Data; -import org.apache.sqoop.job.mr.SqoopFileOutputFormat; -import org.apache.sqoop.job.mr.SqoopInputFormat; -import org.apache.sqoop.job.mr.SqoopMapper; import org.apache.sqoop.model.MSubmission; +import org.apache.sqoop.utils.ClassUtils; import java.util.LinkedList; import java.util.List; @@ -85,20 +81,6 @@ public class SubmissionRequest { */ String outputDirectory; - /** - * Map-reduce specific options. - * - * I'm using strings so that this class won't have direct dependency on - * hadoop libraries. 
- */ - Class inputFormatClass; - Class mapperClass; - Class mapOutputKeyClass; - Class mapOutputValueClass; - Class outputFormatClass; - Class outputKeyClass; - Class outputValueClass; - public SubmissionRequest(MSubmission submission, SqoopConnector connector, @@ -115,15 +97,6 @@ public SubmissionRequest(MSubmission submission, this.configConnectorJob = configConnectorJob; this.configFrameworkConnection = configFrameworkConnection; this.configFrameworkJob = configFrameworkJob; - - // TODO(Jarcec): Move this to job execution engine - this.inputFormatClass = SqoopInputFormat.class; - this.mapperClass = SqoopMapper.class; - this.mapOutputKeyClass = Data.class; - this.mapOutputValueClass = NullWritable.class; - this.outputFormatClass = SqoopFileOutputFormat.class; - this.outputKeyClass = Data.class; - this.outputValueClass = NullWritable.class; } public MSubmission getSummary() { @@ -150,6 +123,10 @@ public void addJar(String jar) { jars.add(jar); } + public void addJarForClass(Class klass) { + jars.add(ClassUtils.jarForClass(klass)); + } + public void addJars(List jars) { this.jars.addAll(jars); } @@ -193,31 +170,4 @@ public String getOutputDirectory() { public void setOutputDirectory(String outputDirectory) { this.outputDirectory = outputDirectory; } - public Class getInputFormatClass() { - return inputFormatClass; - } - - public Class getMapperClass() { - return mapperClass; - } - - public Class getMapOutputKeyClass() { - return mapOutputKeyClass; - } - - public Class getMapOutputValueClass() { - return mapOutputValueClass; - } - - public Class getOutputFormatClass() { - return outputFormatClass; - } - - public Class getOutputKeyClass() { - return outputKeyClass; - } - - public Class getOutputValueClass() { - return outputValueClass; - } } diff --git a/dist/src/main/server/conf/sqoop.properties b/dist/src/main/server/conf/sqoop.properties index d429c3a8..5131aad8 100755 --- a/dist/src/main/server/conf/sqoop.properties +++ 
b/dist/src/main/server/conf/sqoop.properties @@ -108,3 +108,8 @@ org.apache.sqoop.submission.engine=org.apache.sqoop.submission.mapreduce.Mapredu # Hadoop configuration directory org.apache.sqoop.submission.engine.mapreduce.configuration.directory=/etc/hadoop/conf/ + +# +# Execution engine configuration +# +org.apache.sqoop.execution.engine=org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine diff --git a/execution/mapreduce/pom.xml b/execution/mapreduce/pom.xml new file mode 100644 index 00000000..e529f556 --- /dev/null +++ b/execution/mapreduce/pom.xml @@ -0,0 +1,67 @@ + + + + 4.0.0 + + org.apache.sqoop + execution + 2.0.0-SNAPSHOT + + + org.apache.sqoop.execution + sqoop-execution-mapreduce + 2.0.0-SNAPSHOT + Sqoop Mapreduce Execution Engine + + + + org.apache.sqoop + sqoop-core + 2.0.0-SNAPSHOT + + + + junit + junit + test + + + + org.apache.sqoop + sqoop-core + 2.0.0-SNAPSHOT + test-jar + test + + + + org.apache.hadoop + hadoop-common + provided + + + + org.apache.hadoop + hadoop-mapreduce-client-jobclient + provided + + + + + diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java new file mode 100644 index 00000000..3f372222 --- /dev/null +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.execution.mapreduce; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.sqoop.connector.spi.SqoopConnector; +import org.apache.sqoop.framework.SubmissionRequest; +import org.apache.sqoop.model.MSubmission; + +/** + * Map-reduce specific submission request containing all extra information + * needed for bootstrapping map-reduce job. + */ +public class MRSubmissionRequest extends SubmissionRequest { + + /** + * Map-reduce specific options. 
+ */ + Class inputFormatClass; + Class mapperClass; + Class mapOutputKeyClass; + Class mapOutputValueClass; + Class outputFormatClass; + Class outputKeyClass; + Class outputValueClass; + + public MRSubmissionRequest(MSubmission submission, + SqoopConnector connector, + Object configConnectorConnection, + Object configConnectorJob, + Object configFrameworkConnection, + Object configFrameworkJob) { + super(submission, connector, configConnectorConnection, configConnectorJob, + configFrameworkConnection, configFrameworkJob); + } + + public Class getInputFormatClass() { + return inputFormatClass; + } + + public void setInputFormatClass(Class inputFormatClass) { + this.inputFormatClass = inputFormatClass; + } + + public Class getMapperClass() { + return mapperClass; + } + + public void setMapperClass(Class mapperClass) { + this.mapperClass = mapperClass; + } + + public Class getMapOutputKeyClass() { + return mapOutputKeyClass; + } + + public void setMapOutputKeyClass(Class mapOutputKeyClass) { + this.mapOutputKeyClass = mapOutputKeyClass; + } + + public Class getMapOutputValueClass() { + return mapOutputValueClass; + } + + public void setMapOutputValueClass(Class mapOutputValueClass) { + this.mapOutputValueClass = mapOutputValueClass; + } + + public Class getOutputFormatClass() { + return outputFormatClass; + } + + public void setOutputFormatClass(Class outputFormatClass) { + this.outputFormatClass = outputFormatClass; + } + + public Class getOutputKeyClass() { + return outputKeyClass; + } + + public void setOutputKeyClass(Class outputKeyClass) { + this.outputKeyClass = outputKeyClass; + } + + public Class getOutputValueClass() { + return outputValueClass; + } + + public void setOutputValueClass(Class outputValueClass) { + this.outputValueClass = outputValueClass; + } +} diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java 
b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java new file mode 100644 index 00000000..77ca59ba --- /dev/null +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.execution.mapreduce; + +import org.apache.hadoop.io.NullWritable; +import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.connector.spi.SqoopConnector; +import org.apache.sqoop.framework.ExecutionEngine; +import org.apache.sqoop.framework.SubmissionRequest; +import org.apache.sqoop.job.JobConstants; +import org.apache.sqoop.job.etl.HdfsTextImportLoader; +import org.apache.sqoop.job.etl.Importer; +import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.mr.SqoopFileOutputFormat; +import org.apache.sqoop.job.mr.SqoopInputFormat; +import org.apache.sqoop.job.mr.SqoopMapper; +import org.apache.sqoop.model.MSubmission; + +/** + * + */ +public class MapreduceExecutionEngine extends ExecutionEngine { + + @Override + public SubmissionRequest createSubmissionRequest(MSubmission summary, + SqoopConnector connector, + Object connectorConnection, + Object connectorJob, + Object frameworkConnection, + Object frameworkJob) { + return new MRSubmissionRequest(summary, connector, connectorConnection, + connectorJob, frameworkConnection, frameworkJob); + } + + @Override + public void prepareImportSubmission(SubmissionRequest gRequest) { + MRSubmissionRequest request = (MRSubmissionRequest) gRequest; + + // Configure map-reduce classes for import + request.setInputFormatClass(SqoopInputFormat.class); + + request.setMapperClass(SqoopMapper.class); + request.setMapOutputKeyClass(Data.class); + request.setMapOutputValueClass(NullWritable.class); + + request.setOutputFormatClass(SqoopFileOutputFormat.class); + request.setOutputKeyClass(Data.class); + request.setOutputValueClass(NullWritable.class); + + Importer importer = (Importer)request.getConnectorCallbacks(); + + // Set up framework context + MutableMapContext context = request.getFrameworkContext(); + context.setString(JobConstants.JOB_ETL_PARTITIONER, importer.getPartitioner().getName()); + context.setString(JobConstants.JOB_ETL_EXTRACTOR, 
importer.getExtractor().getName()); + context.setString(JobConstants.JOB_ETL_DESTROYER, importer.getDestroyer().getName()); + context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName()); + } +} diff --git a/core/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java similarity index 100% rename from core/src/main/java/org/apache/sqoop/job/JobConstants.java rename to execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java diff --git a/core/src/main/java/org/apache/sqoop/job/PrefixContext.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java similarity index 100% rename from core/src/main/java/org/apache/sqoop/job/PrefixContext.java rename to execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java diff --git a/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java similarity index 100% rename from core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java rename to execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java diff --git a/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java similarity index 100% rename from core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java rename to execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java diff --git a/core/src/main/java/org/apache/sqoop/job/io/Data.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java similarity index 100% rename from core/src/main/java/org/apache/sqoop/job/io/Data.java rename to execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java diff --git a/core/src/main/java/org/apache/sqoop/job/io/FieldTypes.java 
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java similarity index 100% rename from core/src/main/java/org/apache/sqoop/job/io/FieldTypes.java rename to execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java diff --git a/core/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java similarity index 100% rename from core/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java rename to execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java similarity index 100% rename from core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java rename to execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java similarity index 100% rename from core/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java rename to execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java similarity index 100% rename from core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java rename to execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java similarity index 100% rename from core/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java rename to 
execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java similarity index 100% rename from core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java rename to execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java similarity index 100% rename from core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java rename to execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java similarity index 100% rename from core/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java rename to execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java diff --git a/core/src/test/java/org/apache/sqoop/job/FileUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java similarity index 100% rename from core/src/test/java/org/apache/sqoop/job/FileUtils.java rename to execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java diff --git a/core/src/test/java/org/apache/sqoop/job/JobUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java similarity index 100% rename from core/src/test/java/org/apache/sqoop/job/JobUtils.java rename to execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java diff --git a/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java similarity index 100% rename from core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java 
rename to execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java diff --git a/core/src/test/java/org/apache/sqoop/job/TestJobEngine.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestJobEngine.java similarity index 100% rename from core/src/test/java/org/apache/sqoop/job/TestJobEngine.java rename to execution/mapreduce/src/test/java/org/apache/sqoop/job/TestJobEngine.java diff --git a/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java similarity index 100% rename from core/src/test/java/org/apache/sqoop/job/TestMapReduce.java rename to execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java diff --git a/core/src/test/java/org/apache/sqoop/io/TestData.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java similarity index 97% rename from core/src/test/java/org/apache/sqoop/io/TestData.java rename to execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java index 9fe9d413..d4a7d4d6 100644 --- a/core/src/test/java/org/apache/sqoop/io/TestData.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java @@ -15,13 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.sqoop.io; +package org.apache.sqoop.job.io; import java.util.Arrays; import junit.framework.TestCase; -import org.apache.sqoop.job.io.Data; import org.junit.Test; public class TestData extends TestCase { diff --git a/execution/pom.xml b/execution/pom.xml new file mode 100644 index 00000000..fb9f8018 --- /dev/null +++ b/execution/pom.xml @@ -0,0 +1,36 @@ + + + + 4.0.0 + + + org.apache + sqoop + 2.0.0-SNAPSHOT + + + org.apache.sqoop + execution + Sqoop Execution Engines + pom + + + mapreduce + + + diff --git a/pom.xml b/pom.xml index a4915fd6..42113332 100644 --- a/pom.xml +++ b/pom.xml @@ -220,6 +220,7 @@ limitations under the License. 
client docs connector + execution submission dist diff --git a/submission/mapreduce/pom.xml b/submission/mapreduce/pom.xml index 03c06c0d..f8a7d3de 100644 --- a/submission/mapreduce/pom.xml +++ b/submission/mapreduce/pom.xml @@ -36,6 +36,12 @@ limitations under the License. 2.0.0-SNAPSHOT + + org.apache.sqoop.execution + sqoop-execution-mapreduce + 2.0.0-SNAPSHOT + + junit junit diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java index 94098def..b8415e3f 100644 --- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java +++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java @@ -28,6 +28,8 @@ import org.apache.log4j.Logger; import org.apache.sqoop.common.MapContext; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.execution.mapreduce.MRSubmissionRequest; +import org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine; import org.apache.sqoop.framework.SubmissionRequest; import org.apache.sqoop.framework.SubmissionEngine; import org.apache.sqoop.job.JobConstants; @@ -116,8 +118,22 @@ public void destroy() { * {@inheritDoc} */ @Override - @SuppressWarnings("unchecked") - public boolean submit(SubmissionRequest request) { + public boolean isExecutionEngineSupported(Class executionEngineClass) { + if(executionEngineClass == MapreduceExecutionEngine.class) { + return true; + } + + return false; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean submit(SubmissionRequest generalRequest) { + // We're supporting only map reduce jobs + MRSubmissionRequest request = (MRSubmissionRequest) generalRequest; + // Clone global configuration Configuration configuration = new Configuration(globalConfiguration);