
SQOOP-2941: Sqoop2: Provide option to re-create target directory if it exists in HDFS connector

(Boglarka Egyed via Abraham Fine)
Abraham Fine committed 2016-12-06 11:30:09 -08:00
parent d9412c2b34
commit 42e7443d56
5 changed files with 123 additions and 77 deletions
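For context, here is a minimal usage sketch of the new option, modeled on the integration test changed below. Assumptions: it runs inside the SqoopTestCase harness (getClient(), fillRdbmsFromConfig(), fillHdfsToConfig(), saveJob(), executeJob()) with already-saved rdbmsConnection and hdfsConnection links, as in OutputDirectoryTest.

// Sketch only, not part of the commit: mirrors testOutputDirectoryIsNotEmptyWithDeleteOption.
MJob job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
fillRdbmsFromConfig(job, "id");                       // rdbms "FROM" config
fillHdfsToConfig(job, ToFormat.TEXT_FILE);            // hdfs "TO" config
job.getToJobConfig().getBooleanInput("toJobConfig.deleteOutputDirectory")
    .setValue(true);                                  // re-create the target directory if it exists
saveJob(job);
executeJob(job);                                      // no longer fails on a non-empty output directory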

View File: HdfsToInitializer.java

@@ -54,6 +54,8 @@ public void initialize(final InitializerContext context, final LinkConfiguration
HdfsUtils.configurationToContext(configuration, context.getContext());
final boolean appendMode = Boolean.TRUE.equals(jobConfig.toJobConfig.appendMode);
final boolean deleteOutputDirectory = Boolean.TRUE.equals(jobConfig.toJobConfig.deleteOutputDirectory);
// Verify that the given HDFS directory either does not exist or is empty
try {
@@ -70,10 +72,14 @@ public Void run() throws Exception {
if (fs.isDirectory(path) && !appendMode) {
FileStatus[] fileStatuses = fs.listStatus(path);
if (fileStatuses.length != 0) {
if (deleteOutputDirectory) {
fs.delete(path, true);
} else {
throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory is not empty");
}
}
}
}
// Generate delegation tokens if we are on secured cluster
SecurityUtils.generateDelegationTokens(context.getContext(), path, configuration);
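Condensed, the decision the initializer now makes looks like this (a sketch assembled from the two hunks above; fs, path, jobConfig, and the error code are the ones in the surrounding code):

// Boolean.TRUE.equals(...) makes an unset Boolean input behave as false.
boolean appendMode = Boolean.TRUE.equals(jobConfig.toJobConfig.appendMode);
boolean deleteOutputDirectory = Boolean.TRUE.equals(jobConfig.toJobConfig.deleteOutputDirectory);
if (fs.isDirectory(path) && !appendMode) {
    FileStatus[] fileStatuses = fs.listStatus(path);
    if (fileStatuses.length != 0) {                   // directory exists and is non-empty
        if (deleteOutputDirectory) {
            fs.delete(path, true);                    // recursive delete, then the job proceeds
        } else {
            throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007,
                "Output directory is not empty");
        }
    }
}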

View File: ToJobConfig.java

@@ -43,6 +43,8 @@ public class ToJobConfig {
@Input public Boolean appendMode;
@Input public Boolean deleteOutputDirectory;
public static class ToJobConfigValidator extends AbstractValidator<ToJobConfig> {
@Override
public void validate(ToJobConfig conf) {
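Note: each @Input field above is documented through resource-bundle keys derived from its name, which is what the next file's hunk adds:

@Input public Boolean deleteOutputDirectory;          // field in ToJobConfig
// documented by: toJobConfig.deleteOutputDirectory.label / .example / .help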

View File: HDFS connector config properties (resource bundle)

@@ -59,6 +59,11 @@ toJobConfig.appendMode.example = true
toJobConfig.appendMode.help = If set to false, the job will fail if the output directory already exists. If set to true, \
imported data will be stored in the already existing and possibly non-empty directory.
toJobConfig.deleteOutputDirectory.label = Delete output directory
toJobConfig.deleteOutputDirectory.example = true
toJobConfig.deleteOutputDirectory.help = If set to false, the job will fail if the output directory already exists. If set to true, \
the existing output directory will be deleted before job execution.
toJobConfig.overrideNullValue.label = Override null value
toJobConfig.overrideNullValue.example = true
toJobConfig.overrideNullValue.help = If set to true, then the null value will be overridden with the value set in \
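Taken together, the two flags resolve an existing, non-empty output directory as follows (summarized from the initializer change above):

appendMode = true                                         -> write into the existing directory
appendMode false/unset, deleteOutputDirectory = true      -> delete the directory, then run the job
appendMode false/unset, deleteOutputDirectory false/unset -> fail with GENERIC_HDFS_CONNECTOR_0007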

View File: TestToInitializer.java

@@ -24,6 +24,7 @@
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.File;
@@ -36,19 +37,28 @@
*/
public class TestToInitializer extends TestHdfsBase {
private LinkConfiguration linkConfig;
private ToJobConfiguration jobConfig;
private InitializerContext initializerContext;
private Initializer initializer;
@BeforeMethod
public void setup() {
linkConfig = new LinkConfiguration();
jobConfig = new ToJobConfiguration();
linkConfig.linkConfig.uri = "file:///";
initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
initializer = new HdfsToInitializer();
}
@Test
public void testWorkDirectoryBeingSet() {
final String TARGET_DIR = "/target/directory";
LinkConfiguration linkConfig = new LinkConfiguration();
ToJobConfiguration jobConfig = new ToJobConfiguration();
linkConfig.linkConfig.uri = "file:///";
jobConfig.toJobConfig.outputDirectory = TARGET_DIR;
InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
Initializer initializer = new HdfsToInitializer();
initializer.initialize(initializerContext, linkConfig, jobConfig);
assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY));
@@ -60,50 +70,69 @@ public void testOutputDirectoryIsAFile() throws Exception {
File file = File.createTempFile("MastersOfOrion", ".txt");
file.createNewFile();
LinkConfiguration linkConfig = new LinkConfiguration();
ToJobConfiguration jobConfig = new ToJobConfiguration();
linkConfig.linkConfig.uri = "file:///";
jobConfig.toJobConfig.outputDirectory = file.getAbsolutePath();
InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
Initializer initializer = new HdfsToInitializer();
initializer.initialize(initializerContext, linkConfig, jobConfig);
}
@Test(expectedExceptions = SqoopException.class)
- public void testOutputDirectoryIsNotEmpty() throws Exception {
+ public void testOutputDirectoryIsNotEmptyWithoutDeleteOption() throws Exception {
File dir = Files.createTempDir();
File file = File.createTempFile("MastersOfOrion", ".txt", dir);
LinkConfiguration linkConfig = new LinkConfiguration();
ToJobConfiguration jobConfig = new ToJobConfiguration();
linkConfig.linkConfig.uri = "file:///";
jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
Initializer initializer = new HdfsToInitializer();
initializer.initialize(initializerContext, linkConfig, jobConfig);
}
@Test
public void testOutputDirectoryIsNotEmptyWithDeleteOption() throws Exception {
File dir = Files.createTempDir();
File.createTempFile("MastersOfOrion", ".txt", dir);
jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
jobConfig.toJobConfig.deleteOutputDirectory = true;
initializer.initialize(initializerContext, linkConfig, jobConfig);
assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY));
assertTrue(initializerContext.getString(HdfsConstants.WORK_DIRECTORY).startsWith(dir.getAbsolutePath() + "/."));
}
@Test
public void testOutputDirectoryIsNotEmptyWithIncremental() throws Exception {
File dir = Files.createTempDir();
- File file = File.createTempFile("MastersOfOrion", ".txt", dir);
+ File.createTempFile("MastersOfOrion", ".txt", dir);
LinkConfiguration linkConfig = new LinkConfiguration();
ToJobConfiguration jobConfig = new ToJobConfiguration();
linkConfig.linkConfig.uri = "file:///";
jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
jobConfig.toJobConfig.appendMode = true;
InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
initializer.initialize(initializerContext, linkConfig, jobConfig);
assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY));
assertTrue(initializerContext.getString(HdfsConstants.WORK_DIRECTORY).startsWith(dir.getAbsolutePath()));
}
@Test(expectedExceptions = SqoopException.class)
public void testOutputDirectoryIsNotEmptyWithoutIncrementalWithoutDeleteOption() throws Exception {
File dir = Files.createTempDir();
File.createTempFile("MastersOfOrion", ".txt", dir);
jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
jobConfig.toJobConfig.appendMode = false;
initializer.initialize(initializerContext, linkConfig, jobConfig);
}
@Test
public void testOutputDirectoryIsNotEmptyWithoutIncrementalWithDeleteOption() throws Exception {
File dir = Files.createTempDir();
File.createTempFile("MastersOfOrion", ".txt", dir);
jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
jobConfig.toJobConfig.appendMode = false;
jobConfig.toJobConfig.deleteOutputDirectory = true;
Initializer initializer = new HdfsToInitializer();
initializer.initialize(initializerContext, linkConfig, jobConfig);
assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY));

View File: OutputDirectoryTest.java

@@ -30,6 +30,9 @@
import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
import org.apache.sqoop.test.utils.HdfsUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -39,12 +42,12 @@
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
public class OutputDirectoryTest extends SqoopTestCase {
- @Test
- public void testOutputDirectoryIsAFile() throws Exception {
- createAndLoadTableCities();
- hdfsClient.delete(new Path(getMapreduceDirectory()), true);
- hdfsClient.createNewFile(new Path(getMapreduceDirectory()));
+ private MJob job;
+ @BeforeMethod
+ public void setup() {
+ createAndLoadTableCities();
// RDBMS link
MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
@@ -57,10 +60,22 @@ public void testOutputDirectoryIsAFile() throws Exception {
saveLink(hdfsConnection);
// Job creation
- MJob job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
+ job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
// Set rdbms "FROM" config
fillRdbmsFromConfig(job, "id");
}
@AfterMethod
public void cleanUp() {
dropTable();
}
@Test
public void testOutputDirectoryIsAFile() throws Exception {
hdfsClient.delete(new Path(getMapreduceDirectory()), true);
hdfsClient.createNewFile(new Path(getMapreduceDirectory()));
// fill the hdfs "TO" config
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
@@ -71,33 +86,14 @@ public void testOutputDirectoryIsAFile() throws Exception {
HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007.toString(),
"is a file"
);
dropTable();
}
@Test
- public void testOutputDirectoryIsNotEmpty() throws Exception {
- createAndLoadTableCities();
+ public void testOutputDirectoryIsNotEmptyWithoutDeleteOption() throws Exception {
hdfsClient.mkdirs(new Path(getMapreduceDirectory()));
hdfsClient.createNewFile(new Path(getMapreduceDirectory() + "/x"));
// RDBMS link
MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
fillRdbmsLinkConfig(rdbmsConnection);
saveLink(rdbmsConnection);
// HDFS link
MLink hdfsConnection = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsConnection);
saveLink(hdfsConnection);
// Job creation
MJob job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
// Set rdbms "FROM" config
fillRdbmsFromConfig(job, "id");
// fill the hdfs "TO" config
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
@@ -107,32 +103,42 @@ public void testOutputDirectoryIsNotEmpty() throws Exception {
HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007.toString(),
"is not empty"
);
}
- dropTable();
@Test
public void testOutputDirectoryIsNotEmptyWithDeleteOption() throws Exception {
hdfsClient.mkdirs(new Path(getMapreduceDirectory()));
hdfsClient.createNewFile(new Path(getMapreduceDirectory() + "/x"));
// fill the hdfs "TO" config
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
job.getToJobConfig().getStringInput("toJobConfig.outputDirectory")
.setValue(getMapreduceDirectory());
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
job.getToJobConfig().getBooleanInput("toJobConfig.deleteOutputDirectory").setValue(true);
saveJob(job);
executeJob(job);
// Assert correct output
assertTo(
"1,'USA','2004-10-23 00:00:00.000','San Francisco'",
"2,'USA','2004-10-24 00:00:00.000','Sunnyvale'",
"3,'Czech Republic','2004-10-25 00:00:00.000','Brno'",
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
"5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
);
}
@Test
public void testOutputDirectoryIsEmpty() throws Exception {
createAndLoadTableCities();
hdfsClient.mkdirs(new Path(getMapreduceDirectory()));
// RDBMS link
MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
fillRdbmsLinkConfig(rdbmsConnection);
saveLink(rdbmsConnection);
// HDFS link
MLink hdfsConnection = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsConnection);
saveLink(hdfsConnection);
// Job creation
MJob job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
// Set rdbms "FROM" config
fillRdbmsFromConfig(job, "id");
// fill the hdfs "TO" config
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
@@ -148,12 +154,10 @@ public void testOutputDirectoryIsEmpty() throws Exception {
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
"5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
);
dropTable();
}
public void assertJobSubmissionFailure(MJob job, String ...fragments) throws Exception {
- // Try to execute the job and verify that the it was not successful
+ // Try to execute the job and verify that it was not successful
try {
executeJob(job);
fail("Expected failure in the job submission.");