5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-17 01:11:07 +08:00

SQOOP-2151: Sqoop2: Sqoop mapreduce job gets into deadlock when loader throws an exception

(Ted Malaska via Gwen Shapira)
This commit is contained in:
Gwen Shapira 2015-05-13 20:13:03 +03:00
parent b1c742b638
commit ad632361bb
2 changed files with 35 additions and 10 deletions

View File

@ -137,6 +137,14 @@ private void waitForConsumer() {
// In almost all cases, the exception will be SqoopException, // In almost all cases, the exception will be SqoopException,
// because all exceptions are caught and propagated as // because all exceptions are caught and propagated as
// SqoopExceptions // SqoopExceptions
//There are race conditions with exceptions where the free sema is
//no released. So sense we are in single threaded mode at this point
//we can ask if there are availablePermits and release if needed
if (free.availablePermits() == 0) {
free.release();
}
Throwable t = ex.getCause(); Throwable t = ex.getCause();
if (t instanceof SqoopException) { if (t instanceof SqoopException) {
throw (SqoopException) t; throw (SqoopException) t;
@ -144,6 +152,14 @@ private void waitForConsumer() {
//In the rare case, it was not a SqoopException //In the rare case, it was not a SqoopException
Throwables.propagate(t); Throwables.propagate(t);
} catch (Exception ex) { } catch (Exception ex) {
//There are race conditions with exceptions where the free sema is
//no released. So sense we are in single threaded mode at this point
//we can ask if there are availablePermits and release if needed
if (free.availablePermits() == 0) {
free.release();
}
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019, ex); throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019, ex);
} }
} }

View File

@ -193,6 +193,10 @@ public void testSuccessfulContinuousLoader() throws Throwable {
SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(jobContextMock, SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(jobContextMock,
GoodContinuousLoader.class.getName(), getIDF(), getMatcher()); GoodContinuousLoader.class.getName(), getIDF(), getMatcher());
RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
boolean exceptionThrown = false;
try {
IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF(); IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
SqoopWritable writable = new SqoopWritable(dataFormat); SqoopWritable writable = new SqoopWritable(dataFormat);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
@ -206,9 +210,14 @@ public void testSuccessfulContinuousLoader() throws Throwable {
dataFormat.setCSVTextData(builder.toString()); dataFormat.setCSVTextData(builder.toString());
writer.write(writable, null); writer.write(writable, null);
} }
} catch (Throwable t) {
t.printStackTrace();
exceptionThrown = true;
}
writer.close(null); writer.close(null);
verify(jobContextMock, times(1)).getConfiguration(); verify(jobContextMock, times(1)).getConfiguration();
verify(jobContextMock, times(1)).getCounter(SqoopCounters.ROWS_WRITTEN); verify(jobContextMock, times(1)).getCounter(SqoopCounters.ROWS_WRITTEN);
Assert.assertFalse(exceptionThrown, "Exception Thrown during writing");
} }
@Test(expectedExceptions = SqoopException.class) @Test(expectedExceptions = SqoopException.class)