mirror of https://github.com/apache/sqoop.git

SQOOP-740 Provide export submission implementation

(Jarek Jarcec Cecho)
Bilung Lee 2012-12-07 15:07:34 -08:00
parent e9868cb72e
commit 9ac49d95c2
18 changed files with 178 additions and 41 deletions

View File

@@ -85,7 +85,7 @@ private void createConnection(String connectorId) throws IOException {
     ResourceBundle frameworkBundle = frameworkBean.getResourceBundle();
     MConnector connector = connectorBean.getConnectors().get(0);
-    ResourceBundle connectorBundle = connectorBean.getResourceBundles().get(0);
+    ResourceBundle connectorBundle = connectorBean.getResourceBundles().get(connector.getPersistenceId());
     MConnection connection = new MConnection(connector.getPersistenceId(),
                                              connector.getConnectionForms(),

View File

@@ -103,7 +103,7 @@ private void createJob(String connectionId, String type) throws IOException {
     connectorBean = readConnector(String.valueOf(connection.getConnectorId()));
     MConnector connector = connectorBean.getConnectors().get(0);
-    ResourceBundle connectorBundle = connectorBean.getResourceBundles().get(0);
+    ResourceBundle connectorBundle = connectorBean.getResourceBundles().get(connection.getPersistenceId());
     MJob.Type jobType = MJob.Type.valueOf(type.toUpperCase());

View File

@@ -29,7 +29,7 @@ public class GenericJdbcExportLoader extends Loader {
   private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION;

   @Override
-  public void run(ImmutableContext context, DataReader reader) throws Exception{
+  public void load(ImmutableContext context, Object oc, Object oj, DataReader reader) throws Exception{
     String driver = context.getString(
       GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER);
     String url = context.getString(

View File

@@ -23,11 +23,8 @@
 import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.MutableMapContext;
-import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.io.DataReader;
-import org.junit.Test;

 public class TestExportLoader extends TestCase {
@@ -75,7 +72,7 @@ public void testInsert() throws Exception {
     Loader loader = new GenericJdbcExportLoader();
     DummyReader reader = new DummyReader();
-    loader.run(context, reader);
+    loader.load(context, null, null, reader);
     int index = START;
     ResultSet rs = executor.executeQuery("SELECT * FROM "

View File

@@ -52,9 +52,15 @@ public SubmissionRequest createSubmissionRequest() {
   }

   /**
-   * Prepare given submission request for import submission.
+   * Prepare given submission request for import job type.
    *
    * @param request Submission request
    */
   public abstract void prepareImportSubmission(SubmissionRequest request);
+
+  /**
+   * Prepare given submission request for export job type.
+   * @param request Submission request
+   */
+  public abstract void prepareExportSubmission(SubmissionRequest request);
 }

View File

@@ -403,7 +403,7 @@ public static MSubmission submit(long jobId) {
       prepareImportSubmission(request);
       break;
     case EXPORT:
-      // TODO(jarcec): Implement export path
+      prepareExportSubmission(request);
       break;
     default:
       throw new SqoopException(FrameworkError.FRAMEWORK_0005,
@@ -450,6 +450,19 @@ private static void prepareImportSubmission(SubmissionRequest request) {
     executionEngine.prepareImportSubmission(request);
   }

+  private static void prepareExportSubmission(SubmissionRequest request) {
+    ExportJobConfiguration jobConfiguration = (ExportJobConfiguration) request.getConfigFrameworkJob();
+
+    // We're directly moving configured number of extractors and loaders to
+    // underlying request object. In the future we might need to throttle this
+    // count based on other running jobs to meet our SLAs.
+    request.setExtractors(jobConfiguration.throttling.extractors);
+    request.setLoaders(jobConfiguration.throttling.loaders);
+
+    // Delegate rest of the job to execution engine
+    executionEngine.prepareExportSubmission(request);
+  }
+
   /**
    * Callback that will be called only if we failed to submit the job to the
    * remote cluster.
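
Note on the throttling values read above: the ThrottlingForm reached through jobConfiguration.throttling is defined elsewhere in the tree and does not appear in this diff. A minimal sketch of the shape implied by the usage (field names and nullable Integer types assumed from the request.getExtractors() null check further down) could look like this:

// Hypothetical sketch only -- the real ThrottlingForm is not part of this
// diff; the field names are assumed from jobConfiguration.throttling.extractors
// and .loaders as used above.
package org.apache.sqoop.framework.configuration;

import org.apache.sqoop.model.FormClass;
import org.apache.sqoop.model.Input;

@FormClass
public class ThrottlingForm {
  // Number of extractor (map-side) tasks requested by the user; may be null.
  @Input public Integer extractors;

  // Number of loader tasks requested by the user; may be null.
  @Input public Integer loaders;
}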

View File

@@ -26,5 +26,11 @@
 @ConfigurationClass
 public class ExportJobConfiguration {

-  @Form OutputForm output;
+  @Form public InputForm input;
+
+  @Form public ThrottlingForm throttling;
+
+  public ExportJobConfiguration() {
+    throttling = new ThrottlingForm();
+  }
 }

View File

@@ -29,4 +29,9 @@ public class ImportJobConfiguration {
   @Form public OutputForm output;

   @Form public ThrottlingForm throttling;
+
+  public ImportJobConfiguration() {
+    output = new OutputForm();
+    throttling = new ThrottlingForm();
+  }
 }

View File

@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.framework.configuration;
+
+import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.Input;
+
+/**
+ *
+ */
+@FormClass
+public class InputForm {
+
+  @Input(size = 50) public String inputDirectory;
+}

View File

@@ -44,6 +44,14 @@ output.outputDirectory.help = Output directory for final data
 output.ignored.label = Ignored
 output.ignored.help = This value is ignored

+# Input Form
+#
+input.label = Input configuration
+input.help = Specifies information required to get data from Hadoop ecosystem
+
+input.inputDirectory.label = Input directory
+input.inputDirectory.help = Directory that should be exported
+
 # Throttling From
 #
 throttling.label = Throttling resources

View File

@@ -18,32 +18,44 @@
 package org.apache.sqoop.execution.mapreduce;

 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.framework.ExecutionEngine;
 import org.apache.sqoop.framework.SubmissionRequest;
+import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
 import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
 import org.apache.sqoop.framework.configuration.OutputFormat;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.etl.Exporter;
+import org.apache.sqoop.job.etl.HdfsExportPartitioner;
 import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
+import org.apache.sqoop.job.etl.HdfsTextExportExtractor;
 import org.apache.sqoop.job.etl.HdfsTextImportLoader;
 import org.apache.sqoop.job.etl.Importer;
 import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
 import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
+import org.apache.sqoop.job.mr.SqoopNullOutputFormat;

 /**
  *
  */
 public class MapreduceExecutionEngine extends ExecutionEngine {

+  /**
+   * {@inheritDoc}
+   */
   @Override
   public SubmissionRequest createSubmissionRequest() {
     return new MRSubmissionRequest();
   }

+  /**
+   * {@inheritDoc}
+   */
   @Override
   public void prepareImportSubmission(SubmissionRequest gRequest) {
     MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
@@ -82,4 +94,40 @@ public void prepareImportSubmission(SubmissionRequest gRequest) {
         "Format: " + jobConf.output.outputFormat);
     }
   }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void prepareExportSubmission(SubmissionRequest gRequest) {
+    MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
+    ExportJobConfiguration jobConf = (ExportJobConfiguration) request.getConfigFrameworkJob();
+
+    // Configure map-reduce classes for export
+    request.setInputFormatClass(SqoopInputFormat.class);
+
+    request.setMapperClass(SqoopMapper.class);
+    request.setMapOutputKeyClass(Data.class);
+    request.setMapOutputValueClass(NullWritable.class);
+
+    request.setOutputFormatClass(SqoopNullOutputFormat.class);
+    request.setOutputKeyClass(Data.class);
+    request.setOutputValueClass(NullWritable.class);
+
+    Exporter exporter = (Exporter) request.getConnectorCallbacks();
+
+    // Set up framework context
+    MutableMapContext context = request.getFrameworkContext();
+    context.setString(JobConstants.JOB_ETL_PARTITIONER, HdfsExportPartitioner.class.getName());
+    context.setString(JobConstants.JOB_ETL_LOADER, exporter.getLoader().getName());
+    context.setString(JobConstants.JOB_ETL_DESTROYER, exporter.getDestroyer().getName());
+
+    // We should make one extractor that will be able to read all supported file types
+    context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsTextExportExtractor.class.getName());
+    context.setString(FileInputFormat.INPUT_DIR, jobConf.input.inputDirectory);
+
+    if(request.getExtractors() != null) {
+      context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
+    }
+  }
 }
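
Note on the connector callbacks used above: prepareExportSubmission() only records class names in the framework context, and the MR side re-instantiates the connector's Loader by name (see the ClassUtils.instantiate() call later in this diff). An illustrative sketch of that hand-off, using only the getLoader()/getDestroyer() calls confirmed by the diff (how a connector builds its Exporter instance is not shown here):

// Illustrative sketch, not part of this commit.
import org.apache.sqoop.job.etl.Exporter;

public class ExporterWiringSketch {
  static void describe(Exporter exporter) {
    // These class names are what prepareExportSubmission() stores into the
    // framework context; the MR job later instantiates them by name.
    String loaderClass = exporter.getLoader().getName();
    String destroyerClass = exporter.getDestroyer().getName();
    System.out.println("JOB_ETL_LOADER    -> " + loaderClass);
    System.out.println("JOB_ETL_DESTROYER -> " + destroyerClass);
  }
}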

View File

@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.HashMap;
 import java.util.Set;
@@ -30,7 +31,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -42,7 +42,7 @@
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.Constants;
+import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.job.PrefixContext;
@@ -68,12 +68,10 @@ public class HdfsExportPartitioner extends Partitioner {
   @Override
   public List<Partition> getPartitions(ImmutableContext context,
-      long maxPartitions, Object connectionConfiguration, Object jobConfiguration) {
+      long numTasks, Object connectionConfiguration, Object jobConfiguration) {
     Configuration conf = ((PrefixContext)context).getConfiguration();

     try {
-      int numTasks = Integer.parseInt(conf.get(
-          Constants.JOB_ETL_NUMBER_PARTITIONS));
       long numInputBytes = getInputSize(conf);
       maxSplitSize = numInputBytes / numTasks;
@@ -117,24 +115,21 @@ public List<Partition> getPartitions(ImmutableContext context,
       // all the files in input set
       String indir = conf.get(FileInputFormat.INPUT_DIR);
       FileSystem fs = FileSystem.get(conf);
-      Path[] paths = FileUtil.stat2Paths(fs.listStatus(new Path(indir)));
+
+      List<Path> paths = new LinkedList<Path>();
+      for(FileStatus status : fs.listStatus(new Path(indir))) {
+        if(!status.isDirectory()) {
+          paths.add(status.getPath());
+        }
+      }
+
       List<Partition> partitions = new ArrayList<Partition>();
-      if (paths.length == 0) {
+      if (paths.size() == 0) {
         return partitions;
       }

-      // Convert them to Paths first. This is a costly operation and
-      // we should do it first, otherwise we will incur doing it multiple
-      // times, one time each for each pool in the next loop.
-      List<Path> newpaths = new ArrayList<Path>();
-      for (int i = 0; i < paths.length; i++) {
-        Path p = new Path(paths[i].toUri().getPath());
-        newpaths.add(p);
-      }
-      paths = null;
-
       // create splits for all files that are not in any pool.
-      getMoreSplits(conf, newpaths.toArray(new Path[newpaths.size()]),
+      getMoreSplits(conf, paths,
                     maxSize, minSizeNode, minSizeRack, partitions);

       // free up rackToNodes map
@@ -161,7 +156,7 @@ private long getInputSize(Configuration conf) throws IOException {
   /**
    * Return all the splits in the specified set of paths
    */
-  private void getMoreSplits(Configuration conf, Path[] paths,
+  private void getMoreSplits(Configuration conf, List<Path> paths,
       long maxSize, long minSizeNode, long minSizeRack,
       List<Partition> partitions) throws IOException {
@@ -180,14 +175,14 @@ private void getMoreSplits(Configuration conf, Path[] paths,
     HashMap<String, List<OneBlockInfo>> nodeToBlocks =
       new HashMap<String, List<OneBlockInfo>>();

-    files = new OneFileInfo[paths.length];
-    if (paths.length == 0) {
+    files = new OneFileInfo[paths.size()];
+    if (paths.size() == 0) {
       return;
     }

     // populate all the blocks for all files
-    for (int i = 0; i < paths.length; i++) {
-      files[i] = new OneFileInfo(paths[i], conf, isSplitable(conf, paths[i]),
+    for (int i = 0; i < paths.size(); i++) {
+      files[i] = new OneFileInfo(paths.get(i), conf, isSplitable(conf, paths.get(i)),
                                  rackToBlocks, blockToNodes, nodeToBlocks,
                                  rackToNodes, maxSize);
     }
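
Note on the listing change above: the new code skips directories returned by listStatus(), which the removed FileUtil.stat2Paths() call did not. A minimal standalone sketch of the same pattern, assuming Hadoop 2.x (FileStatus.isDirectory()):

// Minimal sketch of the directory-filtering listing used by the partitioner.
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListInputFilesSketch {
  // Returns only regular files directly under the given input directory.
  public static List<Path> listFiles(Configuration conf, String indir) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    List<Path> paths = new LinkedList<Path>();
    for (FileStatus status : fs.listStatus(new Path(indir))) {
      if (!status.isDirectory()) {
        paths.add(status.getPath());
      }
    }
    return paths;
  }
}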

View File

@@ -46,7 +46,7 @@ public HdfsSequenceImportLoader() {
   }

   @Override
-  public void run(ImmutableContext context, DataReader reader) throws Exception{
+  public void load(ImmutableContext context, Object oc, Object oj, DataReader reader) throws Exception{
     reader.setFieldDelimiter(fieldDelimiter);

     Configuration conf = new Configuration();

View File

@@ -46,7 +46,7 @@ public HdfsTextImportLoader() {
   }

   @Override
-  public void run(ImmutableContext context, DataReader reader) throws Exception{
+  public void load(ImmutableContext context, Object oc, Object oj, DataReader reader) throws Exception{
     reader.setFieldDelimiter(fieldDelimiter);

     Configuration conf = new Configuration();

View File

@@ -169,11 +169,28 @@ public void run() {
         String loaderName = conf.get(JobConstants.JOB_ETL_LOADER);
         Loader loader = (Loader) ClassUtils.instantiate(loaderName);

-        // Get together framework context as configuration prefix by nothing
-        PrefixContext frameworkContext = new PrefixContext(conf, "");
+        // Objects that should be passed to the Loader invocation below
+        PrefixContext subContext = null;
+        Object configConnection = null;
+        Object configJob = null;
+
+        switch (ConfigurationUtils.getJobType(conf)) {
+          case EXPORT:
+            subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
+            configConnection = ConfigurationUtils.getConnectorConnection(conf);
+            configJob = ConfigurationUtils.getConnectorJob(conf);
+            break;
+          case IMPORT:
+            subContext = new PrefixContext(conf, "");
+            configConnection = ConfigurationUtils.getFrameworkConnection(conf);
+            configJob = ConfigurationUtils.getFrameworkJob(conf);
+            break;
+          default:
+            throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
+        }

         try {
-          loader.run(frameworkContext, reader);
+          loader.load(subContext, configConnection, configJob, reader);
         } catch (Throwable t) {
           LOG.error("Error while loading data out of MR job.", t);
           throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
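
Note on the context choice above: export jobs hand the connector's Loader a PrefixContext scoped to the connector prefix, while import jobs keep the unscoped framework context. An illustrative sketch of the assumed prefix-scoping idea (the real PrefixContext is not part of this diff and may differ):

// Illustrative sketch only: every lookup is namespaced by the prefix chosen
// in the switch above, so an EXPORT job's Loader resolves connector-scoped
// keys such as GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER.
import org.apache.hadoop.conf.Configuration;

public class PrefixedLookupSketch {
  private final Configuration conf;
  private final String prefix;   // "" for IMPORT, connector prefix for EXPORT

  public PrefixedLookupSketch(Configuration conf, String prefix) {
    this.conf = conf;
    this.prefix = prefix;
  }

  public String getString(String key) {
    return conf.get(prefix + key);
  }
}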

View File

@@ -233,7 +233,7 @@ private String padZeros(int number, int digits) {
   public static class DummyLoader extends Loader {
     @Override
-    public void run(ImmutableContext context, DataReader reader)
+    public void load(ImmutableContext context, Object oc, Object oj, DataReader reader)
         throws Exception {
       int index = 1;
       int sum = 0;

View File

@@ -216,7 +216,7 @@ public static class DummyLoader extends Loader {
     private Data actual = new Data();

     @Override
-    public void run(ImmutableContext context, DataReader reader) throws Exception{
+    public void load(ImmutableContext context, Object oc, Object oj, DataReader reader) throws Exception{
       Object[] array;
       while ((array = reader.readArrayRecord()) != null) {
         actual.setContent(array, Data.ARRAY_RECORD);

View File

@@ -25,6 +25,18 @@
  */
 public abstract class Loader {

-  public abstract void run(ImmutableContext context, DataReader reader) throws Exception;
+  /**
+   * Load data to target.
+   *
+   * @param context Context object
+   * @param connectionConfiguration Connection configuration
+   * @param jobConfiguration Job configuration
+   * @param reader Data reader object
+   * @throws Exception
+   */
+  public abstract void load(ImmutableContext context,
+                            Object connectionConfiguration,
+                            Object jobConfiguration,
+                            DataReader reader) throws Exception;
 }
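
Note on the new Loader contract: connectors now receive the (untyped) connection and job configuration objects along with the context. A minimal example implementation, using only the DataReader.readArrayRecord() call that appears elsewhere in this diff; the class itself is illustrative and not part of the commit:

// Minimal example of a Loader written against the new load() signature.
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.io.DataReader;

public class LoggingLoader extends Loader {
  @Override
  public void load(ImmutableContext context,
                   Object connectionConfiguration,
                   Object jobConfiguration,
                   DataReader reader) throws Exception {
    // Connection- and job-level configuration objects arrive untyped; a real
    // connector would cast them to its own configuration classes.
    Object[] record;
    long count = 0;
    while ((record = reader.readArrayRecord()) != null) {
      count++;   // a real Loader would write the record to the target system
    }
    System.out.println("Loaded " + count + " records");
  }
}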