
SQOOP-1309: Expand the Sqoop to support CUBRID database.

Jarek Jarcec Cecho 2014-09-01 10:11:49 +02:00
parent 309cf45e7e
commit e0fc46e949
12 changed files with 1352 additions and 0 deletions

View File

@ -135,6 +135,22 @@ jdbc:sqlserver://sqlserverhost:1433
This can be useful if you have the hostname sqlserverhost mapped to the IP
address of the SQL Server instance.
=== Cubrid
Install Cubrid 9.2.2.0003, create a database instance, and download the
appropriate JDBC driver. Instructions for configuring the database are in
CubridAuthTest, CubridCompatTest, CubridManagerImportTest,
and CubridManagerExportTest.
Use the system property sqoop.test.cubrid.connectstring.host_url to specify the
URL for the Cubrid host used for testing. Specify this property on the command
line or via the build.properties file. For example:
sqoop.test.cubrid.connectstring.host_url=jdbc:cubrid:localhost
If not specified, the default value used for this property is:
jdbc:cubrid:localhost
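
As a sketch (assuming the standard Sqoop Ant test targets), the property can
be passed directly on the command line:

ant test -Dtestcase=CubridManagerImportTest \
    -Dsqoop.test.cubrid.connectstring.host_url=jdbc:cubrid:localhost:30000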
=== DB2
Install DB2 9.74 Express C and download the appropriate JDBC driver.

View File

@ -356,6 +356,12 @@
<property name="sqoop.test.mysql.connectstring.host_url" <property name="sqoop.test.mysql.connectstring.host_url"
value="jdbc:mysql://localhost/"/> value="jdbc:mysql://localhost/"/>
<property name="sqoop.test.cubrid.connectstring.host_url"
value="jdbc:cubrid:localhost:30000"/>
<property name="sqoop.test.cubrid.connectstring.database" value="SQOOPCUBRIDTEST" />
<property name="sqoop.test.cubrid.connectstring.username" value="SQOOPUSER" />
<property name="sqoop.test.cubrid.connectstring.password" value="PASSWORD" />
<property name="sqoop.test.postgresql.connectstring.host_url" <property name="sqoop.test.postgresql.connectstring.host_url"
value="jdbc:postgresql://localhost/"/> value="jdbc:postgresql://localhost/"/>
@ -957,6 +963,9 @@
<sysproperty key="sqoop.test.mysql.connectstring.host_url" <sysproperty key="sqoop.test.mysql.connectstring.host_url"
value="${sqoop.test.mysql.connectstring.host_url}"/> value="${sqoop.test.mysql.connectstring.host_url}"/>
<sysproperty key="sqoop.test.cubrid.connectstring.host_url"
value="${sqoop.test.cubrid.connectstring.host_url}"/>
<sysproperty key="sqoop.test.postgresql.connectstring.host_url" <sysproperty key="sqoop.test.postgresql.connectstring.host_url"
value="${sqoop.test.postgresql.connectstring.host_url}"/> value="${sqoop.test.postgresql.connectstring.host_url}"/>

View File

@ -60,6 +60,7 @@ HSQLDB 1.8.0+ No +jdbc:hsqldb:*//+
MySQL 5.0+ Yes +jdbc:mysql://+
Oracle 10.2.0+ No +jdbc:oracle:*//+
PostgreSQL 8.3+ Yes (import only) +jdbc:postgresql://+
CUBRID 9.2+ No +jdbc:cubrid:*+
----------------------------------------------------------------
Sqoop may work with older versions of the databases listed, but we have
@ -190,3 +191,9 @@ http://download-west.oracle.com/docs/cd/B19306_01/server.102/b14225/applocaledat
include::hive-notes.txt[]
CUBRID
~~~~~~
Sqoop supports a JDBC-based connector for Cubrid: http://www.cubrid.org/?mid=downloads&item=jdbc_driver
The connector has been tested using JDBC driver version "JDBC-9.2.0.0155-cubrid.jar" with Cubrid 9.2.
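
For example, a minimal import using this connector might look like the
following (host, port, database, and credentials are illustrative
placeholders):

$ sqoop import --connect jdbc:cubrid:localhost:30000:SQOOPCUBRIDTEST::: \
    --username SQOOPUSER --password PASSWORD --table employees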

View File

@ -0,0 +1,189 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.manager;
import java.io.IOException;
import java.sql.Types;
import java.util.Map;
import org.apache.avro.Schema.Type;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sqoop.mapreduce.cubrid.CubridUpsertOutputFormat;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.mapreduce.ExportBatchOutputFormat;
import com.cloudera.sqoop.mapreduce.JdbcExportJob;
import com.cloudera.sqoop.mapreduce.JdbcUpsertExportJob;
import com.cloudera.sqoop.util.ExportException;
import com.cloudera.sqoop.util.ImportException;
/**
* Manages connections to CUBRID databases.
*/
public class CubridManager extends
com.cloudera.sqoop.manager.CatalogQueryManager {
public static final Log LOG = LogFactory
.getLog(CubridManager.class.getName());
// Driver class that must be loaded before making a DB connection.
private static final String DRIVER_CLASS =
"cubrid.jdbc.driver.CUBRIDDriver";
public CubridManager(final SqoopOptions opts) {
super(DRIVER_CLASS, opts);
}
@Override
public void importTable(
com.cloudera.sqoop.manager.ImportJobContext context)
throws IOException, ImportException {
// Just run the normal importTable() method.
super.importTable(context);
}
/**
* Export data stored in HDFS into a table in a database.
*/
public void exportTable(
com.cloudera.sqoop.manager.ExportJobContext context)
throws IOException, ExportException {
context.setConnManager(this);
JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
ExportBatchOutputFormat.class);
exportJob.runExport();
}
/**
* {@inheritDoc}
*/
@Override
public void upsertTable(
com.cloudera.sqoop.manager.ExportJobContext context)
throws IOException, ExportException {
context.setConnManager(this);
JdbcUpsertExportJob exportJob = new JdbcUpsertExportJob(context,
CubridUpsertOutputFormat.class);
exportJob.runExport();
}
@Override
/**
* {@inheritDoc}
*/
public void configureDbOutputColumns(SqoopOptions options) {
// When running an upsert, we do not want to change the
// column order, since we are going to use the
// INSERT INTO ... ON DUPLICATE KEY UPDATE
// clause.
if (options.getUpdateMode()
== SqoopOptions.UpdateMode.AllowInsert) {
return;
}
super.configureDbOutputColumns(options);
}
@Override
public String getColNamesQuery(String tableName) {
// Use LIMIT so the query returns quickly.
return "SELECT t.* FROM " + escapeTableName(tableName)
+ " AS t LIMIT 1";
}
@Override
public String getInputBoundsQuery(String splitByCol,
String sanitizedQuery) {
return "SELECT MIN(" + splitByCol + "), MAX("
+ splitByCol + ") FROM ("
+ sanitizedQuery + ") t1";
}
private Map<String, String> colTypeNames;
private static final int YEAR_TYPE_OVERWRITE = Types.SMALLINT;
private int overrideSqlType(String tableName, String columnName,
int sqlType) {
if (colTypeNames == null) {
colTypeNames = getColumnTypeNames(tableName,
options.getCall(),
options.getSqlQuery());
}
if ("YEAR".equalsIgnoreCase(colTypeNames.get(columnName))) {
sqlType = YEAR_TYPE_OVERWRITE;
}
return sqlType;
}
@Override
public String toJavaType(String tableName, String columnName,
int sqlType) {
sqlType = overrideSqlType(tableName, columnName, sqlType);
String javaType = super.toJavaType(tableName,
columnName, sqlType);
return javaType;
}
@Override
public String toHiveType(String tableName, String columnName,
int sqlType) {
sqlType = overrideSqlType(tableName, columnName, sqlType);
return super.toHiveType(tableName, columnName, sqlType);
}
@Override
public Type toAvroType(String tableName, String columnName,
int sqlType) {
sqlType = overrideSqlType(tableName, columnName, sqlType);
return super.toAvroType(tableName, columnName, sqlType);
}
@Override
protected String getListDatabasesQuery() {
return null;
}
@Override
protected String getListTablesQuery() {
return "SELECT CLASS_NAME FROM DB_CLASS WHERE"
+ " IS_SYSTEM_CLASS = 'NO'";
}
@Override
protected String getListColumnsQuery(String tableName) {
tableName = tableName.toLowerCase();
return "SELECT ATTR_NAME FROM DB_ATTRIBUTE WHERE"
+ " CLASS_NAME = '"
+ tableName + "' ORDER BY def_order";
}
@Override
protected String getPrimaryKeyQuery(String tableName) {
tableName = tableName.toLowerCase();
return "SELECT KEY_ATTR_NAME FROM DB_INDEX_KEY WHERE"
+ " CLASS_NAME = '"
+ tableName + "' ";
}
}
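
To illustrate the metadata queries above with a hypothetical table employees
and split column id (ignoring table-name escaping):

getColNamesQuery("employees")  returns  SELECT t.* FROM employees AS t LIMIT 1
getInputBoundsQuery("id", q)   returns  SELECT MIN(id), MAX(id) FROM (q) t1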

View File

@ -74,6 +74,8 @@ public ConnManager accept(JobData data) {
} else {
return new NetezzaManager(options);
}
} else if (scheme.startsWith("jdbc:cubrid:")) {
return new CubridManager(options);
} else {
return null;
}

View File

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.cubrid;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.cloudera.sqoop.lib.SqoopRecord;
import com.cloudera.sqoop.mapreduce.UpdateOutputFormat;
/**
* Output format for CUBRID update/insert functionality. We use the CUBRID
* clause INSERT INTO ... ON DUPLICATE KEY UPDATE; for more information,
* please see the official CUBRID documentation.
*/
public class CubridUpsertOutputFormat<K extends SqoopRecord, V> extends
UpdateOutputFormat<K, V> {
private final Log log = LogFactory.getLog(getClass());
@Override
/** {@inheritDoc} */
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
throws IOException {
try {
return new CubridUpsertRecordWriter(context);
} catch (Exception e) {
throw new IOException(e);
}
}
/**
* RecordWriter to write the output to UPDATE/INSERT statements.
*/
public class CubridUpsertRecordWriter extends UpdateRecordWriter {
public CubridUpsertRecordWriter(TaskAttemptContext context)
throws ClassNotFoundException, SQLException {
super(context);
}
/**
* {@inheritDoc}
*/
@Override
protected String getUpdateStatement() {
boolean first;
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO ");
sb.append(tableName);
sb.append("(");
first = true;
for (String column : columnNames) {
if (first) {
first = false;
} else {
sb.append(", ");
}
sb.append(column);
}
sb.append(") VALUES(");
first = true;
for (int i = 0; i < columnNames.length; i++) {
if (first) {
first = false;
} else {
sb.append(", ");
}
sb.append("?");
}
sb.append(") ON DUPLICATE KEY UPDATE ");
first = true;
for (String column : columnNames) {
if (first) {
first = false;
} else {
sb.append(", ");
}
sb.append(column).append("=").append(column);
}
String query = sb.toString();
log.debug("Using upsert query: " + query);
return query;
}
}
}
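
To make the generated statement concrete: for a hypothetical table employees
with columns id and name, getUpdateStatement() above returns

INSERT INTO employees(id, name) VALUES(?, ?) ON DUPLICATE KEY UPDATE id=id, name=name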

View File

@ -34,6 +34,8 @@
import junit.framework.TestCase;
import junit.framework.TestSuite;
import com.cloudera.sqoop.manager.CubridManagerExportTest;
import com.cloudera.sqoop.manager.CubridManagerImportTest;
import com.cloudera.sqoop.manager.DirectMySQLTest;
import com.cloudera.sqoop.manager.DirectMySQLExportTest;
import com.cloudera.sqoop.manager.JdbcMySQLExportTest;
@ -45,6 +47,8 @@
import com.cloudera.sqoop.manager.PostgresqlExportTest;
import com.cloudera.sqoop.manager.PostgresqlImportTest;
import org.apache.sqoop.manager.cubrid.CubridAuthTest;
import org.apache.sqoop.manager.cubrid.CubridCompatTest;
import org.apache.sqoop.manager.mysql.MySqlCallExportTest;
import org.apache.sqoop.manager.netezza.DirectNetezzaExportManualTest;
import org.apache.sqoop.manager.netezza.DirectNetezzaHCatExportManualTest;
@ -111,6 +115,12 @@ public static Test suite() {
suite.addTestSuite(PostgresqlImportTest.class);
suite.addTestSuite(PostgresqlExportTest.class);
// Cubrid
suite.addTestSuite(CubridManagerImportTest.class);
suite.addTestSuite(CubridManagerExportTest.class);
suite.addTestSuite(CubridAuthTest.class);
suite.addTestSuite(CubridCompatTest.class);
// DB2
suite.addTestSuite(DB2ManagerImportManualTest.class);

View File

@ -0,0 +1,299 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.manager;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;
import org.apache.sqoop.manager.CubridManager;
import org.apache.sqoop.manager.cubrid.CubridTestUtils;
import org.junit.After;
import org.junit.Before;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.TestExport;
/**
* Test the CubridManager implementation.
*
* This uses JDBC to export data from HDFS to a Cubrid database.
*
* Since this requires a Cubrid installation, this class is named in such a way
* that Sqoop's default QA process does not run it. You need to run this
* manually with -Dtestcase=CubridManagerExportTest.
*
* You need to put the Cubrid JDBC driver library (JDBC-9.2.2.0003-cubrid.jar) in a
* location where Sqoop will be able to access it (since this library cannot be
* checked into Apache's tree for licensing reasons).
*
* To set up your test environment:
* Install Cubrid 9.2.2
* ref:http://www.cubrid.org/wiki_tutorials/entry/installing-cubrid-on-linux-using-shell-and-rpm
* Create a database SQOOPCUBRIDTEST
* $cubrid createdb SQOOPCUBRIDTEST en_us.utf8
* Start cubrid and database
* $cubrid service start
* $cubrid server start SQOOPCUBRIDTEST
* Create a login SQOOPUSER with password PASSWORD and grant all
* $csql -u dba SQOOPCUBRIDTEST
* csql>CREATE USER SQOOPUSER password 'PASSWORD';
*/
public class CubridManagerExportTest extends TestExport {
public static final Log LOG = LogFactory.getLog(
CubridManagerExportTest.class.getName());
static final String TABLE_PREFIX = "EXPORT_CUBRID_";
// instance variables populated during setUp, used during tests.
private CubridManager manager;
private Connection conn;
@Override
protected Connection getConnection() {
return conn;
}
@Override
protected boolean useHsqldbTestServer() {
return false;
}
@Override
protected String getConnectString() {
return CubridTestUtils.getConnectString();
}
@Override
protected String getTablePrefix() {
return TABLE_PREFIX;
}
@Override
protected String getDropTableStatement(String tableName) {
return "DROP TABLE IF EXISTS " + tableName;
}
/**
* Cubrid does not support --staging-table; disable this test case.
*/
@Override
public void testMultiTransactionWithStaging() throws IOException,
SQLException {
return;
}
/**
* Cubrid does not support --staging-table; disable this test case.
*/
@Override
public void testMultiMapTextExportWithStaging() throws IOException,
SQLException {
return;
}
public void createTableAndPopulateData(String table) {
String fulltableName = manager.escapeTableName(table);
Statement stmt = null;
// Drop the existing table, if there is one.
try {
conn = manager.getConnection();
stmt = conn.createStatement();
stmt.execute("DROP TABLE IF EXISTS " + fulltableName);
conn.commit();
} catch (SQLException sqlE) {
LOG.info("Table was not dropped: " + sqlE.getMessage());
} finally {
try {
if (null != stmt) {
stmt.close();
}
} catch (Exception ex) {
LOG.warn("Exception while closing stmt", ex);
}
}
// Create the table.
try {
conn = manager.getConnection();
conn.setAutoCommit(false);
stmt = conn.createStatement();
// Create the database table.
stmt.executeUpdate("CREATE TABLE "
+ fulltableName + " ("
+ "id INT NOT NULL, "
+ "name VARCHAR(24) NOT NULL, "
+ "salary FLOAT, " + "dept VARCHAR(32), "
+ "PRIMARY KEY (id))");
conn.commit();
} catch (SQLException sqlE) {
LOG.error("Encountered SQL Exception: ", sqlE);
sqlE.printStackTrace();
fail("SQLException when running test setUp(): " + sqlE);
} finally {
try {
if (null != stmt) {
stmt.close();
}
} catch (Exception ex) {
LOG.warn(
"Exception while closing connection/stmt", ex);
}
}
}
@Before
public void setUp() {
super.setUp();
SqoopOptions options = new SqoopOptions(
CubridTestUtils.getConnectString(),
getTableName());
options.setUsername(CubridTestUtils.getCurrentUser());
options.setPassword(CubridTestUtils.getPassword());
this.manager = new CubridManager(options);
try {
this.conn = manager.getConnection();
this.conn.setAutoCommit(false);
} catch (SQLException sqlE) {
LOG.error(StringUtils.stringifyException(sqlE));
fail("Failed with sql exception in setup: " + sqlE);
}
}
@After
public void tearDown() {
super.tearDown();
try {
conn.close();
manager.close();
} catch (SQLException sqlE) {
LOG.error("Got SQLException: " + sqlE.toString());
fail("Got SQLException: " + sqlE.toString());
}
}
@Override
protected String[] getCodeGenArgv(String... extraArgs) {
String[] moreArgs = new String[extraArgs.length + 4];
int i = 0;
for (i = 0; i < extraArgs.length; i++) {
moreArgs[i] = extraArgs[i];
}
// Add username and password args.
moreArgs[i++] = "--username";
moreArgs[i++] = CubridTestUtils.getCurrentUser();
moreArgs[i++] = "--password";
moreArgs[i++] = CubridTestUtils.getPassword();
return super.getCodeGenArgv(moreArgs);
}
@Override
protected String[] getArgv(boolean includeHadoopFlags,
int rowsPerStatement,
int statementsPerTx, String... additionalArgv) {
String[] subArgv = newStrArray(additionalArgv, "--username",
CubridTestUtils.getCurrentUser(), "--password",
CubridTestUtils.getPassword());
return super.getArgv(includeHadoopFlags, rowsPerStatement,
statementsPerTx, subArgv);
}
protected void createTestFile(String filename,
String[] lines)
throws IOException {
File testdir = new File(getWarehouseDir());
if (!testdir.exists()) {
testdir.mkdirs();
}
File file = new File(getWarehouseDir() + "/" + filename);
Writer output = new BufferedWriter(new FileWriter(file));
for (String line : lines) {
output.write(line);
output.write("\n");
}
output.close();
}
public static void assertRowCount(long expected,
String tableName,
Connection connection) {
Statement stmt = null;
ResultSet rs = null;
try {
stmt = connection.createStatement();
rs = stmt.executeQuery("SELECT count(*) FROM "
+ tableName);
rs.next();
assertEquals(expected, rs.getLong(1));
} catch (SQLException e) {
LOG.error("Can't verify number of rows", e);
fail();
} finally {
try {
connection.commit();
if (stmt != null) {
stmt.close();
}
if (rs != null) {
rs.close();
}
} catch (SQLException ex) {
LOG.info("Ignored exception in finally block.");
}
}
}
public String escapeTableOrSchemaName(String tableName) {
return "\"" + tableName + "\"";
}
/** Make sure mixed update/insert export work correctly. */
public void testUpsertTextExport() throws IOException, SQLException {
final int TOTAL_RECORDS = 10;
createTextFile(0, TOTAL_RECORDS, false);
createTable();
// The first run will insert the rows.
runExport(getArgv(true, 10, 10,
newStrArray(null, "--update-key", "id",
"--update-mode", "allowinsert")));
// The second run will update them.
runExport(getArgv(true, 10, 10,
newStrArray(null, "--update-key", "id",
"--update-mode", "allowinsert")));
verifyExport(TOTAL_RECORDS);
}
}

View File

@ -0,0 +1,292 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.manager;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.sqoop.manager.CubridManager;
import org.apache.sqoop.manager.cubrid.CubridTestUtils;
import org.apache.sqoop.util.FileListing;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.testutil.CommonArgs;
import com.cloudera.sqoop.testutil.ImportJobTestCase;
/**
* Test the CubridManager implementation.
*
* This uses JDBC to import data from a Cubrid database into HDFS.
*
* Since this requires a Cubrid installation, this class is named in such a way
* that Sqoop's default QA process does not run it. You need to run this
* manually with -Dtestcase=CubridManagerImportTest.
*
* You need to put the Cubrid JDBC driver library (JDBC-9.2.2.0003-cubrid.jar) in a
* location where Sqoop will be able to access it (since this library cannot be
* checked into Apache's tree for licensing reasons).
*
* To set up your test environment:
* Install Cubrid 9.2.2
* ref:http://www.cubrid.org/wiki_tutorials/entry/installing-cubrid-on-linux-using-shell-and-rpm
* Create a database SQOOPCUBRIDTEST
* $cubrid createdb SQOOPCUBRIDTEST en_us.utf8
* Start cubrid and database
* $cubrid service start
* $cubrid server start SQOOPCUBRIDTEST
* Create a login SQOOPUSER with password PASSWORD and grant all
* $csql -u dba SQOOPCUBRIDTEST
* csql>CREATE USER SQOOPUSER password 'PASSWORD';
*/
public class CubridManagerImportTest extends ImportJobTestCase {
public static final Log LOG = LogFactory.getLog(
CubridManagerImportTest.class.getName());
static final String TABLE_NAME = "employees_cubrid";
static final String NULL_TABLE_NAME = "null_employees_cubrid";
// instance variables populated during setUp, used during tests
private CubridManager manager;
private Configuration conf = new Configuration();
@Override
protected Configuration getConf() {
return conf;
}
@Override
protected boolean useHsqldbTestServer() {
return false;
}
@Before
public void setUp() {
super.setUp();
LOG.debug("Setting up another CubridImport test: "
+ CubridTestUtils.getConnectString());
setUpData(TABLE_NAME, false);
setUpData(NULL_TABLE_NAME, true);
LOG.debug("setUp complete.");
}
public void setUpData(String tableName, boolean nullEntry) {
SqoopOptions options = new SqoopOptions(
CubridTestUtils.getConnectString(), tableName);
options.setUsername(CubridTestUtils.getCurrentUser());
options.setPassword(CubridTestUtils.getPassword());
LOG.debug("Setting up another CubridImport test: "
+ CubridTestUtils.getConnectString());
manager = new CubridManager(options);
Connection connection = null;
Statement st = null;
try {
connection = manager.getConnection();
connection.setAutoCommit(false);
st = connection.createStatement();
// create the database table and populate it with data.
st.executeUpdate("DROP TABLE IF EXISTS " + tableName);
st.executeUpdate("CREATE TABLE " + tableName + " ("
+ manager.escapeColName("id")
+ " INT NOT NULL PRIMARY KEY, "
+ manager.escapeColName("name")
+ " VARCHAR(24) NOT NULL, "
+ manager.escapeColName("start_date") + " DATE, "
+ manager.escapeColName("Salary") + " FLOAT, "
+ manager.escapeColName("dept")
+ " VARCHAR(32));");
st.executeUpdate("INSERT INTO " + tableName
+ " VALUES(1,'Aaron','2009-05-14',"
+ "1000000.00,'engineering');");
st.executeUpdate("INSERT INTO " + tableName
+ " VALUES(2,'Bob','2009-04-20',400.00,'sales');");
st.executeUpdate("INSERT INTO " + tableName
+ " VALUES(3,'Fred','2009-01-23',"
+ "15.00,'marketing');");
if (nullEntry) {
st.executeUpdate("INSERT INTO " + tableName
+ " VALUES(4,'Mike',NULL,NULL,NULL);");
}
connection.commit();
} catch (SQLException sqlE) {
LOG.error("Encountered SQL Exception: " + sqlE);
sqlE.printStackTrace();
fail("SQLException when running test setUp(): " + sqlE);
} finally {
try {
if (null != st) {
st.close();
}
if (null != connection) {
connection.close();
}
} catch (SQLException sqlE) {
LOG.warn("Got SQLException when closing connection: "
+ sqlE);
}
}
}
@After
public void tearDown() {
super.tearDown();
try {
manager.close();
} catch (SQLException sqlE) {
LOG.error("Got SQLException: " + sqlE.toString());
fail("Got SQLException: " + sqlE.toString());
}
}
@Test
public void testImportSimple() throws IOException {
String[] expectedResults = {
"1,Aaron,2009-05-14,1000000.0,engineering",
"2,Bob,2009-04-20,400.0,sales",
"3,Fred,2009-01-23,15.0,marketing", };
doImportAndVerify(TABLE_NAME, expectedResults);
}
@Test
public void testListTables() throws IOException {
SqoopOptions options = new SqoopOptions(new Configuration());
options.setConnectString(CubridTestUtils.getConnectString());
options.setUsername(CubridTestUtils.getCurrentUser());
options.setPassword(CubridTestUtils.getPassword());
ConnManager mgr = new CubridManager(options);
String[] tables = mgr.listTables();
Arrays.sort(tables);
assertTrue(TABLE_NAME + " is not found!",
Arrays.binarySearch(tables, TABLE_NAME) >= 0);
}
@Test
public void testNullEscapeCharacters() throws Exception {
String[] expectedResults = {
"1,Aaron,2009-05-14,1000000.0,engineering",
"2,Bob,2009-04-20,400.0,sales",
"3,Fred,2009-01-23,15.0,marketing",
"4,Mike,cubrid,cubrid,cubrid", };
String[] extraArgs = {
"--null-string",
"cubrid",
"--null-non-string",
"cubrid", };
doImportAndVerify(NULL_TABLE_NAME, expectedResults, extraArgs);
}
private void doImportAndVerify(String tableName,
String[] expectedResults,
String... extraArgs) throws IOException {
Path warehousePath = new Path(this.getWarehouseDir());
Path tablePath = new Path(warehousePath, tableName);
Path filePath = new Path(tablePath, "part-m-00000");
File tableFile = new File(tablePath.toString());
if (tableFile.exists() && tableFile.isDirectory()) {
// remove the directory before running the import.
FileListing.recursiveDeleteDir(tableFile);
}
String[] argv = getArgv(tableName, extraArgs);
try {
runImport(argv);
} catch (IOException ioe) {
LOG.error("Got IOException during import: "
+ ioe.toString());
ioe.printStackTrace();
fail(ioe.toString());
}
File f = new File(filePath.toString());
assertTrue("Could not find imported data file", f.exists());
BufferedReader r = null;
try {
// Read through the file and make sure it's all there.
r = new BufferedReader(new InputStreamReader(
new FileInputStream(f)));
for (String expectedLine : expectedResults) {
assertEquals(expectedLine, r.readLine());
}
} catch (IOException ioe) {
LOG.error("Got IOException verifying results: "
+ ioe.toString());
ioe.printStackTrace();
fail(ioe.toString());
} finally {
IOUtils.closeStream(r);
}
}
private String[] getArgv(String tableName, String... extraArgs) {
ArrayList<String> args = new ArrayList<String>();
CommonArgs.addHadoopFlags(args);
args.add("--table");
args.add(tableName);
args.add("--warehouse-dir");
args.add(getWarehouseDir());
args.add("--connect");
args.add(CubridTestUtils.getConnectString());
args.add("--username");
args.add(CubridTestUtils.getCurrentUser());
args.add("--password");
args.add(CubridTestUtils.getPassword());
args.add("--num-mappers");
args.add("1");
if (extraArgs.length > 0) {
for (String arg : extraArgs) {
args.add(arg);
}
}
return args.toArray(new String[0]);
}
}

View File

@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.manager.cubrid;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.manager.CubridManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.testutil.ImportJobTestCase;
/**
* Test authentication.
*
* Since this requires a CUBRID installation on your local machine, this
* class is named in such a way that Sqoop's default QA process does not run
* it. You need to run this manually with -Dtestcase=CubridAuthTest.
*
* You need to put the CUBRID JDBC driver library into a location where
* Sqoop will be able to access it (since this library cannot be checked
* into Apache's tree for licensing reasons).
*
* To set up your test environment:
* Install Cubrid 9.2.2
* ref:http://www.cubrid.org/wiki_tutorials/entry/installing-cubrid-on-linux-using-shell-and-rpm
* Create a database SQOOPCUBRIDTEST
* $cubrid createdb SQOOPCUBRIDTEST en_us.utf8
* Start cubrid and database
* $cubrid service start
* $cubrid server start SQOOPCUBRIDTEST
* Create a login SQOOPUSER with password PASSWORD and grant all
* $csql -u dba SQOOPCUBRIDTEST
* csql>CREATE USER SQOOPUSER password 'PASSWORD';
*/
public class CubridAuthTest extends ImportJobTestCase {
public static final Log LOG = LogFactory.getLog(CubridAuthTest.class
.getName());
static final String TABLE_NAME = "employees_cubrid";
private CubridManager manager;
private Configuration conf = new Configuration();
@Override
protected boolean useHsqldbTestServer() {
return false;
}
@Before
public void setUp() {
super.setUp();
LOG.debug("Setting up another CubridImport test: "
+ CubridTestUtils.getConnectString());
setUpData(TABLE_NAME, true);
LOG.debug("setUp complete.");
}
public void setUpData(String tableName, boolean nullEntry) {
SqoopOptions options = new SqoopOptions(
CubridTestUtils.getConnectString(), TABLE_NAME);
options.setUsername(CubridTestUtils.getCurrentUser());
options.setPassword(CubridTestUtils.getPassword());
LOG.debug("Setting up another CubridImport test: "
+ CubridTestUtils.getConnectString());
manager = new CubridManager(options);
Connection connection = null;
Statement st = null;
try {
connection = manager.getConnection();
connection.setAutoCommit(false);
st = connection.createStatement();
// create the database table and populate it with data.
st.executeUpdate("DROP TABLE IF EXISTS " + TABLE_NAME);
String sqlStmt = "CREATE TABLE "
+ TABLE_NAME + " ("
+ manager.escapeColName("id")
+ " INT NOT NULL PRIMARY KEY, "
+ manager.escapeColName("name")
+ " VARCHAR(24) NOT NULL);";
st.executeUpdate(sqlStmt);
st.executeUpdate("INSERT INTO " + TABLE_NAME
+ " VALUES(1,'Aaron');");
connection.commit();
} catch (SQLException sqlE) {
LOG.error("Encountered SQL Exception: " + sqlE);
sqlE.printStackTrace();
fail("SQLException when running test setUp(): " + sqlE);
} finally {
try {
if (null != st) {
st.close();
}
if (null != connection) {
connection.close();
}
} catch (SQLException sqlE) {
LOG.warn("Got SQLException when closing connection: "
+ sqlE);
}
}
}
@After
public void tearDown() {
super.tearDown();
try {
manager.close();
} catch (SQLException sqlE) {
LOG.error("Got SQLException: " + sqlE.toString());
fail("Got SQLException: " + sqlE.toString());
}
}
/**
* Connect to a db and ensure that password-based
* authentication succeeds.
*/
@Test
public void testAuthAccess() throws IOException {
SqoopOptions options = new SqoopOptions(conf);
options.setConnectString(CubridTestUtils.getConnectString());
options.setUsername(CubridTestUtils.getCurrentUser());
options.setPassword(CubridTestUtils.getPassword());
ConnManager mgr = new CubridManager(options);
String[] tables = mgr.listTables();
Arrays.sort(tables);
assertTrue(TABLE_NAME + " is not found!",
Arrays.binarySearch(tables, TABLE_NAME) >= 0);
}
}

View File

@ -0,0 +1,170 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.manager.cubrid;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.testutil.ManagerCompatTestCase;
/**
* Test the basic Cubrid connection manager with the various column types.
*
* Since this requires a CUBRID installation on your local machine, this
* class is named in such a way that Sqoop's default QA process does not run
* it. You need to run this manually with -Dtestcase=CubridCompatTest.
*
* You need to put the CUBRID JDBC driver library into a location where
* Sqoop will be able to access it (since this library cannot be checked
* into Apache's tree for licensing reasons).
*
* To set up your test environment:
* Install Cubrid 9.2.2
* ref:http://www.cubrid.org/wiki_tutorials/entry/installing-cubrid-on-linux-using-shell-and-rpm
* Create a database SQOOPCUBRIDTEST
* $cubrid createdb SQOOPCUBRIDTEST en_us.utf8
* Start cubrid and database
* $cubrid service start
* $cubrid server start SQOOPCUBRIDTEST
* Create a login SQOOPUSER with password PASSWORD and grant all
* $csql -u dba SQOOPCUBRIDTEST
* csql>CREATE USER SQOOPUSER password 'PASSWORD';
*/
public class CubridCompatTest extends ManagerCompatTestCase {
public static final Log LOG = LogFactory.getLog(CubridCompatTest.class
.getName());
@Override
protected Log getLogger() {
return LOG;
}
@Override
protected String getDbFriendlyName() {
return "CUBRID";
}
@Override
protected String getConnectString() {
return CubridTestUtils.getConnectString();
}
@Override
protected SqoopOptions getSqoopOptions(Configuration conf) {
SqoopOptions opts = new SqoopOptions(conf);
opts.setUsername(CubridTestUtils.getCurrentUser());
opts.setPassword(CubridTestUtils.getPassword());
return opts;
}
@Override
protected void dropTableIfExists(String table) throws SQLException {
Connection conn = getManager().getConnection();
PreparedStatement statement = conn.prepareStatement(
"DROP TABLE IF EXISTS "
+ table, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
try {
statement.executeUpdate();
conn.commit();
} finally {
statement.close();
}
}
@Override
protected boolean supportsBoolean() {
return false;
}
@Override
protected boolean supportsLongVarChar() {
return false;
}
@Override
protected String getFixedCharSeqOut(int fieldWidth, String asInserted) {
return padString(fieldWidth, asInserted);
}
@Override
protected String getTimestampSeqOutput(String tsAsInserted) {
// We trim timestamps to exactly one tenth of a second.
if ("null".equals(tsAsInserted)) {
return tsAsInserted;
}
int dotPos = tsAsInserted.indexOf(".");
if (-1 == dotPos) {
return tsAsInserted + ".0";
} else {
return tsAsInserted.substring(0, dotPos + 2);
}
}
@Override
protected String getVarBinaryType() {
return "BIT VARYING(48)";
}
@Override
protected String getVarBinarySeqOutput(String asInserted) {
return toLowerHexString(asInserted);
}
@Override
protected String getNumericSeqOutput(String numAsInserted) {
int totalDecPartSize = getNumericDecPartDigits();
int numPad; // number of digits to pad by.
int dotPos = numAsInserted.indexOf(".");
if (-1 == dotPos) {
numAsInserted = numAsInserted + ".";
numPad = totalDecPartSize;
} else {
int existingDecimalSize = numAsInserted.length() - dotPos;
numPad = totalDecPartSize - existingDecimalSize;
}
if (numPad < 0) {
// We actually have to trim the value.
return numAsInserted.substring(0, numAsInserted.length() + numPad + 1);
} else {
String zeros = "";
for (int i = 0; i < numPad; i++) {
zeros = zeros + "0";
}
return numAsInserted + zeros;
}
}
@Override
protected String getDecimalSeqOutput(String numAsInserted) {
return getNumericSeqOutput(numAsInserted);
}
}
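
A quick illustration of the timestamp trimming above (hypothetical values):
an inserted 2009-05-14 12:00:00 is expected back as 2009-05-14 12:00:00.0,
while 2009-05-14 12:00:00.123 is trimmed to 2009-05-14 12:00:00.1.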

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.manager.cubrid;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Utilities for Cubrid-based tests.
*
* Since the Cubrid tests require a CUBRID installation on your local
* machine, they are named in such a way that Sqoop's default QA process
* does not run them; run them manually, e.g. with -Dtestcase=CubridCompatTest.
*
* You need to put the CUBRID JDBC driver library into a location where
* Sqoop will be able to access it (since this library cannot be checked
* into Apache's tree for licensing reasons).
*
* To set up your test environment:
* Install Cubrid 9.2.2
* ref:http://www.cubrid.org/wiki_tutorials/entry/installing-cubrid-on-linux-using-shell-and-rpm
* Create a database SQOOPCUBRIDTEST
* $cubrid createdb SQOOPCUBRIDTEST en_us.utf8
* Start cubrid and database
* $cubrid service start
* $cubrid server start SQOOPCUBRIDTEST
* Create a login SQOOPUSER with password PASSWORD and grant all
* $csql -u dba SQOOPCUBRIDTEST
* csql>CREATE USER SQOOPUSER password 'PASSWORD';
*/
public class CubridTestUtils {
public static final Log LOG = LogFactory.getLog(CubridTestUtils.class
.getName());
public static final String HOST_URL = System
.getProperty("sqoop.test.cubrid.connectstring.host_url",
"jdbc:cubrid:localhost:30000");
static final String TEST_DATABASE = System
.getProperty("sqoop.test.cubrid.connectstring.database",
"SQOOPCUBRIDTEST");
static final String TEST_USER = System
.getProperty("sqoop.test.cubrid.connectstring.username",
"SQOOPUSER");
static final String TEST_PASS = System
.getProperty("sqoop.test.cubrid.connectstring.password",
"PASSWORD");
static final String TABLE_NAME = "EMPLOYEES_CUBRID";
static final String NULL_TABLE_NAME = "NULL_EMPLOYEES_CUBRID";
static final String CONNECT_STRING = HOST_URL + ":"
+ TEST_DATABASE + ":::";
public static String getCurrentUser() {
return TEST_USER;
}
public static String getPassword() {
return TEST_PASS;
}
public static String getConnectString() {
return CONNECT_STRING;
}
public static String getTableName() {
return TABLE_NAME;
}
}
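
With the default property values above, getConnectString() yields

jdbc:cubrid:localhost:30000:SQOOPCUBRIDTEST:::

where the trailing colons correspond to the remaining fields of a CUBRID JDBC
URL; the tests supply the username and password separately through
SqoopOptions.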