5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-04 20:30:06 +08:00

SQOOP-1585: Sqoop2: Prefix mapreduce classes with MR ( no functionality change)

(Veena Basavaraj via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2014-10-14 17:03:12 -04:00
parent 68577fbf71
commit cb8214806b
21 changed files with 156 additions and 157 deletions

View File

@ -19,7 +19,6 @@
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.schema.Schema;
/**
* Context implementation for Extractor.

View File

@ -21,7 +21,7 @@
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.driver.ExecutionEngine;
import org.apache.sqoop.driver.JobRequest;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.job.etl.From;
import org.apache.sqoop.job.etl.To;
import org.apache.sqoop.job.io.SqoopWritable;
@ -64,16 +64,16 @@ public void prepareJob(JobRequest jobRequest) {
From from = (From) mrJobRequest.getFrom();
To to = (To) mrJobRequest.getTo();
MutableMapContext context = mrJobRequest.getDriverContext();
context.setString(JobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName());
context.setString(JobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName());
context.setString(JobConstants.JOB_ETL_LOADER, to.getLoader().getName());
context.setString(JobConstants.JOB_ETL_FROM_DESTROYER, from.getDestroyer().getName());
context.setString(JobConstants.JOB_ETL_TO_DESTROYER, to.getDestroyer().getName());
context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT,
context.setString(MRJobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName());
context.setString(MRJobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName());
context.setString(MRJobConstants.JOB_ETL_LOADER, to.getLoader().getName());
context.setString(MRJobConstants.JOB_ETL_FROM_DESTROYER, from.getDestroyer().getName());
context.setString(MRJobConstants.JOB_ETL_TO_DESTROYER, to.getDestroyer().getName());
context.setString(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
mrJobRequest.getIntermediateDataFormat().getName());
if(mrJobRequest.getExtractors() != null) {
context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, mrJobRequest.getExtractors());
context.setInteger(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, mrJobRequest.getExtractors());
}
}

View File

@ -22,7 +22,7 @@
/**
*
*/
public enum MapreduceExecutionError implements ErrorCode {
public enum MRExecutionError implements ErrorCode {
MAPRED_EXEC_0000("Unknown error"),
@ -83,7 +83,7 @@ public enum MapreduceExecutionError implements ErrorCode {
private final String message;
private MapreduceExecutionError(String message) {
private MRExecutionError(String message) {
this.message = message;
}

View File

@ -20,7 +20,7 @@
import org.apache.sqoop.core.ConfigurationConstants;
import org.apache.sqoop.driver.DriverConstants;
public final class JobConstants extends Constants {
public final class MRJobConstants extends Constants {
/**
* All job related configuration is prefixed with this:
* <tt>org.apache.sqoop.job.</tt>
@ -75,7 +75,7 @@ public final class JobConstants extends Constants {
public static final String INTERMEDIATE_DATA_FORMAT =
DriverConstants.PREFIX_EXECUTION_CONFIG + "intermediate.format";
private JobConstants() {
private MRJobConstants() {
// Disable explicit object creation
}
}

View File

@ -29,7 +29,7 @@
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.MRExecutionError;
public class Data implements WritableComparable<Data> {
@ -76,7 +76,7 @@ public void setContent(Object content, int type) {
this.content = content;
break;
default:
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
}
}
@ -87,7 +87,7 @@ public Object getContent(int targetType) {
case ARRAY_RECORD:
return parse();
default:
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(targetType));
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(targetType));
}
}
@ -141,7 +141,7 @@ public int hashCode() {
}
return result;
default:
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
}
}
@ -156,7 +156,7 @@ public void readFields(DataInput in) throws IOException {
readArray(in);
break;
default:
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
}
}
@ -171,7 +171,7 @@ public void write(DataOutput out) throws IOException {
writeArray(out);
break;
default:
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
}
}
@ -249,7 +249,7 @@ private void readArray(DataInput in) throws IOException {
default:
throw new IOException(
new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, Integer.toString(type))
new SqoopException(MRExecutionError.MAPRED_EXEC_0012, Integer.toString(type))
);
}
}
@ -307,7 +307,7 @@ private void writeArray(DataOutput out) throws IOException {
} else {
throw new IOException(
new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012,
new SqoopException(MRExecutionError.MAPRED_EXEC_0012,
array[i].getClass().getName()
)
);
@ -351,7 +351,7 @@ private String format() {
return sb.toString();
default:
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
}
}
@ -399,7 +399,7 @@ else if (position > 0 && record[position-1] != stringEscape) {
return (Object[])content;
default:
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
}
}
@ -418,7 +418,7 @@ private int parseField(ArrayList<Object> list, char[] record,
case FieldTypes.UTF:
if (field.charAt(0) != stringDelimiter ||
field.charAt(field.length()-1) != stringDelimiter) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0022);
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022);
}
list.add(index, unescape(field.substring(1, field.length()-1)));
break;
@ -426,7 +426,7 @@ private int parseField(ArrayList<Object> list, char[] record,
case FieldTypes.BIN:
if (field.charAt(0) != '[' ||
field.charAt(field.length()-1) != ']') {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0022);
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022);
}
String[] splits =
field.substring(1, field.length()-1).split(String.valueOf(','));
@ -474,7 +474,7 @@ private int parseField(ArrayList<Object> list, char[] record,
break;
default:
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(fieldType));
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(fieldType));
}
return ++index;

View File

@ -23,7 +23,7 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.PropertyConfigurator;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.json.util.SchemaSerialization;
import org.apache.sqoop.model.ConfigUtils;
import org.apache.sqoop.schema.Schema;
@ -38,43 +38,43 @@
* Helper class to store and load various information in/from MapReduce configuration
* object and JobConf object.
*/
public final class ConfigurationUtils {
public final class MRConfigurationUtils {
private static final String MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.link";
private static final String MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.link";
private static final String MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.link";
private static final String MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.link";
private static final String MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.job";
private static final String MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.job";
private static final String MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.job";
private static final String MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.job";
private static final String MR_JOB_CONFIG_DRIVER_CONFIG_CLASS = JobConstants.PREFIX_JOB_CONFIG + "config.class.driver";
private static final String MR_JOB_CONFIG_DRIVER_CONFIG_CLASS = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.driver";
private static final String MR_JOB_CONFIG_FROM_CONNECTOR_LINK = JobConstants.PREFIX_JOB_CONFIG + "config.connector.from.link";
private static final String MR_JOB_CONFIG_FROM_CONNECTOR_LINK = MRJobConstants.PREFIX_JOB_CONFIG + "config.connector.from.link";
private static final Text MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY = new Text(MR_JOB_CONFIG_FROM_CONNECTOR_LINK);
private static final String MR_JOB_CONFIG_TO_CONNECTOR_LINK = JobConstants.PREFIX_JOB_CONFIG + "config.connector.to.link";
private static final String MR_JOB_CONFIG_TO_CONNECTOR_LINK = MRJobConstants.PREFIX_JOB_CONFIG + "config.connector.to.link";
private static final Text MR_JOB_CONFIG_TO_CONNECTOR_LINK_KEY = new Text(MR_JOB_CONFIG_TO_CONNECTOR_LINK);
private static final String MR_JOB_CONFIG_FROM_JOB_CONFIG = JobConstants.PREFIX_JOB_CONFIG + "config.connector.from.job";
private static final String MR_JOB_CONFIG_FROM_JOB_CONFIG = MRJobConstants.PREFIX_JOB_CONFIG + "config.connector.from.job";
private static final Text MR_JOB_CONFIG_FROM_JOB_CONFIG_KEY = new Text(MR_JOB_CONFIG_FROM_JOB_CONFIG);
private static final String MR_JOB_CONFIG_TO_JOB_CONFIG = JobConstants.PREFIX_JOB_CONFIG + "config.connector.to.job";
private static final String MR_JOB_CONFIG_TO_JOB_CONFIG = MRJobConstants.PREFIX_JOB_CONFIG + "config.connector.to.job";
private static final Text MR_JOB_CONFIG_TO_JOB_CONFIG_KEY = new Text(MR_JOB_CONFIG_TO_JOB_CONFIG);
private static final String MR_JOB_CONFIG_DRIVER_CONFIG = JobConstants.PREFIX_JOB_CONFIG + "config.driver";
private static final String MR_JOB_CONFIG_DRIVER_CONFIG = MRJobConstants.PREFIX_JOB_CONFIG + "config.driver";
private static final Text MR_JOB_CONFIG_DRIVER_CONFIG_KEY = new Text(MR_JOB_CONFIG_DRIVER_CONFIG);
private static final String SCHEMA_FROM = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.from";
private static final String SCHEMA_FROM = MRJobConstants.PREFIX_JOB_CONFIG + "schema.connector.from";
private static final Text SCHEMA_FROM_KEY = new Text(SCHEMA_FROM);
private static final String SCHEMA_TO = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.to";
private static final String SCHEMA_TO = MRJobConstants.PREFIX_JOB_CONFIG + "schema.connector.to";
private static final Text SCHEMA_TO_KEY = new Text(SCHEMA_TO);
@ -259,7 +259,7 @@ private static Object loadConfiguration(JobConf configuration, String classPrope
return object;
}
private ConfigurationUtils() {
private MRConfigurationUtils() {
// Instantiation is prohibited
}

View File

@ -20,7 +20,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
@ -47,13 +47,13 @@ public static void executeDestroyer(boolean success, Configuration configuration
switch (direction) {
default:
case FROM:
destroyerPropertyName = JobConstants.JOB_ETL_FROM_DESTROYER;
prefixPropertyName = JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT;
destroyerPropertyName = MRJobConstants.JOB_ETL_FROM_DESTROYER;
prefixPropertyName = MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT;
break;
case TO:
destroyerPropertyName = JobConstants.JOB_ETL_TO_DESTROYER;
prefixPropertyName = JobConstants.PREFIX_CONNECTOR_TO_CONTEXT;
destroyerPropertyName = MRJobConstants.JOB_ETL_TO_DESTROYER;
prefixPropertyName = MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT;
break;
}
@ -66,11 +66,11 @@ public static void executeDestroyer(boolean success, Configuration configuration
// Objects that should be pass to the Destroyer execution
PrefixContext subContext = new PrefixContext(configuration, prefixPropertyName);
Object configConnection = ConfigurationUtils.getConnectorConnectionConfig(direction, configuration);
Object configJob = ConfigurationUtils.getConnectorJobConfig(direction, configuration);
Object configConnection = MRConfigurationUtils.getConnectorConnectionConfig(direction, configuration);
Object configJob = MRConfigurationUtils.getConnectorJobConfig(direction, configuration);
// Propagate connector schema in every case for now
Schema schema = ConfigurationUtils.getConnectorSchema(direction, configuration);
Schema schema = MRConfigurationUtils.getConnectorSchema(direction, configuration);
DestroyerContext destroyerContext = new DestroyerContext(subContext, success, schema);

View File

@ -34,7 +34,7 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.job.io.SqoopWritable;
/**
@ -56,13 +56,13 @@ public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
Path filepath = getDefaultWorkFile(context, "");
String filename = filepath.toString();
conf.set(JobConstants.JOB_MR_OUTPUT_FILE, filename);
conf.set(MRJobConstants.JOB_MR_OUTPUT_FILE, filename);
boolean isCompressed = getCompressOutput(context);
if (isCompressed) {
String codecname =
conf.get(JobConstants.HADOOP_COMPRESS_CODEC, DEFAULT_CODEC.getName());
conf.set(JobConstants.JOB_MR_OUTPUT_CODEC, codecname);
conf.get(MRJobConstants.HADOOP_COMPRESS_CODEC, DEFAULT_CODEC.getName());
conf.set(MRJobConstants.JOB_MR_OUTPUT_CODEC, codecname);
}
return new SqoopOutputFormatLoadExecutor(context).getRecordWriter();

View File

@ -31,8 +31,8 @@
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.job.MRExecutionError;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
@ -59,15 +59,15 @@ public List<InputSplit> getSplits(JobContext context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
String partitionerName = conf.get(JobConstants.JOB_ETL_PARTITIONER);
String partitionerName = conf.get(MRJobConstants.JOB_ETL_PARTITIONER);
Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName);
PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
Object connectorConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
Object connectorJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
PrefixContext connectorContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
Object connectorConnection = MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
Object connectorJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
Schema schema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
long maxPartitions = conf.getLong(JobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
long maxPartitions = conf.getLong(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, schema);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connectorConnection, connectorJob);
@ -80,7 +80,7 @@ public List<InputSplit> getSplits(JobContext context)
}
if(splits.size() > maxPartitions) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0025,
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0025,
String.format("Got %d, max was %d", splits.size(), maxPartitions));
}

View File

@ -31,8 +31,8 @@
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.matcher.Matcher;
import org.apache.sqoop.connector.matcher.MatcherFactory;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.job.MRExecutionError;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
@ -47,7 +47,7 @@
public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable> {
static {
ConfigurationUtils.configureLogging();
MRConfigurationUtils.configureLogging();
}
public static final Logger LOG = Logger.getLogger(SqoopMapper.class);
@ -63,14 +63,14 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
public void run(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
String extractorName = conf.get(MRJobConstants.JOB_ETL_EXTRACTOR);
Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
matcher = MatcherFactory.getMatcher(
ConfigurationUtils.getConnectorSchema(Direction.FROM, conf),
ConfigurationUtils.getConnectorSchema(Direction.TO, conf));
MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf),
MRConfigurationUtils.getConnectorSchema(Direction.TO, conf));
String intermediateDataFormatName = conf.get(JobConstants.INTERMEDIATE_DATA_FORMAT);
String intermediateDataFormatName = conf.get(MRJobConstants.INTERMEDIATE_DATA_FORMAT);
fromDataFormat = (IntermediateDataFormat<String>) ClassUtils
.instantiate(intermediateDataFormatName);
fromDataFormat.setSchema(matcher.getFromSchema());
@ -79,16 +79,16 @@ public void run(Context context) throws IOException, InterruptedException {
toDataFormat.setSchema(matcher.getToSchema());
// Objects that should be passed to the Executor execution
PrefixContext subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
Object fromConfig = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
Object fromJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
PrefixContext subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
Object fromConfig = MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
Object fromJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
SqoopSplit split = context.getCurrentKey();
ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context));
try {
LOG.info("Starting progress service");
progressService.scheduleAtFixedRate(new ProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
progressService.scheduleAtFixedRate(new SqoopProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
LOG.info("Running extractor class " + extractorName);
extractor.extract(extractorContext, fromConfig, fromJob, split.getPartition());
@ -96,7 +96,7 @@ public void run(Context context) throws IOException, InterruptedException {
context.getCounter(SqoopCounters.ROWS_READ)
.increment(extractor.getRowsRead());
} catch (Exception e) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0017, e);
} finally {
LOG.info("Stopping progress service");
progressService.shutdown();
@ -145,7 +145,7 @@ private void writeContent() {
writable.setString(toDataFormat.getTextData());
context.write(writable, NullWritable.get());
} catch (Exception e) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0013, e);
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0013, e);
}
}
}

View File

@ -28,7 +28,7 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.job.io.SqoopWritable;
import java.io.IOException;

View File

@ -36,8 +36,8 @@
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.matcher.Matcher;
import org.apache.sqoop.connector.matcher.MatcherFactory;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.job.MRExecutionError;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
@ -75,10 +75,10 @@ public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
context = jobctx;
writer = new SqoopRecordWriter();
matcher = MatcherFactory.getMatcher(
ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()),
ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()));
MRConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()),
MRConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()));
dataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(context
.getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
.getConfiguration().get(MRJobConstants.INTERMEDIATE_DATA_FORMAT));
dataFormat.setSchema(matcher.getToSchema());
}
@ -141,7 +141,7 @@ private void waitForConsumer() {
//In the rare case, it was not a SqoopException
Throwables.propagate(t);
} catch (Exception ex) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019, ex);
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019, ex);
}
}
@ -186,7 +186,7 @@ public Object readContent() throws InterruptedException {
} catch (Throwable t) {
readerFinished = true;
LOG.error("Caught exception e while getting content ", t);
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0018, t);
} finally {
releaseSema();
}
@ -221,7 +221,7 @@ public void run() {
Configuration conf = null;
if (!isTest) {
conf = context.getConfiguration();
loaderName = conf.get(JobConstants.JOB_ETL_LOADER);
loaderName = conf.get(MRJobConstants.JOB_ETL_LOADER);
}
Loader loader = (Loader) ClassUtils.instantiate(loaderName);
@ -233,11 +233,11 @@ public void run() {
if (!isTest) {
// Using the TO schema since the IDF returns data in TO schema
schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
schema = MRConfigurationUtils.getConnectorSchema(Direction.TO, conf);
subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
configConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.TO, conf);
configJob = ConfigurationUtils.getConnectorJobConfig(Direction.TO, conf);
subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
configConnection = MRConfigurationUtils.getConnectorConnectionConfig(Direction.TO, conf);
configJob = MRConfigurationUtils.getConnectorJobConfig(Direction.TO, conf);
}
// Create loader context
@ -252,7 +252,7 @@ public void run() {
// Release so that the writer can tell Sqoop something went
// wrong.
free.release();
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0018, t);
}
// if no exception happens yet and reader finished before writer,
@ -264,7 +264,7 @@ public void run() {
// Release so that the writer can tell Sqoop something went
// wrong.
free.release();
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019);
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019);
}
// inform writer that reader is finished

View File

@ -24,17 +24,17 @@
/**
* Runnable that will ping mapreduce context about progress.
*/
public class ProgressRunnable implements Runnable {
public class SqoopProgressRunnable implements Runnable {
public static final Logger LOG = Logger.getLogger(ProgressRunnable.class);
public static final Logger LOG = Logger.getLogger(SqoopProgressRunnable.class);
/**
* Context class that we should use for reporting progress.
*/
private final TaskInputOutputContext<?,?,?,?> context;
public ProgressRunnable(final TaskInputOutputContext<?,?,?,?> ctxt) {
this.context = ctxt;
public SqoopProgressRunnable(final TaskInputOutputContext<?,?,?,?> ctx) {
this.context = ctx;
}
@Override

View File

@ -33,7 +33,7 @@
public class SqoopReducer extends Reducer<SqoopWritable, NullWritable, SqoopWritable, NullWritable> {
static {
ConfigurationUtils.configureLogging();
MRConfigurationUtils.configureLogging();
}
public static final Logger LOG = Logger.getLogger(SqoopReducer.class);
@ -46,7 +46,7 @@ public class SqoopReducer extends Reducer<SqoopWritable, NullWritable, SqoopWrit
public void run(Context context) throws IOException, InterruptedException {
try {
LOG.info("Starting progress service");
progressService.scheduleAtFixedRate(new ProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
progressService.scheduleAtFixedRate(new SqoopProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
// Delegating all functionality to our parent
super.run(context);

View File

@ -24,7 +24,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.MRExecutionError;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.utils.ClassUtils;
@ -60,12 +60,12 @@ public void readFields(DataInput in) throws IOException {
// instantiate Partition object
Class<?> clz = ClassUtils.loadClass(className);
if (clz == null) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0009, className);
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0009, className);
}
try {
partition = (Partition) clz.newInstance();
} catch (Exception e) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0010, className, e);
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0010, className, e);
}
// read Partition object content
partition.readFields(in);

View File

@ -45,7 +45,7 @@
import org.apache.sqoop.job.etl.PartitionerContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.mr.ConfigurationUtils;
import org.apache.sqoop.job.mr.MRConfigurationUtils;
import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper;
import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
@ -68,8 +68,8 @@ public class TestMapReduce {
@Test
public void testInputFormat() throws Exception {
Configuration conf = new Configuration();
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
Job job = new Job(conf);
@ -87,17 +87,17 @@ public void testInputFormat() throws Exception {
@Test
public void testMapper() throws Exception {
Configuration conf = new Configuration();
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
Schema schema = new Schema("Test");
schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
.addColumn(new org.apache.sqoop.schema.type.Text("3"));
Job job = new Job(conf);
ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
ConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
boolean success = JobUtils.runJob(job.getConfiguration(),
SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class);
Assert.assertEquals("Job failed!", true, success);
@ -106,20 +106,20 @@ public void testMapper() throws Exception {
@Test
public void testOutputFormat() throws Exception {
Configuration conf = new Configuration();
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(JobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());
conf.set(JobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName());
conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(MRJobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(MRJobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());
conf.set(MRJobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName());
conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
Schema schema = new Schema("Test");
schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
.addColumn(new Text("3"));
Job job = new Job(conf);
ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
ConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
boolean success = JobUtils.runJob(job.getConfiguration(),
SqoopInputFormat.class, SqoopMapper.class,
SqoopNullOutputFormat.class);

View File

@ -42,7 +42,7 @@
import org.apache.sqoop.job.etl.PartitionerContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.mr.ConfigurationUtils;
import org.apache.sqoop.job.mr.MRConfigurationUtils;
import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper;
import org.apache.sqoop.schema.Schema;
@ -123,14 +123,14 @@ public static Collection<Object[]> data() {
@Test
public void testSchemaMatching() throws Exception {
Configuration conf = new Configuration();
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
Job job = new Job(conf);
ConfigurationUtils.setConnectorSchema(Direction.FROM, job, from);
ConfigurationUtils.setConnectorSchema(Direction.TO, job, to);
MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, from);
MRConfigurationUtils.setConnectorSchema(Direction.TO, job, to);
JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
DummyOutputFormat.class);
boolean success = JobUtils.runJob(job.getConfiguration(),

View File

@ -31,7 +31,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MRJobConstants;
import org.junit.Assert;
import org.junit.Test;

View File

@ -39,7 +39,7 @@
* to hadoop JobConf object. This implementation was chosen because it's not clear
* how MapReduce is converting one object to another.
*/
public class TestConfigurationUtils {
public class TestMRConfigurationUtils {
Job job;
JobConf jobConfSpy;
@ -61,49 +61,49 @@ public void setUpHadoopJobConf() throws Exception {
@Test
public void testLinkConfiguration() throws Exception {
ConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, getConfig());
MRConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, getConfig());
setUpHadoopJobConf();
assertEquals(getConfig(), ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, jobConfSpy));
assertEquals(getConfig(), MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, jobConfSpy));
ConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, getConfig());
MRConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, getConfig());
setUpHadoopJobConf();
assertEquals(getConfig(), ConfigurationUtils.getConnectorConnectionConfig(Direction.TO, jobConfSpy));
assertEquals(getConfig(), MRConfigurationUtils.getConnectorConnectionConfig(Direction.TO, jobConfSpy));
}
@Test
public void testJobConfiguration() throws Exception {
ConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, getConfig());
MRConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, getConfig());
setUpHadoopJobConf();
assertEquals(getConfig(), ConfigurationUtils.getConnectorJobConfig(Direction.FROM, jobConfSpy));
assertEquals(getConfig(), MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, jobConfSpy));
ConfigurationUtils.setConnectorJobConfig(Direction.TO, job, getConfig());
MRConfigurationUtils.setConnectorJobConfig(Direction.TO, job, getConfig());
setUpHadoopJobConf();
assertEquals(getConfig(), ConfigurationUtils.getConnectorJobConfig(Direction.TO, jobConfSpy));
assertEquals(getConfig(), MRConfigurationUtils.getConnectorJobConfig(Direction.TO, jobConfSpy));
}
@Test
public void testDriverConfiguration() throws Exception {
ConfigurationUtils.setDriverConfig(job, getConfig());
MRConfigurationUtils.setDriverConfig(job, getConfig());
setUpHadoopJobConf();
assertEquals(getConfig(), ConfigurationUtils.getDriverConfig(jobConfSpy));
assertEquals(getConfig(), MRConfigurationUtils.getDriverConfig(jobConfSpy));
}
@Test
public void testConnectorSchema() throws Exception {
ConfigurationUtils.setConnectorSchema(Direction.FROM, job, getSchema("a"));
assertEquals(getSchema("a"), ConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy));
MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, getSchema("a"));
assertEquals(getSchema("a"), MRConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy));
ConfigurationUtils.setConnectorSchema(Direction.TO, job, getSchema("b"));
assertEquals(getSchema("b"), ConfigurationUtils.getConnectorSchema(Direction.TO, jobConfSpy));
MRConfigurationUtils.setConnectorSchema(Direction.TO, job, getSchema("b"));
assertEquals(getSchema("b"), MRConfigurationUtils.getConnectorSchema(Direction.TO, jobConfSpy));
}
@Test
public void testConnectorSchemaNull() throws Exception {
ConfigurationUtils.setConnectorSchema(Direction.FROM, job, null);
assertNull(ConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy));
MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, null);
assertNull(MRConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy));
ConfigurationUtils.setConnectorSchema(Direction.TO, job, null);
assertNull(ConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy));
MRConfigurationUtils.setConnectorSchema(Direction.TO, job, null);
assertNull(MRConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy));
}
private Schema getSchema(String name) {

View File

@ -24,7 +24,7 @@
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.job.io.SqoopWritable;
@ -120,13 +120,13 @@ public void load(LoaderContext context, Object cc, Object jc) throws Exception {
@Before
public void setUp() {
conf = new Configuration();
conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
conf.setIfUnset(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
}
@Test(expected = BrokenBarrierException.class)
public void testWhenLoaderThrows() throws Throwable {
conf.set(JobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName());
conf.set(MRJobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName());
SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName());
RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
@ -145,7 +145,7 @@ public void testWhenLoaderThrows() throws Throwable {
@Test
public void testSuccessfulContinuousLoader() throws Throwable {
conf.set(JobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName());
conf.set(MRJobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName());
SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName());
RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
@ -192,7 +192,7 @@ public void testSuccessfulLoader() throws Throwable {
@Test(expected = ConcurrentModificationException.class)
public void testThrowingContinuousLoader() throws Throwable {
conf.set(JobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName());
conf.set(MRJobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName());
SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName());
RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();

View File

@ -39,8 +39,8 @@
import org.apache.sqoop.execution.mapreduce.MRJobRequest;
import org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine;
import org.apache.sqoop.driver.JobRequest;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.mr.ConfigurationUtils;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.job.mr.MRConfigurationUtils;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
@ -172,7 +172,7 @@ public boolean submit(JobRequest mrJobRequest) {
continue;
}
configuration.set(
JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT + entry.getKey(),
MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT + entry.getKey(),
entry.getValue());
}
@ -182,7 +182,7 @@ public boolean submit(JobRequest mrJobRequest) {
continue;
}
configuration.set(
JobConstants.PREFIX_CONNECTOR_TO_CONTEXT + entry.getKey(),
MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT + entry.getKey(),
entry.getValue());
}
@ -202,17 +202,17 @@ public boolean submit(JobRequest mrJobRequest) {
Job job = new Job(configuration);
// link configs
ConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, request.getConnectorLinkConfig(Direction.FROM));
ConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, request.getConnectorLinkConfig(Direction.TO));
MRConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, request.getConnectorLinkConfig(Direction.FROM));
MRConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, request.getConnectorLinkConfig(Direction.TO));
// from and to configs
ConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, request.getJobConfig(Direction.FROM));
ConfigurationUtils.setConnectorJobConfig(Direction.TO, job, request.getJobConfig(Direction.TO));
MRConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, request.getJobConfig(Direction.FROM));
MRConfigurationUtils.setConnectorJobConfig(Direction.TO, job, request.getJobConfig(Direction.TO));
ConfigurationUtils.setDriverConfig(job, request.getDriverConfig());
MRConfigurationUtils.setDriverConfig(job, request.getDriverConfig());
// @TODO(Abe): Persist TO schema.
ConfigurationUtils.setConnectorSchema(Direction.FROM, job, request.getSummary().getFromSchema());
ConfigurationUtils.setConnectorSchema(Direction.TO, job, request.getSummary().getToSchema());
MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, request.getSummary().getFromSchema());
MRConfigurationUtils.setConnectorSchema(Direction.TO, job, request.getSummary().getToSchema());
if(request.getJobName() != null) {
job.setJobName("Sqoop: " + request.getJobName());