
Append mode import and target-dir output

Signed-off-by: Aaron Kimball <aaron@cloudera.com>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149913 13f79535-47bb-0310-9956-ffa450edef68
Andrew Bayer 2011-07-22 20:03:56 +00:00
parent 7ecb3ee286
commit 568a827a1c
15 changed files with 750 additions and 33 deletions
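With this change an import can be written to an explicit directory, and later runs can append to it. A hypothetical invocation exercising both new flags (the connect string, table name, and path are placeholders) would look like: sqoop import --connect jdbc:mysql://db.example.com/corp --table EMPLOYEES --target-dir /user/foo/employees --append. Note that --target-dir and --warehouse-dir are mutually exclusive; see the validation added to ImportTool below.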


@ -86,6 +86,8 @@ public enum FileLayout {
private String debugSqlCmd;
private String driverClassName;
private String warehouseDir;
private String targetDir;
private boolean append;
private FileLayout layout;
private boolean direct; // if true and conn is mysql, use mysqldump.
private String tmpDir; // where temp data goes; usually /tmp
@ -619,6 +621,22 @@ public void setWarehouseDir(String warehouse) {
this.warehouseDir = warehouse;
}
public String getTargetDir() {
return this.targetDir;
}
public void setTargetDir(String dir) {
this.targetDir = dir;
}
public void setAppendMode(boolean doAppend) {
this.append = doAppend;
}
public boolean isAppendMode() {
return this.append;
}
/**
* @return the destination file format
*/


@ -65,7 +65,7 @@ public void importTable(ImportJobContext context)
MySQLDumpImportJob importer = null;
try {
importer = new MySQLDumpImportJob(options);
importer = new MySQLDumpImportJob(options, context);
} catch (ClassNotFoundException cnfe) {
throw new IOException("Could not load required classes", cnfe);
}


@ -372,7 +372,7 @@ public void importTable(ImportJobContext context)
// This writer will be closed by AsyncSink.
SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(
options.getConf(), options, tableName);
options.getConf(), options, context);
// Actually start the psql dump.
p = Runtime.getRuntime().exec(args.toArray(new String[0]),


@ -21,6 +21,7 @@
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
import com.cloudera.sqoop.SqoopOptions;
import org.apache.hadoop.fs.Path;
/**
* A set of parameters describing an import operation; this is passed to
@ -32,13 +33,15 @@ public class ImportJobContext {
private String jarFile;
private SqoopOptions options;
private Class<? extends InputFormat> inputFormatClass;
private Path destination;
public ImportJobContext(final String table, final String jar,
final SqoopOptions opts) {
final SqoopOptions opts, final Path destination) {
this.tableName = table;
this.jarFile = jar;
this.options = opts;
this.inputFormatClass = DataDrivenDBInputFormat.class;
this.destination = destination;
}
/** @return the name of the table to import. */
@ -67,5 +70,14 @@ public void setInputFormat(Class<? extends InputFormat> ifClass) {
public Class<? extends InputFormat> getInputFormat() {
return this.inputFormatClass;
}
/**
* @return the destination path where the output files will first be saved.
*/
public Path getDestination() {
return this.destination;
}
}
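A minimal sketch (not part of the commit) of how the new four-argument constructor and getDestination() fit together; the table name, jar path, and directory are placeholders, and use of SqoopOptions' no-argument constructor is an assumption:

import org.apache.hadoop.fs.Path;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ImportJobContext;

public class ContextSketch {
  public static void main(String[] args) {
    SqoopOptions opts = new SqoopOptions();      // assumed no-arg constructor
    opts.setTargetDir("/data/widgets");          // what --target-dir sets
    opts.setAppendMode(false);                   // what --append sets
    // The caller resolves the output path once and hands it to the context.
    Path dest = new Path(opts.getTargetDir());
    ImportJobContext ctx =
        new ImportJobContext("widgets", "widgets.jar", opts, dest);
    // ImportJobBase.configureOutputFormat() now reads this path from the
    // context instead of rebuilding it from --warehouse-dir.
    System.out.println(ctx.getDestination());
  }
}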


@ -304,7 +304,7 @@ public void importTable(ImportJobContext context)
SqoopOptions opts = context.getOptions();
DataDrivenImportJob importer =
new DataDrivenImportJob(opts, context.getInputFormat());
new DataDrivenImportJob(opts, context.getInputFormat(), context);
String splitCol = getSplitColumn(opts, tableName);
if (null == splitCol && opts.getNumMappers() > 1) {


@ -41,6 +41,7 @@
import com.cloudera.sqoop.lib.LargeObjectLoader;
import com.cloudera.sqoop.shims.HadoopShim;
import com.cloudera.sqoop.shims.ShimLoader;
import com.cloudera.sqoop.manager.ImportJobContext;
/**
* Actually runs a jdbc import job using the ORM files generated by the
@ -53,12 +54,13 @@ public class DataDrivenImportJob extends ImportJobBase {
@SuppressWarnings("unchecked")
public DataDrivenImportJob(final SqoopOptions opts) {
super(opts, null, DataDrivenDBInputFormat.class, null);
super(opts, null, DataDrivenDBInputFormat.class, null, null);
}
public DataDrivenImportJob(final SqoopOptions opts,
final Class<? extends InputFormat> inputFormatClass) {
super(opts, null, inputFormatClass, null);
final Class<? extends InputFormat> inputFormatClass,
ImportJobContext context) {
super(opts, null, inputFormatClass, null, context);
}
@Override


@ -24,7 +24,6 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.GzipCodec;
@ -40,6 +39,7 @@
import com.cloudera.sqoop.shims.HadoopShim;
import com.cloudera.sqoop.util.ImportException;
import com.cloudera.sqoop.util.PerfCounters;
import com.cloudera.sqoop.manager.ImportJobContext;
/**
* Base class for running an import MapReduce job.
@ -47,6 +47,8 @@
*/
public class ImportJobBase extends JobBase {
private ImportJobContext context;
public static final Log LOG = LogFactory.getLog(
ImportJobBase.class.getName());
@ -55,14 +57,16 @@ public ImportJobBase() {
}
public ImportJobBase(final SqoopOptions opts) {
this(opts, null, null, null);
this(opts, null, null, null, null);
}
public ImportJobBase(final SqoopOptions opts,
final Class<? extends Mapper> mapperClass,
final Class<? extends InputFormat> inputFormatClass,
final Class<? extends OutputFormat> outputFormatClass) {
final Class<? extends OutputFormat> outputFormatClass,
final ImportJobContext context) {
super(opts, mapperClass, inputFormatClass, outputFormatClass);
this.context = context;
}
/**
@ -71,17 +75,7 @@ public ImportJobBase(final SqoopOptions opts,
@Override
protected void configureOutputFormat(Job job, String tableName,
String tableClassName) throws ClassNotFoundException, IOException {
String hdfsWarehouseDir = options.getWarehouseDir();
Path outputPath;
if (null != hdfsWarehouseDir) {
Path hdfsWarehousePath = new Path(hdfsWarehouseDir);
hdfsWarehousePath.makeQualified(FileSystem.get(job.getConfiguration()));
outputPath = new Path(hdfsWarehousePath, tableName);
} else {
outputPath = new Path(tableName);
}
job.setOutputFormatClass(getOutputFormatClass());
if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
@ -95,6 +89,7 @@ protected void configureOutputFormat(Job job, String tableName,
CompressionType.BLOCK);
}
Path outputPath = context.getDestination();
FileOutputFormat.setOutputPath(job, outputPath);
}


@ -38,6 +38,7 @@
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.MySQLUtils;
import com.cloudera.sqoop.shims.ShimLoader;
import com.cloudera.sqoop.manager.ImportJobContext;
/**
* Class that runs an import job using mysqldump in the mapper.
@ -47,13 +48,13 @@ public class MySQLDumpImportJob extends ImportJobBase {
public static final Log LOG =
LogFactory.getLog(MySQLDumpImportJob.class.getName());
public MySQLDumpImportJob(final SqoopOptions opts)
public MySQLDumpImportJob(final SqoopOptions opts, ImportJobContext context)
throws ClassNotFoundException {
super(opts, MySQLDumpMapper.class,
(Class<? extends InputFormat>) ShimLoader.getShimClass(
"com.cloudera.sqoop.mapreduce.MySQLDumpInputFormat"),
(Class<? extends OutputFormat>) ShimLoader.getShimClass(
"com.cloudera.sqoop.mapreduce.RawKeyTextOutputFormat"));
"com.cloudera.sqoop.mapreduce.RawKeyTextOutputFormat"), context);
}
/**


@ -71,6 +71,9 @@ public abstract class BaseSqoopTool extends SqoopTool {
public static final String HADOOP_HOME_ARG = "hadoop-home";
public static final String HIVE_HOME_ARG = "hive-home";
public static final String WAREHOUSE_DIR_ARG = "warehouse-dir";
public static final String TARGET_DIR_ARG = "target-dir";
public static final String APPEND_ARG = "append";
public static final String FMT_SEQUENCEFILE_ARG = "as-sequencefile";
public static final String FMT_TEXTFILE_ARG = "as-textfile";
public static final String HIVE_IMPORT_ARG = "hive-import";


@ -33,7 +33,9 @@
import com.cloudera.sqoop.cli.ToolOptions;
import com.cloudera.sqoop.hive.HiveImport;
import com.cloudera.sqoop.manager.ImportJobContext;
import com.cloudera.sqoop.util.AppendUtils;
import com.cloudera.sqoop.util.ImportException;
import org.apache.hadoop.fs.Path;
/**
* Tool that performs database imports to HDFS.
@ -72,7 +74,7 @@ protected boolean init(SqoopOptions sqoopOpts) {
public List<String> getGeneratedJarFiles() {
return this.codeGenerator.getGeneratedJarFiles();
}
protected void importTable(SqoopOptions options, String tableName,
HiveImport hiveImport) throws IOException, ImportException {
String jarFile = null;
@ -82,15 +84,48 @@ protected void importTable(SqoopOptions options, String tableName,
// Do the actual import.
ImportJobContext context = new ImportJobContext(tableName, jarFile,
options);
options, getOutputPath(options, tableName));
manager.importTable(context);
if (options.isAppendMode()) {
AppendUtils app = new AppendUtils(context);
app.append();
}
// If the user wants this table to be in Hive, perform that post-load.
if (options.doHiveImport()) {
hiveImport.importTable(tableName, options.getHiveTableName(), false);
}
}
/**
* @return the output path for the imported files;
* in append mode this will point to a temporary folder.
*/
private Path getOutputPath(SqoopOptions options, String tableName) {
// Get output directory
String hdfsWarehouseDir = options.getWarehouseDir();
String hdfsTargetDir = options.getTargetDir();
Path outputPath = null;
if (options.isAppendMode()) {
// Use temporary path, later removed when appending
outputPath = AppendUtils.getTempAppendDir(tableName);
LOG.debug("Using temporary folder: " + outputPath.getName());
} else {
// Try in this order: target-dir or warehouse-dir
if (hdfsTargetDir != null) {
outputPath = new Path(hdfsTargetDir);
} else if (hdfsWarehouseDir != null) {
outputPath = new Path(hdfsWarehouseDir, tableName);
} else {
outputPath = new Path(tableName);
}
}
return outputPath;
}
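To make the precedence concrete: with --target-dir /data/widgets the files land directly in /data/widgets; with only --warehouse-dir /user/foo and a table named widgets they land in /user/foo/widgets; with neither option they land in a relative path named after the table; and with --append they first land in a temporary _sqoop/<timestamp><table> directory that AppendUtils later merges into whichever of those destinations applies.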
@Override
/** {@inheritDoc} */
public int run(SqoopOptions options) {
@ -172,6 +207,14 @@ protected RelatedOptions getImportOptions() {
.hasArg().withDescription("WHERE clause to use during import")
.withLongOpt(WHERE_ARG)
.create());
importOpts.addOption(OptionBuilder
.withDescription("Imports data in append mode")
.withLongOpt(APPEND_ARG)
.create());
importOpts.addOption(OptionBuilder.withArgName("dir")
.hasArg().withDescription("HDFS plain table destination")
.withLongOpt(TARGET_DIR_ARG)
.create());
}
importOpts.addOption(OptionBuilder.withArgName("dir")
@ -278,12 +321,21 @@ public void applyOptions(CommandLine in, SqoopOptions out)
if (in.hasOption(WHERE_ARG)) {
out.setWhereClause(in.getOptionValue(WHERE_ARG));
}
if (in.hasOption(TARGET_DIR_ARG)) {
out.setTargetDir(in.getOptionValue(TARGET_DIR_ARG));
}
if (in.hasOption(APPEND_ARG)) {
out.setAppendMode(true);
}
}
if (in.hasOption(WAREHOUSE_DIR_ARG)) {
out.setWarehouseDir(in.getOptionValue(WAREHOUSE_DIR_ARG));
}
if (in.hasOption(FMT_SEQUENCEFILE_ARG)) {
out.setFileLayout(SqoopOptions.FileLayout.SequenceFile);
}
@ -338,6 +390,11 @@ protected void validateImportOptions(SqoopOptions options)
&& options.getClassName() == null) {
throw new InvalidOptionsException("Jar specified with --jar-file, but no "
+ "class specified with --class-name." + HELP_STR);
} else if (options.getTargetDir() != null
&& options.getWarehouseDir() != null) {
throw new InvalidOptionsException(
"--target-dir with --warehouse-dir are incompatible options"
+ HELP_STR);
}
}


@ -0,0 +1,216 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.util;
import java.io.IOException;
import java.text.NumberFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.cloudera.sqoop.manager.ImportJobContext;
import com.cloudera.sqoop.SqoopOptions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Utilities used when appending imported files to an existing dir.
*/
public class AppendUtils {
public static final Log LOG = LogFactory.getLog(AppendUtils.class.getName());
private static final SimpleDateFormat DATE_FORM = new SimpleDateFormat(
"ddHHmmssSSS");
private static final String TEMP_IMPORT_ROOT = "_sqoop";
private static final int PARTITION_DIGITS = 5;
private static final String FILEPART_SEPARATOR = "-";
private static final String FILEEXT_SEPARATOR = ".";
private ImportJobContext context = null;
public AppendUtils(ImportJobContext context) {
this.context = context;
}
/**
* Moves the imported files from the temporary directory to the specified
* target-dir, renumbering the partition suffix when files already exist
* in the destination.
*/
public void append() throws IOException {
SqoopOptions options = context.getOptions();
FileSystem fs = FileSystem.get(options.getConf());
Path tempDir = context.getDestination();
// Try in this order: target-dir or warehouse-dir
Path userDestDir = null;
if (options.getTargetDir() != null) {
userDestDir = new Path(options.getTargetDir());
} else if (options.getWarehouseDir() != null) {
userDestDir = new Path(options.getWarehouseDir(),
context.getTableName());
} else {
userDestDir = new Path(context.getTableName());
}
int nextPartition = 0;
// Create the output directory if it does not exist yet
if (!fs.exists(userDestDir)) {
LOG.info("Creating missing output directory - " + userDestDir.getName());
fs.mkdirs(userDestDir);
nextPartition = 0;
} else {
LOG.info("Appending to directory " + userDestDir.getName());
// Get the right next partition for the imported files
nextPartition = getNextPartition(fs, userDestDir);
}
// move files
moveFiles(fs, tempDir, userDestDir, nextPartition);
// delete temporary path
LOG.debug("Deleting temporary folder "
+ context.getDestination().getName());
fs.delete(context.getDestination(), true);
}
/**
* Returns the next partition number to use when appending, based on the
* data files already present in targetDir.
*/
private int getNextPartition(FileSystem fs, Path targetDir)
throws IOException {
int nextPartition = 0;
FileStatus[] existingFiles = fs.listStatus(targetDir);
if (existingFiles != null && existingFiles.length > 0) {
Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*");
for (FileStatus fileStat : existingFiles) {
if (!fileStat.isDir()) {
String filename = fileStat.getPath().getName();
Matcher mat = patt.matcher(filename);
if (mat.matches()) {
int thisPart = Integer.parseInt(mat.group(1));
if (thisPart >= nextPartition) {
nextPartition = thisPart;
nextPartition++;
}
}
}
}
}
if (nextPartition > 0) {
LOG.info("Using found partition " + nextPartition);
}
return nextPartition;
}
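A self-contained sketch (not part of the commit) of what the partition pattern above yields for a directory that already holds part files; the file names are made up:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class NextPartitionSketch {
  public static void main(String[] args) {
    String[] existing = { "part-m-00000", "part-m-00001.gz", "_logs" };
    Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*");
    int nextPartition = 0;
    for (String name : existing) {
      Matcher mat = patt.matcher(name);
      if (mat.matches()) {
        // highest existing partition number plus one
        nextPartition = Math.max(nextPartition,
            Integer.parseInt(mat.group(1)) + 1);
      }
    }
    System.out.println(nextPartition); // prints 2
  }
}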
/**
* Move files from source to target using a specified starting partition.
*/
private void moveFiles(FileSystem fs, Path sourceDir, Path targetDir,
int partitionStart) throws IOException {
NumberFormat numpart = NumberFormat.getInstance();
numpart.setMinimumIntegerDigits(PARTITION_DIGITS);
numpart.setGroupingUsed(false);
Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*");
FileStatus[] tempFiles = fs.listStatus(sourceDir);
// Move files and directories from the temporary dir to the target dir,
// renaming data files to carry the next partition numbers
for (FileStatus fileStat : tempFiles) {
if (!fileStat.isDir()) {
// Move imported data files
String filename = fileStat.getPath().getName();
Matcher mat = patt.matcher(filename);
if (mat.matches()) {
String name = getFilename(filename);
String fileToMove = name.concat(numpart.format(partitionStart++));
String extension = getFileExtension(filename);
if (extension != null) {
fileToMove = fileToMove.concat(extension);
}
LOG.debug("Filename: " + filename + " repartitioned to: "
+ fileToMove);
fs.rename(fileStat.getPath(), new Path(targetDir, fileToMove));
}
} else {
// Move directories (_logs & any other)
String dirName = fileStat.getPath().getName();
Path path = new Path(targetDir, dirName);
int dirNumber = 0;
while (fs.exists(path)) {
path = new Path(targetDir, dirName.concat("-").concat(
numpart.format(dirNumber++)));
}
LOG.debug("Directory: " + dirName + " renamed to: " + path.getName());
fs.rename(fileStat.getPath(), path);
}
}
}
/** returns the name component of a file. */
private String getFilename(String filename) {
String result = null;
int pos = filename.lastIndexOf(FILEPART_SEPARATOR);
if (pos != -1) {
result = filename.substring(0, pos + 1);
} else {
pos = filename.lastIndexOf(FILEEXT_SEPARATOR);
if (pos != -1) {
result = filename.substring(0, pos);
} else {
result = filename;
}
}
return result;
}
/** returns the extension component of a filename. */
private String getFileExtension(String filename) {
int pos = filename.lastIndexOf(FILEEXT_SEPARATOR);
if (pos != -1) {
return filename.substring(pos, filename.length());
} else {
return null;
}
}
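Putting getFilename(), getFileExtension(), and the NumberFormat settings from moveFiles() together, a sketch (not part of the commit) of how a single temp-dir file would be renumbered, assuming the next free partition is 2:

import java.text.NumberFormat;

public class RenumberSketch {
  public static void main(String[] args) {
    String filename = "part-m-00000.gz"; // a data file from the temp dir
    int nextPartition = 2;               // as computed by getNextPartition()

    NumberFormat numpart = NumberFormat.getInstance();
    numpart.setMinimumIntegerDigits(5);  // PARTITION_DIGITS
    numpart.setGroupingUsed(false);

    // name component up to and including the last '-' (getFilename)
    String name = filename.substring(0, filename.lastIndexOf('-') + 1);
    // extension from the last '.' onward (getFileExtension)
    String extension = filename.substring(filename.lastIndexOf('.'));

    String fileToMove = name + numpart.format(nextPartition) + extension;
    System.out.println(fileToMove);      // prints part-m-00002.gz
  }
}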
/**
* Creates a unique path object inside the sqoop temporary directory.
*
* @param tableName the name of the table being imported
* @return a path pointing to the temporary directory
*/
public static Path getTempAppendDir(String tableName) {
String timeId = DATE_FORM.format(new Date(System.currentTimeMillis()));
String tempDir = TEMP_IMPORT_ROOT + Path.SEPARATOR + timeId + tableName;
return new Path(tempDir);
}
}
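For example, a table named widgets staged at 12:34:56.789 on the 3rd of the month would use the temporary directory _sqoop/03123456789widgets, a relative path resolved against the current working directory on the filesystem.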


@ -33,6 +33,7 @@
import com.cloudera.sqoop.io.SplittingOutputStream;
import com.cloudera.sqoop.io.SplittableBufferedWriter;
import org.apache.hadoop.util.Shell;
import com.cloudera.sqoop.manager.ImportJobContext;
/**
* Utility methods that are common to the various direct import managers.
@ -70,16 +71,10 @@ public static void setFilePermissions(File file, String modstr)
* returned stream.
*/
public static SplittableBufferedWriter createHdfsSink(Configuration conf,
SqoopOptions options, String tableName) throws IOException {
SqoopOptions options, ImportJobContext context) throws IOException {
FileSystem fs = FileSystem.get(conf);
String warehouseDir = options.getWarehouseDir();
Path destDir = null;
if (null != warehouseDir) {
destDir = new Path(new Path(warehouseDir), tableName);
} else {
destDir = new Path(tableName);
}
Path destDir = context.getDestination();
LOG.debug("Writing to filesystem: " + conf.get("fs.default.name"));
LOG.debug("Creating destination directory " + destDir);


@ -57,6 +57,8 @@ public static Test suite() {
suite.addTestSuite(TestMultiMaps.class);
suite.addTestSuite(TestSplitBy.class);
suite.addTestSuite(TestWhere.class);
suite.addTestSuite(TestTargetDir.class);
suite.addTestSuite(TestAppendUtils.class);
suite.addTestSuite(TestHiveImport.class);
suite.addTestSuite(TestRecordParser.class);
suite.addTestSuite(TestFieldFormatter.class);


@ -0,0 +1,265 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.cloudera.sqoop.testutil.CommonArgs;
import com.cloudera.sqoop.testutil.HsqldbTestServer;
import com.cloudera.sqoop.testutil.ImportJobTestCase;
import com.cloudera.sqoop.tool.ImportTool;
/**
* Test that --append works.
*/
public class TestAppendUtils extends ImportJobTestCase {
private static final int PARTITION_DIGITS = 5;
private static final String FILEPART_SEPARATOR = "-";
public static final Log LOG = LogFactory.getLog(TestAppendUtils.class
.getName());
/**
* Create the argv to pass to Sqoop.
*
* @return the argv as an array of strings.
*/
protected ArrayList getOutputlessArgv(boolean includeHadoopFlags,
String[] colNames, Configuration conf) {
if (null == colNames) {
colNames = getColNames();
}
String splitByCol = colNames[0];
String columnsString = "";
for (String col : colNames) {
columnsString += col + ",";
}
ArrayList<String> args = new ArrayList<String>();
if (includeHadoopFlags) {
CommonArgs.addHadoopFlags(args);
}
args.add("--table");
args.add(getTableName());
args.add("--columns");
args.add(columnsString);
args.add("--split-by");
args.add(splitByCol);
args.add("--connect");
args.add(getConnectString());
args.add("--as-sequencefile");
args.add("--num-mappers");
args.add("1");
args.addAll(getExtraArgs(conf));
return args;
}
// this test just uses the two int table.
protected String getTableName() {
return HsqldbTestServer.getTableName();
}
/** the same as in ImportJobTestCase, but without removing the table dir. */
protected void runUncleanImport(String[] argv) throws IOException {
// run the tool through the normal entry-point.
int ret;
try {
Configuration conf = getConf();
SqoopOptions opts = getSqoopOptions(conf);
Sqoop sqoop = new Sqoop(new ImportTool(), conf, opts);
ret = Sqoop.runSqoop(sqoop, argv);
} catch (Exception e) {
LOG.error("Got exception running Sqoop: " + e.toString());
e.printStackTrace();
ret = 1;
}
// expect a successful return.
if (0 != ret) {
throw new IOException("Failure during job; return status " + ret);
}
}
/** @return FileStatus for data files only. */
private FileStatus[] listFiles(FileSystem fs, Path path) throws IOException {
FileStatus[] fileStatuses = fs.listStatus(path);
ArrayList files = new ArrayList();
Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*");
for (FileStatus fstat : fileStatuses) {
String fname = fstat.getPath().getName();
if (!fstat.isDir()) {
Matcher mat = patt.matcher(fname);
if (mat.matches()) {
files.add(fstat);
}
}
}
return (FileStatus[]) files.toArray(new FileStatus[files.size()]);
}
private class StatusPathComparator implements Comparator<FileStatus> {
@Override
public int compare(FileStatus fs1, FileStatus fs2) {
return fs1.getPath().toString().compareTo(fs2.getPath().toString());
}
}
/** @return a concatenated string of file modification times, excluding directories. */
private String getFileCreationTimeImage(FileSystem fs, Path outputPath,
int fileCount) throws IOException {
// build a string image of all file modification times
StringBuffer image = new StringBuffer();
FileStatus[] fileStatuses = listFiles(fs, outputPath);
// sort the file statuses by path so we have a stable order for
// using 'fileCount'.
Arrays.sort(fileStatuses, new StatusPathComparator());
for (int i = 0; i < fileStatuses.length && i < fileCount; i++) {
image.append(fileStatuses[i].getPath() + "="
+ fileStatuses[i].getModificationTime());
}
return image.toString();
}
/** @return the number part of a partition */
private int getFilePartition(Path file) {
String filename = file.getName();
int pos = filename.lastIndexOf(FILEPART_SEPARATOR);
if (pos != -1) {
String part = filename.substring(pos + 1, pos + 1 + PARTITION_DIGITS);
return Integer.parseInt(part);
} else {
return 0;
}
}
/**
* Test that the output path's file count increases, that pre-existing files
* are untouched, and that new files get the next consecutive partition
* numbers.
*
* @throws IOException
*/
public void runAppendTest(ArrayList args, Path outputPath)
throws IOException {
try {
// ensure non-existing output dir for insert phase
FileSystem fs = FileSystem.get(getConf());
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// run Sqoop in INSERT mode
String[] argv = (String[]) args.toArray(new String[0]);
runUncleanImport(argv);
// get current file count
FileStatus[] fileStatuses = listFiles(fs, outputPath);
Arrays.sort(fileStatuses, new StatusPathComparator());
int previousFileCount = fileStatuses.length;
// get a string image of all file modification times
String previousImage = getFileCreationTimeImage(fs, outputPath,
previousFileCount);
// get current last partition number
Path lastFile = fileStatuses[fileStatuses.length - 1].getPath();
int lastPartition = getFilePartition(lastFile);
// run Sqoop in APPEND mode
args.add("--append");
argv = (String[]) args.toArray(new String[0]);
runUncleanImport(argv);
// check directory file increase
fileStatuses = listFiles(fs, outputPath);
Arrays.sort(fileStatuses, new StatusPathComparator());
int currentFileCount = fileStatuses.length;
assertTrue("Output directory didn't got increased in file count ",
currentFileCount > previousFileCount);
// check that pre-existing files were not modified; this also catches
// partition-number overlaps
String currentImage = getFileCreationTimeImage(fs, outputPath,
previousFileCount);
assertEquals("Previous files to appending operation were modified",
currentImage, previousImage);
// check that at least one new consecutive partition was created,
// verified differently from the code under test
// first file beyond the previously existing ones:
Path newFile = fileStatuses[previousFileCount].getPath();
int newPartition = getFilePartition(newFile);
assertTrue("New partition file isn't correlative",
lastPartition + 1 == newPartition);
} catch (Exception e) {
LOG.error("Got Exception: " + StringUtils.stringifyException(e));
fail(e.toString());
}
}
/** append test independent of --target-dir (uses --warehouse-dir). */
public void testAppend() throws IOException {
ArrayList args = getOutputlessArgv(false, HsqldbTestServer.getFieldNames(),
getConf());
args.add("--warehouse-dir");
args.add(getWarehouseDir());
Path output = new Path(getWarehouseDir(), HsqldbTestServer.getTableName());
runAppendTest(args, output);
}
/** append test using --target-dir. */
public void testAppendToTargetDir() throws IOException {
ArrayList args = getOutputlessArgv(false, HsqldbTestServer.getFieldNames(),
getConf());
String targetDir = getWarehouseDir() + "/tempTargetDir";
args.add("--target-dir");
args.add(targetDir);
// no extra handling is needed here; unlike --warehouse-dir, no
// $tablename subdirectory is created under the target dir
Path output = new Path(targetDir);
runAppendTest(args, output);
}
}


@ -0,0 +1,151 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import com.cloudera.sqoop.testutil.CommonArgs;
import com.cloudera.sqoop.testutil.HsqldbTestServer;
import com.cloudera.sqoop.testutil.ImportJobTestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Test that --target-dir works.
*/
public class TestTargetDir extends ImportJobTestCase {
public static final Log LOG = LogFactory
.getLog(TestTargetDir.class.getName());
/**
* Create the argv to pass to Sqoop.
*
* @return the argv as an array of strings.
*/
protected ArrayList getOutputArgv(boolean includeHadoopFlags) {
ArrayList<String> args = new ArrayList<String>();
if (includeHadoopFlags) {
CommonArgs.addHadoopFlags(args);
}
args.add("--table");
args.add(HsqldbTestServer.getTableName());
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--split-by");
args.add("INTFIELD1");
args.add("--as-sequencefile");
return args;
}
// this test just uses the two int table.
protected String getTableName() {
return HsqldbTestServer.getTableName();
}
/** test that setting both output directory options raises an error. */
public void testSeveralOutputsIOException() throws IOException {
try {
ArrayList args = getOutputArgv(true);
args.add("--warehouse-dir");
args.add(getWarehouseDir());
args.add("--target-dir");
args.add(getWarehouseDir());
String[] argv = (String[]) args.toArray(new String[0]);
runImport(argv);
fail("warehouse-dir & target-dir were set and run "
+ "without problem reported");
} catch (IOException e) {
// expected
}
}
/** test target-dir contains imported files. */
public void testTargetDir() throws IOException {
try {
String targetDir = getWarehouseDir() + "/tempTargetDir";
ArrayList args = getOutputArgv(true);
args.add("--target-dir");
args.add(targetDir);
// delete target-dir if it already exists
FileSystem fs = FileSystem.get(getConf());
Path outputPath = new Path(targetDir);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
String[] argv = (String[]) args.toArray(new String[0]);
runImport(argv);
ContentSummary summ = fs.getContentSummary(outputPath);
assertTrue("There's no new imported files in target-dir",
summ.getFileCount() > 0);
} catch (Exception e) {
LOG.error("Got Exception: " + StringUtils.stringifyException(e));
fail(e.toString());
}
}
/** test that an already-existing target-dir causes a failure
* (reusing an existing directory is only allowed in append mode). */
public void testExistingTargetDir() throws IOException {
try {
String targetDir = getWarehouseDir() + "/tempTargetDir";
ArrayList args = getOutputArgv(true);
args.add("--target-dir");
args.add(targetDir);
// create target-dir beforehand so that it already exists
FileSystem fs = FileSystem.get(getConf());
Path outputPath = new Path(targetDir);
if (!fs.exists(outputPath)) {
fs.mkdirs(outputPath);
}
String[] argv = (String[]) args.toArray(new String[0]);
runImport(argv);
fail("Existing target-dir run without problem report");
} catch (IOException e) {
// expected
}
}
}