diff --git a/client/src/main/java/org/apache/sqoop/client/shell/CreateConnectionFunction.java b/client/src/main/java/org/apache/sqoop/client/shell/CreateConnectionFunction.java index 734276df..4df1c710 100644 --- a/client/src/main/java/org/apache/sqoop/client/shell/CreateConnectionFunction.java +++ b/client/src/main/java/org/apache/sqoop/client/shell/CreateConnectionFunction.java @@ -85,7 +85,7 @@ private void createConnection(String connectorId) throws IOException { ResourceBundle frameworkBundle = frameworkBean.getResourceBundle(); MConnector connector = connectorBean.getConnectors().get(0); - ResourceBundle connectorBundle = connectorBean.getResourceBundles().get(0); + ResourceBundle connectorBundle = connectorBean.getResourceBundles().get(connector.getPersistenceId()); MConnection connection = new MConnection(connector.getPersistenceId(), connector.getConnectionForms(), diff --git a/client/src/main/java/org/apache/sqoop/client/shell/CreateJobFunction.java b/client/src/main/java/org/apache/sqoop/client/shell/CreateJobFunction.java index 0b685bf1..3aa6c4fd 100644 --- a/client/src/main/java/org/apache/sqoop/client/shell/CreateJobFunction.java +++ b/client/src/main/java/org/apache/sqoop/client/shell/CreateJobFunction.java @@ -103,7 +103,7 @@ private void createJob(String connectionId, String type) throws IOException { connectorBean = readConnector(String.valueOf(connection.getConnectorId())); MConnector connector = connectorBean.getConnectors().get(0); - ResourceBundle connectorBundle = connectorBean.getResourceBundles().get(0); + ResourceBundle connectorBundle = connectorBean.getResourceBundles().get(connection.getPersistenceId()); MJob.Type jobType = MJob.Type.valueOf(type.toUpperCase()); diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java index 13574b2d..b2e59f75 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java @@ -29,7 +29,7 @@ public class GenericJdbcExportLoader extends Loader { private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION; @Override - public void run(ImmutableContext context, DataReader reader) throws Exception{ + public void load(ImmutableContext context, Object oc, Object oj, DataReader reader) throws Exception{ String driver = context.getString( GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER); String url = context.getString( diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java index c97693dc..bf3c5e22 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java @@ -23,11 +23,8 @@ import org.apache.sqoop.common.MutableContext; import org.apache.sqoop.common.MutableMapContext; -import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.io.DataReader; -import org.junit.Test; public class TestExportLoader extends 
TestCase { @@ -75,7 +72,7 @@ public void testInsert() throws Exception { Loader loader = new GenericJdbcExportLoader(); DummyReader reader = new DummyReader(); - loader.run(context, reader); + loader.load(context, null, null, reader); int index = START; ResultSet rs = executor.executeQuery("SELECT * FROM " diff --git a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java index ae14d9a8..f43942df 100644 --- a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java +++ b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java @@ -52,9 +52,15 @@ public SubmissionRequest createSubmissionRequest() { } /** - * Prepare given submission request for import submission. + * Prepare given submission request for import job type. * * @param request Submission request */ public abstract void prepareImportSubmission(SubmissionRequest request); + + /** + * Prepare given submission request for export job type. + * @param request Submission request + */ + public abstract void prepareExportSubmission(SubmissionRequest request); } diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java index 66746437..a5ac74f8 100644 --- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java +++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java @@ -403,7 +403,7 @@ public static MSubmission submit(long jobId) { prepareImportSubmission(request); break; case EXPORT: - // TODO(jarcec): Implement export path + prepareExportSubmission(request); break; default: throw new SqoopException(FrameworkError.FRAMEWORK_0005, @@ -450,6 +450,19 @@ private static void prepareImportSubmission(SubmissionRequest request) { executionEngine.prepareImportSubmission(request); } + private static void prepareExportSubmission(SubmissionRequest request) { + ExportJobConfiguration jobConfiguration = (ExportJobConfiguration) request.getConfigFrameworkJob(); + + // We're directly moving configured number of extractors and loaders to + // underlying request object. In the future we might need to throttle this + // count based on other running jobs to meet our SLAs. + request.setExtractors(jobConfiguration.throttling.extractors); + request.setLoaders(jobConfiguration.throttling.loaders); + + // Delegate rest of the job to execution engine + executionEngine.prepareExportSubmission(request); + } + /** * Callback that will be called only if we failed to submit the job to the * remote cluster.
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java index 330aff0d..d533089c 100644 --- a/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java +++ b/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java @@ -26,5 +26,11 @@ @ConfigurationClass public class ExportJobConfiguration { - @Form OutputForm output; + @Form public InputForm input; + + @Form public ThrottlingForm throttling; + + public ExportJobConfiguration() { + throttling = new ThrottlingForm(); + } } diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java index c674fc26..2a35eb98 100644 --- a/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java +++ b/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java @@ -29,4 +29,9 @@ public class ImportJobConfiguration { @Form public OutputForm output; @Form public ThrottlingForm throttling; + + public ImportJobConfiguration() { + output = new OutputForm(); + throttling = new ThrottlingForm(); + } } diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java new file mode 100644 index 00000000..c97a5de9 --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.framework.configuration; + +import org.apache.sqoop.model.FormClass; +import org.apache.sqoop.model.Input; + +/** + * + */ +@FormClass +public class InputForm { + + @Input(size = 50) public String inputDirectory; +} diff --git a/core/src/main/resources/framework-resources.properties b/core/src/main/resources/framework-resources.properties index db409462..cebc90ec 100644 --- a/core/src/main/resources/framework-resources.properties +++ b/core/src/main/resources/framework-resources.properties @@ -44,6 +44,14 @@ output.outputDirectory.help = Output directory for final data output.ignored.label = Ignored output.ignored.help = This value is ignored +# Input Form +# +input.label = Input configuration +input.help = Specifies information required to get data from Hadoop ecosystem + +input.inputDirectory.label = Input directory +input.inputDirectory.help = Directory that should be exported + # Throttling From # throttling.label = Throttling resources diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java index e2163ff0..06872ca7 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java @@ -18,32 +18,44 @@ package org.apache.sqoop.execution.mapreduce; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.framework.ExecutionEngine; import org.apache.sqoop.framework.SubmissionRequest; +import org.apache.sqoop.framework.configuration.ExportJobConfiguration; import org.apache.sqoop.framework.configuration.ImportJobConfiguration; import org.apache.sqoop.framework.configuration.OutputFormat; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.MapreduceExecutionError; +import org.apache.sqoop.job.etl.Exporter; +import org.apache.sqoop.job.etl.HdfsExportPartitioner; import org.apache.sqoop.job.etl.HdfsSequenceImportLoader; +import org.apache.sqoop.job.etl.HdfsTextExportExtractor; import org.apache.sqoop.job.etl.HdfsTextImportLoader; import org.apache.sqoop.job.etl.Importer; import org.apache.sqoop.job.io.Data; import org.apache.sqoop.job.mr.SqoopFileOutputFormat; import org.apache.sqoop.job.mr.SqoopInputFormat; import org.apache.sqoop.job.mr.SqoopMapper; +import org.apache.sqoop.job.mr.SqoopNullOutputFormat; /** * */ public class MapreduceExecutionEngine extends ExecutionEngine { + /** + * {@inheritDoc} + */ @Override public SubmissionRequest createSubmissionRequest() { return new MRSubmissionRequest(); } + /** + * {@inheritDoc} + */ @Override public void prepareImportSubmission(SubmissionRequest gRequest) { MRSubmissionRequest request = (MRSubmissionRequest) gRequest; @@ -82,4 +94,40 @@ public void prepareImportSubmission(SubmissionRequest gRequest) { "Format: " + jobConf.output.outputFormat); } } + + /** + * {@inheritDoc} + */ + @Override + public void prepareExportSubmission(SubmissionRequest gRequest) { + MRSubmissionRequest request = (MRSubmissionRequest) gRequest; + ExportJobConfiguration jobConf = (ExportJobConfiguration) request.getConfigFrameworkJob(); + + // Configure map-reduce classes for export + request.setInputFormatClass(SqoopInputFormat.class); + +
request.setMapperClass(SqoopMapper.class); + request.setMapOutputKeyClass(Data.class); + request.setMapOutputValueClass(NullWritable.class); + + request.setOutputFormatClass(SqoopNullOutputFormat.class); + request.setOutputKeyClass(Data.class); + request.setOutputValueClass(NullWritable.class); + + Exporter exporter = (Exporter)request.getConnectorCallbacks(); + + // Set up framework context + MutableMapContext context = request.getFrameworkContext(); + context.setString(JobConstants.JOB_ETL_PARTITIONER, HdfsExportPartitioner.class.getName()); + context.setString(JobConstants.JOB_ETL_LOADER, exporter.getLoader().getName()); + context.setString(JobConstants.JOB_ETL_DESTROYER, exporter.getDestroyer().getName()); + + // We should make one extractor that will be able to read all supported file types + context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsTextExportExtractor.class.getName()); + context.setString(FileInputFormat.INPUT_DIR, jobConf.input.inputDirectory); + + if(request.getExtractors() != null) { + context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors()); + } + } } diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java index 9e7ea4e1..7ffd97c3 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.HashMap; import java.util.Set; @@ -30,7 +31,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; @@ -42,7 +42,7 @@ import org.apache.hadoop.net.NetworkTopology; import org.apache.sqoop.common.ImmutableContext; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.job.Constants; +import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.MapreduceExecutionError; import org.apache.sqoop.job.PrefixContext; @@ -68,12 +68,10 @@ public class HdfsExportPartitioner extends Partitioner { @Override public List<Partition> getPartitions(ImmutableContext context, - long maxPartitions, Object connectionConfiguration, Object jobConfiguration) { + long numTasks, Object connectionConfiguration, Object jobConfiguration) { Configuration conf = ((PrefixContext)context).getConfiguration(); try { - int numTasks = Integer.parseInt(conf.get( - Constants.JOB_ETL_NUMBER_PARTITIONS)); long numInputBytes = getInputSize(conf); maxSplitSize = numInputBytes / numTasks; @@ -117,24 +115,21 @@ public List<Partition> getPartitions(ImmutableContext context, // all the files in input set String indir = conf.get(FileInputFormat.INPUT_DIR); FileSystem fs = FileSystem.get(conf); - Path[] paths = FileUtil.stat2Paths(fs.listStatus(new Path(indir))); + + List<Path> paths = new LinkedList<Path>(); + for(FileStatus status : fs.listStatus(new Path(indir))) { + if(!status.isDirectory()) { + paths.add(status.getPath()); + } + } + List<Partition> partitions = new ArrayList<Partition>(); - if (paths.length == 0) { + if (paths.size() == 0) { return partitions; } - // Convert them to Paths first.
This is a costly operation and - we should do it first, otherwise we will incur doing it multiple - times, one time each for each pool in the next loop. - List<Path> newpaths = new ArrayList<Path>(); - for (int i = 0; i < paths.length; i++) { - Path p = new Path(paths[i].toUri().getPath()); - newpaths.add(p); - } - paths = null; - // create splits for all files that are not in any pool. - getMoreSplits(conf, newpaths.toArray(new Path[newpaths.size()]), + getMoreSplits(conf, paths, maxSize, minSizeNode, minSizeRack, partitions); // free up rackToNodes map @@ -161,7 +156,7 @@ private long getInputSize(Configuration conf) throws IOException { /** * Return all the splits in the specified set of paths */ - private void getMoreSplits(Configuration conf, Path[] paths, + private void getMoreSplits(Configuration conf, List<Path> paths, long maxSize, long minSizeNode, long minSizeRack, List<Partition> partitions) throws IOException { @@ -180,14 +175,14 @@ private void getMoreSplits(Configuration conf, Path[] paths, HashMap<String, List<OneBlockInfo>> nodeToBlocks = new HashMap<String, List<OneBlockInfo>>(); - files = new OneFileInfo[paths.length]; - if (paths.length == 0) { + files = new OneFileInfo[paths.size()]; + if (paths.size() == 0) { return; } // populate all the blocks for all files - for (int i = 0; i < paths.length; i++) { - files[i] = new OneFileInfo(paths[i], conf, isSplitable(conf, paths[i]), + for (int i = 0; i < paths.size(); i++) { + files[i] = new OneFileInfo(paths.get(i), conf, isSplitable(conf, paths.get(i)), rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes, maxSize); } diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java index a706ea84..a5d6b9ca 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java @@ -46,7 +46,7 @@ public HdfsSequenceImportLoader() { } @Override - public void run(ImmutableContext context, DataReader reader) throws Exception{ + public void load(ImmutableContext context, Object oc, Object oj, DataReader reader) throws Exception{ reader.setFieldDelimiter(fieldDelimiter); Configuration conf = new Configuration(); diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java index 55eb3890..490b1c21 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java @@ -46,7 +46,7 @@ public HdfsTextImportLoader() { } @Override - public void run(ImmutableContext context, DataReader reader) throws Exception{ + public void load(ImmutableContext context, Object oc, Object oj, DataReader reader) throws Exception{ reader.setFieldDelimiter(fieldDelimiter); Configuration conf = new Configuration(); diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index 3bd1e1b3..71b47247 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -169,11 +169,28 @@ public void run() { String loaderName = conf.get(JobConstants.JOB_ETL_LOADER); Loader
loader = (Loader) ClassUtils.instantiate(loaderName); - // Get together framework context as configuration prefix by nothing - PrefixContext frameworkContext = new PrefixContext(conf, ""); + // Objects that should be passed to the Executor execution + PrefixContext subContext = null; + Object configConnection = null; + Object configJob = null; + + switch (ConfigurationUtils.getJobType(conf)) { + case EXPORT: + subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT); + configConnection = ConfigurationUtils.getConnectorConnection(conf); + configJob = ConfigurationUtils.getConnectorJob(conf); + break; + case IMPORT: + subContext = new PrefixContext(conf, ""); + configConnection = ConfigurationUtils.getFrameworkConnection(conf); + configJob = ConfigurationUtils.getFrameworkJob(conf); + break; + default: + throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023); + } try { - loader.run(frameworkContext, reader); + loader.load(subContext, configConnection, configJob, reader); } catch (Throwable t) { LOG.error("Error while loading data out of MR job.", t); throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t); diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java index 585fac7c..9edf0ba0 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java @@ -233,7 +233,7 @@ private String padZeros(int number, int digits) { public static class DummyLoader extends Loader { @Override - public void run(ImmutableContext context, DataReader reader) + public void load(ImmutableContext context, Object oc, Object oj, DataReader reader) throws Exception { int index = 1; int sum = 0; diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java index c8caecd4..85900650 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java @@ -216,7 +216,7 @@ public static class DummyLoader extends Loader { private Data actual = new Data(); @Override - public void run(ImmutableContext context, DataReader reader) throws Exception{ + public void load(ImmutableContext context, Object oc, Object oj, DataReader reader) throws Exception{ Object[] array; while ((array = reader.readArrayRecord()) != null) { actual.setContent(array, Data.ARRAY_RECORD); diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java index 046b939b..3148e496 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java @@ -25,6 +25,18 @@ */ public abstract class Loader { - public abstract void run(ImmutableContext context, DataReader reader) throws Exception; + /** + * Load data to target. + * + * @param context Context object + * @param connectionConfiguration Connection configuration + * @param jobConfiguration Job configuration + * @param reader Data reader object + * @throws Exception + */ + public abstract void load(ImmutableContext context, + Object connectionConfiguration, + Object jobConfiguration, + DataReader reader) throws Exception; }
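
Note for connector authors tracking this patch: the SPI change from Loader.run(context, reader) to the four-argument load(...) means every existing Loader implementation needs a signature update, even if it ignores the new configuration objects. The sketch below is illustrative only and is not part of the patch; the class name is hypothetical, and it simply drains the reader the same way the DummyLoader test classes above do, assuming the connector casts the configuration objects to its own @ConfigurationClass types.

import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.io.DataReader;

// Hypothetical connector-side loader adapted to the new SPI. Under the new
// contract the framework passes the connection and job configuration objects
// explicitly (for export jobs these are the connector's own configuration
// beans, as wired up in SqoopOutputFormatLoadExecutor above).
public class ExampleExportLoader extends Loader {

  @Override
  public void load(ImmutableContext context,
                   Object connectionConfiguration,
                   Object jobConfiguration,
                   DataReader reader) throws Exception {
    // A real connector would cast connectionConfiguration/jobConfiguration
    // to its own configuration classes here before opening the target.
    Object[] record;
    while ((record = reader.readArrayRecord()) != null) {
      // Write the record to the target system; omitted in this sketch.
    }
  }
}

Since run(...) is removed from the abstract class rather than deprecated, loaders written against the old signature fail to compile against this patch instead of silently misbehaving.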