
SQOOP-3136: Add support to Sqoop being able to handle different file system urls (e.g. s3a://some-bucket/tmp/sqoop)

(Illya Yalovyy via Attila Szabo)
Attila Szabo 2017-02-24 07:40:48 +01:00
parent d8c4b3ccdd
commit 9466a0c7d9
18 changed files with 162 additions and 70 deletions
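
The pattern repeated across these 18 files: FileSystem.get(conf) always returns the cluster's default filesystem (fs.defaultFS, typically HDFS), so operations on a Path with a different scheme, such as s3a://some-bucket/tmp/sqoop, either hit the wrong store or fail Hadoop's "Wrong FS" check. Path.getFileSystem(conf) derives the filesystem from the path's own URI instead. A minimal sketch of the difference (the class name is invented for illustration, and it assumes an S3A connector on the classpath):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class WrongFsDemo {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration(); // fs.defaultFS is usually hdfs://...
      Path target = new Path("s3a://some-bucket/tmp/sqoop");

      // Old pattern: the default filesystem, no matter what the path says.
      FileSystem defaultFs = FileSystem.get(conf);
      // New pattern: the filesystem named by the path's own scheme (S3A here).
      FileSystem pathFs = target.getFileSystem(conf);

      // defaultFs.exists(target) throws IllegalArgumentException ("Wrong FS")
      // when the schemes differ; pathFs.exists(target) queries the bucket.
      System.out.println(pathFs.getUri());
    }
  }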

View File

@@ -59,7 +59,7 @@ public static LobReaderCache getCache() {
    */
   public static Path qualify(Path path, Configuration conf)
       throws IOException {
-    return org.apache.sqoop.io.LobReaderCache.qualify(path, conf);
+    return org.apache.sqoop.util.FileSystemUtil.makeQualified(path, conf);
   }
 }

View File

@@ -115,7 +115,7 @@ private String getHiveBinPath() {
    * from where we put it, before running Hive LOAD DATA INPATH.
    */
   private void removeTempLogs(Path tablePath) throws IOException {
-    FileSystem fs = FileSystem.get(configuration);
+    FileSystem fs = tablePath.getFileSystem(configuration);
     Path logsPath = new Path(tablePath, "_logs");
     if (fs.exists(logsPath)) {
       LOG.info("Removing temporary files from import process: " + logsPath);
@@ -263,7 +263,7 @@ public void importTable(String inputTableName, String outputTableName,
    * @throws IOException
    */
   private void cleanUp(Path outputPath) throws IOException {
-    FileSystem fs = FileSystem.get(configuration);
+    FileSystem fs = outputPath.getFileSystem(configuration);
     // HIVE is not always removing input directory after LOAD DATA statement
     // (which is our export directory). We're removing export directory in case

View File

@@ -36,6 +36,7 @@
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.manager.ConnManager;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * Creates (Hive-specific) SQL DDL statements to create tables to hold data
@@ -271,8 +272,7 @@ public Path getFinalPath() throws IOException {
     } else {
       tablePath = warehouseDir + inputTableName;
     }
-    FileSystem fs = FileSystem.get(configuration);
-    return new Path(tablePath).makeQualified(fs);
+    return FileSystemUtil.makeQualified(new Path(tablePath), configuration);
   }
 
   /**

View File

@@ -24,10 +24,10 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.cloudera.sqoop.io.LobFile;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * A cache of open LobFile.Reader objects.
@@ -55,7 +55,7 @@ public LobFile.Reader get(Path path, Configuration conf)
       throws IOException {
     LobFile.Reader reader = null;
-    Path canonicalPath = qualify(path, conf);
+    Path canonicalPath = FileSystemUtil.makeQualified(path, conf);
     // Look up an entry in the cache.
     synchronized(this) {
       reader = readerMap.remove(canonicalPath);
@@ -111,24 +111,4 @@ protected synchronized void finalize() throws Throwable {
   protected LobReaderCache() {
     this.readerMap = new TreeMap<Path, LobFile.Reader>();
   }
-
-  /**
-   * Created a fully-qualified path object.
-   * @param path the path to fully-qualify with its fs URI.
-   * @param conf the current Hadoop FS configuration.
-   * @return a new path representing the same location as the input 'path',
-   * but with a fully-qualified URI.
-   */
-  public static Path qualify(Path path, Configuration conf)
-      throws IOException {
-    if (null == path) {
-      return null;
-    }
-    FileSystem fs = path.getFileSystem(conf);
-    if (null == fs) {
-      fs = FileSystem.get(conf);
-    }
-    return path.makeQualified(fs);
-  }
 }

View File

@@ -28,6 +28,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * An output stream that writes to an underlying filesystem, opening
@@ -90,7 +91,7 @@ private void openNextFile() throws IOException {
     FileSystem fs = destFile.getFileSystem(conf);
     LOG.debug("Opening next output file: " + destFile);
     if (fs.exists(destFile)) {
-      Path canonicalDest = destFile.makeQualified(fs);
+      Path canonicalDest = fs.makeQualified(destFile);
       throw new IOException("Destination file " + canonicalDest
           + " already exists");
     }

View File

@@ -79,7 +79,7 @@ public LargeObjectLoader(Configuration conf, Path workPath)
       throws IOException {
     this.conf = conf;
     this.workPath = workPath;
-    this.fs = FileSystem.get(conf);
+    this.fs = workPath.getFileSystem(conf);
     this.curBlobWriter = null;
     this.curClobWriter = null;
   }

View File

@@ -714,16 +714,16 @@ public static void writeOutputFile(org.apache.hadoop.conf.Configuration conf,
     Path uniqueFileName = null;
     try {
-      FileSystem fileSystem = FileSystem.get(conf);
-
       // NOTE: This code is not thread-safe.
       // i.e. A race-condition could still cause this code to 'fail'.
       int suffix = 0;
       String fileNameTemplate = fileName + "%s";
+      Path outputDirectory = new Path(getOutputDirectory(conf));
+      FileSystem fileSystem = outputDirectory.getFileSystem(conf);
       while (true) {
         uniqueFileName =
-            new Path(getOutputDirectory(conf), String.format(fileNameTemplate,
+            new Path(outputDirectory, String.format(fileNameTemplate,
                 suffix == 0 ? "" : String.format(" (%d)", suffix)));
         if (!fileSystem.exists(uniqueFileName)) {
           break;
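
An aside on the NOTE retained above: exists() followed by a later create is a check-then-act race, so two concurrent writers can pick the same name. A hedged sketch of a more atomic variant using the standard FileSystem.createNewFile() call; this is illustrative only, not part of the patch, and the atomicity guarantee holds on HDFS rather than on object stores such as S3:

  int suffix = 0;
  Path candidate;
  do {
    candidate = new Path(outputDirectory, String.format(fileNameTemplate,
        suffix == 0 ? "" : String.format(" (%d)", suffix)));
    suffix++;
  } while (!fileSystem.createNewFile(candidate)); // false if another writer won the race
  // The loop exits holding a name this caller actually created.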

View File

@@ -47,6 +47,7 @@
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * This file was ported from Hadoop 2.0.2-alpha
@@ -224,11 +225,9 @@ public List<InputSplit> getSplits(JobContext job)
     // times, one time each for each pool in the next loop.
     List<Path> newpaths = new LinkedList<Path>();
     for (int i = 0; i < paths.length; i++) {
-      FileSystem fs = paths[i].getFileSystem(conf);
-
-      //the scheme and authority will be kept if the path is
-      //a valid path for a non-default file system
-      Path p = fs.makeQualified(paths[i]);
+      Path p = FileSystemUtil.makeQualified(paths[i], conf);
       newpaths.add(p);
     }
     paths = null;

View File

@@ -28,6 +28,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -48,6 +49,7 @@
 import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
 import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
 import com.cloudera.sqoop.orm.AvroSchemaGenerator;
+import org.apache.sqoop.util.FileSystemUtil;
 import org.kitesdk.data.Datasets;
 import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
@@ -141,8 +143,8 @@ private String getKiteUri(Configuration conf, String tableName) throws IOExcepti
         options.getHiveTableName();
       return String.format("dataset:hive:/%s/%s", hiveDatabase, hiveTable);
     } else {
-      FileSystem fs = FileSystem.get(conf);
-      return "dataset:" + fs.makeQualified(getContext().getDestination());
+      Path destination = getContext().getDestination();
+      return "dataset:" + FileSystemUtil.makeQualified(destination, conf);
     }
   }

View File

@@ -50,6 +50,7 @@
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Date;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * Base class for running an export MapReduce job.
@@ -232,7 +233,7 @@ protected Path getInputPath() throws IOException {
     }
     Path inputPath = new Path(context.getOptions().getExportDir());
     Configuration conf = options.getConf();
-    inputPath = inputPath.makeQualified(FileSystem.get(conf));
+    inputPath = FileSystemUtil.makeQualified(inputPath, conf);
     return inputPath;
   }

View File

@@ -92,11 +92,10 @@ protected void jobSetup(Job job) throws IOException, ImportException {
   protected void completeImport(Job job) throws IOException, ImportException {
     super.completeImport(job);
 
-    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
-
     // Make the bulk load files source directory accessible to the world
     // so that the hbase user can deal with it
     Path bulkLoadDir = getContext().getDestination();
+    FileSystem fileSystem = bulkLoadDir.getFileSystem(job.getConfiguration());
 
     setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir),
         FsPermission.createImmutable((short) 00777));
@@ -120,8 +119,9 @@ protected void completeImport(Job job) throws IOException, ImportException {
   protected void jobTeardown(Job job) throws IOException, ImportException {
     super.jobTeardown(job);
     // Delete the hfiles directory after we are finished.
-    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
-    fileSystem.delete(getContext().getDestination(), true);
+    Path destination = getContext().getDestination();
+    FileSystem fileSystem = destination.getFileSystem(job.getConfiguration());
+    fileSystem.delete(destination, true);
   }
 
   /**

View File

@@ -25,7 +25,6 @@
 import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.DefaultStringifier;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
@@ -38,6 +37,7 @@
 import java.io.IOException;
 import java.util.Map;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * Run an export using JDBC (JDBC-based ExportOutputFormat).
@@ -79,8 +79,7 @@ protected void configureInputFormat(Job job, String tableName,
     } else if (fileType == FileType.PARQUET_FILE) {
       LOG.debug("Configuring for Parquet export");
      configureGenericRecordExportInputFormat(job, tableName);
-      FileSystem fs = FileSystem.get(job.getConfiguration());
-      String uri = "dataset:" + fs.makeQualified(getInputPath());
+      String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
       DatasetKeyInputFormat.configure(job).readFrom(uri);
     }
   }

View File

@@ -26,7 +26,6 @@
 import java.util.StringTokenizer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.DefaultStringifier;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
@@ -43,6 +42,7 @@
 import com.cloudera.sqoop.mapreduce.ExportJobBase;
 import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
 import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * Run an update-based export using JDBC (JDBC-based UpdateOutputFormat).
@@ -187,8 +187,7 @@ protected void configureInputFormat(Job job, String tableName, String tableClass
     } else if (fileType == FileType.PARQUET_FILE) {
       LOG.debug("Configuring for Parquet export");
       configureGenericRecordExportInputFormat(job, tableName);
-      FileSystem fs = FileSystem.get(job.getConfiguration());
-      String uri = "dataset:" + fs.makeQualified(getInputPath());
+      String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
       DatasetKeyInputFormat.configure(job).readFrom(uri);
     }
   }

View File

@@ -46,6 +46,7 @@
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.mapreduce.JobBase;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * Run a MapReduce job that merges two datasets.
@@ -111,9 +112,9 @@ public boolean runMergeJob() throws IOException {
       Path newPath = new Path(options.getMergeNewPath());
 
       Configuration jobConf = job.getConfiguration();
-      FileSystem fs = FileSystem.get(jobConf);
-      oldPath = oldPath.makeQualified(fs);
-      newPath = newPath.makeQualified(fs);
+      oldPath = FileSystemUtil.makeQualified(oldPath, jobConf);
+      newPath = FileSystemUtil.makeQualified(newPath, jobConf);
 
       propagateOptionsToJob(job);

View File

@@ -300,7 +300,6 @@ private boolean initIncrementalConstraints(SqoopOptions options,
       return true;
     }
 
-    FileSystem fs = FileSystem.get(options.getConf());
     SqoopOptions.IncrementalMode incrementalMode = options.getIncrementalMode();
     String nextIncrementalValue = null;
 
@@ -325,11 +324,14 @@ private boolean initIncrementalConstraints(SqoopOptions options,
       }
       break;
     case DateLastModified:
-      if (options.getMergeKeyCol() == null && !options.isAppendMode()
-          && fs.exists(getOutputPath(options, context.getTableName(), false))) {
-        throw new ImportException("--" + MERGE_KEY_ARG + " or " + "--" + APPEND_ARG
-            + " is required when using --" + this.INCREMENT_TYPE_ARG
-            + " lastmodified and the output directory exists.");
+      if (options.getMergeKeyCol() == null && !options.isAppendMode()) {
+        Path outputPath = getOutputPath(options, context.getTableName(), false);
+        FileSystem fs = outputPath.getFileSystem(options.getConf());
+        if (fs.exists(outputPath)) {
+          throw new ImportException("--" + MERGE_KEY_ARG + " or " + "--" + APPEND_ARG
+              + " is required when using --" + this.INCREMENT_TYPE_ARG
+              + " lastmodified and the output directory exists.");
+        }
       }
       checkColumnType = manager.getColumnTypes(options.getTableName(),
         options.getSqlQuery()).get(options.getIncrementalTestColumn());
@@ -436,10 +438,14 @@ private boolean initIncrementalConstraints(SqoopOptions options,
    * Merge HDFS output directories
    */
   protected void lastModifiedMerge(SqoopOptions options, ImportJobContext context) throws IOException {
-    FileSystem fs = FileSystem.get(options.getConf());
-    if (context.getDestination() != null && fs.exists(context.getDestination())) {
+    if (context.getDestination() == null) {
+      return;
+    }
+
+    Path userDestDir = getOutputPath(options, context.getTableName(), false);
+    FileSystem fs = userDestDir.getFileSystem(options.getConf());
+    if (fs.exists(context.getDestination())) {
       LOG.info("Final destination exists, will run merge job.");
-      Path userDestDir = getOutputPath(options, context.getTableName(), false);
       if (fs.exists(userDestDir)) {
         String tableClassName = null;
         if (!context.getConnManager().isORMFacilitySelfManaged()) {
@@ -541,8 +547,8 @@ protected boolean importTable(SqoopOptions options, String tableName,
   private void deleteTargetDir(ImportJobContext context) throws IOException {
 
     SqoopOptions options = context.getOptions();
-    FileSystem fs = FileSystem.get(options.getConf());
     Path destDir = context.getDestination();
+    FileSystem fs = destDir.getFileSystem(options.getConf());
     if (fs.exists(destDir)) {
       fs.delete(destDir, true);

View File

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public final class FileSystemUtil {
+  private FileSystemUtil() {
+  }
+
+  /**
+   * Creates a fully-qualified path object.
+   * @param path the path to fully-qualify with its file system URI.
+   * @param conf the current Hadoop configuration.
+   * @return a new path representing the same location as the input path,
+   * but with a fully-qualified URI. Returns {@code null} if the provided
+   * path is {@code null}.
+   */
+  public static Path makeQualified(Path path, Configuration conf)
+      throws IOException {
+    if (null == path) {
+      return null;
+    }
+
+    return path.getFileSystem(conf).makeQualified(path);
+  }
+}
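
For reference, a hedged usage sketch of the new helper (the paths and the qualified results in the comments are illustrative, not output from the patch):

  Configuration conf = new Configuration();
  // A relative path is qualified against the default filesystem and working directory:
  FileSystemUtil.makeQualified(new Path("tmp/sqoop"), conf);
  //   -> e.g. hdfs://namenode:8020/user/sqoop/tmp/sqoop
  // A path with its own scheme keeps that scheme and authority:
  FileSystemUtil.makeQualified(new Path("s3a://some-bucket/tmp/sqoop"), conf);
  //   -> s3a://some-bucket/tmp/sqoop
  // Null passes through, so call sites need no extra guard:
  FileSystemUtil.makeQualified(null, conf);  // -> null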

View File

@@ -18,16 +18,11 @@
 package org.apache.sqoop.util;
 
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -40,15 +35,14 @@ private FileUploader() { }
   public static void uploadFilesToDFS(String srcBasePath, String src,
       String destBasePath, String dest, Configuration conf) throws IOException {
 
-    FileSystem fs = FileSystem.get(conf);
-    Path targetPath = null;
     Path srcPath = new Path(srcBasePath, src);
 
-    if (destBasePath == null || destBasePath.length() == 0) {
+    if (destBasePath == null || destBasePath.isEmpty()) {
       destBasePath = ".";
     }
 
-    targetPath = new Path(destBasePath, dest);
+    Path targetPath = new Path(destBasePath, dest);
+    FileSystem fs = targetPath.getFileSystem(conf);
 
     if (!fs.exists(targetPath)) {
       fs.mkdirs(targetPath);

View File

@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.util;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Test;
+import org.junit.Before;
+
+import static org.junit.Assert.*;
+
+public class TestFileSystemUtil {
+  private Configuration conf;
+
+  @Before
+  public void setUp() {
+    conf = new Configuration();
+    conf.set("fs.my.impl", MyFileSystem.class.getTypeName());
+  }
+
+  @Test
+  public void testMakeQualifiedWhenPathIsNullThenReturnsNull() throws IOException {
+    assertNull(FileSystemUtil.makeQualified(null, conf));
+  }
+
+  @Test
+  public void testMakeQualifiedWhenPathIsRelativeThenReturnDefault() throws IOException {
+    Path actual = FileSystemUtil.makeQualified(new Path("foo/bar"), conf);
+    assertEquals("file", actual.toUri().getScheme());
+  }
+
+  @Test
+  public void testMakeQualifiedWhenPathHasCustomSchemaThenReturnSameSchema() throws IOException {
+    Path actual = FileSystemUtil.makeQualified(new Path("my:/foo/bar"), conf);
+    assertEquals("my", actual.toUri().getScheme());
+  }
+
+  @Test(expected = IOException.class)
+  public void testMakeQualifiedWhenPathHasBadSchemaThenThrowsIOException() throws IOException {
+    FileSystemUtil.makeQualified(new Path("nosuchfs://foo/bar"), conf);
+  }
+
+  public static final class MyFileSystem extends RawLocalFileSystem {
+    @Override
+    public URI getUri() { return URI.create("my:///"); }
+  }
+}
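
A note on the custom-scheme trick in this test: Hadoop maps a path's scheme to an implementation class via the configuration key fs.<scheme>.impl, so the single conf.set(...) in setUp() is the whole registration. A minimal illustration of the same mechanism, assuming the standard Hadoop lookup applies:

  Configuration conf = new Configuration();
  conf.set("fs.my.impl", TestFileSystemUtil.MyFileSystem.class.getTypeName());
  // getFileSystem resolves "my" through fs.my.impl and instantiates MyFileSystem:
  FileSystem fs = new Path("my:/foo/bar").getFileSystem(conf);
  // "nosuchfs://foo/bar" has no fs.nosuchfs.impl mapping, so getFileSystem
  // throws IOException ("No FileSystem for scheme: nosuchfs"), which is
  // what the last test asserts.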