diff --git a/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java b/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java index 81550afd..fa06c8cb 100644 --- a/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java +++ b/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java @@ -24,7 +24,6 @@ import org.apache.hadoop.sqoop.SqoopOptions; import org.apache.hadoop.sqoop.manager.ConnManager; -import org.apache.hadoop.sqoop.hive.HiveTypes; import java.io.File; import java.io.IOException; @@ -95,9 +94,9 @@ public String getCreateTableStmt() throws IOException { first = false; Integer colType = columnTypes.get(col); - String hiveColType = HiveTypes.toHiveType(colType); + String hiveColType = connManager.toHiveType(colType); if (null == hiveColType) { - throw new IOException("Hive does not support the SQL type for column " + col); + throw new IOException("Hive does not support the SQL type for column " + col); } sb.append(col + " " + hiveColType); diff --git a/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java b/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java index dfcd61c1..493bb814 100644 --- a/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java +++ b/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java @@ -56,6 +56,20 @@ public abstract class ConnManager { */ public abstract String getPrimaryKey(String tableName); + /** + * Return java type for SQL type + * @param sqlType sql type + * @return java type + */ + public abstract String toJavaType(int sqlType); + + /** + * Return hive type for SQL type + * @param sqlType sql type + * @return hive type + */ + public abstract String toHiveType(int sqlType); + /** * Return an unordered mapping from colname to sqltype for * all columns in a table. 
diff --git a/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java b/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java index f0d92e54..869057bb 100644 --- a/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java +++ b/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java @@ -23,7 +23,10 @@ import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Types; import java.util.ArrayList; +import java.util.TimeZone; +import java.lang.reflect.Method; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -83,9 +86,51 @@ protected Connection makeConnection() throws SQLException { // We only use this for metadata queries. Loosest semantics are okay. connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + // Setting session time zone + setSessionTimeZone(connection); + return connection; } + /** + * Set session time zone + * @param conn Connection object + * @throws SQLException instance + */ + private void setSessionTimeZone(Connection conn) throws SQLException { + // need to use reflection to call the method setSessionTimeZone on the OracleConnection class + // because oracle specific java libraries are not accessible in this context + Method method; + try { + method = conn.getClass().getMethod( + "setSessionTimeZone", new Class [] {String.class}); + } catch (Exception ex) { + LOG.error("Could not find method setSessionTimeZone in " + conn.getClass().getName(), ex); + // rethrow SQLException + throw new SQLException(ex); + } + + // Need to set the time zone in order for Java + // to correctly access the column "TIMESTAMP WITH LOCAL TIME ZONE" + String clientTimeZone = TimeZone.getDefault().getID(); + try { + method.setAccessible(true); + method.invoke(conn, clientTimeZone); + LOG.info("Time zone has been set"); + } catch (Exception ex) { + LOG.warn("Time zone " + clientTimeZone + + " could not be set on oracle database."); + LOG.info("Setting 
default time zone: UTC"); + try { + method.invoke(conn, "UTC"); + } catch (Exception ex2) { + LOG.error("Could not set time zone for oracle connection", ex2); + // rethrow SQLException + throw new SQLException(ex2); + } + } + } + /** * This importTable() implementation continues to use the older DBInputFormat * because DataDrivenDBInputFormat does not currently work with Oracle. @@ -111,5 +156,117 @@ public void importTable(ImportJobContext context) importer.runImport(tableName, jarFile, splitCol, options.getConf()); } + + /** + * Resolve a database-specific type to the Java type that should contain it. + * @param sqlType + * @return the name of a Java type to hold the sql datatype, or null if none. + */ + public String toJavaType(int sqlType) { + String defaultJavaType = super.toJavaType(sqlType); + return (defaultJavaType == null) ? dbToJavaType(sqlType) : defaultJavaType; + } + + /** + * Attempt to map sql type to java type + * @param sqlType sql type + * @return java type + */ + private String dbToJavaType(int sqlType) { + // load class oracle.jdbc.OracleTypes + // need to use reflection because oracle specific libraries + // are not accessible in this context + Class typeClass = getTypeClass("oracle.jdbc.OracleTypes"); + + // check if it is TIMESTAMPTZ + int dbType = getDatabaseType(typeClass, "TIMESTAMPTZ"); + if (sqlType == dbType) { + return "java.sql.Timestamp"; + } + + // check if it is TIMESTAMPLTZ + dbType = getDatabaseType(typeClass, "TIMESTAMPLTZ"); + if (sqlType == dbType) { + return "java.sql.Timestamp"; + } + + // return null if no java type was found for sqlType + return null; + } + + /** + * Attempt to map sql type to hive type + * @param sqlType sql data type + * @return hive data type + */ + public String toHiveType(int sqlType) { + String defaultHiveType = super.toHiveType(sqlType); + return (defaultHiveType == null) ? 
dbToHiveType(sqlType) : defaultHiveType; + } + + /** + * Resolve a database-specific type to Hive type + * @param sqlType sql type + * @return hive type + */ + private String dbToHiveType(int sqlType) { + // load class oracle.jdbc.OracleTypes + // need to use reflection because oracle specific libraries + // are not accessible in this context + Class typeClass = getTypeClass("oracle.jdbc.OracleTypes"); + + // check if it is TIMESTAMPTZ + int dbType = getDatabaseType(typeClass, "TIMESTAMPTZ"); + if (sqlType == dbType) { + return "STRING"; + } + + // check if it is TIMESTAMPLTZ + dbType = getDatabaseType(typeClass, "TIMESTAMPLTZ"); + if (sqlType == dbType) { + return "STRING"; + } + + // return null if no hive type was found for sqlType + return null; + } + + /** + * Get database type + * @param clazz oracle class representing sql types + * @param fieldName field name + * @return value of database type constant + */ + private int getDatabaseType(Class clazz, String fieldName) { + // need to use reflection to extract constant values + // because the database specific java libraries are not accessible in this context + int value = -1; + try { + java.lang.reflect.Field field = clazz.getDeclaredField(fieldName); + value = field.getInt(null); + } catch (NoSuchFieldException ex) { + LOG.error("Could not retrieve value for field " + fieldName, ex); + } catch (IllegalAccessException ex) { + LOG.error("Could not retrieve value for field " + fieldName, ex); + } + return value; + } + + /** + * Load class by name + * @param className class name + * @return class instance + */ + private Class getTypeClass(String className) { + // need to use reflection to load class + // because the database specific java libraries are not accessible in this context + Class typeClass = null; + try { + typeClass = Class.forName(className); + } catch (ClassNotFoundException ex) { + LOG.error("Could not load class " + className, ex); + } + return typeClass; + } } diff --git 
a/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java b/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java index 808efcc7..42846d99 100644 --- a/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java +++ b/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.sqoop.manager; import org.apache.hadoop.sqoop.SqoopOptions; +import org.apache.hadoop.sqoop.hive.HiveTypes; import org.apache.hadoop.sqoop.mapreduce.DataDrivenImportJob; import org.apache.hadoop.sqoop.mapreduce.ExportJob; import org.apache.hadoop.sqoop.util.ExportException; @@ -308,7 +309,7 @@ protected ResultSet execute(String stmt, Object... args) throws SQLException { * @param sqlType * @return the name of a Java type to hold the sql datatype, or null if none. */ - public static String toJavaType(int sqlType) { + public String toJavaType(int sqlType) { // mappings from http://java.sun.com/j2se/1.3/docs/guide/jdbc/getstart/mapping.html if (sqlType == Types.INTEGER) { return "Integer"; @@ -347,10 +348,19 @@ public static String toJavaType(int sqlType) { } else { // TODO(aaron): Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB, BLOB, ARRAY, // STRUCT, REF, JAVA_OBJECT. 
+ // return database specific java data type return null; } } + /** + * Resolve a database-specific type to Hive data type + * @param sqlType sql type + * @return hive type + */ + public String toHiveType(int sqlType) { + return HiveTypes.toHiveType(sqlType); + } public void close() throws SQLException { } diff --git a/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java b/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java index 1e046d12..611e0b08 100644 --- a/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java +++ b/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java @@ -376,7 +376,7 @@ private void generateFields(Map columnTypes, String [] colNames for (String col : colNames) { int sqlType = columnTypes.get(col); - String javaType = SqlManager.toJavaType(sqlType); + String javaType = connManager.toJavaType(sqlType); if (null == javaType) { LOG.error("Cannot resolve SQL type " + sqlType); continue; @@ -406,7 +406,7 @@ private void generateDbRead(Map columnTypes, String [] colNames fieldNum++; int sqlType = columnTypes.get(col); - String javaType = SqlManager.toJavaType(sqlType); + String javaType = connManager.toJavaType(sqlType); if (null == javaType) { LOG.error("No Java type for SQL type " + sqlType); continue; @@ -443,7 +443,7 @@ private void generateDbWrite(Map columnTypes, String [] colName fieldNum++; int sqlType = columnTypes.get(col); - String javaType = SqlManager.toJavaType(sqlType); + String javaType = connManager.toJavaType(sqlType); if (null == javaType) { LOG.error("No Java type for SQL type " + sqlType); continue; @@ -476,7 +476,7 @@ private void generateHadoopRead(Map columnTypes, String [] colN for (String col : colNames) { int sqlType = columnTypes.get(col); - String javaType = SqlManager.toJavaType(sqlType); + String javaType = connManager.toJavaType(sqlType); if (null == javaType) { LOG.error("No Java type for SQL type " + sqlType); continue; @@ -532,7 +532,7 @@ private void generateToString(Map columnTypes, String [] colNam boolean 
first = true; for (String col : colNames) { int sqlType = columnTypes.get(col); - String javaType = SqlManager.toJavaType(sqlType); + String javaType = connManager.toJavaType(sqlType); if (null == javaType) { LOG.error("No Java type for SQL type " + sqlType); continue; @@ -597,7 +597,7 @@ private void parseNullVal(String colName, StringBuilder sb) { private void parseColumn(String colName, int colType, StringBuilder sb) { // assume that we have __it and __cur_str vars, based on __loadFromFields() code. sb.append(" __cur_str = __it.next();\n"); - String javaType = SqlManager.toJavaType(colType); + String javaType = connManager.toJavaType(colType); parseNullVal(colName, sb); if (javaType.equals("String")) { @@ -690,7 +690,7 @@ private void generateHadoopWrite(Map columnTypes, String [] col for (String col : colNames) { int sqlType = columnTypes.get(col); - String javaType = SqlManager.toJavaType(sqlType); + String javaType = connManager.toJavaType(sqlType); if (null == javaType) { LOG.error("No Java type for SQL type " + sqlType); continue; diff --git a/src/test/org/apache/hadoop/sqoop/TestConnFactory.java b/src/test/org/apache/hadoop/sqoop/TestConnFactory.java index 96c91e38..6af7ba34 100644 --- a/src/test/org/apache/hadoop/sqoop/TestConnFactory.java +++ b/src/test/org/apache/hadoop/sqoop/TestConnFactory.java @@ -113,6 +113,24 @@ public String getPrimaryKey(String tableName) { return null; } + /** + * Default implementation + * @param sqlType sql data type + * @return java data type + */ + public String toJavaType(int sqlType) { + return null; + } + + /** + * Default implementation + * @param sqlType sql data type + * @return hive data type + */ + public String toHiveType(int sqlType) { + return null; + } + public Map getColumnTypes(String tableName) { return null; } diff --git a/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java b/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java index be09e266..40771d92 100644 --- 
a/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java +++ b/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java @@ -27,7 +27,13 @@ import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; +import java.util.Date; +import java.util.Calendar; +import java.util.TimeZone; import java.util.ArrayList; +import java.text.ParseException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; import junit.framework.TestCase; @@ -101,14 +107,16 @@ public void setUp() { + "start_date DATE, " + "salary FLOAT, " + "dept VARCHAR2(32), " + + "timestamp_tz TIMESTAMP WITH TIME ZONE, " + + "timestamp_ltz TIMESTAMP WITH LOCAL TIME ZONE, " + "PRIMARY KEY (id))"); st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES(" - + "1,'Aaron',to_date('2009-05-14','yyyy-mm-dd'),1000000.00,'engineering')"); + + "1,'Aaron',to_date('2009-05-14','yyyy-mm-dd'),1000000.00,'engineering','29-DEC-09 12.00.00.000000000 PM','29-DEC-09 12.00.00.000000000 PM')"); st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES(" - + "2,'Bob',to_date('2009-04-20','yyyy-mm-dd'),400.00,'sales')"); + + "2,'Bob',to_date('2009-04-20','yyyy-mm-dd'),400.00,'sales','30-DEC-09 12.00.00.000000000 PM','30-DEC-09 12.00.00.000000000 PM')"); st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES(" - + "3,'Fred',to_date('2009-01-23','yyyy-mm-dd'),15.00,'marketing')"); + + "3,'Fred',to_date('2009-01-23','yyyy-mm-dd'),15.00,'marketing','31-DEC-09 12.00.00.000000000 PM','31-DEC-09 12.00.00.000000000 PM')"); connection.commit(); } catch (SQLException sqlE) { LOG.error("Encountered SQL Exception: " + sqlE); @@ -180,7 +188,7 @@ private void runOracleTest(String [] expectedResults) throws IOException { ioe.printStackTrace(); fail(ioe.toString()); } - + File f = new File(filePath.toString()); assertTrue("Could not find imported data file", f.exists()); BufferedReader r = null; @@ -188,7 +196,7 @@ private void runOracleTest(String [] expectedResults) throws IOException { // Read 
through the file and make sure it's all there. r = new BufferedReader(new InputStreamReader(new FileInputStream(f))); for (String expectedLine : expectedResults) { - assertEquals(expectedLine, r.readLine()); + compareRecords(expectedLine, r.readLine()); } } catch (IOException ioe) { LOG.error("Got IOException verifying results: " + ioe.toString()); @@ -208,11 +216,80 @@ public void testOracleImport() throws IOException { // a strict DATE type. Thus we include HH:MM:SS.mmmmm below. // See http://www.oracle.com/technology/tech/java/sqlj_jdbc/htdocs/jdbc_faq.html#08_01 String [] expectedResults = { - "1,Aaron,2009-05-14 00:00:00.0,1000000,engineering", - "2,Bob,2009-04-20 00:00:00.0,400,sales", - "3,Fred,2009-01-23 00:00:00.0,15,marketing" + "1,Aaron,2009-05-14 00:00:00.0,1000000,engineering,2009-12-29 12:00:00.0,2009-12-29 12:00:00.0", + "2,Bob,2009-04-20 00:00:00.0,400,sales,2009-12-30 12:00:00.0,2009-12-30 12:00:00.0", + "3,Fred,2009-01-23 00:00:00.0,15,marketing,2009-12-31 12:00:00.0,2009-12-31 12:00:00.0" }; runOracleTest(expectedResults); } + + /** + * Compare two lines + * @param expectedLine expected line + * @param receivedLine received line + * @throws IOException exception during lines comparison + */ + private void compareRecords(String expectedLine, String receivedLine) throws IOException { + // handle null case + if (expectedLine == null || receivedLine == null) { + return; + } + + // check if lines are equal + if (expectedLine.equals(receivedLine)) { + return; + } + + // check if size is the same + String [] expectedValues = expectedLine.split(","); + String [] receivedValues = receivedLine.split(","); + if (expectedValues.length != 7 || receivedValues.length != 7) { + LOG.error("Number of expected fields did not match number of received fields"); + throw new IOException("Number of expected fields did not match number of received fields"); + } + + // check first 5 values + boolean mismatch = false; + for (int i = 0; !mismatch && i < 5; i++) { + mismatch 
= !expectedValues[i].equals(receivedValues[i]); + } + if (mismatch) { + throw new IOException("Expected:<" + expectedLine + "> but was:<" + receivedLine + ">"); + } + + Date expectedDate = null; + Date receivedDate = null; + DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S"); + int offset = TimeZone.getDefault().getOffset(System.currentTimeMillis()) / 60000; + for (int i = 5; i < 7; i++) { + // parse expected timestamp + try { + expectedDate = df.parse(expectedValues[i]); + } catch (ParseException ex) { + LOG.error("Could not parse expected timestamp: " + expectedValues[i]); + throw new IOException("Could not parse expected timestamp: " + expectedValues[i]); + } + + // parse received timestamp + try { + receivedDate = df.parse(receivedValues[i]); + } catch (ParseException ex) { + LOG.error("Could not parse received timestamp: " + receivedValues[i]); + throw new IOException("Could not parse received timestamp: " + receivedValues[i]); + } + + // compare two timestamps considering timezone offset + Calendar expectedCal = Calendar.getInstance(); + expectedCal.setTime(expectedDate); + expectedCal.add(Calendar.MINUTE, offset); + + Calendar receivedCal = Calendar.getInstance(); + receivedCal.setTime(receivedDate); + + if (!expectedCal.equals(receivedCal)) { + throw new IOException("Expected:<" + expectedLine + "> but was:<" + receivedLine + ">, while timezone offset is: " + offset); + } + } + } }