5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-06 21:23:03 +08:00

SQOOP-1862: Sqoop2: JDBC Connector To side needs to handle converting JODA objects to sql date

(Veena Basavaraj via Abraham Elmahrek)
This commit is contained in:
Abraham Elmahrek 2015-02-03 13:58:36 -08:00
parent fc32358abc
commit 27d87b4f2f
10 changed files with 102 additions and 44 deletions

View File

@ -53,7 +53,7 @@ public static void assertRow(DatabaseProvider provider, String tableName, Objec
int i = 1; int i = 1;
for(Object expectedValue : values) { for(Object expectedValue : values) {
Object actualValue = rs.getObject(i); Object actualValue = rs.getObject(i);
assertEquals("Columns do not match on position: " + i, expectedValue, actualValue); assertEquals("Columns do not match on position: " + i, expectedValue.toString(), actualValue.toString());
i++; i++;
} }

View File

@ -17,6 +17,15 @@
*/ */
package org.apache.sqoop.connector.jdbc; package org.apache.sqoop.connector.jdbc;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.error.code.GenericJdbcConnectorError;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Column;
import org.joda.time.DateTime;
import org.joda.time.LocalDate;
import org.joda.time.LocalTime;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DatabaseMetaData; import java.sql.DatabaseMetaData;
import java.sql.DriverManager; import java.sql.DriverManager;
@ -25,10 +34,7 @@
import java.sql.ResultSetMetaData; import java.sql.ResultSetMetaData;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.sql.Timestamp;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.error.code.GenericJdbcConnectorError;
public class GenericJdbcExecutor { public class GenericJdbcExecutor {
@ -167,11 +173,35 @@ public void beginBatch(String sql) {
} }
} }
public void addBatch(Object[] array) { public void addBatch(Object[] array, Schema schema) {
try { try {
Column[] schemaColumns = schema.getColumnsArray();
for (int i = 0; i < array.length; i++) { for (int i = 0; i < array.length; i++) {
Column schemaColumn = schemaColumns[i];
switch (schemaColumn.getType()) {
case DATE:
// convert the JODA date to sql date
LocalDate date = (LocalDate) array[i];
java.sql.Date sqlDate = new java.sql.Date(date.toDateTimeAtCurrentTime().getMillis());
preparedStatement.setObject(i + 1, sqlDate);
break;
case DATE_TIME:
// convert the JODA date time to sql date
DateTime dateTime = (DateTime) array[i];
Timestamp timestamp = new Timestamp(dateTime.getMillis());
preparedStatement.setObject(i + 1, timestamp);
break;
case TIME:
// convert the JODA time to sql date
LocalTime time = (LocalTime) array[i];
java.sql.Time sqlTime = new java.sql.Time(time.toDateTimeToday().getMillis());
preparedStatement.setObject(i + 1, sqlTime);
break;
default:
// for anything else
preparedStatement.setObject(i + 1, array[i]); preparedStatement.setObject(i + 1, array[i]);
} }
}
preparedStatement.addBatch(); preparedStatement.addBatch();
} catch (SQLException e) { } catch (SQLException e) {
logSQLException(e); logSQLException(e);

View File

@ -38,7 +38,6 @@ public void load(LoaderContext context, LinkConfiguration linkConfig, ToJobConfi
String password = linkConfig.linkConfig.password; String password = linkConfig.linkConfig.password;
GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password); GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
executor.setAutoCommit(false); executor.setAutoCommit(false);
String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL); String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL);
executor.beginBatch(sql); executor.beginBatch(sql);
try { try {
@ -48,7 +47,7 @@ public void load(LoaderContext context, LinkConfiguration linkConfig, ToJobConfi
while ((array = context.getDataReader().readArrayRecord()) != null) { while ((array = context.getDataReader().readArrayRecord()) != null) {
numberOfRowsPerBatch++; numberOfRowsPerBatch++;
executor.addBatch(array); executor.addBatch(array, context.getSchema());
if (numberOfRowsPerBatch == rowsPerBatch) { if (numberOfRowsPerBatch == rowsPerBatch) {
numberOfBatchesPerTransaction++; numberOfBatchesPerTransaction++;

View File

@ -26,6 +26,17 @@
import org.apache.sqoop.etl.io.DataReader; import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext; import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Date;
import org.apache.sqoop.schema.type.DateTime;
import org.apache.sqoop.schema.type.Time;
import org.apache.sqoop.schema.type.Decimal;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;
import org.joda.time.LocalDate;
import org.joda.time.LocalTime;
import org.testng.annotations.AfterMethod; import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider; import org.testng.annotations.DataProvider;
@ -64,7 +75,7 @@ public void setUp() {
if (!executor.existTable(tableName)) { if (!executor.existTable(tableName)) {
executor.executeUpdate("CREATE TABLE " executor.executeUpdate("CREATE TABLE "
+ executor.delimitIdentifier(tableName) + executor.delimitIdentifier(tableName)
+ "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE, DATETIMECOL TIMESTAMP, TIMECOL TIME)");
} else { } else {
executor.deleteTableData(tableName); executor.deleteTableData(tableName);
} }
@ -75,6 +86,7 @@ public void tearDown() {
executor.close(); executor.close();
} }
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test @Test
public void testInsert() throws Exception { public void testInsert() throws Exception {
MutableContext context = new MutableMapContext(); MutableContext context = new MutableMapContext();
@ -87,11 +99,16 @@ public void testInsert() throws Exception {
ToJobConfiguration jobConfig = new ToJobConfiguration(); ToJobConfiguration jobConfig = new ToJobConfiguration();
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL, context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL,
"INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?)"); "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?,?,?,?)");
Loader loader = new GenericJdbcLoader(); Loader loader = new GenericJdbcLoader();
DummyReader reader = new DummyReader(); DummyReader reader = new DummyReader();
LoaderContext loaderContext = new LoaderContext(context, reader, null); Schema schema = new Schema("TestLoader");
schema.addColumn(new FixedPoint("c1", 2L, true)).addColumn(new Decimal("c2", 5, 2))
.addColumn(new Text("c3")).addColumn(new Date("c4"))
.addColumn(new DateTime("c5", false, false)).addColumn(new Time("c6", false));
LoaderContext loaderContext = new LoaderContext(context, reader, schema);
loader.load(loaderContext, linkConfig, jobConfig); loader.load(loaderContext, linkConfig, jobConfig);
int index = START; int index = START;
@ -101,6 +118,10 @@ public void testInsert() throws Exception {
assertEquals(index, rs.getObject(1)); assertEquals(index, rs.getObject(1));
assertEquals((double) index, rs.getObject(2)); assertEquals((double) index, rs.getObject(2));
assertEquals(String.valueOf(index), rs.getObject(3)); assertEquals(String.valueOf(index), rs.getObject(3));
assertEquals("2004-10-19", rs.getObject(4).toString());
assertEquals("2004-10-19 10:23:34.0", rs.getObject(5).toString());
assertEquals("11:33:59", rs.getObject(6).toString());
index++; index++;
} }
assertEquals(numberOfRows, index-START); assertEquals(numberOfRows, index-START);
@ -111,11 +132,18 @@ public class DummyReader extends DataReader {
@Override @Override
public Object[] readArrayRecord() { public Object[] readArrayRecord() {
LocalDate jodaDate= new LocalDate(2004, 10, 19);
org.joda.time.DateTime jodaDateTime= new org.joda.time.DateTime(2004, 10, 19, 10, 23, 34);
LocalTime time= new LocalTime(11, 33, 59);
if (index < numberOfRows) { if (index < numberOfRows) {
Object[] array = new Object[] { Object[] array = new Object[] {
START + index, START + index,
(double) (START + index), (double) (START + index),
String.valueOf(START+index) }; String.valueOf(START+index),
jodaDate,
jodaDateTime,
time};
index++; index++;
return array; return array;
} else { } else {

View File

@ -458,7 +458,7 @@ public void testDateWithCSVTextInObjectArrayOut() {
dataFormat = new CSVIntermediateDataFormat(schema); dataFormat = new CSVIntermediateDataFormat(schema);
dataFormat.setCSVTextData("'2014-10-01'"); dataFormat.setCSVTextData("'2014-10-01'");
org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01); org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01);
assertEquals(date.toString(), dataFormat.getObjectData()[0].toString()); assertEquals(date, dataFormat.getObjectData()[0]);
} }
@Test @Test

View File

@ -35,6 +35,7 @@ public DataSet createTables() {
"id", "id",
"id", "int", "id", "int",
"country", "varchar(50)", "country", "varchar(50)",
"some_date", "date",
"city", "varchar(50)" "city", "varchar(50)"
); );
@ -43,10 +44,10 @@ public DataSet createTables() {
@Override @Override
public DataSet loadBasicData() { public DataSet loadBasicData() {
provider.insertRow(tableBaseName, 1, "USA", "San Francisco"); provider.insertRow(tableBaseName, 1, "USA", "2004-10-23","San Francisco");
provider.insertRow(tableBaseName, 2, "USA", "Sunnyvale"); provider.insertRow(tableBaseName, 2, "USA", "2004-10-24", "Sunnyvale");
provider.insertRow(tableBaseName, 3, "Czech Republic", "Brno"); provider.insertRow(tableBaseName, 3, "Czech Republic", "2004-10-25", "Brno");
provider.insertRow(tableBaseName, 4, "USA", "Palo Alto"); provider.insertRow(tableBaseName, 4, "USA", "2004-10-26", "Palo Alto");
return this; return this;
} }

View File

@ -36,10 +36,10 @@ public class FromHDFSToRDBMSTest extends ConnectorTestCase {
public void testBasic() throws Exception { public void testBasic() throws Exception {
createTableCities(); createTableCities();
createFromFile("input-0001", createFromFile("input-0001",
"1,'USA','San Francisco'", "1,'USA','2004-10-23','San Francisco'",
"2,'USA','Sunnyvale'", "2,'USA','2004-10-24','Sunnyvale'",
"3,'Czech Republic','Brno'", "3,'Czech Republic','2004-10-25','Brno'",
"4,'USA','Palo Alto'" "4,'USA','2004-10-26','Palo Alto'"
); );
// RDBMS link // RDBMS link
@ -69,10 +69,10 @@ public void testBasic() throws Exception {
executeJob(job); executeJob(job);
assertEquals(4L, rowCount()); assertEquals(4L, rowCount());
assertRowInCities(1, "USA", "San Francisco"); assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
assertRowInCities(2, "USA", "Sunnyvale"); assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale");
assertRowInCities(3, "Czech Republic", "Brno"); assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno");
assertRowInCities(4, "USA", "Palo Alto"); assertRowInCities(4, "USA", "2004-10-26", "Palo Alto");
// Clean up testing table // Clean up testing table
dropTable(); dropTable();

View File

@ -67,10 +67,10 @@ public void testBasic() throws Exception {
// Assert correct output // Assert correct output
assertTo( assertTo(
"1,'USA','San Francisco'", "1,'USA','2004-10-23','San Francisco'",
"2,'USA','Sunnyvale'", "2,'USA','2004-10-24','Sunnyvale'",
"3,'Czech Republic','Brno'", "3,'Czech Republic','2004-10-25','Brno'",
"4,'USA','Palo Alto'" "4,'USA','2004-10-26','Palo Alto'"
); );
// Clean up testing table // Clean up testing table

View File

@ -37,10 +37,10 @@ public void testStagedTransfer() throws Exception {
final String stageTableName = "STAGE_" + getTableName(); final String stageTableName = "STAGE_" + getTableName();
createTableCities(); createTableCities();
createFromFile("input-0001", createFromFile("input-0001",
"1,'USA','San Francisco'", "1,'USA','2004-10-23','San Francisco'",
"2,'USA','Sunnyvale'", "2,'USA','2004-10-24','Sunnyvale'",
"3,'Czech Republic','Brno'", "3,'Czech Republic','2004-10-25','Brno'",
"4,'USA','Palo Alto'" "4,'USA','2004-10-26','Palo Alto'"
); );
new Cities(provider, stageTableName).createTables(); new Cities(provider, stageTableName).createTables();
@ -76,10 +76,10 @@ public void testStagedTransfer() throws Exception {
assertEquals(0L, provider.rowCount(stageTableName)); assertEquals(0L, provider.rowCount(stageTableName));
assertEquals(4L, rowCount()); assertEquals(4L, rowCount());
assertRowInCities(1, "USA", "San Francisco"); assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
assertRowInCities(2, "USA", "Sunnyvale"); assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale");
assertRowInCities(3, "Czech Republic", "Brno"); assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno");
assertRowInCities(4, "USA", "Palo Alto"); assertRowInCities(4, "USA", "2004-10-26", "Palo Alto");
// Clean up testing table // Clean up testing table
provider.dropTable(stageTableName); provider.dropTable(stageTableName);

View File

@ -29,10 +29,10 @@
public class FromRDBMSToKafkaTest extends KafkaConnectorTestCase { public class FromRDBMSToKafkaTest extends KafkaConnectorTestCase {
private static final String[] input = { private static final String[] input = {
"1,'USA','San Francisco'", "1,'USA','2004-10-23','San Francisco'",
"2,'USA','Sunnyvale'", "2,'USA','2004-10-24','Sunnyvale'",
"3,'Czech Republic','Brno'", "3,'Czech Republic','2004-10-25','Brno'",
"4,'USA','Palo Alto'" "4,'USA','2004-10-26','Palo Alto'"
}; };
@Test @Test