5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-04 02:39:53 +08:00

SQOOP-2971: OraOop does not close connections properly

(Szabolcs Vasas via Kate Ting)
This commit is contained in:
Kate Ting 2016-07-01 16:00:03 -07:00
parent c339b23b6c
commit 7808c6bc3e
4 changed files with 110 additions and 6 deletions

View File

@ -136,8 +136,16 @@ public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
}
}
connection.commit();
} catch (SQLException ex) {
try {
connection.rollback();
} catch (SQLException e) {
LOG.error("Cannot rollback transaction.", e);
}
throw new IOException(ex);
} finally {
closeConnection();
}
return splits;
@ -199,13 +207,11 @@ protected RecordReader<LongWritable, T> createDBRecordReader(
// the current
// value of the URL_PROPERTY...
return new OraOopDBRecordReader<T>(split, inputClass, conf, dbConf
.getConnection(), dbConf, dbConf.getInputConditions(), dbConf
return new OraOopDBRecordReader<T>(split, inputClass, conf,
getConnection(), dbConf, dbConf.getInputConditions(), dbConf
.getInputFieldNames(), dbConf.getInputTableName());
} catch (SQLException ex) {
throw new IOException(ex);
} catch (ClassNotFoundException ex) {
throw new IOException(ex);
}
}

View File

@ -158,8 +158,11 @@ public void write(DataOutput output) throws IOException {
@Override
/** {@inheritDoc} */
public void setConf(Configuration conf) {
setDbConf(new DBConfiguration(conf));
}
dbConf = new DBConfiguration(conf);
public void setDbConf(DBConfiguration dbConf) {
this.dbConf = dbConf;
try {
getConnection();
@ -389,7 +392,9 @@ protected void closeConnection() {
this.connection.close();
this.connection = null;
}
} catch (SQLException sqlE) { /* ignore exception on close. */ }
} catch (SQLException sqlE) {
LOG.error("Cannot close JDBC connection.", sqlE);
}
}
}

View File

@ -55,6 +55,7 @@
import org.apache.sqoop.manager.netezza.DirectNetezzaHCatImportManualTest;
import org.apache.sqoop.manager.netezza.NetezzaExportManualTest;
import org.apache.sqoop.manager.netezza.NetezzaImportManualTest;
import org.apache.sqoop.manager.oracle.OraOopDataDrivenDBInputFormatConnectionCloseTest;
import org.apache.sqoop.manager.oracle.OracleCallExportTest;
import org.apache.sqoop.manager.oracle.OracleIncrementalImportTest;
import org.apache.sqoop.manager.oracle.OracleSplitterTest;
@ -98,6 +99,7 @@ public static Test suite() {
suite.addTestSuite(OracleCompatTest.class);
suite.addTestSuite(OracleIncrementalImportTest.class);
suite.addTestSuite(OracleSplitterTest.class);
suite.addTestSuite(OraOopDataDrivenDBInputFormatConnectionCloseTest.class);
// SQL Server
suite.addTestSuite(SQLServerDatatypeExportDelimitedFileManualTest.class);

View File

@ -0,0 +1,91 @@
package org.apache.sqoop.manager.oracle;
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class OraOopDataDrivenDBInputFormatConnectionCloseTest extends TestCase {
private static final OraOopLog LOG = OraOopLogFactory.getLog(
TestOraOopDataDrivenDBInputFormat.class.getName());
private static final String ORACLE_PREPARED_STATEMENT_CLASS = "oracle.jdbc.OraclePreparedStatement";
private OraOopDataDrivenDBInputFormat inputFormat;
private Connection mockConnection;
private JobContext mockJobContext;
@Before
public void setUp() throws Exception {
Configuration configuration = new Configuration();
configuration.set(DBConfiguration.USERNAME_PROPERTY, "Oracle user");
configuration.setInt(OraOopConstants.ORAOOP_DESIRED_NUMBER_OF_MAPPERS, 1);
Class<? extends PreparedStatement> preparedStatementClass =
(Class<? extends PreparedStatement>) Class.forName(ORACLE_PREPARED_STATEMENT_CLASS);
PreparedStatement mockPreparedStatement = mock(preparedStatementClass);
ResultSet mockResultSet = mock(ResultSet.class);
when(mockResultSet.next()).thenReturn(true).thenReturn(false);
when(mockPreparedStatement.executeQuery()).thenReturn(mockResultSet);
mockConnection = mock(Connection.class);
DatabaseMetaData dbMetaData = mock(DatabaseMetaData.class);
when(dbMetaData.getDatabaseProductName()).thenReturn("Oracle");
when(mockConnection.getMetaData()).thenReturn(dbMetaData);
when(mockConnection.prepareStatement(anyString())).thenReturn(mockPreparedStatement);
DBConfiguration dbConf = mock(DBConfiguration.class);
when(dbConf.getConnection()).thenReturn(mockConnection);
when(dbConf.getConf()).thenReturn(configuration);
when(dbConf.getInputTableName()).thenReturn("InputTable");
mockJobContext = mock(JobContext.class);
when(mockJobContext.getConfiguration()).thenReturn(configuration);
inputFormat = new OraOopDataDrivenDBInputFormat();
inputFormat.setDbConf(dbConf);
}
@Test
public void testGetSplitsClosesConnectionProperly() throws Exception {
inputFormat.getSplits(mockJobContext);
verify(mockConnection).commit();
verify(mockConnection).close();
}
@Test
public void testGetSplitsClosesConnectionProperlyWhenExceptionIsThrown() throws Exception {
doThrow(new SQLException("For the sake of testing the commit fails.")).when(mockConnection).commit();
try {
inputFormat.getSplits(mockJobContext);
} catch (IOException e) {
LOG.debug("An expected exception is thrown in testSplitsClosesConnectionProperlyWhenExceptionIsThrown, ignoring.");
}
verify(mockConnection).rollback();
verify(mockConnection).close();
}
}