5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-02 07:10:32 +08:00

SQOOP-3353: Sqoop should not check incremental constraints for HBase imports

(Szabolcs Vasas via Boglarka Egyed)
This commit is contained in:
Boglarka Egyed 2018-07-27 10:56:47 +02:00
parent eefb7a0f3e
commit a06f2f3a83
3 changed files with 84 additions and 9 deletions

View File

@ -329,7 +329,7 @@ private boolean initIncrementalConstraints(SqoopOptions options,
}
break;
case DateLastModified:
if (options.getMergeKeyCol() == null && !options.isAppendMode()) {
if (shouldCheckExistingOutputDirectory(options)) {
Path outputPath = getOutputPath(options, context.getTableName(), false);
FileSystem fs = outputPath.getFileSystem(options.getConf());
if (fs.exists(outputPath)) {
@ -1189,6 +1189,14 @@ public void validateOptions(SqoopOptions options)
validateAccumuloOptions(options);
}
boolean shouldCheckExistingOutputDirectory(SqoopOptions options) {
return options.getMergeKeyCol() == null && !options.isAppendMode() && !isHBaseImport(options);
}
private boolean isHBaseImport(SqoopOptions options) {
return options.getHBaseTable() != null;
}
private boolean isHiveImportNeeded(SqoopOptions options) {
if (!options.doHiveImport()) {
return false;

View File

@ -31,6 +31,7 @@
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.util.ToolRunner;
import org.apache.sqoop.metastore.SavedJobsTestBase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -275,20 +276,35 @@ private void createIdVarcharTable(String tableName,
}
}
private Path getTablePath(String tableName) {
Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
return new Path(warehouse, tableName);
}
private FileSystem getLocalFileSystem() throws IOException {
return FileSystem.getLocal(new Configuration());
}
/**
* Delete all files in a directory for a table.
*/
public void clearDir(String tableName) {
try {
FileSystem fs = FileSystem.getLocal(new Configuration());
Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
Path tableDir = new Path(warehouse, tableName);
FileSystem fs = getLocalFileSystem();
Path tableDir = getTablePath(tableName);
fs.delete(tableDir, true);
} catch (Exception e) {
fail("Got unexpected exception: " + StringUtils.stringifyException(e));
}
}
public void createDir(String tableName) throws IOException {
FileSystem fs = FileSystem.getLocal(new Configuration());
Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
Path tableDir = new Path(warehouse, tableName);
fs.mkdirs(tableDir);
}
/**
* Look at a directory that should contain files full of an imported 'id'
* column. Assert that all numbers in [0, expectedNums) are present
@ -839,6 +855,25 @@ public void testFullLastModifiedImport() throws Exception {
assertDirOfNumbers(TABLE_NAME, 10);
}
@Test
public void testLastModifiedImportWithExistingOutputDirectoryFails() throws Exception {
final String TABLE_NAME = "failWithExistingOutputDirectory";
createDir(TABLE_NAME);
Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
createTimestampTable(TABLE_NAME, 10, thePast);
List<String> args = getArgListForTable(TABLE_NAME, true, false);
SqoopOptions options = new SqoopOptions(newConf());
options.setThrowOnError(true);
thrown.expectMessage("--merge-key or --append is required when using --incremental lastmodified and the output directory exists.");
Sqoop sqoop = new Sqoop(new ImportTool(), options.getConf(), options);
ToolRunner.run(sqoop.getConf(), sqoop, args.toArray(new String[0]));
}
@Test
public void testNoImportFromTheFuture() throws Exception {
// If last-modified dates for writes are serialized to be in the

View File

@ -21,7 +21,9 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
@ -38,6 +40,7 @@
import org.apache.sqoop.hive.HiveClientFactory;
import org.apache.sqoop.util.ExpectedLogMessage;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.theories.DataPoints;
@ -59,6 +62,23 @@ public class TestImportTool {
@Rule
public ExpectedLogMessage logMessage = new ExpectedLogMessage();
private ImportTool importTool;
private ImportTool importToolSpy;
private CodeGenTool codeGenTool;
private HiveClientFactory hiveClientFactory;
@Before
public void before() {
codeGenTool = mock(CodeGenTool.class);
hiveClientFactory = mock(HiveClientFactory.class);
importTool = new ImportTool("import", codeGenTool, false, hiveClientFactory);
importToolSpy = spy(importTool);
}
@Theory
public void esnureTransactionIsolationLevelsAreMappedToTheRightValues(Object[] values)
throws Exception {
@ -74,9 +94,7 @@ public void testImportToolHandlesAvroSchemaMismatchExceptionProperly() throws Ex
final String actualSchemaString = "actualSchema";
final String errorMessage = "Import failed";
ImportTool importTool = spy(new ImportTool("import", mock(CodeGenTool.class), false, mock(HiveClientFactory.class)));
doReturn(true).when(importTool).init(any(SqoopOptions.class));
doReturn(true).when(importToolSpy).init(any(SqoopOptions.class));
Schema writtenWithSchema = mock(Schema.class);
when(writtenWithSchema.toString()).thenReturn(writtenWithSchemaString);
@ -84,13 +102,13 @@ public void testImportToolHandlesAvroSchemaMismatchExceptionProperly() throws Ex
when(actualSchema.toString()).thenReturn(actualSchemaString);
AvroSchemaMismatchException expectedException = new AvroSchemaMismatchException(errorMessage, writtenWithSchema, actualSchema);
doThrow(expectedException).when(importTool).importTable(any(SqoopOptions.class));
doThrow(expectedException).when(importToolSpy).importTable(any(SqoopOptions.class));
SqoopOptions sqoopOptions = mock(SqoopOptions.class);
when(sqoopOptions.doHiveImport()).thenReturn(true);
logMessage.expectError(expectedException.getMessage());
int result = importTool.run(sqoopOptions);
int result = importToolSpy.run(sqoopOptions);
assertEquals(1, result);
}
@ -106,4 +124,18 @@ public void testExternalTableNoHiveImportThrowsException() throws InvalidOptions
Assert.fail("testExternalTableNoHiveImportThrowsException unit test failed!");
}
@Test
public void testShouldCheckExistingOutputDirectoryReturnsFalseForHBaseImport() {
SqoopOptions sqoopOptions = mock(SqoopOptions.class);
when(sqoopOptions.getHBaseTable()).thenReturn("hbasetable");
assertFalse(importTool.shouldCheckExistingOutputDirectory(sqoopOptions));
}
@Test
public void testShouldCheckExistingOutputDirectoryReturnsTrueForNonHBaseImport() {
SqoopOptions sqoopOptions = mock(SqoopOptions.class);
assertTrue(importTool.shouldCheckExistingOutputDirectory(sqoopOptions));
}
}