5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-03 04:11:44 +08:00

SQOOP-329: SQOOP doesn't work with the DB2 JCC driver

(Arvind Prabhakar via Bilung Lee)

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1173471 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bilung Lee 2011-09-21 02:58:31 +00:00
parent cad898768b
commit 230c687d88
4 changed files with 372 additions and 0 deletions

View File

@ -129,6 +129,24 @@ jdbc:sqlserver://sqlserverhost:1433
This can be useful if you have the hostname sqlserverhost mapped to the IP
address of the SQL Server instance.
=== DB2
Install DB2 Express-C 9.7 and download the appropriate JDBC driver.
Instructions for configuring the server can be found in
DB2ManagerImportManualTest.
Use the system property sqoop.test.db2.connectstring.host_url to specify
the URL for the DB Server host used for testing. Specify this property on
the command line or via build.properties file. For example:
sqoop.test.db2.connectstring.host_url=jdbc:db2://db2host:50000
If not specified, the default value used for this property is:
jdbc:db2://db2host:50000
This can be useful if you have the hostname db2host mapped to the IP
address of the DB2 Server instance.
=== Running the Third-party Tests
After the third-party databases are installed and configured, run:

View File

@ -0,0 +1,111 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.manager;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.mapreduce.ExportBatchOutputFormat;
import com.cloudera.sqoop.mapreduce.JdbcExportJob;
import com.cloudera.sqoop.util.ExportException;
/**
 * Manages connections to DB2 databases. Requires the DB2 JDBC driver
 * (com.ibm.db2.jcc.DB2Driver) to be present on the classpath.
 */
public class Db2Manager extends GenericJdbcManager {

  public static final Log LOG = LogFactory.getLog(
      Db2Manager.class.getName());

  // Fully-qualified name of the JDBC driver class loaded before connecting.
  private static final String DRIVER_CLASS =
      "com.ibm.db2.jcc.DB2Driver";

  public Db2Manager(final SqoopOptions opts) {
    super(DRIVER_CLASS, opts);
  }

  /**
   * Export data stored in HDFS into a table in a database.
   */
  @Override
  public void exportTable(ExportJobContext context)
      throws IOException, ExportException {
    // The context must reference this manager before the job is created.
    context.setConnManager(this);
    JdbcExportJob job = new JdbcExportJob(context, null, null,
        ExportBatchOutputFormat.class);
    job.runExport();
  }

  /**
   * DB2 does not support the CURRENT_TIMESTAMP() function. Instead
   * it uses the sysibm schema for timestamp lookup.
   */
  @Override
  public String getCurTimestampQuery() {
    return "SELECT CURRENT TIMESTAMP FROM SYSIBM.SYSDUMMY1 WITH UR";
  }

  /**
   * Lists "databases" for DB2 by enumerating schema names via JDBC
   * metadata. On failure the transaction is rolled back and the
   * SQLException is rethrown wrapped in a RuntimeException.
   */
  @Override
  public String[] listDatabases() {
    List<String> schemaNames = new ArrayList<String>();
    Connection connection = null;
    ResultSet schemas = null;
    try {
      connection = getConnection();
      schemas = connection.getMetaData().getSchemas();
      while (schemas.next()) {
        // The ResultSet holds TABLE_SCHEM (column 1) and TABLE_CATALOG
        // (column 2); only the schema name is of interest here.
        schemaNames.add(schemas.getString(1));
      }
      connection.commit();
    } catch (SQLException sqlEx) {
      try {
        if (connection != null) {
          connection.rollback();
        }
      } catch (SQLException rollbackEx) {
        LOG.error("Failed to rollback transaction", rollbackEx);
      }
      LOG.error("Failed to list databases", sqlEx);
      throw new RuntimeException(sqlEx);
    } finally {
      // Only the ResultSet is closed; the Connection is cached by the
      // manager and released when the manager itself is closed.
      if (schemas != null) {
        try {
          schemas.close();
        } catch (SQLException closeEx) {
          LOG.error("Failed to close resultset", closeEx);
        }
      }
    }
    return schemaNames.toArray(new String[schemaNames.size()]);
  }
}

View File

@ -121,6 +121,8 @@ public ConnManager accept(JobData data) {
return new OracleManager(options); return new OracleManager(options);
} else if (scheme.startsWith("jdbc:sqlserver:")) { } else if (scheme.startsWith("jdbc:sqlserver:")) {
return new SQLServerManager(options); return new SQLServerManager(options);
} else if (scheme.startsWith("jdbc:db2:")) {
return new Db2Manager(options);
} else { } else {
return null; return null;
} }

View File

@ -0,0 +1,241 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.manager;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.testutil.CommonArgs;
import com.cloudera.sqoop.testutil.ImportJobTestCase;
import com.cloudera.sqoop.util.FileListing;
/**
 * Test the Db2Manager implementation.
 *
 * This uses JDBC to import data from a DB2 database into HDFS.
 *
 * Since this requires a DB2 Server installation,
 * this class is named in such a way that Sqoop's default QA process does
 * not run it. You need to run this manually with
 * -Dtestcase=DB2ManagerImportManualTest
 *
 * You need to put the DB2 JDBC driver library (db2jcc4.jar) in a location
 * where Sqoop will be able to access it (since this library cannot be checked
 * into Apache's tree for licensing reasons).
 *
 * To set up your test environment:
 *   Install DB2 Express-C 9.7 server.
 *   Create a database SQOOP
 *   Create a login SQOOP with password PASSWORD and grant all
 *   access for database SQOOP to user SQOOP.
 */
