diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java index 5bf21880..6dbb98d7 100644 --- a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java @@ -18,26 +18,14 @@ package org.apache.sqoop.mapreduce.db.netezza; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.sql.Connection; -import java.sql.SQLException; - -import org.apache.commons.io.IOUtils; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.sqoop.io.NamedFifo; +import org.apache.sqoop.lib.DelimiterSet; import org.apache.sqoop.lib.SqoopRecord; import org.apache.sqoop.manager.DirectNetezzaManager; import org.apache.sqoop.mapreduce.SqoopMapper; @@ -46,7 +34,14 @@ import org.apache.sqoop.util.PerfCounters; import org.apache.sqoop.util.TaskId; -import org.apache.sqoop.lib.DelimiterSet; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.concurrent.atomic.AtomicBoolean; /** * Netezza export mapper using external tables. @@ -59,8 +54,10 @@ public abstract class NetezzaExternalTableExportMapper extends */ private Configuration conf; - private DBConfiguration dbc; - private File fifoFile; + @VisibleForTesting + DBConfiguration dbc; + @VisibleForTesting + File fifoFile; private Connection con; private OutputStream recordWriter; public static final Log LOG = LogFactory @@ -69,8 +66,12 @@ public abstract class NetezzaExternalTableExportMapper extends private PerfCounters counter; private DelimiterSet outputDelimiters; private String localLogDir = null; - private String logDir = null; - private File taskAttemptDir = null; + @VisibleForTesting + String logDir = null; + @VisibleForTesting + File taskAttemptDir = null; + + private AtomicBoolean jdbcFailed = new AtomicBoolean(false); private String getSqlStatement(DelimiterSet delimiters) throws IOException { @@ -168,9 +169,13 @@ private void initNetezzaExternalTableExport(Context context) taskAttemptDir = TaskId.getLocalWorkPath(conf); localLogDir = DirectNetezzaManager.getLocalLogDir(context.getTaskAttemptID()); - logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT); + if (logDir == null) { // need to be able to set in tests + logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT); + } - dbc = new DBConfiguration(conf); + if (dbc == null) { // need to be able to mock in tests + dbc = new DBConfiguration(conf); + } File taskAttemptDir = TaskId.getLocalWorkPath(conf); char fd = (char) conf.getInt(DelimiterSet.INPUT_FIELD_DELIM_KEY, ','); @@ -196,7 +201,7 @@ private void initNetezzaExternalTableExport(Context context) boolean cleanup = false; try { con = dbc.getConnection(); - extTableThread = new NetezzaJDBCStatementRunner(Thread.currentThread(), + extTableThread = new NetezzaJDBCStatementRunner(jdbcFailed, con, sqlStmt); } catch (SQLException sqle) { cleanup = true; @@ -226,49 +231,43 @@ private void initNetezzaExternalTableExport(Context context) public void run(Context context) throws IOException, InterruptedException { setup(context); initNetezzaExternalTableExport(context); - if (extTableThread.isAlive()) { - try { - while (context.nextKeyValue()) { - if (Thread.interrupted()) { - if (!extTableThread.isAlive()) { - break; - } - } - map(context.getCurrentKey(), context.getCurrentValue(), context); - } - cleanup(context); - } finally { - try { - recordWriter.close(); - extTableThread.join(); - } catch (Exception e) { - LOG.debug("Exception cleaning up mapper operation : " + e.getMessage()); - } - counter.stopClock(); - LOG.info("Transferred " + counter.toString()); - FileUploader.uploadFilesToDFS(taskAttemptDir.getAbsolutePath(), - localLogDir, logDir, context.getJobID().toString(), - conf); - - if (extTableThread.hasExceptions()) { - extTableThread.printException(); - throw new IOException(extTableThread.getException()); + try { + while (context.nextKeyValue()) { + // Fail fast if there was an error during JDBC operation + if (jdbcFailed.get()) { + break; } + map(context.getCurrentKey(), context.getCurrentValue(), context); } + cleanup(context); + } finally { + try { + recordWriter.close(); + extTableThread.join(); + } catch (Exception e) { + LOG.debug("Exception cleaning up mapper operation : " + e.getMessage()); + } + counter.stopClock(); + LOG.info("Transferred " + counter.toString()); + FileUploader.uploadFilesToDFS(taskAttemptDir.getAbsolutePath(), + localLogDir, logDir, context.getJobID().toString(), + conf); + if (extTableThread.hasExceptions()) { + extTableThread.printException(); + throw new IOException(extTableThread.getException()); + } } } - protected void writeTextRecord(Text record) throws IOException, - InterruptedException { + protected void writeTextRecord(Text record) throws IOException { String outputStr = record.toString() + "\n"; byte[] outputBytes = outputStr.getBytes("UTF-8"); counter.addBytes(outputBytes.length); recordWriter.write(outputBytes, 0, outputBytes.length); } - protected void writeSqoopRecord(SqoopRecord sqr) throws IOException, - InterruptedException { + protected void writeSqoopRecord(SqoopRecord sqr) throws IOException { String outputStr = sqr.toString(this.outputDelimiters); byte[] outputBytes = outputStr.getBytes("UTF-8"); counter.addBytes(outputBytes.length); diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java index 306062aa..3124b172 100644 --- a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java @@ -18,22 +18,10 @@ package org.apache.sqoop.mapreduce.db.netezza; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStreamReader; -import java.sql.Connection; -import java.sql.SQLException; - -import org.apache.commons.io.IOUtils; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.sqoop.config.ConfigurationHelper; @@ -42,10 +30,18 @@ import org.apache.sqoop.manager.DirectNetezzaManager; import org.apache.sqoop.mapreduce.AutoProgressMapper; import org.apache.sqoop.mapreduce.db.DBConfiguration; -import org.apache.sqoop.util.FileUploader; import org.apache.sqoop.util.PerfCounters; import org.apache.sqoop.util.TaskId; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.concurrent.atomic.AtomicBoolean; + /** * Netezza import mapper using external tables. */ @@ -57,8 +53,10 @@ public abstract class NetezzaExternalTableImportMapper extends */ private Configuration conf; - private DBConfiguration dbc; - private File fifoFile; + @VisibleForTesting + DBConfiguration dbc; + @VisibleForTesting + File fifoFile; private int numMappers; private Connection con; private BufferedReader recordReader; @@ -66,7 +64,11 @@ public abstract class NetezzaExternalTableImportMapper extends .getLog(NetezzaExternalTableImportMapper.class.getName()); private NetezzaJDBCStatementRunner extTableThread; private PerfCounters counter; - private File taskAttemptDir = null; + @VisibleForTesting + File taskAttemptDir = null; + + private AtomicBoolean jdbcFailed = new AtomicBoolean(false); + private String getSqlStatement(int myId) throws IOException { char fd = (char) conf.getInt(DelimiterSet.OUTPUT_FIELD_DELIM_KEY, ','); @@ -143,8 +145,9 @@ private String getSqlStatement(int myId) throws IOException { private void initNetezzaExternalTableImport(int myId) throws IOException { - taskAttemptDir = TaskId.getLocalWorkPath(conf); - + if (taskAttemptDir == null) { + taskAttemptDir = TaskId.getLocalWorkPath(conf); + } this.fifoFile = new File(taskAttemptDir, ("nzexttable-" + myId + ".txt")); String filename = fifoFile.toString(); NamedFifo nf; @@ -163,7 +166,7 @@ private void initNetezzaExternalTableImport(int myId) throws IOException { boolean cleanup = false; try { con = dbc.getConnection(); - extTableThread = new NetezzaJDBCStatementRunner(Thread.currentThread(), + extTableThread = new NetezzaJDBCStatementRunner(jdbcFailed, con, sqlStmt); } catch (SQLException sqle) { cleanup = true; @@ -197,39 +200,38 @@ public void map(Integer dataSliceId, NullWritable val, Context context) conf = context.getConfiguration(); - dbc = new DBConfiguration(conf); + if (dbc == null) { // need to be able to mock in tests + dbc = new DBConfiguration(conf); + } numMappers = ConfigurationHelper.getConfNumMaps(conf); char rd = (char) conf.getInt(DelimiterSet.OUTPUT_RECORD_DELIM_KEY, '\n'); initNetezzaExternalTableImport(dataSliceId); counter = new PerfCounters(); counter.startClock(); Text outputRecord = new Text(); - if (extTableThread.isAlive()) { - try { - String inputRecord = recordReader.readLine(); - while (inputRecord != null) { - if (Thread.interrupted()) { - if (!extTableThread.isAlive()) { - break; - } - } - outputRecord.set(inputRecord + rd); - // May be we should set the output to be String for faster performance - // There is no real benefit in changing it to Text and then - // converting it back in our case - writeRecord(outputRecord, context); - counter.addBytes(1 + inputRecord.length()); - inputRecord = recordReader.readLine(); - } - } finally { - recordReader.close(); - extTableThread.join(); - counter.stopClock(); - LOG.info("Transferred " + counter.toString()); - if (extTableThread.hasExceptions()) { - extTableThread.printException(); - throw new IOException(extTableThread.getException()); + try { + String inputRecord = recordReader.readLine(); + while (inputRecord != null) { + // Fail fast if there was an error during JDBC operation + if (jdbcFailed.get()) { + break; } + outputRecord.set(inputRecord + rd); + // May be we should set the output to be String for faster performance + // There is no real benefit in changing it to Text and then + // converting it back in our case + writeRecord(outputRecord, context); + counter.addBytes(1 + inputRecord.length()); + inputRecord = recordReader.readLine(); + } + } finally { + recordReader.close(); + extTableThread.join(); + counter.stopClock(); + LOG.info("Transferred " + counter.toString()); + if (extTableThread.hasExceptions()) { + extTableThread.printException(); + throw new IOException(extTableThread.getException()); } } } diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java index cedfd235..a6a4481b 100644 --- a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java +++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java @@ -21,6 +21,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,7 +39,7 @@ public class NetezzaJDBCStatementRunner extends Thread { private Connection con; private Exception exception; private PreparedStatement ps; - private Thread parent; + private AtomicBoolean failed; public boolean hasExceptions() { return exception != null; @@ -58,9 +59,16 @@ public Throwable getException() { return exception; } - public NetezzaJDBCStatementRunner(Thread parent, Connection con, - String sqlStatement) throws SQLException { - this.parent = parent; + /** + * Execute Netezza SQL statement on given connection. + * @param failed Set this to true if the operation fails. + * @param con connection + * @param sqlStatement statement to execute + * @throws SQLException + */ + public NetezzaJDBCStatementRunner(AtomicBoolean failed, Connection con, + String sqlStatement) throws SQLException { + this.failed = failed; this.con = con; this.ps = con.prepareStatement(sqlStatement); this.exception = null; @@ -89,7 +97,7 @@ public void run() { con = null; } if (interruptParent) { - this.parent.interrupt(); + failed.set(true); } } } diff --git a/src/test/org/apache/sqoop/mapreduce/db/netezza/TestNetezzaExternalTableExportMapper.java b/src/test/org/apache/sqoop/mapreduce/db/netezza/TestNetezzaExternalTableExportMapper.java new file mode 100644 index 00000000..5e558717 --- /dev/null +++ b/src/test/org/apache/sqoop/mapreduce/db/netezza/TestNetezzaExternalTableExportMapper.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.db.netezza; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.sqoop.mapreduce.db.DBConfiguration; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.RuleChain; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.Verifier; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestNetezzaExternalTableExportMapper { + + // chained rule, see #rules + private Verifier verifyThatLogsAreUploaded = new Verifier() { + @Override public void verify() { + File jobDir = tmpFolder.getRoot().toPath().resolve("job_job001_0001").resolve("job__0000-0-0").toFile(); + assertThat(jobDir.exists(), is(true)); + assertThat(jobDir.listFiles().length, is(equalTo(1))); + assertThat(jobDir.listFiles()[0].getName(), is(equalTo("TEST.nzlog"))); + try { + assertThat(FileUtils.readFileToString(jobDir.listFiles()[0]), is(equalTo("test log"))); + } catch (IOException e) { + e.printStackTrace(); + fail("Failed to read log file."); + } + } + }; + + // chained rule, see #rules + private TemporaryFolder tmpFolder = new TemporaryFolder(); + + // need to keep tmpFolder around to verify logs + @Rule + public RuleChain rules = RuleChain.outerRule(tmpFolder).around(verifyThatLogsAreUploaded); + + @Rule + public final ExpectedException exception = ExpectedException.none(); + + private static final SQLException testException = new SQLException("failed in test"); + + private NetezzaExternalTableExportMapper mapper; + private Mapper.Context context; + + @Before + public void setUp() throws Exception { + mapper = basicMockingOfMapper(); + context = getContext(); + } + + @Test + public void testPassingJDBC() throws Exception { + withNoopJDBCOperation(mapper).run(context); + } + + @Test + public void testFailingJDBC() throws Exception { + withFailingJDBCOperation(mapper); + + exception.expect(IOException.class); + exception.expectCause(is(equalTo(testException))); + mapper.run(context); + } + + /** + * Creates an instance of NetezzaExternalTableExportMapper with the + * necessary fields mocked to be able to call the run() method without errors. + * @return + */ + private NetezzaExternalTableExportMapper basicMockingOfMapper() { + NetezzaExternalTableExportMapper mapper = new NetezzaExternalTableExportMapper() { + @Override + public void map(LongWritable key, Text text, Context context) { + // no-op. Don't read from context, mock won't be ready to handle that. + } + }; + + mapper.logDir = tmpFolder.getRoot().getAbsolutePath(); + + return mapper; + } + + /** + * Mocks mapper's DB connection in a way that leads to SQLException during the JDBC operation. + * @param mapper will modify this object + * @return modified mapper + * @throws Exception + */ + private NetezzaExternalTableExportMapper withFailingJDBCOperation(NetezzaExternalTableExportMapper mapper) throws Exception { + Connection connectionMock = mock(Connection.class); + + // PreparadStatement mock should imitate loading stuff from FIFO into Netezza + PreparedStatement psMock = mock(PreparedStatement.class); + when(psMock.execute()).then(invocation -> { + // Write log file under taskAttemptDir to be able to check log upload + File logFile = mapper.taskAttemptDir.toPath().resolve("job__0000-0-0").resolve("TEST.nzlog").toFile(); + FileUtils.writeStringToFile(logFile, "test log"); + + // Need to open FIFO for reading, otherwise writing would hang + FileInputStream fis = new FileInputStream(mapper.fifoFile.getAbsoluteFile()); + + // Simulate delay + Thread.sleep(200); + throw testException; + }); + when(connectionMock.prepareStatement(anyString())).thenReturn(psMock); + + DBConfiguration dbcMock = mock(DBConfiguration.class); + when(dbcMock.getConnection()).thenReturn(connectionMock); + mapper.dbc = dbcMock; + return mapper; + } + + + /** + * Mocks mapper's DB connection to execute a no-op JDBC operation. + * @param mapper will modify this object + * @return modified mapper + * @throws Exception + */ + private NetezzaExternalTableExportMapper withNoopJDBCOperation(NetezzaExternalTableExportMapper mapper) throws Exception { + Connection connectionMock = mock(Connection.class); + + // PreparadStatement mock should imitate loading stuff from FIFO into Netezza + PreparedStatement psMock = mock(PreparedStatement.class); + when(psMock.execute()).then(invocation -> { + // Write log file under taskAttemptDir to be able to check log upload + File logFile = mapper.taskAttemptDir.toPath().resolve("job__0000-0-0").resolve("TEST.nzlog").toFile(); + FileUtils.writeStringToFile(logFile, "test log"); + + // Need to open FIFO for reading, otherwise writing would hang + new FileInputStream(mapper.fifoFile.getAbsoluteFile()); + + // Simulate delay + Thread.sleep(200); + return true; + }); + when(connectionMock.prepareStatement(anyString())).thenReturn(psMock); + + DBConfiguration dbcMock = mock(DBConfiguration.class); + when(dbcMock.getConnection()).thenReturn(connectionMock); + mapper.dbc = dbcMock; + return mapper; + } + + + /** + * Creates simple mapreduce context that says it has a single record but won't actually + * return any records as tests are not expected to read the records. + * @return + * @throws java.io.IOException + * @throws InterruptedException + */ + private Mapper.Context getContext() throws java.io.IOException, InterruptedException { + Mapper.Context context = mock(Mapper.Context.class); + + Configuration conf = new Configuration(); + when(context.getConfiguration()).thenReturn(conf); + + TaskAttemptID taskAttemptID = new TaskAttemptID(); + when(context.getTaskAttemptID()).thenReturn(taskAttemptID); + + JobID jobID = new JobID("job001", 1); + when(context.getJobID()).thenReturn(jobID); + + // Simulate a single record by answering 'true' once + when(context.nextKeyValue()).thenAnswer(new Answer() { + boolean answer = true; + + @Override + public Object answer(InvocationOnMock invocation) { + if (answer == true) { + answer = false; + return true; + } + return false; + } + }); + + return context; + } + +} \ No newline at end of file diff --git a/src/test/org/apache/sqoop/mapreduce/db/netezza/TestNetezzaExternalTableImportMapper.java b/src/test/org/apache/sqoop/mapreduce/db/netezza/TestNetezzaExternalTableImportMapper.java new file mode 100644 index 00000000..1a694378 --- /dev/null +++ b/src/test/org/apache/sqoop/mapreduce/db/netezza/TestNetezzaExternalTableImportMapper.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.db.netezza; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.sqoop.mapreduce.db.DBConfiguration; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestNetezzaExternalTableImportMapper { + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Rule + public final ExpectedException exception = ExpectedException.none(); + + private static final SQLException testException = new SQLException("failed in test"); + + private NetezzaExternalTableImportMapper mapper; + private Mapper.Context context; + + @Before + public void setUp() { + mapper = basicMockingOfMapper(); + context = getContext(); + } + + @Test + public void testPassingJDBC() throws Exception { + withNoopJDBCOperation(mapper).map(1, null, context); + } + + @Test + public void testFailingJDBC() throws Exception { + withFailingJDBCOperation(mapper); + + exception.expect(IOException.class); + exception.expectCause(is(equalTo(testException))); + mapper.map(1, null, context); + } + + /** + * Creates an instance of NetezzaExternalTableExportMapper with the + * necessary fields mocked to be able to call the run() method without errors. + * @return + */ + private NetezzaExternalTableImportMapper basicMockingOfMapper() { + return new NetezzaExternalTableImportMapper() { + @Override + protected void writeRecord(Text text, Context context) { + // no-op. Don't read from context, mock won't be ready to handle that. + } + }; + } + + /** + * Mocks mapper's DB connection in a way that leads to SQLException during the JDBC operation. + * @param mapper will modify this object + * @return modified mapper + * @throws Exception + */ + private NetezzaExternalTableImportMapper withFailingJDBCOperation(NetezzaExternalTableImportMapper mapper) throws Exception { + Connection connectionMock = mock(Connection.class); + + // PreparadStatement mock should imitate loading stuff from FIFO into Netezza + PreparedStatement psMock = mock(PreparedStatement.class); + when(psMock.execute()).then(invocation -> { + // Write log file under taskAttemptDir to be able to check log upload + File logFile = mapper.taskAttemptDir.toPath().resolve("job__0000-0-0").resolve("TEST.nzlog").toFile(); + FileUtils.writeStringToFile(logFile, "test log"); + + // Need to open FIFO for writing, otherwise reading would hang + FileOutputStream fos = new FileOutputStream(mapper.fifoFile.getAbsoluteFile()); + + // Simulate delay + Thread.sleep(200); + + // Write single record, then throw + fos.write("test record".getBytes()); + fos.close(); + throw testException; + }); + when(connectionMock.prepareStatement(anyString())).thenReturn(psMock); + + DBConfiguration dbcMock = mock(DBConfiguration.class); + when(dbcMock.getConnection()).thenReturn(connectionMock); + mapper.dbc = dbcMock; + return mapper; + } + + + /** + * Mocks mapper's DB connection to execute a no-op JDBC operation. + * @param mapper will modify this object + * @return modified mapper + * @throws Exception + */ + private NetezzaExternalTableImportMapper withNoopJDBCOperation(NetezzaExternalTableImportMapper mapper) throws Exception { + Connection connectionMock = mock(Connection.class); + + // PreparadStatement mock should imitate loading stuff from FIFO into Netezza + PreparedStatement psMock = mock(PreparedStatement.class); + when(psMock.execute()).then(invocation -> { + // Need to open FIFO for writing, otherwise reading would hang + FileOutputStream fos = new FileOutputStream(mapper.fifoFile.getAbsoluteFile()); + + // Simulate delay + Thread.sleep(200); + + // Write single record and return + fos.write("test record".getBytes()); + fos.close(); + return true; + }); + when(connectionMock.prepareStatement(anyString())).thenReturn(psMock); + + DBConfiguration dbcMock = mock(DBConfiguration.class); + when(dbcMock.getConnection()).thenReturn(connectionMock); + mapper.dbc = dbcMock; + return mapper; + } + + + /** + * Creates simple mapreduce context that says it has a single record but won't actually + * return any records as tests are not expected to read the records. + * @return + * @throws IOException + * @throws InterruptedException + */ + private Mapper.Context getContext() { + Mapper.Context context = mock(Mapper.Context.class); + + Configuration conf = new Configuration(); + when(context.getConfiguration()).thenReturn(conf); + + TaskAttemptID taskAttemptID = new TaskAttemptID(); + when(context.getTaskAttemptID()).thenReturn(taskAttemptID); + + return context; + } + +} \ No newline at end of file