diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
index 70e0fde4..256faf7d 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
@@ -54,6 +54,8 @@ public void initialize(final InitializerContext context, final LinkConfiguration
     HdfsUtils.configurationToContext(configuration, context.getContext());
 
     final boolean appendMode = Boolean.TRUE.equals(jobConfig.toJobConfig.appendMode);
+    final boolean deleteOutputDirectory = Boolean.TRUE.equals(jobConfig.toJobConfig.deleteOutputDirectory);
+
 
     // Verification that given HDFS directory either don't exists or is empty
     try {
@@ -70,7 +72,11 @@ public Void run() throws Exception {
           if (fs.isDirectory(path) && !appendMode) {
             FileStatus[] fileStatuses = fs.listStatus(path);
             if (fileStatuses.length != 0) {
-              throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory is not empty");
+              if (deleteOutputDirectory) {
+                fs.delete(path, true);
+              } else {
+                throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory is not empty");
+              }
             }
           }
         }
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
index d76ba5fe..dc4b285b 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
@@ -43,6 +43,8 @@ public class ToJobConfig {
 
   @Input public Boolean appendMode;
 
+  @Input public Boolean deleteOutputDirectory;
+
   public static class ToJobConfigValidator extends AbstractValidator<ToJobConfig> {
     @Override
     public void validate(ToJobConfig conf) {
diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
index 29efcedb..3b24a652 100644
--- a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
+++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
@@ -59,6 +59,11 @@ toJobConfig.appendMode.example = true
 toJobConfig.appendMode.help = If set to false, job will fail if output directory already exists. If set to true \
   then imported data will be stored to already existing and possibly non empty directory.
 
+toJobConfig.deleteOutputDirectory.label = Delete output directory
+toJobConfig.deleteOutputDirectory.example = true
+toJobConfig.deleteOutputDirectory.help = If set to false, the job will fail when the output directory already exists and is not empty. If set to true \
+  then the existing output directory will be deleted before the job is executed.
+
 toJobConfig.overrideNullValue.label = Override null value
 toJobConfig.overrideNullValue.example = true
 toJobConfig.overrideNullValue.help = If set to true, then the null value will be overridden with the value set in \
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
index 5441702f..673f4474 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
@@ -24,6 +24,7 @@
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.io.File;
@@ -36,19 +37,28 @@
  */
 public class TestToInitializer extends TestHdfsBase {
 
+  private LinkConfiguration linkConfig;
+  private ToJobConfiguration jobConfig;
+  private InitializerContext initializerContext;
+  private Initializer initializer;
+
+  @BeforeMethod
+  public void setup() {
+    linkConfig = new LinkConfiguration();
+    jobConfig = new ToJobConfiguration();
+
+    linkConfig.linkConfig.uri = "file:///";
+
+    initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
+    initializer = new HdfsToInitializer();
+  }
+
   @Test
   public void testWorkDirectoryBeingSet() {
     final String TARGET_DIR = "/target/directory";
 
-    LinkConfiguration linkConfig = new LinkConfiguration();
-    ToJobConfiguration jobConfig = new ToJobConfiguration();
-
-    linkConfig.linkConfig.uri = "file:///";
     jobConfig.toJobConfig.outputDirectory = TARGET_DIR;
 
-    InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
-
-    Initializer initializer = new HdfsToInitializer();
     initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY));
@@ -60,50 +70,69 @@ public void testOutputDirectoryIsAFile() throws Exception {
     File file = File.createTempFile("MastersOfOrion", ".txt");
     file.createNewFile();
 
-    LinkConfiguration linkConfig = new LinkConfiguration();
-    ToJobConfiguration jobConfig = new ToJobConfiguration();
-
-    linkConfig.linkConfig.uri = "file:///";
     jobConfig.toJobConfig.outputDirectory = file.getAbsolutePath();
 
-    InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
-
-    Initializer initializer = new HdfsToInitializer();
     initializer.initialize(initializerContext, linkConfig, jobConfig);
   }
 
   @Test(expectedExceptions = SqoopException.class)
-  public void testOutputDirectoryIsNotEmpty() throws Exception {
+  public void testOutputDirectoryIsNotEmptyWithoutDeleteOption() throws Exception {
     File dir = Files.createTempDir();
     File file = File.createTempFile("MastersOfOrion", ".txt", dir);
 
-    LinkConfiguration linkConfig = new LinkConfiguration();
-    ToJobConfiguration jobConfig = new ToJobConfiguration();
-
-    linkConfig.linkConfig.uri = "file:///";
     jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
 
-    InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
-
-    Initializer initializer = new HdfsToInitializer();
     initializer.initialize(initializerContext, linkConfig, jobConfig);
   }
 
+  @Test
+  public void testOutputDirectoryIsNotEmptyWithDeleteOption() throws Exception {
+    File dir = Files.createTempDir();
+    File.createTempFile("MastersOfOrion", ".txt", dir);
+
+    jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
+    jobConfig.toJobConfig.deleteOutputDirectory = true;
+
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+
+    assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY));
+    assertTrue(initializerContext.getString(HdfsConstants.WORK_DIRECTORY).startsWith(dir.getAbsolutePath() + "/."));
+  }
+
   @Test
   public void testOutputDirectoryIsNotEmptyWithIncremental() throws Exception {
     File dir = Files.createTempDir();
-    File file = File.createTempFile("MastersOfOrion", ".txt", dir);
+    File.createTempFile("MastersOfOrion", ".txt", dir);
 
-    LinkConfiguration linkConfig = new LinkConfiguration();
-    ToJobConfiguration jobConfig = new ToJobConfiguration();
-
-    linkConfig.linkConfig.uri = "file:///";
     jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
     jobConfig.toJobConfig.appendMode = true;
 
-    InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+
+    assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY));
+    assertTrue(initializerContext.getString(HdfsConstants.WORK_DIRECTORY).startsWith(dir.getAbsolutePath()));
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testOutputDirectoryIsNotEmptyWithoutIncrementalWithoutDeleteOption() throws Exception {
+    File dir = Files.createTempDir();
+    File.createTempFile("MastersOfOrion", ".txt", dir);
+
+    jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
+    jobConfig.toJobConfig.appendMode = false;
+
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+  }
+
+  @Test
+  public void testOutputDirectoryIsNotEmptyWithoutIncrementalWithDeleteOption() throws Exception {
+    File dir = Files.createTempDir();
+    File.createTempFile("MastersOfOrion", ".txt", dir);
+
+    jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
+    jobConfig.toJobConfig.appendMode = false;
+    jobConfig.toJobConfig.deleteOutputDirectory = true;
 
-    Initializer initializer = new HdfsToInitializer();
     initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY));
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
index 722c126a..90b0d159 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
@@ -30,6 +30,9 @@
 import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
 import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
 import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
+import org.apache.sqoop.test.utils.HdfsUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -39,12 +42,12 @@
 
 @Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
 public class OutputDirectoryTest extends SqoopTestCase {
 
-  @Test
-  public void testOutputDirectoryIsAFile() throws Exception {
-    createAndLoadTableCities();
-    hdfsClient.delete(new Path(getMapreduceDirectory()), true);
-    hdfsClient.createNewFile(new Path(getMapreduceDirectory()));
+  private MJob job;
+
+  @BeforeMethod
+  public void setup() {
+    createAndLoadTableCities();
 
     // RDBMS link
     MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
@@ -57,10 +60,22 @@ public void testOutputDirectoryIsAFile() throws Exception {
     saveLink(hdfsConnection);
 
     // Job creation
-    MJob job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
+    job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
 
     // Set rdbms "FROM" config
     fillRdbmsFromConfig(job, "id");
+  }
+
+  @AfterMethod
+  public void cleanUp() {
+    dropTable();
+  }
+
+  @Test
+  public void testOutputDirectoryIsAFile() throws Exception {
+
+    hdfsClient.delete(new Path(getMapreduceDirectory()), true);
+    hdfsClient.createNewFile(new Path(getMapreduceDirectory()));
 
     // fill the hdfs "TO" config
     fillHdfsToConfig(job, ToFormat.TEXT_FILE);
@@ -71,33 +86,14 @@ public void testOutputDirectoryIsAFile() throws Exception {
       HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007.toString(),
       "is a file"
     );
-
-    dropTable();
   }
 
   @Test
-  public void testOutputDirectoryIsNotEmpty() throws Exception {
-    createAndLoadTableCities();
+  public void testOutputDirectoryIsNotEmptyWithoutDeleteOption() throws Exception {
     hdfsClient.mkdirs(new Path(getMapreduceDirectory()));
     hdfsClient.createNewFile(new Path(getMapreduceDirectory() + "/x"));
 
-    // RDBMS link
-    MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
-    fillRdbmsLinkConfig(rdbmsConnection);
-    saveLink(rdbmsConnection);
-
-    // HDFS link
-    MLink hdfsConnection = getClient().createLink("hdfs-connector");
-    fillHdfsLink(hdfsConnection);
-    saveLink(hdfsConnection);
-
-    // Job creation
-    MJob job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
-
-    // Set rdbms "FROM" config
-    fillRdbmsFromConfig(job, "id");
-
     // fill the hdfs "TO" config
     fillHdfsToConfig(job, ToFormat.TEXT_FILE);
 
@@ -107,32 +103,42 @@ public void testOutputDirectoryIsNotEmpty() throws Exception {
       HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007.toString(),
       "is not empty"
     );
+  }
 
-    dropTable();
+  @Test
+  public void testOutputDirectoryIsNotEmptyWithDeleteOption() throws Exception {
+
+    hdfsClient.mkdirs(new Path(getMapreduceDirectory()));
+    hdfsClient.createNewFile(new Path(getMapreduceDirectory() + "/x"));
+
+    // fill the hdfs "TO" config
+    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+    job.getToJobConfig().getStringInput("toJobConfig.outputDirectory")
+      .setValue(getMapreduceDirectory());
+
+    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+    job.getToJobConfig().getBooleanInput("toJobConfig.deleteOutputDirectory").setValue(true);
+
+
+    saveJob(job);
+
+    executeJob(job);
+
+    // Assert correct output
+    assertTo(
+      "1,'USA','2004-10-23 00:00:00.000','San Francisco'",
+      "2,'USA','2004-10-24 00:00:00.000','Sunnyvale'",
+      "3,'Czech Republic','2004-10-25 00:00:00.000','Brno'",
+      "4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
+      "5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
+    );
   }
 
   @Test
   public void testOutputDirectoryIsEmpty() throws Exception {
-    createAndLoadTableCities();
     hdfsClient.mkdirs(new Path(getMapreduceDirectory()));
 
-    // RDBMS link
-    MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
-    fillRdbmsLinkConfig(rdbmsConnection);
-    saveLink(rdbmsConnection);
-
-    // HDFS link
-    MLink hdfsConnection = getClient().createLink("hdfs-connector");
-    fillHdfsLink(hdfsConnection);
-    saveLink(hdfsConnection);
-
-    // Job creation
-    MJob job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
-
-    // Set rdbms "FROM" config
-    fillRdbmsFromConfig(job, "id");
-
     // fill the hdfs "TO" config
     fillHdfsToConfig(job, ToFormat.TEXT_FILE);
 
@@ -148,12 +154,10 @@ public void testOutputDirectoryIsEmpty() throws Exception {
       "4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
       "5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
     );
-
-    dropTable();
   }
 
   public void assertJobSubmissionFailure(MJob job, String ...fragments) throws Exception {
-    // Try to execute the job and verify that the it was not successful
+    // Try to execute the job and verify that it was not successful
     try {
       executeJob(job);
       fail("Expected failure in the job submission.");