diff --git a/src/java/org/apache/sqoop/avro/AvroUtil.java b/src/java/org/apache/sqoop/avro/AvroUtil.java index ee3cf621..90360763 100644 --- a/src/java/org/apache/sqoop/avro/AvroUtil.java +++ b/src/java/org/apache/sqoop/avro/AvroUtil.java @@ -18,14 +18,26 @@ package org.apache.sqoop.avro; import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.FileReader; +import org.apache.avro.file.SeekableInput; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.mapred.FsInput; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.BytesWritable; import org.apache.sqoop.lib.BlobRef; import org.apache.sqoop.lib.ClobRef; import org.apache.sqoop.orm.ClassWriter; +import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.sql.Date; @@ -184,4 +196,35 @@ public static Object fromAvro(Object avroObject, Schema schema, String type) { } } + /** + * Get the schema of AVRO files stored in a directory + */ + public static Schema getAvroSchema(Path path, Configuration conf) + throws IOException { + FileSystem fs = path.getFileSystem(conf); + Path fileToTest; + if (fs.isDirectory(path)) { + FileStatus[] fileStatuses = fs.listStatus(path, new PathFilter() { + @Override + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }); + if (fileStatuses.length == 0) { + return null; + } + fileToTest = fileStatuses[0].getPath(); + } else { + fileToTest = path; + } + + SeekableInput input = new FsInput(fileToTest, conf); + DatumReader reader = new GenericDatumReader(); + FileReader fileReader = DataFileReader.openReader(input, reader); + + Schema result = fileReader.getSchema(); + fileReader.close(); + return result; + } } diff --git a/src/java/org/apache/sqoop/mapreduce/AvroJob.java b/src/java/org/apache/sqoop/mapreduce/AvroJob.java index bb4755c8..65695d85 100644 --- a/src/java/org/apache/sqoop/mapreduce/AvroJob.java +++ b/src/java/org/apache/sqoop/mapreduce/AvroJob.java @@ -42,6 +42,11 @@ public static Schema getMapOutputSchema(Configuration job) { return Schema.parse(job.get(MAP_OUTPUT_SCHEMA, job.get(OUTPUT_SCHEMA))); } + /** Set a job's output key schema. */ + public static void setOutputSchema(Configuration job, Schema s) { + job.set(OUTPUT_SCHEMA, s.toString()); + } + /** Return a job's output key schema. */ public static Schema getOutputSchema(Configuration job) { return Schema.parse(job.get(OUTPUT_SCHEMA)); diff --git a/src/java/org/apache/sqoop/mapreduce/MergeAvroMapper.java b/src/java/org/apache/sqoop/mapreduce/MergeAvroMapper.java new file mode 100644 index 00000000..a2277bf7 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/MergeAvroMapper.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
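A minimal usage sketch for the two helpers added above (AvroUtil.getAvroSchema and AvroJob.setOutputSchema); the wrapper class, path argument, and error handling are illustrative assumptions, not part of the patch:

    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.sqoop.avro.AvroUtil;
    import org.apache.sqoop.mapreduce.AvroJob;

    public class AvroSchemaLookup {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path avroDir = new Path(args[0]);                       // e.g. the target dir of an earlier import
        Schema schema = AvroUtil.getAvroSchema(avroDir, conf);  // first non-hidden data file decides
        if (schema == null) {                                   // directory held no Avro data files
          throw new IllegalStateException("No Avro data files under " + avroDir);
        }
        AvroJob.setOutputSchema(conf, schema);                  // published under the OUTPUT_SCHEMA key
        System.out.println(schema.toString(true));
      }
    }

The Avro branch of MergeJob.runMergeJob below relies on exactly this pair: it reads the schema from the old directory and publishes it for the merged output.
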
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.AvroWrapper; +import org.apache.avro.mapred.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.sqoop.avro.AvroUtil; +import com.cloudera.sqoop.lib.SqoopRecord; + +/** + * Mapper for the merge program which operates on AVRO data files. + */ +public class MergeAvroMapper + extends + MergeMapperBase, NullWritable> { + + private Map> sqoopRecordFields = new HashMap>(); + private SqoopRecord sqoopRecordImpl; + + @Override + protected void setup(Context context) throws InterruptedException, IOException { + super.setup(context); + Configuration conf = context.getConfiguration(); + final String userClassName = conf.get(MergeJob.MERGE_SQOOP_RECORD_KEY); + try { + final Class clazz = Class.forName(userClassName, true, + Thread.currentThread().getContextClassLoader()); + sqoopRecordImpl = (SqoopRecord) ReflectionUtils.newInstance(clazz, conf); + for (final Field field : clazz.getDeclaredFields()) { + final String fieldName = field.getName(); + final String fieldTypeName = field.getType().getName(); + sqoopRecordFields.put(fieldName.toLowerCase(), new Pair(fieldName, + fieldTypeName)); + } + } catch (ClassNotFoundException e) { + throw new IOException("Cannot find the user record class with class name" + + userClassName, e); + } + } + + @Override + public void map(AvroWrapper key, NullWritable val, Context c) + throws IOException, InterruptedException { + processRecord(toSqoopRecord(key.datum()), c); + } + + private SqoopRecord toSqoopRecord(GenericRecord genericRecord) throws IOException { + Schema avroSchema = genericRecord.getSchema(); + for (Schema.Field field : avroSchema.getFields()) { + Pair sqoopRecordField = sqoopRecordFields.get(field.name().toLowerCase()); + if (null == sqoopRecordField) { + throw new IOException("Cannot find field '" + field.name() + "' in fields of user class" + + sqoopRecordImpl.getClass().getName() + ". Fields are: " + + Arrays.deepToString(sqoopRecordFields.values().toArray())); + } + Object avroObject = genericRecord.get(field.name()); + Object fieldVal = AvroUtil.fromAvro(avroObject, field.schema(), sqoopRecordField.value()); + sqoopRecordImpl.setField(sqoopRecordField.key(), fieldVal); + } + return sqoopRecordImpl; + } + +} diff --git a/src/java/org/apache/sqoop/mapreduce/MergeAvroReducer.java b/src/java/org/apache/sqoop/mapreduce/MergeAvroReducer.java new file mode 100644 index 00000000..2e85f51f --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/MergeAvroReducer.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
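To make the field matching in the mapper's setup() above concrete: the generated record's declared fields are keyed by lower-cased name, so an Avro field such as "ID" finds the Java field "ID" (and its type) regardless of case. A self-contained illustration with a made-up record class:

    import java.lang.reflect.Field;
    import java.util.HashMap;
    import java.util.Map;

    public class FieldLookupDemo {
      // Stand-in for a Sqoop-generated record class; names are illustrative only.
      static class MergeTableRecord {
        private Integer ID;
        private Integer VAL;
        private java.sql.Timestamp LASTMOD;
      }

      public static void main(String[] args) {
        Map<String, String> typeByLowerName = new HashMap<String, String>();
        for (Field field : MergeTableRecord.class.getDeclaredFields()) {
          typeByLowerName.put(field.getName().toLowerCase(), field.getType().getName());
        }
        // The Avro schema may carry "ID", "Id" or "id"; all of them resolve here.
        System.out.println(typeByLowerName.get("id"));   // prints java.lang.Integer
      }
    }
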
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce; + +import java.io.IOException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.AvroWrapper; +import org.apache.hadoop.io.NullWritable; +import org.apache.sqoop.avro.AvroUtil; +import com.cloudera.sqoop.lib.SqoopRecord; + +public class MergeAvroReducer extends MergeReducerBase, NullWritable> { + private AvroWrapper wrapper; + private Schema schema; + private boolean bigDecimalFormatString; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + wrapper = new AvroWrapper(); + schema = AvroJob.getOutputSchema(context.getConfiguration()); + bigDecimalFormatString = context.getConfiguration().getBoolean( + ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT, ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT); + } + + @Override + protected void writeRecord(SqoopRecord record, Context context) + throws IOException, InterruptedException { + GenericRecord outKey = AvroUtil.toGenericRecord(record.getFieldMap(), schema, + bigDecimalFormatString); + wrapper.datum(outKey); + context.write(wrapper, NullWritable.get()); + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/MergeJob.java b/src/java/org/apache/sqoop/mapreduce/MergeJob.java index 4e2a9169..5b6c4dfa 100644 --- a/src/java/org/apache/sqoop/mapreduce/MergeJob.java +++ b/src/java/org/apache/sqoop/mapreduce/MergeJob.java @@ -20,9 +20,19 @@ import java.io.IOException; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.FileReader; +import org.apache.avro.file.SeekableInput; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.mapred.FsInput; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; @@ -30,7 +40,10 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.sqoop.avro.AvroUtil; +import org.apache.sqoop.mapreduce.ExportJobBase.FileType; import org.apache.sqoop.util.Jars; + import com.cloudera.sqoop.SqoopOptions; import com.cloudera.sqoop.mapreduce.JobBase; @@ -114,20 +127,26 @@ public boolean runMergeJob() throws IOException { FileOutputFormat.setOutputPath(job, new Path(options.getTargetDir())); - if (ExportJobBase.isSequenceFiles(jobConf, newPath)) { - job.setInputFormatClass(SequenceFileInputFormat.class); - job.setOutputFormatClass(SequenceFileOutputFormat.class); - job.setMapperClass(MergeRecordMapper.class); - } else { - 
job.setMapperClass(MergeTextMapper.class); - job.setOutputFormatClass(RawKeyTextOutputFormat.class); + FileType fileType = ExportJobBase.getFileType(jobConf, oldPath); + switch (fileType) { + case AVRO_DATA_FILE: + configueAvroMergeJob(conf, job, oldPath, newPath); + break; + case SEQUENCE_FILE: + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setMapperClass(MergeRecordMapper.class); + job.setReducerClass(MergeReducer.class); + break; + default: + job.setMapperClass(MergeTextMapper.class); + job.setOutputFormatClass(RawKeyTextOutputFormat.class); + job.setReducerClass(MergeReducer.class); } jobConf.set("mapred.output.key.class", userClassName); job.setOutputValueClass(NullWritable.class); - job.setReducerClass(MergeReducer.class); - // Set the intermediate data types. job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(MergeRecord.class); @@ -142,6 +161,23 @@ public boolean runMergeJob() throws IOException { throw new IOException(cnfe); } } + + private void configueAvroMergeJob(Configuration conf, Job job, Path oldPath, Path newPath) + throws IOException { + LOG.info("Trying to merge avro files"); + final Schema oldPathSchema = AvroUtil.getAvroSchema(oldPath, conf); + final Schema newPathSchema = AvroUtil.getAvroSchema(newPath, conf); + if (oldPathSchema == null || newPathSchema == null || !oldPathSchema.equals(newPathSchema)) { + throw new IOException("Invalid schema for input directories. Schema for old data: [" + + oldPathSchema + "]. Schema for new data: [" + newPathSchema + "]"); + } + LOG.debug("Avro Schema:" + oldPathSchema); + job.setInputFormatClass(AvroInputFormat.class); + job.setOutputFormatClass(AvroOutputFormat.class); + job.setMapperClass(MergeAvroMapper.class); + job.setReducerClass(MergeAvroReducer.class); + AvroJob.setOutputSchema(job.getConfiguration(), oldPathSchema); + } } diff --git a/src/java/org/apache/sqoop/mapreduce/MergeReducer.java b/src/java/org/apache/sqoop/mapreduce/MergeReducer.java index cafff8ab..6192cdbe 100644 --- a/src/java/org/apache/sqoop/mapreduce/MergeReducer.java +++ b/src/java/org/apache/sqoop/mapreduce/MergeReducer.java @@ -29,28 +29,12 @@ * a new one if possible; otherwise, an old one. */ public class MergeReducer - extends Reducer { + extends MergeReducerBase { @Override - public void reduce(Text key, Iterable vals, Context c) + protected void writeRecord(SqoopRecord record, Context c) throws IOException, InterruptedException { - SqoopRecord bestRecord = null; - try { - for (MergeRecord val : vals) { - if (null == bestRecord && !val.isNewRecord()) { - // Use an old record if we don't have a new record. - bestRecord = (SqoopRecord) val.getSqoopRecord().clone(); - } else if (val.isNewRecord()) { - bestRecord = (SqoopRecord) val.getSqoopRecord().clone(); - } - } - } catch (CloneNotSupportedException cnse) { - throw new IOException(cnse); - } - - if (null != bestRecord) { - c.write(bestRecord, NullWritable.get()); - } + c.write(record, NullWritable.get()); } } diff --git a/src/java/org/apache/sqoop/mapreduce/MergeReducerBase.java b/src/java/org/apache/sqoop/mapreduce/MergeReducerBase.java new file mode 100644 index 00000000..4af498fd --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/MergeReducerBase.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
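The switch above routes each merge run by the detected file type of the old dataset; for Avro it requires both directories to carry the same schema before wiring in the new mapper/reducer pair. A hedged sketch of driving this path programmatically, mirroring what MergeTool does (the MergeJob constructor is assumed from its existing usage; paths, class and jar names are placeholders):

    import com.cloudera.sqoop.SqoopOptions;
    import org.apache.sqoop.mapreduce.MergeJob;

    public class MergeDriverSketch {
      public static void main(String[] args) throws Exception {
        SqoopOptions options = new SqoopOptions();
        options.setMergeOldPath("/user/sqoop/merge-old");    // earlier Avro import
        options.setMergeNewPath("/user/sqoop/merge_new");    // newer incremental import
        options.setTargetDir("/user/sqoop/merge_final");     // merged output
        options.setMergeKeyCol("ID");                        // merge key column
        options.setClassName("MergeTable");                  // generated record class
        options.setExistingJarName("/tmp/MergeTable.jar");   // jar containing that class
        boolean ok = new MergeJob(options).runMergeJob();    // assumed ctor; same call MergeTool makes
        System.exit(ok ? 0 : 1);
      }
    }

From the command line, the same code path is exercised by the existing sqoop merge tool when the input directories were produced by imports run with --as-avrodatafile.
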
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce; + +import java.io.IOException; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.Reducer.Context; +import com.cloudera.sqoop.lib.SqoopRecord; + +public abstract class MergeReducerBase extends + Reducer { + + @Override + public void reduce(Text key, Iterable vals, Context c) + throws IOException, InterruptedException { + SqoopRecord bestRecord = null; + try { + for (MergeRecord val : vals) { + if (null == bestRecord && !val.isNewRecord()) { + // Use an old record if we don't have a new record. + bestRecord = (SqoopRecord) val.getSqoopRecord().clone(); + } else if (val.isNewRecord()) { + bestRecord = (SqoopRecord) val.getSqoopRecord().clone(); + } + } + } catch (CloneNotSupportedException cnse) { + throw new IOException(cnse); + } + + if (null != bestRecord) { + writeRecord(bestRecord, c); + } + } + + abstract protected void writeRecord(SqoopRecord record, Context c) + throws IOException, InterruptedException; +} diff --git a/src/test/com/cloudera/sqoop/TestMerge.java b/src/test/com/cloudera/sqoop/TestMerge.java index 3821aa1c..17094194 100644 --- a/src/test/com/cloudera/sqoop/TestMerge.java +++ b/src/test/com/cloudera/sqoop/TestMerge.java @@ -25,18 +25,12 @@ import java.sql.Timestamp; import java.sql.PreparedStatement; import java.sql.SQLException; - +import java.util.Arrays; +import java.util.LinkedList; import java.util.List; - import com.cloudera.sqoop.testutil.CommonArgs; import com.cloudera.sqoop.testutil.HsqldbTestServer; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - +import com.cloudera.sqoop.SqoopOptions.FileLayout; import com.cloudera.sqoop.SqoopOptions.IncrementalMode; import com.cloudera.sqoop.manager.ConnManager; import com.cloudera.sqoop.testutil.BaseSqoopTestCase; @@ -44,6 +38,20 @@ import com.cloudera.sqoop.tool.ImportTool; import com.cloudera.sqoop.tool.MergeTool; import com.cloudera.sqoop.util.ClassLoaderStack; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.FileReader; +import org.apache.avro.file.SeekableInput; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.mapred.FsInput; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; /** * Test that the merge tool works. 
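MergeReducerBase above now owns the new-record-beats-old selection (the clone guards against Hadoop reusing value objects across the iteration), so concrete reducers differ only in how the winning record is written. A hypothetical extra variant, assuming the base class is parameterized on its output key/value types as the Avro reducer's declaration suggests:

    package org.apache.sqoop.mapreduce;

    import java.io.IOException;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import com.cloudera.sqoop.lib.SqoopRecord;

    // Illustration only: dump the surviving record's field map as plain text.
    public class MergeFieldDumpReducer extends MergeReducerBase<Text, NullWritable> {
      @Override
      protected void writeRecord(SqoopRecord record, Context c)
          throws IOException, InterruptedException {
        c.write(new Text(record.getFieldMap().toString()), NullWritable.get());
      }
    }

MergeReducer and MergeAvroReducer in this patch are the two concrete implementations; only their writeRecord bodies differ.
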
@@ -58,6 +66,19 @@ public class TestMerge extends BaseSqoopTestCase { public static final String SOURCE_DB_URL = "jdbc:hsqldb:mem:merge"; + private static final List> initRecords = Arrays + .asList(Arrays.asList(new Integer(0), new Integer(0)), + Arrays.asList(new Integer(1), new Integer(42))); + + private static final List> newRecords = Arrays.asList( + Arrays.asList(new Integer(1), new Integer(43)), + Arrays.asList(new Integer(3), new Integer(313))); + + private static final List> mergedRecords = Arrays.asList( + Arrays.asList(new Integer(0), new Integer(0)), + Arrays.asList(new Integer(1), new Integer(43)), + Arrays.asList(new Integer(3), new Integer(313))); + @Override public void setUp() { super.setUp(); @@ -71,6 +92,9 @@ public void setUp() { } public static final String TABLE_NAME = "MergeTable"; + private static final String OLD_PATH = "merge-old"; + private static final String NEW_PATH = "merge_new"; + private static final String FINAL_PATH = "merge_final"; public Configuration newConf() { Configuration conf = new Configuration(); @@ -91,7 +115,7 @@ public SqoopOptions getSqoopOptions(Configuration conf) { return options; } - protected void createTable() throws SQLException { + protected void createTable(List> records) throws SQLException { PreparedStatement s = conn.prepareStatement("DROP TABLE \"" + TABLE_NAME + "\" IF EXISTS"); try { s.executeUpdate(); @@ -99,32 +123,38 @@ protected void createTable() throws SQLException { s.close(); } - s = conn.prepareStatement("CREATE TABLE \"" + TABLE_NAME + "\" (id INT NOT NULL PRIMARY KEY, val INT, lastmod TIMESTAMP)"); + s = conn.prepareStatement("CREATE TABLE \"" + TABLE_NAME + + "\" (id INT NOT NULL PRIMARY KEY, val INT, LASTMOD timestamp)"); try { s.executeUpdate(); } finally { s.close(); } - s = conn.prepareStatement("INSERT INTO \"" + TABLE_NAME + "\" VALUES (0, 0, NOW())"); - try { - s.executeUpdate(); - } finally { - s.close(); - } - - s = conn.prepareStatement("INSERT INTO \"" + TABLE_NAME + "\" VALUES (1, 42, NOW())"); - try { - s.executeUpdate(); - } finally { - s.close(); + for (List record : records) { + final String values = StringUtils.join(record, ", "); + s = conn + .prepareStatement("INSERT INTO \"" + TABLE_NAME + "\" VALUES (" + values + ", now())"); + try { + s.executeUpdate(); + } finally { + s.close(); + } } conn.commit(); } - public void testMerge() throws Exception { - createTable(); + public void testTextFileMerge() throws Exception { + runMergeTest(SqoopOptions.FileLayout.TextFile); + } + + public void testAvroFileMerge() throws Exception { + runMergeTest(SqoopOptions.FileLayout.AvroDataFile); + } + + public void runMergeTest(SqoopOptions.FileLayout fileLayout) throws Exception { + createTable(initRecords); // Create a jar to use for the merging process; we'll load it // into the current thread CL for when this runs. This needs @@ -146,80 +176,33 @@ public void testMerge() throws Exception { String jarFileName = jars.get(0); // Now do the imports. - - Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR); - - options = getSqoopOptions(newConf()); - options.setTableName(TABLE_NAME); - options.setNumMappers(1); - - // Do an import of this data into the "old" dataset. 
-    options.setTargetDir(new Path(warehouse, "merge-old").toString());
-    options.setIncrementalMode(IncrementalMode.DateLastModified);
-    options.setIncrementalTestColumn("LASTMOD");
-
-    ImportTool importTool = new ImportTool();
-    Sqoop importer = new Sqoop(importTool, options.getConf(), options);
-    ret = Sqoop.runSqoop(importer, new String[0]);
-    if (0 != ret) {
-      fail("Initial import failed with exit code " + ret);
-    }
+    importData(OLD_PATH, fileLayout);

     // Check that we got records that meet our expected values.
-    assertRecordStartsWith("0,0,", "merge-old");
-    assertRecordStartsWith("1,42,", "merge-old");
-
-    long prevImportEnd = System.currentTimeMillis();
+    checkData(OLD_PATH, initRecords, fileLayout);

     Thread.sleep(25);

     // Modify the data in the warehouse.
-    PreparedStatement s = conn.prepareStatement("UPDATE \"" + TABLE_NAME + "\" SET val=43, lastmod=NOW() WHERE id=1");
-    try {
-      s.executeUpdate();
-      conn.commit();
-    } finally {
-      s.close();
-    }
-
-    s = conn.prepareStatement("INSERT INTO \"" + TABLE_NAME + "\" VALUES (3,313,NOW())");
-    try {
-      s.executeUpdate();
-      conn.commit();
-    } finally {
-      s.close();
-    }
+    createTable(newRecords);

     Thread.sleep(25);

     // Do another import, into the "new" dir.
-    options = getSqoopOptions(newConf());
-    options.setTableName(TABLE_NAME);
-    options.setNumMappers(1);
-    options.setTargetDir(new Path(warehouse, "merge-new").toString());
-    options.setIncrementalMode(IncrementalMode.DateLastModified);
-    options.setIncrementalTestColumn("LASTMOD");
-    options.setIncrementalLastValue(new Timestamp(prevImportEnd).toString());
+    importData(NEW_PATH, fileLayout);

-    importTool = new ImportTool();
-    importer = new Sqoop(importTool, options.getConf(), options);
-    ret = Sqoop.runSqoop(importer, new String[0]);
-    if (0 != ret) {
-      fail("Second import failed with exit code " + ret);
-    }
-
-    assertRecordStartsWith("1,43,", "merge-new");
-    assertRecordStartsWith("3,313,", "merge-new");
+    checkData(NEW_PATH, newRecords, fileLayout);

     // Now merge the results!
ClassLoaderStack.addJarFile(jarFileName, MERGE_CLASS_NAME); - + Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR); options = getSqoopOptions(newConf()); - options.setMergeOldPath(new Path(warehouse, "merge-old").toString()); - options.setMergeNewPath(new Path(warehouse, "merge-new").toString()); + options.setMergeOldPath(new Path(warehouse, OLD_PATH).toString()); + options.setMergeNewPath(new Path(warehouse, NEW_PATH).toString()); options.setMergeKeyCol("ID"); - options.setTargetDir(new Path(warehouse, "merge-final").toString()); + options.setTargetDir(new Path(warehouse, FINAL_PATH).toString()); options.setClassName(MERGE_CLASS_NAME); + options.setExistingJarName(jarFileName); MergeTool mergeTool = new MergeTool(); Sqoop merger = new Sqoop(mergeTool, options.getConf(), options); @@ -228,17 +211,47 @@ public void testMerge() throws Exception { fail("Merge failed with exit code " + ret); } - assertRecordStartsWith("0,0,", "merge-final"); - assertRecordStartsWith("1,43,", "merge-final"); - assertRecordStartsWith("3,313,", "merge-final"); + checkData(FINAL_PATH, mergedRecords, fileLayout); + } + + private void checkData(String dataDir, List> records, + SqoopOptions.FileLayout fileLayout) throws Exception { + for (List record : records) { + assertRecordStartsWith(record, dataDir, fileLayout); + } + } + + private boolean valueMatches(GenericRecord genericRecord, List recordVals) { + return recordVals.get(0).equals(genericRecord.get(0)) + && recordVals.get(1).equals(genericRecord.get(1)); + } + + private void importData(String targetDir, SqoopOptions.FileLayout fileLayout) { + SqoopOptions options; + options = getSqoopOptions(newConf()); + options.setTableName(TABLE_NAME); + options.setNumMappers(1); + options.setFileLayout(fileLayout); + options.setDeleteMode(true); + + Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR); + options.setTargetDir(new Path(warehouse, targetDir).toString()); + + ImportTool importTool = new ImportTool(); + Sqoop importer = new Sqoop(importTool, options.getConf(), options); + int ret = Sqoop.runSqoop(importer, new String[0]); + if (0 != ret) { + fail("Initial import failed with exit code " + ret); + } } /** * @return true if the file specified by path 'p' contains a line * that starts with 'prefix' */ - protected boolean checkFileForLine(FileSystem fs, Path p, String prefix) + protected boolean checkTextFileForLine(FileSystem fs, Path p, List record) throws IOException { + final String prefix = StringUtils.join(record, ','); BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(p))); try { while (true) { @@ -258,11 +271,42 @@ protected boolean checkFileForLine(FileSystem fs, Path p, String prefix) return false; } + private boolean checkAvroFileForLine(FileSystem fs, Path p, List record) + throws IOException { + SeekableInput in = new FsInput(p, new Configuration()); + DatumReader datumReader = new GenericDatumReader(); + FileReader reader = DataFileReader.openReader(in, datumReader); + reader.sync(0); + + while (reader.hasNext()) { + if (valueMatches(reader.next(), record)) { + return true; + } + } + + return false; + } + + protected boolean checkFileForLine(FileSystem fs, Path p, SqoopOptions.FileLayout fileLayout, + List record) throws IOException { + boolean result = false; + switch (fileLayout) { + case TextFile: + result = checkTextFileForLine(fs, p, record); + break; + case AvroDataFile: + result = checkAvroFileForLine(fs, p, record); + break; + } + return result; + } + /** * Return true if there's a file in 'dirName' 
with a line that starts with
    * 'prefix'.
    */
-  protected boolean recordStartsWith(String prefix, String dirName)
+  protected boolean recordStartsWith(List<Integer> record, String dirName,
+      SqoopOptions.FileLayout fileLayout)
       throws Exception {
     Path warehousePath = new Path(LOCAL_WAREHOUSE_DIR);
     Path targetPath = new Path(warehousePath, dirName);
@@ -277,7 +321,7 @@ protected boolean recordStartsWith(String prefix, String dirName)
     for (FileStatus stat : files) {
       Path p = stat.getPath();
       if (p.getName().startsWith("part-")) {
-        if (checkFileForLine(fs, p, prefix)) {
+        if (checkFileForLine(fs, p, fileLayout, record)) {
           // We found the line. Nothing further to do.
           return true;
         }
@@ -287,11 +331,10 @@
     }
     return false;
   }

-  protected void assertRecordStartsWith(String prefix, String dirName)
-      throws Exception {
-    if (!recordStartsWith(prefix, dirName)) {
-      fail("No record found that starts with " + prefix + " in " + dirName);
+  protected void assertRecordStartsWith(List<Integer> record, String dirName,
+      SqoopOptions.FileLayout fileLayout) throws Exception {
+    if (!recordStartsWith(record, dirName, fileLayout)) {
+      fail("No record found that starts with [" + StringUtils.join(record, ", ") + "] in " + dirName);
     }
   }
 }
-
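Finally, a standalone sketch of reading merged Avro output back the same way checkAvroFileForLine does; the part-file path is passed in as a placeholder argument:

    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.file.FileReader;
    import org.apache.avro.file.SeekableInput;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.mapred.FsInput;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class DumpMergedAvro {
      public static void main(String[] args) throws Exception {
        Path part = new Path(args[0]);   // e.g. a part-* file under the merge target dir
        SeekableInput in = new FsInput(part, new Configuration());
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
        FileReader<GenericRecord> reader = DataFileReader.openReader(in, datumReader);
        try {
          for (GenericRecord record : reader) {
            System.out.println(record);  // one line per merged row (ID, VAL, LASTMOD fields)
          }
        } finally {
          reader.close();
        }
      }
    }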