From 92062d5343e6318fe703b06e30c52920938818b0 Mon Sep 17 00:00:00 2001
From: Cheolsoo Park
Date: Wed, 30 Jan 2013 15:13:18 -0800
Subject: [PATCH] SQOOP-842: Put partition to template in Extractor as well
 (Jarcec Cecho via Cheolsoo Park)
---
 .../jdbc/GenericJdbcImportExtractor.java           |  6 +++---
 .../job/etl/HdfsSequenceExportExtractor.java       | 15 ++++++++-------
 .../sqoop/job/etl/HdfsTextExportExtractor.java     |  8 +++++---
 .../java/org/apache/sqoop/job/TestHdfsLoad.java    |  2 +-
 .../java/org/apache/sqoop/job/TestMapReduce.java   |  2 +-
 .../java/org/apache/sqoop/job/etl/Extractor.java   |  6 +++---
 6 files changed, 21 insertions(+), 18 deletions(-)

diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
index f4389a3a..9db33286 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
@@ -30,13 +30,13 @@
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.io.DataWriter;
 
-public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguration, ImportJobConfiguration> {
+public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguration, ImportJobConfiguration, GenericJdbcImportPartition> {
 
   public static final Logger LOG = Logger.getLogger(GenericJdbcImportExtractor.class);
 
   private long rowsRead = 0;
 
   @Override
-  public void run(ImmutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job, Partition partition, DataWriter writer) {
+  public void run(ImmutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job, GenericJdbcImportPartition partition, DataWriter writer) {
     String driver = connection.connection.jdbcDriver;
     String url = connection.connection.connectionString;
     String username = connection.connection.username;
@@ -44,7 +44,7 @@ public void run(ImmutableContext context, ConnectionConfiguration connection, Im
     String password = connection.connection.password;
     GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
 
     String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL);
-    String conditions = ((GenericJdbcImportPartition)partition).getConditions();
+    String conditions = partition.getConditions();
     query = query.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions);
     LOG.info("Using query: " + query);
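With the partition type folded into the Extractor template, a connector names its own Partition subclass once in the class declaration and receives it already typed in run(), which is what lets the ((GenericJdbcImportPartition) partition) cast above disappear. The following is a minimal sketch of that pattern, not code from this patch: FixtureExtractor, FixturePartition, and its getRows() accessor are hypothetical, and the configuration classes are assumed to be the connector's own.

    import org.apache.sqoop.common.ImmutableContext;
    import org.apache.sqoop.job.etl.Extractor;
    import org.apache.sqoop.job.io.DataWriter;

    // Hypothetical connector extractor; FixturePartition is assumed to
    // extend org.apache.sqoop.job.etl.Partition and expose getRows().
    public class FixtureExtractor
        extends Extractor<ConnectionConfiguration, ImportJobConfiguration, FixturePartition> {

      private long rowsRead = 0;

      @Override
      public void run(ImmutableContext context, ConnectionConfiguration connection,
          ImportJobConfiguration job, FixturePartition partition, DataWriter writer) {
        rowsRead = 0;
        // The partition parameter arrives already typed; no downcast needed.
        for (Object[] row : partition.getRows()) {
          writer.writeArrayRecord(row);
          rowsRead++;
        }
      }

      @Override
      public long getRowsRead() {
        return rowsRead;
      }
    }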
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
index 3a04e59d..45b61667 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
@@ -27,12 +27,14 @@
 import org.apache.hadoop.io.Text;
 import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
+import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
 import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.job.PrefixContext;
 import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.io.DataWriter;
 
-public class HdfsSequenceExportExtractor extends Extractor {
+public class HdfsSequenceExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
 
   public static final Log LOG =
     LogFactory.getLog(HdfsSequenceExportExtractor.class.getName());
@@ -47,19 +49,18 @@ public HdfsSequenceExportExtractor() {
   }
 
   @Override
-  public void run(ImmutableContext context, Object connectionConfiguration,
-      Object jobConfiguration, Partition partition, DataWriter writer) {
+  public void run(ImmutableContext context, ConnectionConfiguration connectionConfiguration,
+      ExportJobConfiguration jobConfiguration, HdfsExportPartition partition, DataWriter writer) {
     writer.setFieldDelimiter(fieldDelimiter);
 
     conf = ((PrefixContext)context).getConfiguration();
     datawriter = writer;
 
     try {
-      HdfsExportPartition p = (HdfsExportPartition)partition;
-      LOG.info("Working on partition: " + p);
-      int numFiles = p.getNumberOfFiles();
+      LOG.info("Working on partition: " + partition);
+      int numFiles = partition.getNumberOfFiles();
       for (int i=0; i<numFiles; i++) {
-        extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
+        extractFile(partition.getFile(i), partition.getOffset(i), partition.getLength(i));
       }
     } catch (IOException e) {
       throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
@@ -31,12 +31,14 @@
 import org.apache.hadoop.util.LineReader;
 import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
+import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
 import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.job.PrefixContext;
 import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.io.DataWriter;
 
-public class HdfsTextExportExtractor extends Extractor {
+public class HdfsTextExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
 
   public static final Log LOG =
     LogFactory.getLog(HdfsTextExportExtractor.class.getName());
@@ -53,8 +55,8 @@ public HdfsTextExportExtractor() {
   }
 
   @Override
-  public void run(ImmutableContext context, Object connectionConfiguration,
-      Object jobConfiguration, Partition partition, DataWriter writer) {
+  public void run(ImmutableContext context, ConnectionConfiguration connectionConfiguration,
+      ExportJobConfiguration jobConfiguration, HdfsExportPartition partition, DataWriter writer) {
     writer.setFieldDelimiter(fieldDelimiter);
 
     conf = ((PrefixContext)context).getConfiguration();
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
index 4e6209d7..6e1c9588 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
@@ -217,7 +217,7 @@ public List<Partition> getPartitions(ImmutableContext context, long maxPartition
 
   public static class DummyExtractor extends Extractor {
     @Override
-    public void run(ImmutableContext context, Object oc, Object oj, Partition partition, DataWriter writer) {
+    public void run(ImmutableContext context, Object oc, Object oj, Object partition, DataWriter writer) {
       int id = ((DummyPartition)partition).getId();
       for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
         Object[] array = new Object[] {
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 85900650..427132e2 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -133,7 +133,7 @@ public List<Partition> getPartitions(ImmutableContext context, long maxPartition
 
   public static class DummyExtractor extends Extractor {
     @Override
-    public void run(ImmutableContext context, Object oc, Object oj, Partition partition, DataWriter writer) {
+    public void run(ImmutableContext context, Object oc, Object oj, Object partition, DataWriter writer) {
       int id = ((DummyPartition)partition).getId();
       for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
         writer.writeArrayRecord(new Object[] {
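The two test changes follow from erasure: DummyExtractor extends the raw Extractor type, so once the partition slot becomes a template parameter its erased run() signature takes Object there as well, and the override must declare Object to keep compiling. Had the tests bound the parameter instead, the cast would go away too; a hypothetical typed variant (illustration only, not what this patch does):

    // Illustrative only; the patch keeps the raw form shown above.
    public static class TypedDummyExtractor
        extends Extractor<Object, Object, DummyPartition> {

      @Override
      public void run(ImmutableContext context, Object oc, Object oj,
          DummyPartition partition, DataWriter writer) {
        // Binding DummyPartition in the template removes the cast.
        int id = partition.getId();
        writer.writeArrayRecord(new Object[] { id, "row-" + id });
      }

      @Override
      public long getRowsRead() {
        return 0; // not tracked by this dummy
      }
    }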
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
index fac6f052..300cf4eb 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
@@ -24,7 +24,7 @@
  * This allows connector to extract data from a source system
  * based on each partition.
  */
-public abstract class Extractor<ConnectionConfiguration, JobConfiguration> {
+public abstract class Extractor<ConnectionConfiguration, JobConfiguration, Partition> {
 
   public abstract void run(ImmutableContext context,
                            ConnectionConfiguration connectionConfiguration,
@@ -34,14 +34,14 @@ public abstract void run(ImmutableContext context,
 
   /**
    * Return the number of rows read by the last call to
-   * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, org.apache.sqoop.job.etl.Partition, org.apache.sqoop.job.io.DataWriter) }
+   * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, Partition, org.apache.sqoop.job.io.DataWriter) }
    * method. This method returns only the number of rows read in the last call,
    * and not a cumulative total of the number of rows read by this Extractor
    * since its creation. If no calls were made to the run method, this method's
    * behavior is undefined.
    *
    * @return the number of rows read by the last call to
-   * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, org.apache.sqoop.job.etl.Partition, org.apache.sqoop.job.io.DataWriter) }
+   * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, Partition, org.apache.sqoop.job.io.DataWriter) }
    */
   public abstract long getRowsRead();
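Nothing at the framework's call site is touched by this patch: a caller that holds extractors only through the raw base type necessarily makes an unchecked call, since the concrete type arguments are a connector detail erased at that point. A sketch of such a call site follows; it illustrates the erasure behavior only and is not the literal framework code.

    // Hypothetical caller that knows only the raw Extractor type.
    @SuppressWarnings({ "rawtypes", "unchecked" })
    static void invoke(Extractor extractor, ImmutableContext context,
        Object connectionConfig, Object jobConfig, Partition partition,
        DataWriter writer) {
      // Erased signature: run(ImmutableContext, Object, Object, Object, DataWriter).
      extractor.run(context, connectionConfig, jobConfig, partition, writer);
    }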