
SQOOP-882: Sqoop2 integration: Auxiliary classes for various database support

(Jarcec Cecho via Cheolsoo Park)
Cheolsoo Park 2013-02-23 19:28:48 -08:00
parent df3a266c6c
commit b0ac2e4e4b
9 changed files with 703 additions and 4 deletions

pom.xml

@@ -335,6 +335,16 @@ limitations under the License.
<artifactId>derby</artifactId>
<version>${derby.version}</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derbynet</artifactId>
<version>${derby.version}</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derbyclient</artifactId>
<version>${derby.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.cargo</groupId>
<artifactId>cargo-core-container-tomcat</artifactId>


@@ -73,6 +73,16 @@ limitations under the License.
<artifactId>cargo-core-container-tomcat</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derbynet</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derbyclient</artifactId>
</dependency>
</dependencies>
<!-- Add classifier name to the JAR name -->


@@ -0,0 +1,274 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.test.db;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.LinkedList;
import java.util.List;
/**
* Database provider for testing purpose.
*
* The provider contains all methods needed to bootstrap and run tests against remote
* databases. This is an abstract, database-agnostic implementation; each
* supported database server has its own concrete implementation that fills
* in the database-specific differences.
*/
abstract public class DatabaseProvider {
private static final Logger LOG = Logger.getLogger(DatabaseProvider.class);
/**
* Internal connection to the database.
*/
private Connection connection;
/**
* JDBC Url to the remote database system.
*
* This will be passed to the Sqoop2 server during tests.
*
* @return String
*/
abstract public String getConnectionUrl();
/**
* Connection username.
*
* This will be passed to the Sqoop2 server during tests.
*
* @return String
*/
abstract public String getConnectionUsername();
/**
* Connection password.
*
* This will be passed to the Sqoop2 server during tests.
*
* @return String
*/
abstract public String getConnectionPassword();
/**
* Escape column name based on specific database requirements.
*
* @param columnName Column name
* @return Escaped column name
*/
abstract public String escapeColumnName(String columnName);
/**
* Escape table name based on specific database requirements.
*
* @param tableName Table name
* @return Escaped table name
*/
abstract public String escapeTableName(String tableName);
/**
* Escape string value that can be safely used in the queries.
*
* @param value String value
* @return Escaped string value
*/
abstract public String escapeValueString(String value);
/**
* String constant that can be used to denote null (unknown) value.
*
* @return String encoding null value
*/
public String nullConstant() {
return "NULL";
}
/**
* Start the handler.
*/
public void start() {
// Create connection to the database server
try {
setConnection(DriverManager.getConnection(getConnectionUrl(), getConnectionUsername(), getConnectionPassword()));
} catch (SQLException e) {
LOG.error("Can't create connection", e);
throw new RuntimeException("Can't create connection", e);
}
}
/**
* Stop the handler.
*/
public void stop() {
// Close connection to the database server
if(connection != null) {
try {
connection.close();
} catch (SQLException e) {
LOG.info("Ignored exception on closing connection", e);
}
}
}
/**
* Return connection to the database.
*
* @return Current connection object
*/
public Connection getConnection() {
return connection;
}
/**
* Set connection to a new object.
*
* @param connection New connection object
*/
protected void setConnection(Connection connection) {
this.connection = connection;
}
/**
* Execute DDL or DML query.
*
* This method will throw RuntimeException on failure.
*
* @param query DDL or DML query.
*/
public void executeUpdate(String query) {
LOG.info("Executing query: " + query);
Statement stmt = null;
try {
stmt = connection.createStatement();
stmt.executeUpdate(query);
} catch (SQLException e) {
LOG.error("Error in executing query", e);
throw new RuntimeException("Error in executing query", e);
} finally {
try {
if(stmt != null) {
stmt.close();
}
} catch (SQLException e) {
LOG.info("Cant' close statement", e);
}
}
}
/**
* Create new table.
*
* @param name Table name
* @param primaryKey Primary key column or null if the table should not have one
* @param columns List of column name and type pairs, for example "id", "varchar(50)", ...
*/
public void createTable(String name, String primaryKey, String ...columns) {
// Columns are in form of two strings - name and type
if(columns.length == 0 || columns.length % 2 != 0) {
throw new RuntimeException("Incorrect number of parameters.");
}
// Drop the table in case that it already exists
dropTable(name);
StringBuilder sb = new StringBuilder("CREATE TABLE ");
sb.append(escapeTableName(name)).append("(");
// Column list
List<String> columnList = new LinkedList<String>();
for(int i = 0; i < columns.length; i += 2) {
String column = escapeColumnName(columns[i]) + " " + columns[i + 1];
columnList.add(column);
}
sb.append(StringUtils.join(columnList, ", "));
if(primaryKey != null) {
sb.append(", PRIMARY KEY(").append(escapeColumnName(primaryKey)).append(")");
}
sb.append(")");
executeUpdate(sb.toString());
}
/**
* Insert new row into the table.
*
* @param tableName Table name
* @param values List of objects that should be inserted
*/
public void insertRow(String tableName, Object ...values) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(escapeTableName(tableName));
sb.append(" VALUES (");
List<String> valueList = new LinkedList<String>();
for(Object value : values) {
if(value == null) {
valueList.add(nullConstant());
} else if(value.getClass() == String.class) {
valueList.add(escapeValueString((String)value));
} else {
valueList.add(value.toString());
}
}
sb.append(StringUtils.join(valueList, ", "));
sb.append(")");
executeUpdate(sb.toString());
}
/**
* Drop table.
*
* Any exceptions will be ignored.
*
* @param tableName Table name
*/
public void dropTable(String tableName) {
StringBuilder sb = new StringBuilder("DROP TABLE ");
sb.append(escapeTableName(tableName));
try {
executeUpdate(sb.toString());
} catch(RuntimeException e) {
LOG.info("Ignoring exception: " + e);
}
}
/**
* Load class.
*
* @param className Class name
*/
public void loadClass(String className) {
try {
Class.forName(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Class not found: " + className, e);
}
}
}
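For illustration, here is a minimal usage sketch of the provider API above (not part of this commit). It assumes the DerbyProvider implementation added further down in this patch and a hypothetical table name.

// Hypothetical usage sketch of DatabaseProvider (illustration only, not part of this commit).
// Assumes the DerbyProvider implementation added further down in this patch.
DatabaseProvider provider = new DerbyProvider();
provider.start();
// Columns are passed as name/type pairs: column "id" of type "int", column "txt" of type "varchar(50)".
provider.createTable("example", "id",
  "id", "int",
  "txt", "varchar(50)"
);
provider.insertRow("example", 1, "San Francisco");
provider.insertRow("example", 2, null);  // null values are rendered via nullConstant()
provider.dropTable("example");
provider.stop();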


@@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.test.db;
import org.apache.log4j.Logger;
import org.apache.derby.drda.NetworkServerControl;
import java.net.InetAddress;
/**
* Implementation of a database provider based on an embedded Derby server.
*
* This provider will work out of the box without any extra configuration.
*/
public class DerbyProvider extends DatabaseProvider {
private static final Logger LOG = Logger.getLogger(DerbyProvider.class);
public static final String DRIVER = "org.apache.derby.jdbc.ClientDriver";
NetworkServerControl server = null;
@Override
public void start() {
// Start embedded server
try {
server = new NetworkServerControl(InetAddress.getByName("localhost"), 1527);
server.start(null);
} catch (Exception e) {
LOG.error("Can't start Derby network server", e);
throw new RuntimeException("Can't derby server", e);
}
// Load JDBC driver and create connection
loadClass(DRIVER);
super.start();
}
@Override
public void stop() {
super.stop();
// Shutdown embedded server
try {
server.shutdown();
} catch (Exception e) {
LOG.info("Can't shut down embedded server", e);
}
}
@Override
public String escapeColumnName(String columnName) {
return escape(columnName);
}
@Override
public String escapeTableName(String tableName) {
return escape(tableName);
}
@Override
public String escapeValueString(String value) {
return "'" + value + "'";
}
public String escape(String entity) {
return "\"" + entity + "\"";
}
@Override
public String getConnectionUrl() {
return "jdbc:derby://localhost:1527/memory:sqoop;create=true";
}
@Override
public String getConnectionUsername() {
return null;
}
@Override
public String getConnectionPassword() {
return null;
}
}
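To illustrate what the DatabaseProvider javadoc means by database-specific gaps, here is a hedged sketch of a provider for a different database (not part of this commit); the class name, driver class, connection URL, and credentials are assumptions.

// Hypothetical provider for another database (illustration only, not part of this commit).
// Driver class, URL, and credentials below are assumed placeholders.
package org.apache.sqoop.test.db;

public class MySqlProvider extends DatabaseProvider {
  public static final String DRIVER = "com.mysql.jdbc.Driver"; // assumed driver class

  @Override
  public void start() {
    // Load the JDBC driver before the parent opens the connection
    loadClass(DRIVER);
    super.start();
  }

  @Override
  public String getConnectionUrl() {
    return "jdbc:mysql://localhost/sqoop_test"; // assumed test database
  }

  @Override
  public String getConnectionUsername() {
    return "sqoop"; // assumed credentials
  }

  @Override
  public String getConnectionPassword() {
    return "sqoop"; // assumed credentials
  }

  @Override
  public String escapeColumnName(String columnName) {
    return "`" + columnName + "`"; // MySQL uses backticks rather than double quotes
  }

  @Override
  public String escapeTableName(String tableName) {
    return "`" + tableName + "`";
  }

  @Override
  public String escapeValueString(String value) {
    return "'" + value + "'";
  }
}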


@@ -86,12 +86,13 @@ public void start() throws Exception {
List<String> extraClassPath = new LinkedList<String>();
String []classpath = System.getProperty("java.class.path").split(":");
for(String jar : classpath) {
System.out.println("JAR: " + jar);
if(jar.contains("hadoop-") || // Hadoop jars
jar.contains("commons-") || // Apache Commons libraries
jar.contains("log4j-") || // Log4j
jar.contains("slf4j-") || // Slf4j
jar.contains("jackson-") || // Jackson
jar.contains("derby") || // Derby drivers
jar.contains("avro-") || // Avro
jar.contains("google") // Google libraries (guava, ...)
) {
extraClassPath.add(jar);
@@ -129,7 +130,7 @@ protected Map<String, String> getLoggerConfiguration() {
properties.put("org.apache.sqoop.log4j.appender.file.MaxFileSize", "25MB");
properties.put("org.apache.sqoop.log4j.appender.file.MaxBackupIndex", "5");
properties.put("org.apache.sqoop.log4j.appender.file.layout", "org.apache.log4j.PatternLayout");
properties.put("org.apache.sqoop.log4j.appender.file.layout.ConversionPattern", "%d{ISO8601} %-5p %c{2} [%l] %m%n");
properties.put("org.apache.sqoop.log4j.appender.file.layout.ConversionPattern", "%d{ISO8601} %-5p %c{2} [%l] %m%n\\n");
properties.put("org.apache.sqoop.log4j.debug", "true");
properties.put("org.apache.sqoop.log4j.rootCategory", "WARN, file");
properties.put("org.apache.sqoop.log4j.category.org.apache.sqoop", "DEBUG");


@@ -17,16 +17,33 @@
*/
package org.apache.sqoop.integration;
import org.apache.log4j.Logger;
import org.apache.commons.lang.StringUtils;
import org.apache.sqoop.test.minicluster.TomcatSqoopMiniCluster;
import org.junit.After;
import org.junit.Before;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import static org.junit.Assert.fail;
/**
* Basic test case that will bootstrap a Sqoop server running in an external Tomcat
* process.
*/
abstract public class TomcatTestCase {
private static final Logger LOG = Logger.getLogger(TomcatTestCase.class);
/**
* Temporary path that will be used for this test.
*
@@ -45,13 +62,13 @@ abstract public class TomcatTestCase {
private TomcatSqoopMiniCluster cluster;
@Before
public void setUp() throws Exception {
public void startServer() throws Exception {
cluster = new TomcatSqoopMiniCluster(TMP_PATH);
cluster.start();
}
@After
public void cleanUp() throws Exception {
public void stopServer() throws Exception {
cluster.stop();
}
@@ -63,4 +80,65 @@ public void cleanUp() throws Exception {
public String getServerUrl() {
return cluster.getServerUrl();
}
/**
* Get the input/output directory for the mapreduce job.
*
* @return Path to the mapreduce input/output directory
*/
public String getMapreduceDirectory() {
return cluster.getTemporaryPath() + "/mapreduce-job-io";
}
/**
* Return the list of file names produced by the mapreduce job.
*
* @return Array of output file names
*/
public String[] getOutputFilesMapreduce() {
File dir = new File(getMapreduceDirectory());
return dir.list(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.startsWith("part-");
}
});
}
/**
* Assert that the mapreduce job has generated the following lines.
*
* As the lines can be spread across multiple files, their ordering does not
* matter.
*
* @param lines Expected output lines
* @throws IOException
*/
protected void assertMapreduceOutput(String... lines) throws IOException {
Set<String> setLines = new HashSet<String>(Arrays.asList(lines));
List<String> notFound = new LinkedList<String>();
String []files = getOutputFilesMapreduce();
for(String file : files) {
String filePath = getMapreduceDirectory() + "/" + file;
BufferedReader br = new BufferedReader(new FileReader(filePath));
String line;
while ((line = br.readLine()) != null) {
if (!setLines.remove(line)) {
notFound.add(line);
}
}
br.close();
}
if(!setLines.isEmpty() || !notFound.isEmpty()) {
LOG.error("Expected lines that weren't present in the files:");
LOG.error("\t" + StringUtils.join(setLines, "\n\t"));
LOG.error("Extra lines in files that weren't expected:");
LOG.error("\t" + StringUtils.join(notFound, "\n\t"));
fail("Output do not match expectations.");
}
}
}
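As a hedged illustration of the assertMapreduceOutput() contract (called from a TomcatTestCase subclass): assuming the job split its rows across two part files, the assertion below would still pass, because only the union of lines across all part-* files is compared, not their order or placement.

// Hypothetical illustration, not part of this commit:
//   part-m-00000 contains: 2,'Sunnyvale'
//   part-m-00001 contains: 1,'San Francisco' and 3,'Brno'
assertMapreduceOutput(
  "1,'San Francisco'",
  "2,'Sunnyvale'",
  "3,'Brno'"
);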


@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.integration.connector;
import org.apache.sqoop.integration.TomcatTestCase;
import org.apache.sqoop.test.db.DatabaseProvider;
import org.apache.sqoop.test.db.DerbyProvider;
import org.junit.After;
import org.junit.Before;
/**
* Base test case for connector testing.
*
* It will create and initialize a database provider prior to every test execution.
*/
abstract public class ConnectorTestCase extends TomcatTestCase {
protected DatabaseProvider provider;
@Before
public void startProvider() {
provider = new DerbyProvider();
provider.start();
}
@After
public void stopProvider() {
provider.stop();
}
public String getTableName() {
return getClass().getSimpleName();
}
protected void createTable(String primaryKey, String ...columns) {
provider.createTable(getTableName(), primaryKey, columns);
}
protected void dropTable() {
provider.dropTable(getTableName());
}
protected void insertRow(Object ...values) {
provider.insertRow(getTableName(), values);
}
}


@@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.integration.connector.jdbc.generic;
import org.apache.log4j.Logger;
import org.apache.sqoop.client.request.ConnectionRequest;
import org.apache.sqoop.client.request.ConnectorRequest;
import org.apache.sqoop.client.request.FrameworkRequest;
import org.apache.sqoop.client.request.JobRequest;
import org.apache.sqoop.client.request.SubmissionRequest;
import org.apache.sqoop.framework.configuration.OutputFormat;
import org.apache.sqoop.framework.configuration.StorageType;
import org.apache.sqoop.integration.connector.ConnectorTestCase;
import org.apache.sqoop.json.ConnectorBean;
import org.apache.sqoop.json.FrameworkBean;
import org.apache.sqoop.json.ValidationBean;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MEnumInput;
import org.apache.sqoop.model.MFramework;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MStringInput;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.test.db.DerbyProvider;
import org.apache.sqoop.validation.Status;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* Proof-of-concept implementation of the first "real" integration test.
*
* Will be improved once the client API is created.
*/
public class TableImportTest extends ConnectorTestCase {
private static final Logger LOG = Logger.getLogger(TableImportTest.class);
/**
* This test is a proof of concept.
*
* It will be refactored once a reasonable client interface is created.
*/
@Test
public void testBasicTableImport() throws Exception {
createTable("id",
"id", "int",
"txt", "varchar(50)"
);
insertRow(1, "San Francisco");
insertRow(2, "Sunnyvale");
insertRow(3, "Brno");
// Connection creation and job submission will be refactored once
// the client API for embedding the Sqoop client is ready.
// Connection creation
FrameworkBean frameworkBean = (new FrameworkRequest()).read(getServerUrl());
ConnectorBean connectorBean = (new ConnectorRequest()).read(getServerUrl(), "1");
MFramework framework = frameworkBean.getFramework();
MConnector connector = connectorBean.getConnectors().get(0);
MConnection connection = new MConnection(connector.getPersistenceId(),
connector.getConnectionForms(),
framework.getConnectionForms());
// Connector values
((MStringInput) (connection.getConnectorPart().getForms().get(0).getInputs().get(0))).setValue(DerbyProvider.DRIVER);
((MStringInput) (connection.getConnectorPart().getForms().get(0).getInputs().get(1))).setValue(provider.getConnectionUrl());
// Framework values
// No need to set anything
ValidationBean validationBean = (new ConnectionRequest()).create(getServerUrl(), connection);
assertEquals(Status.FINE, validationBean.getConnectorValidation().getStatus());
assertEquals(Status.FINE, validationBean.getFrameworkValidation().getStatus());
assertNotNull(validationBean.getId());
connection.setPersistenceId(validationBean.getId());
// Job creation
MJob job = new MJob(
connector.getPersistenceId(),
connection.getPersistenceId(),
MJob.Type.IMPORT,
connector.getJobForms(MJob.Type.IMPORT),
framework.getJobForms(MJob.Type.IMPORT)
);
// Connector values
((MStringInput) (job.getConnectorPart().getForms().get(0).getInputs().get(0))).setValue(provider.escapeTableName(getTableName()));
((MStringInput) (job.getConnectorPart().getForms().get(0).getInputs().get(3))).setValue(provider.escapeColumnName("id"));
// Framework values
((MEnumInput) (job.getFrameworkPart().getForms().get(0).getInputs().get(0))).setValue(StorageType.HDFS.toString());
((MEnumInput) (job.getFrameworkPart().getForms().get(0).getInputs().get(1))).setValue(OutputFormat.TEXT_FILE.toString());
((MStringInput) (job.getFrameworkPart().getForms().get(0).getInputs().get(2))).setValue(getMapreduceDirectory());
validationBean = (new JobRequest()).create(getServerUrl(), job);
assertEquals(Status.FINE, validationBean.getConnectorValidation().getStatus());
assertEquals(Status.FINE, validationBean.getFrameworkValidation().getStatus());
assertNotNull(validationBean.getId());
job.setPersistenceId(validationBean.getId());
SubmissionRequest submissionRequest = new SubmissionRequest();
MSubmission submission = submissionRequest.create(getServerUrl(), "" + job.getPersistenceId()).getSubmission();
assertTrue(submission.getStatus().isRunning());
// Wait until the job finishes
do {
Thread.sleep(1000);
submission = submissionRequest.read(getServerUrl(), "" + job.getPersistenceId()).getSubmission();
} while(submission.getStatus().isRunning());
// Assert correct output
assertMapreduceOutput(
"1,'San Francisco'",
"2,'Sunnyvale'",
"3,'Brno'"
);
// Clean up testing table
dropTable();
}
}


@@ -0,0 +1,24 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=DEBUG, A1
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n