diff --git a/pom.xml b/pom.xml index efb96591..a5e42fa8 100644 --- a/pom.xml +++ b/pom.xml @@ -93,6 +93,7 @@ limitations under the License. 1.4 2.5 2.4 + 1.9 10.8.2.2 1.0.3 2.5.1 @@ -389,6 +390,11 @@ limitations under the License. json-simple ${json-simple.version} + + org.apache.commons + commons-compress + ${commons-compress.version} + com.google.guava guava diff --git a/test/pom.xml b/test/pom.xml index 35d36c15..38b22798 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -117,6 +117,11 @@ limitations under the License. postgresql + + org.apache.commons + commons-compress + + diff --git a/test/src/main/java/org/apache/sqoop/test/minicluster/SqoopMiniCluster.java b/test/src/main/java/org/apache/sqoop/test/minicluster/SqoopMiniCluster.java index 4322b1cd..e4eecbff 100644 --- a/test/src/main/java/org/apache/sqoop/test/minicluster/SqoopMiniCluster.java +++ b/test/src/main/java/org/apache/sqoop/test/minicluster/SqoopMiniCluster.java @@ -130,6 +130,8 @@ protected void prepareTemporaryPath() throws IOException { mapToProperties(sqoopProperties, getSubmissionEngineConfiguration()); mapToProperties(sqoopProperties, getExecutionEngineConfiguration()); mapToProperties(sqoopProperties, getSecurityConfiguration()); + mapToProperties(sqoopProperties, getConnectorManagerConfiguration()); + mapToProperties(sqoopProperties, getDriverManagerConfiguration()); FileUtils.writeLines(f, sqoopProperties); @@ -208,4 +210,19 @@ protected Map getSecurityConfiguration() { return properties; } + protected Map getConnectorManagerConfiguration() { + Map properties = new HashMap(); + + properties.put("org.apache.sqoop.connector.autoupgrade", "true"); + + return properties; + } + + protected Map getDriverManagerConfiguration() { + Map properties = new HashMap(); + + properties.put("org.apache.sqoop.driver.autoupgrade", "true"); + + return properties; + } } diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/TomcatTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/TomcatTestCase.java index 5e1d564a..0b0a0a24 100644 --- a/test/src/main/java/org/apache/sqoop/test/testcases/TomcatTestCase.java +++ b/test/src/main/java/org/apache/sqoop/test/testcases/TomcatTestCase.java @@ -17,8 +17,10 @@ */ package org.apache.sqoop.test.testcases; +import java.io.File; import java.io.IOException; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.mapred.JobConf; import org.apache.log4j.Logger; @@ -104,10 +106,12 @@ public static void startHadoop() throws Exception { public void startServer() throws Exception { // Get and set temporary path in hadoop cluster. tmpPath = HdfsUtils.joinPathFragments(TMP_PATH_BASE, getClass().getName(), name.getMethodName()); + FileUtils.deleteDirectory(new File(tmpPath)); + LOG.debug("Temporary Directory: " + tmpPath); // Start server - cluster = new TomcatSqoopMiniCluster(tmpPath, hadoopCluster.getConfiguration()); + cluster = createSqoopMiniCluster(); cluster.start(); // Initialize Sqoop Client API @@ -124,6 +128,17 @@ public static void stopHadoop() throws Exception { hadoopCluster.stop(); } + /** + * Create Sqoop MiniCluster instance that should be used for this test. + * + * This method will be executed only once prior each test execution. + * + * @return New instance of test mini cluster + */ + public TomcatSqoopMiniCluster createSqoopMiniCluster() throws Exception { + return new TomcatSqoopMiniCluster(getSqoopMiniClusterTemporaryPath(), hadoopCluster.getConfiguration()); + } + /** * Return SqoopClient configured to talk to testing server. * @@ -137,6 +152,10 @@ public String getTemporaryPath() { return tmpPath; } + public String getSqoopMiniClusterTemporaryPath() { + return HdfsUtils.joinPathFragments(tmpPath, "sqoop-mini-cluster"); + } + /** * Return testing server URL * diff --git a/test/src/main/java/org/apache/sqoop/test/utils/CompressionUtils.java b/test/src/main/java/org/apache/sqoop/test/utils/CompressionUtils.java new file mode 100644 index 00000000..d7255c3e --- /dev/null +++ b/test/src/main/java/org/apache/sqoop/test/utils/CompressionUtils.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.test.utils; + +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; +import org.apache.log4j.Logger; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * Utility methods to work with compressed files and stream. + */ +public class CompressionUtils { + + private static final Logger LOG = Logger.getLogger(CompressionUtils.class); + + /** + * Untar given stream (tar.gz archive) to given directory. + * + * Directory structure will be preserved. + * + * @param inputStream InputStream of tar.gz archive + * @param targetDirectory Target directory for tarball content + * @throws IOException + */ + public static void untarStreamToDirectory(InputStream inputStream, String targetDirectory) throws IOException { + assert inputStream != null; + assert targetDirectory != null; + + LOG.info("Untaring archive to directory: " + targetDirectory); + + TarArchiveInputStream in = new TarArchiveInputStream(new GzipCompressorInputStream(inputStream)); + TarArchiveEntry entry = null; + + int BUFFER_SIZE = 2048; + + while ((entry = (TarArchiveEntry) in.getNextEntry()) != null) { + LOG.info("Untaring file: " + entry.getName()); + + if (entry.isDirectory()) { + (new File( HdfsUtils.joinPathFragments(targetDirectory, entry.getName()))).mkdirs(); + } else { + int count; + byte data[] = new byte[BUFFER_SIZE]; + + FileOutputStream fos = new FileOutputStream(HdfsUtils.joinPathFragments(targetDirectory, entry.getName())); + BufferedOutputStream dest = new BufferedOutputStream(fos, BUFFER_SIZE); + while ((count = in.read(data, 0, BUFFER_SIZE)) != -1) { + dest.write(data, 0, count); + } + dest.close(); + } + } + in.close(); + } + + private CompressionUtils() { + // No instantiation + } +} diff --git a/test/src/test/java/org/apache/sqoop/integration/repository/derby/upgrade/Derby1_99_4UpgradeTest.java b/test/src/test/java/org/apache/sqoop/integration/repository/derby/upgrade/Derby1_99_4UpgradeTest.java new file mode 100644 index 00000000..85ce8ff5 --- /dev/null +++ b/test/src/test/java/org/apache/sqoop/integration/repository/derby/upgrade/Derby1_99_4UpgradeTest.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.integration.repository.derby.upgrade; + +import java.util.HashMap; +import java.util.Map; + +/** + * This version contains the following structures: + * Generic JDBC Connector link with name "Link1" and id 1 + * Generic JDBC Connector link with blank name and id 2 + * HDFS Connector link with name "Link3" and id 3 + * HDFS Connector link with blank name and id 4 + * HDFS Connector link with blank name and id 5 + * HDFS Connector link with blank name and id 6 + * Job (-f 1 -t 3) with name "Import" and id 1 + * Job (-f 1 -t 3) with name "Query" and id 2 + * Job (-f 3 -t 1) with name "Export" and id 3 + * Job (-f 3 -t 1) with blank name and id 4 + * Job (-f 3 -t 1) with blank name and id 5 + * Job with id 1 has been executed 3 times + * Job with id 2 has been executed 3 times + * Job with id 3 has been executed 1 times + * Link with id 4 has been disabled + * Link with id 5 has been disabled + * Job with id 4 has been disabled + * Job with id 5 has been disabled + */ +public class Derby1_99_4UpgradeTest extends DerbyRepositoryUpgradeTest { + + @Override + public String getPathToRepositoryTarball() { + return "/repository/derby/derby-repository-1.99.4.tar.gz"; + } + + @Override + public int getNumberOfLinks() { + return 6; + } + + @Override + public int getNumberOfJobs() { + return 5; + } + + @Override + public Map getNumberOfSubmissions() { + HashMap ret = new HashMap(); + ret.put(1, 3); + ret.put(2, 3); + ret.put(3, 1); + ret.put(4, 0); + ret.put(5, 0); + return ret; + } + + @Override + public Integer[] getDisabledLinkIds() { + return new Integer[] {4, 5}; + } + + @Override + public Integer[] getDisabledJobIds() { + return new Integer[] {4, 5}; + } + + @Override + public Integer[] getDeleteLinkIds() { + return new Integer[] {1, 2, 3, 4, 5, 6}; + } + + @Override + public Integer[] getDeleteJobIds() { + return new Integer[] {1, 2, 3, 4, 5}; + } +} diff --git a/test/src/test/java/org/apache/sqoop/integration/repository/derby/upgrade/DerbyRepositoryUpgradeTest.java b/test/src/test/java/org/apache/sqoop/integration/repository/derby/upgrade/DerbyRepositoryUpgradeTest.java new file mode 100644 index 00000000..f41ba2e2 --- /dev/null +++ b/test/src/test/java/org/apache/sqoop/integration/repository/derby/upgrade/DerbyRepositoryUpgradeTest.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.integration.repository.derby.upgrade; + +import org.apache.hadoop.conf.Configuration; +import org.apache.sqoop.test.minicluster.TomcatSqoopMiniCluster; +import org.apache.sqoop.test.testcases.TomcatTestCase; +import org.apache.sqoop.test.utils.CompressionUtils; +import org.apache.sqoop.test.utils.HdfsUtils; +import org.junit.Test; + +import org.apache.log4j.Logger; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; + +/** + * Abstract test case for testing upgrade from previous version to the "most recent one". + * + * In order to properly test that we can upgrade older release to a new one, we are storing + * repository dumps created by previous releases. The test cases takes this existing repository + * dump, un-archive it into working directly and starts the server with pointing derby to + * this working directly. On the start up the server will perform complete upgrade to the + * latest version, including any schema changes and connector data changes.We run several + * tests on the repository to ensure that it's in state that we're expecting. + * + * Each tested version should have a child test case that is implementing the abstract + * methods describing content of the repository (what links/jobs it have, ...). + * + */ +public abstract class DerbyRepositoryUpgradeTest extends TomcatTestCase { + + private static final Logger LOG = Logger.getLogger(DerbyRepositoryUpgradeTest.class); + + /** + * Custom Sqoop mini cluster that points derby repository to real on-disk structures. + */ + public static class DerbySqoopMiniCluster extends TomcatSqoopMiniCluster { + private String repositoryPath; + + public DerbySqoopMiniCluster(String repositoryPath, String temporaryPath, Configuration configuration) throws Exception { + super(temporaryPath, configuration); + this.repositoryPath = repositoryPath; + } + + protected Map getRepositoryConfiguration() { + Map properties = new HashMap(); + + properties.put("org.apache.sqoop.repository.provider", "org.apache.sqoop.repository.JdbcRepositoryProvider"); + properties.put("org.apache.sqoop.repository.jdbc.handler", "org.apache.sqoop.repository.derby.DerbyRepositoryHandler"); + properties.put("org.apache.sqoop.repository.jdbc.transaction.isolation", "READ_COMMITTED"); + properties.put("org.apache.sqoop.repository.jdbc.maximum.connections", "10"); + properties.put("org.apache.sqoop.repository.jdbc.url", "jdbc:derby:" + repositoryPath); + properties.put("org.apache.sqoop.repository.jdbc.driver", "org.apache.derby.jdbc.EmbeddedDriver"); + properties.put("org.apache.sqoop.repository.jdbc.user", "sa"); + properties.put("org.apache.sqoop.repository.jdbc.password", ""); + + return properties; + } + } + + /** + * Return resource location with the repository tarball + */ + public abstract String getPathToRepositoryTarball(); + + /** + * Number of links that were stored in the repository + */ + public abstract int getNumberOfLinks(); + + /** + * Number of jobs that were stored in the repository + */ + public abstract int getNumberOfJobs(); + + /** + * Map of job id -> number of submissions that were stored in the repository + */ + public abstract Map getNumberOfSubmissions(); + + /** + * List of link ids that should be disabled + */ + public abstract Integer[] getDisabledLinkIds(); + + /** + * List of job ids that should be disabled + */ + public abstract Integer[] getDisabledJobIds(); + + /** + * List of link ids that we should delete using the id + */ + public abstract Integer[] getDeleteLinkIds(); + + /** + * List of job ids that we should delete using the id + */ + public abstract Integer[] getDeleteJobIds(); + + public String getRepositoryPath() { + return HdfsUtils.joinPathFragments(getTemporaryPath(), "repo"); + } + + @Override + public TomcatSqoopMiniCluster createSqoopMiniCluster() throws Exception { + // Prepare older repository structures + InputStream tarballStream = getClass().getResourceAsStream(getPathToRepositoryTarball()); + assertNotNull(tarballStream); + CompressionUtils.untarStreamToDirectory(tarballStream, getRepositoryPath()); + + // And use them for new Derby repo instance + return new DerbySqoopMiniCluster(getRepositoryPath(), getSqoopMiniClusterTemporaryPath(), hadoopCluster.getConfiguration()); + } + + @Test + public void testPostUpgrade() throws Exception { + // Please note that the upgrade itself is done on startup and hence prior calling this test + // method. We're just verifying that Server has started and behaves and we are expecting. + + // We could further enhance the checks here, couple of ideas for the future: + // * Add a check that will verify that the upgrade indeed happened (it's implied at the moment) + // * Run selected jobs to ensure that they in state where they can run? + + // Verify that we have expected number of objects + assertEquals(getNumberOfLinks(), getClient().getLinks().size()); + assertEquals(getNumberOfJobs(), getClient().getJobs().size()); + for(Map.Entry entry : getNumberOfSubmissions().entrySet()) { + // Skipping due to SQOOP-1782 + // assertEquals((int)entry.getValue(), getClient().getSubmissionsForJob(entry.getKey()).size()); + } + + // Verify that disabled status is preserved + for(Integer id : getDisabledLinkIds()) { + assertFalse(getClient().getLink(id).getEnabled()); + } + for(Integer id : getDisabledJobIds()) { + assertFalse(getClient().getJob(id).getEnabled()); + } + + // Remove all objects + for(Integer id : getDeleteJobIds()) { + getClient().deleteJob(id); + } + for(Integer id : getDeleteLinkIds()) { + getClient().deleteLink(id); + } + + // We should end up with empty repository + assertEquals(0, getClient().getLinks().size()); + assertEquals(0, getClient().getJobs().size()); + } +} diff --git a/test/src/test/resources/repository/derby/derby-repository-1.99.4.tar.gz b/test/src/test/resources/repository/derby/derby-repository-1.99.4.tar.gz new file mode 100644 index 00000000..7a6ceed1 Binary files /dev/null and b/test/src/test/resources/repository/derby/derby-repository-1.99.4.tar.gz differ