
SQOOP-666 Introduce execution engine

(Jarek Jarcec Cecho)
Bilung Lee 2012-11-06 12:43:03 -08:00
parent 25f3fd331f
commit 06e054bc8d
35 changed files with 515 additions and 113 deletions


@@ -36,29 +36,23 @@ limitations under the License.
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-spi</artifactId>
</dependency>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-common</artifactId>
</dependency>
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>


@@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.framework;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.model.MSubmission;
/**
* Execution engine drives execution of a Sqoop submission (job). It's
* responsible for executing all defined steps in the import/export workflow.
*/
public abstract class ExecutionEngine {
/**
* Initialize execution engine
*
* @param context Configuration context
* @param prefix Configuration prefix for this execution engine
*/
public void initialize(ImmutableContext context, String prefix) {
}
/**
* Destroy execution engine when stopping server
*/
public void destroy() {
}
/**
* Return a new SubmissionRequest object, or a subclass of it when required
* by the particular combination of execution and submission engines.
*
* @param summary Submission summary
* @param connector Appropriate connector structure
* @param connectorConnection Connector connection configuration
* @param connectorJob Connector job configuration
* @param frameworkConnection Framework connection configuration
* @param frameworkJob Framework job configuration
* @return New Submission request object
*/
public SubmissionRequest createSubmissionRequest(MSubmission summary,
SqoopConnector connector,
Object connectorConnection,
Object connectorJob,
Object frameworkConnection,
Object frameworkJob) {
return new SubmissionRequest(summary, connector,
connectorConnection, connectorJob, frameworkConnection, frameworkJob);
}
/**
* Prepare given submission request for import submission.
*
* @param request Submission request
*/
public abstract void prepareImportSubmission(SubmissionRequest request);
}
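The contract is deliberately small: two lifecycle hooks, an overridable factory for the submission request, and one abstract preparation step. As a hedged sketch, a hypothetical subclass could look like the following; the LocalExecutionEngine name and its "threads" key are invented for illustration and are not part of this commit:

package org.apache.sqoop.framework;

import org.apache.sqoop.common.ImmutableContext;

/**
 * Hypothetical engine used only to illustrate the override points.
 */
public class LocalExecutionEngine extends ExecutionEngine {

  private int threads;

  @Override
  public void initialize(ImmutableContext context, String prefix) {
    // Engine specific keys live under the supplied prefix, e.g.
    // prefix + "threads"; assumes getString() returns null for missing keys.
    String value = context.getString(prefix + "threads");
    threads = (value == null) ? 1 : Integer.parseInt(value);
  }

  @Override
  public void destroy() {
    // Release any engine resources when the server shuts down.
  }

  @Override
  public void prepareImportSubmission(SubmissionRequest request) {
    // Translate the generic request into engine specific settings here.
  }
}

Overriding createSubmissionRequest() is only needed when the engine wants its own SubmissionRequest subclass, as the map-reduce engine introduced later in this commit does.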


@@ -29,6 +29,9 @@ public final class FrameworkConstants {
public static final String PREFIX_SUBMISSION_CONFIG =
ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "submission.";
public static final String PREFIX_EXECUTION_CONFIG =
ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "execution.";
public static final String SYSCFG_SUBMISSION_ENGINE =
PREFIX_SUBMISSION_CONFIG + "engine";
@@ -50,6 +53,12 @@ public final class FrameworkConstants {
public static final String SYSCFG_SUBMISSION_UPDATE_SLEEP =
PREFIX_SUBMISSION_UPDATE_CONFIG + "sleep";
public static final String SYSCFG_EXECUTION_ENGINE =
PREFIX_EXECUTION_CONFIG + "engine";
public static final String PREFIX_EXECUTION_ENGINE_CONFIG =
SYSCFG_EXECUTION_ENGINE + ".";
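Assuming ConfigurationConstants.PREFIX_GLOBAL_CONFIG is "org.apache.sqoop." (which the property names in the server configuration further below imply), the new constants compose into the following key strings. This snippet is purely illustrative and not code from the commit:

// Illustrative only; resolved values of the constants above.
public class ExecutionKeysExample {
  public static void main(String[] args) {
    String global = "org.apache.sqoop.";             // PREFIX_GLOBAL_CONFIG (assumed)
    String prefixExecution = global + "execution.";  // PREFIX_EXECUTION_CONFIG
    String engineKey = prefixExecution + "engine";   // SYSCFG_EXECUTION_ENGINE
    String enginePrefix = engineKey + ".";           // PREFIX_EXECUTION_ENGINE_CONFIG
    System.out.println(engineKey);    // org.apache.sqoop.execution.engine
    System.out.println(enginePrefix); // org.apache.sqoop.execution.engine.
  }
}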
// Connection/Job Configuration forms
public static final String FORM_SECURITY =


@@ -38,6 +38,10 @@ public enum FrameworkError implements ErrorCode {
FRAMEWORK_0006("Can't bootstrap job"),
FRAMEWORK_0007("Invalid execution engine"),
FRAMEWORK_0008("Invalid combination of submission and execution engines"),
;
private final String message;


@@ -19,7 +19,6 @@
import org.apache.log4j.Logger;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.SqoopConnector;
@@ -27,11 +26,8 @@
import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.etl.CallbackBase;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.HdfsTextImportLoader;
import org.apache.sqoop.job.etl.Importer;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.model.FormUtils;
import org.apache.sqoop.model.MConnection;
@@ -57,47 +53,93 @@
/**
* Manager for Sqoop framework itself.
*
* All Sqoop internals (job execution engine, metadata) should be handled
* within this manager.
* All Sqoop internals are handled in this class:
* * Submission engine
* * Execution engine
* * Framework metadata
*
* Current implementation of entire submission engine is using repository
* for keep of current track, so that server might be restarted at any time
* without any affect on running jobs. This approach however might not be the
* fastest way and we might want to introduce internal structures with running
* jobs in case that this approach will be too slow.
for keeping track of running submissions. Thus, the server might be restarted
at any time without any effect on running jobs. This approach however might not
be the fastest and we might want to introduce internal structures for
running jobs in case this approach proves too slow.
*/
public final class FrameworkManager {
private static final Logger LOG = Logger.getLogger(FrameworkManager.class);
/**
* Default interval for purging old submissions from repository.
*/
private static final long DEFAULT_PURGE_THRESHOLD = 24*60*60*1000;
/**
* Default sleep interval for purge thread.
*/
private static final long DEFAULT_PURGE_SLEEP = 24*60*60*1000;
/**
* Default interval for update thread.
*/
private static final long DEFAULT_UPDATE_SLEEP = 60*5*1000;
/**
* Framework metadata structures in MForm format
*/
private static MFramework mFramework;
/**
* Validator instance
*/
private static final Validator validator;
/**
* Configured submission engine instance
*/
private static SubmissionEngine submissionEngine;
/**
* Configured execution engine instance
*/
private static ExecutionEngine executionEngine;
/**
* Purge thread that will periodically remove old submissions from repository.
*/
private static PurgeThread purgeThread = null;
/**
* Update thread that will periodically check status of running submissions.
*/
private static UpdateThread updateThread = null;
/**
* Synchronization variable between threads.
*/
private static boolean running = true;
/**
* Specifies how old a submission must be before it is removed from the repository.
*/
private static long purgeThreshold;
/**
* Number of milliseconds for purge thread to sleep.
*/
private static long purgeSleep;
/**
* Number of milliseconds for update thread to sleep.
*/
private static long updateSleep;
/**
* Mutex for creating new submissions. We're not allowing more than one
* running submission per job.
*/
private static final Object submissionMutex = new Object();
static {
MConnectionForms connectionForms = new MConnectionForms(
FormUtils.toForms(getConnectionConfigurationClass())
);
@@ -123,23 +165,32 @@ public static synchronized void initialize() {
String submissionEngineClassName =
context.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
Class<?> submissionEngineClass =
ClassUtils.loadClass(submissionEngineClassName);
if (submissionEngineClass == null) {
submissionEngine = (SubmissionEngine) ClassUtils.instantiate(submissionEngineClassName);
if(submissionEngine == null) {
throw new SqoopException(FrameworkError.FRAMEWORK_0001,
submissionEngineClassName);
}
try {
submissionEngine = (SubmissionEngine)submissionEngineClass.newInstance();
} catch (Exception ex) {
throw new SqoopException(FrameworkError.FRAMEWORK_0001,
submissionEngineClassName, ex);
submissionEngineClassName);
}
submissionEngine.initialize(context, FrameworkConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);
// Execution engine
String executionEngineClassName =
context.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
executionEngine = (ExecutionEngine) ClassUtils.instantiate(executionEngineClassName);
if(executionEngine == null) {
throw new SqoopException(FrameworkError.FRAMEWORK_0007,
executionEngineClassName);
}
// We need to make sure that the user has configured a compatible combination
// of submission and execution engines
if(! submissionEngine.isExecutionEngineSupported(executionEngine.getClass())) {
throw new SqoopException(FrameworkError.FRAMEWORK_0008);
}
executionEngine.initialize(context, FrameworkConstants.PREFIX_EXECUTION_ENGINE_CONFIG);
// Set up worker threads
purgeThreshold = context.getLong(
FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
@@ -161,7 +212,6 @@ public static synchronized void initialize() {
updateThread = new UpdateThread();
updateThread.start();
LOG.info("Submission manager initialized: OK");
}
@@ -189,6 +239,10 @@ public static synchronized void destroy() {
if(submissionEngine != null) {
submissionEngine.destroy();
}
if(executionEngine != null) {
executionEngine.destroy();
}
}
public static Validator getValidator() {
@@ -253,22 +307,26 @@ public static MSubmission submit(long jobId) {
// Create request object
MSubmission summary = new MSubmission(jobId);
SubmissionRequest request = new SubmissionRequest(summary, connector,
connectorConnection, connectorJob, frameworkConnection, frameworkJob);
SubmissionRequest request = executionEngine.createSubmissionRequest(
summary, connector,
connectorConnection, connectorJob,
frameworkConnection, frameworkJob);
request.setJobName(job.getName());
// Let's register all important jars
// sqoop-common
request.addJar(ClassUtils.jarForClass(MapContext.class));
request.addJarForClass(MapContext.class);
// sqoop-core
request.addJar(ClassUtils.jarForClass(FrameworkManager.class));
request.addJarForClass(FrameworkManager.class);
// sqoop-spi
request.addJar(ClassUtils.jarForClass(SqoopConnector.class));
// particular connector in use
request.addJar(ClassUtils.jarForClass(connector.getClass()));
request.addJarForClass(SqoopConnector.class);
// Execution engine jar
request.addJarForClass(executionEngine.getClass());
// Connector in use
request.addJarForClass(connector.getClass());
// Extra libraries that Sqoop code requires
request.addJar(ClassUtils.jarForClass(JSONValue.class));
request.addJarForClass(JSONValue.class);
switch (job.getType()) {
case IMPORT:
@@ -308,7 +366,7 @@ public static MSubmission submit(long jobId) {
// Bootstrap job from framework perspective
switch (job.getType()) {
case IMPORT:
bootstrapImportSubmission(request);
prepareImportSubmission(request);
break;
case EXPORT:
// TODO(jarcec): Implement export path
@@ -342,22 +400,14 @@ public static MSubmission submit(long jobId) {
return summary;
}
private static void bootstrapImportSubmission(SubmissionRequest request) {
Importer importer = (Importer)request.getConnectorCallbacks();
private static void prepareImportSubmission(SubmissionRequest request) {
ImportJobConfiguration jobConfiguration = (ImportJobConfiguration) request.getConfigFrameworkJob();
// Initialize the map-reduce part (all sort of required classes, ...)
request.setOutputDirectory(jobConfiguration.outputDirectory);
// Defaults for classes are mostly fine for now.
// Set up framework context
MutableMapContext context = request.getFrameworkContext();
context.setString(JobConstants.JOB_ETL_PARTITIONER, importer.getPartitioner().getName());
context.setString(JobConstants.JOB_ETL_EXTRACTOR, importer.getExtractor().getName());
context.setString(JobConstants.JOB_ETL_DESTROYER, importer.getDestroyer().getName());
context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
// Delegate rest of the job to execution engine
executionEngine.prepareImportSubmission(request);
}
/**


@@ -41,6 +41,15 @@ public void initialize(MapContext context, String prefix) {
public void destroy() {
}
/**
* Callback to verify that configured submission engine and execution engine
* are compatible.
*
* @param executionEngineClass Configured execution engine class.
* @return True if such execution engine is supported
*/
public abstract boolean isExecutionEngineSupported(Class executionEngineClass);
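A submission engine that can drive more than one execution engine would simply accept several classes here. A hypothetical method-level sketch; SparkExecutionEngine does not exist in this commit and is invented for illustration:

// Hypothetical implementation accepting two execution engines.
@Override
public boolean isExecutionEngineSupported(Class executionEngineClass) {
  return executionEngineClass == MapreduceExecutionEngine.class
      || executionEngineClass == SparkExecutionEngine.class; // illustrative
}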
/**
* Submit new job to remote (hadoop) cluster. This method *must* fill
* submission.getSummary.setExternalId(), otherwise Sqoop framework won't


@@ -17,15 +17,11 @@
*/
package org.apache.sqoop.framework;
import org.apache.hadoop.io.NullWritable;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.job.etl.CallbackBase;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.utils.ClassUtils;
import java.util.LinkedList;
import java.util.List;
@@ -85,20 +81,6 @@ public class SubmissionRequest {
*/
String outputDirectory;
/**
* Map-reduce specific options.
*
* I'm using strings so that this class won't have direct dependency on
* hadoop libraries.
*/
Class inputFormatClass;
Class mapperClass;
Class mapOutputKeyClass;
Class mapOutputValueClass;
Class outputFormatClass;
Class outputKeyClass;
Class outputValueClass;
public SubmissionRequest(MSubmission submission,
SqoopConnector connector,
@@ -115,15 +97,6 @@ public SubmissionRequest(MSubmission submission,
this.configConnectorJob = configConnectorJob;
this.configFrameworkConnection = configFrameworkConnection;
this.configFrameworkJob = configFrameworkJob;
// TODO(Jarcec): Move this to job execution engine
this.inputFormatClass = SqoopInputFormat.class;
this.mapperClass = SqoopMapper.class;
this.mapOutputKeyClass = Data.class;
this.mapOutputValueClass = NullWritable.class;
this.outputFormatClass = SqoopFileOutputFormat.class;
this.outputKeyClass = Data.class;
this.outputValueClass = NullWritable.class;
}
public MSubmission getSummary() {
@@ -150,6 +123,10 @@ public void addJar(String jar) {
jars.add(jar);
}
public void addJarForClass(Class klass) {
jars.add(ClassUtils.jarForClass(klass));
}
public void addJars(List<String> jars) {
this.jars.addAll(jars);
}
@@ -193,31 +170,4 @@ public String getOutputDirectory() {
public void setOutputDirectory(String outputDirectory) {
this.outputDirectory = outputDirectory;
}
public Class getInputFormatClass() {
return inputFormatClass;
}
public Class getMapperClass() {
return mapperClass;
}
public Class getMapOutputKeyClass() {
return mapOutputKeyClass;
}
public Class getMapOutputValueClass() {
return mapOutputValueClass;
}
public Class getOutputFormatClass() {
return outputFormatClass;
}
public Class getOutputKeyClass() {
return outputKeyClass;
}
public Class getOutputValueClass() {
return outputValueClass;
}
}


@@ -108,3 +108,8 @@ org.apache.sqoop.submission.engine=org.apache.sqoop.submission.mapreduce.Mapredu
# Hadoop configuration directory
org.apache.sqoop.submission.engine.mapreduce.configuration.directory=/etc/hadoop/conf/
#
# Execution engine configuration
#
org.apache.sqoop.execution.engine=org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine


@@ -0,0 +1,67 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.sqoop</groupId>
<artifactId>execution</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.sqoop.execution</groupId>
<artifactId>sqoop-execution-mapreduce</artifactId>
<version>2.0.0-SNAPSHOT</version>
<name>Sqoop Mapreduce Execution Engine</name>
<dependencies>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-core</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-core</artifactId>
<version>2.0.0-SNAPSHOT</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>


@@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.execution.mapreduce;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.framework.SubmissionRequest;
import org.apache.sqoop.model.MSubmission;
/**
* Map-reduce specific submission request containing all extra information
* needed for bootstrapping a map-reduce job.
*/
public class MRSubmissionRequest extends SubmissionRequest {
/**
* Map-reduce specific options.
*/
Class<? extends InputFormat> inputFormatClass;
Class<? extends Mapper> mapperClass;
Class<? extends Writable> mapOutputKeyClass;
Class<? extends Writable> mapOutputValueClass;
Class<? extends OutputFormat> outputFormatClass;
Class<? extends Writable> outputKeyClass;
Class<? extends Writable> outputValueClass;
public MRSubmissionRequest(MSubmission submission,
SqoopConnector connector,
Object configConnectorConnection,
Object configConnectorJob,
Object configFrameworkConnection,
Object configFrameworkJob) {
super(submission, connector, configConnectorConnection, configConnectorJob,
configFrameworkConnection, configFrameworkJob);
}
public Class<? extends InputFormat> getInputFormatClass() {
return inputFormatClass;
}
public void setInputFormatClass(Class<? extends InputFormat> inputFormatClass) {
this.inputFormatClass = inputFormatClass;
}
public Class<? extends Mapper> getMapperClass() {
return mapperClass;
}
public void setMapperClass(Class<? extends Mapper> mapperClass) {
this.mapperClass = mapperClass;
}
public Class<? extends Writable> getMapOutputKeyClass() {
return mapOutputKeyClass;
}
public void setMapOutputKeyClass(Class<? extends Writable> mapOutputKeyClass) {
this.mapOutputKeyClass = mapOutputKeyClass;
}
public Class<? extends Writable> getMapOutputValueClass() {
return mapOutputValueClass;
}
public void setMapOutputValueClass(Class<? extends Writable> mapOutputValueClass) {
this.mapOutputValueClass = mapOutputValueClass;
}
public Class<? extends OutputFormat> getOutputFormatClass() {
return outputFormatClass;
}
public void setOutputFormatClass(Class<? extends OutputFormat> outputFormatClass) {
this.outputFormatClass = outputFormatClass;
}
public Class<? extends Writable> getOutputKeyClass() {
return outputKeyClass;
}
public void setOutputKeyClass(Class<? extends Writable> outputKeyClass) {
this.outputKeyClass = outputKeyClass;
}
public Class<? extends Writable> getOutputValueClass() {
return outputValueClass;
}
public void setOutputValueClass(Class<? extends Writable> outputValueClass) {
this.outputValueClass = outputValueClass;
}
}
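On the consuming side, a map-reduce submission engine can translate these getters directly into Hadoop's Job configuration. A hedged sketch under the assumption that the submission engine receives an MRSubmissionRequest; the MRJobWiringExample class is illustrative, not code from this commit:

package org.apache.sqoop.execution.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Hedged sketch only; the real wiring lives in the submission engine.
public class MRJobWiringExample {
  public static Job wire(Configuration conf, MRSubmissionRequest request)
      throws IOException {
    Job job = Job.getInstance(conf); // assumes Hadoop's Job.getInstance()
    job.setInputFormatClass(request.getInputFormatClass());
    job.setMapperClass(request.getMapperClass());
    job.setMapOutputKeyClass(request.getMapOutputKeyClass());
    job.setMapOutputValueClass(request.getMapOutputValueClass());
    job.setOutputFormatClass(request.getOutputFormatClass());
    job.setOutputKeyClass(request.getOutputKeyClass());
    job.setOutputValueClass(request.getOutputValueClass());
    return job;
  }
}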


@@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.execution.mapreduce;
import org.apache.hadoop.io.NullWritable;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.framework.ExecutionEngine;
import org.apache.sqoop.framework.SubmissionRequest;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.etl.HdfsTextImportLoader;
import org.apache.sqoop.job.etl.Importer;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper;
import org.apache.sqoop.model.MSubmission;
/**
* Map-reduce specific execution engine.
*/
public class MapreduceExecutionEngine extends ExecutionEngine {
@Override
public SubmissionRequest createSubmissionRequest(MSubmission summary,
SqoopConnector connector,
Object connectorConnection,
Object connectorJob,
Object frameworkConnection,
Object frameworkJob) {
return new MRSubmissionRequest(summary, connector, connectorConnection,
connectorJob, frameworkConnection, frameworkJob);
}
@Override
public void prepareImportSubmission(SubmissionRequest gRequest) {
MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
// Configure map-reduce classes for import
request.setInputFormatClass(SqoopInputFormat.class);
request.setMapperClass(SqoopMapper.class);
request.setMapOutputKeyClass(Data.class);
request.setMapOutputValueClass(NullWritable.class);
request.setOutputFormatClass(SqoopFileOutputFormat.class);
request.setOutputKeyClass(Data.class);
request.setOutputValueClass(NullWritable.class);
Importer importer = (Importer)request.getConnectorCallbacks();
// Set up framework context
MutableMapContext context = request.getFrameworkContext();
context.setString(JobConstants.JOB_ETL_PARTITIONER, importer.getPartitioner().getName());
context.setString(JobConstants.JOB_ETL_EXTRACTOR, importer.getExtractor().getName());
context.setString(JobConstants.JOB_ETL_DESTROYER, importer.getDestroyer().getName());
context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
}
}
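Taken together, FrameworkManager now drives the whole import preparation through the engine instead of hard-coding map-reduce classes. An illustrative fragment mirroring the flow in FrameworkManager.submit(); the summary, connector, and configuration variables are assumed to be resolved by the framework as shown earlier:

// Illustrative fragment; inputs are assumed to be resolved by the framework.
ExecutionEngine engine = new MapreduceExecutionEngine();
SubmissionRequest request = engine.createSubmissionRequest(
    summary, connector,
    connectorConnection, connectorJob,
    frameworkConnection, frameworkJob);
// Assumed framework step: attach the connector's Importer callbacks, since
// prepareImportSubmission() reads them via getConnectorCallbacks().
request.setConnectorCallbacks(connector.getImporter());
engine.prepareImportSubmission(request);
// The framework context now carries the ETL class names, e.g.:
String partitioner = request.getFrameworkContext()
    .getString(JobConstants.JOB_ETL_PARTITIONER);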


@@ -15,13 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.io;
package org.apache.sqoop.job.io;
import java.util.Arrays;
import junit.framework.TestCase;
import org.apache.sqoop.job.io.Data;
import org.junit.Test;
public class TestData extends TestCase {

execution/pom.xml

@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache</groupId>
<artifactId>sqoop</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.sqoop</groupId>
<artifactId>execution</artifactId>
<name>Sqoop Execution Engines</name>
<packaging>pom</packaging>
<modules>
<module>mapreduce</module>
</modules>
</project>


@@ -220,6 +220,7 @@ limitations under the License.
<module>client</module>
<module>docs</module>
<module>connector</module>
<module>execution</module>
<module>submission</module>
<module>dist</module>
</modules>


@@ -36,6 +36,12 @@ limitations under the License.
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.sqoop.execution</groupId>
<artifactId>sqoop-execution-mapreduce</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>


@@ -28,6 +28,8 @@
import org.apache.log4j.Logger;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.execution.mapreduce.MRSubmissionRequest;
import org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine;
import org.apache.sqoop.framework.SubmissionRequest;
import org.apache.sqoop.framework.SubmissionEngine;
import org.apache.sqoop.job.JobConstants;
@@ -116,8 +118,22 @@ public void destroy() {
* {@inheritDoc}
*/
@Override
@SuppressWarnings("unchecked")
public boolean submit(SubmissionRequest request) {
public boolean isExecutionEngineSupported(Class executionEngineClass) {
if(executionEngineClass == MapreduceExecutionEngine.class) {
return true;
}
return false;
}
/**
* {@inheritDoc}
*/
@Override
public boolean submit(SubmissionRequest generalRequest) {
// We're supporting only map reduce jobs
MRSubmissionRequest request = (MRSubmissionRequest) generalRequest;
// Clone global configuration
Configuration configuration = new Configuration(globalConfiguration);