
SQOOP-864: Introduce ETL context objects

(Jarcec Cecho via Cheolsoo Park)
Cheolsoo Park 2013-02-10 12:55:31 -08:00
parent addc87ee4b
commit ed9c514361
39 changed files with 490 additions and 165 deletions

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.io;
package org.apache.sqoop.etl.io;
/**
* An intermediate layer for passing data from the MR framework

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.io;
package org.apache.sqoop.etl.io;
/**
* An intermediate layer for passing data from the ETL framework

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.etl;
import org.apache.sqoop.common.ImmutableContext;
/**
* Basic context class for each actor containing only the connector/framework
* context object.
*/
public abstract class ActorContext {
ImmutableContext context;
public ActorContext(ImmutableContext context) {
this.context = context;
}
/**
* Context object associated with the particular actor
*
* @return Context object associated with this actor
*/
public ImmutableContext getContext() {
return context;
}
/**
* Convenience method that returns a value from the wrapped context object.
*/
public String getString(String key) {
return context.getString(key);
}
/**
* Convenience method that returns a value from the wrapped context object.
*/
public String getString(String key, String defaultValue) {
return context.getString(key, defaultValue);
}
/**
* Convenience method that returns a value from the wrapped context object.
*/
public long getLong(String key, long defaultValue) {
return context.getLong(key, defaultValue);
}
/**
* Convenience method that returns a value from the wrapped context object.
*/
public int getInt(String key, int defaultValue) {
return context.getInt(key, defaultValue);
}
/**
* Convenience method that returns a value from the wrapped context object.
*/
public boolean getBoolean(String key, boolean defaultValue) {
return context.getBoolean(key, defaultValue);
}
}
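For illustration only (not part of this commit), a minimal sketch of connector-side code reading settings through these convenience getters; the key names are invented:

import org.apache.sqoop.job.etl.ActorContext;

// Hypothetical helper that reads invented keys through the convenience
// getters instead of reaching into getContext() directly.
public class ExampleSettings {
  public static long fetchSize(ActorContext context) {
    return context.getLong("example.fetch.size", 1000L);
  }
  public static boolean verbose(ActorContext context) {
    return context.getBoolean("example.verbose", false);
  }
}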

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.etl;
import org.apache.sqoop.common.ImmutableContext;
/**
* Context implementation for Destroyer.
*
* This class wraps information about whether the run was successful.
*/
public class DestroyerContext extends ActorContext {
private boolean success;
public DestroyerContext(ImmutableContext context, boolean success) {
super(context);
this.success = success;
}
/**
* Return true if the job was successful.
*
* @return True if the job was successful
*/
public boolean isSuccess() {
return success;
}
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.etl;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.etl.io.DataWriter;
/**
* Context implementation for Extractor.
*
* This class wraps the data writer object.
*/
public class ExtractorContext extends ActorContext {
private DataWriter writer;
public ExtractorContext(ImmutableContext context, DataWriter writer) {
super(context);
this.writer = writer;
}
/**
* Return associated data writer object.
*
* @return Data writer object for extract output
*/
public DataWriter getDataWriter() {
return writer;
}
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.etl;
import org.apache.sqoop.common.MutableContext;
/**
*
* Context implementation for Initializer.
*
* This class returns a mutable context instead of an immutable one.
*/
public class InitializerContext extends ActorContext {
public InitializerContext(MutableContext context) {
super(context);
}
/**
* Return mutable context.
*
* Initializer can set up multiple properties that will be passed to each
* extractor and loader.
*
* @return Mutable context object
*/
@Override
public MutableContext getContext() {
return (MutableContext)super.getContext();
}
}
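As a rough sketch only: an initializer can publish values that the framework later promotes to every extractor and loader. This assumes MutableContext exposes a setString(key, value) setter (an assumption, not shown in this diff), and the key name is invented:

import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.job.etl.InitializerContext;

// Hypothetical example: store a value during initialization so it becomes
// visible to the extractor and loader contexts later on.
// Assumes MutableContext.setString(String, String) exists.
public class ExamplePropertyPublisher {
  public static void publish(InitializerContext context) {
    MutableContext mutable = context.getContext();
    mutable.setString("example.generated.query", "SELECT 1");
  }
}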

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.etl;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.etl.io.DataReader;
/**
* Context implementation for Loader.
*
* This class also wraps the data reader object.
*/
public class LoaderContext extends ActorContext {
DataReader reader;
public LoaderContext(ImmutableContext context, DataReader reader) {
super(context);
this.reader = reader;
}
/**
* Return associated data reader object.
*
* @return Data reader object for loader input
*/
public DataReader getDataReader() {
return reader;
}
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.etl;
import org.apache.sqoop.common.ImmutableContext;
/**
* Context implementation for Partitioner.
*
* This class also wraps the maximal number of allowed partitions.
*/
public class PartitionerContext extends ActorContext {
private long maxPartitions;
public PartitionerContext(ImmutableContext context, long maxPartitions) {
super(context);
this.maxPartitions = maxPartitions;
}
/**
* Return maximal number of partitions.
*
* The framework will ensure that the number of returned partitions does not
* exceed this number.
*
* @return Maximal number of partitions
*/
public long getMaxPartitions() {
return maxPartitions;
}
}
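An illustrative sketch (not from this commit) of using getMaxPartitions() to size chunks so the returned partition count never exceeds the framework's limit:

import org.apache.sqoop.job.etl.PartitionerContext;

// Illustrative helper: ceiling division keeps the number of produced chunks
// at or below context.getMaxPartitions().
public class ExampleChunkSizing {
  public static long chunkSize(PartitionerContext context, long totalRows) {
    long maxPartitions = Math.max(1, context.getMaxPartitions());
    return (totalRows + maxPartitions - 1) / maxPartitions;
  }
}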

View File

@ -18,17 +18,17 @@
package org.apache.sqoop.connector.jdbc;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
public class GenericJdbcExportDestroyer extends Destroyer<ConnectionConfiguration, ExportJobConfiguration> {
private static final Logger LOG = Logger.getLogger(GenericJdbcExportDestroyer.class);
@Override
public void destroy(boolean success, ImmutableContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
public void destroy(DestroyerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
LOG.info("Running generic JDBC connector destroyer");
}
}

View File

@ -21,12 +21,12 @@
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.utils.ClassUtils;
public class GenericJdbcExportInitializer extends Initializer<ConnectionConfiguration, ExportJobConfiguration> {
@ -34,17 +34,17 @@ public class GenericJdbcExportInitializer extends Initializer<ConnectionConfigur
private GenericJdbcExecutor executor;
@Override
public void initialize(MutableContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
configureJdbcProperties(context, connection, job);
public void initialize(InitializerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
configureJdbcProperties(context.getContext(), connection, job);
try {
configureTableProperties(context, connection, job);
configureTableProperties(context.getContext(), connection, job);
} finally {
executor.close();
}
}
@Override
public List<String> getJars(ImmutableContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
public List<String> getJars(InitializerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
List<String> jars = new LinkedList<String>();
jars.add(ClassUtils.jarForClass(connection.connection.jdbcDriver));

View File

@ -17,11 +17,10 @@
*/
package org.apache.sqoop.connector.jdbc;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.io.DataReader;
import org.apache.sqoop.job.etl.LoaderContext;
public class GenericJdbcExportLoader extends Loader<ConnectionConfiguration, ExportJobConfiguration> {
@ -31,7 +30,7 @@ public class GenericJdbcExportLoader extends Loader<ConnectionConfiguration, Exp
private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION;
@Override
public void load(ImmutableContext context, ConnectionConfiguration connection, ExportJobConfiguration job, DataReader reader) throws Exception{
public void load(LoaderContext context, ConnectionConfiguration connection, ExportJobConfiguration job) throws Exception{
String driver = connection.connection.jdbcDriver;
String url = connection.connection.connectionString;
String username = connection.connection.username;
@ -46,7 +45,7 @@ public void load(ImmutableContext context, ConnectionConfiguration connection, E
int numberOfBatches = 0;
Object[] array;
while ((array = reader.readArrayRecord()) != null) {
while ((array = context.getDataReader().readArrayRecord()) != null) {
numberOfRows++;
executor.addBatch(array);

View File

@ -17,16 +17,19 @@
*/
package org.apache.sqoop.connector.jdbc;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.log4j.Logger;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
public class GenericJdbcImportDestroyer extends Destroyer<ConnectionConfiguration, ImportJobConfiguration> {
private static final Logger LOG = Logger.getLogger(GenericJdbcExportDestroyer.class);
@Override
public void destroy(boolean success, ImmutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job) {
// No explicit action at the moment
public void destroy(DestroyerContext context, ConnectionConfiguration connection, ImportJobConfiguration job) {
LOG.info("Running generic JDBC connector destroyer");
}
}

View File

@ -22,13 +22,11 @@
import java.sql.SQLException;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.io.DataWriter;
public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguration, ImportJobConfiguration, GenericJdbcImportPartition> {
@ -36,7 +34,7 @@ public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguratio
private long rowsRead = 0;
@Override
public void run(ImmutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job, GenericJdbcImportPartition partition, DataWriter writer) {
public void extract(ExtractorContext context, ConnectionConfiguration connection, ImportJobConfiguration job, GenericJdbcImportPartition partition) {
String driver = connection.connection.jdbcDriver;
String url = connection.connection.connectionString;
String username = connection.connection.username;
@ -59,7 +57,7 @@ public void run(ImmutableContext context, ConnectionConfiguration connection, Im
for (int i = 0; i< column; i++) {
array[i] = resultSet.getObject(i+1);
}
writer.writeArrayRecord(array);
context.getDataWriter().writeArrayRecord(array);
rowsRead++;
}
} catch (SQLException e) {

View File

@ -25,13 +25,13 @@
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
import org.apache.sqoop.job.Constants;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.utils.ClassUtils;
public class GenericJdbcImportInitializer extends Initializer<ConnectionConfiguration, ImportJobConfiguration> {
@ -42,18 +42,18 @@ public class GenericJdbcImportInitializer extends Initializer<ConnectionConfigur
private GenericJdbcExecutor executor;
@Override
public void initialize(MutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job) {
configureJdbcProperties(context, connection, job);
public void initialize(InitializerContext context, ConnectionConfiguration connection, ImportJobConfiguration job) {
configureJdbcProperties(context.getContext(), connection, job);
try {
configurePartitionProperties(context, connection, job);
configureTableProperties(context, connection, job);
configurePartitionProperties(context.getContext(), connection, job);
configureTableProperties(context.getContext(), connection, job);
} finally {
executor.close();
}
}
@Override
public List<String> getJars(ImmutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job) {
public List<String> getJars(InitializerContext context, ConnectionConfiguration connection, ImportJobConfiguration job) {
List<String> jars = new LinkedList<String>();
jars.add(ClassUtils.jarForClass(connection.connection.jdbcDriver));

View File

@ -21,12 +21,12 @@
import java.util.LinkedList;
import java.util.List;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfiguration, ImportJobConfiguration> {
@ -37,8 +37,8 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
private String partitionMaxValue;
@Override
public List<Partition> getPartitions(ImmutableContext context, long maxPartitions, ConnectionConfiguration connection, ImportJobConfiguration job) {
numberPartitions = maxPartitions;
public List<Partition> getPartitions(PartitionerContext context, ConnectionConfiguration connection, ImportJobConfiguration job) {
numberPartitions = context.getMaxPartitions();
partitionColumnName = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME);
partitionColumnType = context.getInt(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -1);
partitionMinValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE);

View File

@ -23,8 +23,8 @@
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
import org.apache.sqoop.job.Constants;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
public class TestExportInitializer extends TestCase {
@ -66,9 +66,10 @@ public void testTableName() throws Exception {
jobConf.table.tableName = tableName;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
Initializer initializer = new GenericJdbcExportInitializer();
initializer.initialize(context, connConf, jobConf);
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
"INSERT INTO " + executor.delimitIdentifier(tableName)
@ -85,9 +86,10 @@ public void testTableNameWithTableColumns() throws Exception {
jobConf.table.columns = tableColumns;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
Initializer initializer = new GenericJdbcExportInitializer();
initializer.initialize(context, connConf, jobConf);
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
"INSERT INTO " + executor.delimitIdentifier(tableName)
@ -103,9 +105,10 @@ public void testTableSql() throws Exception {
jobConf.table.sql = tableSql;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
Initializer initializer = new GenericJdbcExportInitializer();
initializer.initialize(context, connConf, jobConf);
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
"INSERT INTO " + executor.delimitIdentifier(tableName)

View File

@ -26,7 +26,8 @@
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.io.DataReader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.etl.io.DataReader;
public class TestExportLoader extends TestCase {
@ -73,8 +74,8 @@ public void testInsert() throws Exception {
Loader loader = new GenericJdbcExportLoader();
DummyReader reader = new DummyReader();
loader.load(context, connectionConfig, jobConfig, reader);
LoaderContext loaderContext = new LoaderContext(context, reader);
loader.load(loaderContext, connectionConfig, jobConfig);
int index = START;
ResultSet rs = executor.executeQuery("SELECT * FROM "

View File

@ -24,7 +24,8 @@
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.io.DataWriter;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.etl.io.DataWriter;
public class TestImportExtractor extends TestCase {
@ -80,18 +81,19 @@ public void testQuery() throws Exception {
Extractor extractor = new GenericJdbcImportExtractor();
DummyWriter writer = new DummyWriter();
ExtractorContext extractorContext = new ExtractorContext(context, writer);
partition = new GenericJdbcImportPartition();
partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665");
extractor.run(context, connectionConfig, jobConfig, partition, writer);
extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
partition = new GenericJdbcImportPartition();
partition.setConditions("-16.6666666666666665 <= DCOL AND DCOL < 16.666666666666667");
extractor.run(context, connectionConfig, jobConfig, partition, writer);
extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
partition = new GenericJdbcImportPartition();
partition.setConditions("16.666666666666667 <= DCOL AND DCOL <= 50.0");
extractor.run(context, connectionConfig, jobConfig, partition, writer);
extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
}
public void testSubquery() throws Exception {
@ -113,18 +115,19 @@ public void testSubquery() throws Exception {
Extractor extractor = new GenericJdbcImportExtractor();
DummyWriter writer = new DummyWriter();
ExtractorContext extractorContext = new ExtractorContext(context, writer);
partition = new GenericJdbcImportPartition();
partition.setConditions("-50 <= ICOL AND ICOL < -16");
extractor.run(context, connectionConfig, jobConfig, partition, writer);
extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
partition = new GenericJdbcImportPartition();
partition.setConditions("-16 <= ICOL AND ICOL < 17");
extractor.run(context, connectionConfig, jobConfig, partition, writer);
extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
partition = new GenericJdbcImportPartition();
partition.setConditions("17 <= ICOL AND ICOL < 50");
extractor.run(context, connectionConfig, jobConfig, partition, writer);
extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
}
public class DummyWriter extends DataWriter {

View File

@ -27,6 +27,7 @@
import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
import org.apache.sqoop.job.Constants;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
public class TestImportInitializer extends TestCase {
@ -78,9 +79,10 @@ public void testTableName() throws Exception {
jobConf.table.tableName = tableName;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
Initializer initializer = new GenericJdbcImportInitializer();
initializer.initialize(context, connConf, jobConf);
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
"SELECT * FROM " + executor.delimitIdentifier(tableName)
@ -102,9 +104,10 @@ public void testTableNameWithTableColumns() throws Exception {
jobConf.table.columns = tableColumns;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
Initializer initializer = new GenericJdbcImportInitializer();
initializer.initialize(context, connConf, jobConf);
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
"SELECT ICOL,VCOL FROM " + executor.delimitIdentifier(tableName)
@ -126,9 +129,10 @@ public void testTableSql() throws Exception {
jobConf.table.partitionColumn = "DCOL";
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
Initializer initializer = new GenericJdbcImportInitializer();
initializer.initialize(context, connConf, jobConf);
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
"SELECT * FROM " + executor.delimitIdentifier(tableName)
@ -151,9 +155,10 @@ public void testTableSqlWithTableColumns() throws Exception {
jobConf.table.partitionColumn = "DCOL";
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
Initializer initializer = new GenericJdbcImportInitializer();
initializer.initialize(context, connConf, jobConf);
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
"SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "

View File

@ -30,6 +30,7 @@
import org.apache.sqoop.job.Constants;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
public class TestImportPartitioner extends TestCase {
@ -55,7 +56,8 @@ public void testIntegerEvenPartition() throws Exception {
ImportJobConfiguration jobConf = new ImportJobConfiguration();
Partitioner partitioner = new GenericJdbcImportPartitioner();
List<Partition> partitions = partitioner.getPartitions(context, 5, connConf, jobConf);
PartitionerContext partitionerContext = new PartitionerContext(context, 5);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
"-5 <= ICOL AND ICOL < -3",
@ -85,7 +87,8 @@ public void testIntegerUnevenPartition() throws Exception {
ImportJobConfiguration jobConf = new ImportJobConfiguration();
Partitioner partitioner = new GenericJdbcImportPartitioner();
List<Partition> partitions = partitioner.getPartitions(context, 3, connConf, jobConf);
PartitionerContext partitionerContext = new PartitionerContext(context, 3);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
"-5 <= ICOL AND ICOL < -1",
@ -113,7 +116,8 @@ public void testIntegerOverPartition() throws Exception {
ImportJobConfiguration jobConf = new ImportJobConfiguration();
Partitioner partitioner = new GenericJdbcImportPartitioner();
List<Partition> partitions = partitioner.getPartitions(context, 13, connConf, jobConf);
PartitionerContext partitionerContext = new PartitionerContext(context, 13);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
"-5 <= ICOL AND ICOL < -4",
@ -148,7 +152,8 @@ public void testFloatingPointEvenPartition() throws Exception {
ImportJobConfiguration jobConf = new ImportJobConfiguration();
Partitioner partitioner = new GenericJdbcImportPartitioner();
List<Partition> partitions = partitioner.getPartitions(context, 5, connConf, jobConf);
PartitionerContext partitionerContext = new PartitionerContext(context, 5);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
"-5.0 <= DCOL AND DCOL < -3.0",
@ -179,7 +184,8 @@ public void testFloatingPointUnevenPartition() throws Exception {
ImportJobConfiguration jobConf = new ImportJobConfiguration();
Partitioner partitioner = new GenericJdbcImportPartitioner();
List<Partition> partitions = partitioner.getPartitions(context, 3, connConf, jobConf);
PartitionerContext partitionerContext = new PartitionerContext(context, 3);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
"-5.0 <= DCOL AND DCOL < -1.6666666666666665",

View File

@ -28,7 +28,9 @@
import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
import org.apache.sqoop.job.etl.CallbackBase;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.model.FormUtils;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MConnectionForms;
@ -425,13 +427,16 @@ public MSubmission submit(long jobId) {
"Can't create initializer instance: " + initializerClass.getName());
}
// Initializer context
InitializerContext initializerContext = new InitializerContext(request.getConnectorContext());
// Initialize submission from connector perspective
initializer.initialize(request.getConnectorContext(),
initializer.initialize(initializerContext,
request.getConfigConnectorConnection(),
request.getConfigConnectorJob());
// Add job specific jars to
request.addJars(initializer.getJars(request.getConnectorContext(),
request.addJars(initializer.getJars(initializerContext,
request.getConfigConnectorConnection(),
request.getConfigConnectorJob()));
@ -516,9 +521,10 @@ private void destroySubmission(SubmissionRequest request) {
"Can't create destroyer instance: " + destroyerClass.getName());
}
DestroyerContext destroyerContext = new DestroyerContext(request.getConnectorContext(), false);
// Initialize submission from connector perspective
destroyer.destroy(false, request.getConnectorContext(),
request.getConfigConnectorConnection(), request.getConfigConnectorJob());
destroyer.destroy(destroyerContext, request.getConfigConnectorConnection(), request.getConfigConnectorJob());
}
public MSubmission stop(long jobId) {

View File

@ -36,10 +36,8 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
@ -66,13 +64,14 @@ public class HdfsExportPartitioner extends Partitioner {
new HashMap<String, Set<String>>();
@Override
public List<Partition> getPartitions(ImmutableContext context,
long numTasks, Object connectionConfiguration, Object jobConfiguration) {
Configuration conf = ((PrefixContext)context).getConfiguration();
public List<Partition> getPartitions(PartitionerContext context,
Object connectionConfiguration, Object jobConfiguration) {
Configuration conf = ((PrefixContext)context.getContext()).getConfiguration();
try {
long numInputBytes = getInputSize(conf);
maxSplitSize = numInputBytes / numTasks;
maxSplitSize = numInputBytes / context.getMaxPartitions();
long minSizeNode = 0;
long minSizeRack = 0;

View File

@ -25,14 +25,13 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.DataWriter;
import org.apache.sqoop.etl.io.DataWriter;
public class HdfsSequenceExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
@ -40,7 +39,7 @@ public class HdfsSequenceExportExtractor extends Extractor<ConnectionConfigurati
LogFactory.getLog(HdfsSequenceExportExtractor.class.getName());
private Configuration conf;
private DataWriter datawriter;
private DataWriter dataWriter;
private final char fieldDelimiter;
@ -49,12 +48,12 @@ public HdfsSequenceExportExtractor() {
}
@Override
public void run(ImmutableContext context, ConnectionConfiguration connectionConfiguration,
ExportJobConfiguration jobConfiguration, HdfsExportPartition partition, DataWriter writer) {
writer.setFieldDelimiter(fieldDelimiter);
public void extract(ExtractorContext context, ConnectionConfiguration connectionConfiguration,
ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
conf = ((PrefixContext)context).getConfiguration();
datawriter = writer;
conf = ((PrefixContext)context.getContext()).getConfiguration();
dataWriter = context.getDataWriter();
dataWriter.setFieldDelimiter(fieldDelimiter);
try {
LOG.info("Working on partition: " + partition);
@ -84,7 +83,7 @@ private void extractFile(Path file, long start, long length)
Text line = new Text();
boolean hasNext = filereader.next(line);
while (hasNext) {
datawriter.writeCsvRecord(line.toString());
dataWriter.writeCsvRecord(line.toString());
line = new Text();
hasNext = filereader.next(line);
if(filereader.getPosition() >= end && filereader.syncSeen()) {

View File

@ -27,12 +27,11 @@
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.DataReader;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.utils.ClassUtils;
public class HdfsSequenceImportLoader extends Loader {
@ -46,13 +45,13 @@ public HdfsSequenceImportLoader() {
}
@Override
public void load(ImmutableContext context, Object oc, Object oj, DataReader reader) throws Exception{
public void load(LoaderContext context, Object oc, Object oj) throws Exception {
DataReader reader = context.getDataReader();
reader.setFieldDelimiter(fieldDelimiter);
Configuration conf = new Configuration();
// Configuration conf = ((EtlContext)context).getConfiguration();
String filename =
context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
CompressionCodec codec = null;

View File

@ -18,7 +18,6 @@
package org.apache.sqoop.job.etl;
import java.io.IOException;
import java.nio.charset.Charset;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -31,14 +30,13 @@
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.LineReader;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.DataWriter;
import org.apache.sqoop.etl.io.DataWriter;
public class HdfsTextExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
@ -46,7 +44,7 @@ public class HdfsTextExportExtractor extends Extractor<ConnectionConfiguration,
LogFactory.getLog(HdfsTextExportExtractor.class.getName());
private Configuration conf;
private DataWriter datawriter;
private DataWriter dataWriter;
private final char fieldDelimiter;
@ -55,15 +53,15 @@ public HdfsTextExportExtractor() {
}
@Override
public void run(ImmutableContext context, ConnectionConfiguration connectionConfiguration,
ExportJobConfiguration jobConfiguration, HdfsExportPartition partition, DataWriter writer) {
writer.setFieldDelimiter(fieldDelimiter);
public void extract(ExtractorContext context, ConnectionConfiguration connectionConfiguration,
ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
conf = ((PrefixContext)context).getConfiguration();
datawriter = writer;
conf = ((PrefixContext)context.getContext()).getConfiguration();
dataWriter = context.getDataWriter();
dataWriter.setFieldDelimiter(fieldDelimiter);
try {
HdfsExportPartition p = (HdfsExportPartition)partition;
HdfsExportPartition p = partition;
LOG.info("Working on partition: " + p);
int numFiles = p.getNumberOfFiles();
for (int i=0; i<numFiles; i++) {
@ -120,7 +118,7 @@ private void extractFile(Path file, long start, long length)
} else {
next = fileseeker.getPos();
}
datawriter.writeCsvRecord(line.toString());
dataWriter.writeCsvRecord(line.toString());
}
LOG.info("Extracting ended on position: " + fileseeker.getPos());
}

View File

@ -27,12 +27,11 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.DataReader;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.utils.ClassUtils;
public class HdfsTextImportLoader extends Loader {
@ -46,7 +45,8 @@ public HdfsTextImportLoader() {
}
@Override
public void load(ImmutableContext context, Object oc, Object oj, DataReader reader) throws Exception{
public void load(LoaderContext context, Object oc, Object oj) throws Exception{
DataReader reader = context.getDataReader();
reader.setFieldDelimiter(fieldDelimiter);
Configuration conf = new Configuration();

View File

@ -23,6 +23,7 @@
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
import org.apache.sqoop.utils.ClassUtils;
/**
@ -54,8 +55,10 @@ public static void executeDestroyer(boolean success, Configuration configuration
Object configConnection = ConfigurationUtils.getConnectorConnection(configuration);
Object configJob = ConfigurationUtils.getConnectorJob(configuration);
DestroyerContext destroyerContext = new DestroyerContext(subContext, success);
LOG.info("Executing destroyer class " + destroyer.getClass());
destroyer.destroy(success, subContext, configConnection, configJob);
destroyer.destroy(destroyerContext, configConnection, configJob);
}
private SqoopDestroyerExecutor() {

View File

@ -36,6 +36,7 @@
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
import org.apache.sqoop.utils.ClassUtils;
/**
@ -65,8 +66,9 @@ public List<InputSplit> getSplits(JobContext context)
Object connectorJob = ConfigurationUtils.getConnectorJob(conf);
long maxPartitions = conf.getLong(JobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions);
List<Partition> partitions = partitioner.getPartitions(connectorContext, maxPartitions, connectorConnection, connectorJob);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connectorConnection, connectorJob);
List<InputSplit> splits = new LinkedList<InputSplit>();
for (Partition partition : partitions) {
LOG.debug("Partition: " + partition);

View File

@ -29,8 +29,9 @@
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.DataWriter;
import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.submission.counter.SqoopCounters;
import org.apache.sqoop.utils.ClassUtils;
@ -72,11 +73,11 @@ public void run(Context context) throws IOException, InterruptedException {
}
SqoopSplit split = context.getCurrentKey();
ExtractorContext extractorContext = new ExtractorContext(subContext, new MapDataWriter(context));
try {
LOG.info("Running extractor class " + extractorName);
extractor.run(subContext, configConnection, configJob, split.getPartition(),
new MapDataWriter(context));
extractor.extract(extractorContext, configConnection, configJob, split.getPartition());
LOG.info("Extractor has finished");
context.getCounter(SqoopCounters.ROWS_READ)
.increment(extractor.getRowsRead());

View File

@ -36,8 +36,9 @@
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.DataReader;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.utils.ClassUtils;
public class SqoopOutputFormatLoadExecutor {
@ -208,8 +209,11 @@ public void run() {
}
}
// Create loader context
LoaderContext loaderContext = new LoaderContext(subContext, reader);
LOG.info("Running loader class " + loaderName);
loader.load(subContext, configConnection, configJob, reader);
loader.load(loaderContext, configConnection, configJob);
LOG.info("Loader has finished");
} catch (Throwable t) {
readerFinished = true;

View File

@ -33,15 +33,13 @@
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.job.etl.HdfsExportPartitioner;
import org.apache.sqoop.job.etl.HdfsSequenceExportExtractor;
import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
import org.apache.sqoop.job.etl.HdfsTextExportExtractor;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.DataReader;
import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
import org.junit.Test;
@ -228,12 +226,11 @@ private String padZeros(int number, int digits) {
public static class DummyLoader extends Loader {
@Override
public void load(ImmutableContext context, Object oc, Object oj, DataReader reader)
throws Exception {
public void load(LoaderContext context, Object oc, Object oj) throws Exception {
int index = 1;
int sum = 0;
Object[] array;
while ((array = reader.readArrayRecord()) != null) {
while ((array = context.getDataReader().readArrayRecord()) != null) {
sum += Integer.valueOf(array[0].toString());
index++;
};

View File

@ -34,14 +34,14 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
import org.apache.sqoop.job.etl.HdfsTextImportLoader;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.DataWriter;
import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
public class TestHdfsLoad extends TestCase {
@ -204,7 +204,7 @@ public String toString() {
public static class DummyPartitioner extends Partitioner {
@Override
public List<Partition> getPartitions(ImmutableContext context, long maxPartitions, Object oc, Object oj) {
public List<Partition> getPartitions(PartitionerContext context, Object oc, Object oj) {
List<Partition> partitions = new LinkedList<Partition>();
for (int id = START_ID; id <= NUMBER_OF_IDS; id++) {
DummyPartition partition = new DummyPartition();
@ -217,7 +217,7 @@ public List<Partition> getPartitions(ImmutableContext context, long maxPartition
public static class DummyExtractor extends Extractor {
@Override
public void run(ImmutableContext context, Object oc, Object oj, Object partition, DataWriter writer) {
public void extract(ExtractorContext context, Object oc, Object oj, Object partition) {
int id = ((DummyPartition)partition).getId();
for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
Object[] array = new Object[] {
@ -225,7 +225,7 @@ public void run(ImmutableContext context, Object oc, Object oj, Object partition
(double) (id * NUMBER_OF_ROWS_PER_ID + row),
String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row)
};
writer.writeArrayRecord(array);
context.getDataWriter().writeArrayRecord(array);
}
}

View File

@ -34,14 +34,14 @@
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.DataReader;
import org.apache.sqoop.job.io.DataWriter;
import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper;
import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
@ -120,7 +120,7 @@ public String toString() {
public static class DummyPartitioner extends Partitioner {
@Override
public List<Partition> getPartitions(ImmutableContext context, long maxPartitions, Object oc, Object oj) {
public List<Partition> getPartitions(PartitionerContext context, Object oc, Object oj) {
List<Partition> partitions = new LinkedList<Partition>();
for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
DummyPartition partition = new DummyPartition();
@ -133,10 +133,10 @@ public List<Partition> getPartitions(ImmutableContext context, long maxPartition
public static class DummyExtractor extends Extractor {
@Override
public void run(ImmutableContext context, Object oc, Object oj, Object partition, DataWriter writer) {
public void extract(ExtractorContext context, Object oc, Object oj, Object partition) {
int id = ((DummyPartition)partition).getId();
for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
writer.writeArrayRecord(new Object[] {
context.getDataWriter().writeArrayRecord(new Object[] {
id * NUMBER_OF_ROWS_PER_PARTITION + row,
(double) (id * NUMBER_OF_ROWS_PER_PARTITION + row),
String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
@ -216,9 +216,9 @@ public static class DummyLoader extends Loader {
private Data actual = new Data();
@Override
public void load(ImmutableContext context, Object oc, Object oj, DataReader reader) throws Exception{
public void load(LoaderContext context, Object oc, Object oj) throws Exception{
Object[] array;
while ((array = reader.readArrayRecord()) != null) {
while ((array = context.getDataReader().readArrayRecord()) != null) {
actual.setContent(array, Data.ARRAY_RECORD);
expected.setContent(new Object[] {

View File

@ -22,12 +22,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.DataReader;
import org.junit.Before;
import org.junit.Test;
@ -46,9 +45,8 @@ public ThrowingLoader() {
}
@Override
public void load(ImmutableContext context, Object connectionConfiguration,
Object jobConfiguration, DataReader reader) throws Exception {
reader.readContent(Data.CSV_RECORD);
public void load(LoaderContext context, Object cc, Object jc) throws Exception {
context.getDataReader().readContent(Data.CSV_RECORD);
throw new BrokenBarrierException();
}
}
@ -59,12 +57,11 @@ public ThrowingContinuousLoader() {
}
@Override
public void load(ImmutableContext context, Object connectionConfiguration,
Object jobConfiguration, DataReader reader) throws Exception {
public void load(LoaderContext context, Object cc, Object jc) throws Exception {
int runCount = 0;
Object o;
String[] arr;
while ((o = reader.readContent(Data.CSV_RECORD)) != null) {
while ((o = context.getDataReader().readContent(Data.CSV_RECORD)) != null) {
arr = o.toString().split(",");
Assert.assertEquals(100, arr.length);
for (int i = 0; i < arr.length; i++) {
@ -85,9 +82,8 @@ public GoodLoader() {
}
@Override
public void load(ImmutableContext context, Object connectionConfiguration,
Object jobConfiguration, DataReader reader) throws Exception {
String[] arr = reader.readContent(Data.CSV_RECORD).toString().split(",");
public void load(LoaderContext context, Object cc, Object jc) throws Exception {
String[] arr = context.getDataReader().readContent(Data.CSV_RECORD).toString().split(",");
Assert.assertEquals(100, arr.length);
for (int i = 0; i < arr.length; i++) {
Assert.assertEquals(i, Integer.parseInt(arr[i]));
@ -102,12 +98,11 @@ public GoodContinuousLoader() {
}
@Override
public void load(ImmutableContext context, Object connectionConfiguration,
Object jobConfiguration, DataReader reader) throws Exception {
public void load(LoaderContext context, Object cc, Object jc) throws Exception {
int runCount = 0;
Object o;
String[] arr;
while ((o = reader.readContent(Data.CSV_RECORD)) != null) {
while ((o = context.getDataReader().readContent(Data.CSV_RECORD)) != null) {
arr = o.toString().split(",");
Assert.assertEquals(100, arr.length);
for (int i = 0; i < arr.length; i++) {

View File

@ -17,8 +17,6 @@
*/
package org.apache.sqoop.job.etl;
import org.apache.sqoop.common.ImmutableContext;
/**
* This allows connector to define work to complete execution, for example,
* resource cleaning.
@ -28,13 +26,11 @@ public abstract class Destroyer<ConnectionConfiguration, JobConfiguration> {
/**
* Callback to clean up after job execution.
*
* @param success True if the execution was successful
* @param context Connector context object
* @param context Destroyer context
* @param connectionConfiguration Connection configuration object
* @param jobConfiguration Job configuration object
*/
public abstract void destroy(boolean success,
ImmutableContext context,
public abstract void destroy(DestroyerContext context,
ConnectionConfiguration connectionConfiguration,
JobConfiguration jobConfiguration);

View File

@ -17,31 +17,35 @@
*/
package org.apache.sqoop.job.etl;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.job.io.DataWriter;
/**
* This allows connector to extract data from a source system
* based on each partition.
*/
public abstract class Extractor<ConnectionConfiguration, JobConfiguration, Partition> {
public abstract void run(ImmutableContext context,
ConnectionConfiguration connectionConfiguration,
JobConfiguration jobConfiguration,
Partition partition,
DataWriter writer);
/**
* Extract data from the source and pass it into the framework.
*
* @param context Extractor context object
* @param connectionConfiguration Connection configuration
* @param jobConfiguration Job configuration
* @param partition Partition that this extract should work on
*/
public abstract void extract(ExtractorContext context,
ConnectionConfiguration connectionConfiguration,
JobConfiguration jobConfiguration,
Partition partition);
/**
* Return the number of rows read by the last call to
* {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, Partition, org.apache.sqoop.job.io.DataWriter) }
* {@linkplain Extractor#extract(org.apache.sqoop.job.etl.ExtractorContext, java.lang.Object, java.lang.Object, Partition) }
* method. This method returns only the number of rows read in the last call,
* and not a cumulative total of the number of rows read by this Extractor
* since its creation. If no calls were made to the run method, this method's
* behavior is undefined.
*
* @return the number of rows read by the last call to
* {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, Partition, org.apache.sqoop.job.io.DataWriter) }
* {@linkplain Extractor#extract(org.apache.sqoop.job.etl.ExtractorContext, java.lang.Object, java.lang.Object, Partition) }
*/
public abstract long getRowsRead();

View File

@ -17,9 +17,6 @@
*/
package org.apache.sqoop.job.etl;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.MutableContext;
import java.util.LinkedList;
import java.util.List;
@ -34,11 +31,11 @@ public abstract class Initializer<ConnectionConfiguration, JobConfiguration> {
* needed temporary values might be saved to the context object and they will be
* promoted to all other parts of the workflow automatically.
*
* @param context Changeable context object, purely for connector usage
* @param context Initializer context object
* @param connectionConfiguration Connector's connection configuration object
* @param jobConfiguration Connector's job configuration object
*/
public abstract void initialize(MutableContext context,
public abstract void initialize(InitializerContext context,
ConnectionConfiguration connectionConfiguration,
JobConfiguration jobConfiguration);
@ -49,7 +46,7 @@ public abstract void initialize(MutableContext context,
*
* @return List of jars needed for job execution
*/
public List<String> getJars(ImmutableContext context,
public List<String> getJars(InitializerContext context,
ConnectionConfiguration connectionConfiguration,
JobConfiguration jobConfiguration) {
return new LinkedList<String>();

View File

@ -17,9 +17,6 @@
*/
package org.apache.sqoop.job.etl;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.job.io.DataReader;
/**
* This allows connector to load data into a target system.
*/
@ -28,15 +25,13 @@ public abstract class Loader<ConnectionConfiguration, JobConfiguration> {
/**
* Load data to target.
*
* @param context Context object
* @param context Loader context object
* @param connectionConfiguration Connection configuration
* @param jobConfiguration Job configuration
* @param reader Data reader object
* @throws Exception
*/
public abstract void load(ImmutableContext context,
public abstract void load(LoaderContext context,
ConnectionConfiguration connectionConfiguration,
JobConfiguration jobConfiguration,
DataReader reader) throws Exception;
JobConfiguration jobConfiguration) throws Exception;
}
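Similarly, a minimal loader sketch under the new signature; Object placeholders again, and the loader only counts records rather than writing them anywhere:

import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;

// Illustrative loader: the data reader is obtained from the context instead
// of being passed as a fourth argument.
public class ExampleLoader extends Loader<Object, Object> {
  @Override
  public void load(LoaderContext context, Object connectionConfiguration,
      Object jobConfiguration) throws Exception {
    long records = 0;
    Object[] array;
    while ((array = context.getDataReader().readArrayRecord()) != null) {
      records++;
    }
    System.out.println("Loaded " + records + " records");
  }
}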

View File

@ -17,8 +17,6 @@
*/
package org.apache.sqoop.job.etl;
import org.apache.sqoop.common.ImmutableContext;
import java.util.List;
/**
@ -27,8 +25,17 @@
*/
public abstract class Partitioner<ConnectionConfiguration, JobConfiguration> {
public abstract List<Partition> getPartitions(ImmutableContext context,
long maxPartitions,
/**
* Partition input data into partitions.
*
* Each partition will then be processed in a separate extractor.
*
* @param context Partitioner context object
* @param connectionConfiguration Connection configuration
* @param jobConfiguration Job configuration
* @return List of partitions
*/
public abstract List<Partition> getPartitions(PartitionerContext context,
ConnectionConfiguration connectionConfiguration,
JobConfiguration jobConfiguration);