From b4b9920c4ffb8980b285538186fc90ae689cee2e Mon Sep 17 00:00:00 2001 From: Jarek Jarcec Cecho Date: Thu, 14 Mar 2013 22:01:09 -0700 Subject: [PATCH] SQOOP-929: Add more Netezza direct mode tests (Venkat Ranganathan via Jarek Jarcec Cecho) --- .../DirectNetezzaExportManualTest.java | 258 +++++---------- .../manager/NetezzaExportManualTest.java | 246 ++++++++++++++ .../manager/NetezzaImportManualTest.java | 303 +++++++++++++----- .../sqoop/manager/NetezzaTestUtils.java | 2 +- 4 files changed, 545 insertions(+), 264 deletions(-) create mode 100644 src/test/com/cloudera/sqoop/manager/NetezzaExportManualTest.java diff --git a/src/test/com/cloudera/sqoop/manager/DirectNetezzaExportManualTest.java b/src/test/com/cloudera/sqoop/manager/DirectNetezzaExportManualTest.java index bbcd138b..938ffc53 100644 --- a/src/test/com/cloudera/sqoop/manager/DirectNetezzaExportManualTest.java +++ b/src/test/com/cloudera/sqoop/manager/DirectNetezzaExportManualTest.java @@ -18,56 +18,30 @@ package com.cloudera.sqoop.manager; -import java.io.BufferedWriter; import java.io.IOException; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.Statement; import java.sql.SQLException; +import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.sqoop.manager.DirectNetezzaManager; import org.junit.After; import org.junit.Before; +import org.junit.Test; + import com.cloudera.sqoop.SqoopOptions; -import com.cloudera.sqoop.TestExport; +import com.cloudera.sqoop.TestExport.ColumnGenerator; /** * Test the DirectNetezzaManager implementation's exportJob() functionality. */ -public class DirectNetezzaExportManualTest extends TestExport { +public class DirectNetezzaExportManualTest extends NetezzaExportManualTest { - public static final Log LOG = LogFactory.getLog( - DirectNetezzaExportManualTest.class.getName()); + public static final Log LOG = LogFactory + .getLog(DirectNetezzaExportManualTest.class.getName()); - static final String TABLE_PREFIX = "EMPNZ"; - - // instance variables populated during setUp, used during tests. - private DirectNetezzaManager manager; - private Connection conn; - - @Override - protected Connection getConnection() { - return conn; - } - - @Override - protected boolean useHsqldbTestServer() { - return false; - } - - @Override - protected String getConnectString() { - return NetezzaTestUtils.getNZConnectString(); - } + static final String TABLE_PREFIX = "EMPNZ_D_EXP"; @Override protected String getTablePrefix() { @@ -75,50 +49,14 @@ protected String getTablePrefix() { } @Override - protected String getDropTableStatement(String tableName) { - return "DROP TABLE " + tableName; - } - - @Before - public void setUp() { - super.setUp(); - conn = getConnection(); - SqoopOptions options = new SqoopOptions( - NetezzaTestUtils.getNZConnectString(), getTableName()); - options.setUsername(NetezzaTestUtils.getNZUser()); - options.setPassword(NetezzaTestUtils.getNZPassword()); - this.manager = new DirectNetezzaManager(options); - - try { - this.conn = manager.getConnection(); - this.conn.setAutoCommit(false); - } catch (SQLException sqlE) { - LOG.error("Encountered SQL Exception: " + sqlE); - sqlE.printStackTrace(); - fail("SQLException when running test setUp(): " + sqlE); - } - } - - @After - public void tearDown() { - super.tearDown(); - if (null != manager) { - try { - manager.close(); - } catch (SQLException sqlE) { - LOG.error("Got SQLException: " + sqlE.toString()); - fail("Got SQLException: " + sqlE.toString()); - } - } - this.conn = null; - this.manager = null; - + protected boolean isDirectMode() { + return true; } @Override - protected String [] getCodeGenArgv(String... extraArgs) { + protected String[] getCodeGenArgv(String... extraArgs) { - String [] moreArgs = new String[extraArgs.length + 4]; + String[] moreArgs = new String[extraArgs.length + 4]; int i = 0; for (i = 0; i < extraArgs.length; i++) { moreArgs[i] = extraArgs[i]; @@ -134,153 +72,117 @@ public void tearDown() { } @Override - protected String [] getArgv(boolean includeHadoopFlags, - int rowsPerStatement, int statementsPerTx, String... additionalArgv) { + protected String[] getArgv(boolean includeHadoopFlags, int rowsPerStatement, + int statementsPerTx, String... additionalArgv) { - String [] subArgv = newStrArray(additionalArgv, "--direct", - "--username", NetezzaTestUtils.getNZUser(), "--password", - NetezzaTestUtils.getNZPassword()); - return super.getArgv(includeHadoopFlags, rowsPerStatement, - statementsPerTx, subArgv); + String[] argV = super.getArgv(includeHadoopFlags, + rowsPerStatement, statementsPerTx); + String[] subArgV = newStrArray(argV, "--direct", + "--username", NetezzaTestUtils.getNZUser(), "--password", + NetezzaTestUtils.getNZPassword()); + String[] newArgV = new String[subArgV.length + additionalArgv.length]; + int i = 0; + for (String s : subArgV) { + newArgV[i++] = s; + } + for (String s: additionalArgv) { + newArgV[i++] = s; + } + return newArgV; } - /** * Create the table definition to export to, removing any prior table. By * specifying ColumnGenerator arguments, you can add extra columns to the * table of arbitrary type. */ @Override - public void createTable(ColumnGenerator... extraColumns) throws SQLException { - PreparedStatement statement = conn.prepareStatement( - getDropTableStatement(getTableName()), ResultSet.TYPE_FORWARD_ONLY, - ResultSet.CONCUR_READ_ONLY); - try { - statement.executeUpdate(); - conn.commit(); - } catch (SQLException sqle) { - conn.rollback(); - } finally { - statement.close(); - } - - StringBuilder sb = new StringBuilder(); - sb.append("CREATE TABLE "); - sb.append(getTableName()); - sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)"); - int colNum = 0; - for (ColumnGenerator gen : extraColumns) { - sb.append(", " + forIdx(colNum++) + " " + gen.getType()); - } - sb.append(")"); - - statement = conn.prepareStatement(sb.toString(), - ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - try { - statement.executeUpdate(); - conn.commit(); - } finally { - statement.close(); - } + public void createTable(ColumnGenerator... extraColumns) + throws SQLException { + createTableNZ(getTableName(), extraColumns); } + /** - * Test an authenticated export using netezza external table import. + * Creates the staging table. + * @param extraColumns extra columns that go in the staging table + * @throws SQLException if an error occurs during export */ - public void testAuthExport() throws IOException, SQLException { + @Override + public void createStagingTable(ColumnGenerator... extraColumns) + throws SQLException { + createTableNZ(getStagingTableName(), extraColumns); + } + + private void runNetezzaTest(String tableName, String[] argv, + ColumnGenerator...extraCols) throws IOException { SqoopOptions options = new SqoopOptions( - NetezzaTestUtils.getNZConnectString(), - getTableName()); + NetezzaTestUtils.getNZConnectString(), getTableName()); options.setUsername(NetezzaTestUtils.getNZUser()); options.setPassword(NetezzaTestUtils.getNZPassword()); + LOG.info("Running export with argv : " + Arrays.toString(argv)); manager = new DirectNetezzaManager(options); - Connection connection = null; - Statement st = null; - - String tableName = getTableName(); - try { - connection = manager.getConnection(); - connection.setAutoCommit(false); - st = connection.createStatement(); - - // create a target database table. - try { - st.executeUpdate("DROP TABLE " + tableName); - } catch(SQLException sqle) { - LOG.info("Ignoring exception from DROP TABLE : " + sqle.getMessage()); - connection.rollback(); - } - - LOG.info("Creating table " + tableName); - - st.executeUpdate("CREATE TABLE " + tableName + " (" - + "id INT NOT NULL PRIMARY KEY, " - + "msg VARCHAR(24) NOT NULL)"); - - connection.commit(); + createTable(extraCols); LOG.info("Created table " + tableName); - - // Write a file containing a record to export. - Path tablePath = getTablePath(); - Path filePath = new Path(tablePath, "datafile"); - Configuration conf = new Configuration(); - - FileSystem fs = FileSystem.get(conf); - fs.mkdirs(tablePath); - OutputStream os = fs.create(filePath); - BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os)); - w.write(getRecordLine(0)); - w.write(getRecordLine(1)); - w.write(getRecordLine(2)); - w.close(); - os.close(); - + createExportFile(extraCols); // run the export and verify that the results are good. - runExport(getArgv(true, 10, 10, - "--username", NetezzaTestUtils.getNZUser(), - "--password", NetezzaTestUtils.getNZPassword(), - "--connect", NetezzaTestUtils.getNZConnectString())); - verifyExport(3, connection); + runExport(argv); + verifyExport(3, conn); + if (extraCols.length > 0) { + assertColMinAndMax(forIdx(0), extraCols[0]); + } } catch (SQLException sqlE) { LOG.error("Encountered SQL Exception: " + sqlE); sqlE.printStackTrace(); fail("SQLException when accessing target table. " + sqlE); - } finally { - try { - if (null != st) { - st.close(); - } - } catch (SQLException sqlE) { - LOG.warn("Got SQLException when closing connection: " + sqlE); - } } } + /** + * Test an authenticated export using netezza external table import. + */ + @Test + public void testSimpleExport() throws IOException, SQLException { + String[] argv = getArgv(true, 10, 10); + runNetezzaTest(getTableName(), argv); + } + + @Test + public void testValidExtraArgs() throws Exception { + + String [] extraArgs = { + "--", + "--log-dir", "/tmp", + "--max-errors", "2", + }; + String[] argv = getArgv(true, 10, 10, extraArgs); + runNetezzaTest(getTableName(), argv); + } + + @Override - public void testMultiMapTextExportWithStaging() - throws IOException, SQLException { + public void testMultiMapTextExportWithStaging() throws IOException, + SQLException { // disable this test as staging is not supported in direct mode } @Override - public void testMultiTransactionWithStaging() - throws IOException, SQLException { + public void testMultiTransactionWithStaging() throws IOException, + SQLException { // disable this test as staging is not supported in direct mode } @Override - public void testColumnsExport() - throws IOException, SQLException { + public void testColumnsExport() throws IOException, SQLException { // disable this test as it is not supported in direct mode } @Override - public void testSequenceFileExport() - throws IOException, SQLException { + public void testSequenceFileExport() throws IOException, SQLException { // disable this test as it is not supported in direct mode } } diff --git a/src/test/com/cloudera/sqoop/manager/NetezzaExportManualTest.java b/src/test/com/cloudera/sqoop/manager/NetezzaExportManualTest.java new file mode 100644 index 00000000..50d27fec --- /dev/null +++ b/src/test/com/cloudera/sqoop/manager/NetezzaExportManualTest.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cloudera.sqoop.manager; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.sqoop.manager.DirectNetezzaManager; +import org.apache.sqoop.manager.NetezzaManager; +import org.junit.Before; + + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.TestExport; +import com.cloudera.sqoop.testutil.BaseSqoopTestCase; +import com.cloudera.sqoop.testutil.CommonArgs; + +/** + * Test the Netezza implementation. + * + * This uses JDBC to export data from an Netezza database into HDFS. See + * DirectNetezzaExportManualTest for external table methods. + * + * Since this requires an Netezza Server installation, this class is named in + * such a way that Sqoop's default QA process does not run it. You need to run + * this manually with -Dtestcase=NetezzaExportManualTest. + * + */ +public class NetezzaExportManualTest extends TestExport { + public static final Log LOG = LogFactory.getLog(NetezzaExportManualTest.class + .getName()); + static final String TABLE_PREFIX = "EMPNZ_EXP_"; + // instance variables populated during setUp, used during tests + protected NetezzaManager manager; + protected Connection conn; + + + @Override + protected boolean useHsqldbTestServer() { + return false; + } + + protected boolean isDirectMode() { + return false; + } + + @Override + protected Connection getConnection() { + return conn; + } + + + @Override + protected String getConnectString() { + return NetezzaTestUtils.getNZConnectString(); + } + + @Override + protected String getTablePrefix() { + return TABLE_PREFIX; + } + + @Override + protected String getDropTableStatement(String tableName) { + return "DROP TABLE " + tableName; + } + + protected void createTableNZ(String tableName, ColumnGenerator...extraCols) + throws SQLException { + String sqlStatement = getDropTableStatement(tableName); + conn.rollback(); + LOG.info("Executing drop statement : " + sqlStatement); + PreparedStatement statement = conn.prepareStatement( + sqlStatement, ResultSet.TYPE_FORWARD_ONLY, + ResultSet.CONCUR_READ_ONLY); + try { + statement.executeUpdate(); + conn.commit(); + } catch (SQLException sqle) { + conn.rollback(); + } finally { + statement.close(); + } + + StringBuilder sb = new StringBuilder(); + sb.append("CREATE TABLE "); + sb.append(tableName); + sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)"); + int colNum = 0; + for (ColumnGenerator gen : extraCols) { + sb.append(", " + forIdx(colNum++) + " " + gen.getType()); + } + sb.append(")"); + sqlStatement = sb.toString(); + LOG.info("Executing create statement : " + sqlStatement); + statement = conn.prepareStatement(sqlStatement, + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + try { + statement.executeUpdate(); + conn.commit(); + } finally { + statement.close(); + } + } + + /** + * Create the table definition to export to, removing any prior table. By + * specifying ColumnGenerator arguments, you can add extra columns to the + * table of arbitrary type. + */ + @Override + public void createTable(ColumnGenerator... extraColumns) + throws SQLException { + createTableNZ(getTableName(), extraColumns); + } + + /** + * Creates the staging table. + * @param extraColumns extra columns that go in the staging table + * @throws SQLException if an error occurs during export + */ + @Override + public void createStagingTable(ColumnGenerator... extraColumns) + throws SQLException { + createTableNZ(getStagingTableName(), extraColumns); + } + + + + @Before + public void setUp() { + super.setUp(); + SqoopOptions options = new SqoopOptions( + NetezzaTestUtils.getNZConnectString(), getTableName()); + options.setUsername(NetezzaTestUtils.getNZUser()); + options.setPassword(NetezzaTestUtils.getNZPassword()); + if (isDirectMode()) { + this.manager = new DirectNetezzaManager(options); + } else { + this.manager = new NetezzaManager(options); + } + + try { + this.conn = manager.getConnection(); + this.conn.setAutoCommit(false); + } catch (SQLException sqlE) { + LOG.error("Encountered SQL Exception: " + sqlE); + sqlE.printStackTrace(); + fail("SQLException when running test setUp(): " + sqlE); + } + } + + + + @Override + protected String[] getArgv(boolean includeHadoopFlags, int rowsPerStatement, + int statementsPerTx, String... additionalArgv) { + + String[] argV = super.getArgv(includeHadoopFlags, + rowsPerStatement, statementsPerTx); + String[] subArgV = newStrArray(argV, + "--username", NetezzaTestUtils.getNZUser(), "--password", + NetezzaTestUtils.getNZPassword()); + String[] newArgV = new String[subArgV.length + additionalArgv.length]; + int i = 0; + for (String s : subArgV) { + newArgV[i++] = s; + } + for (String s: additionalArgv) { + newArgV[i++] = s; + } + return newArgV; + } + + @Override + protected String[] getCodeGenArgv(String... extraArgs) { + String[] moreArgs; + + moreArgs = new String[extraArgs.length + 4]; + + int i = 0; + for (i = 0; i < extraArgs.length; i++) { + moreArgs[i] = extraArgs[i]; + } + + // Add username argument for netezza. + moreArgs[i++] = "--username"; + moreArgs[i++] = NetezzaTestUtils.getNZUser(); + moreArgs[i++] = "--password"; + moreArgs[i++] = NetezzaTestUtils.getNZPassword(); + + return super.getCodeGenArgv(moreArgs); + } + + protected void createExportFile(ColumnGenerator...extraCols) + throws IOException, SQLException { + String ext = ".txt"; + + Path tablePath = getTablePath(); + Path filePath = new Path(tablePath, "part0" + ext); + + Configuration conf = new Configuration(); + if (!BaseSqoopTestCase.isOnPhysicalCluster()) { + conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS); + } + FileSystem fs = FileSystem.get(conf); + fs.mkdirs(tablePath); + OutputStream os = fs.create(filePath); + + + BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os)); + for (int i = 0; i < 3; i++) { + String line = getRecordLine(i, extraCols); + w.write(line); + LOG.debug("Create Export file - Writing line : " + line); + } + w.close(); + os.close(); + } +} diff --git a/src/test/com/cloudera/sqoop/manager/NetezzaImportManualTest.java b/src/test/com/cloudera/sqoop/manager/NetezzaImportManualTest.java index 97399fe1..3482dd8e 100644 --- a/src/test/com/cloudera/sqoop/manager/NetezzaImportManualTest.java +++ b/src/test/com/cloudera/sqoop/manager/NetezzaImportManualTest.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.io.InputStreamReader; import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; @@ -43,26 +45,24 @@ import com.cloudera.sqoop.util.FileListing; /** - * Test the Netezza implementation. + * Test the Netezza implementation. * - * This uses JDBC to import data from an Netezza database into HDFS. + * This uses both JDBC and external tables to import data from an Netezza + * database into HDFS. * - * Since this requires an Netezza SErver installation, this class is named - * in such a way that Sqoop's default QA process does not run it. You need to - * run this manually with -Dtestcase=NetezzaManagerImportManualTest. + * Since this requires an Netezza Server installation, this class is named in + * such a way that Sqoop's default QA process does not run it. You need to run + * this manually with -Dtestcase=NetezzaImportManualTest. * */ public class NetezzaImportManualTest extends ImportJobTestCase { - public static final Log LOG = LogFactory - .getLog(NetezzaImportManualTest.class.getName()); - - + public static final Log LOG = LogFactory. + getLog(NetezzaImportManualTest.class.getName()); // instance variables populated during setUp, used during tests private NetezzaManager manager; - - + private Connection conn; @Override protected boolean useHsqldbTestServer() { return false; @@ -70,70 +70,99 @@ protected boolean useHsqldbTestServer() { @Override protected String getTableName() { - return NetezzaTestUtils.TABLE_NAME; + return NetezzaTestUtils.TABLE_NAME + "_IMP_"; + } + + + private void createTable(String tableName, String... extraColumns) + throws SQLException { + PreparedStatement statement = conn.prepareStatement("DROP TABLE " + + tableName, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + try { + statement.executeUpdate(); + conn.commit(); + } catch (SQLException sqle) { + conn.rollback(); + } finally { + statement.close(); + } + + StringBuilder sb = new StringBuilder(); + sb.append("CREATE TABLE " + tableName + " ("); + sb.append("id INT NOT NULL PRIMARY KEY, "); + sb.append("name VARCHAR(24) NOT NULL, "); + sb.append("start_date DATE, "); + sb.append("Salary FLOAT, "); + sb.append("Fired BOOL, "); + sb.append("dept VARCHAR(32) "); + for (String col : extraColumns) { + sb.append(", " + col + " INTEGER"); + } + sb.append(")"); + + statement = conn.prepareStatement(sb.toString(), + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + try { + statement.executeUpdate(); + conn.commit(); + } finally { + statement.close(); + } + } + + private void populateTable(String tableName) throws SQLException { + Statement statement = conn.createStatement(); + try { + statement.executeUpdate("INSERT INTO " + tableName + + " VALUES(1,'Aaron','2009-05-14',1000000.00,TRUE,'engineering')"); + statement.executeUpdate("INSERT INTO " + tableName + + " VALUES(2,'Bob','2009-04-20',400.00,TRUE,'sales')"); + statement.executeUpdate("INSERT INTO " + tableName + + " VALUES(3,'Fred','2009-01-23',15.00,FALSE,'marketing')"); + conn.commit(); + } finally { + statement.close(); + } + } + + private void populateTableWithNull(String tableName) throws SQLException{ + Statement statement = conn.createStatement(); + try { + statement.executeUpdate("INSERT INTO " + tableName + + " VALUES(1,'Aaron','2009-05-14',1000000.00,TRUE," + + "'engineering',NULL,1)"); + statement.executeUpdate("INSERT INTO " + tableName + + " VALUES(2,'Bob','2009-04-20',400.00,TRUE,'sales',NULL,2)"); + statement.executeUpdate("INSERT INTO " + tableName + + " VALUES(3,'Fred','2009-01-23',15.00,FALSE,'marketing',NULL,3)"); + conn.commit(); + } finally { + statement.close(); + } + } + + public void setUpData() { + SqoopOptions options = new SqoopOptions( + NetezzaTestUtils.getNZConnectString(), getTableName()); + options.setUsername(NetezzaTestUtils.getNZUser()); + options.setPassword(NetezzaTestUtils.getNZPassword()); + try { + manager = new NetezzaManager(options); + conn = manager.getConnection(); + createTable(getTableName()); + populateTable(getTableName()); + String tableNameWithNull = getTableName() + "_W_N"; + createTable(tableNameWithNull, new String[] { "col0", "col1" }); + populateTableWithNull(tableNameWithNull); + } catch (SQLException sqlE) { + fail("Setup failed with SQLException " + sqlE); + } } @Before public void setUp() { super.setUp(); - - SqoopOptions options = new SqoopOptions( - NetezzaTestUtils.getNZConnectString(), getTableName()); - options.setUsername(NetezzaTestUtils.getNZUser()); - options.setPassword(NetezzaTestUtils.getNZPassword()); - - manager = new NetezzaManager(options); - - // Drop the existing table, if there is one. - Connection conn = null; - Statement stmt = null; - try { - conn = manager.getConnection(); - stmt = conn.createStatement(); - stmt.execute("DROP TABLE " + getTableName()); - } catch (SQLException sqlE) { - LOG.info("Table was not dropped: " + sqlE.getMessage()); - } finally { - try { - if (null != stmt) { - stmt.close(); - } - } catch (Exception ex) { - LOG.warn("Exception while closing stmt", ex); - } - } - - // Create and populate table - try { - conn = manager.getConnection(); - conn.setAutoCommit(false); - stmt = conn.createStatement(); - - // create the database table and populate it with data. - stmt.executeUpdate("CREATE TABLE " + getTableName() + " (" - + "id INT NOT NULL, " + "name VARCHAR(24) NOT NULL, " - + "salary FLOAT, " + "dept VARCHAR(32), " + "PRIMARY KEY (id))"); - - stmt.executeUpdate("INSERT INTO " + getTableName() + " VALUES(" - + "1,'Aaron', " + "1000000.00,'engineering')"); - stmt.executeUpdate("INSERT INTO " + getTableName() + " VALUES(" - + "2,'Bob', " + "400.00,'sales')"); - stmt.executeUpdate("INSERT INTO " + getTableName() + " VALUES(" - + "3,'Fred', 15.00," + "'marketing')"); - conn.commit(); - } catch (SQLException sqlE) { - LOG.error("Encountered SQL Exception: ", sqlE); - sqlE.printStackTrace(); - fail("SQLException when running test setUp(): " + sqlE); - } finally { - try { - if (null != stmt) { - stmt.close(); - } - } catch (Exception ex) { - LOG.warn("Exception while closing connection/stmt", ex); - } - } + setUpData(); } @After @@ -147,24 +176,50 @@ public void tearDown() { } } - @Test - public void testNetezzaImport() throws IOException { - - runNetezzaTest(getExpectedResults()); - } - private String[] getExpectedResults() { - return new String[] { "1,Aaron,1000000.0,engineering", "2,Bob,400.0,sales", - "3,Fred,15.0,marketing", }; + String [] expectedResults = { + "1,Aaron,2009-05-14,1000000.0,true,engineering", + "2,Bob,2009-04-20,400.0,true,sales", + "3,Fred,2009-01-23,15.0,false,marketing", + }; + + return expectedResults; + } + private String[] getDirectModeExpectedResults() { + String [] expectedResults = { + "1,Aaron,2009-05-14,1000000,T,engineering", + "2,Bob,2009-04-20,400,T,sales", + "3,Fred,2009-01-23,15,F,marketing", + }; + return expectedResults; + } + private String[] getExpectedResultsWithNulls() { + String [] expectedResults = { + "1,Aaron,2009-05-14,1000000.0,true,engineering,\\N,1", + "2,Bob,2009-04-20,400.0,true,sales,\\N,2", + "3,Fred,2009-01-23,15.0,false,marketing,\\N,3", + }; + + return expectedResults; } - private String[] getArgv() { + private String[] getDirectModeExpectedResultsWithNulls() { + String [] expectedResults = { + "1,Aaron,2009-05-14,1000000,T,engineering,nvl,1", + "2,Bob,2009-04-20,400,T,sales,nvl,2", + "3,Fred,2009-01-23,15,F,marketing,nvl,3", + }; + + return expectedResults; + } + private String[] getArgv(boolean isDirect, String tableName, + String... extraArgs) { ArrayList args = new ArrayList(); CommonArgs.addHadoopFlags(args); args.add("--table"); - args.add(getTableName()); + args.add(tableName); args.add("--warehouse-dir"); args.add(getWarehouseDir()); args.add("--connect"); @@ -175,14 +230,25 @@ private String[] getArgv() { args.add(NetezzaTestUtils.getNZPassword()); args.add("--num-mappers"); args.add("1"); + + if (isDirect) { + args.add("--direct"); + } + for (String arg : extraArgs) { + args.add(arg); + } return args.toArray(new String[args.size()]); } - private void runNetezzaTest(String[] expectedResults) throws IOException { + private void runNetezzaTest(boolean isDirect, String tableName, + String[] expectedResults, String... extraArgs) throws IOException { Path warehousePath = new Path(this.getWarehouseDir()); - Path tablePath = new Path(warehousePath, getTableName()); - Path filePath = new Path(tablePath, "part-m-00000"); + Path tablePath = new Path(warehousePath, tableName); + + Path filePath; + + filePath = new Path(tablePath, "part-m-00000"); File tableFile = new File(tablePath.toString()); if (tableFile.exists() && tableFile.isDirectory()) { @@ -190,7 +256,7 @@ private void runNetezzaTest(String[] expectedResults) throws IOException { FileListing.recursiveDeleteDir(tableFile); } - String[] argv = getArgv(); + String[] argv = getArgv(isDirect, tableName, extraArgs); try { runImport(argv); } catch (IOException ioe) { @@ -200,7 +266,7 @@ private void runNetezzaTest(String[] expectedResults) throws IOException { } File f = new File(filePath.toString()); - assertTrue("Could not find imported data file", f.exists()); + assertTrue("Could not find imported data file : " + f, f.exists()); BufferedReader r = null; try { // Read through the file and make sure it's all there. @@ -208,6 +274,7 @@ private void runNetezzaTest(String[] expectedResults) throws IOException { String[] s = new String[3]; for (int i = 0; i < s.length; ++i) { s[i] = r.readLine(); + LOG.info("Line read from file = " + s[i]); } Arrays.sort(s); for (int i = 0; i < expectedResults.length; ++i) { @@ -222,4 +289,70 @@ private void runNetezzaTest(String[] expectedResults) throws IOException { } } + @Test + public void testNetezzaImport() throws IOException { + + runNetezzaTest(false, getTableName(), getExpectedResults()); + } + + @Test + public void testDirectImport() throws IOException { + runNetezzaTest(true, getTableName(), getDirectModeExpectedResults()); + } + + @Test + public void testListTables() throws IOException { + SqoopOptions options = new SqoopOptions( + NetezzaTestUtils.getNZConnectString(), getTableName()); + options.setUsername(NetezzaTestUtils.getNZUser()); + options.setPassword(NetezzaTestUtils.getNZPassword()); + + ConnManager mgr = new NetezzaManager(options); + String[] tables = mgr.listTables(); + Arrays.sort(tables); + assertTrue(getTableName() + " is not found!", + Arrays.binarySearch(tables, getTableName()) >= 0); + } + + @Test + public void testIncrementalImport() throws IOException { + String[] expectedResults = {}; + + String[] extraArgs = { "--incremental", "lastmodified", "--check-column", + "START_DATE", }; + + runNetezzaTest(false, getTableName(), expectedResults, extraArgs); + } + + @Test + public void testNullEscapeCharacters() throws Exception { + + + String [] extraArgs = { + "--null-string", "\\\\N", + "--null-non-string", "\\\\N", + }; + + String[] expectedResultsWithNulls = + getExpectedResultsWithNulls(); + String tableNameWithNull = getTableName() + "_W_N"; + + runNetezzaTest(false, tableNameWithNull, expectedResultsWithNulls, + extraArgs); + } + + @Test + public void testValidExtraArgs() throws Exception { + + String [] extraArgs = { + "--", + "--log-dir", "/tmp", + "--max-errors", "2", + }; + String[] expectedResults = getDirectModeExpectedResults(); + String tableName = getTableName(); + + runNetezzaTest(true, tableName, expectedResults, + extraArgs); + } } diff --git a/src/test/com/cloudera/sqoop/manager/NetezzaTestUtils.java b/src/test/com/cloudera/sqoop/manager/NetezzaTestUtils.java index 9f4c07cd..4bf05b8a 100644 --- a/src/test/com/cloudera/sqoop/manager/NetezzaTestUtils.java +++ b/src/test/com/cloudera/sqoop/manager/NetezzaTestUtils.java @@ -45,7 +45,7 @@ public final class NetezzaTestUtils { public static final String NZ_DB_NAME = System.getProperty( "sqoop.test.netezza.db.name", "SQOOP"); public static final String TABLE_NAME = System.getProperty( - "sqoop.test.netezza.table.name", "EMP"); + "sqoop.test.netezza.table.name", "EMPNZ"); private NetezzaTestUtils() { }