SQOOP-610: Job submission engine for import

(Bilung Lee via Jarek Jarcec Cecho)
Jarek Jarcec Cecho 2012-10-19 13:28:18 -07:00
parent 6cd9e96888
commit 7fbd3ba94b
7 changed files with 747 additions and 0 deletions

org/apache/sqoop/job/JobConstants.java

@@ -21,6 +21,16 @@
public final class JobConstants extends Constants {

  // Metadata constants
  public static final String INPUT_JOB_JOB_TYPE = "inp-job-job-type";
  public static final String INPUT_JOB_STORAGE_TYPE = "inp-job-storage-type";
  public static final String INPUT_JOB_FORMAT_TYPE = "inp-job-format-type";
  public static final String INPUT_JOB_OUTPUT_CODEC = "inp-job-output-codec";
  public static final String INPUT_JOB_MAX_EXTRACTORS = "inp-job-max-extractors";
  public static final String INPUT_JOB_MAX_LOADERS = "inp-job-max-loaders";

  /**
   * All job related configuration is prefixed with this:
   * <tt>org.apache.sqoop.job.</tt>
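The new INPUT_JOB_* keys are the raw strings a caller feeds into the job submission engine; they are parsed into typed values by EtlOptions (added below). A minimal sketch of how a caller might populate them, assuming the EtlOptions.setOption(String, String) API introduced later in this commit (connector stands for any SqoopConnector instance):

    EtlOptions options = new EtlOptions(connector);
    options.setOption(JobConstants.INPUT_JOB_JOB_TYPE, "IMPORT");
    options.setOption(JobConstants.INPUT_JOB_STORAGE_TYPE, "HDFS");
    options.setOption(JobConstants.INPUT_JOB_FORMAT_TYPE, "TEXT");
    options.setOption(JobConstants.INPUT_JOB_MAX_EXTRACTORS, "4");
    options.setOption(JobConstants.INPUT_JOB_MAX_LOADERS, "1");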

org/apache/sqoop/job/JobEngine.java

@@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job;

import org.apache.sqoop.job.etl.EtlFramework;
import org.apache.sqoop.job.etl.EtlOptions;
import org.apache.sqoop.job.mr.MrExecution;

/**
 * This class supports Sqoop job execution.
 */
public class JobEngine {

  public void run(EtlOptions options) {
    EtlFramework etl = new EtlFramework(options);
    MrExecution mr = new MrExecution(etl);
    mr.initialize();
    mr.run();
    mr.destroy();
  }
}
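JobEngine is the single entry point: build EtlOptions from a connector, hand them to run(), and the engine drives the full initialize/run/destroy cycle of MrExecution. The unit test added at the end of this commit exercises exactly this path:

    DummyConnector connector = new DummyConnector();
    EtlOptions options = new EtlOptions(connector);
    JobEngine engine = new JobEngine();
    engine.run(options);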

org/apache/sqoop/job/etl/EtlFramework.java

@@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.etl;

import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.core.CoreError;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.Exporter;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.Importer;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.EtlOptions.FormatType;
import org.apache.sqoop.job.etl.EtlOptions.JobType;
import org.apache.sqoop.job.etl.EtlOptions.StorageType;

/**
 * This class encapsulates the whole ETL framework.
 *
 * For import:
 * Initializer (connector-defined)
 * -> Partitioner (connector-defined)
 * -> Extractor (connector-defined)
 * -> Loader (framework-defined)
 * -> Destroyer (connector-defined)
 *
 * For export:
 * Initializer (connector-defined)
 * -> Partitioner (framework-defined)
 * -> Extractor (framework-defined)
 * -> Loader (connector-defined)
 * -> Destroyer (connector-defined)
 */
public class EtlFramework {

  private Class<? extends Initializer> initializer;
  private Class<? extends Partitioner> partitioner;
  private Class<? extends Extractor> extractor;
  private Class<? extends Loader> loader;
  private Class<? extends Destroyer> destroyer;

  private boolean requireFieldNames;
  private boolean requireOutputDirectory;

  private EtlOptions options;

  public EtlFramework(EtlOptions inputs) {
    this.options = inputs;

    JobType jobType = options.getJobType();
    switch (jobType) {
    case IMPORT:
      constructImport();
      break;
    case EXPORT:
      constructExport();
      break;
    default:
      throw new SqoopException(CoreError.CORE_0012, jobType.toString());
    }
  }

  public EtlOptions getOptions() {
    return options;
  }

  public Class<? extends Initializer> getInitializer() {
    return initializer;
  }

  public Class<? extends Partitioner> getPartitioner() {
    return partitioner;
  }

  public Class<? extends Extractor> getExtractor() {
    return extractor;
  }

  public Class<? extends Loader> getLoader() {
    return loader;
  }

  public Class<? extends Destroyer> getDestroyer() {
    return destroyer;
  }

  public boolean isFieldNamesRequired() {
    return requireFieldNames;
  }

  public boolean isOutputDirectoryRequired() {
    return requireOutputDirectory;
  }

  private void constructImport() {
    Importer importer = options.getConnector().getImporter();
    initializer = importer.getInitializer();
    partitioner = importer.getPartitioner();
    extractor = importer.getExtractor();
    destroyer = importer.getDestroyer();

    StorageType storageType = options.getStorageType();
    switch (storageType) {
    case HDFS:
      FormatType formatType = options.getFormatType();
      switch (formatType) {
      case TEXT:
        loader = HdfsTextImportLoader.class;
        requireOutputDirectory = true;
        break;
      case SEQUENCE:
        loader = HdfsSequenceImportLoader.class;
        requireOutputDirectory = true;
        break;
      default:
        throw new SqoopException(CoreError.CORE_0012, formatType.toString());
      }
      break;
    default:
      throw new SqoopException(CoreError.CORE_0012, storageType.toString());
    }
  }

  private void constructExport() {
    Exporter exporter = options.getConnector().getExporter();
    initializer = exporter.getInitializer();
    loader = exporter.getLoader();
    destroyer = exporter.getDestroyer();

    // FIXME: set partitioner/extractor based on storage/format types
  }
}
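For the import case, everything on the connector-defined side comes from the connector's Importer descriptor; only the Loader is picked by the framework (HdfsTextImportLoader or HdfsSequenceImportLoader, depending on the format type). The DummyConnector in the new test shows the shape of what a connector returns; the constructor arguments appear to map to the initializer, partitioner, extractor, and destroyer consumed in constructImport() above:

    @Override
    public Importer getImporter() {
      return new Importer(
          DummyImportInitializer.class,   // connector-defined Initializer
          DummyImportPartitioner.class,   // connector-defined Partitioner
          DummyImportExtractor.class,     // connector-defined Extractor
          null);                          // no Destroyer needed here
    }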

org/apache/sqoop/job/etl/EtlOptions.java

@@ -0,0 +1,165 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.etl;

import java.util.HashMap;

import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.core.CoreError;
import org.apache.sqoop.job.JobConstants;

/**
 * This class retrieves information for job execution from user-input options.
 */
public class EtlOptions implements Options {

  HashMap<String, String> store = new HashMap<String, String>();

  public EtlOptions(SqoopConnector connector) {
    this.connector = connector;
  }

  private SqoopConnector connector;
  public SqoopConnector getConnector() {
    return connector;
  }

  private JobType jobType = null;
  public enum JobType {
    IMPORT,
    EXPORT
  }
  public JobType getJobType() {
    if (jobType != null) {
      return jobType;
    }

    String option = store.get(JobConstants.INPUT_JOB_JOB_TYPE);
    if (option == null || option.equalsIgnoreCase("IMPORT")) {
      jobType = JobType.IMPORT;
    } else if (option.equalsIgnoreCase("EXPORT")) {
      jobType = JobType.EXPORT;
    } else {
      throw new SqoopException(CoreError.CORE_0012, option);
    }
    return jobType;
  }

  private StorageType storageType = null;
  public enum StorageType {
    HDFS
  }
  public StorageType getStorageType() {
    if (storageType != null) {
      return storageType;
    }

    String option = store.get(JobConstants.INPUT_JOB_STORAGE_TYPE);
    if (option == null || option.equalsIgnoreCase("HDFS")) {
      storageType = StorageType.HDFS;
    } else {
      throw new SqoopException(CoreError.CORE_0012, option);
    }
    return storageType;
  }

  private FormatType formatType = null;
  public enum FormatType {
    TEXT,
    SEQUENCE
  }
  public FormatType getFormatType() {
    if (formatType != null) {
      return formatType;
    }

    String option = store.get(JobConstants.INPUT_JOB_FORMAT_TYPE);
    if (option == null || option.equalsIgnoreCase("TEXT")) {
      formatType = FormatType.TEXT;
    } else if (option.equalsIgnoreCase("SEQUENCE")) {
      formatType = FormatType.SEQUENCE;
    } else {
      throw new SqoopException(CoreError.CORE_0012, option);
    }
    return formatType;
  }

  public String getOutputCodec() {
    return store.get(JobConstants.INPUT_JOB_OUTPUT_CODEC);
  }

  private int maxExtractors = -1;
  public int getMaxExtractors() {
    if (maxExtractors != -1) {
      return maxExtractors;
    }

    String option = store.get(JobConstants.INPUT_JOB_MAX_EXTRACTORS);
    if (option != null) {
      maxExtractors = Integer.parseInt(option);
    } else {
      JobType type = getJobType();
      switch (type) {
      case IMPORT:
        maxExtractors = 4;
        break;
      case EXPORT:
        maxExtractors = 1;
        break;
      default:
        throw new SqoopException(CoreError.CORE_0012, type.toString());
      }
    }
    return maxExtractors;
  }

  private int maxLoaders = -1;
  public int getMaxLoaders() {
    if (maxLoaders != -1) {
      return maxLoaders;
    }

    String option = store.get(JobConstants.INPUT_JOB_MAX_LOADERS);
    if (option != null) {
      maxLoaders = Integer.parseInt(option);
    } else {
      JobType type = getJobType();
      switch (type) {
      case IMPORT:
        maxLoaders = 1;
        break;
      case EXPORT:
        maxLoaders = 4;
        break;
      default:
        throw new SqoopException(CoreError.CORE_0012, type.toString());
      }
    }
    return maxLoaders;
  }

  public void setOption(String key, String value) {
    store.put(key, value);
  }

  @Override
  public String getOption(String key) {
    return store.get(key);
  }
}
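Note the precedence in getMaxExtractors() and getMaxLoaders(): an explicit option wins, otherwise the default flips with the job type (import: 4 extractors / 1 loader; export: 1 extractor / 4 loaders). A small illustration of that resolution, again with connector standing in for any SqoopConnector instance:

    EtlOptions importOptions = new EtlOptions(connector);
    importOptions.setOption(JobConstants.INPUT_JOB_JOB_TYPE, "IMPORT");
    importOptions.getMaxExtractors();   // 4 (default for import)
    importOptions.getMaxLoaders();      // 1 (default for import)

    EtlOptions exportOptions = new EtlOptions(connector);
    exportOptions.setOption(JobConstants.INPUT_JOB_JOB_TYPE, "EXPORT");
    exportOptions.setOption(JobConstants.INPUT_JOB_MAX_LOADERS, "8");
    exportOptions.getMaxLoaders();      // 8 (explicit option overrides the default of 4)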

org/apache/sqoop/job/mr/MrExecution.java

@@ -0,0 +1,153 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.core.CoreError;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.EtlContext;
import org.apache.sqoop.job.etl.EtlFramework;
import org.apache.sqoop.job.etl.EtlMutableContext;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.EtlOptions;
import org.apache.sqoop.job.etl.EtlOptions.JobType;
import org.apache.sqoop.job.io.Data;

/**
 * This class encapsulates the whole MapReduce execution.
 */
public class MrExecution {

  private Configuration conf;
  private EtlFramework etl;

  public MrExecution(EtlFramework etl) {
    this.conf = new Configuration();
    this.etl = etl;
  }

  public void initialize() {
    EtlOptions options = etl.getOptions();

    // Propagate user-supplied options and the resolved ETL classes
    // into the Hadoop configuration.
    conf.setInt(JobConstants.JOB_ETL_NUMBER_PARTITIONS,
        options.getMaxExtractors());

    if (options.getOutputCodec() != null) {
      conf.setBoolean(FileOutputFormat.COMPRESS, true);
      conf.set(FileOutputFormat.COMPRESS_CODEC, options.getOutputCodec());
    }

    conf.set(JobConstants.JOB_ETL_PARTITIONER, etl.getPartitioner().getName());
    conf.set(JobConstants.JOB_ETL_EXTRACTOR, etl.getExtractor().getName());
    conf.set(JobConstants.JOB_ETL_LOADER, etl.getLoader().getName());

    // Let the connector-defined Initializer, if any, prepare the job context.
    EtlMutableContext context = new EtlMutableContext(conf);

    Class<? extends Initializer> initializer = etl.getInitializer();
    if (initializer != null) {
      Initializer instance;
      try {
        instance = (Initializer) initializer.newInstance();
      } catch (Exception e) {
        throw new SqoopException(CoreError.CORE_0010, initializer.getName(), e);
      }
      instance.run(context, options);
    }

    JobType jobType = etl.getOptions().getJobType();
    switch (jobType) {
    case IMPORT:
      checkImportConfiguration(context);
      break;
    case EXPORT:
      checkExportConfiguration(context);
      break;
    default:
      throw new SqoopException(CoreError.CORE_0012, jobType.toString());
    }
  }

  public void run() {
    EtlOptions options = etl.getOptions();

    try {
      Job job = Job.getInstance(conf);

      job.setInputFormatClass(SqoopInputFormat.class);
      job.setMapperClass(SqoopMapper.class);
      job.setMapOutputKeyClass(Data.class);
      job.setMapOutputValueClass(NullWritable.class);

      if (options.getMaxLoaders() > 1) {
        job.setReducerClass(SqoopReducer.class);
        job.setNumReduceTasks(options.getMaxLoaders());
      }

      job.setOutputFormatClass((etl.isOutputDirectoryRequired()) ?
          SqoopFileOutputFormat.class : SqoopNullOutputFormat.class);
      job.setOutputKeyClass(Data.class);
      job.setOutputValueClass(NullWritable.class);

      boolean success = job.waitForCompletion(true);
      if (!success) {
        throw new SqoopException(CoreError.CORE_0008);
      }

    } catch (Exception e) {
      throw new SqoopException(CoreError.CORE_0008, e);
    }
  }

  public void destroy() {
    Class<? extends Destroyer> destroyer = etl.getDestroyer();
    if (destroyer != null) {
      Destroyer instance;
      try {
        instance = (Destroyer) destroyer.newInstance();
      } catch (Exception e) {
        throw new SqoopException(CoreError.CORE_0010, destroyer.getName(), e);
      }
      instance.run(new EtlContext(conf));
    }
  }

  private void checkImportConfiguration(EtlMutableContext context) {
    if (etl.isFieldNamesRequired() &&
        context.getString(JobConstants.JOB_ETL_FIELD_NAMES) == null) {
      throw new SqoopException(CoreError.CORE_0020, "field names");
    }

    if (etl.isOutputDirectoryRequired()) {
      String outputDirectory =
          context.getString(JobConstants.JOB_ETL_OUTPUT_DIRECTORY);
      if (outputDirectory == null) {
        throw new SqoopException(CoreError.CORE_0020, "output directory");
      } else {
        context.setString(FileOutputFormat.OUTDIR, outputDirectory);
      }
    }
  }

  private void checkExportConfiguration(EtlMutableContext context) {
    // TODO: check export related configuration
  }
}
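MrExecution exposes a three-phase lifecycle rather than a single call so that the connector hooks run outside the MapReduce job itself: initialize() fills the configuration, runs the connector's Initializer, and validates the required inputs; run() submits the MapReduce job and waits for it; destroy() runs the connector's Destroyer. JobEngine (above) simply calls the phases in order, and a caller driving the class directly would do the same:

    MrExecution mr = new MrExecution(etl);   // etl: an EtlFramework built from EtlOptions
    mr.initialize();   // set conf keys, run Initializer, check required inputs (e.g. output directory)
    mr.run();          // submit the MapReduce job and wait; throws CORE_0008 on failure
    mr.destroy();      // run the connector-defined Destroyer, if any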

org/apache/sqoop/job/mr/SqoopReducer.java

@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.mr;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.sqoop.job.io.Data;

/**
 * A reducer that performs the reduce function.
 */
public class SqoopReducer
    extends Reducer<Data, NullWritable, Data, NullWritable> {

  public static final Log LOG =
      LogFactory.getLog(SqoopReducer.class.getName());
}
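Since the class body is empty, SqoopReducer currently inherits Reducer's default identity pass-through behavior, and it is only wired into the job when options.getMaxLoaders() > 1 (see MrExecution.run() above); the default single-loader import stays map-only.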

org/apache/sqoop/job/TestJobEngine.java

@@ -0,0 +1,199 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job;

import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.ResourceBundle;

import junit.framework.TestCase;

import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.job.JobEngine;
import org.apache.sqoop.job.etl.Context;
import org.apache.sqoop.job.etl.EtlOptions;
import org.apache.sqoop.job.etl.Exporter;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.Importer;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.MutableContext;
import org.apache.sqoop.job.etl.Options;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.DataWriter;
import org.apache.sqoop.model.MConnectionForms;
import org.apache.sqoop.model.MJob.Type;
import org.apache.sqoop.model.MJobForms;
import org.apache.sqoop.validation.Validator;
import org.junit.Test;

public class TestJobEngine extends TestCase {

  private static final String DATA_DIR = TestJobEngine.class.getSimpleName();
  private static final String WAREHOUSE_ROOT = "/tmp/sqoop/warehouse/";
  private static final String OUTPUT_DIR = WAREHOUSE_ROOT + DATA_DIR;
  private static final String OUTPUT_FILE = "part-r-00000";
  private static final int START_PARTITION = 1;
  private static final int NUMBER_OF_PARTITIONS = 9;
  private static final int NUMBER_OF_ROWS_PER_PARTITION = 10;

  @Test
  public void testImport() throws Exception {
    FileUtils.delete(OUTPUT_DIR);

    DummyConnector connector = new DummyConnector();
    EtlOptions options = new EtlOptions(connector);

    JobEngine engine = new JobEngine();
    engine.run(options);

    String fileName = OUTPUT_DIR + "/" + OUTPUT_FILE;
    InputStream filestream = FileUtils.open(fileName);
    BufferedReader filereader = new BufferedReader(new InputStreamReader(
        filestream, Data.CHARSET_NAME));
    verifyOutput(filereader);
  }

  private void verifyOutput(BufferedReader reader)
      throws IOException {
    String line = null;
    int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION;
    Data expected = new Data();
    while ((line = reader.readLine()) != null) {
      expected.setContent(new Object[] {
          new Integer(index),
          new Double(index),
          String.valueOf(index) },
          Data.ARRAY_RECORD);
      index++;
      assertEquals(expected.toString(), line);
    }
    reader.close();
    assertEquals(NUMBER_OF_PARTITIONS * NUMBER_OF_ROWS_PER_PARTITION,
        index - START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION);
  }

  public class DummyConnector implements SqoopConnector {

    @Override
    public Importer getImporter() {
      return new Importer(
          DummyImportInitializer.class,
          DummyImportPartitioner.class,
          DummyImportExtractor.class,
          null);
    }

    @Override
    public Exporter getExporter() {
      fail("This method should not be invoked.");
      return null;
    }

    @Override
    public ResourceBundle getBundle(Locale locale) {
      fail("This method should not be invoked.");
      return null;
    }

    @Override
    public Validator getValidator() {
      fail("This method should not be invoked.");
      return null;
    }

    @Override
    public Class getConnectionConfigurationClass() {
      fail("This method should not be invoked.");
      return null;
    }

    @Override
    public Class getJobConfigurationClass(Type jobType) {
      fail("This method should not be invoked.");
      return null;
    }
  }

  public static class DummyImportInitializer extends Initializer {
    @Override
    public void run(MutableContext context, Options options) {
      context.setString(JobConstants.JOB_ETL_OUTPUT_DIRECTORY, OUTPUT_DIR);
    }
  }

  public static class DummyImportPartition extends Partition {
    private int id;

    public void setId(int id) {
      this.id = id;
    }

    public int getId() {
      return id;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      id = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(id);
    }
  }

  public static class DummyImportPartitioner extends Partitioner {
    @Override
    public List<Partition> run(Context context) {
      List<Partition> partitions = new LinkedList<Partition>();
      for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
        DummyImportPartition partition = new DummyImportPartition();
        partition.setId(id);
        partitions.add(partition);
      }
      return partitions;
    }
  }

  public static class DummyImportExtractor extends Extractor {
    @Override
    public void run(Context context, Partition partition, DataWriter writer) {
      int id = ((DummyImportPartition) partition).getId();
      for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
        writer.writeArrayRecord(new Object[] {
            new Integer(id * NUMBER_OF_ROWS_PER_PARTITION + row),
            new Double(id * NUMBER_OF_ROWS_PER_PARTITION + row),
            String.valueOf(id * NUMBER_OF_ROWS_PER_PARTITION + row)});
      }
    }
  }
}
}