mirror of
https://github.com/apache/sqoop.git
synced 2025-05-17 01:11:07 +08:00
SQOOP-929: Add more Netezza direct mode tests
(Venkat Ranganathan via Jarek Jarcec Cecho)
This commit is contained in:
parent
05976e709d
commit
b4b9920c4f
@ -18,56 +18,30 @@
|
||||
|
||||
package com.cloudera.sqoop.manager;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.Statement;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.sqoop.manager.DirectNetezzaManager;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
import com.cloudera.sqoop.SqoopOptions;
|
||||
import com.cloudera.sqoop.TestExport;
|
||||
import com.cloudera.sqoop.TestExport.ColumnGenerator;
|
||||
|
||||
/**
|
||||
* Test the DirectNetezzaManager implementation's exportJob() functionality.
|
||||
*/
|
||||
public class DirectNetezzaExportManualTest extends TestExport {
|
||||
public class DirectNetezzaExportManualTest extends NetezzaExportManualTest {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(
|
||||
DirectNetezzaExportManualTest.class.getName());
|
||||
public static final Log LOG = LogFactory
|
||||
.getLog(DirectNetezzaExportManualTest.class.getName());
|
||||
|
||||
static final String TABLE_PREFIX = "EMPNZ";
|
||||
|
||||
// instance variables populated during setUp, used during tests.
|
||||
private DirectNetezzaManager manager;
|
||||
private Connection conn;
|
||||
|
||||
@Override
|
||||
protected Connection getConnection() {
|
||||
return conn;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean useHsqldbTestServer() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getConnectString() {
|
||||
return NetezzaTestUtils.getNZConnectString();
|
||||
}
|
||||
static final String TABLE_PREFIX = "EMPNZ_D_EXP";
|
||||
|
||||
@Override
|
||||
protected String getTablePrefix() {
|
||||
@ -75,50 +49,14 @@ protected String getTablePrefix() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getDropTableStatement(String tableName) {
|
||||
return "DROP TABLE " + tableName;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
super.setUp();
|
||||
conn = getConnection();
|
||||
SqoopOptions options = new SqoopOptions(
|
||||
NetezzaTestUtils.getNZConnectString(), getTableName());
|
||||
options.setUsername(NetezzaTestUtils.getNZUser());
|
||||
options.setPassword(NetezzaTestUtils.getNZPassword());
|
||||
this.manager = new DirectNetezzaManager(options);
|
||||
|
||||
try {
|
||||
this.conn = manager.getConnection();
|
||||
this.conn.setAutoCommit(false);
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.error("Encountered SQL Exception: " + sqlE);
|
||||
sqlE.printStackTrace();
|
||||
fail("SQLException when running test setUp(): " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
super.tearDown();
|
||||
if (null != manager) {
|
||||
try {
|
||||
manager.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.error("Got SQLException: " + sqlE.toString());
|
||||
fail("Got SQLException: " + sqlE.toString());
|
||||
}
|
||||
}
|
||||
this.conn = null;
|
||||
this.manager = null;
|
||||
|
||||
protected boolean isDirectMode() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String [] getCodeGenArgv(String... extraArgs) {
|
||||
protected String[] getCodeGenArgv(String... extraArgs) {
|
||||
|
||||
String [] moreArgs = new String[extraArgs.length + 4];
|
||||
String[] moreArgs = new String[extraArgs.length + 4];
|
||||
int i = 0;
|
||||
for (i = 0; i < extraArgs.length; i++) {
|
||||
moreArgs[i] = extraArgs[i];
|
||||
@ -134,153 +72,117 @@ public void tearDown() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String [] getArgv(boolean includeHadoopFlags,
|
||||
int rowsPerStatement, int statementsPerTx, String... additionalArgv) {
|
||||
protected String[] getArgv(boolean includeHadoopFlags, int rowsPerStatement,
|
||||
int statementsPerTx, String... additionalArgv) {
|
||||
|
||||
String [] subArgv = newStrArray(additionalArgv, "--direct",
|
||||
"--username", NetezzaTestUtils.getNZUser(), "--password",
|
||||
NetezzaTestUtils.getNZPassword());
|
||||
return super.getArgv(includeHadoopFlags, rowsPerStatement,
|
||||
statementsPerTx, subArgv);
|
||||
String[] argV = super.getArgv(includeHadoopFlags,
|
||||
rowsPerStatement, statementsPerTx);
|
||||
String[] subArgV = newStrArray(argV, "--direct",
|
||||
"--username", NetezzaTestUtils.getNZUser(), "--password",
|
||||
NetezzaTestUtils.getNZPassword());
|
||||
String[] newArgV = new String[subArgV.length + additionalArgv.length];
|
||||
int i = 0;
|
||||
for (String s : subArgV) {
|
||||
newArgV[i++] = s;
|
||||
}
|
||||
for (String s: additionalArgv) {
|
||||
newArgV[i++] = s;
|
||||
}
|
||||
return newArgV;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Create the table definition to export to, removing any prior table. By
|
||||
* specifying ColumnGenerator arguments, you can add extra columns to the
|
||||
* table of arbitrary type.
|
||||
*/
|
||||
@Override
|
||||
public void createTable(ColumnGenerator... extraColumns) throws SQLException {
|
||||
PreparedStatement statement = conn.prepareStatement(
|
||||
getDropTableStatement(getTableName()), ResultSet.TYPE_FORWARD_ONLY,
|
||||
ResultSet.CONCUR_READ_ONLY);
|
||||
try {
|
||||
statement.executeUpdate();
|
||||
conn.commit();
|
||||
} catch (SQLException sqle) {
|
||||
conn.rollback();
|
||||
} finally {
|
||||
statement.close();
|
||||
}
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("CREATE TABLE ");
|
||||
sb.append(getTableName());
|
||||
sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)");
|
||||
int colNum = 0;
|
||||
for (ColumnGenerator gen : extraColumns) {
|
||||
sb.append(", " + forIdx(colNum++) + " " + gen.getType());
|
||||
}
|
||||
sb.append(")");
|
||||
|
||||
statement = conn.prepareStatement(sb.toString(),
|
||||
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
||||
try {
|
||||
statement.executeUpdate();
|
||||
conn.commit();
|
||||
} finally {
|
||||
statement.close();
|
||||
}
|
||||
public void createTable(ColumnGenerator... extraColumns)
|
||||
throws SQLException {
|
||||
createTableNZ(getTableName(), extraColumns);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test an authenticated export using netezza external table import.
|
||||
* Creates the staging table.
|
||||
* @param extraColumns extra columns that go in the staging table
|
||||
* @throws SQLException if an error occurs during export
|
||||
*/
|
||||
public void testAuthExport() throws IOException, SQLException {
|
||||
@Override
|
||||
public void createStagingTable(ColumnGenerator... extraColumns)
|
||||
throws SQLException {
|
||||
createTableNZ(getStagingTableName(), extraColumns);
|
||||
}
|
||||
|
||||
private void runNetezzaTest(String tableName, String[] argv,
|
||||
ColumnGenerator...extraCols) throws IOException {
|
||||
SqoopOptions options = new SqoopOptions(
|
||||
NetezzaTestUtils.getNZConnectString(),
|
||||
getTableName());
|
||||
NetezzaTestUtils.getNZConnectString(), getTableName());
|
||||
options.setUsername(NetezzaTestUtils.getNZUser());
|
||||
options.setPassword(NetezzaTestUtils.getNZPassword());
|
||||
|
||||
LOG.info("Running export with argv : " + Arrays.toString(argv));
|
||||
manager = new DirectNetezzaManager(options);
|
||||
|
||||
Connection connection = null;
|
||||
Statement st = null;
|
||||
|
||||
String tableName = getTableName();
|
||||
|
||||
try {
|
||||
connection = manager.getConnection();
|
||||
connection.setAutoCommit(false);
|
||||
st = connection.createStatement();
|
||||
|
||||
// create a target database table.
|
||||
try {
|
||||
st.executeUpdate("DROP TABLE " + tableName);
|
||||
} catch(SQLException sqle) {
|
||||
LOG.info("Ignoring exception from DROP TABLE : " + sqle.getMessage());
|
||||
connection.rollback();
|
||||
}
|
||||
|
||||
LOG.info("Creating table " + tableName);
|
||||
|
||||
st.executeUpdate("CREATE TABLE " + tableName + " ("
|
||||
+ "id INT NOT NULL PRIMARY KEY, "
|
||||
+ "msg VARCHAR(24) NOT NULL)");
|
||||
|
||||
connection.commit();
|
||||
createTable(extraCols);
|
||||
LOG.info("Created table " + tableName);
|
||||
|
||||
// Write a file containing a record to export.
|
||||
Path tablePath = getTablePath();
|
||||
Path filePath = new Path(tablePath, "datafile");
|
||||
Configuration conf = new Configuration();
|
||||
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
fs.mkdirs(tablePath);
|
||||
OutputStream os = fs.create(filePath);
|
||||
BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
|
||||
w.write(getRecordLine(0));
|
||||
w.write(getRecordLine(1));
|
||||
w.write(getRecordLine(2));
|
||||
w.close();
|
||||
os.close();
|
||||
|
||||
createExportFile(extraCols);
|
||||
// run the export and verify that the results are good.
|
||||
runExport(getArgv(true, 10, 10,
|
||||
"--username", NetezzaTestUtils.getNZUser(),
|
||||
"--password", NetezzaTestUtils.getNZPassword(),
|
||||
"--connect", NetezzaTestUtils.getNZConnectString()));
|
||||
verifyExport(3, connection);
|
||||
runExport(argv);
|
||||
verifyExport(3, conn);
|
||||
if (extraCols.length > 0) {
|
||||
assertColMinAndMax(forIdx(0), extraCols[0]);
|
||||
}
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.error("Encountered SQL Exception: " + sqlE);
|
||||
sqlE.printStackTrace();
|
||||
fail("SQLException when accessing target table. " + sqlE);
|
||||
} finally {
|
||||
try {
|
||||
if (null != st) {
|
||||
st.close();
|
||||
}
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Got SQLException when closing connection: " + sqlE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test an authenticated export using netezza external table import.
|
||||
*/
|
||||
@Test
|
||||
public void testSimpleExport() throws IOException, SQLException {
|
||||
String[] argv = getArgv(true, 10, 10);
|
||||
runNetezzaTest(getTableName(), argv);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidExtraArgs() throws Exception {
|
||||
|
||||
String [] extraArgs = {
|
||||
"--",
|
||||
"--log-dir", "/tmp",
|
||||
"--max-errors", "2",
|
||||
};
|
||||
String[] argv = getArgv(true, 10, 10, extraArgs);
|
||||
runNetezzaTest(getTableName(), argv);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void testMultiMapTextExportWithStaging()
|
||||
throws IOException, SQLException {
|
||||
public void testMultiMapTextExportWithStaging() throws IOException,
|
||||
SQLException {
|
||||
// disable this test as staging is not supported in direct mode
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testMultiTransactionWithStaging()
|
||||
throws IOException, SQLException {
|
||||
public void testMultiTransactionWithStaging() throws IOException,
|
||||
SQLException {
|
||||
// disable this test as staging is not supported in direct mode
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testColumnsExport()
|
||||
throws IOException, SQLException {
|
||||
public void testColumnsExport() throws IOException, SQLException {
|
||||
// disable this test as it is not supported in direct mode
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testSequenceFileExport()
|
||||
throws IOException, SQLException {
|
||||
public void testSequenceFileExport() throws IOException, SQLException {
|
||||
// disable this test as it is not supported in direct mode
|
||||
}
|
||||
}
|
||||
|
246
src/test/com/cloudera/sqoop/manager/NetezzaExportManualTest.java
Normal file
246
src/test/com/cloudera/sqoop/manager/NetezzaExportManualTest.java
Normal file
@ -0,0 +1,246 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.cloudera.sqoop.manager;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.sqoop.manager.DirectNetezzaManager;
|
||||
import org.apache.sqoop.manager.NetezzaManager;
|
||||
import org.junit.Before;
|
||||
|
||||
|
||||
import com.cloudera.sqoop.SqoopOptions;
|
||||
import com.cloudera.sqoop.TestExport;
|
||||
import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
|
||||
import com.cloudera.sqoop.testutil.CommonArgs;
|
||||
|
||||
/**
|
||||
* Test the Netezza implementation.
|
||||
*
|
||||
* This uses JDBC to export data from an Netezza database into HDFS. See
|
||||
* DirectNetezzaExportManualTest for external table methods.
|
||||
*
|
||||
* Since this requires an Netezza Server installation, this class is named in
|
||||
* such a way that Sqoop's default QA process does not run it. You need to run
|
||||
* this manually with -Dtestcase=NetezzaExportManualTest.
|
||||
*
|
||||
*/
|
||||
public class NetezzaExportManualTest extends TestExport {
|
||||
public static final Log LOG = LogFactory.getLog(NetezzaExportManualTest.class
|
||||
.getName());
|
||||
static final String TABLE_PREFIX = "EMPNZ_EXP_";
|
||||
// instance variables populated during setUp, used during tests
|
||||
protected NetezzaManager manager;
|
||||
protected Connection conn;
|
||||
|
||||
|
||||
@Override
|
||||
protected boolean useHsqldbTestServer() {
|
||||
return false;
|
||||
}
|
||||
|
||||
protected boolean isDirectMode() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Connection getConnection() {
|
||||
return conn;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected String getConnectString() {
|
||||
return NetezzaTestUtils.getNZConnectString();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getTablePrefix() {
|
||||
return TABLE_PREFIX;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getDropTableStatement(String tableName) {
|
||||
return "DROP TABLE " + tableName;
|
||||
}
|
||||
|
||||
protected void createTableNZ(String tableName, ColumnGenerator...extraCols)
|
||||
throws SQLException {
|
||||
String sqlStatement = getDropTableStatement(tableName);
|
||||
conn.rollback();
|
||||
LOG.info("Executing drop statement : " + sqlStatement);
|
||||
PreparedStatement statement = conn.prepareStatement(
|
||||
sqlStatement, ResultSet.TYPE_FORWARD_ONLY,
|
||||
ResultSet.CONCUR_READ_ONLY);
|
||||
try {
|
||||
statement.executeUpdate();
|
||||
conn.commit();
|
||||
} catch (SQLException sqle) {
|
||||
conn.rollback();
|
||||
} finally {
|
||||
statement.close();
|
||||
}
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("CREATE TABLE ");
|
||||
sb.append(tableName);
|
||||
sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)");
|
||||
int colNum = 0;
|
||||
for (ColumnGenerator gen : extraCols) {
|
||||
sb.append(", " + forIdx(colNum++) + " " + gen.getType());
|
||||
}
|
||||
sb.append(")");
|
||||
sqlStatement = sb.toString();
|
||||
LOG.info("Executing create statement : " + sqlStatement);
|
||||
statement = conn.prepareStatement(sqlStatement,
|
||||
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
||||
try {
|
||||
statement.executeUpdate();
|
||||
conn.commit();
|
||||
} finally {
|
||||
statement.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the table definition to export to, removing any prior table. By
|
||||
* specifying ColumnGenerator arguments, you can add extra columns to the
|
||||
* table of arbitrary type.
|
||||
*/
|
||||
@Override
|
||||
public void createTable(ColumnGenerator... extraColumns)
|
||||
throws SQLException {
|
||||
createTableNZ(getTableName(), extraColumns);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates the staging table.
|
||||
* @param extraColumns extra columns that go in the staging table
|
||||
* @throws SQLException if an error occurs during export
|
||||
*/
|
||||
@Override
|
||||
public void createStagingTable(ColumnGenerator... extraColumns)
|
||||
throws SQLException {
|
||||
createTableNZ(getStagingTableName(), extraColumns);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
super.setUp();
|
||||
SqoopOptions options = new SqoopOptions(
|
||||
NetezzaTestUtils.getNZConnectString(), getTableName());
|
||||
options.setUsername(NetezzaTestUtils.getNZUser());
|
||||
options.setPassword(NetezzaTestUtils.getNZPassword());
|
||||
if (isDirectMode()) {
|
||||
this.manager = new DirectNetezzaManager(options);
|
||||
} else {
|
||||
this.manager = new NetezzaManager(options);
|
||||
}
|
||||
|
||||
try {
|
||||
this.conn = manager.getConnection();
|
||||
this.conn.setAutoCommit(false);
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.error("Encountered SQL Exception: " + sqlE);
|
||||
sqlE.printStackTrace();
|
||||
fail("SQLException when running test setUp(): " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
protected String[] getArgv(boolean includeHadoopFlags, int rowsPerStatement,
|
||||
int statementsPerTx, String... additionalArgv) {
|
||||
|
||||
String[] argV = super.getArgv(includeHadoopFlags,
|
||||
rowsPerStatement, statementsPerTx);
|
||||
String[] subArgV = newStrArray(argV,
|
||||
"--username", NetezzaTestUtils.getNZUser(), "--password",
|
||||
NetezzaTestUtils.getNZPassword());
|
||||
String[] newArgV = new String[subArgV.length + additionalArgv.length];
|
||||
int i = 0;
|
||||
for (String s : subArgV) {
|
||||
newArgV[i++] = s;
|
||||
}
|
||||
for (String s: additionalArgv) {
|
||||
newArgV[i++] = s;
|
||||
}
|
||||
return newArgV;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String[] getCodeGenArgv(String... extraArgs) {
|
||||
String[] moreArgs;
|
||||
|
||||
moreArgs = new String[extraArgs.length + 4];
|
||||
|
||||
int i = 0;
|
||||
for (i = 0; i < extraArgs.length; i++) {
|
||||
moreArgs[i] = extraArgs[i];
|
||||
}
|
||||
|
||||
// Add username argument for netezza.
|
||||
moreArgs[i++] = "--username";
|
||||
moreArgs[i++] = NetezzaTestUtils.getNZUser();
|
||||
moreArgs[i++] = "--password";
|
||||
moreArgs[i++] = NetezzaTestUtils.getNZPassword();
|
||||
|
||||
return super.getCodeGenArgv(moreArgs);
|
||||
}
|
||||
|
||||
protected void createExportFile(ColumnGenerator...extraCols)
|
||||
throws IOException, SQLException {
|
||||
String ext = ".txt";
|
||||
|
||||
Path tablePath = getTablePath();
|
||||
Path filePath = new Path(tablePath, "part0" + ext);
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
|
||||
conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
|
||||
}
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
fs.mkdirs(tablePath);
|
||||
OutputStream os = fs.create(filePath);
|
||||
|
||||
|
||||
BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
|
||||
for (int i = 0; i < 3; i++) {
|
||||
String line = getRecordLine(i, extraCols);
|
||||
w.write(line);
|
||||
LOG.debug("Create Export file - Writing line : " + line);
|
||||
}
|
||||
w.close();
|
||||
os.close();
|
||||
}
|
||||
}
|
@ -23,6 +23,8 @@
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.ArrayList;
|
||||
@ -43,26 +45,24 @@
|
||||
import com.cloudera.sqoop.util.FileListing;
|
||||
|
||||
/**
|
||||
* Test the Netezza implementation.
|
||||
* Test the Netezza implementation.
|
||||
*
|
||||
* This uses JDBC to import data from an Netezza database into HDFS.
|
||||
* This uses both JDBC and external tables to import data from an Netezza
|
||||
* database into HDFS.
|
||||
*
|
||||
* Since this requires an Netezza SErver installation, this class is named
|
||||
* in such a way that Sqoop's default QA process does not run it. You need to
|
||||
* run this manually with -Dtestcase=NetezzaManagerImportManualTest.
|
||||
* Since this requires an Netezza Server installation, this class is named in
|
||||
* such a way that Sqoop's default QA process does not run it. You need to run
|
||||
* this manually with -Dtestcase=NetezzaImportManualTest.
|
||||
*
|
||||
*/
|
||||
public class NetezzaImportManualTest extends ImportJobTestCase {
|
||||
|
||||
public static final Log LOG = LogFactory
|
||||
.getLog(NetezzaImportManualTest.class.getName());
|
||||
|
||||
|
||||
public static final Log LOG = LogFactory.
|
||||
getLog(NetezzaImportManualTest.class.getName());
|
||||
|
||||
// instance variables populated during setUp, used during tests
|
||||
private NetezzaManager manager;
|
||||
|
||||
|
||||
private Connection conn;
|
||||
@Override
|
||||
protected boolean useHsqldbTestServer() {
|
||||
return false;
|
||||
@ -70,70 +70,99 @@ protected boolean useHsqldbTestServer() {
|
||||
|
||||
@Override
|
||||
protected String getTableName() {
|
||||
return NetezzaTestUtils.TABLE_NAME;
|
||||
return NetezzaTestUtils.TABLE_NAME + "_IMP_";
|
||||
}
|
||||
|
||||
|
||||
private void createTable(String tableName, String... extraColumns)
|
||||
throws SQLException {
|
||||
PreparedStatement statement = conn.prepareStatement("DROP TABLE "
|
||||
+ tableName, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
||||
try {
|
||||
statement.executeUpdate();
|
||||
conn.commit();
|
||||
} catch (SQLException sqle) {
|
||||
conn.rollback();
|
||||
} finally {
|
||||
statement.close();
|
||||
}
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("CREATE TABLE " + tableName + " (");
|
||||
sb.append("id INT NOT NULL PRIMARY KEY, ");
|
||||
sb.append("name VARCHAR(24) NOT NULL, ");
|
||||
sb.append("start_date DATE, ");
|
||||
sb.append("Salary FLOAT, ");
|
||||
sb.append("Fired BOOL, ");
|
||||
sb.append("dept VARCHAR(32) ");
|
||||
for (String col : extraColumns) {
|
||||
sb.append(", " + col + " INTEGER");
|
||||
}
|
||||
sb.append(")");
|
||||
|
||||
statement = conn.prepareStatement(sb.toString(),
|
||||
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
||||
try {
|
||||
statement.executeUpdate();
|
||||
conn.commit();
|
||||
} finally {
|
||||
statement.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void populateTable(String tableName) throws SQLException {
|
||||
Statement statement = conn.createStatement();
|
||||
try {
|
||||
statement.executeUpdate("INSERT INTO " + tableName
|
||||
+ " VALUES(1,'Aaron','2009-05-14',1000000.00,TRUE,'engineering')");
|
||||
statement.executeUpdate("INSERT INTO " + tableName
|
||||
+ " VALUES(2,'Bob','2009-04-20',400.00,TRUE,'sales')");
|
||||
statement.executeUpdate("INSERT INTO " + tableName
|
||||
+ " VALUES(3,'Fred','2009-01-23',15.00,FALSE,'marketing')");
|
||||
conn.commit();
|
||||
} finally {
|
||||
statement.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void populateTableWithNull(String tableName) throws SQLException{
|
||||
Statement statement = conn.createStatement();
|
||||
try {
|
||||
statement.executeUpdate("INSERT INTO " + tableName
|
||||
+ " VALUES(1,'Aaron','2009-05-14',1000000.00,TRUE,"
|
||||
+ "'engineering',NULL,1)");
|
||||
statement.executeUpdate("INSERT INTO " + tableName
|
||||
+ " VALUES(2,'Bob','2009-04-20',400.00,TRUE,'sales',NULL,2)");
|
||||
statement.executeUpdate("INSERT INTO " + tableName
|
||||
+ " VALUES(3,'Fred','2009-01-23',15.00,FALSE,'marketing',NULL,3)");
|
||||
conn.commit();
|
||||
} finally {
|
||||
statement.close();
|
||||
}
|
||||
}
|
||||
|
||||
public void setUpData() {
|
||||
SqoopOptions options = new SqoopOptions(
|
||||
NetezzaTestUtils.getNZConnectString(), getTableName());
|
||||
options.setUsername(NetezzaTestUtils.getNZUser());
|
||||
options.setPassword(NetezzaTestUtils.getNZPassword());
|
||||
try {
|
||||
manager = new NetezzaManager(options);
|
||||
conn = manager.getConnection();
|
||||
createTable(getTableName());
|
||||
populateTable(getTableName());
|
||||
String tableNameWithNull = getTableName() + "_W_N";
|
||||
createTable(tableNameWithNull, new String[] { "col0", "col1" });
|
||||
populateTableWithNull(tableNameWithNull);
|
||||
} catch (SQLException sqlE) {
|
||||
fail("Setup failed with SQLException " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
super.setUp();
|
||||
|
||||
SqoopOptions options = new SqoopOptions(
|
||||
NetezzaTestUtils.getNZConnectString(), getTableName());
|
||||
options.setUsername(NetezzaTestUtils.getNZUser());
|
||||
options.setPassword(NetezzaTestUtils.getNZPassword());
|
||||
|
||||
manager = new NetezzaManager(options);
|
||||
|
||||
// Drop the existing table, if there is one.
|
||||
Connection conn = null;
|
||||
Statement stmt = null;
|
||||
try {
|
||||
conn = manager.getConnection();
|
||||
stmt = conn.createStatement();
|
||||
stmt.execute("DROP TABLE " + getTableName());
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.info("Table was not dropped: " + sqlE.getMessage());
|
||||
} finally {
|
||||
try {
|
||||
if (null != stmt) {
|
||||
stmt.close();
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Exception while closing stmt", ex);
|
||||
}
|
||||
}
|
||||
|
||||
// Create and populate table
|
||||
try {
|
||||
conn = manager.getConnection();
|
||||
conn.setAutoCommit(false);
|
||||
stmt = conn.createStatement();
|
||||
|
||||
// create the database table and populate it with data.
|
||||
stmt.executeUpdate("CREATE TABLE " + getTableName() + " ("
|
||||
+ "id INT NOT NULL, " + "name VARCHAR(24) NOT NULL, "
|
||||
+ "salary FLOAT, " + "dept VARCHAR(32), " + "PRIMARY KEY (id))");
|
||||
|
||||
stmt.executeUpdate("INSERT INTO " + getTableName() + " VALUES("
|
||||
+ "1,'Aaron', " + "1000000.00,'engineering')");
|
||||
stmt.executeUpdate("INSERT INTO " + getTableName() + " VALUES("
|
||||
+ "2,'Bob', " + "400.00,'sales')");
|
||||
stmt.executeUpdate("INSERT INTO " + getTableName() + " VALUES("
|
||||
+ "3,'Fred', 15.00," + "'marketing')");
|
||||
conn.commit();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.error("Encountered SQL Exception: ", sqlE);
|
||||
sqlE.printStackTrace();
|
||||
fail("SQLException when running test setUp(): " + sqlE);
|
||||
} finally {
|
||||
try {
|
||||
if (null != stmt) {
|
||||
stmt.close();
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Exception while closing connection/stmt", ex);
|
||||
}
|
||||
}
|
||||
setUpData();
|
||||
}
|
||||
|
||||
@After
|
||||
@ -147,24 +176,50 @@ public void tearDown() {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNetezzaImport() throws IOException {
|
||||
|
||||
runNetezzaTest(getExpectedResults());
|
||||
}
|
||||
|
||||
private String[] getExpectedResults() {
|
||||
return new String[] { "1,Aaron,1000000.0,engineering", "2,Bob,400.0,sales",
|
||||
"3,Fred,15.0,marketing", };
|
||||
String [] expectedResults = {
|
||||
"1,Aaron,2009-05-14,1000000.0,true,engineering",
|
||||
"2,Bob,2009-04-20,400.0,true,sales",
|
||||
"3,Fred,2009-01-23,15.0,false,marketing",
|
||||
};
|
||||
|
||||
return expectedResults;
|
||||
}
|
||||
private String[] getDirectModeExpectedResults() {
|
||||
String [] expectedResults = {
|
||||
"1,Aaron,2009-05-14,1000000,T,engineering",
|
||||
"2,Bob,2009-04-20,400,T,sales",
|
||||
"3,Fred,2009-01-23,15,F,marketing",
|
||||
};
|
||||
return expectedResults;
|
||||
}
|
||||
private String[] getExpectedResultsWithNulls() {
|
||||
String [] expectedResults = {
|
||||
"1,Aaron,2009-05-14,1000000.0,true,engineering,\\N,1",
|
||||
"2,Bob,2009-04-20,400.0,true,sales,\\N,2",
|
||||
"3,Fred,2009-01-23,15.0,false,marketing,\\N,3",
|
||||
};
|
||||
|
||||
return expectedResults;
|
||||
}
|
||||
|
||||
private String[] getArgv() {
|
||||
private String[] getDirectModeExpectedResultsWithNulls() {
|
||||
String [] expectedResults = {
|
||||
"1,Aaron,2009-05-14,1000000,T,engineering,nvl,1",
|
||||
"2,Bob,2009-04-20,400,T,sales,nvl,2",
|
||||
"3,Fred,2009-01-23,15,F,marketing,nvl,3",
|
||||
};
|
||||
|
||||
return expectedResults;
|
||||
}
|
||||
private String[] getArgv(boolean isDirect, String tableName,
|
||||
String... extraArgs) {
|
||||
ArrayList<String> args = new ArrayList<String>();
|
||||
|
||||
CommonArgs.addHadoopFlags(args);
|
||||
|
||||
args.add("--table");
|
||||
args.add(getTableName());
|
||||
args.add(tableName);
|
||||
args.add("--warehouse-dir");
|
||||
args.add(getWarehouseDir());
|
||||
args.add("--connect");
|
||||
@ -175,14 +230,25 @@ private String[] getArgv() {
|
||||
args.add(NetezzaTestUtils.getNZPassword());
|
||||
args.add("--num-mappers");
|
||||
args.add("1");
|
||||
|
||||
if (isDirect) {
|
||||
args.add("--direct");
|
||||
}
|
||||
for (String arg : extraArgs) {
|
||||
args.add(arg);
|
||||
}
|
||||
return args.toArray(new String[args.size()]);
|
||||
}
|
||||
|
||||
private void runNetezzaTest(String[] expectedResults) throws IOException {
|
||||
private void runNetezzaTest(boolean isDirect, String tableName,
|
||||
String[] expectedResults, String... extraArgs) throws IOException {
|
||||
|
||||
Path warehousePath = new Path(this.getWarehouseDir());
|
||||
Path tablePath = new Path(warehousePath, getTableName());
|
||||
Path filePath = new Path(tablePath, "part-m-00000");
|
||||
Path tablePath = new Path(warehousePath, tableName);
|
||||
|
||||
Path filePath;
|
||||
|
||||
filePath = new Path(tablePath, "part-m-00000");
|
||||
|
||||
File tableFile = new File(tablePath.toString());
|
||||
if (tableFile.exists() && tableFile.isDirectory()) {
|
||||
@ -190,7 +256,7 @@ private void runNetezzaTest(String[] expectedResults) throws IOException {
|
||||
FileListing.recursiveDeleteDir(tableFile);
|
||||
}
|
||||
|
||||
String[] argv = getArgv();
|
||||
String[] argv = getArgv(isDirect, tableName, extraArgs);
|
||||
try {
|
||||
runImport(argv);
|
||||
} catch (IOException ioe) {
|
||||
@ -200,7 +266,7 @@ private void runNetezzaTest(String[] expectedResults) throws IOException {
|
||||
}
|
||||
|
||||
File f = new File(filePath.toString());
|
||||
assertTrue("Could not find imported data file", f.exists());
|
||||
assertTrue("Could not find imported data file : " + f, f.exists());
|
||||
BufferedReader r = null;
|
||||
try {
|
||||
// Read through the file and make sure it's all there.
|
||||
@ -208,6 +274,7 @@ private void runNetezzaTest(String[] expectedResults) throws IOException {
|
||||
String[] s = new String[3];
|
||||
for (int i = 0; i < s.length; ++i) {
|
||||
s[i] = r.readLine();
|
||||
LOG.info("Line read from file = " + s[i]);
|
||||
}
|
||||
Arrays.sort(s);
|
||||
for (int i = 0; i < expectedResults.length; ++i) {
|
||||
@ -222,4 +289,70 @@ private void runNetezzaTest(String[] expectedResults) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNetezzaImport() throws IOException {
|
||||
|
||||
runNetezzaTest(false, getTableName(), getExpectedResults());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDirectImport() throws IOException {
|
||||
runNetezzaTest(true, getTableName(), getDirectModeExpectedResults());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListTables() throws IOException {
|
||||
SqoopOptions options = new SqoopOptions(
|
||||
NetezzaTestUtils.getNZConnectString(), getTableName());
|
||||
options.setUsername(NetezzaTestUtils.getNZUser());
|
||||
options.setPassword(NetezzaTestUtils.getNZPassword());
|
||||
|
||||
ConnManager mgr = new NetezzaManager(options);
|
||||
String[] tables = mgr.listTables();
|
||||
Arrays.sort(tables);
|
||||
assertTrue(getTableName() + " is not found!",
|
||||
Arrays.binarySearch(tables, getTableName()) >= 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementalImport() throws IOException {
|
||||
String[] expectedResults = {};
|
||||
|
||||
String[] extraArgs = { "--incremental", "lastmodified", "--check-column",
|
||||
"START_DATE", };
|
||||
|
||||
runNetezzaTest(false, getTableName(), expectedResults, extraArgs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullEscapeCharacters() throws Exception {
|
||||
|
||||
|
||||
String [] extraArgs = {
|
||||
"--null-string", "\\\\N",
|
||||
"--null-non-string", "\\\\N",
|
||||
};
|
||||
|
||||
String[] expectedResultsWithNulls =
|
||||
getExpectedResultsWithNulls();
|
||||
String tableNameWithNull = getTableName() + "_W_N";
|
||||
|
||||
runNetezzaTest(false, tableNameWithNull, expectedResultsWithNulls,
|
||||
extraArgs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidExtraArgs() throws Exception {
|
||||
|
||||
String [] extraArgs = {
|
||||
"--",
|
||||
"--log-dir", "/tmp",
|
||||
"--max-errors", "2",
|
||||
};
|
||||
String[] expectedResults = getDirectModeExpectedResults();
|
||||
String tableName = getTableName();
|
||||
|
||||
runNetezzaTest(true, tableName, expectedResults,
|
||||
extraArgs);
|
||||
}
|
||||
}
|
||||
|
@ -45,7 +45,7 @@ public final class NetezzaTestUtils {
|
||||
public static final String NZ_DB_NAME = System.getProperty(
|
||||
"sqoop.test.netezza.db.name", "SQOOP");
|
||||
public static final String TABLE_NAME = System.getProperty(
|
||||
"sqoop.test.netezza.table.name", "EMP");
|
||||
"sqoop.test.netezza.table.name", "EMPNZ");
|
||||
|
||||
|
||||
private NetezzaTestUtils() { }
|
||||
|
Loading…
Reference in New Issue
Block a user