From 7fbd3ba94b970ca4c2053d3930dbf6d84989213e Mon Sep 17 00:00:00 2001 From: Jarek Jarcec Cecho Date: Fri, 19 Oct 2012 13:28:18 -0700 Subject: [PATCH] SQOOP-610: Job submission engine for import (Bilung Lee via Jarek Jarcec Cecho) --- .../org/apache/sqoop/job/JobConstants.java | 10 + .../java/org/apache/sqoop/job/JobEngine.java | 37 ++++ .../apache/sqoop/job/etl/EtlFramework.java | 148 +++++++++++++ .../org/apache/sqoop/job/etl/EtlOptions.java | 165 +++++++++++++++ .../org/apache/sqoop/job/mr/MrExecution.java | 153 ++++++++++++++ .../org/apache/sqoop/job/mr/SqoopReducer.java | 35 +++ .../org/apache/sqoop/job/TestJobEngine.java | 199 ++++++++++++++++++ 7 files changed, 747 insertions(+) create mode 100644 core/src/main/java/org/apache/sqoop/job/JobEngine.java create mode 100644 core/src/main/java/org/apache/sqoop/job/etl/EtlFramework.java create mode 100644 core/src/main/java/org/apache/sqoop/job/etl/EtlOptions.java create mode 100644 core/src/main/java/org/apache/sqoop/job/mr/MrExecution.java create mode 100644 core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java create mode 100644 core/src/test/java/org/apache/sqoop/job/TestJobEngine.java diff --git a/core/src/main/java/org/apache/sqoop/job/JobConstants.java b/core/src/main/java/org/apache/sqoop/job/JobConstants.java index a032c724..2b0ec18b 100644 --- a/core/src/main/java/org/apache/sqoop/job/JobConstants.java +++ b/core/src/main/java/org/apache/sqoop/job/JobConstants.java @@ -21,6 +21,16 @@ public final class JobConstants extends Constants { + // Metadata constants + + public static final String INPUT_JOB_JOB_TYPE = "inp-job-job-type"; + public static final String INPUT_JOB_STORAGE_TYPE = "inp-job-storage-type"; + public static final String INPUT_JOB_FORMAT_TYPE = "inp-job-format-type"; + public static final String INPUT_JOB_OUTPUT_CODEC = "inp-job-output-codec"; + public static final String INPUT_JOB_MAX_EXTRACTORS = "inp-job-max-extractors"; + public static final String INPUT_JOB_MAX_LOADERS = "inp-job-max-loaders"; + + /** * All job related configuration is prefixed with this: * org.apache.sqoop.job. diff --git a/core/src/main/java/org/apache/sqoop/job/JobEngine.java b/core/src/main/java/org/apache/sqoop/job/JobEngine.java new file mode 100644 index 00000000..fa3e484a --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/job/JobEngine.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job; + +import org.apache.sqoop.job.etl.EtlFramework; +import org.apache.sqoop.job.etl.EtlOptions; +import org.apache.sqoop.job.mr.MrExecution; + +/** + * This class supports Sqoop job execution. 
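+ *
+ * A run builds an EtlFramework from the supplied EtlOptions and then drives
+ * the MapReduce execution through its initialize, run and destroy phases.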
+ */ +public class JobEngine { + + public void run(EtlOptions options) { + EtlFramework etl = new EtlFramework(options); + MrExecution mr = new MrExecution(etl); + mr.initialize(); + mr.run(); + mr.destroy(); + } + +} diff --git a/core/src/main/java/org/apache/sqoop/job/etl/EtlFramework.java b/core/src/main/java/org/apache/sqoop/job/etl/EtlFramework.java new file mode 100644 index 00000000..ce7f9884 --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/job/etl/EtlFramework.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job.etl; + +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.core.CoreError; +import org.apache.sqoop.job.etl.Destroyer; +import org.apache.sqoop.job.etl.Exporter; +import org.apache.sqoop.job.etl.Extractor; +import org.apache.sqoop.job.etl.Importer; +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.Loader; +import org.apache.sqoop.job.etl.Partitioner; +import org.apache.sqoop.job.etl.EtlOptions.FormatType; +import org.apache.sqoop.job.etl.EtlOptions.JobType; +import org.apache.sqoop.job.etl.EtlOptions.StorageType; + +/** + * This class encapsulates the whole ETL framework. 
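+ * Based on the job type it resolves which class implements each ETL stage,
+ * some supplied by the connector and some by the framework: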
+ * + * For import: + * Initializer (connector-defined) + * -> Partitioner (connector-defined) + * -> Extractor (connector-defined) + * -> Loader (framework-defined) + * -> Destroyer (connector-defined) + * + * For export: + * Initializer (connector-defined) + * -> Partitioner (framework-defined) + * -> Extractor (framework-defined) + * -> Loader (connector-defined) + * -> Destroyer (connector-defined) + */ +public class EtlFramework { + + private Class initializer; + private Class partitioner; + private Class extractor; + private Class loader; + private Class destroyer; + + private boolean requireFieldNames; + private boolean requireOutputDirectory; + + private EtlOptions options; + + public EtlFramework(EtlOptions inputs) { + this.options = inputs; + JobType jobType = options.getJobType(); + switch (jobType) { + case IMPORT: + constructImport(); + break; + case EXPORT: + constructExport(); + break; + default: + throw new SqoopException(CoreError.CORE_0012, jobType.toString()); + } + } + + public EtlOptions getOptions() { + return options; + } + + public Class getInitializer() { + return initializer; + } + + public Class getPartitioner() { + return partitioner; + } + + public Class getExtractor() { + return extractor; + } + + public Class getLoader() { + return loader; + } + + public Class getDestroyer() { + return destroyer; + } + + public boolean isFieldNamesRequired() { + return requireFieldNames; + } + + public boolean isOutputDirectoryRequired() { + return requireOutputDirectory; + } + + private void constructImport() { + Importer importer = options.getConnector().getImporter(); + initializer = importer.getInitializer(); + partitioner = importer.getPartitioner(); + extractor = importer.getExtractor(); + destroyer = importer.getDestroyer(); + + StorageType storageType = options.getStorageType(); + switch (storageType) { + case HDFS: + FormatType formatType = options.getFormatType(); + switch (formatType) { + case TEXT: + loader = HdfsTextImportLoader.class; + requireOutputDirectory = true; + break; + case SEQUENCE: + loader = HdfsSequenceImportLoader.class; + requireOutputDirectory = true; + break; + default: + throw new SqoopException(CoreError.CORE_0012, formatType.toString()); + } + break; + default: + throw new SqoopException(CoreError.CORE_0012, storageType.toString()); + } + } + + private void constructExport() { + Exporter exporter = options.getConnector().getExporter(); + initializer = exporter.getInitializer(); + loader = exporter.getLoader(); + destroyer = exporter.getDestroyer(); + + // FIXME: set partitioner/extractor based on storage/format types + } + +} diff --git a/core/src/main/java/org/apache/sqoop/job/etl/EtlOptions.java b/core/src/main/java/org/apache/sqoop/job/etl/EtlOptions.java new file mode 100644 index 00000000..e45c0ff4 --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/job/etl/EtlOptions.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job.etl; + +import java.util.HashMap; + +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.spi.SqoopConnector; +import org.apache.sqoop.core.CoreError; +import org.apache.sqoop.job.JobConstants; + +/** + * This class retrieves information for job execution from user-input options. + */ +public class EtlOptions implements Options { + + HashMap store = new HashMap(); + + public EtlOptions(SqoopConnector connector) { + this.connector = connector; + } + + private SqoopConnector connector; + public SqoopConnector getConnector() { + return connector; + } + + private JobType jobType = null; + public enum JobType { + IMPORT, + EXPORT + } + public JobType getJobType() { + if (jobType != null) { + return jobType; + } + + String option = store.get(JobConstants.INPUT_JOB_JOB_TYPE); + if (option == null || option.equalsIgnoreCase("IMPORT")) { + jobType = JobType.IMPORT; + } else if (option.equalsIgnoreCase("EXPORT")) { + jobType = JobType.EXPORT; + } else { + throw new SqoopException(CoreError.CORE_0012, option); + } + return jobType; + } + + private StorageType storageType = null; + public enum StorageType { + HDFS + } + public StorageType getStorageType() { + if (storageType != null) { + return storageType; + } + + String option = store.get(JobConstants.INPUT_JOB_STORAGE_TYPE); + if (option == null || option.equalsIgnoreCase("HDFS")) { + storageType = StorageType.HDFS; + } else { + throw new SqoopException(CoreError.CORE_0012, option); + } + return storageType; + } + + private FormatType formatType = null; + public enum FormatType { + TEXT, + SEQUENCE + } + public FormatType getFormatType() { + if (formatType != null) { + return formatType; + } + + String option = store.get(JobConstants.INPUT_JOB_FORMAT_TYPE); + if (option == null || option.equalsIgnoreCase("TEXT")) { + formatType = FormatType.TEXT; + } else if (option.equalsIgnoreCase("SEQUENCE")) { + formatType = FormatType.SEQUENCE; + } else { + throw new SqoopException(CoreError.CORE_0012, option); + } + return formatType; + } + + public String getOutputCodec() { + return store.get(JobConstants.INPUT_JOB_OUTPUT_CODEC); + } + + private int maxExtractors = -1; + public int getMaxExtractors() { + if (maxExtractors != -1) { + return maxExtractors; + } + + String option = store.get(JobConstants.INPUT_JOB_MAX_EXTRACTORS); + if (option != null) { + maxExtractors = Integer.parseInt(option); + } else { + JobType type = getJobType(); + switch (type) { + case IMPORT: + maxExtractors = 4; + break; + case EXPORT: + maxExtractors = 1; + break; + default: + throw new SqoopException(CoreError.CORE_0012, type.toString()); + } + } + return maxExtractors; + } + + private int maxLoaders = -1; + public int getMaxLoaders() { + if (maxLoaders != -1) { + return maxLoaders; + } + + String option = store.get(JobConstants.INPUT_JOB_MAX_LOADERS); + if (option != null) { + maxLoaders = Integer.parseInt(option); + } else { + JobType type = getJobType(); + switch (type) { + case IMPORT: + maxLoaders = 1; + break; + case EXPORT: + maxLoaders = 4; + break; + default: + throw new 
SqoopException(CoreError.CORE_0012, type.toString()); + } + } + return maxLoaders; + } + + public void setOption(String key, String value) { + store.put(key, value); + } + + @Override + public String getOption(String key) { + return store.get(key); + } +} diff --git a/core/src/main/java/org/apache/sqoop/job/mr/MrExecution.java b/core/src/main/java/org/apache/sqoop/job/mr/MrExecution.java new file mode 100644 index 00000000..bd4c108d --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/job/mr/MrExecution.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job.mr; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.core.CoreError; +import org.apache.sqoop.job.JobConstants; +import org.apache.sqoop.job.etl.Destroyer; +import org.apache.sqoop.job.etl.EtlContext; +import org.apache.sqoop.job.etl.EtlFramework; +import org.apache.sqoop.job.etl.EtlMutableContext; +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.EtlOptions; +import org.apache.sqoop.job.etl.EtlOptions.JobType; +import org.apache.sqoop.job.io.Data; + +/** + * This class encapsulates the whole MapReduce execution. 
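+ *
+ * initialize() translates the ETL options into Hadoop configuration and runs
+ * the connector's Initializer, run() submits the MapReduce job and waits for
+ * it to finish, and destroy() runs the connector's Destroyer.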
+ */ +public class MrExecution { + + private Configuration conf; + private EtlFramework etl; + + public MrExecution(EtlFramework etl) { + this.conf = new Configuration(); + this.etl = etl; + } + + public void initialize() { + EtlOptions options = etl.getOptions(); + + conf.setInt(JobConstants.JOB_ETL_NUMBER_PARTITIONS, + options.getMaxExtractors()); + + if (options.getOutputCodec() != null) { + conf.setBoolean(FileOutputFormat.COMPRESS, true); + conf.set(FileOutputFormat.COMPRESS_CODEC, options.getOutputCodec()); + } + + conf.set(JobConstants.JOB_ETL_PARTITIONER, etl.getPartitioner().getName()); + conf.set(JobConstants.JOB_ETL_EXTRACTOR, etl.getExtractor().getName()); + conf.set(JobConstants.JOB_ETL_LOADER, etl.getLoader().getName()); + + EtlMutableContext context = new EtlMutableContext(conf); + + Class initializer = etl.getInitializer(); + if (initializer != null) { + Initializer instance; + try { + instance = (Initializer) initializer.newInstance(); + } catch (Exception e) { + throw new SqoopException(CoreError.CORE_0010, initializer.getName(), e); + } + instance.run(context, options); + } + + JobType jobType = etl.getOptions().getJobType(); + switch (jobType) { + case IMPORT: + checkImportConfiguration(context); + break; + case EXPORT: + checkExportConfiguration(context); + break; + default: + throw new SqoopException(CoreError.CORE_0012, jobType.toString()); + } + } + + public void run() { + EtlOptions options = etl.getOptions(); + + try { + Job job = Job.getInstance(conf); + + job.setInputFormatClass(SqoopInputFormat.class); + job.setMapperClass(SqoopMapper.class); + job.setMapOutputKeyClass(Data.class); + job.setMapOutputValueClass(NullWritable.class); + if (options.getMaxLoaders() > 1) { + job.setReducerClass(SqoopReducer.class); + job.setNumReduceTasks(options.getMaxLoaders()); + } + job.setOutputFormatClass((etl.isOutputDirectoryRequired()) ? 
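+        // HDFS import loaders require an output directory and use the file
+        // output format; otherwise the null output format is used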
+ SqoopFileOutputFormat.class : SqoopNullOutputFormat.class); + job.setOutputKeyClass(Data.class); + job.setOutputValueClass(NullWritable.class); + + boolean success = job.waitForCompletion(true); + if (!success) { + throw new SqoopException(CoreError.CORE_0008); + } + + } catch (Exception e) { + throw new SqoopException(CoreError.CORE_0008, e); + } + } + + public void destroy() { + Class destroyer = etl.getDestroyer(); + if (destroyer != null) { + Destroyer instance; + try { + instance = (Destroyer) destroyer.newInstance(); + } catch (Exception e) { + throw new SqoopException(CoreError.CORE_0010, destroyer.getName(), e); + } + instance.run(new EtlContext(conf)); + } + } + + private void checkImportConfiguration(EtlMutableContext context) { + if (etl.isFieldNamesRequired() && + context.getString(JobConstants.JOB_ETL_FIELD_NAMES) == null) { + throw new SqoopException(CoreError.CORE_0020, "field names"); + } + + if (etl.isOutputDirectoryRequired()) { + String outputDirectory = + context.getString(JobConstants.JOB_ETL_OUTPUT_DIRECTORY); + if (outputDirectory == null) { + throw new SqoopException(CoreError.CORE_0020, "output directory"); + } else { + context.setString(FileOutputFormat.OUTDIR, outputDirectory); + } + } + } + + private void checkExportConfiguration(EtlMutableContext context) { + // TODO: check export related configuration + } + +} diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java new file mode 100644 index 00000000..d2361482 --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job.mr; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.sqoop.job.io.Data; + +/** + * A reducer to perform reduce function. + */ +public class SqoopReducer + extends Reducer { + + public static final Log LOG = + LogFactory.getLog(SqoopReducer.class.getName()); + +} diff --git a/core/src/test/java/org/apache/sqoop/job/TestJobEngine.java b/core/src/test/java/org/apache/sqoop/job/TestJobEngine.java new file mode 100644 index 00000000..e653c22e --- /dev/null +++ b/core/src/test/java/org/apache/sqoop/job/TestJobEngine.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job; + +import java.io.BufferedReader; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.ResourceBundle; + +import junit.framework.TestCase; + +import org.apache.sqoop.connector.spi.SqoopConnector; +import org.apache.sqoop.job.JobEngine; +import org.apache.sqoop.job.etl.Context; +import org.apache.sqoop.job.etl.EtlOptions; +import org.apache.sqoop.job.etl.Exporter; +import org.apache.sqoop.job.etl.Extractor; +import org.apache.sqoop.job.etl.Importer; +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.MutableContext; +import org.apache.sqoop.job.etl.Options; +import org.apache.sqoop.job.etl.Partition; +import org.apache.sqoop.job.etl.Partitioner; +import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.io.DataWriter; +import org.apache.sqoop.model.MConnectionForms; +import org.apache.sqoop.model.MJob.Type; +import org.apache.sqoop.model.MJobForms; +import org.apache.sqoop.validation.Validator; +import org.junit.Test; + +public class TestJobEngine extends TestCase { + + private static final String DATA_DIR = TestJobEngine.class.getSimpleName(); + private static final String WAREHOUSE_ROOT = "/tmp/sqoop/warehouse/"; + + private static final String OUTPUT_DIR = WAREHOUSE_ROOT + DATA_DIR; + private static final String OUTPUT_FILE = "part-r-00000"; + private static final int START_PARTITION = 1; + private static final int NUMBER_OF_PARTITIONS = 9; + private static final int NUMBER_OF_ROWS_PER_PARTITION = 10; + + @Test + public void testImport() throws Exception { + FileUtils.delete(OUTPUT_DIR); + + DummyConnector connector = new DummyConnector(); + EtlOptions options = new EtlOptions(connector); + + JobEngine engine = new JobEngine(); + engine.run(options); + + String fileName = OUTPUT_DIR + "/" + OUTPUT_FILE; + InputStream filestream = FileUtils.open(fileName); + BufferedReader filereader = new BufferedReader(new InputStreamReader( + filestream, Data.CHARSET_NAME)); + verifyOutput(filereader); + } + + private void verifyOutput(BufferedReader reader) + throws IOException { + String line = null; + int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; + Data expected = new Data(); + while ((line = reader.readLine()) != null){ + expected.setContent(new Object[] { + new Integer(index), + new Double(index), + String.valueOf(index) }, + Data.ARRAY_RECORD); + index++; + + assertEquals(expected.toString(), line); + } + reader.close(); + + assertEquals(NUMBER_OF_PARTITIONS*NUMBER_OF_ROWS_PER_PARTITION, + index-START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION); + } + + public class DummyConnector implements SqoopConnector { + + @Override + public Importer getImporter() { + return new Importer( + DummyImportInitializer.class, + DummyImportPartitioner.class, + 
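+          // extractor and destroyer follow; the destroyer is null because
+          // this dummy connector has nothing to clean up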
DummyImportExtractor.class, + null); + } + + @Override + public Exporter getExporter() { + fail("This method should not be invoked."); + return null; + } + + @Override + public ResourceBundle getBundle(Locale locale) { + fail("This method should not be invoked."); + return null; + } + + @Override + public Validator getValidator() { + fail("This method should not be invoked."); + return null; + } + + @Override + public Class getConnectionConfigurationClass() { + fail("This method should not be invoked."); + return null; + } + + @Override + public Class getJobConfigurationClass(Type jobType) { + fail("This method should not be invoked."); + return null; + } + } + + public static class DummyImportInitializer extends Initializer { + @Override + public void run(MutableContext context, Options options) { + context.setString(Constants.JOB_ETL_OUTPUT_DIRECTORY, OUTPUT_DIR); + } + } + + public static class DummyImportPartition extends Partition { + private int id; + + public void setId(int id) { + this.id = id; + } + + public int getId() { + return id; + } + + @Override + public void readFields(DataInput in) throws IOException { + id = in.readInt(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(id); + } + } + + public static class DummyImportPartitioner extends Partitioner { + @Override + public List run(Context context) { + List partitions = new LinkedList(); + for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) { + DummyImportPartition partition = new DummyImportPartition(); + partition.setId(id); + partitions.add(partition); + } + return partitions; + } + } + + public static class DummyImportExtractor extends Extractor { + @Override + public void run(Context context, Partition partition, DataWriter writer) { + int id = ((DummyImportPartition)partition).getId(); + for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) { + writer.writeArrayRecord(new Object[] { + new Integer(id*NUMBER_OF_ROWS_PER_PARTITION+row), + new Double(id*NUMBER_OF_ROWS_PER_PARTITION+row), + String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)}); + } + } + } + +}
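
For reference, a minimal sketch of how a caller could drive the new engine for an HDFS text import, along the lines of TestJobEngine above. It uses only classes and option keys that appear in this patch; the DummyConnector stands in for any SqoopConnector that exposes an Importer, and the option values shown simply spell out the defaults:

    // Assemble the user-facing options and hand them to the engine.
    SqoopConnector connector = new DummyConnector();   // any connector with an Importer
    EtlOptions options = new EtlOptions(connector);
    options.setOption(JobConstants.INPUT_JOB_JOB_TYPE, "IMPORT");
    options.setOption(JobConstants.INPUT_JOB_STORAGE_TYPE, "HDFS");
    options.setOption(JobConstants.INPUT_JOB_FORMAT_TYPE, "TEXT");
    options.setOption(JobConstants.INPUT_JOB_MAX_EXTRACTORS, "4");

    JobEngine engine = new JobEngine();
    engine.run(options);   // builds the EtlFramework, runs the MapReduce job, then the Destroyer

Failures surface as SqoopException, e.g. CoreError.CORE_0008 when the MapReduce job does not complete successfully.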