
SQOOP-1094: Add Avro support to merge tool

(Yibing Shi via Jarek Jarcec Cecho)
Jarek Jarcec Cecho committed 2015-07-24 09:05:23 -07:00
parent d2c062b202
commit b8df3a8033
8 changed files with 426 additions and 121 deletions
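For orientation, the change is exercised end to end in the updated TestMerge at the bottom of this diff; below is a minimal editorial sketch (not part of the commit) of driving an Avro merge through the Java API, assuming the test fixture's helpers and names (getSqoopOptions, newConf, warehouse, MERGE_CLASS_NAME, jarFileName) and placeholder directory names.

// Sketch only: mirrors the merge step of TestMerge.runMergeTest below.
// The merge job detects the input file type (Avro data file, SequenceFile or text)
// from the old path, so no explicit file-layout flag is needed for the merge itself.
SqoopOptions options = getSqoopOptions(newConf());
options.setMergeOldPath(new Path(warehouse, "merge_old").toString());   // previously imported data
options.setMergeNewPath(new Path(warehouse, "merge_new").toString());   // incremental import
options.setMergeKeyCol("ID");                                           // key column used to merge on
options.setTargetDir(new Path(warehouse, "merge_final").toString());
options.setClassName(MERGE_CLASS_NAME);                                 // generated SqoopRecord class
options.setExistingJarName(jarFileName);                                // jar containing that class
Sqoop merger = new Sqoop(new MergeTool(), options.getConf(), options);
int ret = Sqoop.runSqoop(merger, new String[0]);                        // 0 on success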

View File

@@ -18,14 +18,26 @@
package org.apache.sqoop.avro;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.BytesWritable;
import org.apache.sqoop.lib.BlobRef;
import org.apache.sqoop.lib.ClobRef;
import org.apache.sqoop.orm.ClassWriter;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Date;
@@ -184,4 +196,35 @@ public static Object fromAvro(Object avroObject, Schema schema, String type) {
}
}
/**
* Get the schema of Avro files stored in a directory.
*/
public static Schema getAvroSchema(Path path, Configuration conf)
throws IOException {
FileSystem fs = path.getFileSystem(conf);
Path fileToTest;
if (fs.isDirectory(path)) {
FileStatus[] fileStatuses = fs.listStatus(path, new PathFilter() {
@Override
public boolean accept(Path p) {
String name = p.getName();
return !name.startsWith("_") && !name.startsWith(".");
}
});
if (fileStatuses.length == 0) {
return null;
}
fileToTest = fileStatuses[0].getPath();
} else {
fileToTest = path;
}
SeekableInput input = new FsInput(fileToTest, conf);
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
FileReader<GenericRecord> fileReader = DataFileReader.openReader(input, reader);
Schema result = fileReader.getSchema();
fileReader.close();
return result;
}
}
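The new helper returns null for an empty directory and otherwise reads the schema from the first file whose name does not start with "_" or "."; a minimal sketch of the intended call pattern, as MergeJob uses it later in this commit (directory names are placeholders):

// Sketch: verify both merge inputs share one Avro schema before configuring the job.
Configuration conf = new Configuration();
Schema oldSchema = AvroUtil.getAvroSchema(new Path("merge_old"), conf);
Schema newSchema = AvroUtil.getAvroSchema(new Path("merge_new"), conf);
if (oldSchema == null || !oldSchema.equals(newSchema)) {
  throw new IOException("Old and new data sets must use the same Avro schema");
}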

View File

@@ -42,6 +42,11 @@ public static Schema getMapOutputSchema(Configuration job) {
return Schema.parse(job.get(MAP_OUTPUT_SCHEMA, job.get(OUTPUT_SCHEMA)));
}
/** Set a job's output key schema. */
public static void setOutputSchema(Configuration job, Schema s) {
job.set(OUTPUT_SCHEMA, s.toString());
}
/** Return a job's output key schema. */
public static Schema getOutputSchema(Configuration job) {
return Schema.parse(job.get(OUTPUT_SCHEMA));

View File

@@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.sqoop.avro.AvroUtil;
import com.cloudera.sqoop.lib.SqoopRecord;
/**
* Mapper for the merge program which operates on Avro data files.
*/
public class MergeAvroMapper
extends
MergeMapperBase<AvroWrapper<GenericRecord>, NullWritable> {
private Map<String, Pair<String, String>> sqoopRecordFields = new HashMap<String, Pair<String, String>>();
private SqoopRecord sqoopRecordImpl;
@Override
protected void setup(Context context) throws InterruptedException, IOException {
super.setup(context);
Configuration conf = context.getConfiguration();
final String userClassName = conf.get(MergeJob.MERGE_SQOOP_RECORD_KEY);
try {
final Class<? extends Object> clazz = Class.forName(userClassName, true,
Thread.currentThread().getContextClassLoader());
sqoopRecordImpl = (SqoopRecord) ReflectionUtils.newInstance(clazz, conf);
for (final Field field : clazz.getDeclaredFields()) {
final String fieldName = field.getName();
final String fieldTypeName = field.getType().getName();
sqoopRecordFields.put(fieldName.toLowerCase(), new Pair<String, String>(fieldName,
fieldTypeName));
}
} catch (ClassNotFoundException e) {
throw new IOException("Cannot find the user record class with class name"
+ userClassName, e);
}
}
@Override
public void map(AvroWrapper<GenericRecord> key, NullWritable val, Context c)
throws IOException, InterruptedException {
processRecord(toSqoopRecord(key.datum()), c);
}
private SqoopRecord toSqoopRecord(GenericRecord genericRecord) throws IOException {
Schema avroSchema = genericRecord.getSchema();
for (Schema.Field field : avroSchema.getFields()) {
Pair<String, String> sqoopRecordField = sqoopRecordFields.get(field.name().toLowerCase());
if (null == sqoopRecordField) {
throw new IOException("Cannot find field '" + field.name() + "' in fields of user class"
+ sqoopRecordImpl.getClass().getName() + ". Fields are: "
+ Arrays.deepToString(sqoopRecordFields.values().toArray()));
}
Object avroObject = genericRecord.get(field.name());
Object fieldVal = AvroUtil.fromAvro(avroObject, field.schema(), sqoopRecordField.value());
sqoopRecordImpl.setField(sqoopRecordField.key(), fieldVal);
}
return sqoopRecordImpl;
}
}
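The mapper resolves Avro fields to SqoopRecord fields purely by name, comparing lower-cased names on both sides; a small illustrative sketch of that lookup, using hypothetical field names and the same Pair type imported above:

// Sketch: the map built in setup() is keyed by the lower-cased field name and
// stores the original field name plus its Java type name.
Map<String, Pair<String, String>> fields = new HashMap<String, Pair<String, String>>();
fields.put("id", new Pair<String, String>("id", "java.lang.Integer"));
fields.put("lastmod", new Pair<String, String>("lastmod", "java.sql.Timestamp"));
Pair<String, String> match = fields.get("LASTMOD".toLowerCase()); // Avro field "LASTMOD" -> record field "lastmod"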

View File

@@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.sqoop.avro.AvroUtil;
import com.cloudera.sqoop.lib.SqoopRecord;
public class MergeAvroReducer extends MergeReducerBase<AvroWrapper<GenericRecord>, NullWritable> {
private AvroWrapper<GenericRecord> wrapper;
private Schema schema;
private boolean bigDecimalFormatString;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
wrapper = new AvroWrapper<GenericRecord>();
schema = AvroJob.getOutputSchema(context.getConfiguration());
bigDecimalFormatString = context.getConfiguration().getBoolean(
ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT, ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
}
@Override
protected void writeRecord(SqoopRecord record, Context context)
throws IOException, InterruptedException {
GenericRecord outKey = AvroUtil.toGenericRecord(record.getFieldMap(), schema,
bigDecimalFormatString);
wrapper.datum(outKey);
context.write(wrapper, NullWritable.get());
}
}

View File

@@ -20,9 +20,19 @@
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
@@ -30,7 +40,10 @@
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.sqoop.avro.AvroUtil;
import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
import org.apache.sqoop.util.Jars;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.mapreduce.JobBase;
@@ -114,20 +127,26 @@ public boolean runMergeJob() throws IOException {
FileOutputFormat.setOutputPath(job, new Path(options.getTargetDir()));
if (ExportJobBase.isSequenceFiles(jobConf, newPath)) {
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(MergeRecordMapper.class);
} else {
job.setMapperClass(MergeTextMapper.class);
job.setOutputFormatClass(RawKeyTextOutputFormat.class);
FileType fileType = ExportJobBase.getFileType(jobConf, oldPath);
switch (fileType) {
case AVRO_DATA_FILE:
configureAvroMergeJob(conf, job, oldPath, newPath);
break;
case SEQUENCE_FILE:
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(MergeRecordMapper.class);
job.setReducerClass(MergeReducer.class);
break;
default:
job.setMapperClass(MergeTextMapper.class);
job.setOutputFormatClass(RawKeyTextOutputFormat.class);
job.setReducerClass(MergeReducer.class);
}
jobConf.set("mapred.output.key.class", userClassName);
job.setOutputValueClass(NullWritable.class);
job.setReducerClass(MergeReducer.class);
// Set the intermediate data types.
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MergeRecord.class);
@@ -142,6 +161,23 @@ public boolean runMergeJob() throws IOException {
throw new IOException(cnfe);
}
}
private void configureAvroMergeJob(Configuration conf, Job job, Path oldPath, Path newPath)
throws IOException {
LOG.info("Trying to merge Avro files");
final Schema oldPathSchema = AvroUtil.getAvroSchema(oldPath, conf);
final Schema newPathSchema = AvroUtil.getAvroSchema(newPath, conf);
if (oldPathSchema == null || newPathSchema == null || !oldPathSchema.equals(newPathSchema)) {
throw new IOException("Invalid schema for input directories. Schema for old data: ["
+ oldPathSchema + "]. Schema for new data: [" + newPathSchema + "]");
}
LOG.debug("Avro Schema:" + oldPathSchema);
job.setInputFormatClass(AvroInputFormat.class);
job.setOutputFormatClass(AvroOutputFormat.class);
job.setMapperClass(MergeAvroMapper.class);
job.setReducerClass(MergeAvroReducer.class);
AvroJob.setOutputSchema(job.getConfiguration(), oldPathSchema);
}
}

View File

@@ -29,28 +29,12 @@
* a new one if possible; otherwise, an old one.
*/
public class MergeReducer
extends Reducer<Text, MergeRecord, SqoopRecord, NullWritable> {
extends MergeReducerBase<SqoopRecord, NullWritable> {
@Override
public void reduce(Text key, Iterable<MergeRecord> vals, Context c)
protected void writeRecord(SqoopRecord record, Context c)
throws IOException, InterruptedException {
SqoopRecord bestRecord = null;
try {
for (MergeRecord val : vals) {
if (null == bestRecord && !val.isNewRecord()) {
// Use an old record if we don't have a new record.
bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
} else if (val.isNewRecord()) {
bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
}
}
} catch (CloneNotSupportedException cnse) {
throw new IOException(cnse);
}
if (null != bestRecord) {
c.write(bestRecord, NullWritable.get());
}
c.write(record, NullWritable.get());
}
}

View File

@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import com.cloudera.sqoop.lib.SqoopRecord;
public abstract class MergeReducerBase<KEYOUT, VALUEOUT> extends
Reducer<Text, MergeRecord, KEYOUT, VALUEOUT> {
@Override
public void reduce(Text key, Iterable<MergeRecord> vals, Context c)
throws IOException, InterruptedException {
SqoopRecord bestRecord = null;
try {
for (MergeRecord val : vals) {
if (null == bestRecord && !val.isNewRecord()) {
// Use an old record if we don't have a new record.
bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
} else if (val.isNewRecord()) {
bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
}
}
} catch (CloneNotSupportedException cnse) {
throw new IOException(cnse);
}
if (null != bestRecord) {
writeRecord(bestRecord, c);
}
}
abstract protected void writeRecord(SqoopRecord record, Context c)
throws IOException, InterruptedException;
}

View File

@@ -25,18 +25,12 @@
import java.sql.Timestamp;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import com.cloudera.sqoop.testutil.CommonArgs;
import com.cloudera.sqoop.testutil.HsqldbTestServer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.cloudera.sqoop.SqoopOptions.FileLayout;
import com.cloudera.sqoop.SqoopOptions.IncrementalMode;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
@@ -44,6 +38,20 @@
import com.cloudera.sqoop.tool.ImportTool;
import com.cloudera.sqoop.tool.MergeTool;
import com.cloudera.sqoop.util.ClassLoaderStack;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* Test that the merge tool works.
@@ -58,6 +66,19 @@ public class TestMerge extends BaseSqoopTestCase {
public static final String SOURCE_DB_URL = "jdbc:hsqldb:mem:merge";
private static final List<List<Integer>> initRecords = Arrays
.asList(Arrays.asList(new Integer(0), new Integer(0)),
Arrays.asList(new Integer(1), new Integer(42)));
private static final List<List<Integer>> newRecords = Arrays.asList(
Arrays.asList(new Integer(1), new Integer(43)),
Arrays.asList(new Integer(3), new Integer(313)));
private static final List<List<Integer>> mergedRecords = Arrays.asList(
Arrays.asList(new Integer(0), new Integer(0)),
Arrays.asList(new Integer(1), new Integer(43)),
Arrays.asList(new Integer(3), new Integer(313)));
@Override
public void setUp() {
super.setUp();
@@ -71,6 +92,9 @@ public void setUp() {
}
public static final String TABLE_NAME = "MergeTable";
private static final String OLD_PATH = "merge-old";
private static final String NEW_PATH = "merge_new";
private static final String FINAL_PATH = "merge_final";
public Configuration newConf() {
Configuration conf = new Configuration();
@@ -91,7 +115,7 @@ public SqoopOptions getSqoopOptions(Configuration conf) {
return options;
}
protected void createTable() throws SQLException {
protected void createTable(List<List<Integer>> records) throws SQLException {
PreparedStatement s = conn.prepareStatement("DROP TABLE \"" + TABLE_NAME + "\" IF EXISTS");
try {
s.executeUpdate();
@@ -99,32 +123,38 @@ protected void createTable() throws SQLException {
s.close();
}
s = conn.prepareStatement("CREATE TABLE \"" + TABLE_NAME + "\" (id INT NOT NULL PRIMARY KEY, val INT, lastmod TIMESTAMP)");
s = conn.prepareStatement("CREATE TABLE \"" + TABLE_NAME
+ "\" (id INT NOT NULL PRIMARY KEY, val INT, LASTMOD timestamp)");
try {
s.executeUpdate();
} finally {
s.close();
}
s = conn.prepareStatement("INSERT INTO \"" + TABLE_NAME + "\" VALUES (0, 0, NOW())");
try {
s.executeUpdate();
} finally {
s.close();
}
s = conn.prepareStatement("INSERT INTO \"" + TABLE_NAME + "\" VALUES (1, 42, NOW())");
try {
s.executeUpdate();
} finally {
s.close();
for (List<Integer> record : records) {
final String values = StringUtils.join(record, ", ");
s = conn
.prepareStatement("INSERT INTO \"" + TABLE_NAME + "\" VALUES (" + values + ", now())");
try {
s.executeUpdate();
} finally {
s.close();
}
}
conn.commit();
}
public void testMerge() throws Exception {
createTable();
public void testTextFileMerge() throws Exception {
runMergeTest(SqoopOptions.FileLayout.TextFile);
}
public void testAvroFileMerge() throws Exception {
runMergeTest(SqoopOptions.FileLayout.AvroDataFile);
}
public void runMergeTest(SqoopOptions.FileLayout fileLayout) throws Exception {
createTable(initRecords);
// Create a jar to use for the merging process; we'll load it
// into the current thread CL for when this runs. This needs
@@ -146,80 +176,33 @@ public void testMerge() throws Exception {
String jarFileName = jars.get(0);
// Now do the imports.
Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
options = getSqoopOptions(newConf());
options.setTableName(TABLE_NAME);
options.setNumMappers(1);
// Do an import of this data into the "old" dataset.
options.setTargetDir(new Path(warehouse, "merge-old").toString());
options.setIncrementalMode(IncrementalMode.DateLastModified);
options.setIncrementalTestColumn("LASTMOD");
ImportTool importTool = new ImportTool();
Sqoop importer = new Sqoop(importTool, options.getConf(), options);
ret = Sqoop.runSqoop(importer, new String[0]);
if (0 != ret) {
fail("Initial import failed with exit code " + ret);
}
importData(OLD_PATH, fileLayout);
// Check that we got records that meet our expected values.
assertRecordStartsWith("0,0,", "merge-old");
assertRecordStartsWith("1,42,", "merge-old");
long prevImportEnd = System.currentTimeMillis();
checkData(OLD_PATH, initRecords, fileLayout);
Thread.sleep(25);
// Modify the data in the warehouse.
PreparedStatement s = conn.prepareStatement("UPDATE \"" + TABLE_NAME + "\" SET val=43, lastmod=NOW() WHERE id=1");
try {
s.executeUpdate();
conn.commit();
} finally {
s.close();
}
s = conn.prepareStatement("INSERT INTO \"" + TABLE_NAME + "\" VALUES (3,313,NOW())");
try {
s.executeUpdate();
conn.commit();
} finally {
s.close();
}
createTable(newRecords);
Thread.sleep(25);
// Do another import, into the "new" dir.
options = getSqoopOptions(newConf());
options.setTableName(TABLE_NAME);
options.setNumMappers(1);
options.setTargetDir(new Path(warehouse, "merge-new").toString());
options.setIncrementalMode(IncrementalMode.DateLastModified);
options.setIncrementalTestColumn("LASTMOD");
options.setIncrementalLastValue(new Timestamp(prevImportEnd).toString());
importData(NEW_PATH, fileLayout);
importTool = new ImportTool();
importer = new Sqoop(importTool, options.getConf(), options);
ret = Sqoop.runSqoop(importer, new String[0]);
if (0 != ret) {
fail("Second import failed with exit code " + ret);
}
assertRecordStartsWith("1,43,", "merge-new");
assertRecordStartsWith("3,313,", "merge-new");
checkData(NEW_PATH, newRecords, fileLayout);
// Now merge the results!
ClassLoaderStack.addJarFile(jarFileName, MERGE_CLASS_NAME);
Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
options = getSqoopOptions(newConf());
options.setMergeOldPath(new Path(warehouse, "merge-old").toString());
options.setMergeNewPath(new Path(warehouse, "merge-new").toString());
options.setMergeOldPath(new Path(warehouse, OLD_PATH).toString());
options.setMergeNewPath(new Path(warehouse, NEW_PATH).toString());
options.setMergeKeyCol("ID");
options.setTargetDir(new Path(warehouse, "merge-final").toString());
options.setTargetDir(new Path(warehouse, FINAL_PATH).toString());
options.setClassName(MERGE_CLASS_NAME);
options.setExistingJarName(jarFileName);
MergeTool mergeTool = new MergeTool();
Sqoop merger = new Sqoop(mergeTool, options.getConf(), options);
@@ -228,17 +211,47 @@ public void testMerge() throws Exception {
fail("Merge failed with exit code " + ret);
}
assertRecordStartsWith("0,0,", "merge-final");
assertRecordStartsWith("1,43,", "merge-final");
assertRecordStartsWith("3,313,", "merge-final");
checkData(FINAL_PATH, mergedRecords, fileLayout);
}
private void checkData(String dataDir, List<List<Integer>> records,
SqoopOptions.FileLayout fileLayout) throws Exception {
for (List<Integer> record : records) {
assertRecordStartsWith(record, dataDir, fileLayout);
}
}
private boolean valueMatches(GenericRecord genericRecord, List<Integer> recordVals) {
return recordVals.get(0).equals(genericRecord.get(0))
&& recordVals.get(1).equals(genericRecord.get(1));
}
private void importData(String targetDir, SqoopOptions.FileLayout fileLayout) {
SqoopOptions options;
options = getSqoopOptions(newConf());
options.setTableName(TABLE_NAME);
options.setNumMappers(1);
options.setFileLayout(fileLayout);
options.setDeleteMode(true);
Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
options.setTargetDir(new Path(warehouse, targetDir).toString());
ImportTool importTool = new ImportTool();
Sqoop importer = new Sqoop(importTool, options.getConf(), options);
int ret = Sqoop.runSqoop(importer, new String[0]);
if (0 != ret) {
fail("Initial import failed with exit code " + ret);
}
}
/**
* @return true if the file specified by path 'p' contains a line
* that starts with 'prefix'
*/
protected boolean checkFileForLine(FileSystem fs, Path p, String prefix)
protected boolean checkTextFileForLine(FileSystem fs, Path p, List<Integer> record)
throws IOException {
final String prefix = StringUtils.join(record, ',');
BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(p)));
try {
while (true) {
@@ -258,11 +271,42 @@ protected boolean checkFileForLine(FileSystem fs, Path p, String prefix)
return false;
}
private boolean checkAvroFileForLine(FileSystem fs, Path p, List<Integer> record)
throws IOException {
SeekableInput in = new FsInput(p, new Configuration());
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
FileReader<GenericRecord> reader = DataFileReader.openReader(in, datumReader);
reader.sync(0);
while (reader.hasNext()) {
if (valueMatches(reader.next(), record)) {
return true;
}
}
return false;
}
protected boolean checkFileForLine(FileSystem fs, Path p, SqoopOptions.FileLayout fileLayout,
List<Integer> record) throws IOException {
boolean result = false;
switch (fileLayout) {
case TextFile:
result = checkTextFileForLine(fs, p, record);
break;
case AvroDataFile:
result = checkAvroFileForLine(fs, p, record);
break;
}
return result;
}
/**
* Return true if there's a file in 'dirName' with a line that starts with
* 'prefix'.
*/
protected boolean recordStartsWith(String prefix, String dirName)
protected boolean recordStartsWith(List<Integer> record, String dirName,
SqoopOptions.FileLayout fileLayout)
throws Exception {
Path warehousePath = new Path(LOCAL_WAREHOUSE_DIR);
Path targetPath = new Path(warehousePath, dirName);
@@ -277,7 +321,7 @@ protected boolean recordStartsWith(String prefix, String dirName)
for (FileStatus stat : files) {
Path p = stat.getPath();
if (p.getName().startsWith("part-")) {
if (checkFileForLine(fs, p, prefix)) {
if (checkFileForLine(fs, p, fileLayout, record)) {
// We found the line. Nothing further to do.
return true;
}
@@ -287,11 +331,10 @@ protected boolean recordStartsWith(String prefix, String dirName)
return false;
}
protected void assertRecordStartsWith(String prefix, String dirName)
throws Exception {
if (!recordStartsWith(prefix, dirName)) {
fail("No record found that starts with " + prefix + " in " + dirName);
protected void assertRecordStartsWith(List<Integer> record, String dirName,
SqoopOptions.FileLayout fileLayout) throws Exception {
if (!recordStartsWith(record, dirName, fileLayout)) {
fail("No record found that starts with [" + StringUtils.join(record, ", ") + "] in " + dirName);
}
}
}