diff --git a/pom.xml b/pom.xml index 5e1b43b6..b5694730 100644 --- a/pom.xml +++ b/pom.xml @@ -335,6 +335,16 @@ limitations under the License. derby ${derby.version} + + org.apache.derby + derbynet + ${derby.version} + + + org.apache.derby + derbyclient + ${derby.version} + org.codehaus.cargo cargo-core-container-tomcat diff --git a/test/pom.xml b/test/pom.xml index 66382b6f..fe47ff75 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -73,6 +73,16 @@ limitations under the License. cargo-core-container-tomcat + + org.apache.derby + derbynet + + + + org.apache.derby + derbyclient + + diff --git a/test/src/main/java/org/apache/sqoop/test/db/DatabaseProvider.java b/test/src/main/java/org/apache/sqoop/test/db/DatabaseProvider.java new file mode 100644 index 00000000..364ff610 --- /dev/null +++ b/test/src/main/java/org/apache/sqoop/test/db/DatabaseProvider.java @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.test.db; + +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.LinkedList; +import java.util.List; + +/** + * Database provider for testing purpose. + * + * Provider contains all methods needed to bootstrap and run the tests on remote + * databases. This is abstract implementation that is database agnostic. Each + * supported database server have it's own concrete implementation that fills + * the gaps in database differences. + */ +abstract public class DatabaseProvider { + + private static final Logger LOG = Logger.getLogger(DatabaseProvider.class); + + /** + * Internal connection to the database. + */ + private Connection connection; + + /** + * JDBC Url to the remote database system. + * + * This will be passed to the Sqoop2 server during tests. + * + * @return String + */ + abstract public String getConnectionUrl(); + + /** + * Connection username. + * + * This will be passed to the Sqoop2 server during tests. + * + * @return String + */ + abstract public String getConnectionUsername(); + + /** + * Connection password. + * + * This will be passed to the Sqoop2 server during tests. + * + * @return String + */ + abstract public String getConnectionPassword(); + + /** + * Escape column name based on specific database requirements. + * + * @param columnName Column name + * @return Escaped column name + */ + abstract public String escapeColumnName(String columnName); + + /** + * Escape table name based on specific database requirements. + * + * @param tableName Table name + * @return Escaped table name + */ + abstract public String escapeTableName(String tableName); + + /** + * Escape string value that can be safely used in the queries. + * + * @param value String value + * @return Escaped string value + */ + abstract public String escapeValueString(String value); + + /** + * String constant that can be used to denote null (unknown) value. + * + * @return String encoding null value + */ + public String nullConstant() { + return "NULL"; + } + + /** + * Start the handler. + */ + public void start() { + // Create connection to the database server + try { + setConnection(DriverManager.getConnection(getConnectionUrl(), getConnectionUsername(), getConnectionPassword())); + } catch (SQLException e) { + LOG.error("Can't create connection", e); + throw new RuntimeException("Can't create connection", e); + } + } + + /** + * Stop the handler. + */ + public void stop() { + // Close connection to the database server + if(connection != null) { + try { + connection.close(); + } catch (SQLException e) { + LOG.info("Ignored exception on closing connection", e); + } + } + } + + /** + * Return connection to the database. + * + * @return + */ + public Connection getConnection() { + return connection; + } + + /** + * Set connection to a new object. + * + * @param connection New connection object + */ + protected void setConnection(Connection connection) { + this.connection = connection; + } + + /** + * Execute DDL or DML query. + * + * This method will throw RuntimeException on failure. + * + * @param query DDL or DML query. + */ + public void executeUpdate(String query) { + LOG.info("Executing query: " + query); + Statement stmt = null; + + try { + stmt = connection.createStatement(); + stmt.executeUpdate(query); + } catch (SQLException e) { + LOG.error("Error in executing query", e); + throw new RuntimeException("Error in executing query", e); + } finally { + try { + if(stmt != null) { + stmt.close(); + } + } catch (SQLException e) { + LOG.info("Cant' close statement", e); + } + } + } + + /** + * Create new table. + * + * @param name Table name + * @param primaryKey Primary key column(0) or null if table should not have any + * @param columns List of double values column name and value for example ... "id", "varchar(50)"... + */ + public void createTable(String name, String primaryKey, String ...columns) { + // Columns are in form of two strings - name and type + if(columns.length == 0 || columns.length % 2 != 0) { + throw new RuntimeException("Incorrect number of parameters."); + } + + // Drop the table in case that it already exists + dropTable(name); + + StringBuilder sb = new StringBuilder("CREATE TABLE "); + sb.append(escapeTableName(name)).append("("); + + // Column list + List columnList = new LinkedList(); + for(int i = 0; i < columns.length; i += 2) { + String column = escapeColumnName(columns[i]) + " " + columns[i + 1]; + columnList.add(column); + } + sb.append(StringUtils.join(columnList, ", ")); + + if(primaryKey != null) { + sb.append(", PRIMARY KEY(").append(escapeColumnName(primaryKey)).append(")"); + } + + sb.append(")"); + + executeUpdate(sb.toString()); + } + + /** + * Insert new row into the table. + * + * @param tableName Table name + * @param values List of objects that should be inserted + */ + public void insertRow(String tableName, Object ...values) { + StringBuilder sb = new StringBuilder("INSERT INTO "); + sb.append(escapeTableName(tableName)); + sb.append(" VALUES ("); + + List valueList = new LinkedList(); + for(Object value : values) { + if(value == null) { + valueList.add(nullConstant()); + } else if(value.getClass() == String.class) { + valueList.add(escapeValueString((String)value)); + } else { + valueList.add(value.toString()); + } + } + + sb.append(StringUtils.join(valueList, ", ")); + sb.append(")"); + + executeUpdate(sb.toString()); + } + + /** + * Drop table. + * + * Any exceptions will be ignored. + * + * @param tableName + */ + public void dropTable(String tableName) { + StringBuilder sb = new StringBuilder("DROP TABLE "); + sb.append(escapeTableName(tableName)); + + try { + executeUpdate(sb.toString()); + } catch(RuntimeException e) { + LOG.info("Ignoring exception: " + e); + } + } + + /** + * Load class. + * + * @param className Class name + */ + public void loadClass(String className) { + try { + Class.forName(className); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Class not found: " + className, e); + } + } +} diff --git a/test/src/main/java/org/apache/sqoop/test/db/DerbyProvider.java b/test/src/main/java/org/apache/sqoop/test/db/DerbyProvider.java new file mode 100644 index 00000000..402fab97 --- /dev/null +++ b/test/src/main/java/org/apache/sqoop/test/db/DerbyProvider.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.test.db; + +import org.apache.log4j.Logger; +import org.apache.derby.drda.NetworkServerControl; + +import java.net.InetAddress; + +/** + * Implementation of database provider that is based on embedded derby server. + * + * This provider will work out of the box without any extra configuration. + */ +public class DerbyProvider extends DatabaseProvider { + + private static final Logger LOG = Logger.getLogger(DerbyProvider.class); + + public static final String DRIVER = "org.apache.derby.jdbc.ClientDriver"; + + NetworkServerControl server = null; + + @Override + public void start() { + // Start embedded server + try { + server = new NetworkServerControl(InetAddress.getByName("localhost"), 1527); + server.start(null); + } catch (Exception e) { + LOG.error("Can't start Derby network server", e); + throw new RuntimeException("Can't derby server", e); + } + + // Load JDBC driver and create connection + loadClass(DRIVER); + super.start(); + } + + @Override + public void stop() { + super.stop(); + + // Shutdown embedded server + try { + server.shutdown(); + } catch (Exception e) { + LOG.info("Can't shut down embedded server", e); + } + } + + @Override + public String escapeColumnName(String columnName) { + return escape(columnName); + } + + @Override + public String escapeTableName(String tableName) { + return escape(tableName); + } + + @Override + public String escapeValueString(String value) { + return "'" + value + "'"; + } + + public String escape(String entity) { + return "\"" + entity + "\""; + } + + @Override + public String getConnectionUrl() { + return "jdbc:derby://localhost:1527/memory:sqoop;create=true"; + } + + @Override + public String getConnectionUsername() { + return null; + } + + @Override + public String getConnectionPassword() { + return null; + } +} diff --git a/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java b/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java index 5fa294a2..ed6c5966 100644 --- a/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java +++ b/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java @@ -86,12 +86,13 @@ public void start() throws Exception { List extraClassPath = new LinkedList(); String []classpath = System.getProperty("java.class.path").split(":"); for(String jar : classpath) { - System.out.println("JAR: " + jar); if(jar.contains("hadoop-") || // Hadoop jars jar.contains("commons-") || // Apache Commons libraries jar.contains("log4j-") || // Log4j jar.contains("slf4j-") || // Slf4j jar.contains("jackson-") || // Jackson + jar.contains("derby") || // Derby drivers + jar.contains("avro-") || // Avro jar.contains("google") // Google libraries (guava, ...) ) { extraClassPath.add(jar); @@ -129,7 +130,7 @@ protected Map getLoggerConfiguration() { properties.put("org.apache.sqoop.log4j.appender.file.MaxFileSize", "25MB"); properties.put("org.apache.sqoop.log4j.appender.file.MaxBackupIndex", "5"); properties.put("org.apache.sqoop.log4j.appender.file.layout", "org.apache.log4j.PatternLayout"); - properties.put("org.apache.sqoop.log4j.appender.file.layout.ConversionPattern", "%d{ISO8601} %-5p %c{2} [%l] %m%n"); + properties.put("org.apache.sqoop.log4j.appender.file.layout.ConversionPattern", "%d{ISO8601} %-5p %c{2} [%l] %m%n\\n"); properties.put("org.apache.sqoop.log4j.debug", "true"); properties.put("org.apache.sqoop.log4j.rootCategory", "WARN, file"); properties.put("org.apache.sqoop.log4j.category.org.apache.sqoop", "DEBUG"); diff --git a/test/src/test/java/org/apache/sqoop/integration/TomcatTestCase.java b/test/src/test/java/org/apache/sqoop/integration/TomcatTestCase.java index eacf3045..7c8a9781 100644 --- a/test/src/test/java/org/apache/sqoop/integration/TomcatTestCase.java +++ b/test/src/test/java/org/apache/sqoop/integration/TomcatTestCase.java @@ -17,16 +17,33 @@ */ package org.apache.sqoop.integration; +import org.apache.log4j.Logger; +import org.apache.commons.lang.StringUtils; import org.apache.sqoop.test.minicluster.TomcatSqoopMiniCluster; import org.junit.After; import org.junit.Before; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.fail; + /** * Basic test case that will bootstrap Sqoop server running in external Tomcat * process. */ abstract public class TomcatTestCase { + private static final Logger LOG = Logger.getLogger(TomcatTestCase.class); + /** * Temporary path that will be used for this test. * @@ -45,13 +62,13 @@ abstract public class TomcatTestCase { private TomcatSqoopMiniCluster cluster; @Before - public void setUp() throws Exception { + public void startServer() throws Exception { cluster = new TomcatSqoopMiniCluster(TMP_PATH); cluster.start(); } @After - public void cleanUp() throws Exception { + public void stopServer() throws Exception { cluster.stop(); } @@ -63,4 +80,65 @@ public void cleanUp() throws Exception { public String getServerUrl() { return cluster.getServerUrl(); } + + /** + * Get input/output directory for mapreduce job. + * + * @return + */ + public String getMapreduceDirectory() { + return cluster.getTemporaryPath() + "/mapreduce-job-io"; + } + + /** + * Return list of file names that are outputs of mapreduce job. + * + * @return + */ + public String[] getOutputFilesMapreduce() { + File dir = new File(getMapreduceDirectory()); + return dir.list(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.startsWith("part-"); + } + }); + } + + /** + * Assert that mapreduce has generated following lines. + * + * As the lines can be spread between multiple files the ordering do not make + * a difference. + * + * @param lines + * @throws IOException + */ + protected void assertMapreduceOutput(String... lines) throws IOException { + Set setLines = new HashSet(Arrays.asList(lines)); + List notFound = new LinkedList(); + + String []files = getOutputFilesMapreduce(); + + for(String file : files) { + String filePath = getMapreduceDirectory() + "/" + file; + BufferedReader br = new BufferedReader(new FileReader((filePath))); + + String line; + while ((line = br.readLine()) != null) { + if (!setLines.remove(line)) { + notFound.add(line); + } + } + br.close(); + } + + if(!setLines.isEmpty() || !notFound.isEmpty()) { + LOG.error("Expected lines that weren't present in the files:"); + LOG.error("\t" + StringUtils.join(setLines, "\n\t")); + LOG.error("Extra lines in files that weren't expected:"); + LOG.error("\t" + StringUtils.join(notFound, "\n\t")); + fail("Output do not match expectations."); + } + } } diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/ConnectorTestCase.java b/test/src/test/java/org/apache/sqoop/integration/connector/ConnectorTestCase.java new file mode 100644 index 00000000..595810fd --- /dev/null +++ b/test/src/test/java/org/apache/sqoop/integration/connector/ConnectorTestCase.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.integration.connector; + +import org.apache.sqoop.integration.TomcatTestCase; +import org.apache.sqoop.test.db.DatabaseProvider; +import org.apache.sqoop.test.db.DerbyProvider; +import org.junit.After; +import org.junit.Before; + +/** + * Base test case for connector testing. + * + * It will create and initialize database provider prior every test execution. + */ +abstract public class ConnectorTestCase extends TomcatTestCase { + + protected DatabaseProvider provider; + + @Before + public void startProvider() { + provider = new DerbyProvider(); + provider.start(); + } + + @After + public void stopProvider() { + provider.stop(); + } + + public String getTableName() { + return getClass().getSimpleName(); + } + + protected void createTable(String primaryKey, String ...columns) { + provider.createTable(getTableName(), primaryKey, columns); + } + + protected void dropTable() { + provider.dropTable(getTableName()); + } + + protected void insertRow(Object ...values) { + provider.insertRow(getTableName(), values); + } +} diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableImportTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableImportTest.java new file mode 100644 index 00000000..3a8b1b55 --- /dev/null +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableImportTest.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.integration.connector.jdbc.generic; + +import org.apache.log4j.Logger; +import org.apache.sqoop.client.request.ConnectionRequest; +import org.apache.sqoop.client.request.ConnectorRequest; +import org.apache.sqoop.client.request.FrameworkRequest; +import org.apache.sqoop.client.request.JobRequest; +import org.apache.sqoop.client.request.SubmissionRequest; +import org.apache.sqoop.framework.configuration.OutputFormat; +import org.apache.sqoop.framework.configuration.StorageType; +import org.apache.sqoop.integration.connector.ConnectorTestCase; +import org.apache.sqoop.json.ConnectorBean; +import org.apache.sqoop.json.FrameworkBean; +import org.apache.sqoop.json.ValidationBean; +import org.apache.sqoop.model.MConnection; +import org.apache.sqoop.model.MConnector; +import org.apache.sqoop.model.MEnumInput; +import org.apache.sqoop.model.MFramework; +import org.apache.sqoop.model.MJob; +import org.apache.sqoop.model.MStringInput; +import org.apache.sqoop.model.MSubmission; +import org.apache.sqoop.test.db.DerbyProvider; +import org.apache.sqoop.validation.Status; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Proof of concept implementation of first "real" integration test. + * + * Will be improved when client API will be created. + */ +public class TableImportTest extends ConnectorTestCase { + + private static final Logger LOG = Logger.getLogger(TableImportTest.class); + + /** + * This test is proof of concept. + * + * It will be refactored once we will create reasonable client interface. + */ + @Test + public void testBasicTableImport() throws Exception { + createTable("id", + "id", "int", + "txt", "varchar(50)" + ); + insertRow(1, "San Francisco"); + insertRow(2, "Sunnyvale"); + insertRow(3, "Brno"); + + // Connection creation and job submission will be refactored once + // the client API for embedding Sqoop client will be ready. + + // Connection creation + FrameworkBean frameworkBean = (new FrameworkRequest()).read(getServerUrl()); + ConnectorBean connectorBean = (new ConnectorRequest()).read(getServerUrl(), "1"); + MFramework framework = frameworkBean.getFramework(); + MConnector connector = connectorBean.getConnectors().get(0); + MConnection connection = new MConnection(connector.getPersistenceId(), + connector.getConnectionForms(), + framework.getConnectionForms()); + + // Connector values + ((MStringInput) (connection.getConnectorPart().getForms().get(0).getInputs().get(0))).setValue(DerbyProvider.DRIVER); + ((MStringInput) (connection.getConnectorPart().getForms().get(0).getInputs().get(1))).setValue(provider.getConnectionUrl()); + // Framework values + // No need to set anything + + ValidationBean validationBean = (new ConnectionRequest()).create(getServerUrl(), connection); + + assertEquals(Status.FINE, validationBean.getConnectorValidation().getStatus()); + assertEquals(Status.FINE, validationBean.getFrameworkValidation().getStatus()); + assertNotNull(validationBean.getId()); + connection.setPersistenceId(validationBean.getId()); + + // Job creation + MJob job = new MJob( + connector.getPersistenceId(), + connection.getPersistenceId(), + MJob.Type.IMPORT, + connector.getJobForms(MJob.Type.IMPORT), + framework.getJobForms(MJob.Type.IMPORT) + ); + + // Connector values + ((MStringInput) (job.getConnectorPart().getForms().get(0).getInputs().get(0))).setValue(provider.escapeTableName(getTableName())); + ((MStringInput) (job.getConnectorPart().getForms().get(0).getInputs().get(3))).setValue(provider.escapeColumnName("id")); + // Framework values + ((MEnumInput) (job.getFrameworkPart().getForms().get(0).getInputs().get(0))).setValue(StorageType.HDFS.toString()); + ((MEnumInput) (job.getFrameworkPart().getForms().get(0).getInputs().get(1))).setValue(OutputFormat.TEXT_FILE.toString()); + ((MStringInput) (job.getFrameworkPart().getForms().get(0).getInputs().get(2))).setValue(getMapreduceDirectory()); + + validationBean = (new JobRequest()).create(getServerUrl(), job); + assertEquals(Status.FINE, validationBean.getConnectorValidation().getStatus()); + assertEquals(Status.FINE, validationBean.getFrameworkValidation().getStatus()); + assertNotNull(validationBean.getId()); + job.setPersistenceId(validationBean.getId()); + + SubmissionRequest submissionRequest = new SubmissionRequest(); + + MSubmission submission = submissionRequest.create(getServerUrl(), "" + job.getPersistenceId()).getSubmission(); + assertTrue(submission.getStatus().isRunning()); + + // Wait until the job finish + do { + Thread.sleep(1000); + submission = submissionRequest.read(getServerUrl(), "" + job.getPersistenceId()).getSubmission(); + } while(submission.getStatus().isRunning()); + + // Assert correct output + assertMapreduceOutput( + "1,'San Francisco'", + "2,'Sunnyvale'", + "3,'Brno'" + ); + + // Clean up testing table + dropTable(); + } + +} diff --git a/test/src/test/resources/log4j.properties b/test/src/test/resources/log4j.properties new file mode 100644 index 00000000..44ffced2 --- /dev/null +++ b/test/src/test/resources/log4j.properties @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Set root logger level to DEBUG and its only appender to A1. +log4j.rootLogger=DEBUG, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n