
SQOOP-1783: Sqoop2: Create derby integration upgrade tests

(Jarek Jarcec Cecho via Abraham Elmahrek)
Abraham Elmahrek 2014-12-15 10:17:31 -08:00
parent 80662d6d45
commit 4b8fa56931
8 changed files with 393 additions and 1 deletion

View File

@@ -93,6 +93,7 @@ limitations under the License.
<commons-dbcp.version>1.4</commons-dbcp.version>
<commons-lang.version>2.5</commons-lang.version>
<commons-io.version>2.4</commons-io.version>
<commons-compress.version>1.9</commons-compress.version>
<derby.version>10.8.2.2</derby.version>
<hadoop.1.version>1.0.3</hadoop.1.version>
<hadoop.2.version>2.5.1</hadoop.2.version>
@@ -389,6 +390,11 @@ limitations under the License.
<artifactId>json-simple</artifactId>
<version>${json-simple.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>${commons-compress.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>

View File

@@ -117,6 +117,11 @@ limitations under the License.
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
</dependencies>
<!-- Add classifier name to the JAR name -->

View File

@@ -130,6 +130,8 @@ protected void prepareTemporaryPath() throws IOException {
mapToProperties(sqoopProperties, getSubmissionEngineConfiguration());
mapToProperties(sqoopProperties, getExecutionEngineConfiguration());
mapToProperties(sqoopProperties, getSecurityConfiguration());
mapToProperties(sqoopProperties, getConnectorManagerConfiguration());
mapToProperties(sqoopProperties, getDriverManagerConfiguration());
FileUtils.writeLines(f, sqoopProperties);
@@ -208,4 +210,19 @@ protected Map<String, String> getSecurityConfiguration() {
return properties;
}
protected Map<String, String> getConnectorManagerConfiguration() {
Map<String, String> properties = new HashMap<String, String>();
properties.put("org.apache.sqoop.connector.autoupgrade", "true");
return properties;
}
protected Map<String, String> getDriverManagerConfiguration() {
Map<String, String> properties = new HashMap<String, String>();
properties.put("org.apache.sqoop.driver.autoupgrade", "true");
return properties;
}
}
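For context, prepareTemporaryPath() above writes each of these maps into the generated sqoop.properties via mapToProperties. Assuming that helper emits standard key=value lines (an assumption, its body is not shown in this diff), the configuration handed to the mini cluster would now also contain:

org.apache.sqoop.connector.autoupgrade=true
org.apache.sqoop.driver.autoupgrade=true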

View File

@@ -17,8 +17,10 @@
*/
package org.apache.sqoop.test.testcases;
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Logger;
@@ -104,10 +106,12 @@ public static void startHadoop() throws Exception {
public void startServer() throws Exception {
// Get and set temporary path in hadoop cluster.
tmpPath = HdfsUtils.joinPathFragments(TMP_PATH_BASE, getClass().getName(), name.getMethodName());
FileUtils.deleteDirectory(new File(tmpPath));
LOG.debug("Temporary Directory: " + tmpPath);
// Start server
cluster = new TomcatSqoopMiniCluster(tmpPath, hadoopCluster.getConfiguration());
cluster = createSqoopMiniCluster();
cluster.start();
// Initialize Sqoop Client API
@@ -124,6 +128,17 @@ public static void stopHadoop() throws Exception {
hadoopCluster.stop();
}
/**
* Create Sqoop MiniCluster instance that should be used for this test.
*
* This method will be executed once prior to each test execution.
*
* @return New instance of test mini cluster
*/
public TomcatSqoopMiniCluster createSqoopMiniCluster() throws Exception {
return new TomcatSqoopMiniCluster(getSqoopMiniClusterTemporaryPath(), hadoopCluster.getConfiguration());
}
/**
* Return SqoopClient configured to talk to testing server.
*
@@ -137,6 +152,10 @@ public String getTemporaryPath() {
return tmpPath;
}
public String getSqoopMiniClusterTemporaryPath() {
return HdfsUtils.joinPathFragments(tmpPath, "sqoop-mini-cluster");
}
/**
* Return testing server URL
*

View File

@@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.test.utils;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.log4j.Logger;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
/**
* Utility methods to work with compressed files and streams.
*/
public class CompressionUtils {
private static final Logger LOG = Logger.getLogger(CompressionUtils.class);
/**
* Untar the given stream (tar.gz archive) into the given directory.
*
* Directory structure will be preserved.
*
* @param inputStream InputStream of tar.gz archive
* @param targetDirectory Target directory for tarball content
* @throws IOException
*/
public static void untarStreamToDirectory(InputStream inputStream, String targetDirectory) throws IOException {
assert inputStream != null;
assert targetDirectory != null;
LOG.info("Untaring archive to directory: " + targetDirectory);
TarArchiveInputStream in = new TarArchiveInputStream(new GzipCompressorInputStream(inputStream));
TarArchiveEntry entry = null;
int BUFFER_SIZE = 2048;
while ((entry = (TarArchiveEntry) in.getNextEntry()) != null) {
LOG.info("Untaring file: " + entry.getName());
if (entry.isDirectory()) {
(new File( HdfsUtils.joinPathFragments(targetDirectory, entry.getName()))).mkdirs();
} else {
int count;
byte data[] = new byte[BUFFER_SIZE];
FileOutputStream fos = new FileOutputStream(HdfsUtils.joinPathFragments(targetDirectory, entry.getName()));
BufferedOutputStream dest = new BufferedOutputStream(fos, BUFFER_SIZE);
while ((count = in.read(data, 0, BUFFER_SIZE)) != -1) {
dest.write(data, 0, count);
}
dest.close();
}
}
in.close();
}
private CompressionUtils() {
// No instantiation
}
}
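A minimal usage sketch for the utility above, mirroring how the upgrade test later in this change feeds a classpath resource into it (the target directory here is purely illustrative):

// Load a repository dump from the test classpath and unpack it, preserving directory layout.
InputStream tarball = CompressionUtils.class.getResourceAsStream("/repository/derby/derby-repository-1.99.4.tar.gz");
CompressionUtils.untarStreamToDirectory(tarball, "/tmp/sqoop-upgrade-test/repo");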

View File

@@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.integration.repository.derby.upgrade;
import java.util.HashMap;
import java.util.Map;
/**
* This version contains the following structures:
* Generic JDBC Connector link with name "Link1" and id 1
* Generic JDBC Connector link with blank name and id 2
* HDFS Connector link with name "Link3" and id 3
* HDFS Connector link with blank name and id 4
* HDFS Connector link with blank name and id 5
* HDFS Connector link with blank name and id 6
* Job (-f 1 -t 3) with name "Import" and id 1
* Job (-f 1 -t 3) with name "Query" and id 2
* Job (-f 3 -t 1) with name "Export" and id 3
* Job (-f 3 -t 1) with blank name and id 4
* Job (-f 3 -t 1) with blank name and id 5
* Job with id 1 has been executed 3 times
* Job with id 2 has been executed 3 times
* Job with id 3 has been executed 1 time
* Link with id 4 has been disabled
* Link with id 5 has been disabled
* Job with id 4 has been disabled
* Job with id 5 has been disabled
*/
public class Derby1_99_4UpgradeTest extends DerbyRepositoryUpgradeTest {
@Override
public String getPathToRepositoryTarball() {
return "/repository/derby/derby-repository-1.99.4.tar.gz";
}
@Override
public int getNumberOfLinks() {
return 6;
}
@Override
public int getNumberOfJobs() {
return 5;
}
@Override
public Map<Integer, Integer> getNumberOfSubmissions() {
HashMap<Integer, Integer> ret = new HashMap<Integer, Integer>();
ret.put(1, 3);
ret.put(2, 3);
ret.put(3, 1);
ret.put(4, 0);
ret.put(5, 0);
return ret;
}
@Override
public Integer[] getDisabledLinkIds() {
return new Integer[] {4, 5};
}
@Override
public Integer[] getDisabledJobIds() {
return new Integer[] {4, 5};
}
@Override
public Integer[] getDeleteLinkIds() {
return new Integer[] {1, 2, 3, 4, 5, 6};
}
@Override
public Integer[] getDeleteJobIds() {
return new Integer[] {1, 2, 3, 4, 5};
}
}

View File

@@ -0,0 +1,173 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.integration.repository.derby.upgrade;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.test.minicluster.TomcatSqoopMiniCluster;
import org.apache.sqoop.test.testcases.TomcatTestCase;
import org.apache.sqoop.test.utils.CompressionUtils;
import org.apache.sqoop.test.utils.HdfsUtils;
import org.junit.Test;
import org.apache.log4j.Logger;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
/**
* Abstract test case for testing upgrade from previous version to the "most recent one".
*
* In order to properly test that we can upgrade an older release to the newest one, we store
* repository dumps created by previous releases. The test case takes an existing repository
* dump, un-archives it into a working directory and starts the server with Derby pointed at
* that working directory. On startup the server performs a complete upgrade to the latest
* version, including any schema changes and connector data changes. We then run several
* tests against the repository to ensure that it is in the state we expect.
*
* Each tested version should have a child test case that implements the abstract
* methods describing the content of the repository (what links/jobs it has, ...).
*
*/
public abstract class DerbyRepositoryUpgradeTest extends TomcatTestCase {
private static final Logger LOG = Logger.getLogger(DerbyRepositoryUpgradeTest.class);
/**
* Custom Sqoop mini cluster that points the Derby repository to real on-disk structures.
*/
public static class DerbySqoopMiniCluster extends TomcatSqoopMiniCluster {
private String repositoryPath;
public DerbySqoopMiniCluster(String repositoryPath, String temporaryPath, Configuration configuration) throws Exception {
super(temporaryPath, configuration);
this.repositoryPath = repositoryPath;
}
protected Map<String, String> getRepositoryConfiguration() {
Map<String, String> properties = new HashMap<String, String>();
properties.put("org.apache.sqoop.repository.provider", "org.apache.sqoop.repository.JdbcRepositoryProvider");
properties.put("org.apache.sqoop.repository.jdbc.handler", "org.apache.sqoop.repository.derby.DerbyRepositoryHandler");
properties.put("org.apache.sqoop.repository.jdbc.transaction.isolation", "READ_COMMITTED");
properties.put("org.apache.sqoop.repository.jdbc.maximum.connections", "10");
properties.put("org.apache.sqoop.repository.jdbc.url", "jdbc:derby:" + repositoryPath);
properties.put("org.apache.sqoop.repository.jdbc.driver", "org.apache.derby.jdbc.EmbeddedDriver");
properties.put("org.apache.sqoop.repository.jdbc.user", "sa");
properties.put("org.apache.sqoop.repository.jdbc.password", "");
return properties;
}
}
/**
* Return the resource location of the repository tarball
*/
public abstract String getPathToRepositoryTarball();
/**
* Number of links that were stored in the repository
*/
public abstract int getNumberOfLinks();
/**
* Number of jobs that were stored in the repository
*/
public abstract int getNumberOfJobs();
/**
* Map of job id -> number of submissions that were stored in the repository
*/
public abstract Map<Integer, Integer> getNumberOfSubmissions();
/**
* List of link ids that should be disabled
*/
public abstract Integer[] getDisabledLinkIds();
/**
* List of job ids that should be disabled
*/
public abstract Integer[] getDisabledJobIds();
/**
* List of link ids that should be deleted by id
*/
public abstract Integer[] getDeleteLinkIds();
/**
* List of job ids that should be deleted by id
*/
public abstract Integer[] getDeleteJobIds();
public String getRepositoryPath() {
return HdfsUtils.joinPathFragments(getTemporaryPath(), "repo");
}
@Override
public TomcatSqoopMiniCluster createSqoopMiniCluster() throws Exception {
// Prepare older repository structures
InputStream tarballStream = getClass().getResourceAsStream(getPathToRepositoryTarball());
assertNotNull(tarballStream);
CompressionUtils.untarStreamToDirectory(tarballStream, getRepositoryPath());
// And use them for new Derby repo instance
return new DerbySqoopMiniCluster(getRepositoryPath(), getSqoopMiniClusterTemporaryPath(), hadoopCluster.getConfiguration());
}
@Test
public void testPostUpgrade() throws Exception {
// Please note that the upgrade itself is done on startup and hence prior to calling this test
// method. We're just verifying that the server has started and behaves as we expect.
// We could further enhance the checks here, couple of ideas for the future:
// * Add a check that will verify that the upgrade indeed happened (it's implied at the moment)
// * Run selected jobs to ensure that they are in a state where they can run?
// Verify that we have expected number of objects
assertEquals(getNumberOfLinks(), getClient().getLinks().size());
assertEquals(getNumberOfJobs(), getClient().getJobs().size());
for(Map.Entry<Integer, Integer> entry : getNumberOfSubmissions().entrySet()) {
// Skipping due to SQOOP-1782
// assertEquals((int)entry.getValue(), getClient().getSubmissionsForJob(entry.getKey()).size());
}
// Verify that disabled status is preserved
for(Integer id : getDisabledLinkIds()) {
assertFalse(getClient().getLink(id).getEnabled());
}
for(Integer id : getDisabledJobIds()) {
assertFalse(getClient().getJob(id).getEnabled());
}
// Remove all objects
for(Integer id : getDeleteJobIds()) {
getClient().deleteJob(id);
}
for(Integer id : getDeleteLinkIds()) {
getClient().deleteLink(id);
}
// We should end up with empty repository
assertEquals(0, getClient().getLinks().size());
assertEquals(0, getClient().getJobs().size());
}
}