
SQOOP-927: Sqoop2: Integration: Mapreduce specific tests should be running on MiniCluster

(Abraham Elmahrek via Jarek Jarcec Cecho)
Jarek Jarcec Cecho 2013-10-22 16:41:34 -07:00
parent 9addddfe3f
commit 39eb1e56d3
9 changed files with 432 additions and 42 deletions

pom.xml

@@ -164,6 +164,11 @@ limitations under the License.
<version>${hadoop.1.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<version>${hadoop.1.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</profile>
@@ -485,7 +490,7 @@ limitations under the License.
<version>2.12</version>
<configuration>
<forkMode>always</forkMode>
<forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds>
<forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<argLine>-Xms256m -Xmx1g</argLine>
</configuration>

org/apache/sqoop/test/asserts/HdfsAsserts.java

@@ -18,12 +18,14 @@
package org.apache.sqoop.test.asserts;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.sqoop.test.utils.HdfsUtils;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
@@ -35,8 +37,6 @@
/**
* Assert methods suitable for checking HDFS files and directories.
*
* TODO: This module will require clean up to work on MiniCluster/Real cluster.
*/
public class HdfsAsserts {
@@ -49,15 +49,13 @@ public class HdfsAsserts {
* @param lines Expected lines
* @throws IOException
*/
public static void assertMapreduceOutput(String directory, String... lines) throws IOException {
public static void assertMapreduceOutput(FileSystem fs, String directory, String... lines) throws IOException {
Set<String> setLines = new HashSet<String>(Arrays.asList(lines));
List<String> notFound = new LinkedList<String>();
String []files = HdfsUtils.getOutputMapreduceFiles(directory);
for(String file : files) {
String filePath = directory + "/" + file;
BufferedReader br = new BufferedReader(new FileReader((filePath)));
Path[] files = HdfsUtils.getOutputMapreduceFiles(fs, directory);
for(Path file : files) {
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(file)));
String line;
while ((line = br.readLine()) != null) {
@@ -83,8 +81,8 @@ public static void assertMapreduceOutput(String directory, String... lines) thro
* @param directory Mapreduce output directory
* @param expectedFiles Expected number of files
*/
public static void assertMapreduceOutputFiles(String directory, int expectedFiles) {
String []files = HdfsUtils.getOutputMapreduceFiles(directory);
public static void assertMapreduceOutputFiles(FileSystem fs, String directory, int expectedFiles) throws IOException {
Path[] files = HdfsUtils.getOutputMapreduceFiles(fs, directory);
assertEquals(expectedFiles, files.length);
}
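
With the assertions now taking the FileSystem explicitly, the same checks work against a minicluster's HDFS or the local filesystem. A minimal usage sketch, assuming the cluster configuration and output directory come from the test harness (the class name and literal values below are illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.sqoop.test.asserts.HdfsAsserts;

public class HdfsAssertsUsageSketch {
  // "conf" is assumed to be the configuration of an already running cluster.
  public static void verifyOutput(Configuration conf, String outputDir) throws Exception {
    FileSystem fs = FileSystem.get(conf);
    // Expect exactly one part-* file in the output directory...
    HdfsAsserts.assertMapreduceOutputFiles(fs, outputDir, 1);
    // ...and expect it to contain these lines (compared as a set, not by position).
    HdfsAsserts.assertMapreduceOutput(fs, outputDir, "1,'first line'", "2,'second line'");
  }
}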

org/apache/sqoop/test/hadoop/HadoopLocalRunner.java

@@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.test.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.test.utils.HdfsUtils;
/**
* HadoopRunner implementation that uses the LocalJobRunner for executing
* mapreduce jobs and the local filesystem instead of HDFS. The Configuration
* object is passed through unchanged.
*/
public class HadoopLocalRunner extends HadoopRunner {
@Override
public Configuration prepareConfiguration(Configuration conf) {
return conf;
}
@Override
public void start() throws Exception {
// Do nothing!
}
@Override
public void stop() throws Exception {
// Do nothing!
}
@Override
public String getTestDirectory() {
return HdfsUtils.joinPathFragments(getTemporaryPath(), "/mapreduce-job-io");
}
}
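
Because prepareConfiguration() hands the configuration back untouched and start()/stop() are no-ops, the local runner needs nothing beyond the common HadoopRunner setters. A rough standalone sketch (the temporary path and class name are made up for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sqoop.test.hadoop.HadoopLocalRunner;

public class LocalRunnerSketch {
  public static void main(String[] args) throws Exception {
    HadoopLocalRunner runner = new HadoopLocalRunner();
    runner.setTemporaryPath("/tmp/sqoop-local-runner");            // illustrative path
    runner.setConfiguration(runner.prepareConfiguration(new JobConf()));
    runner.start();                                                // no-op for the local runner
    Configuration conf = runner.getConfiguration();                // plain local configuration
    System.out.println("Test directory: " + runner.getTestDirectory());
    runner.stop();                                                 // also a no-op
  }
}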

org/apache/sqoop/test/hadoop/HadoopMiniClusterRunner.java

@@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.test.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.log4j.Logger;
/**
* HadoopRunner implementation that starts an in-process minicluster (HDFS and
* MapReduce). It mutates the supplied configuration object; clients that need
* to connect to the miniclusters should use that configuration.
*/
public class HadoopMiniClusterRunner extends HadoopRunner {
private static final Logger LOG = Logger.getLogger(HadoopMiniClusterRunner.class);
/**
* Hadoop HDFS cluster
*/
protected MiniDFSCluster dfsCluster;
/**
* Hadoop MR cluster
*/
protected MiniMRCluster mrCluster;
@Override
public Configuration prepareConfiguration(Configuration config) {
config.set("dfs.block.access.token.enable", "false");
config.set("dfs.permissions", "true");
config.set("hadoop.security.authentication", "simple");
config.set("mapred.tasktracker.map.tasks.maximum", "1");
config.set("mapred.tasktracker.reduce.tasks.maximum", "1");
config.set("mapred.submit.replication", "1");
config.set("yarn.resourcemanager.scheduler.class", "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
config.set("yarn.application.classpath", System.getProperty("java.class.path"));
return config;
}
@SuppressWarnings("deprecation")
@Override
public void start() throws Exception {
System.setProperty("test.build.data", getDataDir());
LOG.info("test.build.data set to: " + getDataDir());
System.setProperty("hadoop.log.dir", getLogDir());
LOG.info("log dir set to: " + getLogDir());
// Start DFS server
LOG.info("Starting DFS cluster...");
dfsCluster = new MiniDFSCluster(config, 1, true, null);
if (dfsCluster.isClusterUp()) {
LOG.info("Started DFS cluster on port: " + dfsCluster.getNameNodePort());
} else {
LOG.error("Could not start DFS cluster");
}
// Start MR server
LOG.info("Starting MR cluster");
mrCluster = new MiniMRCluster(0, 0, 1, dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null, new JobConf(config));
LOG.info("Started MR cluster");
config = prepareConfiguration(mrCluster.createJobConf());
}
@Override
public void stop() throws Exception {
LOG.info("Stopping MR cluster");
mrCluster.shutdown();
LOG.info("Stopped MR cluster");
LOG.info("Stopping DFS cluster");
dfsCluster.shutdown();
LOG.info("Stopped DFS cluster");
}
}
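
The minicluster runner boots an in-process DFS and MR cluster and rewrites the configuration to point at them, so anything obtained through getConfiguration() after start() talks to the miniclusters. A hedged lifecycle sketch (class name and paths are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sqoop.test.hadoop.HadoopMiniClusterRunner;

public class MiniClusterRunnerSketch {
  public static void main(String[] args) throws Exception {
    HadoopMiniClusterRunner runner = new HadoopMiniClusterRunner();
    runner.setTemporaryPath("/tmp/sqoop-minicluster");   // data ends up in .../data, logs in .../log
    runner.setConfiguration(runner.prepareConfiguration(new JobConf()));
    runner.start();                                      // starts MiniDFSCluster and MiniMRCluster

    // After start() the configuration points at the in-process clusters.
    Configuration conf = runner.getConfiguration();
    FileSystem fs = FileSystem.get(conf);
    System.out.println("Default filesystem: " + fs.getUri());

    runner.stop();                                       // shuts both clusters down
  }
}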

org/apache/sqoop/test/hadoop/HadoopRunner.java

@@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.test.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.test.utils.HdfsUtils;
/**
* Hadoop cluster runner for testing purposes.
*
* The runner provides methods for bootstrapping and using a Hadoop cluster.
* This abstract class is agnostic about the mode in which Hadoop is running.
* Each mode has its own concrete implementation (for example LocalJobRunner, MiniCluster or a real existing cluster).
*/
public abstract class HadoopRunner {
/**
* Temporary path that can be used as a root for other directories storing various data like logs or stored HDFS files.
*/
private String temporaryPath;
/**
* Configuration object for Hadoop.
*/
protected Configuration config = null;
/**
* Prepare configuration object.
* This method should be called once before the start method is called.
*
* @param config is the configuration object to prepare.
*/
abstract public Configuration prepareConfiguration(Configuration config);
/**
* Start hadoop cluster.
*
* @throws Exception
*/
abstract public void start() throws Exception;
/**
* Stop hadoop cluster.
*
* @throws Exception
*/
abstract public void stop() throws Exception;
/**
* Return working directory on HDFS instance that this HadoopRunner is using.
*
* This directory might be on local filesystem in case of local mode.
*/
public String getTestDirectory() {
return "/mapreduce-job-io";
}
/**
* Get temporary path.
*
* @return
*/
public String getTemporaryPath() {
return temporaryPath;
}
/**
* Set temporary path.
*
* @param temporaryPath
*/
public void setTemporaryPath(String temporaryPath) {
this.temporaryPath = temporaryPath;
}
/**
* Return directory on the local filesystem where data generated by
* the Hadoop cluster (such as HDFS storage) should be kept.
*
* @return
*/
public String getDataDir() {
return HdfsUtils.joinPathFragments(temporaryPath, "data");
}
/**
* Return directory on the local filesystem where logs generated by
* the Hadoop cluster should be stored.
*
* @return
*/
public String getLogDir() {
return HdfsUtils.joinPathFragments(temporaryPath, "log");
}
/**
* Get hadoop configuration.
*
* @return
*/
public Configuration getConfiguration() {
return config;
}
/**
* Set the configuration object that should be used with Miniclusters.
*
* @param config
*/
public void setConfiguration(Configuration config) {
this.config = config;
}
}
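
The javadoc also mentions running against a real, pre-existing cluster. No such runner is part of this commit, but a hypothetical subclass would only need to point the configuration at the running daemons and leave start()/stop() empty; the host names and property keys below are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.test.hadoop.HadoopRunner;

// Hypothetical runner for an externally managed cluster (not part of this commit).
public class HadoopRealClusterRunnerSketch extends HadoopRunner {
  @Override
  public Configuration prepareConfiguration(Configuration conf) {
    conf.set("fs.default.name", "hdfs://namenode.example.com:8020");
    conf.set("mapred.job.tracker", "jobtracker.example.com:8021");
    return conf;
  }

  @Override
  public void start() throws Exception {
    // Nothing to do - the cluster is managed outside of the test run.
  }

  @Override
  public void stop() throws Exception {
    // Nothing to do here either.
  }
}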

org/apache/sqoop/test/hadoop/HadoopRunnerFactory.java

@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.test.hadoop;
import java.util.Properties;
/**
* Factory that creates the HadoopRunner implementation to use for a test run.
*/
public class HadoopRunnerFactory {
public static final String CLUSTER_CLASS_PROPERTY = "sqoop.hadoop.runner.class";
public static HadoopRunner getHadoopCluster(Properties properties, Class<? extends HadoopRunner> defaultClusterClass) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
String className = properties.getProperty(CLUSTER_CLASS_PROPERTY);
if(className == null) {
return defaultClusterClass.newInstance();
}
Class<?> klass = Class.forName(className);
return (HadoopRunner)klass.newInstance();
}
}
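
The factory keys off the sqoop.hadoop.runner.class system property and falls back to the supplied default class. A short sketch of how a harness might resolve the runner, passing System.getProperties() the same way the test cases in this commit do (the printed message is illustrative):

import java.util.Properties;

import org.apache.sqoop.test.hadoop.HadoopMiniClusterRunner;
import org.apache.sqoop.test.hadoop.HadoopRunner;
import org.apache.sqoop.test.hadoop.HadoopRunnerFactory;

public class RunnerSelectionSketch {
  public static void main(String[] args) throws Exception {
    // Typically overridden with -Dsqoop.hadoop.runner.class=... on the test JVM.
    Properties props = System.getProperties();
    HadoopRunner runner =
        HadoopRunnerFactory.getHadoopCluster(props, HadoopMiniClusterRunner.class);
    System.out.println("Selected runner: " + runner.getClass().getName());
  }
}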

org/apache/sqoop/test/testcases/ConnectorTestCase.java

@@ -17,6 +17,8 @@
*/
package org.apache.sqoop.test.testcases;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Logger;
import org.apache.sqoop.client.SubmissionCallback;
import org.apache.sqoop.framework.configuration.OutputFormat;
@@ -31,6 +33,8 @@
import org.apache.sqoop.test.data.UbuntuReleases;
import org.apache.sqoop.test.db.DatabaseProvider;
import org.apache.sqoop.test.db.DatabaseProviderFactory;
import org.apache.sqoop.test.hadoop.HadoopMiniClusterRunner;
import org.apache.sqoop.test.hadoop.HadoopRunnerFactory;
import org.apache.sqoop.validation.Status;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -70,6 +74,19 @@ public void finished(MSubmission submission) {
}
};
@BeforeClass
public static void startHadoop() throws Exception {
// Start Hadoop Clusters
hadoopCluster = HadoopRunnerFactory.getHadoopCluster(System.getProperties(), HadoopMiniClusterRunner.class);
hadoopCluster.setTemporaryPath(TMP_PATH_BASE);
hadoopCluster.setConfiguration( hadoopCluster.prepareConfiguration(new JobConf()) );
hadoopCluster.start();
// Initialize Hdfs Client
hdfsClient = FileSystem.get(hadoopCluster.getConfiguration());
LOG.debug("HDFS Client: " + hdfsClient);
}
@BeforeClass
public static void startProvider() throws Exception {
provider = DatabaseProviderFactory.getProvider(System.getProperties());
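
After startHadoop() runs, every test method in the class shares the static hadoopCluster and hdfsClient. As a hedged illustration of what that client provides, the snippet below creates and checks a per-test directory through the FileSystem API (the helper class and directory name are made up):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsClientSketch {
  // "hdfsClient" stands in for the static FileSystem initialized in startHadoop().
  public static void touchWorkingDirectory(FileSystem hdfsClient) throws Exception {
    Path dir = new Path("/mapreduce-job-io/com.example.SomeTest/testImport");
    hdfsClient.mkdirs(dir);                                  // per-test working directory
    System.out.println("Directory exists: " + hdfsClient.exists(dir));
  }
}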

org/apache/sqoop/test/testcases/TomcatTestCase.java

@@ -17,24 +17,30 @@
*/
package org.apache.sqoop.test.testcases;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Logger;
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.test.asserts.HdfsAsserts;
import org.apache.sqoop.test.hadoop.HadoopRunner;
import org.apache.sqoop.test.hadoop.HadoopRunnerFactory;
import org.apache.sqoop.test.hadoop.HadoopLocalRunner;
import org.apache.sqoop.test.minicluster.TomcatSqoopMiniCluster;
import org.apache.sqoop.test.utils.HdfsUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestName;
import java.io.IOException;
/**
* Basic test case that will bootstrap Sqoop server running in external Tomcat
* process.
*/
abstract public class TomcatTestCase {
private static final Logger LOG = Logger.getLogger(TomcatTestCase.class);
@Rule public TestName name = new TestName();
@@ -47,7 +53,7 @@ abstract public class TomcatTestCase {
* pick up configured java.io.tmpdir value. The last resort is the /tmp/ directory
* in case no property is set.
*/
private static final String TMP_PATH_BASE =
protected static final String TMP_PATH_BASE =
System.getProperty("sqoop.integration.tmpdir", System.getProperty("java.io.tmpdir", "/tmp")) + "/sqoop-cargo-tests/";
/**
@@ -61,6 +67,16 @@ abstract public class TomcatTestCase {
*/
private String tmpPath;
/**
* Hadoop cluster
*/
protected static HadoopRunner hadoopCluster;
/**
* Hadoop client
*/
protected static FileSystem hdfsClient;
/**
* Tomcat based Sqoop mini cluster
*/
@@ -71,13 +87,27 @@ abstract public class TomcatTestCase {
*/
private SqoopClient client;
@BeforeClass
public static void startHadoop() throws Exception {
// Start Hadoop Clusters
hadoopCluster = HadoopRunnerFactory.getHadoopCluster(System.getProperties(), HadoopLocalRunner.class);
hadoopCluster.setTemporaryPath(TMP_PATH_BASE);
hadoopCluster.setConfiguration( hadoopCluster.prepareConfiguration(new JobConf()) );
hadoopCluster.start();
// Initialize Hdfs Client
hdfsClient = FileSystem.get(hadoopCluster.getConfiguration());
LOG.debug("HDFS Client: " + hdfsClient);
}
@Before
public void startServer() throws Exception {
// Set up the temporary path
tmpPath = TMP_PATH_BASE + getClass().getName() + "/" + name.getMethodName() + "/";
// Get and set temporary path in hadoop cluster.
tmpPath = HdfsUtils.joinPathFragments(TMP_PATH_BASE, getClass().getName(), name.getMethodName());
LOG.debug("Temporary Directory: " + tmpPath);
// Set up and start server
cluster = new TomcatSqoopMiniCluster(getTemporaryPath());
// Start server
cluster = new TomcatSqoopMiniCluster(tmpPath, hadoopCluster.getConfiguration());
cluster.start();
// Initialize Sqoop Client API
@@ -89,6 +119,11 @@ public void stopServer() throws Exception {
cluster.stop();
}
@AfterClass
public static void stopHadoop() throws Exception {
hadoopCluster.stop();
}
/**
* Return SqoopClient configured to talk to testing server.
*
@@ -112,12 +147,12 @@ public String getServerUrl() {
}
/**
* Get input/output directory for mapreduce job.
* Return mapreduce base directory.
*
* @return
*/
public String getMapreduceDirectory() {
return getTemporaryPath() + "/mapreduce-job-io";
return HdfsUtils.joinPathFragments(hadoopCluster.getTestDirectory(), getClass().getName(), name.getMethodName());
}
/**
@@ -130,7 +165,7 @@ public String getMapreduceDirectory() {
* @throws IOException
*/
protected void assertMapreduceOutput(String... lines) throws IOException {
HdfsAsserts.assertMapreduceOutput(getMapreduceDirectory(), lines);
HdfsAsserts.assertMapreduceOutput(hdfsClient, getMapreduceDirectory(), lines);
}
/**
@@ -138,8 +173,8 @@ protected void assertMapreduceOutput(String... lines) throws IOException {
*
* @param expectedFiles Expected number of files
*/
protected void assertMapreduceOutputFiles(int expectedFiles) {
HdfsAsserts.assertMapreduceOutputFiles(getMapreduceDirectory(), expectedFiles);
protected void assertMapreduceOutputFiles(int expectedFiles) throws IOException {
HdfsAsserts.assertMapreduceOutputFiles(hdfsClient, getMapreduceDirectory(), expectedFiles);
}
/**
@@ -150,6 +185,6 @@ protected void assertMapreduceOutputFiles(int expectedFiles) {
* @throws IOException
*/
protected void createInputMapreduceFile(String filename, String...lines) throws IOException {
HdfsUtils.createFile(getMapreduceDirectory(), filename, lines);
HdfsUtils.createFile(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), filename), lines);
}
}
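
Putting the helpers together, a subclass test can stage input and verify job output without touching the FileSystem API directly. The sketch below is illustrative only: the job submission itself is elided since it depends on the connector under test, and real tests would typically use either the input helper or the output assertions depending on the transfer direction:

import org.apache.sqoop.test.testcases.TomcatTestCase;
import org.junit.Test;

// Hypothetical test subclass; names and data are placeholders.
public class ExampleRoundTripTest extends TomcatTestCase {
  @Test
  public void testRoundTrip() throws Exception {
    // Stage an input file under the per-test mapreduce directory on the cluster.
    createInputMapreduceFile("input-0001", "1,'Ubuntu 10.04'", "2,'Ubuntu 10.10'");

    // ... submit the Sqoop job against getMapreduceDirectory() here ...

    // Afterwards, check the produced part-* files.
    assertMapreduceOutputFiles(1);
    assertMapreduceOutput("1,'Ubuntu 10.04'", "2,'Ubuntu 10.10'");
  }
}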

org/apache/sqoop/test/utils/HdfsUtils.java

@@ -17,50 +17,76 @@
*/
package org.apache.sqoop.test.utils;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import java.io.File;
import java.io.FilenameFilter;
import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.io.OutputStreamWriter;
import java.util.LinkedList;
/**
* Handy utilities to work with HDFS
*
* TODO: This module will require clean up to work on MiniCluster/Real cluster.
*/
public class HdfsUtils {
@SuppressWarnings("unused")
private static final Logger LOG = Logger.getLogger(HdfsUtils.class);
private static final char PATH_SEPARATOR = '/';
/**
* Get list of mapreduce output files from given directory.
*
* @param directory Directory to be searched for files generated by MR
* @return
* @throws IOException
* @throws FileNotFoundException
*/
public static String [] getOutputMapreduceFiles(String directory) {
File dir = new File(directory);
return dir.list(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.startsWith("part-");
public static Path [] getOutputMapreduceFiles(FileSystem fs, String directory) throws FileNotFoundException, IOException {
LinkedList<Path> files = new LinkedList<Path>();
for (FileStatus fileStatus : fs.listStatus(new Path(directory))) {
if (fileStatus.getPath().getName().startsWith("part-")) {
files.add(fileStatus.getPath());
}
});
}
return files.toArray(new Path[files.size()]);
}
/**
* Create HDFS file with given content.
*
* @param fs filesystem object
* @param directory Directory where the file should be created
* @param filename File name
* @param lines Individual lines that should be written into the file
* @throws IOException
*/
public static void createFile(String directory, String filename, String ...lines) throws IOException {
File outputFile = new File(directory, filename);
FileUtils.writeLines(outputFile, Arrays.asList(lines));
public static void createFile(FileSystem fs, String path, String ...lines) throws IOException {
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(path), true)));
for (String line : lines) {
writer.write(line);
writer.newLine();
}
writer.close();
}
/**
* Join several path fragments together.
* @param paths
*/
public static String joinPathFragments(String ...paths){
StringBuilder builder = new StringBuilder();
for (String path : paths) {
builder.append(path);
if (path.charAt(path.length() - 1) != PATH_SEPARATOR) {
builder.append(PATH_SEPARATOR);
}
}
return builder.toString();
}
private HdfsUtils() {