From b71f55e4edfc4d3f07067957c3cdfdd3e426e312 Mon Sep 17 00:00:00 2001 From: Jarek Jarcec Cecho Date: Tue, 2 Feb 2016 07:54:33 -0800 Subject: [PATCH] SQOOP-2808: Sqoop2: Integration tests should test rows with null values (Abraham Fine via Jarek Jarcec Cecho) --- .../common/test/asserts/ProviderAsserts.java | 9 +- .../connector/jdbc/GenericJdbcExecutor.java | 59 +++-- .../sqoop/connector/hdfs/HdfsExtractor.java | 7 +- .../sqoop/connector/hdfs/HdfsLoader.java | 6 +- .../sqoop/connector/hdfs/TestExtractor.java | 8 +- .../sqoop/connector/hdfs/TestLoader.java | 2 +- .../sqoop/connector/common/SqoopIDFUtils.java | 18 +- .../idf/AVROIntermediateDataFormat.java | 6 +- .../idf/JSONIntermediateDataFormat.java | 6 +- .../idf/TestAVROIntermediateDataFormat.java | 10 +- .../idf/TestCSVIntermediateDataFormat.java | 4 +- .../idf/TestJSONIntermediateDataFormat.java | 10 +- .../connector/hdfs/NullValueTest.java | 240 ++++++++++++++++++ 13 files changed, 319 insertions(+), 66 deletions(-) create mode 100644 test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java b/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java index 4e1ef6a2..ae1b60dd 100644 --- a/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java +++ b/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java @@ -27,6 +27,7 @@ import java.sql.Statement; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; import static org.testng.Assert.fail; /** @@ -55,8 +56,12 @@ public static void assertRow(DatabaseProvider provider, TableName tableName, Ob int i = 1; for(Object expectedValue : values) { Object actualValue = rs.getObject(i); - assertEquals(expectedValue.toString(), actualValue.toString(), - "Columns do not match on position: " + i); + if (expectedValue == null) { + assertNull(actualValue); + } else { + assertEquals(expectedValue.toString(), actualValue.toString(), + "Columns do not match on position: " + i); + } i++; } diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java index 7c943c2b..134b826f 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java @@ -38,6 +38,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; +import java.sql.Types; import java.util.AbstractMap; import java.util.HashSet; import java.util.LinkedList; @@ -308,34 +309,38 @@ public void addBatch(Object[] array, Schema schema) { try { Column[] schemaColumns = schema.getColumnsArray(); for (int i = 0; i < array.length; i++) { - Column schemaColumn = schemaColumns[i]; - switch (schemaColumn.getType()) { - case DATE: - // convert the JODA date to sql date - LocalDate date = (LocalDate) array[i]; - java.sql.Date sqlDate = new java.sql.Date(date.toDateTimeAtCurrentTime().getMillis()); - preparedStatement.setObject(i + 1, sqlDate); - break; - case DATE_TIME: - // convert the JODA date time to sql date - DateTime dateTime = null; - if (array[i] instanceof org.joda.time.LocalDateTime) { - dateTime = ((org.joda.time.LocalDateTime) array[i]).toDateTime(); - } else { - 
dateTime = (DateTime) array[i]; + if (array[i] == null) { + preparedStatement.setObject(i + 1, null); + } else { + Column schemaColumn = schemaColumns[i]; + switch (schemaColumn.getType()) { + case DATE: + // convert the JODA date to sql date + LocalDate date = (LocalDate) array[i]; + java.sql.Date sqlDate = new java.sql.Date(date.toDateTimeAtCurrentTime().getMillis()); + preparedStatement.setObject(i + 1, sqlDate); + break; + case DATE_TIME: + // convert the JODA date time to sql timestamp + DateTime dateTime = null; + if (array[i] instanceof org.joda.time.LocalDateTime) { + dateTime = ((org.joda.time.LocalDateTime) array[i]).toDateTime(); + } else { + dateTime = (DateTime) array[i]; + } + Timestamp timestamp = new Timestamp(dateTime.getMillis()); + preparedStatement.setObject(i + 1, timestamp); + break; + case TIME: + LocalTime time = (LocalTime) array[i]; + // convert the JODA time to sql time + java.sql.Time sqlTime = new java.sql.Time(time.toDateTimeToday().getMillis()); + preparedStatement.setObject(i + 1, sqlTime); + break; + default: + // for anything else + preparedStatement.setObject(i + 1, array[i]); } - Timestamp timestamp = new Timestamp(dateTime.getMillis()); - preparedStatement.setObject(i + 1, timestamp); - break; - case TIME: - // convert the JODA time to sql date - LocalTime time = (LocalTime) array[i]; - java.sql.Time sqlTime = new java.sql.Time(time.toDateTimeToday().getMillis()); - preparedStatement.setObject(i + 1, sqlTime); - break; - default: - // for anything else - preparedStatement.setObject(i + 1, array[i]); } } preparedStatement.addBatch(); diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java index b430739a..9ef2a051 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java @@ -210,12 +210,11 @@ private boolean isSequenceFile(Path file) { private void extractRow(LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration, Text line) throws UnsupportedEncodingException { if (schema instanceof ByteArraySchema) { dataWriter.writeArrayRecord(new Object[] {line.toString().getBytes(SqoopIDFUtils.BYTE_FIELD_CHARSET)}); - } else if (!HdfsUtils.hasCustomFormat(linkConfiguration, - fromJobConfiguration)) { + } else if (!HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration)) { dataWriter.writeStringRecord(line.toString()); } else { - Object[] data = SqoopIDFUtils.fromCSV(line.toString(), schema); - dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, data)); + Object[] data = SqoopIDFUtils.fromCSV(line.toString(), schema, fromJobConfiguration.fromJobConfig.nullValue); + dataWriter.writeArrayRecord(data); } } diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java index 774221aa..5de20c62 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java @@ -103,12 +103,8 @@ public Void run() throws Exception { } } else { Object[] record; while ((record = reader.readArrayRecord()) != null) { - filewriter.write( - SqoopIDFUtils.toCSV( -
HdfsUtils.formatRecord(linkConfiguration, toJobConfig, record), - context.getSchema())); + filewriter.write(SqoopIDFUtils.toCSV(record, context.getSchema(), toJobConfig.toJobConfig.nullValue)); rowsWritten++; } } diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java index 7d2177fd..4e1d5c7d 100644 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java +++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java @@ -84,11 +84,11 @@ public void setUp() throws Exception { FileUtils.mkdirs(inputDirectory); switch (this.outputFileType) { case TEXT_FILE: - createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\\\N"); + createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\N"); break; case SEQUENCE_FILE: - createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\\\N"); + createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\N"); break; } } @@ -131,7 +131,7 @@ public void writeStringRecord(String text) { Assert.assertEquals(String.valueOf((double) index), components[1]); Assert.assertEquals("NULL", components[2]); Assert.assertEquals("'" + index + "'", components[3]); - Assert.assertEquals("\\\\N", components[4]); + Assert.assertEquals("\\N", components[4]); assertTestUser(TEST_USER); @@ -180,7 +180,7 @@ public void writeArrayRecord(Object[] array) { Assert.assertFalse(visited[index - 1]); Assert.assertEquals(String.valueOf((double) index), array[1].toString()); - Assert.assertEquals(null, array[2]); + Assert.assertEquals("NULL", array[2]); Assert.assertEquals(String.valueOf(index), array[3]); Assert.assertNull(array[4]); diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java index 11fcef2a..adede3a7 100644 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java +++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java @@ -203,7 +203,7 @@ public Object readContent() { Assert.assertEquals(1, fs.listStatus(outputPath).length); for (FileStatus status : fs.listStatus(outputPath)) { - verifyOutput(fs, status.getPath(), "%d,%f,'\\\\N',%s"); + verifyOutput(fs, status.getPath(), "%d,%f,\\N,%s"); } loader.load(context, linkConf, jobConf); diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java index 9b0885ad..fc251003 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java @@ -63,7 +63,7 @@ @edu.umd.cs.findbugs.annotations.SuppressWarnings("PZLA_PREFER_ZERO_LENGTH_ARRAYS") public class SqoopIDFUtils { - public static final String NULL_VALUE = "NULL"; + public static final String DEFAULT_NULL_VALUE = "NULL"; // ISO-8859-1 is an 8-bit codec that is supported in every java // implementation. 
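The SqoopIDFUtils hunks that follow rename the hard-coded NULL_VALUE constant to DEFAULT_NULL_VALUE and add toCSV/fromCSV overloads that take a caller-supplied null marker. A minimal sketch of the intended round-trip, assuming two nullable text columns and an illustrative \N marker (neither is taken from the patch):

import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Text;

public class NullMarkerRoundTripSketch {
  public static void main(String[] args) {
    // Illustrative schema: columns are nullable unless setNullable(false) is called.
    Schema schema = new Schema("demo").addColumn(new Text("country")).addColumn(new Text("city"));
    String marker = "\\N"; // caller-chosen null marker

    // A null cell is rendered as the marker on the way out...
    String csv = SqoopIDFUtils.toCSV(new Object[]{"USA", null}, schema, marker);
    System.out.println(csv); // 'USA',\N

    // ...and the marker is mapped back to null on the way in.
    Object[] row = SqoopIDFUtils.fromCSV(csv, schema, marker);
    System.out.println(row[1] == null); // true
  }
}

The single-argument-fewer signatures delegate to these overloads with DEFAULT_NULL_VALUE, so existing callers keep the old behavior.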
@@ -578,8 +578,12 @@ public static boolean isColumnStringType(Column stringType) { * * @param objectArray */ - @SuppressWarnings("unchecked") public static String toCSV(Object[] objectArray, Schema schema) { + return toCSV(objectArray, schema, DEFAULT_NULL_VALUE); + } + + @SuppressWarnings("unchecked") + public static String toCSV(Object[] objectArray, Schema schema, String nullValue) { Column[] columns = schema.getColumnsArray(); StringBuilder csvString = new StringBuilder(); @@ -589,7 +593,7 @@ public static String toCSV(Object[] objectArray, Schema schema) { columns[i].getName() + " does not support null values"); } if (objectArray[i] == null) { - csvString.append(NULL_VALUE); + csvString.append(nullValue); } else { switch (columns[i].getType()) { case ARRAY: @@ -756,6 +760,10 @@ private static Object toObject(String csvString, Column column) { * @return Object[] */ public static Object[] fromCSV(String csvText, Schema schema) { + return fromCSV(csvText, schema, DEFAULT_NULL_VALUE); + } + + public static Object[] fromCSV(String csvText, Schema schema, String nullValue){ String[] csvArray = parseCSVString(csvText); if (csvArray == null) { @@ -771,11 +779,11 @@ public static Object[] fromCSV(String csvText, Schema schema) { Object[] objectArray = new Object[csvArray.length]; for (int i = 0; i < csvArray.length; i++) { - if (csvArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) { + if (csvArray[i].equals(nullValue) && !columns[i].isNullable()) { throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005, columns[i].getName() + " does not support null values"); } - if (csvArray[i].equals(NULL_VALUE)) { + if (csvArray[i].equals(nullValue)) { objectArray[i] = null; continue; } diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java index d78fa8b7..ace1bdf5 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java @@ -162,11 +162,11 @@ private GenericRecord toAVRO(String csv) { } GenericRecord avroObject = new GenericData.Record(avroSchema); for (int i = 0; i < csvStringArray.length; i++) { - if (csvStringArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) { + if (csvStringArray[i].equals(DEFAULT_NULL_VALUE) && !columns[i].isNullable()) { throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005, columns[i].getName() + " does not support null values"); } - if (csvStringArray[i].equals(NULL_VALUE)) { + if (csvStringArray[i].equals(DEFAULT_NULL_VALUE)) { avroObject.put(columns[i].getName(), null); continue; } @@ -323,7 +323,7 @@ private String toCSV(GenericRecord record) { columns[i].getName() + " does not support null values"); } if (obj == null) { - csvString.append(NULL_VALUE); + csvString.append(DEFAULT_NULL_VALUE); } else { switch (columns[i].getType()) { diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java index 8db4d3da..078b598b 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java @@ 
-146,12 +146,12 @@ private JSONObject toJSON(String csv) { } JSONObject object = new JSONObject(); for (int i = 0; i < csvStringArray.length; i++) { - if (csvStringArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) { + if (csvStringArray[i].equals(DEFAULT_NULL_VALUE) && !columns[i].isNullable()) { throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005, columns[i].getName() + " does not support null values"); } // check for NULL field and bail out immediately - if (csvStringArray[i].equals(NULL_VALUE)) { + if (csvStringArray[i].equals(DEFAULT_NULL_VALUE)) { object.put(columns[i].getName(), null); continue; } @@ -299,7 +299,7 @@ private String toCSV(JSONObject json) { columns[i].getName() + " does not support null values"); } if (obj == null) { - csvString.append(NULL_VALUE); + csvString.append(DEFAULT_NULL_VALUE); } else { switch (columns[i].getType()) { case ARRAY: diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java index 703ed0a0..84757207 100644 --- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java @@ -19,7 +19,7 @@ package org.apache.sqoop.connector.idf; import static org.apache.sqoop.connector.common.SqoopAvroUtils.createEnumSchema; -import static org.apache.sqoop.connector.common.SqoopIDFUtils.NULL_VALUE; +import static org.apache.sqoop.connector.common.SqoopIDFUtils.DEFAULT_NULL_VALUE; import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertEquals; @@ -419,7 +419,7 @@ public void testNullValueAsObjectArrayInAndCSVTextOut() { String[] textValues = csvText.split(","); assertEquals(15, textValues.length); for (String text : textValues) { - assertEquals(text, NULL_VALUE); + assertEquals(text, DEFAULT_NULL_VALUE); } } @@ -459,7 +459,7 @@ public void testNullValueAsCSVTextInAndCSVTextOut() { String[] textValues = csvText.split(","); assertEquals(15, textValues.length); for (String text : textValues) { - assertEquals(text, NULL_VALUE); + assertEquals(text, DEFAULT_NULL_VALUE); } } @@ -474,7 +474,7 @@ public void testNullValueAsDataInAndCSVTextOut() { String[] textValues = csvText.split(","); assertEquals(15, textValues.length); for (String text : textValues) { - assertEquals(text, NULL_VALUE); + assertEquals(text, DEFAULT_NULL_VALUE); } } @@ -528,7 +528,7 @@ public void testSchemaNotNullableWithObjectArray() { public void testSchemaNotNullableWithCSV() { Schema overrideSchema = new Schema("Test").addColumn(new Text("one").setNullable(false)); AVROIntermediateDataFormat dataFormat = new AVROIntermediateDataFormat(overrideSchema); - dataFormat.setCSVTextData(NULL_VALUE); + dataFormat.setCSVTextData(DEFAULT_NULL_VALUE); } // no validation happens when the setAvro and getAvro is used diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java index 040dbfcc..e82d8172 100644 --- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java +++ 
b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java @@ -106,7 +106,7 @@ public void testNullValueAsObjectArrayInAndCSVTextOut() { String[] textValues = csvText.split(","); assertEquals(14, textValues.length); for (String text : textValues) { - assertEquals(text, NULL_VALUE); + assertEquals(text, DEFAULT_NULL_VALUE); } } @@ -176,7 +176,7 @@ public void testNullValueAsCSVTextInAndCSVTextOut() { String csvText = dataFormat.getCSVTextData(); String[] textValues = csvText.split(","); for (String text : textValues) { - assertEquals(text, NULL_VALUE); + assertEquals(text, DEFAULT_NULL_VALUE); } } diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java index bcc1f959..09c4a117 100644 --- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java @@ -18,7 +18,7 @@ */ package org.apache.sqoop.connector.idf; -import static org.apache.sqoop.connector.common.SqoopIDFUtils.NULL_VALUE; +import static org.apache.sqoop.connector.common.SqoopIDFUtils.DEFAULT_NULL_VALUE; import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertEquals; @@ -414,7 +414,7 @@ public void testNullValueAsObjectArrayInAndCSVTextOut() { String[] textValues = csvText.split(","); assertEquals(15, textValues.length); for (String text : textValues) { - assertEquals(text, NULL_VALUE); + assertEquals(text, DEFAULT_NULL_VALUE); } } @@ -454,7 +454,7 @@ public void testNullValueAsCSVTextInAndCSVTextOut() { String[] textValues = csvText.split(","); assertEquals(15, textValues.length); for (String text : textValues) { - assertEquals(text, NULL_VALUE); + assertEquals(text, DEFAULT_NULL_VALUE); } } @@ -472,7 +472,7 @@ public void testNullValueAsDataInAndCSVTextOut() { String[] textValues = csvText.split(","); assertEquals(15, textValues.length); for (String text : textValues) { - assertEquals(text, NULL_VALUE); + assertEquals(text, DEFAULT_NULL_VALUE); } } @@ -507,7 +507,7 @@ public void testSchemaNotNullableWithObjectArray() { public void testSchemaNotNullableWithCSV() { dataFormat = new JSONIntermediateDataFormat(); dataFormat.setSchema(new Schema("Test").addColumn(new Text("t").setNullable(false))); - dataFormat.setCSVTextData(NULL_VALUE); + dataFormat.setCSVTextData(DEFAULT_NULL_VALUE); } @SuppressWarnings("unchecked") diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java new file mode 100644 index 00000000..43638143 --- /dev/null +++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.integration.connector.hdfs; + +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multiset; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.apache.sqoop.connector.common.SqoopIDFUtils; +import org.apache.sqoop.connector.hdfs.configuration.ToFormat; +import org.apache.sqoop.model.MDriverConfig; +import org.apache.sqoop.model.MJob; +import org.apache.sqoop.model.MLink; +import org.apache.sqoop.test.asserts.HdfsAsserts; +import org.apache.sqoop.test.infrastructure.Infrastructure; +import org.apache.sqoop.test.infrastructure.SqoopTestCase; +import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider; +import org.apache.sqoop.test.utils.HdfsUtils; +import org.apache.sqoop.test.utils.ParametrizedUtils; +import org.testng.Assert; +import org.testng.ITestContext; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class}) +public class NullValueTest extends SqoopTestCase { + + private static final Logger LOG = Logger.getLogger(NullValueTest.class); + + + private ToFormat format; + + // The null marker used for this run (DEFAULT_NULL_VALUE unless overridden) + private String nullValue; + + @DataProvider(name="null-value-test") + public static Object[][] data(ITestContext context) { + String customNullValue = "^&*custom!@"; + + return Iterables.toArray( + ParametrizedUtils.crossProduct(ToFormat.values(), new String[]{SqoopIDFUtils.DEFAULT_NULL_VALUE, customNullValue}), + Object[].class); + } + + @Factory(dataProvider="null-value-test") + public NullValueTest(ToFormat format, String nullValue) { + this.format = format; + this.nullValue = nullValue; + } + + @Override + public String getTestName() { + return methodName + "[" + format.name() + ", " + nullValue + "]"; + } + + @BeforeMethod + public void setup() throws Exception { + createTableCities(); + } + + @AfterMethod + public void dropTable() { + super.dropTable(); + } + + private boolean usingCustomNullValue() { + return !SqoopIDFUtils.DEFAULT_NULL_VALUE.equals(nullValue); + } + + private String[] getCsv() { + return new String[] { + "1,'USA'," + nullValue + ",'San Francisco'", + "2,'USA','2004-10-24 00:00:00.000'," + nullValue, + "3," + nullValue +
",'2004-10-25 00:00:00.000','Brno'", + "4,'USA','2004-10-26 00:00:00.000','Palo Alto'" + }; + } + + @Test + public void testFromHdfs() throws Exception { + switch (format) { + case TEXT_FILE: + createFromFile("input-0001", getCsv()); + + break; + case SEQUENCE_FILE: + SequenceFile.Writer.Option optPath = + SequenceFile.Writer.file(new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0001"))); + SequenceFile.Writer.Option optKey = SequenceFile.Writer.keyClass(Text.class); + SequenceFile.Writer.Option optVal = SequenceFile.Writer.valueClass(NullWritable.class); + + + SequenceFile.Writer sequenceFileWriter = + SequenceFile.createWriter(getHadoopConf(), optPath, optKey, optVal); + for (String csv : getCsv()) { + sequenceFileWriter.append(new Text(csv), NullWritable.get()); + } + sequenceFileWriter.close(); + break; + default: + Assert.fail(); + } + + MLink hdfsLinkFrom = getClient().createLink("hdfs-connector"); + fillHdfsLink(hdfsLinkFrom); + saveLink(hdfsLinkFrom); + + MLink rdbmsLinkTo = getClient().createLink("generic-jdbc-connector"); + fillRdbmsLinkConfig(rdbmsLinkTo); + saveLink(rdbmsLinkTo); + + MJob job = getClient().createJob(hdfsLinkFrom.getName(), rdbmsLinkTo.getName()); + + fillHdfsFromConfig(job); + fillRdbmsToConfig(job); + + if (usingCustomNullValue()) { + job.getFromJobConfig().getBooleanInput("fromJobConfig.overrideNullValue").setValue(true); + job.getFromJobConfig().getStringInput("fromJobConfig.nullValue").setValue(nullValue); + } + + + MDriverConfig driverConfig = job.getDriverConfig(); + driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(3); + saveJob(job); + + executeJob(job); + + Assert.assertEquals(4L, provider.rowCount(getTableName())); + assertRowInCities(1, "USA", null, "San Francisco"); + assertRowInCities(2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), (String) null); + assertRowInCities(3, (String) null, Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno"); + assertRowInCities(4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto"); + } + + @Test + public void testToHdfs() throws Exception { + provider.insertRow(getTableName(), 1, "USA", (java.sql.Timestamp) null, "San Francisco"); + provider.insertRow(getTableName(), 2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), (String) null); + provider.insertRow(getTableName(), 3, (String) null, Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno"); + provider.insertRow(getTableName(), 4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto"); + + MLink rdbmsLinkFrom = getClient().createLink("generic-jdbc-connector"); + fillRdbmsLinkConfig(rdbmsLinkFrom); + saveLink(rdbmsLinkFrom); + + + MLink hdfsLinkTo = getClient().createLink("hdfs-connector"); + fillHdfsLink(hdfsLinkTo); + saveLink(hdfsLinkTo); + + MJob job = getClient().createJob(rdbmsLinkFrom.getName(), hdfsLinkTo.getName()); + + fillRdbmsFromConfig(job, "id"); + fillHdfsToConfig(job, format); + + if (usingCustomNullValue()) { + job.getToJobConfig().getBooleanInput("toJobConfig.overrideNullValue").setValue(true); + job.getToJobConfig().getStringInput("toJobConfig.nullValue").setValue(nullValue); + } + + hdfsClient.mkdirs(new Path(HdfsUtils.joinPathFragments + (getMapreduceDirectory(), "TO"))); + + job.getToJobConfig().getStringInput("toJobConfig.outputDirectory") + .setValue(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO")); + + + MDriverConfig driverConfig = job.getDriverConfig(); + driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(3); + saveJob(job); + + 
executeJob(job); + + switch (format) { + case TEXT_FILE: + HdfsAsserts.assertMapreduceOutput(hdfsClient, + HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"), getCsv()); + break; + case SEQUENCE_FILE: + Multiset<String> setLines = HashMultiset.create(Arrays.asList(getCsv())); + List<String> notFound = new ArrayList<>(); + Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO")); + + for (Path file : files) { + SequenceFile.Reader.Option optPath = SequenceFile.Reader.file(file); + SequenceFile.Reader sequenceFileReader = new SequenceFile.Reader(getHadoopConf(), optPath); + + Text text = new Text(); + while (sequenceFileReader.next(text)) { + if (!setLines.remove(text.toString())) { + notFound.add(text.toString()); + } + } + } + if (!setLines.isEmpty() || !notFound.isEmpty()) { + LOG.error("Output does not match expectations."); + LOG.error("Expected lines that weren't present in the files:"); + LOG.error("\t'" + StringUtils.join(setLines, "'\n\t'") + "'"); + LOG.error("Extra lines in files that weren't expected:"); + LOG.error("\t'" + StringUtils.join(notFound, "'\n\t'") + "'"); + Assert.fail("Output does not match expectations."); + } + break; + default: + Assert.fail(); + } + } +}
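The sequence-file branch above compares output as a multiset rather than an ordered list, because record order across mapper output files is not deterministic: each line read from the files removes one expected occurrence, leftovers in the multiset are missing lines, and failed removals are extras. A self-contained sketch of that bookkeeping, with illustrative sample lines:

import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MultisetDiffSketch {
  public static void main(String[] args) {
    // Expected lines, as a multiset so duplicate rows are counted correctly.
    Multiset<String> expected = HashMultiset.create(Arrays.asList(
        "1,'USA',\\N,'San Francisco'",
        "2,'USA','2004-10-24 00:00:00.000',\\N"));
    List<String> extras = new ArrayList<>();

    for (String line : Arrays.asList("1,'USA',\\N,'San Francisco'", "5,'???'")) {
      // remove() returns false when the line was not expected (or already matched).
      if (!expected.remove(line)) {
        extras.add(line);
      }
    }

    System.out.println("missing: " + expected); // [2,'USA','2004-10-24 00:00:00.000',\N]
    System.out.println("extras:  " + extras);   // [5,'???']
  }
}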