public class DB2ManagerImportManualTest extends ImportJobTestCase {
public static final Log LOG = LogFactory.getLog(
DB2ManagerImportManualTest.class.getName());
// Server URL; overridable via the sqoop.test.db2.connectstring.host_url
// system property (see the build documentation).
static final String HOST_URL = System.getProperty(
"sqoop.test.db2.connectstring.host_url",
"jdbc:db2://db2host:50000");
static final String DATABASE_NAME = "SQOOP";
static final String DATABASE_USER = "SQOOP";
static final String DATABASE_PASSWORD = "PASSWORD";
static final String TABLE_NAME = "EMPLOYEES_DB2";
// Full JDBC connect string: URL + "/" + database + currentSchema property.
static final String CONNECT_STRING = HOST_URL
+ "/" + DATABASE_NAME
+ ":currentSchema=" + DATABASE_USER +";";
// instance variables populated during setUp, used during tests
private Db2Manager manager;
// Tests run against a live DB2 server, not the in-process hsqldb instance.
@Override
protected boolean useHsqldbTestServer() {
return false;
}
/**
 * Connects to the DB2 server, drops any leftover test table, then
 * recreates it and populates it with three known rows. The statement
 * order (drop -> create -> inserts -> commit) is significant.
 */
@Before
public void setUp() {
super.setUp();
SqoopOptions options = new SqoopOptions(CONNECT_STRING,
TABLE_NAME);
options.setUsername(DATABASE_USER);
options.setPassword(DATABASE_PASSWORD);
manager = new Db2Manager(options);
// Drop the existing table, if there is one.
Connection conn = null;
Statement stmt = null;
try {
conn = manager.getConnection();
stmt = conn.createStatement();
stmt.execute("DROP TABLE " + TABLE_NAME);
} catch (SQLException sqlE) {
// Expected on a clean database; not a test failure.
LOG.info("Table was not dropped: " + sqlE.getMessage());
} finally {
// Only the Statement is closed here; the Connection belongs to the
// manager and is released by manager.close() in tearDown().
try {
if (null != stmt) {
stmt.close();
}
} catch (Exception ex) {
LOG.warn("Exception while closing stmt", ex);
}
}
// Create and populate table
try {
conn = manager.getConnection();
conn.setAutoCommit(false);
stmt = conn.createStatement();
// create the database table and populate it with data.
stmt.executeUpdate("CREATE TABLE " + TABLE_NAME + " ("
+ "id INT NOT NULL, "
+ "name VARCHAR(24) NOT NULL, "
+ "salary FLOAT, "
+ "dept VARCHAR(32), "
+ "PRIMARY KEY (id))");
stmt.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
+ "1,'Aaron', "
+ "1000000.00,'engineering')");
stmt.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
+ "2,'Bob', "
+ "400.00,'sales')");
stmt.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
+ "3,'Fred', 15.00,"
+ "'marketing')");
conn.commit();
} catch (SQLException sqlE) {
LOG.error("Encountered SQL Exception: ", sqlE);
sqlE.printStackTrace();
fail("SQLException when running test setUp(): " + sqlE);
} finally {
try {
if (null != stmt) {
stmt.close();
}
} catch (Exception ex) {
LOG.warn("Exception while closing connection/stmt", ex);
}
}
}
/**
 * Closes the manager (and thereby its cached connection); a failure to
 * close is treated as a test failure.
 */
@After
public void tearDown() {
super.tearDown();
try {
manager.close();
} catch (SQLException sqlE) {
LOG.error("Got SQLException: " + sqlE.toString());
fail("Got SQLException: " + sqlE.toString());
}
}
// Imports the three rows inserted in setUp and verifies the CSV output.
@Test
public void testDb2Import() throws IOException {
String [] expectedResults = {
"1,Aaron,1000000.0,engineering",
"2,Bob,400.0,sales",
"3,Fred,15.0,marketing",
};
runDb2Test(expectedResults);
}
/**
 * Builds the Sqoop command line for a single-mapper import of
 * TABLE_NAME into the test warehouse directory.
 */
private String [] getArgv() {
ArrayList<String> args = new ArrayList<String>();
CommonArgs.addHadoopFlags(args);
args.add("--table");
args.add(TABLE_NAME);
args.add("--warehouse-dir");
args.add(getWarehouseDir());
args.add("--connect");
args.add(CONNECT_STRING);
args.add("--username");
args.add(DATABASE_USER);
args.add("--password");
args.add(DATABASE_PASSWORD);
args.add("--num-mappers");
args.add("1");
return args.toArray(new String[0]);
}
/**
 * Runs the import and asserts that the single output split
 * (part-m-00000) contains exactly the expected lines, in order.
 */
private void runDb2Test(String [] expectedResults) throws IOException {
Path warehousePath = new Path(this.getWarehouseDir());
Path tablePath = new Path(warehousePath, TABLE_NAME);
// Single mapper (--num-mappers 1) means all rows land in part-m-00000.
Path filePath = new Path(tablePath, "part-m-00000");
File tableFile = new File(tablePath.toString());
if (tableFile.exists() && tableFile.isDirectory()) {
// remove the directory before running the import.
FileListing.recursiveDeleteDir(tableFile);
}
String [] argv = getArgv();
try {
runImport(argv);
} catch (IOException ioe) {
LOG.error("Got IOException during import: " + ioe.toString());
ioe.printStackTrace();
fail(ioe.toString());
}
File f = new File(filePath.toString());
assertTrue("Could not find imported data file", f.exists());
BufferedReader r = null;
try {
// Read through the file and make sure it's all there.
r = new BufferedReader(new InputStreamReader(new FileInputStream(f)));
for (String expectedLine : expectedResults) {
assertEquals(expectedLine, r.readLine());
}
} catch (IOException ioe) {
LOG.error("Got IOException verifying results: " + ioe.toString());
ioe.printStackTrace();
fail(ioe.toString());
} finally {
IOUtils.closeStream(r);
}
}
}