diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index 3c091a25..c9d6f104 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -137,6 +137,14 @@ private void waitForConsumer() { // In almost all cases, the exception will be SqoopException, // because all exceptions are caught and propagated as // SqoopExceptions + + //There are race conditions with exceptions where the free sema is + //no released. So sense we are in single threaded mode at this point + //we can ask if there are availablePermits and release if needed + if (free.availablePermits() == 0) { + free.release(); + } + Throwable t = ex.getCause(); if (t instanceof SqoopException) { throw (SqoopException) t; @@ -144,6 +152,14 @@ private void waitForConsumer() { //In the rare case, it was not a SqoopException Throwables.propagate(t); } catch (Exception ex) { + + //There are race conditions with exceptions where the free sema is + //no released. So sense we are in single threaded mode at this point + //we can ask if there are availablePermits and release if needed + if (free.availablePermits() == 0) { + free.release(); + } + throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019, ex); } } diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java index 71c98db5..3208e8ac 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java @@ -193,22 +193,31 @@ public void testSuccessfulContinuousLoader() throws Throwable { SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(jobContextMock, GoodContinuousLoader.class.getName(), getIDF(), getMatcher()); RecordWriter writer = executor.getRecordWriter(); - IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); - SqoopWritable writable = new SqoopWritable(dataFormat); - for (int i = 0; i < 10; i++) { - StringBuilder builder = new StringBuilder(); - for (int count = 0; count < 100; count++) { - builder.append(String.valueOf(count)); - if (count != 99) { - builder.append(","); + + boolean exceptionThrown = false; + + try { + IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); + SqoopWritable writable = new SqoopWritable(dataFormat); + for (int i = 0; i < 10; i++) { + StringBuilder builder = new StringBuilder(); + for (int count = 0; count < 100; count++) { + builder.append(String.valueOf(count)); + if (count != 99) { + builder.append(","); + } } + dataFormat.setCSVTextData(builder.toString()); + writer.write(writable, null); } - dataFormat.setCSVTextData(builder.toString()); - writer.write(writable, null); + } catch (Throwable t) { + t.printStackTrace(); + exceptionThrown = true; } writer.close(null); verify(jobContextMock, times(1)).getConfiguration(); verify(jobContextMock, times(1)).getCounter(SqoopCounters.ROWS_WRITTEN); + Assert.assertFalse(exceptionThrown, "Exception Thrown during writing"); } @Test(expectedExceptions = SqoopException.class)