5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-04 03:11:00 +08:00

SQOOP-2339: Move sub-directory might fail in append mode

(Qian Xu via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2015-04-29 13:36:34 -07:00
parent d54dddf0b8
commit fa4e903650
3 changed files with 68 additions and 3 deletions

View File

@ -177,7 +177,7 @@ private void moveFiles(FileSystem fs, Path sourceDir, Path targetDir,
if (fileStatus.isDir()) { // move all subdirectories
// pass target dir as initial dest to prevent nesting inside preexisting dir
if (fs.rename(fileStatus.getPath(), targetDir)) {
if (!fs.exists(targetDir) && fs.rename(fileStatus.getPath(), targetDir)) {
LOG.debug("Directory: " + sourceFilename + " renamed to: " + sourceFilename);
} else {
int dirNumber = 0;

View File

@ -112,7 +112,7 @@ private void runParquetImportTest(String codec) throws IOException {
DatasetReader<GenericRecord> reader = getReader();
try {
GenericRecord record1 = reader.next();
//assertNull(record1);
assertNotNull(record1);
assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
assertEquals("DATA_COL1", 100, record1.get("DATA_COL1"));
assertEquals("DATA_COL2", 200L, record1.get("DATA_COL2"));
@ -124,6 +124,7 @@ private void runParquetImportTest(String codec) throws IOException {
ByteBuffer b = ((ByteBuffer) object);
assertEquals((byte) 1, b.get(0));
assertEquals((byte) 2, b.get(1));
assertFalse(reader.hasNext());
} finally {
reader.close();
}
@ -145,8 +146,10 @@ public void testOverrideTypeMapping() throws IOException {
DatasetReader<GenericRecord> reader = getReader();
try {
assertTrue(reader.hasNext());
GenericRecord record1 = reader.next();
assertEquals("DATA_COL0", "10", record1.get("DATA_COL0"));
assertFalse(reader.hasNext());
} finally {
reader.close();
}
@ -168,8 +171,10 @@ public void testFirstUnderscoreInColumnName() throws IOException {
DatasetReader<GenericRecord> reader = getReader();
try {
assertTrue(reader.hasNext());
GenericRecord record1 = reader.next();
assertEquals("__NAME", 1987, record1.get("__NAME"));
assertFalse(reader.hasNext());
} finally {
reader.close();
}
@ -184,13 +189,50 @@ public void testNullableParquetImport() throws IOException, SQLException {
DatasetReader<GenericRecord> reader = getReader();
try {
assertTrue(reader.hasNext());
GenericRecord record1 = reader.next();
assertNull(record1.get("DATA_COL0"));
assertFalse(reader.hasNext());
} finally {
reader.close();
}
}
public void testIncrementalParquetImport() throws IOException, SQLException {
String [] types = { "INT" };
String [] vals = { "1" };
createTableWithColTypes(types, vals);
runImport(getOutputArgv(true, null));
runImportAgain(getOutputArgv(true, new String[]{"--append"}));
DatasetReader<GenericRecord> reader = getReader();
try {
assertTrue(reader.hasNext());
GenericRecord record1 = reader.next();
assertEquals(1, record1.get("DATA_COL0"));
record1 = reader.next();
assertEquals(1, record1.get("DATA_COL0"));
assertFalse(reader.hasNext());
} finally {
reader.close();
}
}
public void testOverwriteParquetDatasetFail() throws IOException, SQLException {
String [] types = { "INT" };
String [] vals = {};
createTableWithColTypes(types, vals);
runImport(getOutputArgv(true, null));
try {
runImportAgain(getOutputArgv(true, null));
fail("");
} catch (IOException ex) {
// ok
}
}
private CompressionType getCompressionType() {
return getDataset().getDescriptor().getCompressionType();
}
@ -208,6 +250,15 @@ private Dataset<GenericRecord> getDataset() {
return Datasets.load(uri, GenericRecord.class);
}
@Override
public void tearDown() {
super.tearDown();
String uri = "dataset:file:" + getTablePath();
if (Datasets.exists(uri)) {
Datasets.delete(uri);
}
}
private void checkField(Field field, String name, Type type) {
assertEquals(name, field.name());
assertEquals(Type.UNION, field.schema().getType());

View File

@ -206,7 +206,15 @@ protected void verifyImport(String expectedVal, String [] importCols) {
* execution).
*/
protected void runImport(SqoopTool tool, String [] argv) throws IOException {
removeTableDir();
boolean cleanup = true;
runImport(cleanup, tool, argv);
}
private void runImport(boolean cleanup, SqoopTool tool,
String [] argv) throws IOException {
if (cleanup) {
removeTableDir();
}
// run the tool through the normal entry-point.
int ret;
@ -234,4 +242,10 @@ protected void runImport(String [] argv) throws IOException {
runImport(new ImportTool(), argv);
}
protected void runImportAgain(String[] argv)
throws IOException {
boolean cleanup = false;
runImport(cleanup, new ImportTool(), argv);
}
}