
SQOOP-3378: Error during direct Netezza import/export can interrupt process in uncontrolled ways

(Daniel Voros via Szabolcs Vasas)
Szabolcs Vasas 2018-10-11 10:36:41 +02:00
parent 71523079bc
commit 40f0b74c01
5 changed files with 519 additions and 104 deletions
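
The fix replaces thread interruption with a shared flag: instead of the JDBC worker thread calling interrupt() on the mapper thread when the external-table statement fails, the worker sets an AtomicBoolean that the mapper's record loop polls and breaks out of, and the failure is then surfaced from the worker's recorded exception in the finally block. The sketch below illustrates that cooperative-failure pattern in isolation; the class and method names (JdbcWorker, ConsumerLoop) are made up for the example and are not Sqoop's actual classes.

import java.io.IOException;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch of the pattern introduced by this commit (illustrative names,
// not the real Sqoop classes).
class JdbcWorker extends Thread {
  private final AtomicBoolean failed; // shared with the record-writing loop
  private volatile Exception exception;

  JdbcWorker(AtomicBoolean failed) {
    this.failed = failed;
  }

  @Override
  public void run() {
    try {
      // Stand-in for executing the external-table statement over JDBC.
      throw new SQLException("simulated JDBC failure");
    } catch (Exception e) {
      exception = e;
      failed.set(true); // signal the consumer instead of parent.interrupt()
    }
  }

  Exception getException() {
    return exception;
  }
}

class ConsumerLoop {
  public static void main(String[] args) throws Exception {
    AtomicBoolean jdbcFailed = new AtomicBoolean(false);
    JdbcWorker worker = new JdbcWorker(jdbcFailed);
    worker.start();
    try {
      for (int i = 0; i < 1_000_000; i++) {
        if (jdbcFailed.get()) { // fail fast, as the mapper loops now do
          break;
        }
        // write the next record to the FIFO here
      }
    } finally {
      worker.join();
      if (worker.getException() != null) {
        // surface the worker's failure the same way the mappers do
        throw new IOException(worker.getException());
      }
    }
  }
}

Because the loop polls the flag between records, a JDBC failure no longer interrupts the mapper at an arbitrary point (for example in the middle of HDFS or FIFO I/O); the export and import mappers below apply exactly this shape to their run() and map() loops.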

NetezzaExternalTableExportMapper.java

@@ -18,26 +18,14 @@
 package org.apache.sqoop.mapreduce.db.netezza;
 
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import org.apache.commons.io.IOUtils;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.sqoop.io.NamedFifo;
-import org.apache.sqoop.lib.DelimiterSet;
 import org.apache.sqoop.lib.SqoopRecord;
 import org.apache.sqoop.manager.DirectNetezzaManager;
 import org.apache.sqoop.mapreduce.SqoopMapper;
@@ -46,7 +34,14 @@
 import org.apache.sqoop.util.PerfCounters;
 import org.apache.sqoop.util.TaskId;
-import org.apache.sqoop.lib.DelimiterSet;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Netezza export mapper using external tables.
@@ -59,8 +54,10 @@ public abstract class NetezzaExternalTableExportMapper<K, V> extends
    */
   private Configuration conf;
-  private DBConfiguration dbc;
-  private File fifoFile;
+  @VisibleForTesting
+  DBConfiguration dbc;
+  @VisibleForTesting
+  File fifoFile;
   private Connection con;
   private OutputStream recordWriter;
   public static final Log LOG = LogFactory
@@ -69,8 +66,12 @@ public abstract class NetezzaExternalTableExportMapper<K, V> extends
   private PerfCounters counter;
   private DelimiterSet outputDelimiters;
   private String localLogDir = null;
-  private String logDir = null;
-  private File taskAttemptDir = null;
+  @VisibleForTesting
+  String logDir = null;
+  @VisibleForTesting
+  File taskAttemptDir = null;
+  private AtomicBoolean jdbcFailed = new AtomicBoolean(false);
 
   private String getSqlStatement(DelimiterSet delimiters) throws IOException {
@@ -168,9 +169,13 @@ private void initNetezzaExternalTableExport(Context context)
     taskAttemptDir = TaskId.getLocalWorkPath(conf);
     localLogDir =
       DirectNetezzaManager.getLocalLogDir(context.getTaskAttemptID());
-    logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
-    dbc = new DBConfiguration(conf);
+    if (logDir == null) { // need to be able to set in tests
+      logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
+    }
+    if (dbc == null) { // need to be able to mock in tests
+      dbc = new DBConfiguration(conf);
+    }
     File taskAttemptDir = TaskId.getLocalWorkPath(conf);
     char fd = (char) conf.getInt(DelimiterSet.INPUT_FIELD_DELIM_KEY, ',');
@@ -196,7 +201,7 @@
     boolean cleanup = false;
     try {
       con = dbc.getConnection();
-      extTableThread = new NetezzaJDBCStatementRunner(Thread.currentThread(),
+      extTableThread = new NetezzaJDBCStatementRunner(jdbcFailed,
         con, sqlStmt);
     } catch (SQLException sqle) {
       cleanup = true;
@@ -226,49 +231,43 @@ private void initNetezzaExternalTableExport(Context context)
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
     initNetezzaExternalTableExport(context);
-    if (extTableThread.isAlive()) {
-      try {
-        while (context.nextKeyValue()) {
-          if (Thread.interrupted()) {
-            if (!extTableThread.isAlive()) {
-              break;
-            }
-          }
-          map(context.getCurrentKey(), context.getCurrentValue(), context);
-        }
-        cleanup(context);
-      } finally {
-        try {
-          recordWriter.close();
-          extTableThread.join();
-        } catch (Exception e) {
-          LOG.debug("Exception cleaning up mapper operation : " + e.getMessage());
-        }
-        counter.stopClock();
-        LOG.info("Transferred " + counter.toString());
-        FileUploader.uploadFilesToDFS(taskAttemptDir.getAbsolutePath(),
-            localLogDir, logDir, context.getJobID().toString(),
-            conf);
-        if (extTableThread.hasExceptions()) {
-          extTableThread.printException();
-          throw new IOException(extTableThread.getException());
-        }
-      }
-    }
+    try {
+      while (context.nextKeyValue()) {
+        // Fail fast if there was an error during JDBC operation
+        if (jdbcFailed.get()) {
+          break;
+        }
+        map(context.getCurrentKey(), context.getCurrentValue(), context);
+      }
+      cleanup(context);
+    } finally {
+      try {
+        recordWriter.close();
+        extTableThread.join();
+      } catch (Exception e) {
+        LOG.debug("Exception cleaning up mapper operation : " + e.getMessage());
+      }
+      counter.stopClock();
+      LOG.info("Transferred " + counter.toString());
+      FileUploader.uploadFilesToDFS(taskAttemptDir.getAbsolutePath(),
+          localLogDir, logDir, context.getJobID().toString(),
+          conf);
+      if (extTableThread.hasExceptions()) {
+        extTableThread.printException();
+        throw new IOException(extTableThread.getException());
+      }
+    }
   }
 
-  protected void writeTextRecord(Text record) throws IOException,
-      InterruptedException {
+  protected void writeTextRecord(Text record) throws IOException {
     String outputStr = record.toString() + "\n";
     byte[] outputBytes = outputStr.getBytes("UTF-8");
     counter.addBytes(outputBytes.length);
     recordWriter.write(outputBytes, 0, outputBytes.length);
   }
 
-  protected void writeSqoopRecord(SqoopRecord sqr) throws IOException,
-      InterruptedException {
+  protected void writeSqoopRecord(SqoopRecord sqr) throws IOException {
     String outputStr = sqr.toString(this.outputDelimiters);
     byte[] outputBytes = outputStr.getBytes("UTF-8");
     counter.addBytes(outputBytes.length);

NetezzaExternalTableImportMapper.java

@@ -18,22 +18,10 @@
 package org.apache.sqoop.mapreduce.db.netezza;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import org.apache.commons.io.IOUtils;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.sqoop.config.ConfigurationHelper;
@@ -42,10 +30,18 @@
 import org.apache.sqoop.manager.DirectNetezzaManager;
 import org.apache.sqoop.mapreduce.AutoProgressMapper;
 import org.apache.sqoop.mapreduce.db.DBConfiguration;
-import org.apache.sqoop.util.FileUploader;
 import org.apache.sqoop.util.PerfCounters;
 import org.apache.sqoop.util.TaskId;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Netezza import mapper using external tables.
  */
@@ -57,8 +53,10 @@ public abstract class NetezzaExternalTableImportMapper<K, V> extends
    */
   private Configuration conf;
-  private DBConfiguration dbc;
-  private File fifoFile;
+  @VisibleForTesting
+  DBConfiguration dbc;
+  @VisibleForTesting
+  File fifoFile;
   private int numMappers;
   private Connection con;
   private BufferedReader recordReader;
@@ -66,7 +64,11 @@ public abstract class NetezzaExternalTableImportMapper<K, V> extends
       .getLog(NetezzaExternalTableImportMapper.class.getName());
   private NetezzaJDBCStatementRunner extTableThread;
   private PerfCounters counter;
-  private File taskAttemptDir = null;
+  @VisibleForTesting
+  File taskAttemptDir = null;
+  private AtomicBoolean jdbcFailed = new AtomicBoolean(false);
 
   private String getSqlStatement(int myId) throws IOException {
     char fd = (char) conf.getInt(DelimiterSet.OUTPUT_FIELD_DELIM_KEY, ',');
@@ -143,8 +145,9 @@ private String getSqlStatement(int myId) throws IOException {
   private void initNetezzaExternalTableImport(int myId) throws IOException {
-    taskAttemptDir = TaskId.getLocalWorkPath(conf);
+    if (taskAttemptDir == null) {
+      taskAttemptDir = TaskId.getLocalWorkPath(conf);
+    }
     this.fifoFile = new File(taskAttemptDir, ("nzexttable-" + myId + ".txt"));
     String filename = fifoFile.toString();
     NamedFifo nf;
@@ -163,7 +166,7 @@ private void initNetezzaExternalTableImport(int myId) throws IOException {
     boolean cleanup = false;
     try {
       con = dbc.getConnection();
-      extTableThread = new NetezzaJDBCStatementRunner(Thread.currentThread(),
+      extTableThread = new NetezzaJDBCStatementRunner(jdbcFailed,
         con, sqlStmt);
     } catch (SQLException sqle) {
       cleanup = true;
@@ -197,39 +200,38 @@ public void map(Integer dataSliceId, NullWritable val, Context context)
     conf = context.getConfiguration();
-    dbc = new DBConfiguration(conf);
+    if (dbc == null) { // need to be able to mock in tests
+      dbc = new DBConfiguration(conf);
+    }
     numMappers = ConfigurationHelper.getConfNumMaps(conf);
     char rd = (char) conf.getInt(DelimiterSet.OUTPUT_RECORD_DELIM_KEY, '\n');
     initNetezzaExternalTableImport(dataSliceId);
     counter = new PerfCounters();
     counter.startClock();
     Text outputRecord = new Text();
-    if (extTableThread.isAlive()) {
-      try {
-        String inputRecord = recordReader.readLine();
-        while (inputRecord != null) {
-          if (Thread.interrupted()) {
-            if (!extTableThread.isAlive()) {
-              break;
-            }
-          }
-          outputRecord.set(inputRecord + rd);
-          // May be we should set the output to be String for faster performance
-          // There is no real benefit in changing it to Text and then
-          // converting it back in our case
-          writeRecord(outputRecord, context);
-          counter.addBytes(1 + inputRecord.length());
-          inputRecord = recordReader.readLine();
-        }
-      } finally {
-        recordReader.close();
-        extTableThread.join();
-        counter.stopClock();
-        LOG.info("Transferred " + counter.toString());
-        if (extTableThread.hasExceptions()) {
-          extTableThread.printException();
-          throw new IOException(extTableThread.getException());
-        }
-      }
-    }
+    try {
+      String inputRecord = recordReader.readLine();
+      while (inputRecord != null) {
+        // Fail fast if there was an error during JDBC operation
+        if (jdbcFailed.get()) {
+          break;
+        }
+        outputRecord.set(inputRecord + rd);
+        // May be we should set the output to be String for faster performance
+        // There is no real benefit in changing it to Text and then
+        // converting it back in our case
+        writeRecord(outputRecord, context);
+        counter.addBytes(1 + inputRecord.length());
+        inputRecord = recordReader.readLine();
+      }
+    } finally {
+      recordReader.close();
+      extTableThread.join();
+      counter.stopClock();
+      LOG.info("Transferred " + counter.toString());
+      if (extTableThread.hasExceptions()) {
+        extTableThread.printException();
+        throw new IOException(extTableThread.getException());
+      }
+    }
   }

NetezzaJDBCStatementRunner.java

@@ -21,6 +21,7 @@
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,7 +39,7 @@ public class NetezzaJDBCStatementRunner extends Thread {
   private Connection con;
   private Exception exception;
   private PreparedStatement ps;
-  private Thread parent;
+  private AtomicBoolean failed;
 
   public boolean hasExceptions() {
     return exception != null;
@@ -58,9 +59,16 @@ public Throwable getException() {
     return exception;
   }
 
-  public NetezzaJDBCStatementRunner(Thread parent, Connection con,
-      String sqlStatement) throws SQLException {
-    this.parent = parent;
+  /**
+   * Execute Netezza SQL statement on given connection.
+   * @param failed Set this to true if the operation fails.
+   * @param con connection
+   * @param sqlStatement statement to execute
+   * @throws SQLException
+   */
+  public NetezzaJDBCStatementRunner(AtomicBoolean failed, Connection con,
+      String sqlStatement) throws SQLException {
+    this.failed = failed;
     this.con = con;
     this.ps = con.prepareStatement(sqlStatement);
     this.exception = null;
@@ -89,7 +97,7 @@ public void run() {
       con = null;
     }
     if (interruptParent) {
-      this.parent.interrupt();
+      failed.set(true);
     }
   }
 }

TestNetezzaExternalTableExportMapper.java (new file)

@@ -0,0 +1,225 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.db.netezza;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Verifier;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestNetezzaExternalTableExportMapper {
// chained rule, see #rules
private Verifier verifyThatLogsAreUploaded = new Verifier() {
@Override public void verify() {
File jobDir = tmpFolder.getRoot().toPath().resolve("job_job001_0001").resolve("job__0000-0-0").toFile();
assertThat(jobDir.exists(), is(true));
assertThat(jobDir.listFiles().length, is(equalTo(1)));
assertThat(jobDir.listFiles()[0].getName(), is(equalTo("TEST.nzlog")));
try {
assertThat(FileUtils.readFileToString(jobDir.listFiles()[0]), is(equalTo("test log")));
} catch (IOException e) {
e.printStackTrace();
fail("Failed to read log file.");
}
}
};
// chained rule, see #rules
private TemporaryFolder tmpFolder = new TemporaryFolder();
// need to keep tmpFolder around to verify logs
@Rule
public RuleChain rules = RuleChain.outerRule(tmpFolder).around(verifyThatLogsAreUploaded);
@Rule
public final ExpectedException exception = ExpectedException.none();
private static final SQLException testException = new SQLException("failed in test");
private NetezzaExternalTableExportMapper<LongWritable, Text> mapper;
private Mapper.Context context;
@Before
public void setUp() throws Exception {
mapper = basicMockingOfMapper();
context = getContext();
}
@Test
public void testPassingJDBC() throws Exception {
withNoopJDBCOperation(mapper).run(context);
}
@Test
public void testFailingJDBC() throws Exception {
withFailingJDBCOperation(mapper);
exception.expect(IOException.class);
exception.expectCause(is(equalTo(testException)));
mapper.run(context);
}
/**
* Creates an instance of NetezzaExternalTableExportMapper with the
* necessary fields mocked to be able to call the run() method without errors.
* @return
*/
private NetezzaExternalTableExportMapper<LongWritable, Text> basicMockingOfMapper() {
NetezzaExternalTableExportMapper<LongWritable, Text> mapper = new NetezzaExternalTableExportMapper<LongWritable, Text>() {
@Override
public void map(LongWritable key, Text text, Context context) {
// no-op. Don't read from context, mock won't be ready to handle that.
}
};
mapper.logDir = tmpFolder.getRoot().getAbsolutePath();
return mapper;
}
/**
* Mocks mapper's DB connection in a way that leads to SQLException during the JDBC operation.
* @param mapper will modify this object
* @return modified mapper
* @throws Exception
*/
private NetezzaExternalTableExportMapper<LongWritable, Text> withFailingJDBCOperation(NetezzaExternalTableExportMapper<LongWritable, Text> mapper) throws Exception {
Connection connectionMock = mock(Connection.class);
// PreparadStatement mock should imitate loading stuff from FIFO into Netezza
PreparedStatement psMock = mock(PreparedStatement.class);
when(psMock.execute()).then(invocation -> {
// Write log file under taskAttemptDir to be able to check log upload
File logFile = mapper.taskAttemptDir.toPath().resolve("job__0000-0-0").resolve("TEST.nzlog").toFile();
FileUtils.writeStringToFile(logFile, "test log");
// Need to open FIFO for reading, otherwise writing would hang
FileInputStream fis = new FileInputStream(mapper.fifoFile.getAbsoluteFile());
// Simulate delay
Thread.sleep(200);
throw testException;
});
when(connectionMock.prepareStatement(anyString())).thenReturn(psMock);
DBConfiguration dbcMock = mock(DBConfiguration.class);
when(dbcMock.getConnection()).thenReturn(connectionMock);
mapper.dbc = dbcMock;
return mapper;
}
/**
* Mocks mapper's DB connection to execute a no-op JDBC operation.
* @param mapper will modify this object
* @return modified mapper
* @throws Exception
*/
private NetezzaExternalTableExportMapper<LongWritable, Text> withNoopJDBCOperation(NetezzaExternalTableExportMapper<LongWritable, Text> mapper) throws Exception {
Connection connectionMock = mock(Connection.class);
// PreparadStatement mock should imitate loading stuff from FIFO into Netezza
PreparedStatement psMock = mock(PreparedStatement.class);
when(psMock.execute()).then(invocation -> {
// Write log file under taskAttemptDir to be able to check log upload
File logFile = mapper.taskAttemptDir.toPath().resolve("job__0000-0-0").resolve("TEST.nzlog").toFile();
FileUtils.writeStringToFile(logFile, "test log");
// Need to open FIFO for reading, otherwise writing would hang
new FileInputStream(mapper.fifoFile.getAbsoluteFile());
// Simulate delay
Thread.sleep(200);
return true;
});
when(connectionMock.prepareStatement(anyString())).thenReturn(psMock);
DBConfiguration dbcMock = mock(DBConfiguration.class);
when(dbcMock.getConnection()).thenReturn(connectionMock);
mapper.dbc = dbcMock;
return mapper;
}
/**
* Creates simple mapreduce context that says it has a single record but won't actually
* return any records as tests are not expected to read the records.
* @return
* @throws java.io.IOException
* @throws InterruptedException
*/
private Mapper.Context getContext() throws java.io.IOException, InterruptedException {
Mapper.Context context = mock(Mapper.Context.class);
Configuration conf = new Configuration();
when(context.getConfiguration()).thenReturn(conf);
TaskAttemptID taskAttemptID = new TaskAttemptID();
when(context.getTaskAttemptID()).thenReturn(taskAttemptID);
JobID jobID = new JobID("job001", 1);
when(context.getJobID()).thenReturn(jobID);
// Simulate a single record by answering 'true' once
when(context.nextKeyValue()).thenAnswer(new Answer<Object>() {
boolean answer = true;
@Override
public Object answer(InvocationOnMock invocation) {
if (answer == true) {
answer = false;
return true;
}
return false;
}
});
return context;
}
}

TestNetezzaExternalTableImportMapper.java (new file)

@@ -0,0 +1,181 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.db.netezza;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestNetezzaExternalTableImportMapper {
@Rule
public TemporaryFolder tmpFolder = new TemporaryFolder();
@Rule
public final ExpectedException exception = ExpectedException.none();
private static final SQLException testException = new SQLException("failed in test");
private NetezzaExternalTableImportMapper<LongWritable, Text> mapper;
private Mapper.Context context;
@Before
public void setUp() {
mapper = basicMockingOfMapper();
context = getContext();
}
@Test
public void testPassingJDBC() throws Exception {
withNoopJDBCOperation(mapper).map(1, null, context);
}
@Test
public void testFailingJDBC() throws Exception {
withFailingJDBCOperation(mapper);
exception.expect(IOException.class);
exception.expectCause(is(equalTo(testException)));
mapper.map(1, null, context);
}
/**
* Creates an instance of NetezzaExternalTableExportMapper with the
* necessary fields mocked to be able to call the run() method without errors.
* @return
*/
private NetezzaExternalTableImportMapper<LongWritable, Text> basicMockingOfMapper() {
return new NetezzaExternalTableImportMapper<LongWritable, Text>() {
@Override
protected void writeRecord(Text text, Context context) {
// no-op. Don't read from context, mock won't be ready to handle that.
}
};
}
/**
* Mocks mapper's DB connection in a way that leads to SQLException during the JDBC operation.
* @param mapper will modify this object
* @return modified mapper
* @throws Exception
*/
private NetezzaExternalTableImportMapper<LongWritable, Text> withFailingJDBCOperation(NetezzaExternalTableImportMapper<LongWritable, Text> mapper) throws Exception {
Connection connectionMock = mock(Connection.class);
// PreparadStatement mock should imitate loading stuff from FIFO into Netezza
PreparedStatement psMock = mock(PreparedStatement.class);
when(psMock.execute()).then(invocation -> {
// Write log file under taskAttemptDir to be able to check log upload
File logFile = mapper.taskAttemptDir.toPath().resolve("job__0000-0-0").resolve("TEST.nzlog").toFile();
FileUtils.writeStringToFile(logFile, "test log");
// Need to open FIFO for writing, otherwise reading would hang
FileOutputStream fos = new FileOutputStream(mapper.fifoFile.getAbsoluteFile());
// Simulate delay
Thread.sleep(200);
// Write single record, then throw
fos.write("test record".getBytes());
fos.close();
throw testException;
});
when(connectionMock.prepareStatement(anyString())).thenReturn(psMock);
DBConfiguration dbcMock = mock(DBConfiguration.class);
when(dbcMock.getConnection()).thenReturn(connectionMock);
mapper.dbc = dbcMock;
return mapper;
}
/**
* Mocks mapper's DB connection to execute a no-op JDBC operation.
* @param mapper will modify this object
* @return modified mapper
* @throws Exception
*/
private NetezzaExternalTableImportMapper<LongWritable, Text> withNoopJDBCOperation(NetezzaExternalTableImportMapper<LongWritable, Text> mapper) throws Exception {
Connection connectionMock = mock(Connection.class);
// PreparadStatement mock should imitate loading stuff from FIFO into Netezza
PreparedStatement psMock = mock(PreparedStatement.class);
when(psMock.execute()).then(invocation -> {
// Need to open FIFO for writing, otherwise reading would hang
FileOutputStream fos = new FileOutputStream(mapper.fifoFile.getAbsoluteFile());
// Simulate delay
Thread.sleep(200);
// Write single record and return
fos.write("test record".getBytes());
fos.close();
return true;
});
when(connectionMock.prepareStatement(anyString())).thenReturn(psMock);
DBConfiguration dbcMock = mock(DBConfiguration.class);
when(dbcMock.getConnection()).thenReturn(connectionMock);
mapper.dbc = dbcMock;
return mapper;
}
/**
* Creates simple mapreduce context that says it has a single record but won't actually
* return any records as tests are not expected to read the records.
* @return
* @throws IOException
* @throws InterruptedException
*/
private Mapper.Context getContext() {
Mapper.Context context = mock(Mapper.Context.class);
Configuration conf = new Configuration();
when(context.getConfiguration()).thenReturn(conf);
TaskAttemptID taskAttemptID = new TaskAttemptID();
when(context.getTaskAttemptID()).thenReturn(taskAttemptID);
return context;
}
}