mirror of
https://github.com/apache/sqoop.git
synced 2025-05-08 05:31:41 +08:00
SQOOP-908: Create MySQL and PostgreSQL database providers
(Jarcec Cecho via Cheolsoo Park)
This commit is contained in:
parent
2642b66cfe
commit
f64dba2e01
12
pom.xml
12
pom.xml
@ -106,6 +106,8 @@ limitations under the License.
|
|||||||
<tomcat.major.version>6</tomcat.major.version>
|
<tomcat.major.version>6</tomcat.major.version>
|
||||||
<tomcat.minor.version>0.36</tomcat.minor.version>
|
<tomcat.minor.version>0.36</tomcat.minor.version>
|
||||||
<tomcat.version>${tomcat.major.version}.${tomcat.minor.version}</tomcat.version>
|
<tomcat.version>${tomcat.major.version}.${tomcat.minor.version}</tomcat.version>
|
||||||
|
<jdbc.mysql.version>5.1.23</jdbc.mysql.version>
|
||||||
|
<jdbc.postgresql.version>9.1-901.jdbc4</jdbc.postgresql.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
@ -350,6 +352,16 @@ limitations under the License.
|
|||||||
<artifactId>cargo-core-container-tomcat</artifactId>
|
<artifactId>cargo-core-container-tomcat</artifactId>
|
||||||
<version>${cargo.version}</version>
|
<version>${cargo.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>mysql</groupId>
|
||||||
|
<artifactId>mysql-connector-java</artifactId>
|
||||||
|
<version>${jdbc.mysql.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>postgresql</groupId>
|
||||||
|
<artifactId>postgresql</artifactId>
|
||||||
|
<version>${jdbc.postgresql.version}</version>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</dependencyManagement>
|
</dependencyManagement>
|
||||||
|
|
||||||
|
10
test/pom.xml
10
test/pom.xml
@ -83,6 +83,16 @@ limitations under the License.
|
|||||||
<artifactId>derbyclient</artifactId>
|
<artifactId>derbyclient</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>mysql</groupId>
|
||||||
|
<artifactId>mysql-connector-java</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>postgresql</groupId>
|
||||||
|
<artifactId>postgresql</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<!-- Add classifier name to the JAR name -->
|
<!-- Add classifier name to the JAR name -->
|
||||||
|
@ -104,10 +104,18 @@ public String nullConstant() {
|
|||||||
return "NULL";
|
return "NULL";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getJdbcDriver() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start the handler.
|
* Start the handler.
|
||||||
*/
|
*/
|
||||||
public void start() {
|
public void start() {
|
||||||
|
if(getJdbcDriver() != null) {
|
||||||
|
loadClass(getJdbcDriver());
|
||||||
|
}
|
||||||
|
|
||||||
// Create connection to the database server
|
// Create connection to the database server
|
||||||
try {
|
try {
|
||||||
setConnection(DriverManager.getConnection(getConnectionUrl(), getConnectionUsername(), getConnectionPassword()));
|
setConnection(DriverManager.getConnection(getConnectionUrl(), getConnectionUsername(), getConnectionPassword()));
|
||||||
|
@ -0,0 +1,39 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.sqoop.test.db;
|
||||||
|
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create database provider.
|
||||||
|
*/
|
||||||
|
public class DatabaseProviderFactory {
|
||||||
|
|
||||||
|
public static final String PROVIDER_PROPERTY = "sqoop.provider.class";
|
||||||
|
|
||||||
|
public static DatabaseProvider getProvider(Properties properties) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
|
||||||
|
String className = properties.getProperty(PROVIDER_PROPERTY);
|
||||||
|
if(className == null) {
|
||||||
|
return new DerbyProvider();
|
||||||
|
}
|
||||||
|
|
||||||
|
Class klass = Class.forName(className);
|
||||||
|
return (DatabaseProvider)klass.newInstance();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -46,8 +46,6 @@ public void start() {
|
|||||||
throw new RuntimeException("Can't derby server", e);
|
throw new RuntimeException("Can't derby server", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load JDBC driver and create connection
|
|
||||||
loadClass(DRIVER);
|
|
||||||
super.start();
|
super.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -82,6 +80,11 @@ public String escape(String entity) {
|
|||||||
return "\"" + entity + "\"";
|
return "\"" + entity + "\"";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getJdbcDriver() {
|
||||||
|
return DRIVER;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getConnectionUrl() {
|
public String getConnectionUrl() {
|
||||||
return "jdbc:derby://localhost:1527/memory:sqoop;create=true";
|
return "jdbc:derby://localhost:1527/memory:sqoop;create=true";
|
||||||
|
@ -0,0 +1,83 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.sqoop.test.db;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MySQL Provider that will connect to remote MySQL server.
|
||||||
|
*
|
||||||
|
* JDBC can be configured via system properties. Default value is server running
|
||||||
|
* on the same box (localhost) that is access via sqoop/sqoop credentials.
|
||||||
|
*/
|
||||||
|
public class MySQLProvider extends DatabaseProvider {
|
||||||
|
|
||||||
|
public static final String DRIVER = "com.mysql.jdbc.Driver";
|
||||||
|
|
||||||
|
private static final String CONNECTION = System.getProperties().getProperty(
|
||||||
|
"sqoop.provider.mysql.jdbc",
|
||||||
|
"jdbc:mysql://localhost/test"
|
||||||
|
);
|
||||||
|
|
||||||
|
private static final String USERNAME = System.getProperties().getProperty(
|
||||||
|
"sqoop.provider.mysql.username",
|
||||||
|
"sqoop"
|
||||||
|
);
|
||||||
|
|
||||||
|
private static final String PASSWORD = System.getProperties().getProperty(
|
||||||
|
"sqoop.provider.mysql.password",
|
||||||
|
"sqoop"
|
||||||
|
);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getConnectionUrl() {
|
||||||
|
return CONNECTION;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getConnectionUsername() {
|
||||||
|
return USERNAME;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getConnectionPassword() {
|
||||||
|
return PASSWORD;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String escapeColumnName(String columnName) {
|
||||||
|
return escape(columnName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String escapeTableName(String tableName) {
|
||||||
|
return escape(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String escapeValueString(String value) {
|
||||||
|
return "\"" + value + "\"";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getJdbcDriver() {
|
||||||
|
return DRIVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String escape(String entity) {
|
||||||
|
return "`" + entity + "`";
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,83 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.sqoop.test.db;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* PostgreSQL Provider that will connect to remote PostgreSQL server.
|
||||||
|
*
|
||||||
|
* JDBC can be configured via system properties. Default value is server running
|
||||||
|
* on the same box (localhost) that is access via sqoop/sqoop credentials.
|
||||||
|
*/
|
||||||
|
public class PostgreSQLProvider extends DatabaseProvider {
|
||||||
|
|
||||||
|
public static final String DRIVER = "org.postgresql.Driver";
|
||||||
|
|
||||||
|
private static final String CONNECTION = System.getProperties().getProperty(
|
||||||
|
"sqoop.provider.postgresql.jdbc",
|
||||||
|
"jdbc:postgresql://localhost/test"
|
||||||
|
);
|
||||||
|
|
||||||
|
private static final String USERNAME = System.getProperties().getProperty(
|
||||||
|
"sqoop.provider.postgresql.username",
|
||||||
|
"sqoop"
|
||||||
|
);
|
||||||
|
|
||||||
|
private static final String PASSWORD = System.getProperties().getProperty(
|
||||||
|
"sqoop.provider.postgresql.password",
|
||||||
|
"sqoop"
|
||||||
|
);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getConnectionUrl() {
|
||||||
|
return CONNECTION;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getConnectionUsername() {
|
||||||
|
return USERNAME;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getConnectionPassword() {
|
||||||
|
return PASSWORD;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String escapeColumnName(String columnName) {
|
||||||
|
return escape(columnName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String escapeTableName(String tableName) {
|
||||||
|
return escape(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String escapeValueString(String value) {
|
||||||
|
return "'" + value + "'";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getJdbcDriver() {
|
||||||
|
return DRIVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String escape(String entity) {
|
||||||
|
return "\"" + entity + "\"";
|
||||||
|
}
|
||||||
|
}
|
@ -93,6 +93,8 @@ public void start() throws Exception {
|
|||||||
jar.contains("jackson-") || // Jackson
|
jar.contains("jackson-") || // Jackson
|
||||||
jar.contains("derby") || // Derby drivers
|
jar.contains("derby") || // Derby drivers
|
||||||
jar.contains("avro-") || // Avro
|
jar.contains("avro-") || // Avro
|
||||||
|
jar.contains("mysql") || // MySQL JDBC driver
|
||||||
|
jar.contains("postgre") || // PostgreSQL JDBC driver
|
||||||
jar.contains("google") // Google libraries (guava, ...)
|
jar.contains("google") // Google libraries (guava, ...)
|
||||||
) {
|
) {
|
||||||
extraClassPath.add(jar);
|
extraClassPath.add(jar);
|
||||||
|
@ -63,7 +63,7 @@ abstract public class TomcatTestCase {
|
|||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void startServer() throws Exception {
|
public void startServer() throws Exception {
|
||||||
cluster = new TomcatSqoopMiniCluster(TMP_PATH);
|
cluster = new TomcatSqoopMiniCluster(getTemporaryPath());
|
||||||
cluster.start();
|
cluster.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -72,6 +72,10 @@ public void stopServer() throws Exception {
|
|||||||
cluster.stop();
|
cluster.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getTemporaryPath() {
|
||||||
|
return TMP_PATH;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return testing server URL
|
* Return testing server URL
|
||||||
*
|
*
|
||||||
@ -87,7 +91,7 @@ public String getServerUrl() {
|
|||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public String getMapreduceDirectory() {
|
public String getMapreduceDirectory() {
|
||||||
return cluster.getTemporaryPath() + "/mapreduce-job-io";
|
return getTemporaryPath() + "/mapreduce-job-io";
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -17,9 +17,10 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.sqoop.integration.connector;
|
package org.apache.sqoop.integration.connector;
|
||||||
|
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.sqoop.integration.TomcatTestCase;
|
import org.apache.sqoop.integration.TomcatTestCase;
|
||||||
import org.apache.sqoop.test.db.DatabaseProvider;
|
import org.apache.sqoop.test.db.DatabaseProvider;
|
||||||
import org.apache.sqoop.test.db.DerbyProvider;
|
import org.apache.sqoop.test.db.DatabaseProviderFactory;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
@ -30,11 +31,14 @@
|
|||||||
*/
|
*/
|
||||||
abstract public class ConnectorTestCase extends TomcatTestCase {
|
abstract public class ConnectorTestCase extends TomcatTestCase {
|
||||||
|
|
||||||
|
private static final Logger LOG = Logger.getLogger(ConnectorTestCase.class);
|
||||||
|
|
||||||
protected DatabaseProvider provider;
|
protected DatabaseProvider provider;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void startProvider() {
|
public void startProvider() throws Exception {
|
||||||
provider = new DerbyProvider();
|
provider = DatabaseProviderFactory.getProvider(System.getProperties());
|
||||||
|
LOG.info("Starting database provider: " + provider.getClass().getName());
|
||||||
provider.start();
|
provider.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,12 +36,13 @@
|
|||||||
import org.apache.sqoop.model.MJob;
|
import org.apache.sqoop.model.MJob;
|
||||||
import org.apache.sqoop.model.MStringInput;
|
import org.apache.sqoop.model.MStringInput;
|
||||||
import org.apache.sqoop.model.MSubmission;
|
import org.apache.sqoop.model.MSubmission;
|
||||||
import org.apache.sqoop.test.db.DerbyProvider;
|
|
||||||
import org.apache.sqoop.validation.Status;
|
import org.apache.sqoop.validation.Status;
|
||||||
|
import org.apache.sqoop.validation.Validation;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
@ -82,8 +83,10 @@ public void testBasicTableImport() throws Exception {
|
|||||||
framework.getConnectionForms());
|
framework.getConnectionForms());
|
||||||
|
|
||||||
// Connector values
|
// Connector values
|
||||||
((MStringInput) (connection.getConnectorPart().getForms().get(0).getInputs().get(0))).setValue(DerbyProvider.DRIVER);
|
((MStringInput) (connection.getConnectorPart().getForms().get(0).getInputs().get(0))).setValue(provider.getJdbcDriver());
|
||||||
((MStringInput) (connection.getConnectorPart().getForms().get(0).getInputs().get(1))).setValue(provider.getConnectionUrl());
|
((MStringInput) (connection.getConnectorPart().getForms().get(0).getInputs().get(1))).setValue(provider.getConnectionUrl());
|
||||||
|
((MStringInput) (connection.getConnectorPart().getForms().get(0).getInputs().get(2))).setValue(provider.getConnectionUsername());
|
||||||
|
((MStringInput) (connection.getConnectorPart().getForms().get(0).getInputs().get(3))).setValue(provider.getConnectionPassword());
|
||||||
// Framework values
|
// Framework values
|
||||||
// No need to set anything
|
// No need to set anything
|
||||||
|
|
||||||
@ -124,7 +127,7 @@ public void testBasicTableImport() throws Exception {
|
|||||||
|
|
||||||
// Wait until the job finish
|
// Wait until the job finish
|
||||||
do {
|
do {
|
||||||
Thread.sleep(1000);
|
Thread.sleep(5000);
|
||||||
submission = submissionRequest.read(getServerUrl(), "" + job.getPersistenceId()).getSubmission();
|
submission = submissionRequest.read(getServerUrl(), "" + job.getPersistenceId()).getSubmission();
|
||||||
} while(submission.getStatus().isRunning());
|
} while(submission.getStatus().isRunning());
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user