From cb8214806b4e47dc3ad30d5bff0a42b04a412a06 Mon Sep 17 00:00:00 2001
From: Jarek Jarcec Cecho
Date: Tue, 14 Oct 2014 17:03:12 -0400
Subject: [PATCH] SQOOP-1585: Sqoop2: Prefix mapreduce classes with MR (no functionality change) (Veena Basavaraj via Jarek Jarcec Cecho)

---
 .../sqoop/job/etl/ExtractorContext.java       |  1 -
 .../mapreduce/MapreduceExecutionEngine.java   | 16 ++++----
 ...cutionError.java => MRExecutionError.java} |  4 +-
 ...{JobConstants.java => MRJobConstants.java} |  4 +-
 .../java/org/apache/sqoop/job/io/Data.java    | 26 ++++++-------
 ...onUtils.java => MRConfigurationUtils.java} | 30 +++++++--------
 .../sqoop/job/mr/SqoopDestroyerExecutor.java  | 16 ++++----
 .../sqoop/job/mr/SqoopFileOutputFormat.java   |  8 ++--
 .../apache/sqoop/job/mr/SqoopInputFormat.java | 18 ++++-----
 .../org/apache/sqoop/job/mr/SqoopMapper.java  | 26 ++++++-------
 .../sqoop/job/mr/SqoopNullOutputFormat.java   |  2 +-
 .../job/mr/SqoopOutputFormatLoadExecutor.java | 28 +++++++-------
 ...nnable.java => SqoopProgressRunnable.java} |  8 ++--
 .../org/apache/sqoop/job/mr/SqoopReducer.java |  4 +-
 .../org/apache/sqoop/job/mr/SqoopSplit.java   |  6 +--
 .../org/apache/sqoop/job/TestMapReduce.java   | 32 ++++++++--------
 .../org/apache/sqoop/job/TestMatching.java    | 12 +++---
 .../sqoop/job/io/SqoopWritableTest.java       |  2 +-
 ...ils.java => TestMRConfigurationUtils.java} | 38 +++++++++----------
 .../mr/TestSqoopOutputFormatLoadExecutor.java | 10 ++---
 .../mapreduce/MapreduceSubmissionEngine.java  | 22 +++++------
 21 files changed, 156 insertions(+), 157 deletions(-)
 rename execution/mapreduce/src/main/java/org/apache/sqoop/job/{MapreduceExecutionError.java => MRExecutionError.java} (96%)
 rename execution/mapreduce/src/main/java/org/apache/sqoop/job/{JobConstants.java => MRJobConstants.java} (97%)
 rename execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/{ConfigurationUtils.java => MRConfigurationUtils.java} (88%)
 rename execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/{ProgressRunnable.java => SqoopProgressRunnable.java} (84%)
 rename execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/{TestConfigurationUtils.java => TestMRConfigurationUtils.java} (70%)

diff --git a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
index 3272b56d..4875ed0b 100644
--- a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
+++ b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
@@ -19,7 +19,6 @@
 import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.etl.io.DataWriter;
-import org.apache.sqoop.schema.Schema;
 
 /**
  * Context implementation for Extractor.
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 47f84789..9b3eb441 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -21,7 +21,7 @@
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.driver.ExecutionEngine;
 import org.apache.sqoop.driver.JobRequest;
-import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.job.etl.From;
 import org.apache.sqoop.job.etl.To;
 import org.apache.sqoop.job.io.SqoopWritable;
@@ -64,16 +64,16 @@ public void prepareJob(JobRequest jobRequest) {
     From from = (From) mrJobRequest.getFrom();
     To to = (To) mrJobRequest.getTo();
     MutableMapContext context = mrJobRequest.getDriverContext();
-    context.setString(JobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName());
-    context.setString(JobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName());
-    context.setString(JobConstants.JOB_ETL_LOADER, to.getLoader().getName());
-    context.setString(JobConstants.JOB_ETL_FROM_DESTROYER, from.getDestroyer().getName());
-    context.setString(JobConstants.JOB_ETL_TO_DESTROYER, to.getDestroyer().getName());
-    context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT,
+    context.setString(MRJobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName());
+    context.setString(MRJobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName());
+    context.setString(MRJobConstants.JOB_ETL_LOADER, to.getLoader().getName());
+    context.setString(MRJobConstants.JOB_ETL_FROM_DESTROYER, from.getDestroyer().getName());
+    context.setString(MRJobConstants.JOB_ETL_TO_DESTROYER, to.getDestroyer().getName());
+    context.setString(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
         mrJobRequest.getIntermediateDataFormat().getName());
 
     if(mrJobRequest.getExtractors() != null) {
-      context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, mrJobRequest.getExtractors());
+      context.setInteger(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, mrJobRequest.getExtractors());
     }
   }
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRExecutionError.java
similarity index 96%
rename from execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java
rename to execution/mapreduce/src/main/java/org/apache/sqoop/job/MRExecutionError.java
index 1dc12d19..e70b7e23 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRExecutionError.java
@@ -22,7 +22,7 @@
 /**
  *
  */
-public enum MapreduceExecutionError implements ErrorCode {
+public enum MRExecutionError implements ErrorCode {
 
   MAPRED_EXEC_0000("Unknown error"),
 
@@ -83,7 +83,7 @@ public enum MapreduceExecutionError implements ErrorCode {
 
   private final String message;
 
-  private MapreduceExecutionError(String message) {
+  private MRExecutionError(String message) {
     this.message = message;
   }
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
similarity index 97%
rename from execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
rename to execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
index 349bb601..67021a8e 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
@@ -20,7 +20,7 @@
 import org.apache.sqoop.core.ConfigurationConstants;
 import org.apache.sqoop.driver.DriverConstants;
 
-public final class JobConstants extends Constants {
+public final class MRJobConstants extends Constants {
   /**
    * All job related configuration is prefixed with this:
    * org.apache.sqoop.job.
@@ -75,7 +75,7 @@ public final class JobConstants extends Constants {
   public static final String INTERMEDIATE_DATA_FORMAT =
       DriverConstants.PREFIX_EXECUTION_CONFIG + "intermediate.format";
 
-  private JobConstants() {
+  private MRJobConstants() {
     // Disable explicit object creation
   }
 }
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
index 5423b7b9..139883e7 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
@@ -29,7 +29,7 @@
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.MRExecutionError;
 
 public class Data implements WritableComparable {
@@ -76,7 +76,7 @@ public void setContent(Object content, int type) {
       this.content = content;
       break;
     default:
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
     }
   }
@@ -87,7 +87,7 @@ public Object getContent(int targetType) {
     case ARRAY_RECORD:
       return parse();
     default:
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(targetType));
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(targetType));
     }
   }
@@ -141,7 +141,7 @@ public int hashCode() {
       }
       return result;
     default:
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
     }
   }
@@ -156,7 +156,7 @@ public void readFields(DataInput in) throws IOException {
       readArray(in);
       break;
     default:
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
     }
   }
@@ -171,7 +171,7 @@ public void write(DataOutput out) throws IOException {
       writeArray(out);
       break;
     default:
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
     }
   }
@@ -249,7 +249,7 @@ private void readArray(DataInput in) throws IOException {
     default:
       throw new IOException(
-        new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, Integer.toString(type))
+        new SqoopException(MRExecutionError.MAPRED_EXEC_0012, Integer.toString(type))
       );
     }
   }
@@ -307,7 +307,7 @@ private void writeArray(DataOutput out) throws IOException {
     } else {
       throw new IOException(
-        new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012,
+        new SqoopException(MRExecutionError.MAPRED_EXEC_0012,
           array[i].getClass().getName()
         )
       );
     }
@@ -351,7 +351,7 @@ private String format() {
       return sb.toString();
     default:
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
     }
   }
@@ -399,7 +399,7 @@ else if (position > 0 && record[position-1] != stringEscape) {
       return (Object[])content;
     default:
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
     }
   }
@@ -418,7 +418,7 @@ private int parseField(ArrayList list, char[] record,
     case FieldTypes.UTF:
       if (field.charAt(0) != stringDelimiter ||
           field.charAt(field.length()-1) != stringDelimiter) {
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0022);
+        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022);
       }
       list.add(index, unescape(field.substring(1, field.length()-1)));
       break;
@@ -426,7 +426,7 @@ private int parseField(ArrayList list, char[] record,
     case FieldTypes.BIN:
       if (field.charAt(0) != '[' ||
          field.charAt(field.length()-1) != ']') {
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0022);
+        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022);
       }
       String[] splits =
          field.substring(1, field.length()-1).split(String.valueOf(','));
@@ -474,7 +474,7 @@ private int parseField(ArrayList list, char[] record,
       break;
     default:
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(fieldType));
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(fieldType));
     }
 
     return ++index;
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java
similarity index 88%
rename from execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
rename to execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java
index 0fa07f7d..03a1dec0 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java
@@ -23,7 +23,7 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.json.util.SchemaSerialization;
 import org.apache.sqoop.model.ConfigUtils;
 import org.apache.sqoop.schema.Schema;
@@ -38,43 +38,43 @@
  * Helper class to store and load various information in/from MapReduce configuration
  * object and JobConf object.
  */
-public final class ConfigurationUtils {
+public final class MRConfigurationUtils {
 
-  private static final String MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.link";
+  private static final String MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.link";
 
-  private static final String MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.link";
+  private static final String MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.link";
 
-  private static final String MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.job";
+  private static final String MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.job";
 
-  private static final String MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.job";
+  private static final String MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.job";
 
-  private static final String MR_JOB_CONFIG_DRIVER_CONFIG_CLASS = JobConstants.PREFIX_JOB_CONFIG + "config.class.driver";
+  private static final String MR_JOB_CONFIG_DRIVER_CONFIG_CLASS = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.driver";
 
-  private static final String MR_JOB_CONFIG_FROM_CONNECTOR_LINK = JobConstants.PREFIX_JOB_CONFIG + "config.connector.from.link";
+  private static final String MR_JOB_CONFIG_FROM_CONNECTOR_LINK = MRJobConstants.PREFIX_JOB_CONFIG + "config.connector.from.link";
   private static final Text MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY = new Text(MR_JOB_CONFIG_FROM_CONNECTOR_LINK);
 
-  private static final String MR_JOB_CONFIG_TO_CONNECTOR_LINK = JobConstants.PREFIX_JOB_CONFIG + "config.connector.to.link";
+  private static final String MR_JOB_CONFIG_TO_CONNECTOR_LINK = MRJobConstants.PREFIX_JOB_CONFIG + "config.connector.to.link";
   private static final Text MR_JOB_CONFIG_TO_CONNECTOR_LINK_KEY = new Text(MR_JOB_CONFIG_TO_CONNECTOR_LINK);
 
-  private static final String MR_JOB_CONFIG_FROM_JOB_CONFIG = JobConstants.PREFIX_JOB_CONFIG + "config.connector.from.job";
+  private static final String MR_JOB_CONFIG_FROM_JOB_CONFIG = MRJobConstants.PREFIX_JOB_CONFIG + "config.connector.from.job";
   private static final Text MR_JOB_CONFIG_FROM_JOB_CONFIG_KEY = new Text(MR_JOB_CONFIG_FROM_JOB_CONFIG);
 
-  private static final String MR_JOB_CONFIG_TO_JOB_CONFIG = JobConstants.PREFIX_JOB_CONFIG + "config.connector.to.job";
+  private static final String MR_JOB_CONFIG_TO_JOB_CONFIG = MRJobConstants.PREFIX_JOB_CONFIG + "config.connector.to.job";
   private static final Text MR_JOB_CONFIG_TO_JOB_CONFIG_KEY = new Text(MR_JOB_CONFIG_TO_JOB_CONFIG);
 
-  private static final String MR_JOB_CONFIG_DRIVER_CONFIG = JobConstants.PREFIX_JOB_CONFIG + "config.driver";
+  private static final String MR_JOB_CONFIG_DRIVER_CONFIG = MRJobConstants.PREFIX_JOB_CONFIG + "config.driver";
   private static final Text MR_JOB_CONFIG_DRIVER_CONFIG_KEY = new Text(MR_JOB_CONFIG_DRIVER_CONFIG);
 
-  private static final String SCHEMA_FROM = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.from";
+  private static final String SCHEMA_FROM = MRJobConstants.PREFIX_JOB_CONFIG + "schema.connector.from";
   private static final Text SCHEMA_FROM_KEY = new Text(SCHEMA_FROM);
 
-  private static final String SCHEMA_TO = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.to";
+  private static final String SCHEMA_TO = MRJobConstants.PREFIX_JOB_CONFIG + "schema.connector.to";
   private static final Text SCHEMA_TO_KEY = new Text(SCHEMA_TO);
@@ -259,7 +259,7 @@ private static Object loadConfiguration(JobConf configuration, String classPrope
     return object;
   }
 
-  private ConfigurationUtils() {
+  private MRConfigurationUtils() {
     // Instantiation is prohibited
   }
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
index 8d2a1da8..b3859261 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
@@ -20,7 +20,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
@@ -47,13 +47,13 @@ public static void executeDestroyer(boolean success, Configuration configuration
     switch (direction) {
       default:
       case FROM:
-        destroyerPropertyName = JobConstants.JOB_ETL_FROM_DESTROYER;
-        prefixPropertyName = JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT;
+        destroyerPropertyName = MRJobConstants.JOB_ETL_FROM_DESTROYER;
+        prefixPropertyName = MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT;
         break;
 
       case TO:
-        destroyerPropertyName = JobConstants.JOB_ETL_TO_DESTROYER;
-        prefixPropertyName = JobConstants.PREFIX_CONNECTOR_TO_CONTEXT;
+        destroyerPropertyName = MRJobConstants.JOB_ETL_TO_DESTROYER;
+        prefixPropertyName = MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT;
         break;
     }
@@ -66,11 +66,11 @@ public static void executeDestroyer(boolean success, Configuration configuration
 
     // Objects that should be pass to the Destroyer execution
     PrefixContext subContext = new PrefixContext(configuration, prefixPropertyName);
-    Object configConnection = ConfigurationUtils.getConnectorConnectionConfig(direction, configuration);
-    Object configJob = ConfigurationUtils.getConnectorJobConfig(direction, configuration);
+    Object configConnection = MRConfigurationUtils.getConnectorConnectionConfig(direction, configuration);
+    Object configJob = MRConfigurationUtils.getConnectorJobConfig(direction, configuration);
 
     // Propagate connector schema in every case for now
-    Schema schema = ConfigurationUtils.getConnectorSchema(direction, configuration);
+    Schema schema = MRConfigurationUtils.getConnectorSchema(direction, configuration);
 
     DestroyerContext destroyerContext = new DestroyerContext(subContext, success, schema);
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
index ca77e160..f451044d 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
@@ -34,7 +34,7 @@
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.job.io.SqoopWritable;
 
 /**
@@ -56,13 +56,13 @@ public RecordWriter getRecordWriter(
     Path filepath = getDefaultWorkFile(context, "");
     String filename = filepath.toString();
-    conf.set(JobConstants.JOB_MR_OUTPUT_FILE, filename);
+    conf.set(MRJobConstants.JOB_MR_OUTPUT_FILE, filename);
 
     boolean isCompressed = getCompressOutput(context);
     if (isCompressed) {
       String codecname =
-          conf.get(JobConstants.HADOOP_COMPRESS_CODEC, DEFAULT_CODEC.getName());
-      conf.set(JobConstants.JOB_MR_OUTPUT_CODEC, codecname);
+          conf.get(MRJobConstants.HADOOP_COMPRESS_CODEC, DEFAULT_CODEC.getName());
+      conf.set(MRJobConstants.JOB_MR_OUTPUT_CODEC, codecname);
     }
 
     return new SqoopOutputFormatLoadExecutor(context).getRecordWriter();
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
index 1c1133a8..d2cf5e4e 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -31,8 +31,8 @@
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.MRJobConstants;
+import org.apache.sqoop.job.MRExecutionError;
 import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
@@ -59,15 +59,15 @@ public List getSplits(JobContext context)
       throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
 
-    String partitionerName = conf.get(JobConstants.JOB_ETL_PARTITIONER);
+    String partitionerName = conf.get(MRJobConstants.JOB_ETL_PARTITIONER);
     Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName);
 
-    PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
-    Object connectorConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
-    Object connectorJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
-    Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
+    PrefixContext connectorContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
+    Object connectorConnection = MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
+    Object connectorJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
+    Schema schema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
 
-    long maxPartitions = conf.getLong(JobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
+    long maxPartitions = conf.getLong(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
     PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, schema);
 
     List partitions = partitioner.getPartitions(partitionerContext, connectorConnection, connectorJob);
@@ -80,7 +80,7 @@ public List getSplits(JobContext context)
     }
 
     if(splits.size() > maxPartitions) {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0025,
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0025,
         String.format("Got %d, max was %d", splits.size(), maxPartitions));
     }
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 03d84d4b..d31aa209 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -31,8 +31,8 @@
 import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.connector.matcher.Matcher;
 import org.apache.sqoop.connector.matcher.MatcherFactory;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.MRJobConstants;
+import org.apache.sqoop.job.MRExecutionError;
 import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
@@ -47,7 +47,7 @@ public class SqoopMapper extends Mapper {
 
   static {
-    ConfigurationUtils.configureLogging();
+    MRConfigurationUtils.configureLogging();
   }
 
   public static final Logger LOG = Logger.getLogger(SqoopMapper.class);
@@ -63,14 +63,14 @@ public class SqoopMapper extends Mapper
 ) ClassUtils
         .instantiate(intermediateDataFormatName);
     fromDataFormat.setSchema(matcher.getFromSchema());
@@ -79,16 +79,16 @@ public void run(Context context) throws IOException, InterruptedException {
     toDataFormat.setSchema(matcher.getToSchema());
 
     // Objects that should be passed to the Executor execution
-    PrefixContext subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
-    Object fromConfig = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
-    Object fromJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
+    PrefixContext subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
+    Object fromConfig = MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
+    Object fromJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
 
     SqoopSplit split = context.getCurrentKey();
     ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context));
 
     try {
       LOG.info("Starting progress service");
-      progressService.scheduleAtFixedRate(new ProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
+      progressService.scheduleAtFixedRate(new SqoopProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
 
       LOG.info("Running extractor class " + extractorName);
       extractor.extract(extractorContext, fromConfig, fromJob, split.getPartition());
@@ -96,7 +96,7 @@ public void run(Context context) throws IOException, InterruptedException {
       context.getCounter(SqoopCounters.ROWS_READ)
           .increment(extractor.getRowsRead());
     } catch (Exception e) {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0017, e);
     } finally {
       LOG.info("Stopping progress service");
       progressService.shutdown();
@@ -145,7 +145,7 @@ private void writeContent() {
         writable.setString(toDataFormat.getTextData());
         context.write(writable, NullWritable.get());
       } catch (Exception e) {
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0013, e);
+        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0013, e);
       }
     }
   }
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
index 594b5e90..1148c4a6 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
@@ -28,7 +28,7 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.job.io.SqoopWritable;
 
 import java.io.IOException;
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 1ebd3e43..91089814 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -36,8 +36,8 @@
 import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.connector.matcher.Matcher;
 import org.apache.sqoop.connector.matcher.MatcherFactory;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.MRJobConstants;
+import org.apache.sqoop.job.MRExecutionError;
 import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
@@ -75,10 +75,10 @@ public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
     context = jobctx;
     writer = new SqoopRecordWriter();
     matcher = MatcherFactory.getMatcher(
-        ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()),
-        ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()));
+        MRConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()),
+        MRConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()));
     dataFormat = (IntermediateDataFormat) ClassUtils.instantiate(context
-        .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
+        .getConfiguration().get(MRJobConstants.INTERMEDIATE_DATA_FORMAT));
     dataFormat.setSchema(matcher.getToSchema());
   }
@@ -141,7 +141,7 @@ private void waitForConsumer() {
       //In the rare case, it was not a SqoopException
       Throwables.propagate(t);
     } catch (Exception ex) {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019, ex);
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019, ex);
     }
   }
@@ -186,7 +186,7 @@ public Object readContent() throws InterruptedException {
       } catch (Throwable t) {
         readerFinished = true;
         LOG.error("Caught exception e while getting content ", t);
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
+        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0018, t);
       } finally {
         releaseSema();
       }
@@ -221,7 +221,7 @@ public void run() {
       Configuration conf = null;
       if (!isTest) {
         conf = context.getConfiguration();
-        loaderName = conf.get(JobConstants.JOB_ETL_LOADER);
+        loaderName = conf.get(MRJobConstants.JOB_ETL_LOADER);
       }
       Loader loader = (Loader) ClassUtils.instantiate(loaderName);
@@ -233,11 +233,11 @@ public void run() {
 
       if (!isTest) {
         // Using the TO schema since the IDF returns data in TO schema
-        schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
+        schema = MRConfigurationUtils.getConnectorSchema(Direction.TO, conf);
 
-        subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
-        configConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.TO, conf);
-        configJob = ConfigurationUtils.getConnectorJobConfig(Direction.TO, conf);
+        subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
+        configConnection = MRConfigurationUtils.getConnectorConnectionConfig(Direction.TO, conf);
+        configJob = MRConfigurationUtils.getConnectorJobConfig(Direction.TO, conf);
       }
 
       // Create loader context
@@ -252,7 +252,7 @@ public void run() {
         // Release so that the writer can tell Sqoop something went
         // wrong.
         free.release();
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
+        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0018, t);
       }
 
       // if no exception happens yet and reader finished before writer,
@@ -264,7 +264,7 @@ public void run() {
         // Release so that the writer can tell Sqoop something went
         // wrong.
         free.release();
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019);
+        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019);
       }
 
       // inform writer that reader is finished
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopProgressRunnable.java
similarity index 84%
rename from execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java
rename to execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopProgressRunnable.java
index 4c2e206c..cd4f8b9d 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopProgressRunnable.java
@@ -24,17 +24,17 @@
 /**
  * Runnable that will ping mapreduce context about progress.
  */
-public class ProgressRunnable implements Runnable {
+public class SqoopProgressRunnable implements Runnable {
 
-  public static final Logger LOG = Logger.getLogger(ProgressRunnable.class);
+  public static final Logger LOG = Logger.getLogger(SqoopProgressRunnable.class);
 
   /**
    * Context class that we should use for reporting progress.
    */
   private final TaskInputOutputContext context;
 
-  public ProgressRunnable(final TaskInputOutputContext ctxt) {
-    this.context = ctxt;
+  public SqoopProgressRunnable(final TaskInputOutputContext ctx) {
+    this.context = ctx;
   }
 
   @Override
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
index a55534a3..cf023c3b 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
@@ -33,7 +33,7 @@ public class SqoopReducer extends Reducer {
 
   static {
-    ConfigurationUtils.configureLogging();
+    MRConfigurationUtils.configureLogging();
   }
 
   public static final Logger LOG = Logger.getLogger(SqoopReducer.class);
@@ -46,7 +46,7 @@ public class SqoopReducer extends Reducer
 clz = ClassUtils.loadClass(className);
     if (clz == null) {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0009, className);
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0009, className);
     }
     try {
       partition = (Partition) clz.newInstance();
     } catch (Exception e) {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0010, className, e);
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0010, className, e);
     }
     // read Partition object content
     partition.readFields(in);
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index e3b68e2a..6d0dcb4a 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -45,7 +45,7 @@
 import org.apache.sqoop.job.etl.PartitionerContext;
 import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.io.SqoopWritable;
-import org.apache.sqoop.job.mr.ConfigurationUtils;
+import org.apache.sqoop.job.mr.MRConfigurationUtils;
 import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
 import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
@@ -68,8 +68,8 @@ public class TestMapReduce {
   @Test
   public void testInputFormat() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+    conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
         CSVIntermediateDataFormat.class.getName());
 
     Job job = new Job(conf);
@@ -87,17 +87,17 @@ public void testInputFormat() throws Exception {
   @Test
   public void testMapper() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+    conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
         CSVIntermediateDataFormat.class.getName());
     Schema schema = new Schema("Test");
     schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
         .addColumn(new org.apache.sqoop.schema.type.Text("3"));
 
     Job job = new Job(conf);
-    ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
-    ConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
+    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
+    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
     boolean success = JobUtils.runJob(job.getConfiguration(),
         SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class);
     Assert.assertEquals("Job failed!", true, success);
@@ -106,20 +106,20 @@ public void testMapper() throws Exception {
   @Test
   public void testOutputFormat() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
-    conf.set(JobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());
-    conf.set(JobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName());
-    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+    conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName());
+    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
         CSVIntermediateDataFormat.class.getName());
     Schema schema = new Schema("Test");
     schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
         .addColumn(new Text("3"));
 
     Job job = new Job(conf);
-    ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
-    ConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
+    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
+    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
 
     boolean success = JobUtils.runJob(job.getConfiguration(),
         SqoopInputFormat.class, SqoopMapper.class, SqoopNullOutputFormat.class);
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
index 7f9a1473..665a65b6 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
@@ -42,7 +42,7 @@
 import org.apache.sqoop.job.etl.PartitionerContext;
 import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.io.SqoopWritable;
-import org.apache.sqoop.job.mr.ConfigurationUtils;
+import org.apache.sqoop.job.mr.MRConfigurationUtils;
 import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
 import org.apache.sqoop.schema.Schema;
@@ -123,14 +123,14 @@ public static Collection data() {
   @Test
   public void testSchemaMatching() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+    conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
         CSVIntermediateDataFormat.class.getName());
 
     Job job = new Job(conf);
-    ConfigurationUtils.setConnectorSchema(Direction.FROM, job, from);
-    ConfigurationUtils.setConnectorSchema(Direction.TO, job, to);
+    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, from);
+    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, to);
     JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
         DummyOutputFormat.class);
     boolean success = JobUtils.runJob(job.getConfiguration(),
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
index f5742a2b..68ce5ed0 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
@@ -31,7 +31,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
-import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.MRJobConstants;
 import org.junit.Assert;
 import org.junit.Test;
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java
similarity index 70%
rename from execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
rename to execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java
index 501e32c1..972b5555 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java
@@ -39,7 +39,7 @@
  * to hadoop JobConf object. This implementation was chosen because it's not clear
 * how MapReduce is converting one object to another.
  */
-public class TestConfigurationUtils {
+public class TestMRConfigurationUtils {
 
   Job job;
   JobConf jobConfSpy;
 
@@ -61,49 +61,49 @@ public void setUpHadoopJobConf() throws Exception {
 
   @Test
   public void testLinkConfiguration() throws Exception {
-    ConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, getConfig());
+    MRConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, getConfig());
     setUpHadoopJobConf();
-    assertEquals(getConfig(), ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, jobConfSpy));
+    assertEquals(getConfig(), MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, jobConfSpy));
 
-    ConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, getConfig());
+    MRConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, getConfig());
     setUpHadoopJobConf();
-    assertEquals(getConfig(), ConfigurationUtils.getConnectorConnectionConfig(Direction.TO, jobConfSpy));
+    assertEquals(getConfig(), MRConfigurationUtils.getConnectorConnectionConfig(Direction.TO, jobConfSpy));
   }
 
   @Test
   public void testJobConfiguration() throws Exception {
-    ConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, getConfig());
+    MRConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, getConfig());
     setUpHadoopJobConf();
-    assertEquals(getConfig(), ConfigurationUtils.getConnectorJobConfig(Direction.FROM, jobConfSpy));
+    assertEquals(getConfig(), MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, jobConfSpy));
 
-    ConfigurationUtils.setConnectorJobConfig(Direction.TO, job, getConfig());
+    MRConfigurationUtils.setConnectorJobConfig(Direction.TO, job, getConfig());
     setUpHadoopJobConf();
-    assertEquals(getConfig(), ConfigurationUtils.getConnectorJobConfig(Direction.TO, jobConfSpy));
+    assertEquals(getConfig(), MRConfigurationUtils.getConnectorJobConfig(Direction.TO, jobConfSpy));
   }
 
   @Test
   public void testDriverConfiguration() throws Exception {
-    ConfigurationUtils.setDriverConfig(job, getConfig());
+    MRConfigurationUtils.setDriverConfig(job, getConfig());
     setUpHadoopJobConf();
-    assertEquals(getConfig(), ConfigurationUtils.getDriverConfig(jobConfSpy));
+    assertEquals(getConfig(), MRConfigurationUtils.getDriverConfig(jobConfSpy));
   }
 
   @Test
   public void testConnectorSchema() throws Exception {
-    ConfigurationUtils.setConnectorSchema(Direction.FROM, job, getSchema("a"));
-    assertEquals(getSchema("a"), ConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy));
+    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, getSchema("a"));
+    assertEquals(getSchema("a"), MRConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy));
 
-    ConfigurationUtils.setConnectorSchema(Direction.TO, job, getSchema("b"));
-    assertEquals(getSchema("b"), ConfigurationUtils.getConnectorSchema(Direction.TO, jobConfSpy));
+    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, getSchema("b"));
+    assertEquals(getSchema("b"), MRConfigurationUtils.getConnectorSchema(Direction.TO, jobConfSpy));
   }
 
   @Test
   public void testConnectorSchemaNull() throws Exception {
-    ConfigurationUtils.setConnectorSchema(Direction.FROM, job, null);
-    assertNull(ConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy));
+    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, null);
+    assertNull(MRConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy));
 
-    ConfigurationUtils.setConnectorSchema(Direction.TO, job, null);
-    assertNull(ConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy));
+    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, null);
+    assertNull(MRConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy));
   }
 
   private Schema getSchema(String name) {
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
index 1f411d2a..5bd11f03 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
@@ -24,7 +24,7 @@
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.apache.sqoop.connector.idf.IntermediateDataFormat;
-import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.job.io.SqoopWritable;
@@ -120,13 +120,13 @@ public void load(LoaderContext context, Object cc, Object jc) throws Exception {
   @Before
   public void setUp() {
     conf = new Configuration();
-    conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+    conf.setIfUnset(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
   }
 
   @Test(expected = BrokenBarrierException.class)
   public void testWhenLoaderThrows() throws Throwable {
-    conf.set(JobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName());
     SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName());
     RecordWriter writer = executor.getRecordWriter();
@@ -145,7 +145,7 @@ public void testWhenLoaderThrows() throws Throwable {
 
   @Test
   public void testSuccessfulContinuousLoader() throws Throwable {
-    conf.set(JobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName());
     SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName());
     RecordWriter writer = executor.getRecordWriter();
@@ -192,7 +192,7 @@ public void testSuccessfulLoader() throws Throwable {
 
   @Test(expected = ConcurrentModificationException.class)
   public void testThrowingContinuousLoader() throws Throwable {
-    conf.set(JobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName());
     SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName());
     RecordWriter writer = executor.getRecordWriter();
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index 0c492ef8..646e8cbf 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -39,8 +39,8 @@
 import org.apache.sqoop.execution.mapreduce.MRJobRequest;
 import org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine;
 import org.apache.sqoop.driver.JobRequest;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.mr.ConfigurationUtils;
+import org.apache.sqoop.job.MRJobConstants;
+import org.apache.sqoop.job.mr.MRConfigurationUtils;
 import org.apache.sqoop.submission.SubmissionStatus;
 import org.apache.sqoop.submission.counter.Counter;
 import org.apache.sqoop.submission.counter.CounterGroup;
@@ -172,7 +172,7 @@ public boolean submit(JobRequest mrJobRequest) {
         continue;
       }
       configuration.set(
-          JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT + entry.getKey(),
+          MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT + entry.getKey(),
           entry.getValue());
     }
 
@@ -182,7 +182,7 @@ public boolean submit(JobRequest mrJobRequest) {
         continue;
       }
       configuration.set(
-          JobConstants.PREFIX_CONNECTOR_TO_CONTEXT + entry.getKey(),
+          MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT + entry.getKey(),
          entry.getValue());
     }
 
@@ -202,17 +202,17 @@ public boolean submit(JobRequest mrJobRequest) {
       Job job = new Job(configuration);
 
       // link configs
-      ConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, request.getConnectorLinkConfig(Direction.FROM));
-      ConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, request.getConnectorLinkConfig(Direction.TO));
+      MRConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, request.getConnectorLinkConfig(Direction.FROM));
+      MRConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, request.getConnectorLinkConfig(Direction.TO));
 
       // from and to configs
-      ConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, request.getJobConfig(Direction.FROM));
-      ConfigurationUtils.setConnectorJobConfig(Direction.TO, job, request.getJobConfig(Direction.TO));
+      MRConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, request.getJobConfig(Direction.FROM));
+      MRConfigurationUtils.setConnectorJobConfig(Direction.TO, job, request.getJobConfig(Direction.TO));
 
-      ConfigurationUtils.setDriverConfig(job, request.getDriverConfig());
+      MRConfigurationUtils.setDriverConfig(job, request.getDriverConfig());
 
       // @TODO(Abe): Persist TO schema.
-      ConfigurationUtils.setConnectorSchema(Direction.FROM, job, request.getSummary().getFromSchema());
-      ConfigurationUtils.setConnectorSchema(Direction.TO, job, request.getSummary().getToSchema());
+      MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, request.getSummary().getFromSchema());
+      MRConfigurationUtils.setConnectorSchema(Direction.TO, job, request.getSummary().getToSchema());
 
       if(request.getJobName() != null) {
         job.setJobName("Sqoop: " + request.getJobName());
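
Note for reviewers: below is a minimal, hypothetical sketch (not part of the patch) of how the renamed classes fit together after this change. It only restates calls that already appear in the diff above -- conf.set with MRJobConstants keys and MRConfigurationUtils.setConnectorSchema, as exercised by TestMapReduce and MapreduceSubmissionEngine. The org.example class names are made-up placeholders, not Sqoop classes.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.job.mr.MRConfigurationUtils;
import org.apache.sqoop.schema.Schema;

public class MRRenameUsageSketch {

  // Wires up a Job the way the engine and tests do after the rename:
  // the configuration keys are unchanged, only the holder classes
  // (JobConstants -> MRJobConstants, ConfigurationUtils -> MRConfigurationUtils)
  // gained the MR prefix.
  public static Job wire(Schema fromSchema, Schema toSchema) throws Exception {
    Configuration conf = new Configuration();
    conf.set(MRJobConstants.JOB_ETL_PARTITIONER, "org.example.MyPartitioner"); // hypothetical connector class
    conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, "org.example.MyExtractor");     // hypothetical connector class
    conf.set(MRJobConstants.JOB_ETL_LOADER, "org.example.MyLoader");           // hypothetical connector class
    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
        CSVIntermediateDataFormat.class.getName());

    Job job = new Job(conf);
    // Schemas are still stored per direction on the Job, exactly as the tests exercise.
    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, fromSchema);
    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, toSchema);
    return job;
  }
}

Because callers only ever go through these two entry points, the rename is mechanical and behavior-preserving, which is what the "no functionality change" note in the subject line asserts.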