From 9466a0c7d9a585a94a472fc672cad87c30c8125b Mon Sep 17 00:00:00 2001
From: Attila Szabo
Date: Fri, 24 Feb 2017 07:40:48 +0100
Subject: [PATCH] SQOOP-3136: Add support to Sqoop being able to handle
 different file system urls (e.g. s3a://some-bucket/tmp/sqoop)

(Illya Yalovyy via Attila Szabo)
---
 .../com/cloudera/sqoop/io/LobReaderCache.java  |  2 +-
 .../org/apache/sqoop/hive/HiveImport.java      |  4 +-
 .../org/apache/sqoop/hive/TableDefWriter.java  |  4 +-
 .../org/apache/sqoop/io/LobReaderCache.java    | 24 +------
 .../sqoop/io/SplittingOutputStream.java        |  3 +-
 .../apache/sqoop/lib/LargeObjectLoader.java    |  2 +-
 .../sqoop/manager/oracle/OraOopUtilities.java  |  6 +-
 .../mapreduce/CombineFileInputFormat.java      |  5 +-
 .../sqoop/mapreduce/DataDrivenImportJob.java   |  6 +-
 .../apache/sqoop/mapreduce/ExportJobBase.java  |  3 +-
 .../sqoop/mapreduce/HBaseBulkImportJob.java    |  8 +--
 .../apache/sqoop/mapreduce/JdbcExportJob.java  |  5 +-
 .../sqoop/mapreduce/JdbcUpdateExportJob.java   |  5 +-
 .../org/apache/sqoop/mapreduce/MergeJob.java   |  7 +-
 .../org/apache/sqoop/tool/ImportTool.java      | 26 +++++---
 .../org/apache/sqoop/util/FileSystemUtil.java  | 45 +++++++++++++
 .../org/apache/sqoop/util/FileUploader.java    | 12 +---
 .../apache/sqoop/util/TestFileSystemUtil.java  | 65 +++++++++++++++++++
 18 files changed, 162 insertions(+), 70 deletions(-)
 create mode 100644 src/java/org/apache/sqoop/util/FileSystemUtil.java
 create mode 100644 src/test/org/apache/sqoop/util/TestFileSystemUtil.java

diff --git a/src/java/com/cloudera/sqoop/io/LobReaderCache.java b/src/java/com/cloudera/sqoop/io/LobReaderCache.java
index 3394296d..89d31d31 100644
--- a/src/java/com/cloudera/sqoop/io/LobReaderCache.java
+++ b/src/java/com/cloudera/sqoop/io/LobReaderCache.java
@@ -59,7 +59,7 @@ public static LobReaderCache getCache() {
    */
   public static Path qualify(Path path, Configuration conf)
       throws IOException {
-    return org.apache.sqoop.io.LobReaderCache.qualify(path, conf);
+    return org.apache.sqoop.util.FileSystemUtil.makeQualified(path, conf);
   }
 }
diff --git a/src/java/org/apache/sqoop/hive/HiveImport.java b/src/java/org/apache/sqoop/hive/HiveImport.java
index 48283752..153d0912 100644
--- a/src/java/org/apache/sqoop/hive/HiveImport.java
+++ b/src/java/org/apache/sqoop/hive/HiveImport.java
@@ -115,7 +115,7 @@ private String getHiveBinPath() {
    * from where we put it, before running Hive LOAD DATA INPATH.
    */
   private void removeTempLogs(Path tablePath) throws IOException {
-    FileSystem fs = FileSystem.get(configuration);
+    FileSystem fs = tablePath.getFileSystem(configuration);
     Path logsPath = new Path(tablePath, "_logs");
     if (fs.exists(logsPath)) {
       LOG.info("Removing temporary files from import process: " + logsPath);
@@ -263,7 +263,7 @@ public void importTable(String inputTableName, String outputTableName,
    * @throws IOException
    */
   private void cleanUp(Path outputPath) throws IOException {
-    FileSystem fs = FileSystem.get(configuration);
+    FileSystem fs = outputPath.getFileSystem(configuration);
 
     // HIVE is not always removing input directory after LOAD DATA statement
     // (which is our export directory). We're removing export directory in case
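
Every change in this patch follows one pattern: FileSystem.get(conf), which always returns the filesystem named by fs.defaultFS, is replaced with Path.getFileSystem(conf), which resolves a filesystem from the path's own scheme. A minimal sketch of the failure mode being fixed (the bucket name is hypothetical; the classes are the standard Hadoop API):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class WrongFsDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();           // fs.defaultFS is typically hdfs:// or file://
        Path target = new Path("s3a://some-bucket/tmp/sqoop");

        FileSystem defaultFs = FileSystem.get(conf);        // bound to the default scheme only
        FileSystem targetFs = target.getFileSystem(conf);   // bound to the scheme of 'target'

        // defaultFs.exists(target) fails with IllegalArgumentException ("Wrong FS"),
        // because an HDFS client cannot service an s3a:// path; targetFs can.
        System.out.println(targetFs.exists(target));
      }
    }
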
diff --git a/src/java/org/apache/sqoop/hive/TableDefWriter.java b/src/java/org/apache/sqoop/hive/TableDefWriter.java
index c9962e9c..32fcca30 100644
--- a/src/java/org/apache/sqoop/hive/TableDefWriter.java
+++ b/src/java/org/apache/sqoop/hive/TableDefWriter.java
@@ -36,6 +36,7 @@
 
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.manager.ConnManager;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * Creates (Hive-specific) SQL DDL statements to create tables to hold data
@@ -271,8 +272,7 @@ public Path getFinalPath() throws IOException {
     } else {
       tablePath = warehouseDir + inputTableName;
     }
-    FileSystem fs = FileSystem.get(configuration);
-    return new Path(tablePath).makeQualified(fs);
+    return FileSystemUtil.makeQualified(new Path(tablePath), configuration);
   }
 
   /**
diff --git a/src/java/org/apache/sqoop/io/LobReaderCache.java b/src/java/org/apache/sqoop/io/LobReaderCache.java
index bd753740..dbfa4f18 100644
--- a/src/java/org/apache/sqoop/io/LobReaderCache.java
+++ b/src/java/org/apache/sqoop/io/LobReaderCache.java
@@ -24,10 +24,10 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.cloudera.sqoop.io.LobFile;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * A cache of open LobFile.Reader objects.
@@ -55,7 +55,7 @@ public LobFile.Reader get(Path path, Configuration conf)
     throws IOException {
 
     LobFile.Reader reader = null;
-    Path canonicalPath = qualify(path, conf);
+    Path canonicalPath = FileSystemUtil.makeQualified(path, conf);
     // Look up an entry in the cache.
     synchronized(this) {
       reader = readerMap.remove(canonicalPath);
@@ -111,24 +111,4 @@ protected synchronized void finalize() throws Throwable {
   protected LobReaderCache() {
     this.readerMap = new TreeMap<Path, LobFile.Reader>();
   }
-
-  /**
-   * Created a fully-qualified path object.
-   * @param path the path to fully-qualify with its fs URI.
-   * @param conf the current Hadoop FS configuration.
-   * @return a new path representing the same location as the input 'path',
-   * but with a fully-qualified URI.
-   */
-  public static Path qualify(Path path, Configuration conf)
-      throws IOException {
-    if (null == path) {
-      return null;
-    }
-
-    FileSystem fs = path.getFileSystem(conf);
-    if (null == fs) {
-      fs = FileSystem.get(conf);
-    }
-    return path.makeQualified(fs);
-  }
 }
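
LobReaderCache keys its reader map on the qualified path, so the qualified form has to be canonical: relative, absolute, and fully-qualified spellings of the same file must collapse to one cache entry. A small sketch of that normalization, assuming the FileSystemUtil class introduced later in this patch (the authority and home directory shown are examples):

    Configuration conf = new Configuration();
    Path relative = new Path("lobs/data.lob");
    Path qualified = FileSystemUtil.makeQualified(relative, conf);
    // e.g. hdfs://namenode:8020/user/sqoop/lobs/data.lob -- scheme, authority and
    // working directory are filled in, giving a stable key for readerMap.
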
diff --git a/src/java/org/apache/sqoop/io/SplittingOutputStream.java b/src/java/org/apache/sqoop/io/SplittingOutputStream.java
index 5f98192e..129b508e 100644
--- a/src/java/org/apache/sqoop/io/SplittingOutputStream.java
+++ b/src/java/org/apache/sqoop/io/SplittingOutputStream.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * An output stream that writes to an underlying filesystem, opening
@@ -90,7 +91,7 @@ private void openNextFile() throws IOException {
     FileSystem fs = destFile.getFileSystem(conf);
     LOG.debug("Opening next output file: " + destFile);
     if (fs.exists(destFile)) {
-      Path canonicalDest = destFile.makeQualified(fs);
+      Path canonicalDest = fs.makeQualified(destFile);
       throw new IOException("Destination file " + canonicalDest
           + " already exists");
     }
diff --git a/src/java/org/apache/sqoop/lib/LargeObjectLoader.java b/src/java/org/apache/sqoop/lib/LargeObjectLoader.java
index 70c0f4eb..b8525fe8 100644
--- a/src/java/org/apache/sqoop/lib/LargeObjectLoader.java
+++ b/src/java/org/apache/sqoop/lib/LargeObjectLoader.java
@@ -79,7 +79,7 @@ public LargeObjectLoader(Configuration conf, Path workPath)
       throws IOException {
     this.conf = conf;
     this.workPath = workPath;
-    this.fs = FileSystem.get(conf);
+    this.fs = workPath.getFileSystem(conf);
     this.curBlobWriter = null;
     this.curClobWriter = null;
   }
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java b/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java
index e81588cb..e73fd686 100644
--- a/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java
@@ -714,16 +714,16 @@ public static void writeOutputFile(org.apache.hadoop.conf.Configuration conf,
 
     Path uniqueFileName = null;
     try {
-      FileSystem fileSystem = FileSystem.get(conf);
-
       // NOTE: This code is not thread-safe.
       // i.e. A race-condition could still cause this code to 'fail'.
 
       int suffix = 0;
       String fileNameTemplate = fileName + "%s";
+      Path outputDirectory = new Path(getOutputDirectory(conf));
+      FileSystem fileSystem = outputDirectory.getFileSystem(conf);
       while (true) {
         uniqueFileName =
-            new Path(getOutputDirectory(conf), String.format(fileNameTemplate,
+            new Path(outputDirectory, String.format(fileNameTemplate,
                 suffix == 0 ? "" : String.format(" (%d)", suffix)));
         if (!fileSystem.exists(uniqueFileName)) {
           break;
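
The SplittingOutputStream hunk looks like a pure reordering but is not quite: Path.makeQualified(FileSystem) has been deprecated in Hadoop since the 0.23/2.x line, while FileSystem.makeQualified(Path) is the supported spelling. Both produce the same result, as this sketch (using the names from the hunk above) illustrates:

    FileSystem fs = destFile.getFileSystem(conf);
    Path a = destFile.makeQualified(fs);  // deprecated variant
    Path b = fs.makeQualified(destFile);  // supported variant, identical result
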
diff --git a/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java b/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
index e08f9978..fd2cf892 100644
--- a/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
+++ b/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * This file was ported from Hadoop 2.0.2-alpha
@@ -224,11 +225,9 @@ public List<InputSplit> getSplits(JobContext job)
     // times, one time each for each pool in the next loop.
     List<Path> newpaths = new LinkedList<Path>();
     for (int i = 0; i < paths.length; i++) {
-      FileSystem fs = paths[i].getFileSystem(conf);
-
       //the scheme and authority will be kept if the path is
       //a valid path for a non-default file system
-      Path p = fs.makeQualified(paths[i]);
+      Path p = FileSystemUtil.makeQualified(paths[i], conf);
       newpaths.add(p);
     }
     paths = null;
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index 260bc29e..dc49282e 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -28,6 +28,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -48,6 +49,7 @@
 import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
 import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
 import com.cloudera.sqoop.orm.AvroSchemaGenerator;
+import org.apache.sqoop.util.FileSystemUtil;
 import org.kitesdk.data.Datasets;
 import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
 
@@ -141,8 +143,8 @@ private String getKiteUri(Configuration conf, String tableName) throws IOException {
           options.getHiveTableName();
       return String.format("dataset:hive:/%s/%s", hiveDatabase, hiveTable);
     } else {
-      FileSystem fs = FileSystem.get(conf);
-      return "dataset:" + fs.makeQualified(getContext().getDestination());
+      Path destination = getContext().getDestination();
+      return "dataset:" + FileSystemUtil.makeQualified(destination, conf);
     }
   }
 
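
For Parquet imports, Sqoop hands Kite a dataset URI derived from the destination path, so the qualification step above is what lets that URI carry a non-default scheme. Roughly (the bucket name is hypothetical):

    Path dest = new Path("s3a://some-bucket/warehouse/orders");
    String uri = "dataset:" + FileSystemUtil.makeQualified(dest, conf);
    // uri is "dataset:s3a://some-bucket/warehouse/orders"; Kite then picks the
    // filesystem from the URI scheme rather than from fs.defaultFS.
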
diff --git a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
index 27f84dad..c7609a53 100644
--- a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
@@ -50,6 +50,7 @@
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Date;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * Base class for running an export MapReduce job.
@@ -232,7 +233,7 @@ protected Path getInputPath() throws IOException {
     }
     Path inputPath = new Path(context.getOptions().getExportDir());
     Configuration conf = options.getConf();
-    inputPath = inputPath.makeQualified(FileSystem.get(conf));
+    inputPath = FileSystemUtil.makeQualified(inputPath, conf);
     return inputPath;
   }
 
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
index b32cdd1c..2bbfffe0 100644
--- a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
@@ -92,11 +92,10 @@ protected void jobSetup(Job job) throws IOException, ImportException {
   protected void completeImport(Job job) throws IOException, ImportException {
     super.completeImport(job);
 
-    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
-
     // Make the bulk load files source directory accessible to the world
     // so that the hbase user can deal with it
     Path bulkLoadDir = getContext().getDestination();
+    FileSystem fileSystem = bulkLoadDir.getFileSystem(job.getConfiguration());
     setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir),
         FsPermission.createImmutable((short) 00777));
 
@@ -120,8 +119,9 @@ protected void completeImport(Job job) throws IOException, ImportException {
   protected void jobTeardown(Job job) throws IOException, ImportException {
     super.jobTeardown(job);
     // Delete the hfiles directory after we are finished.
-    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
-    fileSystem.delete(getContext().getDestination(), true);
+    Path destination = getContext().getDestination();
+    FileSystem fileSystem = destination.getFileSystem(job.getConfiguration());
+    fileSystem.delete(destination, true);
   }
 
   /**
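
In HBaseBulkImportJob the statement order changes because the filesystem is now derived from the bulk-load directory itself, so it can only be created after that path is known; the permission change and the final cleanup then land on whichever filesystem actually holds the HFiles. A condensed restatement of the hunk above (FsPermission comes from org.apache.hadoop.fs.permission):

    Path bulkLoadDir = getContext().getDestination();
    FileSystem fs = bulkLoadDir.getFileSystem(job.getConfiguration());
    // World-accessible so the hbase user can complete the bulk load:
    fs.setPermission(bulkLoadDir, FsPermission.createImmutable((short) 00777));
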
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
index 626119b0..6f9afaf9 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
@@ -25,7 +25,6 @@
 import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.DefaultStringifier;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
@@ -38,6 +37,7 @@
 
 import java.io.IOException;
 import java.util.Map;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * Run an export using JDBC (JDBC-based ExportOutputFormat).
@@ -79,8 +79,7 @@ protected void configureInputFormat(Job job, String tableName,
     } else if (fileType == FileType.PARQUET_FILE) {
       LOG.debug("Configuring for Parquet export");
       configureGenericRecordExportInputFormat(job, tableName);
-      FileSystem fs = FileSystem.get(job.getConfiguration());
-      String uri = "dataset:" + fs.makeQualified(getInputPath());
+      String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
       DatasetKeyInputFormat.configure(job).readFrom(uri);
     }
   }
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
index f9112807..d13b5600 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
@@ -26,7 +26,6 @@
 import java.util.StringTokenizer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.DefaultStringifier;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
@@ -43,6 +42,7 @@
 import com.cloudera.sqoop.mapreduce.ExportJobBase;
 import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
 import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * Run an update-based export using JDBC (JDBC-based UpdateOutputFormat).
@@ -187,8 +187,7 @@ protected void configureInputFormat(Job job, String tableName, String tableClassName,
     } else if (fileType == FileType.PARQUET_FILE) {
       LOG.debug("Configuring for Parquet export");
       configureGenericRecordExportInputFormat(job, tableName);
-      FileSystem fs = FileSystem.get(job.getConfiguration());
-      String uri = "dataset:" + fs.makeQualified(getInputPath());
+      String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
      DatasetKeyInputFormat.configure(job).readFrom(uri);
     }
   }
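
Both JDBC export jobs mirror the import side: the input path is qualified before being embedded in the Kite dataset URI that DatasetKeyInputFormat reads from. Note that for a default-FS export directory the resulting URI is unchanged, so the rewrite is backward compatible (example values hypothetical):

    String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
    // HDFS input dir: still e.g. dataset:hdfs://nn:8020/user/sqoop/orders
    // S3 input dir:   now   e.g. dataset:s3a://some-bucket/orders
    DatasetKeyInputFormat.configure(job).readFrom(uri);
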
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeJob.java b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
index 5b6c4dfa..8b1cba33 100644
--- a/src/java/org/apache/sqoop/mapreduce/MergeJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
@@ -46,6 +46,7 @@
 
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.mapreduce.JobBase;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * Run a MapReduce job that merges two datasets.
@@ -111,9 +112,9 @@ public boolean runMergeJob() throws IOException {
       Path newPath = new Path(options.getMergeNewPath());
 
       Configuration jobConf = job.getConfiguration();
-      FileSystem fs = FileSystem.get(jobConf);
-      oldPath = oldPath.makeQualified(fs);
-      newPath = newPath.makeQualified(fs);
+
+      oldPath = FileSystemUtil.makeQualified(oldPath, jobConf);
+      newPath = FileSystemUtil.makeQualified(newPath, jobConf);
 
       propagateOptionsToJob(job);
 
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index 258ef798..d1c97497 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -300,7 +300,6 @@ private boolean initIncrementalConstraints(SqoopOptions options,
       return true;
     }
 
-    FileSystem fs = FileSystem.get(options.getConf());
     SqoopOptions.IncrementalMode incrementalMode = options.getIncrementalMode();
     String nextIncrementalValue = null;
 
@@ -325,11 +324,14 @@ private boolean initIncrementalConstraints(SqoopOptions options,
       }
       break;
     case DateLastModified:
-      if (options.getMergeKeyCol() == null && !options.isAppendMode()
-          && fs.exists(getOutputPath(options, context.getTableName(), false))) {
-        throw new ImportException("--" + MERGE_KEY_ARG + " or " + "--" + APPEND_ARG
-            + " is required when using --" + this.INCREMENT_TYPE_ARG
-            + " lastmodified and the output directory exists.");
+      if (options.getMergeKeyCol() == null && !options.isAppendMode()) {
+        Path outputPath = getOutputPath(options, context.getTableName(), false);
+        FileSystem fs = outputPath.getFileSystem(options.getConf());
+        if (fs.exists(outputPath)) {
+          throw new ImportException("--" + MERGE_KEY_ARG + " or " + "--" + APPEND_ARG
+              + " is required when using --" + this.INCREMENT_TYPE_ARG
+              + " lastmodified and the output directory exists.");
+        }
       }
       checkColumnType = manager.getColumnTypes(options.getTableName(),
         options.getSqlQuery()).get(options.getIncrementalTestColumn());
@@ -436,10 +438,14 @@ private boolean initIncrementalConstraints(SqoopOptions options,
    * Merge HDFS output directories
    */
   protected void lastModifiedMerge(SqoopOptions options, ImportJobContext context) throws IOException {
-    FileSystem fs = FileSystem.get(options.getConf());
-    if (context.getDestination() != null && fs.exists(context.getDestination())) {
+    if (context.getDestination() == null) {
+      return;
+    }
+
+    Path userDestDir = getOutputPath(options, context.getTableName(), false);
+    FileSystem fs = userDestDir.getFileSystem(options.getConf());
+    if (fs.exists(context.getDestination())) {
       LOG.info("Final destination exists, will run merge job.");
-      Path userDestDir = getOutputPath(options, context.getTableName(), false);
       if (fs.exists(userDestDir)) {
         String tableClassName = null;
         if (!context.getConnManager().isORMFacilitySelfManaged()) {
@@ -541,8 +547,8 @@ protected boolean importTable(SqoopOptions options, String tableName,
 
   private void deleteTargetDir(ImportJobContext context) throws IOException {
     SqoopOptions options = context.getOptions();
-    FileSystem fs = FileSystem.get(options.getConf());
     Path destDir = context.getDestination();
+    FileSystem fs = destDir.getFileSystem(options.getConf());
 
     if (fs.exists(destDir)) {
       fs.delete(destDir, true);
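
Because MergeJob now qualifies oldPath and newPath independently, each against its own scheme, an incremental lastmodified import can merge directories that do not live on the same filesystem. For illustration (host and bucket hypothetical):

    Configuration jobConf = job.getConfiguration();
    Path oldPath = FileSystemUtil.makeQualified(new Path("hdfs://nn:8020/data/run1"), jobConf);
    Path newPath = FileSystemUtil.makeQualified(new Path("s3a://some-bucket/data/run2"), jobConf);
    // Each path keeps its own scheme and authority; previously both were
    // qualified against the single FileSystem.get(jobConf) handle.
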
diff --git a/src/java/org/apache/sqoop/util/FileSystemUtil.java b/src/java/org/apache/sqoop/util/FileSystemUtil.java
new file mode 100644
index 00000000..1493e095
--- /dev/null
+++ b/src/java/org/apache/sqoop/util/FileSystemUtil.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.util;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public final class FileSystemUtil {
+  private FileSystemUtil() {
+  }
+
+  /**
+   * Creates a fully-qualified path object.
+   * @param path the path to fully-qualify with its file system URI.
+   * @param conf the current Hadoop configuration.
+   * @return a new path representing the same location as the input path,
+   * but with a fully-qualified URI. Returns {@code null} if the provided
+   * path is {@code null}.
+   */
+  public static Path makeQualified(Path path, Configuration conf)
+      throws IOException {
+    if (null == path) {
+      return null;
+    }
+
+    return path.getFileSystem(conf).makeQualified(path);
+  }
+}
diff --git a/src/java/org/apache/sqoop/util/FileUploader.java b/src/java/org/apache/sqoop/util/FileUploader.java
index 155cffce..673a05b3 100644
--- a/src/java/org/apache/sqoop/util/FileUploader.java
+++ b/src/java/org/apache/sqoop/util/FileUploader.java
@@ -18,16 +18,11 @@
 
 package org.apache.sqoop.util;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -40,15 +35,14 @@ private FileUploader() { }
   public static void uploadFilesToDFS(String srcBasePath, String src,
       String destBasePath, String dest, Configuration conf) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-
-    Path targetPath = null;
     Path srcPath = new Path(srcBasePath, src);
 
-    if (destBasePath == null || destBasePath.length() == 0) {
+    if (destBasePath == null || destBasePath.isEmpty()) {
       destBasePath = ".";
     }
 
-    targetPath = new Path(destBasePath, dest);
+    Path targetPath = new Path(destBasePath, dest);
+    FileSystem fs = targetPath.getFileSystem(conf);
 
     if (!fs.exists(targetPath)) {
       fs.mkdirs(targetPath);
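
The new utility is the single null-safe entry point the rest of the patch calls into. A self-contained usage sketch (output depends on the local configuration and working directory):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.sqoop.util.FileSystemUtil;

    public class QualifyDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Relative path: qualified against the default FS and working directory,
        // e.g. file:/home/user/tmp/out on an unconfigured (local) setup.
        System.out.println(FileSystemUtil.makeQualified(new Path("tmp/out"), conf));
        // Null stays null instead of throwing a NullPointerException:
        System.out.println(FileSystemUtil.makeQualified(null, conf));
      }
    }
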
diff --git a/src/test/org/apache/sqoop/util/TestFileSystemUtil.java b/src/test/org/apache/sqoop/util/TestFileSystemUtil.java
new file mode 100644
index 00000000..fef74af0
--- /dev/null
+++ b/src/test/org/apache/sqoop/util/TestFileSystemUtil.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.util;
+
+import java.io.IOException;
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Test;
+import org.junit.Before;
+
+import static org.junit.Assert.*;
+
+public class TestFileSystemUtil {
+  private Configuration conf;
+
+  @Before
+  public void setUp() {
+    conf = new Configuration();
+    conf.set("fs.my.impl", MyFileSystem.class.getTypeName());
+  }
+
+  @Test
+  public void testMakeQualifiedWhenPathIsNullThenReturnsNull() throws IOException {
+    assertNull(FileSystemUtil.makeQualified(null, conf));
+  }
+
+  @Test
+  public void testMakeQualifiedWhenPathIsRelativeThenReturnDefault() throws IOException {
+    Path actual = FileSystemUtil.makeQualified(new Path("foo/bar"), conf);
+    assertEquals("file", actual.toUri().getScheme());
+  }
+
+  @Test
+  public void testMakeQualifiedWhenPathHasCustomSchemaThenReturnSameSchema() throws IOException {
+    Path actual = FileSystemUtil.makeQualified(new Path("my:/foo/bar"), conf);
+    assertEquals("my", actual.toUri().getScheme());
+  }
+
+  @Test(expected = IOException.class)
+  public void testMakeQualifiedWhenPathHasBadSchemaThenThrowsIOException() throws IOException {
+    FileSystemUtil.makeQualified(new Path("nosuchfs://foo/bar"), conf);
+  }
+
+  public static final class MyFileSystem extends RawLocalFileSystem {
+    @Override
+    public URI getUri() { return URI.create("my:///"); }
+  }
+}
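
The test's MyFileSystem works because Hadoop resolves an unfamiliar scheme through the fs.<scheme>.impl configuration key; the nosuchfs case fails for the same reason, surfacing as an IOException ("No FileSystem for scheme") when no such mapping exists. The same mechanism is what wires in real object stores. For example (this key is normally already supplied by core-default.xml when hadoop-aws is on the classpath; shown only for illustration):

    conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
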