5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-21 03:10:49 +08:00

SQOOP-842: Put partition to template in Extractor as well

(Jarcec Cecho via Cheolsoo Park)
This commit is contained in:
Cheolsoo Park 2013-01-30 15:13:18 -08:00
parent 03408d573e
commit 92062d5343
6 changed files with 21 additions and 18 deletions

View File

@ -30,13 +30,13 @@
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.io.DataWriter;
public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguration, ImportJobConfiguration> {
public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguration, ImportJobConfiguration, GenericJdbcImportPartition> {
public static final Logger LOG = Logger.getLogger(GenericJdbcImportExtractor.class);
private long rowsRead = 0;
@Override
public void run(ImmutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job, Partition partition, DataWriter writer) {
public void run(ImmutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job, GenericJdbcImportPartition partition, DataWriter writer) {
String driver = connection.connection.jdbcDriver;
String url = connection.connection.connectionString;
String username = connection.connection.username;
@ -44,7 +44,7 @@ public void run(ImmutableContext context, ConnectionConfiguration connection, Im
GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL);
String conditions = ((GenericJdbcImportPartition)partition).getConditions();
String conditions = partition.getConditions();
query = query.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions);
LOG.info("Using query: " + query);

View File

@ -27,12 +27,14 @@
import org.apache.hadoop.io.Text;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.DataWriter;
public class HdfsSequenceExportExtractor extends Extractor {
public class HdfsSequenceExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
public static final Log LOG =
LogFactory.getLog(HdfsSequenceExportExtractor.class.getName());
@ -47,19 +49,18 @@ public HdfsSequenceExportExtractor() {
}
@Override
public void run(ImmutableContext context, Object connectionConfiguration,
Object jobConfiguration, Partition partition, DataWriter writer) {
public void run(ImmutableContext context, ConnectionConfiguration connectionConfiguration,
ExportJobConfiguration jobConfiguration, HdfsExportPartition partition, DataWriter writer) {
writer.setFieldDelimiter(fieldDelimiter);
conf = ((PrefixContext)context).getConfiguration();
datawriter = writer;
try {
HdfsExportPartition p = (HdfsExportPartition)partition;
LOG.info("Working on partition: " + p);
int numFiles = p.getNumberOfFiles();
LOG.info("Working on partition: " + partition);
int numFiles = partition.getNumberOfFiles();
for (int i=0; i<numFiles; i++) {
extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
extractFile(partition.getFile(i), partition.getOffset(i), partition.getLength(i));
}
} catch (IOException e) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);

View File

@ -33,12 +33,14 @@
import org.apache.hadoop.util.LineReader;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.DataWriter;
public class HdfsTextExportExtractor extends Extractor {
public class HdfsTextExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
public static final Log LOG =
LogFactory.getLog(HdfsTextExportExtractor.class.getName());
@ -53,8 +55,8 @@ public HdfsTextExportExtractor() {
}
@Override
public void run(ImmutableContext context, Object connectionConfiguration,
Object jobConfiguration, Partition partition, DataWriter writer) {
public void run(ImmutableContext context, ConnectionConfiguration connectionConfiguration,
ExportJobConfiguration jobConfiguration, HdfsExportPartition partition, DataWriter writer) {
writer.setFieldDelimiter(fieldDelimiter);
conf = ((PrefixContext)context).getConfiguration();

View File

@ -217,7 +217,7 @@ public List<Partition> getPartitions(ImmutableContext context, long maxPartition
public static class DummyExtractor extends Extractor {
@Override
public void run(ImmutableContext context, Object oc, Object oj, Partition partition, DataWriter writer) {
public void run(ImmutableContext context, Object oc, Object oj, Object partition, DataWriter writer) {
int id = ((DummyPartition)partition).getId();
for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
Object[] array = new Object[] {

View File

@ -133,7 +133,7 @@ public List<Partition> getPartitions(ImmutableContext context, long maxPartition
public static class DummyExtractor extends Extractor {
@Override
public void run(ImmutableContext context, Object oc, Object oj, Partition partition, DataWriter writer) {
public void run(ImmutableContext context, Object oc, Object oj, Object partition, DataWriter writer) {
int id = ((DummyPartition)partition).getId();
for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
writer.writeArrayRecord(new Object[] {

View File

@ -24,7 +24,7 @@
* This allows connector to extract data from a source system
* based on each partition.
*/
public abstract class Extractor<ConnectionConfiguration, JobConfiguration> {
public abstract class Extractor<ConnectionConfiguration, JobConfiguration, Partition> {
public abstract void run(ImmutableContext context,
ConnectionConfiguration connectionConfiguration,
@ -34,14 +34,14 @@ public abstract void run(ImmutableContext context,
/**
* Return the number of rows read by the last call to
* {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, org.apache.sqoop.job.etl.Partition, org.apache.sqoop.job.io.DataWriter) }
* {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, Partition, org.apache.sqoop.job.io.DataWriter) }
* method. This method returns only the number of rows read in the last call,
* and not a cumulative total of the number of rows read by this Extractor
* since its creation. If no calls were made to the run method, this method's
* behavior is undefined.
*
* @return the number of rows read by the last call to
* {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, org.apache.sqoop.job.etl.Partition, org.apache.sqoop.job.io.DataWriter) }
* {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, Partition, org.apache.sqoop.job.io.DataWriter) }
*/
public abstract long getRowsRead();