5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-06 21:23:03 +08:00

SQOOP-2306: Sqoop2: Use LinkConfiguration in Executor rather then LinkConfig

(Jarek Jarcec Cecho via Abraham Elmahrek)
This commit is contained in:
Abraham Elmahrek 2015-04-20 17:38:32 -07:00
parent 9f05b4a5cf
commit ec5dfc42f9
12 changed files with 38 additions and 35 deletions

View File

@ -20,6 +20,7 @@
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.jdbc.configuration.LinkConfig; import org.apache.sqoop.connector.jdbc.configuration.LinkConfig;
import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration;
import org.apache.sqoop.error.code.GenericJdbcConnectorError; import org.apache.sqoop.error.code.GenericJdbcConnectorError;
import org.apache.sqoop.schema.Schema; import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Column; import org.apache.sqoop.schema.type.Column;
@ -58,7 +59,7 @@ public class GenericJdbcExecutor {
/** /**
* User configured link with credentials and such * User configured link with credentials and such
*/ */
private LinkConfig linkConfig; private LinkConfiguration link;
/** /**
* Internal connection object (we'll hold to it) * Internal connection object (we'll hold to it)
@ -70,23 +71,24 @@ public class GenericJdbcExecutor {
*/ */
private PreparedStatement preparedStatement; private PreparedStatement preparedStatement;
public GenericJdbcExecutor(LinkConfig linkConfig) { public GenericJdbcExecutor(LinkConfiguration linkConfig) {
assert linkConfig != null;
assert linkConfig.connectionString != null;
// Persist link configuration for future use // Persist link configuration for future use
this.linkConfig = linkConfig; this.link = linkConfig;
assert link != null;
assert link.linkConfig != null;
assert link.linkConfig.connectionString != null;
// Load/register the JDBC driver to JVM // Load/register the JDBC driver to JVM
Class driverClass = ClassUtils.loadClass(linkConfig.jdbcDriver); Class driverClass = ClassUtils.loadClass(link.linkConfig.jdbcDriver);
if(driverClass == null) { if(driverClass == null) {
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0000, linkConfig.jdbcDriver); throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0000, link.linkConfig.jdbcDriver);
} }
// Properties that we will use for the connection // Properties that we will use for the connection
Properties properties = new Properties(); Properties properties = new Properties();
if(linkConfig.jdbcProperties != null) { if(link.linkConfig.jdbcProperties != null) {
properties.putAll(linkConfig.jdbcProperties); properties.putAll(link.linkConfig.jdbcProperties);
} }
// Propagate username and password to the properties // Propagate username and password to the properties
@ -96,16 +98,16 @@ public GenericJdbcExecutor(LinkConfig linkConfig) {
// * getConnection(url, properties) // * getConnection(url, properties)
// As we have to use properties, we need to use the later // As we have to use properties, we need to use the later
// method and hence we have to persist the credentials there. // method and hence we have to persist the credentials there.
if(linkConfig.username != null) { if(link.linkConfig.username != null) {
properties.put(JDBC_PROPERTY_USERNAME, linkConfig.username); properties.put(JDBC_PROPERTY_USERNAME, link.linkConfig.username);
} }
if(linkConfig.password != null) { if(link.linkConfig.password != null) {
properties.put(JDBC_PROPERTY_PASSWORD, linkConfig.password); properties.put(JDBC_PROPERTY_PASSWORD, link.linkConfig.password);
} }
// Finally create the connection // Finally create the connection
try { try {
connection = DriverManager.getConnection(linkConfig.connectionString, properties); connection = DriverManager.getConnection(link.linkConfig.connectionString, properties);
} catch (SQLException e) { } catch (SQLException e) {
logSQLException(e); logSQLException(e);
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0001, e); throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0001, e);

View File

@ -42,7 +42,7 @@ public class GenericJdbcExtractor extends Extractor<LinkConfiguration, FromJobCo
private long rowsRead = 0; private long rowsRead = 0;
@Override @Override
public void extract(ExtractorContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig, GenericJdbcPartition partition) { public void extract(ExtractorContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig, GenericJdbcPartition partition) {
GenericJdbcExecutor executor = new GenericJdbcExecutor(linkConfig.linkConfig); GenericJdbcExecutor executor = new GenericJdbcExecutor(linkConfig);
String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL); String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL);
String conditions = partition.getConditions(); String conditions = partition.getConditions();

View File

@ -47,7 +47,7 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F
@Override @Override
public void initialize(InitializerContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) { public void initialize(InitializerContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
executor = new GenericJdbcExecutor(linkConfig.linkConfig); executor = new GenericJdbcExecutor(linkConfig);
try { try {
configurePartitionProperties(context.getContext(), linkConfig, fromJobConfig); configurePartitionProperties(context.getContext(), linkConfig, fromJobConfig);
@ -68,7 +68,7 @@ public Set<String> getJars(InitializerContext context, LinkConfiguration linkCon
@Override @Override
public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) { public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
executor = new GenericJdbcExecutor(linkConfig.linkConfig); executor = new GenericJdbcExecutor(linkConfig);
String schemaName = fromJobConfig.fromJobConfig.tableName; String schemaName = fromJobConfig.fromJobConfig.tableName;
if(schemaName == null) { if(schemaName == null) {

View File

@ -32,7 +32,7 @@ public class GenericJdbcLoader extends Loader<LinkConfiguration, ToJobConfigurat
@Override @Override
public void load(LoaderContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) throws Exception{ public void load(LoaderContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) throws Exception{
GenericJdbcExecutor executor = new GenericJdbcExecutor(linkConfig.linkConfig); GenericJdbcExecutor executor = new GenericJdbcExecutor(linkConfig);
executor.setAutoCommit(false); executor.setAutoCommit(false);
String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL); String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL);
executor.beginBatch(sql); executor.beginBatch(sql);

View File

@ -42,7 +42,7 @@ public void destroy(DestroyerContext context, LinkConfiguration linkConfig, ToJo
} }
private void moveDataToDestinationTable(LinkConfiguration linkConfig, boolean success, String stageTableName, String tableName) { private void moveDataToDestinationTable(LinkConfiguration linkConfig, boolean success, String stageTableName, String tableName) {
GenericJdbcExecutor executor = new GenericJdbcExecutor(linkConfig.linkConfig); GenericJdbcExecutor executor = new GenericJdbcExecutor(linkConfig);
try { try {
if(success) { if(success) {
LOG.info("Job completed, transferring data from stage fromTable to " + LOG.info("Job completed, transferring data from stage fromTable to " +

View File

@ -44,7 +44,7 @@ public class GenericJdbcToInitializer extends Initializer<LinkConfiguration, ToJ
@Override @Override
public void initialize(InitializerContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) { public void initialize(InitializerContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
executor = new GenericJdbcExecutor(linkConfig.linkConfig); executor = new GenericJdbcExecutor(linkConfig);
try { try {
configureTableProperties(context.getContext(), linkConfig, toJobConfig); configureTableProperties(context.getContext(), linkConfig, toJobConfig);
} finally { } finally {
@ -61,7 +61,7 @@ public Set<String> getJars(InitializerContext context, LinkConfiguration linkCon
@Override @Override
public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) { public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
executor = new GenericJdbcExecutor(linkConfig.linkConfig); executor = new GenericJdbcExecutor(linkConfig);
String schemaName = toJobConfig.toJobConfig.tableName; String schemaName = toJobConfig.toJobConfig.tableName;

View File

@ -19,6 +19,7 @@
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.jdbc.configuration.LinkConfig; import org.apache.sqoop.connector.jdbc.configuration.LinkConfig;
import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration;
import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test; import org.testng.annotations.Test;
@ -35,7 +36,7 @@ public class GenericJdbcExecutorTest {
public GenericJdbcExecutorTest() { public GenericJdbcExecutorTest() {
table = getClass().getSimpleName().toUpperCase(); table = getClass().getSimpleName().toUpperCase();
emptyTable = table + "_EMPTY"; emptyTable = table + "_EMPTY";
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.LINK_CONFIG); executor = new GenericJdbcExecutor(GenericJdbcTestConstants.LINK_CONFIGURATION);
} }
@BeforeMethod(alwaysRun = true) @BeforeMethod(alwaysRun = true)
@ -62,11 +63,11 @@ public void setUp() {
@Test(expectedExceptions = SqoopException.class) @Test(expectedExceptions = SqoopException.class)
public void testUnknownDriver() { public void testUnknownDriver() {
LinkConfig linkConfig = new LinkConfig(); LinkConfiguration link = new LinkConfiguration();
linkConfig.jdbcDriver = "net.jarcec.driver.MyAwesomeDatabase"; link.linkConfig.jdbcDriver = "net.jarcec.driver.MyAwesomeDatabase";
linkConfig.connectionString = "jdbc:awesome:"; link.linkConfig.connectionString = "jdbc:awesome:";
new GenericJdbcExecutor(linkConfig); new GenericJdbcExecutor(link);
} }
@Test @Test

View File

@ -17,7 +17,7 @@
*/ */
package org.apache.sqoop.connector.jdbc; package org.apache.sqoop.connector.jdbc;
import org.apache.sqoop.connector.jdbc.configuration.LinkConfig; import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration;
public class GenericJdbcTestConstants { public class GenericJdbcTestConstants {
@ -34,9 +34,9 @@ public class GenericJdbcTestConstants {
/** /**
* Test link configuration * Test link configuration
*/ */
public static final LinkConfig LINK_CONFIG = new LinkConfig(); public static final LinkConfiguration LINK_CONFIGURATION = new LinkConfiguration();
static { static {
LINK_CONFIG.jdbcDriver = DRIVER; LINK_CONFIGURATION.linkConfig.jdbcDriver = DRIVER;
LINK_CONFIG.connectionString = URL; LINK_CONFIGURATION.linkConfig.connectionString = URL;
} }
} }

View File

@ -59,7 +59,7 @@ public TestExtractor() {
@BeforeMethod(alwaysRun = true) @BeforeMethod(alwaysRun = true)
public void setUp() { public void setUp() {
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.LINK_CONFIG); executor = new GenericJdbcExecutor(GenericJdbcTestConstants.LINK_CONFIGURATION);
if (!executor.existTable(tableName)) { if (!executor.existTable(tableName)) {
executor.executeUpdate("CREATE TABLE " executor.executeUpdate("CREATE TABLE "

View File

@ -61,7 +61,7 @@ public TestFromInitializer() {
@BeforeMethod(alwaysRun = true) @BeforeMethod(alwaysRun = true)
public void setUp() { public void setUp() {
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.LINK_CONFIG); executor = new GenericJdbcExecutor(GenericJdbcTestConstants.LINK_CONFIGURATION);
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
if (!executor.existTable(tableName)) { if (!executor.existTable(tableName)) {

View File

@ -69,7 +69,7 @@ public TestLoader(int numberOfRows) {
@BeforeMethod(alwaysRun = true) @BeforeMethod(alwaysRun = true)
public void setUp() { public void setUp() {
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.LINK_CONFIG); executor = new GenericJdbcExecutor(GenericJdbcTestConstants.LINK_CONFIGURATION);
if (!executor.existTable(tableName)) { if (!executor.existTable(tableName)) {
executor.executeUpdate("CREATE TABLE " executor.executeUpdate("CREATE TABLE "

View File

@ -59,7 +59,7 @@ public TestToInitializer() {
@BeforeMethod(alwaysRun = true) @BeforeMethod(alwaysRun = true)
public void setUp() { public void setUp() {
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.LINK_CONFIG); executor = new GenericJdbcExecutor(GenericJdbcTestConstants.LINK_CONFIGURATION);
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
if (!executor.existTable(tableName)) { if (!executor.existTable(tableName)) {