mirror of
https://github.com/apache/sqoop.git
synced 2025-05-06 22:11:40 +08:00
SQOOP-2246: Sqoop2: Use jdbcProperties when creating database connection in GenericJDBCExecutor
(Jarek Jarcec Cecho via Abraham Elmahrek)
This commit is contained in:
parent
d68737ecfd
commit
598607cde6
@ -19,9 +19,11 @@
|
|||||||
|
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.sqoop.common.SqoopException;
|
import org.apache.sqoop.common.SqoopException;
|
||||||
|
import org.apache.sqoop.connector.jdbc.configuration.LinkConfig;
|
||||||
import org.apache.sqoop.error.code.GenericJdbcConnectorError;
|
import org.apache.sqoop.error.code.GenericJdbcConnectorError;
|
||||||
import org.apache.sqoop.schema.Schema;
|
import org.apache.sqoop.schema.Schema;
|
||||||
import org.apache.sqoop.schema.type.Column;
|
import org.apache.sqoop.schema.type.Column;
|
||||||
|
import org.apache.sqoop.utils.ClassUtils;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
import org.joda.time.LocalDate;
|
import org.joda.time.LocalDate;
|
||||||
import org.joda.time.LocalTime;
|
import org.joda.time.LocalTime;
|
||||||
@ -35,25 +37,75 @@
|
|||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.sql.Statement;
|
import java.sql.Statement;
|
||||||
import java.sql.Timestamp;
|
import java.sql.Timestamp;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Database executor that is based on top of JDBC spec.
|
||||||
|
*/
|
||||||
public class GenericJdbcExecutor {
|
public class GenericJdbcExecutor {
|
||||||
|
|
||||||
private static final Logger LOG =
|
private static final Logger LOG = Logger.getLogger(GenericJdbcExecutor.class);
|
||||||
Logger.getLogger(GenericJdbcExecutor.class);
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Keys for JDBC properties
|
||||||
|
*
|
||||||
|
* We're following JDBC 4 spec:
|
||||||
|
* http://download.oracle.com/otn-pub/jcp/jdbc-4_1-mrel-spec/jdbc4.1-fr-spec.pdf?AuthParam=1426813649_0155f473b02dbca8bbd417dd061669d7
|
||||||
|
*/
|
||||||
|
public static final String JDBC_PROPERTY_USERNAME = "user";
|
||||||
|
public static final String JDBC_PROPERTY_PASSWORD = "password";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* User configured link with credentials and such
|
||||||
|
*/
|
||||||
|
private LinkConfig linkConfig;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Internal connection object (we'll hold to it)
|
||||||
|
*/
|
||||||
private Connection connection;
|
private Connection connection;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prepare statement
|
||||||
|
*/
|
||||||
private PreparedStatement preparedStatement;
|
private PreparedStatement preparedStatement;
|
||||||
|
|
||||||
public GenericJdbcExecutor(String driver, String url,
|
public GenericJdbcExecutor(LinkConfig linkConfig) {
|
||||||
String username, String password) {
|
assert linkConfig != null;
|
||||||
|
assert linkConfig.connectionString != null;
|
||||||
|
|
||||||
|
// Persist link configuration for future use
|
||||||
|
this.linkConfig = linkConfig;
|
||||||
|
|
||||||
|
// Load/register the JDBC driver to JVM
|
||||||
|
Class driverClass = ClassUtils.loadClass(linkConfig.jdbcDriver);
|
||||||
|
if(driverClass == null) {
|
||||||
|
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0000, linkConfig.jdbcDriver);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Properties that we will use for the connection
|
||||||
|
Properties properties = new Properties();
|
||||||
|
if(linkConfig.jdbcProperties != null) {
|
||||||
|
properties.putAll(linkConfig.jdbcProperties);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Propagate username and password to the properties
|
||||||
|
//
|
||||||
|
// DriverManager have two relevant API for us:
|
||||||
|
// * getConnection(url, username, password)
|
||||||
|
// * getConnection(url, properties)
|
||||||
|
// As we have to use properties, we need to use the later
|
||||||
|
// method and hence we have to persist the credentials there.
|
||||||
|
if(linkConfig.username != null) {
|
||||||
|
properties.put(JDBC_PROPERTY_USERNAME, linkConfig.username);
|
||||||
|
}
|
||||||
|
if(linkConfig.password != null) {
|
||||||
|
properties.put(JDBC_PROPERTY_PASSWORD, linkConfig.password);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finally create the connection
|
||||||
try {
|
try {
|
||||||
Class.forName(driver);
|
connection = DriverManager.getConnection(linkConfig.connectionString, properties);
|
||||||
connection = DriverManager.getConnection(url, username, password);
|
|
||||||
|
|
||||||
} catch (ClassNotFoundException e) {
|
|
||||||
throw new SqoopException(
|
|
||||||
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0000, driver, e);
|
|
||||||
|
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
logSQLException(e);
|
logSQLException(e);
|
||||||
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0001, e);
|
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0001, e);
|
||||||
|
@ -41,13 +41,8 @@ public class GenericJdbcExtractor extends Extractor<LinkConfiguration, FromJobCo
|
|||||||
|
|
||||||
private long rowsRead = 0;
|
private long rowsRead = 0;
|
||||||
@Override
|
@Override
|
||||||
public void extract(ExtractorContext context, LinkConfiguration linkConfig,
|
public void extract(ExtractorContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig, GenericJdbcPartition partition) {
|
||||||
FromJobConfiguration fromJobConfig, GenericJdbcPartition partition) {
|
GenericJdbcExecutor executor = new GenericJdbcExecutor(linkConfig.linkConfig);
|
||||||
String driver = linkConfig.linkConfig.jdbcDriver;
|
|
||||||
String url = linkConfig.linkConfig.connectionString;
|
|
||||||
String username = linkConfig.linkConfig.username;
|
|
||||||
String password = linkConfig.linkConfig.password;
|
|
||||||
GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
|
|
||||||
|
|
||||||
String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL);
|
String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL);
|
||||||
String conditions = partition.getConditions();
|
String conditions = partition.getConditions();
|
||||||
|
@ -47,7 +47,8 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initialize(InitializerContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
|
public void initialize(InitializerContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
|
||||||
configureJdbcProperties(context.getContext(), linkConfig, fromJobConfig);
|
executor = new GenericJdbcExecutor(linkConfig.linkConfig);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
configurePartitionProperties(context.getContext(), linkConfig, fromJobConfig);
|
configurePartitionProperties(context.getContext(), linkConfig, fromJobConfig);
|
||||||
configureTableProperties(context.getContext(), linkConfig, fromJobConfig);
|
configureTableProperties(context.getContext(), linkConfig, fromJobConfig);
|
||||||
@ -67,7 +68,7 @@ public Set<String> getJars(InitializerContext context, LinkConfiguration linkCon
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
|
public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
|
||||||
configureJdbcProperties(context.getContext(), linkConfig, fromJobConfig);
|
executor = new GenericJdbcExecutor(linkConfig.linkConfig);
|
||||||
|
|
||||||
String schemaName = fromJobConfig.fromJobConfig.tableName;
|
String schemaName = fromJobConfig.fromJobConfig.tableName;
|
||||||
if(schemaName == null) {
|
if(schemaName == null) {
|
||||||
@ -115,18 +116,6 @@ public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void configureJdbcProperties(MutableContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
|
|
||||||
String driver = linkConfig.linkConfig.jdbcDriver;
|
|
||||||
String url = linkConfig.linkConfig.connectionString;
|
|
||||||
String username = linkConfig.linkConfig.username;
|
|
||||||
String password = linkConfig.linkConfig.password;
|
|
||||||
|
|
||||||
assert driver != null;
|
|
||||||
assert url != null;
|
|
||||||
|
|
||||||
executor = new GenericJdbcExecutor(driver, url, username, password);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void configurePartitionProperties(MutableContext context, LinkConfiguration linkConfig, FromJobConfiguration jobConf) throws SQLException {
|
private void configurePartitionProperties(MutableContext context, LinkConfiguration linkConfig, FromJobConfiguration jobConf) throws SQLException {
|
||||||
// Assertions that should be valid (verified via validator)
|
// Assertions that should be valid (verified via validator)
|
||||||
assert (jobConf.fromJobConfig.tableName != null && jobConf.fromJobConfig.sql == null) ||
|
assert (jobConf.fromJobConfig.tableName != null && jobConf.fromJobConfig.sql == null) ||
|
||||||
|
@ -32,11 +32,7 @@ public class GenericJdbcLoader extends Loader<LinkConfiguration, ToJobConfigurat
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void load(LoaderContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) throws Exception{
|
public void load(LoaderContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) throws Exception{
|
||||||
String driver = linkConfig.linkConfig.jdbcDriver;
|
GenericJdbcExecutor executor = new GenericJdbcExecutor(linkConfig.linkConfig);
|
||||||
String url = linkConfig.linkConfig.connectionString;
|
|
||||||
String username = linkConfig.linkConfig.username;
|
|
||||||
String password = linkConfig.linkConfig.password;
|
|
||||||
GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
|
|
||||||
executor.setAutoCommit(false);
|
executor.setAutoCommit(false);
|
||||||
String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL);
|
String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL);
|
||||||
executor.beginBatch(sql);
|
executor.beginBatch(sql);
|
||||||
|
@ -41,13 +41,8 @@ public void destroy(DestroyerContext context, LinkConfiguration linkConfig, ToJo
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void moveDataToDestinationTable(LinkConfiguration linkConfig,
|
private void moveDataToDestinationTable(LinkConfiguration linkConfig, boolean success, String stageTableName, String tableName) {
|
||||||
boolean success, String stageTableName, String tableName) {
|
GenericJdbcExecutor executor = new GenericJdbcExecutor(linkConfig.linkConfig);
|
||||||
GenericJdbcExecutor executor =
|
|
||||||
new GenericJdbcExecutor(linkConfig.linkConfig.jdbcDriver,
|
|
||||||
linkConfig.linkConfig.connectionString,
|
|
||||||
linkConfig.linkConfig.username,
|
|
||||||
linkConfig.linkConfig.password);
|
|
||||||
try {
|
try {
|
||||||
if(success) {
|
if(success) {
|
||||||
LOG.info("Job completed, transferring data from stage fromTable to " +
|
LOG.info("Job completed, transferring data from stage fromTable to " +
|
||||||
|
@ -44,7 +44,7 @@ public class GenericJdbcToInitializer extends Initializer<LinkConfiguration, ToJ
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initialize(InitializerContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
|
public void initialize(InitializerContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
|
||||||
configureJdbcProperties(context.getContext(), linkConfig, toJobConfig);
|
executor = new GenericJdbcExecutor(linkConfig.linkConfig);
|
||||||
try {
|
try {
|
||||||
configureTableProperties(context.getContext(), linkConfig, toJobConfig);
|
configureTableProperties(context.getContext(), linkConfig, toJobConfig);
|
||||||
} finally {
|
} finally {
|
||||||
@ -61,7 +61,7 @@ public Set<String> getJars(InitializerContext context, LinkConfiguration linkCon
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
|
public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
|
||||||
configureJdbcProperties(context.getContext(), linkConfig, toJobConfig);
|
executor = new GenericJdbcExecutor(linkConfig.linkConfig);
|
||||||
|
|
||||||
String schemaName = toJobConfig.toJobConfig.tableName;
|
String schemaName = toJobConfig.toJobConfig.tableName;
|
||||||
|
|
||||||
@ -109,18 +109,6 @@ public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void configureJdbcProperties(MutableContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
|
|
||||||
String driver = linkConfig.linkConfig.jdbcDriver;
|
|
||||||
String url = linkConfig.linkConfig.connectionString;
|
|
||||||
String username = linkConfig.linkConfig.username;
|
|
||||||
String password = linkConfig.linkConfig.password;
|
|
||||||
|
|
||||||
assert driver != null;
|
|
||||||
assert url != null;
|
|
||||||
|
|
||||||
executor = new GenericJdbcExecutor(driver, url, username, password);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void configureTableProperties(MutableContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
|
private void configureTableProperties(MutableContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
|
||||||
String dataSql;
|
String dataSql;
|
||||||
|
|
||||||
|
@ -17,6 +17,8 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.sqoop.connector.jdbc;
|
package org.apache.sqoop.connector.jdbc;
|
||||||
|
|
||||||
|
import org.apache.sqoop.common.SqoopException;
|
||||||
|
import org.apache.sqoop.connector.jdbc.configuration.LinkConfig;
|
||||||
import org.testng.annotations.BeforeMethod;
|
import org.testng.annotations.BeforeMethod;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
@ -33,8 +35,7 @@ public class GenericJdbcExecutorTest {
|
|||||||
public GenericJdbcExecutorTest() {
|
public GenericJdbcExecutorTest() {
|
||||||
table = getClass().getSimpleName().toUpperCase();
|
table = getClass().getSimpleName().toUpperCase();
|
||||||
emptyTable = table + "_EMPTY";
|
emptyTable = table + "_EMPTY";
|
||||||
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
|
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.LINK_CONFIG);
|
||||||
GenericJdbcTestConstants.URL, null, null);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeMethod(alwaysRun = true)
|
@BeforeMethod(alwaysRun = true)
|
||||||
@ -59,6 +60,15 @@ public void setUp() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(expectedExceptions = SqoopException.class)
|
||||||
|
public void testUnknownDriver() {
|
||||||
|
LinkConfig linkConfig = new LinkConfig();
|
||||||
|
linkConfig.jdbcDriver = "net.jarcec.driver.MyAwesomeDatabase";
|
||||||
|
linkConfig.connectionString = "jdbc:awesome:";
|
||||||
|
|
||||||
|
new GenericJdbcExecutor(linkConfig);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDeleteTableData() throws Exception {
|
public void testDeleteTableData() throws Exception {
|
||||||
executor.deleteTableData(table);
|
executor.deleteTableData(table);
|
||||||
|
@ -17,9 +17,26 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.sqoop.connector.jdbc;
|
package org.apache.sqoop.connector.jdbc;
|
||||||
|
|
||||||
|
import org.apache.sqoop.connector.jdbc.configuration.LinkConfig;
|
||||||
|
|
||||||
public class GenericJdbcTestConstants {
|
public class GenericJdbcTestConstants {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Testing Driver
|
||||||
|
*/
|
||||||
public static final String DRIVER = "org.apache.derby.jdbc.EmbeddedDriver";
|
public static final String DRIVER = "org.apache.derby.jdbc.EmbeddedDriver";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Testing database (in memory derby)
|
||||||
|
*/
|
||||||
public static final String URL = "jdbc:derby:memory:TESTDB;create=true";
|
public static final String URL = "jdbc:derby:memory:TESTDB;create=true";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test link configuration
|
||||||
|
*/
|
||||||
|
public static final LinkConfig LINK_CONFIG = new LinkConfig();
|
||||||
|
static {
|
||||||
|
LINK_CONFIG.jdbcDriver = DRIVER;
|
||||||
|
LINK_CONFIG.connectionString = URL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -59,8 +59,7 @@ public TestExtractor() {
|
|||||||
|
|
||||||
@BeforeMethod(alwaysRun = true)
|
@BeforeMethod(alwaysRun = true)
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
|
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.LINK_CONFIG);
|
||||||
GenericJdbcTestConstants.URL, null, null);
|
|
||||||
|
|
||||||
if (!executor.existTable(tableName)) {
|
if (!executor.existTable(tableName)) {
|
||||||
executor.executeUpdate("CREATE TABLE "
|
executor.executeUpdate("CREATE TABLE "
|
||||||
|
@ -61,8 +61,7 @@ public TestFromInitializer() {
|
|||||||
|
|
||||||
@BeforeMethod(alwaysRun = true)
|
@BeforeMethod(alwaysRun = true)
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
|
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.LINK_CONFIG);
|
||||||
GenericJdbcTestConstants.URL, null, null);
|
|
||||||
|
|
||||||
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
|
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
|
||||||
if (!executor.existTable(tableName)) {
|
if (!executor.existTable(tableName)) {
|
||||||
|
@ -69,8 +69,7 @@ public TestLoader(int numberOfRows) {
|
|||||||
|
|
||||||
@BeforeMethod(alwaysRun = true)
|
@BeforeMethod(alwaysRun = true)
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
|
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.LINK_CONFIG);
|
||||||
GenericJdbcTestConstants.URL, null, null);
|
|
||||||
|
|
||||||
if (!executor.existTable(tableName)) {
|
if (!executor.existTable(tableName)) {
|
||||||
executor.executeUpdate("CREATE TABLE "
|
executor.executeUpdate("CREATE TABLE "
|
||||||
|
@ -59,8 +59,7 @@ public TestToInitializer() {
|
|||||||
|
|
||||||
@BeforeMethod(alwaysRun = true)
|
@BeforeMethod(alwaysRun = true)
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
|
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.LINK_CONFIG);
|
||||||
GenericJdbcTestConstants.URL, null, null);
|
|
||||||
|
|
||||||
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
|
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
|
||||||
if (!executor.existTable(tableName)) {
|
if (!executor.existTable(tableName)) {
|
||||||
|
Loading…
Reference in New Issue
Block a user