mirror of https://github.com/apache/sqoop.git

SQOOP-2808: Sqoop2: Integration tests should test rows with null values

(Abraham Fine via Jarek Jarcec Cecho)
Jarek Jarcec Cecho committed on 2016-02-02 07:54:33 -08:00
parent 118aa7c4f9
commit b71f55e4ed
13 changed files with 319 additions and 66 deletions

View File

@ -27,6 +27,7 @@
import java.sql.Statement;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.fail;
/**
@ -55,8 +56,12 @@ public static void assertRow(DatabaseProvider provider, TableName tableName, Ob
int i = 1;
for(Object expectedValue : values) {
Object actualValue = rs.getObject(i);
assertEquals(expectedValue.toString(), actualValue.toString(),
"Columns do not match on position: " + i);
if (expectedValue == null) {
assertNull(actualValue);
} else {
assertEquals(expectedValue.toString(), actualValue.toString(),
"Columns do not match on position: " + i);
}
i++;
}
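
The patch above changes the column comparison so that a null expected value asserts SQL NULL instead of throwing a NullPointerException from expectedValue.toString(). A minimal standalone sketch of that comparison (the helper name and message text are illustrative, not part of the patch):

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;

// Null-aware column comparison mirroring the hunk above.
static void assertColumnMatches(Object expectedValue, Object actualValue, int position) {
  if (expectedValue == null) {
    // Expecting SQL NULL: assert it directly rather than calling toString() on null.
    assertNull(actualValue, "Expected NULL on position: " + position);
  } else {
    assertEquals(expectedValue.toString(), actualValue.toString(),
        "Columns do not match on position: " + position);
  }
}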

View File

@ -38,6 +38,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.AbstractMap;
import java.util.HashSet;
import java.util.LinkedList;
@ -308,34 +309,38 @@ public void addBatch(Object[] array, Schema schema) {
try {
Column[] schemaColumns = schema.getColumnsArray();
for (int i = 0; i < array.length; i++) {
Column schemaColumn = schemaColumns[i];
switch (schemaColumn.getType()) {
case DATE:
// convert the JODA date to sql date
LocalDate date = (LocalDate) array[i];
java.sql.Date sqlDate = new java.sql.Date(date.toDateTimeAtCurrentTime().getMillis());
preparedStatement.setObject(i + 1, sqlDate);
break;
case DATE_TIME:
// convert the JODA date time to sql date
DateTime dateTime = null;
if (array[i] instanceof org.joda.time.LocalDateTime) {
dateTime = ((org.joda.time.LocalDateTime) array[i]).toDateTime();
} else {
dateTime = (DateTime) array[i];
if (array[i] == null) {
preparedStatement.setObject(i + 1, null);
} else {
Column schemaColumn = schemaColumns[i];
switch (schemaColumn.getType()) {
case DATE:
// convert the JODA date to sql date
LocalDate date = (LocalDate) array[i];
java.sql.Date sqlDate = new java.sql.Date(date.toDateTimeAtCurrentTime().getMillis());
preparedStatement.setObject(i + 1, sqlDate);
break;
case DATE_TIME:
// convert the JODA date time to sql date
DateTime dateTime = null;
if (array[i] instanceof org.joda.time.LocalDateTime) {
dateTime = ((org.joda.time.LocalDateTime) array[i]).toDateTime();
} else {
dateTime = (DateTime) array[i];
}
Timestamp timestamp = new Timestamp(dateTime.getMillis());
preparedStatement.setObject(i + 1, timestamp);
break;
case TIME:
LocalTime time = (LocalTime) array[i];
// convert the JODA time to sql date
java.sql.Time sqlTime = new java.sql.Time(time.toDateTimeToday().getMillis());
preparedStatement.setObject(i + 1, sqlTime);
break;
default:
// for anything else
preparedStatement.setObject(i + 1, array[i]);
}
Timestamp timestamp = new Timestamp(dateTime.getMillis());
preparedStatement.setObject(i + 1, timestamp);
break;
case TIME:
// convert the JODA time to sql date
LocalTime time = (LocalTime) array[i];
java.sql.Time sqlTime = new java.sql.Time(time.toDateTimeToday().getMillis());
preparedStatement.setObject(i + 1, sqlTime);
break;
default:
// for anything else
preparedStatement.setObject(i + 1, array[i]);
}
}
preparedStatement.addBatch();
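
The behavioural core of this hunk is that a null array element is bound to the statement as-is, before the type-specific conversion switch runs. A self-contained JDBC sketch of the same idea; the table, column names and in-memory H2 URL are hypothetical, and Sqoop's real code additionally converts Joda dates, times and date-times as shown above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;

public class NullBindSketch {
  public static void main(String[] args) throws SQLException {
    try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:demo")) {
      conn.createStatement().executeUpdate(
          "CREATE TABLE cities(id INT, country VARCHAR(32), some_date TIMESTAMP, city VARCHAR(64))");
      try (PreparedStatement ps = conn.prepareStatement("INSERT INTO cities VALUES (?, ?, ?, ?)")) {
        Object[] row = {3, null, Timestamp.valueOf("2004-10-25 00:00:00.0"), "Brno"};
        for (int i = 0; i < row.length; i++) {
          if (row[i] == null) {
            // Bind SQL NULL, as the patch does; stricter drivers may require
            // setNull(i + 1, java.sql.Types.VARCHAR) with an explicit type code.
            ps.setObject(i + 1, null);
          } else {
            ps.setObject(i + 1, row[i]);
          }
        }
        ps.addBatch();
        ps.executeBatch();
      }
    }
  }
}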

View File

@ -210,12 +210,11 @@ private boolean isSequenceFile(Path file) {
private void extractRow(LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration, Text line) throws UnsupportedEncodingException {
if (schema instanceof ByteArraySchema) {
dataWriter.writeArrayRecord(new Object[] {line.toString().getBytes(SqoopIDFUtils.BYTE_FIELD_CHARSET)});
} else if (!HdfsUtils.hasCustomFormat(linkConfiguration,
fromJobConfiguration)) {
} else if (!HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration)) {
dataWriter.writeStringRecord(line.toString());
} else {
Object[] data = SqoopIDFUtils.fromCSV(line.toString(), schema);
dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, data));
Object[] data = SqoopIDFUtils.fromCSV(line.toString(), schema, fromJobConfiguration.fromJobConfig.nullValue);
dataWriter.writeArrayRecord(data);
}
}

View File

@ -103,12 +103,8 @@ public Void run() throws Exception {
}
} else {
Object[] record;
while ((record = reader.readArrayRecord()) != null) {
filewriter.write(
SqoopIDFUtils.toCSV(
HdfsUtils.formatRecord(linkConfiguration, toJobConfig, record),
context.getSchema()));
filewriter.write(SqoopIDFUtils.toCSV(record, context.getSchema(), toJobConfig.toJobConfig.nullValue));
rowsWritten++;
}
}

View File

@ -84,11 +84,11 @@ public void setUp() throws Exception {
FileUtils.mkdirs(inputDirectory);
switch (this.outputFileType) {
case TEXT_FILE:
createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\\\N");
createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\N");
break;
case SEQUENCE_FILE:
createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\\\N");
createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\N");
break;
}
}
@ -131,7 +131,7 @@ public void writeStringRecord(String text) {
Assert.assertEquals(String.valueOf((double) index), components[1]);
Assert.assertEquals("NULL", components[2]);
Assert.assertEquals("'" + index + "'", components[3]);
Assert.assertEquals("\\\\N", components[4]);
Assert.assertEquals("\\N", components[4]);
assertTestUser(TEST_USER);
@ -180,7 +180,7 @@ public void writeArrayRecord(Object[] array) {
Assert.assertFalse(visited[index - 1]);
Assert.assertEquals(String.valueOf((double) index), array[1].toString());
Assert.assertEquals(null, array[2]);
Assert.assertEquals("NULL", array[2]);
Assert.assertEquals(String.valueOf(index), array[3]);
Assert.assertNull(array[4]);

View File

@ -203,7 +203,7 @@ public Object readContent() {
Assert.assertEquals(1, fs.listStatus(outputPath).length);
for (FileStatus status : fs.listStatus(outputPath)) {
verifyOutput(fs, status.getPath(), "%d,%f,'\\\\N',%s");
verifyOutput(fs, status.getPath(), "%d,%f,\\N,%s");
}
loader.load(context, linkConf, jobConf);

View File

@ -63,7 +63,7 @@
@edu.umd.cs.findbugs.annotations.SuppressWarnings("PZLA_PREFER_ZERO_LENGTH_ARRAYS")
public class SqoopIDFUtils {
public static final String NULL_VALUE = "NULL";
public static final String DEFAULT_NULL_VALUE = "NULL";
// ISO-8859-1 is an 8-bit codec that is supported in every java
// implementation.
@ -578,8 +578,12 @@ public static boolean isColumnStringType(Column stringType) {
*
* @param objectArray
*/
@SuppressWarnings("unchecked")
public static String toCSV(Object[] objectArray, Schema schema) {
return toCSV(objectArray, schema, DEFAULT_NULL_VALUE);
}
@SuppressWarnings("unchecked")
public static String toCSV(Object[] objectArray, Schema schema, String nullValue) {
Column[] columns = schema.getColumnsArray();
StringBuilder csvString = new StringBuilder();
@ -589,7 +593,7 @@ public static String toCSV(Object[] objectArray, Schema schema) {
columns[i].getName() + " does not support null values");
}
if (objectArray[i] == null) {
csvString.append(NULL_VALUE);
csvString.append(nullValue);
} else {
switch (columns[i].getType()) {
case ARRAY:
@ -756,6 +760,10 @@ private static Object toObject(String csvString, Column column) {
* @return Object[]
*/
public static Object[] fromCSV(String csvText, Schema schema) {
return fromCSV(csvText, schema, DEFAULT_NULL_VALUE);
}
public static Object[] fromCSV(String csvText, Schema schema, String nullValue){
String[] csvArray = parseCSVString(csvText);
if (csvArray == null) {
@ -771,11 +779,11 @@ public static Object[] fromCSV(String csvText, Schema schema) {
Object[] objectArray = new Object[csvArray.length];
for (int i = 0; i < csvArray.length; i++) {
if (csvArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) {
if (csvArray[i].equals(nullValue) && !columns[i].isNullable()) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
columns[i].getName() + " does not support null values");
}
if (csvArray[i].equals(NULL_VALUE)) {
if (csvArray[i].equals(nullValue)) {
objectArray[i] = null;
continue;
}
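
Taken together, the new overloads let callers choose the null token that appears in the CSV text. A small usage sketch; the schema, column names and expected strings are illustrative, while the three-argument toCSV/fromCSV forms are the ones added in this patch:

import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Text;

public class NullTokenSketch {
  public static void main(String[] args) {
    Schema schema = new Schema("demo")
        .addColumn(new Text("country"))
        .addColumn(new Text("city"));
    Object[] row = {null, "Brno"};

    // Two-argument form falls back to DEFAULT_NULL_VALUE ("NULL").
    String withDefault = SqoopIDFUtils.toCSV(row, schema);        // roughly: NULL,'Brno'
    // Three-argument form uses the token configured via fromJobConfig/toJobConfig.nullValue.
    String withCustom = SqoopIDFUtils.toCSV(row, schema, "\\N");  // roughly: \N,'Brno'

    Object[] decoded = SqoopIDFUtils.fromCSV(withCustom, schema, "\\N");
    System.out.println(decoded[0] == null);   // true: the custom token decodes back to null
    System.out.println(withDefault + " | " + withCustom);
  }
}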

View File

@ -162,11 +162,11 @@ private GenericRecord toAVRO(String csv) {
}
GenericRecord avroObject = new GenericData.Record(avroSchema);
for (int i = 0; i < csvStringArray.length; i++) {
if (csvStringArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) {
if (csvStringArray[i].equals(DEFAULT_NULL_VALUE) && !columns[i].isNullable()) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
columns[i].getName() + " does not support null values");
}
if (csvStringArray[i].equals(NULL_VALUE)) {
if (csvStringArray[i].equals(DEFAULT_NULL_VALUE)) {
avroObject.put(columns[i].getName(), null);
continue;
}
@ -323,7 +323,7 @@ private String toCSV(GenericRecord record) {
columns[i].getName() + " does not support null values");
}
if (obj == null) {
csvString.append(NULL_VALUE);
csvString.append(DEFAULT_NULL_VALUE);
} else {
switch (columns[i].getType()) {

View File

@ -146,12 +146,12 @@ private JSONObject toJSON(String csv) {
}
JSONObject object = new JSONObject();
for (int i = 0; i < csvStringArray.length; i++) {
if (csvStringArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) {
if (csvStringArray[i].equals(DEFAULT_NULL_VALUE) && !columns[i].isNullable()) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
columns[i].getName() + " does not support null values");
}
// check for NULL field and bail out immediately
if (csvStringArray[i].equals(NULL_VALUE)) {
if (csvStringArray[i].equals(DEFAULT_NULL_VALUE)) {
object.put(columns[i].getName(), null);
continue;
}
@ -299,7 +299,7 @@ private String toCSV(JSONObject json) {
columns[i].getName() + " does not support null values");
}
if (obj == null) {
csvString.append(NULL_VALUE);
csvString.append(DEFAULT_NULL_VALUE);
} else {
switch (columns[i].getType()) {
case ARRAY:

View File

@ -19,7 +19,7 @@
package org.apache.sqoop.connector.idf;
import static org.apache.sqoop.connector.common.SqoopAvroUtils.createEnumSchema;
import static org.apache.sqoop.connector.common.SqoopIDFUtils.NULL_VALUE;
import static org.apache.sqoop.connector.common.SqoopIDFUtils.DEFAULT_NULL_VALUE;
import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertEquals;
@ -419,7 +419,7 @@ public void testNullValueAsObjectArrayInAndCSVTextOut() {
String[] textValues = csvText.split(",");
assertEquals(15, textValues.length);
for (String text : textValues) {
assertEquals(text, NULL_VALUE);
assertEquals(text, DEFAULT_NULL_VALUE);
}
}
@ -459,7 +459,7 @@ public void testNullValueAsCSVTextInAndCSVTextOut() {
String[] textValues = csvText.split(",");
assertEquals(15, textValues.length);
for (String text : textValues) {
assertEquals(text, NULL_VALUE);
assertEquals(text, DEFAULT_NULL_VALUE);
}
}
@ -474,7 +474,7 @@ public void testNullValueAsDataInAndCSVTextOut() {
String[] textValues = csvText.split(",");
assertEquals(15, textValues.length);
for (String text : textValues) {
assertEquals(text, NULL_VALUE);
assertEquals(text, DEFAULT_NULL_VALUE);
}
}
@ -528,7 +528,7 @@ public void testSchemaNotNullableWithObjectArray() {
public void testSchemaNotNullableWithCSV() {
Schema overrideSchema = new Schema("Test").addColumn(new Text("one").setNullable(false));
AVROIntermediateDataFormat dataFormat = new AVROIntermediateDataFormat(overrideSchema);
dataFormat.setCSVTextData(NULL_VALUE);
dataFormat.setCSVTextData(DEFAULT_NULL_VALUE);
}
// no validation happens when the setAvro and getAvro is used

View File

@ -106,7 +106,7 @@ public void testNullValueAsObjectArrayInAndCSVTextOut() {
String[] textValues = csvText.split(",");
assertEquals(14, textValues.length);
for (String text : textValues) {
assertEquals(text, NULL_VALUE);
assertEquals(text, DEFAULT_NULL_VALUE);
}
}
@ -176,7 +176,7 @@ public void testNullValueAsCSVTextInAndCSVTextOut() {
String csvText = dataFormat.getCSVTextData();
String[] textValues = csvText.split(",");
for (String text : textValues) {
assertEquals(text, NULL_VALUE);
assertEquals(text, DEFAULT_NULL_VALUE);
}
}

View File

@ -18,7 +18,7 @@
*/
package org.apache.sqoop.connector.idf;
import static org.apache.sqoop.connector.common.SqoopIDFUtils.NULL_VALUE;
import static org.apache.sqoop.connector.common.SqoopIDFUtils.DEFAULT_NULL_VALUE;
import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertEquals;
@ -414,7 +414,7 @@ public void testNullValueAsObjectArrayInAndCSVTextOut() {
String[] textValues = csvText.split(",");
assertEquals(15, textValues.length);
for (String text : textValues) {
assertEquals(text, NULL_VALUE);
assertEquals(text, DEFAULT_NULL_VALUE);
}
}
@ -454,7 +454,7 @@ public void testNullValueAsCSVTextInAndCSVTextOut() {
String[] textValues = csvText.split(",");
assertEquals(15, textValues.length);
for (String text : textValues) {
assertEquals(text, NULL_VALUE);
assertEquals(text, DEFAULT_NULL_VALUE);
}
}
@ -472,7 +472,7 @@ public void testNullValueAsDataInAndCSVTextOut() {
String[] textValues = csvText.split(",");
assertEquals(15, textValues.length);
for (String text : textValues) {
assertEquals(text, NULL_VALUE);
assertEquals(text, DEFAULT_NULL_VALUE);
}
}
@ -507,7 +507,7 @@ public void testSchemaNotNullableWithObjectArray() {
public void testSchemaNotNullableWithCSV() {
dataFormat = new JSONIntermediateDataFormat();
dataFormat.setSchema(new Schema("Test").addColumn(new Text("t").setNullable(false)));
dataFormat.setCSVTextData(NULL_VALUE);
dataFormat.setCSVTextData(DEFAULT_NULL_VALUE);
}
@SuppressWarnings("unchecked")

View File

@ -0,0 +1,240 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.integration.connector.hdfs;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multiset;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.asserts.HdfsAsserts;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
import org.apache.sqoop.test.utils.HdfsUtils;
import org.apache.sqoop.test.utils.ParametrizedUtils;
import org.testng.Assert;
import org.testng.ITestContext;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
public class NullValueTest extends SqoopTestCase {
private static final Logger LOG = Logger.getLogger(NullValueTest.class);
private ToFormat format;
// The custom nullValue to use (set to null if default)
private String nullValue;
@DataProvider(name="nul-value-test")
public static Object[][] data(ITestContext context) {
String customNullValue = "^&*custom!@";
return Iterables.toArray(
ParametrizedUtils.crossProduct(ToFormat.values(), new String[]{SqoopIDFUtils.DEFAULT_NULL_VALUE, customNullValue}),
Object[].class);
}
@Factory(dataProvider="nul-value-test")
public NullValueTest(ToFormat format, String nullValue) {
this.format = format;
this.nullValue = nullValue;
}
@Override
public String getTestName() {
return methodName + "[" + format.name() + ", " + nullValue + "]";
}
@BeforeMethod
public void setup() throws Exception {
createTableCities();
}
@AfterMethod()
public void dropTable() {
super.dropTable();
}
private boolean usingCustomNullValue() {
return nullValue != SqoopIDFUtils.DEFAULT_NULL_VALUE;
}
private String[] getCsv() {
return new String[] {
"1,'USA'," + nullValue + ",'San Francisco'",
"2,'USA','2004-10-24 00:00:00.000'," + nullValue,
"3," + nullValue + ",'2004-10-25 00:00:00.000','Brno'",
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
};
}
@Test
public void testFromHdfs() throws Exception {
switch (format) {
case TEXT_FILE:
createFromFile("input-0001", getCsv());
break;
case SEQUENCE_FILE:
SequenceFile.Writer.Option optPath =
SequenceFile.Writer.file(new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0001")));
SequenceFile.Writer.Option optKey = SequenceFile.Writer.keyClass(Text.class);
SequenceFile.Writer.Option optVal = SequenceFile.Writer.valueClass(NullWritable.class);
SequenceFile.Writer sequenceFileWriter =
SequenceFile.createWriter(getHadoopConf(), optPath, optKey, optVal);
for (String csv : getCsv()) {
sequenceFileWriter.append(new Text(csv), NullWritable.get());
}
sequenceFileWriter.close();
break;
default:
Assert.fail();
}
MLink hdfsLinkFrom = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsLinkFrom);
saveLink(hdfsLinkFrom);
MLink rdbmsLinkTo = getClient().createLink("generic-jdbc-connector");
fillRdbmsLinkConfig(rdbmsLinkTo);
saveLink(rdbmsLinkTo);
MJob job = getClient().createJob(hdfsLinkFrom.getName(), rdbmsLinkTo.getName());
fillHdfsFromConfig(job);
fillRdbmsToConfig(job);
if (usingCustomNullValue()) {
job.getFromJobConfig().getBooleanInput("fromJobConfig.overrideNullValue").setValue(true);
job.getFromJobConfig().getStringInput("fromJobConfig.nullValue").setValue(nullValue);
}
MDriverConfig driverConfig = job.getDriverConfig();
driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(3);
saveJob(job);
executeJob(job);
Assert.assertEquals(4L, provider.rowCount(getTableName()));
assertRowInCities(1, "USA", null, "San Francisco");
assertRowInCities(2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), (String) null);
assertRowInCities(3, (String) null, Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
assertRowInCities(4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
}
@Test
public void testToHdfs() throws Exception {
provider.insertRow(getTableName(), 1, "USA", (java.sql.Timestamp) null, "San Francisco");
provider.insertRow(getTableName(), 2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), (String) null);
provider.insertRow(getTableName(), 3, (String) null, Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
provider.insertRow(getTableName(), 4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
MLink rdbmsLinkFrom = getClient().createLink("generic-jdbc-connector");
fillRdbmsLinkConfig(rdbmsLinkFrom);
saveLink(rdbmsLinkFrom);
MLink hdfsLinkTo = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsLinkTo);
saveLink(hdfsLinkTo);
MJob job = getClient().createJob(rdbmsLinkFrom.getName(), hdfsLinkTo.getName());
fillRdbmsFromConfig(job, "id");
fillHdfsToConfig(job, format);
if (usingCustomNullValue()) {
job.getToJobConfig().getBooleanInput("toJobConfig.overrideNullValue").setValue(true);
job.getToJobConfig().getStringInput("toJobConfig.nullValue").setValue(nullValue);
}
hdfsClient.mkdirs(new Path(HdfsUtils.joinPathFragments
(getMapreduceDirectory(), "TO")));
job.getToJobConfig().getStringInput("toJobConfig.outputDirectory")
.setValue(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"));
MDriverConfig driverConfig = job.getDriverConfig();
driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(3);
saveJob(job);
executeJob(job);
switch (format) {
case TEXT_FILE:
HdfsAsserts.assertMapreduceOutput(hdfsClient,
HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"), getCsv());
break;
case SEQUENCE_FILE:
Multiset<String> setLines = HashMultiset.create(Arrays.asList(getCsv()));
List<String> notFound = new ArrayList<>();
Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"));
for(Path file : files) {
SequenceFile.Reader.Option optPath = SequenceFile.Reader.file(file);
SequenceFile.Reader sequenceFileReader = new SequenceFile.Reader(getHadoopConf(), optPath);
Text text = new Text();
while (sequenceFileReader.next(text)) {
if (!setLines.remove(text.toString())) {
notFound.add(text.toString());
}
}
}
if(!setLines.isEmpty() || !notFound.isEmpty()) {
LOG.error("Output do not match expectations.");
LOG.error("Expected lines that weren't present in the files:");
LOG.error("\t'" + StringUtils.join(setLines, "'\n\t'") + "'");
LOG.error("Extra lines in files that weren't expected:");
LOG.error("\t'" + StringUtils.join(notFound, "'\n\t'") + "'");
Assert.fail("Output do not match expectations.");
}
break;
default:
Assert.fail();
}
}
}
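
For orientation, the @Factory / @DataProvider pair at the top of this test builds one NullValueTest instance per (output format, null token) combination. A rough stand-in for what ParametrizedUtils.crossProduct produces here, using plain Java in place of the Sqoop test utilities:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CrossProductSketch {
  public static void main(String[] args) {
    // Stand-ins for ToFormat.values() and the token array passed to crossProduct.
    String[] formats = {"TEXT_FILE", "SEQUENCE_FILE"};
    String[] nullTokens = {"NULL", "^&*custom!@"};

    List<Object[]> combinations = new ArrayList<>();
    for (String format : formats) {
      for (String token : nullTokens) {
        combinations.add(new Object[]{format, token});
      }
    }
    // One test-class instance is created per combination.
    combinations.forEach(c -> System.out.println(Arrays.toString(c)));
  }
}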