diff --git a/src/java/com/cloudera/sqoop/SqoopOptions.java b/src/java/com/cloudera/sqoop/SqoopOptions.java index 6e740592..7b811610 100644 --- a/src/java/com/cloudera/sqoop/SqoopOptions.java +++ b/src/java/com/cloudera/sqoop/SqoopOptions.java @@ -86,6 +86,8 @@ public enum FileLayout { private String debugSqlCmd; private String driverClassName; private String warehouseDir; + private String targetDir; + private boolean append; private FileLayout layout; private boolean direct; // if true and conn is mysql, use mysqldump. private String tmpDir; // where temp data goes; usually /tmp @@ -619,6 +621,22 @@ public void setWarehouseDir(String warehouse) { this.warehouseDir = warehouse; } + public String getTargetDir() { + return this.targetDir; + } + + public void setTargetDir(String dir) { + this.targetDir = dir; + } + + public void setAppendMode(boolean doAppend) { + this.append = doAppend; + } + + public boolean isAppendMode() { + return this.append; + } + /** * @return the destination file format */ diff --git a/src/java/com/cloudera/sqoop/manager/DirectMySQLManager.java b/src/java/com/cloudera/sqoop/manager/DirectMySQLManager.java index 465698ae..c4a89856 100644 --- a/src/java/com/cloudera/sqoop/manager/DirectMySQLManager.java +++ b/src/java/com/cloudera/sqoop/manager/DirectMySQLManager.java @@ -65,7 +65,7 @@ public void importTable(ImportJobContext context) MySQLDumpImportJob importer = null; try { - importer = new MySQLDumpImportJob(options); + importer = new MySQLDumpImportJob(options, context); } catch (ClassNotFoundException cnfe) { throw new IOException("Could not load required classes", cnfe); } diff --git a/src/java/com/cloudera/sqoop/manager/DirectPostgresqlManager.java b/src/java/com/cloudera/sqoop/manager/DirectPostgresqlManager.java index be766d3f..a2fd06fb 100644 --- a/src/java/com/cloudera/sqoop/manager/DirectPostgresqlManager.java +++ b/src/java/com/cloudera/sqoop/manager/DirectPostgresqlManager.java @@ -372,7 +372,7 @@ public void importTable(ImportJobContext context) // This writer will be closed by AsyncSink. SplittableBufferedWriter w = DirectImportUtils.createHdfsSink( - options.getConf(), options, tableName); + options.getConf(), options, context); // Actually start the psql dump. p = Runtime.getRuntime().exec(args.toArray(new String[0]), diff --git a/src/java/com/cloudera/sqoop/manager/ImportJobContext.java b/src/java/com/cloudera/sqoop/manager/ImportJobContext.java index 27da0b15..56b7169e 100644 --- a/src/java/com/cloudera/sqoop/manager/ImportJobContext.java +++ b/src/java/com/cloudera/sqoop/manager/ImportJobContext.java @@ -21,6 +21,7 @@ import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat; import com.cloudera.sqoop.SqoopOptions; +import org.apache.hadoop.fs.Path; /** * A set of parameters describing an import operation; this is passed to @@ -32,13 +33,15 @@ public class ImportJobContext { private String jarFile; private SqoopOptions options; private Class inputFormatClass; + private Path destination; public ImportJobContext(final String table, final String jar, - final SqoopOptions opts) { + final SqoopOptions opts, final Path destination) { this.tableName = table; this.jarFile = jar; this.options = opts; this.inputFormatClass = DataDrivenDBInputFormat.class; + this.destination = destination; } /** @return the name of the table to import. 
*/ @@ -67,5 +70,14 @@ public void setInputFormat(Class ifClass) { public Class getInputFormat() { return this.inputFormatClass; } + + /** + * @return the destination path to where the output files will + * be first saved. + */ + public Path getDestination() { + return this.destination; + } + } diff --git a/src/java/com/cloudera/sqoop/manager/SqlManager.java b/src/java/com/cloudera/sqoop/manager/SqlManager.java index 98a682e7..5f0a62b2 100644 --- a/src/java/com/cloudera/sqoop/manager/SqlManager.java +++ b/src/java/com/cloudera/sqoop/manager/SqlManager.java @@ -304,7 +304,7 @@ public void importTable(ImportJobContext context) SqoopOptions opts = context.getOptions(); DataDrivenImportJob importer = - new DataDrivenImportJob(opts, context.getInputFormat()); + new DataDrivenImportJob(opts, context.getInputFormat(), context); String splitCol = getSplitColumn(opts, tableName); if (null == splitCol && opts.getNumMappers() > 1) { diff --git a/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java index 24e6748b..a02082c0 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java +++ b/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java @@ -41,6 +41,7 @@ import com.cloudera.sqoop.lib.LargeObjectLoader; import com.cloudera.sqoop.shims.HadoopShim; import com.cloudera.sqoop.shims.ShimLoader; +import com.cloudera.sqoop.manager.ImportJobContext; /** * Actually runs a jdbc import job using the ORM files generated by the @@ -53,12 +54,13 @@ public class DataDrivenImportJob extends ImportJobBase { @SuppressWarnings("unchecked") public DataDrivenImportJob(final SqoopOptions opts) { - super(opts, null, DataDrivenDBInputFormat.class, null); + super(opts, null, DataDrivenDBInputFormat.class, null, null); } public DataDrivenImportJob(final SqoopOptions opts, - final Class inputFormatClass) { - super(opts, null, inputFormatClass, null); + final Class inputFormatClass, + ImportJobContext context) { + super(opts, null, inputFormatClass, null, context); } @Override diff --git a/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java b/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java index 601d9637..c3ea09a4 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java +++ b/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java @@ -24,7 +24,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.compress.GzipCodec; @@ -40,6 +39,7 @@ import com.cloudera.sqoop.shims.HadoopShim; import com.cloudera.sqoop.util.ImportException; import com.cloudera.sqoop.util.PerfCounters; +import com.cloudera.sqoop.manager.ImportJobContext; /** * Base class for running an import MapReduce job. 
@@ -47,6 +47,8 @@ */ public class ImportJobBase extends JobBase { + private ImportJobContext context; + public static final Log LOG = LogFactory.getLog( ImportJobBase.class.getName()); @@ -55,14 +57,16 @@ public ImportJobBase() { } public ImportJobBase(final SqoopOptions opts) { - this(opts, null, null, null); + this(opts, null, null, null, null); } public ImportJobBase(final SqoopOptions opts, final Class mapperClass, final Class inputFormatClass, - final Class outputFormatClass) { + final Class outputFormatClass, + final ImportJobContext context) { super(opts, mapperClass, inputFormatClass, outputFormatClass); + this.context = context; } /** @@ -71,17 +75,7 @@ public ImportJobBase(final SqoopOptions opts, @Override protected void configureOutputFormat(Job job, String tableName, String tableClassName) throws ClassNotFoundException, IOException { - String hdfsWarehouseDir = options.getWarehouseDir(); - Path outputPath; - - if (null != hdfsWarehouseDir) { - Path hdfsWarehousePath = new Path(hdfsWarehouseDir); - hdfsWarehousePath.makeQualified(FileSystem.get(job.getConfiguration())); - outputPath = new Path(hdfsWarehousePath, tableName); - } else { - outputPath = new Path(tableName); - } - + job.setOutputFormatClass(getOutputFormatClass()); if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) { @@ -95,6 +89,7 @@ protected void configureOutputFormat(Job job, String tableName, CompressionType.BLOCK); } + Path outputPath = context.getDestination(); FileOutputFormat.setOutputPath(job, outputPath); } diff --git a/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpImportJob.java b/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpImportJob.java index 6c483042..21769178 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpImportJob.java +++ b/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpImportJob.java @@ -38,6 +38,7 @@ import com.cloudera.sqoop.manager.ConnManager; import com.cloudera.sqoop.manager.MySQLUtils; import com.cloudera.sqoop.shims.ShimLoader; +import com.cloudera.sqoop.manager.ImportJobContext; /** * Class that runs an import job using mysqldump in the mapper. 
@@ -47,13 +48,13 @@ public class MySQLDumpImportJob extends ImportJobBase { public static final Log LOG = LogFactory.getLog(MySQLDumpImportJob.class.getName()); - public MySQLDumpImportJob(final SqoopOptions opts) + public MySQLDumpImportJob(final SqoopOptions opts, ImportJobContext context) throws ClassNotFoundException { super(opts, MySQLDumpMapper.class, (Class) ShimLoader.getShimClass( "com.cloudera.sqoop.mapreduce.MySQLDumpInputFormat"), (Class) ShimLoader.getShimClass( - "com.cloudera.sqoop.mapreduce.RawKeyTextOutputFormat")); + "com.cloudera.sqoop.mapreduce.RawKeyTextOutputFormat"), context); } /** diff --git a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java index 9bef2c55..03af6a45 100644 --- a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java +++ b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java @@ -71,6 +71,9 @@ public abstract class BaseSqoopTool extends SqoopTool { public static final String HADOOP_HOME_ARG = "hadoop-home"; public static final String HIVE_HOME_ARG = "hive-home"; public static final String WAREHOUSE_DIR_ARG = "warehouse-dir"; + public static final String TARGET_DIR_ARG = "target-dir"; + public static final String APPEND_ARG = "append"; + public static final String FMT_SEQUENCEFILE_ARG = "as-sequencefile"; public static final String FMT_TEXTFILE_ARG = "as-textfile"; public static final String HIVE_IMPORT_ARG = "hive-import"; diff --git a/src/java/com/cloudera/sqoop/tool/ImportTool.java b/src/java/com/cloudera/sqoop/tool/ImportTool.java index 7a9a9110..2bbd462a 100644 --- a/src/java/com/cloudera/sqoop/tool/ImportTool.java +++ b/src/java/com/cloudera/sqoop/tool/ImportTool.java @@ -33,7 +33,9 @@ import com.cloudera.sqoop.cli.ToolOptions; import com.cloudera.sqoop.hive.HiveImport; import com.cloudera.sqoop.manager.ImportJobContext; +import com.cloudera.sqoop.util.AppendUtils; import com.cloudera.sqoop.util.ImportException; +import org.apache.hadoop.fs.Path; /** * Tool that performs database imports to HDFS. @@ -72,7 +74,7 @@ protected boolean init(SqoopOptions sqoopOpts) { public List getGeneratedJarFiles() { return this.codeGenerator.getGeneratedJarFiles(); } - + protected void importTable(SqoopOptions options, String tableName, HiveImport hiveImport) throws IOException, ImportException { String jarFile = null; @@ -82,15 +84,48 @@ protected void importTable(SqoopOptions options, String tableName, // Do the actual import. ImportJobContext context = new ImportJobContext(tableName, jarFile, - options); + options, getOutputPath(options, tableName)); + manager.importTable(context); + + if (options.isAppendMode()) { + AppendUtils app = new AppendUtils(context); + app.append(); + } // If the user wants this table to be in Hive, perform that post-load. if (options.doHiveImport()) { hiveImport.importTable(tableName, options.getHiveTableName(), false); } } + + /** + * @return the output path for the imported files; + * in append mode this will point to a temporary folder. 
+ */ + private Path getOutputPath(SqoopOptions options, String tableName) { + // Get output directory + String hdfsWarehouseDir = options.getWarehouseDir(); + String hdfsTargetDir = options.getTargetDir(); + Path outputPath = null; + if (options.isAppendMode()) { + // Use temporary path, later removed when appending + outputPath = AppendUtils.getTempAppendDir(tableName); + LOG.debug("Using temporary folder: " + outputPath.getName()); + } else { + // Try in this order: target-dir or warehouse-dir + if (hdfsTargetDir != null) { + outputPath = new Path(hdfsTargetDir); + } else if (hdfsWarehouseDir != null) { + outputPath = new Path(hdfsWarehouseDir, tableName); + } else { + outputPath = new Path(tableName); + } + } + return outputPath; + } + @Override /** {@inheritDoc} */ public int run(SqoopOptions options) { @@ -172,6 +207,14 @@ protected RelatedOptions getImportOptions() { .hasArg().withDescription("WHERE clause to use during import") .withLongOpt(WHERE_ARG) .create()); + importOpts.addOption(OptionBuilder + .withDescription("Imports data in append mode") + .withLongOpt(APPEND_ARG) + .create()); + importOpts.addOption(OptionBuilder.withArgName("dir") + .hasArg().withDescription("HDFS plain table destination") + .withLongOpt(TARGET_DIR_ARG) + .create()); } importOpts.addOption(OptionBuilder.withArgName("dir") @@ -278,12 +321,21 @@ public void applyOptions(CommandLine in, SqoopOptions out) if (in.hasOption(WHERE_ARG)) { out.setWhereClause(in.getOptionValue(WHERE_ARG)); } + + if (in.hasOption(TARGET_DIR_ARG)) { + out.setTargetDir(in.getOptionValue(TARGET_DIR_ARG)); + } + + if (in.hasOption(APPEND_ARG)) { + out.setAppendMode(true); + } } if (in.hasOption(WAREHOUSE_DIR_ARG)) { out.setWarehouseDir(in.getOptionValue(WAREHOUSE_DIR_ARG)); } + if (in.hasOption(FMT_SEQUENCEFILE_ARG)) { out.setFileLayout(SqoopOptions.FileLayout.SequenceFile); } @@ -338,6 +390,11 @@ protected void validateImportOptions(SqoopOptions options) && options.getClassName() == null) { throw new InvalidOptionsException("Jar specified with --jar-file, but no " + "class specified with --class-name." + HELP_STR); + } else if (options.getTargetDir() != null + && options.getWarehouseDir() != null) { + throw new InvalidOptionsException( + "--target-dir with --warehouse-dir are incompatible options" + + HELP_STR); } } diff --git a/src/java/com/cloudera/sqoop/util/AppendUtils.java b/src/java/com/cloudera/sqoop/util/AppendUtils.java new file mode 100644 index 00000000..21784513 --- /dev/null +++ b/src/java/com/cloudera/sqoop/util/AppendUtils.java @@ -0,0 +1,216 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.cloudera.sqoop.util;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.SqoopOptions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Utilities used when appending imported files to an existing dir.
+ */
+public class AppendUtils {
+
+  public static final Log LOG = LogFactory.getLog(AppendUtils.class.getName());
+
+  private static final SimpleDateFormat DATE_FORM = new SimpleDateFormat(
+      "ddHHmmssSSS");
+  private static final String TEMP_IMPORT_ROOT = "_sqoop";
+
+  private static final int PARTITION_DIGITS = 5;
+  private static final String FILEPART_SEPARATOR = "-";
+  private static final String FILEEXT_SEPARATOR = ".";
+
+  private ImportJobContext context = null;
+
+  public AppendUtils(ImportJobContext context) {
+    this.context = context;
+  }
+
+  /**
+   * Moves the imported files from the temporary directory to the user's
+   * target-dir, renumbering file partitions if files already exist there.
+   */
+  public void append() throws IOException {
+
+    SqoopOptions options = context.getOptions();
+    FileSystem fs = FileSystem.get(options.getConf());
+    Path tempDir = context.getDestination();
+
+    // Try in this order: target-dir or warehouse-dir
+    Path userDestDir = null;
+    if (options.getTargetDir() != null) {
+      userDestDir = new Path(options.getTargetDir());
+    } else if (options.getWarehouseDir() != null) {
+      userDestDir = new Path(options.getWarehouseDir(),
+          context.getTableName());
+    } else {
+      userDestDir = new Path(context.getTableName());
+    }
+
+    int nextPartition = 0;
+
+    // Create the destination directory if it does not exist yet.
+    if (!fs.exists(userDestDir)) {
+      LOG.info("Creating missing output directory - " + userDestDir.getName());
+      fs.mkdirs(userDestDir);
+      nextPartition = 0;
+    } else {
+      LOG.info("Appending to directory " + userDestDir.getName());
+      // Determine the next partition number for the imported files.
+      nextPartition = getNextPartition(fs, userDestDir);
+    }
+
+    // move files
+    moveFiles(fs, tempDir, userDestDir, nextPartition);
+
+    // delete temporary path
+    LOG.debug("Deleting temporary folder "
+        + context.getDestination().getName());
+    fs.delete(context.getDestination(), true);
+  }
+
+  /**
+   * Returns the next partition number to use when appending data files
+   * to targetDir.
+   */
+  private int getNextPartition(FileSystem fs, Path targetDir)
+      throws IOException {
+
+    int nextPartition = 0;
+    FileStatus[] existingFiles = fs.listStatus(targetDir);
+    if (existingFiles != null && existingFiles.length > 0) {
+      Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*");
+      for (FileStatus fileStat : existingFiles) {
+        if (!fileStat.isDir()) {
+          String filename = fileStat.getPath().getName();
+          Matcher mat = patt.matcher(filename);
+          if (mat.matches()) {
+            int thisPart = Integer.parseInt(mat.group(1));
+            if (thisPart >= nextPartition) {
+              nextPartition = thisPart;
+              nextPartition++;
+            }
+          }
+        }
+      }
+    }
+
+    if (nextPartition > 0) {
+      LOG.info("Using found partition " + nextPartition);
+    }
+
+    return nextPartition;
+  }
+
+  /**
+   * Move files from source to target using a specified starting partition.
+ */ + private void moveFiles(FileSystem fs, Path sourceDir, Path targetDir, + int partitionStart) throws IOException { + + NumberFormat numpart = NumberFormat.getInstance(); + numpart.setMinimumIntegerDigits(PARTITION_DIGITS); + numpart.setGroupingUsed(false); + Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*"); + FileStatus[] tempFiles = fs.listStatus(sourceDir); + // Move and rename files & directories from temporary to target-dir thus + // appending file's next partition + for (FileStatus fileStat : tempFiles) { + if (!fileStat.isDir()) { + // Move imported data files + String filename = fileStat.getPath().getName(); + Matcher mat = patt.matcher(filename); + if (mat.matches()) { + String name = getFilename(filename); + String fileToMove = name.concat(numpart.format(partitionStart++)); + String extension = getFileExtension(filename); + if (extension != null) { + fileToMove = fileToMove.concat(extension); + } + LOG.debug("Filename: " + filename + " repartitioned to: " + + fileToMove); + fs.rename(fileStat.getPath(), new Path(targetDir, fileToMove)); + } + } else { + // Move directories (_logs & any other) + String dirName = fileStat.getPath().getName(); + Path path = new Path(targetDir, dirName); + int dirNumber = 0; + while (fs.exists(path)) { + path = new Path(targetDir, dirName.concat("-").concat( + numpart.format(dirNumber++))); + } + LOG.debug("Directory: " + dirName + " renamed to: " + path.getName()); + fs.rename(fileStat.getPath(), path); + } + } + } + + /** returns the name component of a file. */ + private String getFilename(String filename) { + String result = null; + int pos = filename.lastIndexOf(FILEPART_SEPARATOR); + if (pos != -1) { + result = filename.substring(0, pos + 1); + } else { + pos = filename.lastIndexOf(FILEEXT_SEPARATOR); + if (pos != -1) { + result = filename.substring(0, pos); + } else { + result = filename; + } + } + return result; + } + + /** returns the extension component of a filename. */ + private String getFileExtension(String filename) { + int pos = filename.lastIndexOf(FILEEXT_SEPARATOR); + if (pos != -1) { + return filename.substring(pos, filename.length()); + } else { + return null; + } + } + + /** + * Creates a unique path object inside the sqoop temporary directory. + * + * @param tableName + * @return a path pointing to the temporary directory + */ + public static Path getTempAppendDir(String tableName) { + String timeId = DATE_FORM.format(new Date(System.currentTimeMillis())); + String tempDir = TEMP_IMPORT_ROOT + Path.SEPARATOR + timeId + tableName; + return new Path(tempDir); + } + +} diff --git a/src/java/com/cloudera/sqoop/util/DirectImportUtils.java b/src/java/com/cloudera/sqoop/util/DirectImportUtils.java index 538a78e2..a6d63219 100644 --- a/src/java/com/cloudera/sqoop/util/DirectImportUtils.java +++ b/src/java/com/cloudera/sqoop/util/DirectImportUtils.java @@ -33,6 +33,7 @@ import com.cloudera.sqoop.io.SplittingOutputStream; import com.cloudera.sqoop.io.SplittableBufferedWriter; import org.apache.hadoop.util.Shell; +import com.cloudera.sqoop.manager.ImportJobContext; /** * Utility methods that are common to various the direct import managers. @@ -70,16 +71,10 @@ public static void setFilePermissions(File file, String modstr) * returned stream. 
*/ public static SplittableBufferedWriter createHdfsSink(Configuration conf, - SqoopOptions options, String tableName) throws IOException { + SqoopOptions options, ImportJobContext context) throws IOException { FileSystem fs = FileSystem.get(conf); - String warehouseDir = options.getWarehouseDir(); - Path destDir = null; - if (null != warehouseDir) { - destDir = new Path(new Path(warehouseDir), tableName); - } else { - destDir = new Path(tableName); - } + Path destDir = context.getDestination(); LOG.debug("Writing to filesystem: " + conf.get("fs.default.name")); LOG.debug("Creating destination directory " + destDir); diff --git a/src/test/com/cloudera/sqoop/SmokeTests.java b/src/test/com/cloudera/sqoop/SmokeTests.java index 4573c0d4..e7aef42c 100644 --- a/src/test/com/cloudera/sqoop/SmokeTests.java +++ b/src/test/com/cloudera/sqoop/SmokeTests.java @@ -57,6 +57,8 @@ public static Test suite() { suite.addTestSuite(TestMultiMaps.class); suite.addTestSuite(TestSplitBy.class); suite.addTestSuite(TestWhere.class); + suite.addTestSuite(TestTargetDir.class); + suite.addTestSuite(TestAppendUtils.class); suite.addTestSuite(TestHiveImport.class); suite.addTestSuite(TestRecordParser.class); suite.addTestSuite(TestFieldFormatter.class); diff --git a/src/test/com/cloudera/sqoop/TestAppendUtils.java b/src/test/com/cloudera/sqoop/TestAppendUtils.java new file mode 100644 index 00000000..a5f039a0 --- /dev/null +++ b/src/test/com/cloudera/sqoop/TestAppendUtils.java @@ -0,0 +1,265 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.StringUtils; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.cloudera.sqoop.testutil.CommonArgs; +import com.cloudera.sqoop.testutil.HsqldbTestServer; +import com.cloudera.sqoop.testutil.ImportJobTestCase; +import com.cloudera.sqoop.tool.ImportTool; + +/** + * Test that --append works. + */ +public class TestAppendUtils extends ImportJobTestCase { + + private static final int PARTITION_DIGITS = 5; + private static final String FILEPART_SEPARATOR = "-"; + + public static final Log LOG = LogFactory.getLog(TestAppendUtils.class + .getName()); + + /** + * Create the argv to pass to Sqoop. + * + * @return the argv as an array of strings. 
+   */
+  protected ArrayList getOutputlessArgv(boolean includeHadoopFlags,
+      String[] colNames, Configuration conf) {
+    if (null == colNames) {
+      colNames = getColNames();
+    }
+
+    String splitByCol = colNames[0];
+    String columnsString = "";
+    for (String col : colNames) {
+      columnsString += col + ",";
+    }
+
+    ArrayList args = new ArrayList();
+
+    if (includeHadoopFlags) {
+      CommonArgs.addHadoopFlags(args);
+    }
+
+    args.add("--table");
+    args.add(getTableName());
+    args.add("--columns");
+    args.add(columnsString);
+    args.add("--split-by");
+    args.add(splitByCol);
+    args.add("--connect");
+    args.add(getConnectString());
+    args.add("--as-sequencefile");
+    args.add("--num-mappers");
+    args.add("1");
+
+    args.addAll(getExtraArgs(conf));
+
+    return args;
+  }
+
+  // this test just uses the table with two int columns.
+  protected String getTableName() {
+    return HsqldbTestServer.getTableName();
+  }
+
+  /** The same as in ImportJobTestCase, but without removing the table dir. */
+  protected void runUncleanImport(String[] argv) throws IOException {
+    // run the tool through the normal entry-point.
+    int ret;
+    try {
+      Configuration conf = getConf();
+      SqoopOptions opts = getSqoopOptions(conf);
+      Sqoop sqoop = new Sqoop(new ImportTool(), conf, opts);
+      ret = Sqoop.runSqoop(sqoop, argv);
+    } catch (Exception e) {
+      LOG.error("Got exception running Sqoop: " + e.toString());
+      e.printStackTrace();
+      ret = 1;
+    }
+
+    // expect a successful return.
+    if (0 != ret) {
+      throw new IOException("Failure during job; return status " + ret);
+    }
+  }
+
+  /** @return FileStatus for data files only. */
+  private FileStatus[] listFiles(FileSystem fs, Path path) throws IOException {
+    FileStatus[] fileStatuses = fs.listStatus(path);
+    ArrayList files = new ArrayList();
+    Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*");
+    for (FileStatus fstat : fileStatuses) {
+      String fname = fstat.getPath().getName();
+      if (!fstat.isDir()) {
+        Matcher mat = patt.matcher(fname);
+        if (mat.matches()) {
+          files.add(fstat);
+        }
+      }
+    }
+    return (FileStatus[]) files.toArray(new FileStatus[files.size()]);
+  }
+
+  private class StatusPathComparator implements Comparator {
+
+    @Override
+    public int compare(FileStatus fs1, FileStatus fs2) {
+      return fs1.getPath().toString().compareTo(fs2.getPath().toString());
+    }
+  }
+
+  /** @return a string of file paths and modification times, excluding dirs. */
+  private String getFileCreationTimeImage(FileSystem fs, Path outputPath,
+      int fileCount) throws IOException {
+    // build a string image with all file modification times.
+    StringBuffer image = new StringBuffer();
+    FileStatus[] fileStatuses = listFiles(fs, outputPath);
+    // sort the file statuses by path so we have a stable order for
+    // using 'fileCount'.
+    Arrays.sort(fileStatuses, new StatusPathComparator());
+    for (int i = 0; i < fileStatuses.length && i < fileCount; i++) {
+      image.append(fileStatuses[i].getPath() + "="
+          + fileStatuses[i].getModificationTime());
+    }
+    return image.toString();
+  }
+
+  /** @return the partition number encoded in a file name. */
+  private int getFilePartition(Path file) {
+    String filename = file.getName();
+    int pos = filename.lastIndexOf(FILEPART_SEPARATOR);
+    if (pos != -1) {
+      String part = filename.substring(pos + 1, pos + 1 + PARTITION_DIGITS);
+      return Integer.parseInt(part);
+    } else {
+      return 0;
+    }
+  }
+
+  /**
+   * Tests that the output path's file count increases, that pre-existing
+   * files are left untouched, and that new files use the correct next
+   * partition number.
+   *
+   * @throws IOException
+   */
+  public void runAppendTest(ArrayList args, Path outputPath)
+      throws IOException {
+
+    try {
+
+      // ensure the output dir does not exist before the initial import
+      FileSystem fs = FileSystem.get(getConf());
+      if (fs.exists(outputPath)) {
+        fs.delete(outputPath, true);
+      }
+
+      // run Sqoop in INSERT mode
+      String[] argv = (String[]) args.toArray(new String[0]);
+      runUncleanImport(argv);
+
+      // get current file count
+      FileStatus[] fileStatuses = listFiles(fs, outputPath);
+      Arrays.sort(fileStatuses, new StatusPathComparator());
+      int previousFileCount = fileStatuses.length;
+
+      // get a string image with all file modification times
+      String previousImage = getFileCreationTimeImage(fs, outputPath,
+          previousFileCount);
+
+      // get current last partition number
+      Path lastFile = fileStatuses[fileStatuses.length - 1].getPath();
+      int lastPartition = getFilePartition(lastFile);
+
+      // run Sqoop in APPEND mode
+      args.add("--append");
+      argv = (String[]) args.toArray(new String[0]);
+      runUncleanImport(argv);
+
+      // check that the directory's file count increased
+      fileStatuses = listFiles(fs, outputPath);
+      Arrays.sort(fileStatuses, new StatusPathComparator());
+      int currentFileCount = fileStatuses.length;
+      assertTrue("Output directory file count did not increase",
+          currentFileCount > previousFileCount);
+
+      // check that pre-existing files were not modified; this also catches
+      // partition-number overlaps
+      String currentImage = getFileCreationTimeImage(fs, outputPath,
+          previousFileCount);
+      assertEquals("Files present before the append were modified",
+          currentImage, previousImage);
+
+      // check that at least one new file exists with the next consecutive
+      // partition number, computed differently from the code under test
+      Path newFile = fileStatuses[previousFileCount].getPath(); // first new file
+      int newPartition = getFilePartition(newFile);
+      assertTrue("New partition file is not consecutive",
+          lastPartition + 1 == newPartition);
+
+    } catch (Exception e) {
+      LOG.error("Got Exception: " + StringUtils.stringifyException(e));
+      fail(e.toString());
+    }
+  }
+
+  /** Append test independent of --target-dir. */
+  public void testAppend() throws IOException {
+    ArrayList args = getOutputlessArgv(false, HsqldbTestServer.getFieldNames(),
+        getConf());
+    args.add("--warehouse-dir");
+    args.add(getWarehouseDir());
+
+    Path output = new Path(getWarehouseDir(), HsqldbTestServer.getTableName());
+    runAppendTest(args, output);
+  }
+
+  /** Append test using --target-dir. */
+  public void testAppendToTargetDir() throws IOException {
+    ArrayList args = getOutputlessArgv(false, HsqldbTestServer.getFieldNames(),
+        getConf());
+    String targetDir = getWarehouseDir() + "/tempTargetDir";
+    args.add("--target-dir");
+    args.add(targetDir);
+
+    // No extra parameter is needed; unlike --warehouse-dir, no $tablename
+    // subdirectory is created under the target dir.
+    Path output = new Path(targetDir);
+    runAppendTest(args, output);
+  }
+
+}
+
diff --git a/src/test/com/cloudera/sqoop/TestTargetDir.java b/src/test/com/cloudera/sqoop/TestTargetDir.java
new file mode 100644
index 00000000..2ee5a715
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/TestTargetDir.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.HsqldbTestServer;
+import com.cloudera.sqoop.testutil.ImportJobTestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Test that --target-dir works.
+ */
+public class TestTargetDir extends ImportJobTestCase {
+
+  public static final Log LOG = LogFactory
+      .getLog(TestTargetDir.class.getName());
+
+  /**
+   * Create the argv to pass to Sqoop.
+   *
+   * @return the argv as a list of strings.
+   */
+  protected ArrayList getOutputArgv(boolean includeHadoopFlags) {
+    ArrayList args = new ArrayList();
+
+    if (includeHadoopFlags) {
+      CommonArgs.addHadoopFlags(args);
+    }
+
+    args.add("--table");
+    args.add(HsqldbTestServer.getTableName());
+    args.add("--connect");
+    args.add(HsqldbTestServer.getUrl());
+    args.add("--split-by");
+    args.add("INTFIELD1");
+    args.add("--as-sequencefile");
+
+    return args;
+  }
+
+  // this test just uses the table with two int columns.
+  protected String getTableName() {
+    return HsqldbTestServer.getTableName();
+  }
+
+  /** Test that an exception is raised if both output options are set. */
+  public void testSeveralOutputsIOException() throws IOException {
+
+    try {
+      ArrayList args = getOutputArgv(true);
+      args.add("--warehouse-dir");
+      args.add(getWarehouseDir());
+      args.add("--target-dir");
+      args.add(getWarehouseDir());
+
+      String[] argv = (String[]) args.toArray(new String[0]);
+      runImport(argv);
+
+      fail("--warehouse-dir and --target-dir were both set, but no "
+          + "error was reported");
+
+    } catch (IOException e) {
+      // expected
+    }
+  }
+
+  /** Test that target-dir contains the imported files. */
+  public void testTargetDir() throws IOException {
+
+    try {
+      String targetDir = getWarehouseDir() + "/tempTargetDir";
+
+      ArrayList args = getOutputArgv(true);
+      args.add("--target-dir");
+      args.add(targetDir);
+
+      // delete the target-dir if it exists; the import will recreate it
+      FileSystem fs = FileSystem.get(getConf());
+      Path outputPath = new Path(targetDir);
+      if (fs.exists(outputPath)) {
+        fs.delete(outputPath, true);
+      }
+
+      String[] argv = (String[]) args.toArray(new String[0]);
+      runImport(argv);
+
+      ContentSummary summ = fs.getContentSummary(outputPath);
+
+      assertTrue("No imported files were found in target-dir",
+          summ.getFileCount() > 0);
+
+    } catch (Exception e) {
+      LOG.error("Got Exception: " + StringUtils.stringifyException(e));
+      fail(e.toString());
+    }
+  }
+
+  /** Test that the import fails if target-dir already exists
+   * (appending to an existing directory is only allowed with --append).
+   */
+  public void testExistingTargetDir() throws IOException {

+    try {
+      String targetDir = getWarehouseDir() + "/tempTargetDir";
+
+      ArrayList args = getOutputArgv(true);
+      args.add("--target-dir");
+      args.add(targetDir);
+
+      // create the target-dir ahead of the import so that it already exists
+      FileSystem fs = FileSystem.get(getConf());
+      Path outputPath = new Path(targetDir);
+      if (!fs.exists(outputPath)) {
+        fs.mkdirs(outputPath);
+      }
+
+      String[] argv = (String[]) args.toArray(new String[0]);
+      runImport(argv);
+
+      fail("Existing target-dir did not cause the import to fail");
+
+    } catch (IOException e) {
+      // expected
+    }
+  }
+}
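
Note on the append renumbering convention (illustrative, not part of the patch): AppendUtils assumes imported data files follow the MapReduce part-file naming scheme, part*-NNNNN with a five-digit partition suffix. The standalone sketch below mirrors the logic of getNextPartition() and the renaming loop in moveFiles(); the class name AppendNumberingDemo and the sample file names are hypothetical.

import java.text.NumberFormat;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: how AppendUtils derives and applies the next partition number. */
public class AppendNumberingDemo {

  // Same pattern and width as AppendUtils: data files end in a 5-digit partition.
  private static final Pattern PART =
      Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*");
  private static final int PARTITION_DIGITS = 5;

  public static void main(String[] args) {
    // Hypothetical file names already present in the user's target-dir.
    List<String> existing = Arrays.asList("part-m-00000", "part-m-00001", "_logs");

    // Next partition = highest existing partition + 1, as in getNextPartition().
    int nextPartition = 0;
    for (String name : existing) {
      Matcher mat = PART.matcher(name);
      if (mat.matches()) {
        nextPartition = Math.max(nextPartition, Integer.parseInt(mat.group(1)) + 1);
      }
    }

    // Newly imported temp files are renamed starting from that partition,
    // mirroring the renaming loop in moveFiles().
    NumberFormat numpart = NumberFormat.getInstance();
    numpart.setMinimumIntegerDigits(PARTITION_DIGITS);
    numpart.setGroupingUsed(false);
    for (String imported : Arrays.asList("part-m-00000", "part-m-00001")) {
      String renamed = "part-m-" + numpart.format(nextPartition++);
      System.out.println(imported + " -> " + renamed);
      // prints: part-m-00000 -> part-m-00002, part-m-00001 -> part-m-00003
    }
  }
}

Entries such as _logs do not match the part-file pattern; in the patch they are directories and are moved through the separate directory branch of moveFiles(), which only adds a numeric suffix on a name collision.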