5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-04 05:39:35 +08:00

MAPREDUCE-1327. Fix Sqoop handling of Oracle timezone with timestamp data

types in import. Contributed by Leonid Furman

From: Christopher Douglas <cdouglas@apache.org>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149852 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Bayer 2011-07-22 20:03:31 +00:00
parent 6174268d28
commit e7a8e519f3
7 changed files with 294 additions and 19 deletions

View File

@ -24,7 +24,6 @@
import org.apache.hadoop.sqoop.SqoopOptions;
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.hive.HiveTypes;
import java.io.File;
import java.io.IOException;
@ -95,7 +94,7 @@ public String getCreateTableStmt() throws IOException {
first = false;
Integer colType = columnTypes.get(col);
String hiveColType = HiveTypes.toHiveType(colType);
String hiveColType = connManager.toHiveType(colType);
if (null == hiveColType) {
throw new IOException("Hive does not support the SQL type for column " + col);
}

View File

@ -56,6 +56,20 @@ public abstract class ConnManager {
*/
public abstract String getPrimaryKey(String tableName);
/**
* Return java type for SQL type
* @param sqlType sql type
* @return java type
*/
public abstract String toJavaType(int sqlType);
/**
* Return hive type for SQL type
* @param sqlType sql type
* @return hive type
*/
public abstract String toHiveType(int sqlType);
/**
* Return an unordered mapping from colname to sqltype for
* all columns in a table.

View File

@ -23,7 +23,10 @@
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.TimeZone;
import java.lang.reflect.Method;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -83,9 +86,51 @@ protected Connection makeConnection() throws SQLException {
// We only use this for metadata queries. Loosest semantics are okay.
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
// Setting session time zone
setSessionTimeZone(connection);
return connection;
}
/**
* Set session time zone
* @param conn Connection object
* @throws SQLException instance
*/
private void setSessionTimeZone(Connection conn) throws SQLException {
// need to use reflection to call the method setSessionTimeZone on the OracleConnection class
// because oracle specific java libraries are not accessible in this context
Method method;
try {
method = conn.getClass().getMethod(
"setSessionTimeZone", new Class [] {String.class});
} catch (Exception ex) {
LOG.error("Could not find method setSessionTimeZone in " + conn.getClass().getName(), ex);
// rethrow SQLException
throw new SQLException(ex);
}
// Need to set the time zone in order for Java
// to correctly access the column "TIMESTAMP WITH LOCAL TIME ZONE"
String clientTimeZone = TimeZone.getDefault().getID();
try {
method.setAccessible(true);
method.invoke(conn, clientTimeZone);
LOG.info("Time zone has been set");
} catch (Exception ex) {
LOG.warn("Time zone " + clientTimeZone +
" could not be set on oracle database.");
LOG.info("Setting default time zone: UTC");
try {
method.invoke(conn, "UTC");
} catch (Exception ex2) {
LOG.error("Could not set time zone for oracle connection", ex2);
// rethrow SQLException
throw new SQLException(ex);
}
}
}
/**
* This importTable() implementation continues to use the older DBInputFormat
* because DataDrivenDBInputFormat does not currently work with Oracle.
@ -111,5 +156,117 @@ public void importTable(ImportJobContext context)
importer.runImport(tableName, jarFile, splitCol, options.getConf());
}
/**
* Resolve a database-specific type to the Java type that should contain it.
* @param sqlType
* @return the name of a Java type to hold the sql datatype, or null if none.
*/
public String toJavaType(int sqlType) {
String defaultJavaType = super.toJavaType(sqlType);
return (defaultJavaType == null) ? dbToJavaType(sqlType) : defaultJavaType;
}
/**
* Attempt to map sql type to java type
* @param sqlType sql type
* @return java type
*/
private String dbToJavaType(int sqlType) {
// load class oracle.jdbc.OracleTypes
// need to use reflection because oracle specific libraries
// are not accessible in this context
Class typeClass = getTypeClass("oracle.jdbc.OracleTypes");
// check if it is TIMESTAMPTZ
int dbType = getDatabaseType(typeClass, "TIMESTAMPTZ");
if (sqlType == dbType) {
return "java.sql.Timestamp";
}
// check if it is TIMESTAMPLTZ
dbType = getDatabaseType(typeClass, "TIMESTAMPLTZ");
if (sqlType == dbType) {
return "java.sql.Timestamp";
}
// return null if no java type was found for sqlType
return null;
}
/**
* Attempt to map sql type to hive type
* @param sqlType sql data type
* @return hive data type
*/
public String toHiveType(int sqlType) {
String defaultHiveType = super.toHiveType(sqlType);
return (defaultHiveType == null) ? dbToHiveType(sqlType) : defaultHiveType;
}
/**
* Resolve a database-specific type to Hive type
* @param sqlType sql type
* @return hive type
*/
private String dbToHiveType(int sqlType) {
// load class oracle.jdbc.OracleTypes
// need to use reflection because oracle specific libraries
// are not accessible in this context
Class typeClass = getTypeClass("oracle.jdbc.OracleTypes");
// check if it is TIMESTAMPTZ
int dbType = getDatabaseType(typeClass, "TIMESTAMPTZ");
if (sqlType == dbType) {
return "STRING";
}
// check if it is TIMESTAMPLTZ
dbType = getDatabaseType(typeClass, "TIMESTAMPLTZ");
if (sqlType == dbType) {
return "STRING";
}
// return null if no hive type was found for sqlType
return null;
}
/**
* Get database type
* @param clazz oracle class representing sql types
* @param fieldName field name
* @return value of database type constant
*/
private int getDatabaseType(Class clazz, String fieldName) {
// need to use reflection to extract constant values
// because the database specific java libraries are not accessible in this context
int value = -1;
try {
java.lang.reflect.Field field = clazz.getDeclaredField(fieldName);
value = field.getInt(null);
} catch (NoSuchFieldException ex) {
LOG.error("Could not retrieve value for field " + fieldName, ex);
} catch (IllegalAccessException ex) {
LOG.error("Could not retrieve value for field " + fieldName, ex);
}
return value;
}
/**
* Load class by name
* @param className class name
* @return class instance
*/
private Class getTypeClass(String className) {
// need to use reflection to load class
// because the database specific java libraries are not accessible in this context
Class typeClass = null;
try {
typeClass = Class.forName(className);
} catch (ClassNotFoundException ex) {
LOG.error("Could not load class " + className, ex);
}
return typeClass;
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.sqoop.manager;
import org.apache.hadoop.sqoop.SqoopOptions;
import org.apache.hadoop.sqoop.hive.HiveTypes;
import org.apache.hadoop.sqoop.mapreduce.DataDrivenImportJob;
import org.apache.hadoop.sqoop.mapreduce.ExportJob;
import org.apache.hadoop.sqoop.util.ExportException;
@ -308,7 +309,7 @@ protected ResultSet execute(String stmt, Object... args) throws SQLException {
* @param sqlType
* @return the name of a Java type to hold the sql datatype, or null if none.
*/
public static String toJavaType(int sqlType) {
public String toJavaType(int sqlType) {
// mappings from http://java.sun.com/j2se/1.3/docs/guide/jdbc/getstart/mapping.html
if (sqlType == Types.INTEGER) {
return "Integer";
@ -347,10 +348,19 @@ public static String toJavaType(int sqlType) {
} else {
// TODO(aaron): Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB, BLOB, ARRAY,
// STRUCT, REF, JAVA_OBJECT.
// return database specific java data type
return null;
}
}
/**
* Resolve a database-specific type to Hive data type
* @param sqlType sql type
* @return hive type
*/
public String toHiveType(int sqlType) {
return HiveTypes.toHiveType(sqlType);
}
public void close() throws SQLException {
}

View File

@ -376,7 +376,7 @@ private void generateFields(Map<String, Integer> columnTypes, String [] colNames
for (String col : colNames) {
int sqlType = columnTypes.get(col);
String javaType = SqlManager.toJavaType(sqlType);
String javaType = connManager.toJavaType(sqlType);
if (null == javaType) {
LOG.error("Cannot resolve SQL type " + sqlType);
continue;
@ -406,7 +406,7 @@ private void generateDbRead(Map<String, Integer> columnTypes, String [] colNames
fieldNum++;
int sqlType = columnTypes.get(col);
String javaType = SqlManager.toJavaType(sqlType);
String javaType = connManager.toJavaType(sqlType);
if (null == javaType) {
LOG.error("No Java type for SQL type " + sqlType);
continue;
@ -443,7 +443,7 @@ private void generateDbWrite(Map<String, Integer> columnTypes, String [] colName
fieldNum++;
int sqlType = columnTypes.get(col);
String javaType = SqlManager.toJavaType(sqlType);
String javaType = connManager.toJavaType(sqlType);
if (null == javaType) {
LOG.error("No Java type for SQL type " + sqlType);
continue;
@ -476,7 +476,7 @@ private void generateHadoopRead(Map<String, Integer> columnTypes, String [] colN
for (String col : colNames) {
int sqlType = columnTypes.get(col);
String javaType = SqlManager.toJavaType(sqlType);
String javaType = connManager.toJavaType(sqlType);
if (null == javaType) {
LOG.error("No Java type for SQL type " + sqlType);
continue;
@ -532,7 +532,7 @@ private void generateToString(Map<String, Integer> columnTypes, String [] colNam
boolean first = true;
for (String col : colNames) {
int sqlType = columnTypes.get(col);
String javaType = SqlManager.toJavaType(sqlType);
String javaType = connManager.toJavaType(sqlType);
if (null == javaType) {
LOG.error("No Java type for SQL type " + sqlType);
continue;
@ -597,7 +597,7 @@ private void parseNullVal(String colName, StringBuilder sb) {
private void parseColumn(String colName, int colType, StringBuilder sb) {
// assume that we have __it and __cur_str vars, based on __loadFromFields() code.
sb.append(" __cur_str = __it.next();\n");
String javaType = SqlManager.toJavaType(colType);
String javaType = connManager.toJavaType(colType);
parseNullVal(colName, sb);
if (javaType.equals("String")) {
@ -690,7 +690,7 @@ private void generateHadoopWrite(Map<String, Integer> columnTypes, String [] col
for (String col : colNames) {
int sqlType = columnTypes.get(col);
String javaType = SqlManager.toJavaType(sqlType);
String javaType = connManager.toJavaType(sqlType);
if (null == javaType) {
LOG.error("No Java type for SQL type " + sqlType);
continue;

View File

@ -113,6 +113,24 @@ public String getPrimaryKey(String tableName) {
return null;
}
/**
* Default implementation
* @param sqlType sql data type
* @return java data type
*/
public String toJavaType(int sqlType) {
return null;
}
/**
* Default implementation
* @param sqlType sql data type
* @return hive data type
*/
public String toHiveType(int sqlType) {
return null;
}
public Map<String, Integer> getColumnTypes(String tableName) {
return null;
}

View File

@ -27,7 +27,13 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;
import java.util.Calendar;
import java.util.TimeZone;
import java.util.ArrayList;
import java.text.ParseException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import junit.framework.TestCase;
@ -101,14 +107,16 @@ public void setUp() {
+ "start_date DATE, "
+ "salary FLOAT, "
+ "dept VARCHAR2(32), "
+ "timestamp_tz TIMESTAMP WITH TIME ZONE, "
+ "timestamp_ltz TIMESTAMP WITH LOCAL TIME ZONE, "
+ "PRIMARY KEY (id))");
st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
+ "1,'Aaron',to_date('2009-05-14','yyyy-mm-dd'),1000000.00,'engineering')");
+ "1,'Aaron',to_date('2009-05-14','yyyy-mm-dd'),1000000.00,'engineering','29-DEC-09 12.00.00.000000000 PM','29-DEC-09 12.00.00.000000000 PM')");
st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
+ "2,'Bob',to_date('2009-04-20','yyyy-mm-dd'),400.00,'sales')");
+ "2,'Bob',to_date('2009-04-20','yyyy-mm-dd'),400.00,'sales','30-DEC-09 12.00.00.000000000 PM','30-DEC-09 12.00.00.000000000 PM')");
st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
+ "3,'Fred',to_date('2009-01-23','yyyy-mm-dd'),15.00,'marketing')");
+ "3,'Fred',to_date('2009-01-23','yyyy-mm-dd'),15.00,'marketing','31-DEC-09 12.00.00.000000000 PM','31-DEC-09 12.00.00.000000000 PM')");
connection.commit();
} catch (SQLException sqlE) {
LOG.error("Encountered SQL Exception: " + sqlE);
@ -188,7 +196,7 @@ private void runOracleTest(String [] expectedResults) throws IOException {
// Read through the file and make sure it's all there.
r = new BufferedReader(new InputStreamReader(new FileInputStream(f)));
for (String expectedLine : expectedResults) {
assertEquals(expectedLine, r.readLine());
compareRecords(expectedLine, r.readLine());
}
} catch (IOException ioe) {
LOG.error("Got IOException verifying results: " + ioe.toString());
@ -208,11 +216,80 @@ public void testOracleImport() throws IOException {
// a strict DATE type. Thus we include HH:MM:SS.mmmmm below.
// See http://www.oracle.com/technology/tech/java/sqlj_jdbc/htdocs/jdbc_faq.html#08_01
String [] expectedResults = {
"1,Aaron,2009-05-14 00:00:00.0,1000000,engineering",
"2,Bob,2009-04-20 00:00:00.0,400,sales",
"3,Fred,2009-01-23 00:00:00.0,15,marketing"
"1,Aaron,2009-05-14 00:00:00.0,1000000,engineering,2009-12-29 12:00:00.0,2009-12-29 12:00:00.0",
"2,Bob,2009-04-20 00:00:00.0,400,sales,2009-12-30 12:00:00.0,2009-12-30 12:00:00.0",
"3,Fred,2009-01-23 00:00:00.0,15,marketing,2009-12-31 12:00:00.0,2009-12-31 12:00:00.0"
};
runOracleTest(expectedResults);
}
/**
* Compare two lines
* @param expectedLine expected line
* @param receivedLine received line
* @throws IOException exception during lines comparison
*/
private void compareRecords(String expectedLine, String receivedLine) throws IOException {
// handle null case
if (expectedLine == null || receivedLine == null) {
return;
}
// check if lines are equal
if (expectedLine.equals(receivedLine)) {
return;
}
// check if size is the same
String [] expectedValues = expectedLine.split(",");
String [] receivedValues = receivedLine.split(",");
if (expectedValues.length != 7 || receivedValues.length != 7) {
LOG.error("Number of expected fields did not match number of received fields");
throw new IOException("Number of expected fields did not match number of received fields");
}
// check first 5 values
boolean mismatch = false;
for (int i = 0; !mismatch && i < 5; i++) {
mismatch = !expectedValues[i].equals(receivedValues[i]);
}
if (mismatch) {
throw new IOException("Expected:<" + expectedLine + "> but was:<" + receivedLine + ">");
}
Date expectedDate = null;
Date receivedDate = null;
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S");
int offset = TimeZone.getDefault().getOffset(System.currentTimeMillis()) / 3600000;
for (int i = 5; i < 7; i++) {
// parse expected timestamp
try {
expectedDate = df.parse(expectedValues[i]);
} catch (ParseException ex) {
LOG.error("Could not parse expected timestamp: " + expectedValues[i]);
throw new IOException("Could not parse expected timestamp: " + expectedValues[i]);
}
// parse received timestamp
try {
receivedDate = df.parse(receivedValues[i]);
} catch (ParseException ex) {
LOG.error("Could not parse received timestamp: " + receivedValues[i]);
throw new IOException("Could not parse received timestamp: " + receivedValues[i]);
}
// compare two timestamps considering timezone offset
Calendar expectedCal = Calendar.getInstance();
expectedCal.setTime(expectedDate);
expectedCal.add(Calendar.HOUR, offset);
Calendar receivedCal = Calendar.getInstance();
receivedCal.setTime(receivedDate);
if (!expectedCal.equals(receivedCal)) {
throw new IOException("Expected:<" + expectedLine + "> but was:<" + receivedLine + ">, while timezone offset is: " + offset);
}
}
}
}