From 3509b7941e55a14c318165fabf882bacff1eb170 Mon Sep 17 00:00:00 2001 From: Andrew Bayer Date: Fri, 22 Jul 2011 20:04:17 +0000 Subject: [PATCH] SQOOP-90. Tool to merge datasets imported via incremental import. Adds 'merge' tool. Adds MergeJob, Merge*Mapper, MergeReducer. Merge-specific arguments added to SqoopOptions, BaseSqoopTool. Add TestMerge to test that this tool functions as expected. From: Aaron Kimball git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149980 13f79535-47bb-0310-9956-ffa450edef68 --- src/docs/man/sqoop-merge.txt | 77 +++++ src/docs/user/merge-purpose.txt | 26 ++ src/docs/user/saved-jobs.txt | 77 ++++- src/java/com/cloudera/sqoop/SqoopOptions.java | 55 +++ .../com/cloudera/sqoop/mapreduce/JobBase.java | 9 +- .../cloudera/sqoop/mapreduce/MergeJob.java | 152 +++++++++ .../sqoop/mapreduce/MergeMapperBase.java | 89 +++++ .../cloudera/sqoop/mapreduce/MergeRecord.java | 134 ++++++++ .../sqoop/mapreduce/MergeRecordMapper.java | 37 ++ .../sqoop/mapreduce/MergeReducer.java | 58 ++++ .../sqoop/mapreduce/MergeTextMapper.java | 61 ++++ .../cloudera/sqoop/tool/BaseSqoopTool.java | 6 + .../com/cloudera/sqoop/tool/ImportTool.java | 8 + .../com/cloudera/sqoop/tool/MergeTool.java | 237 +++++++++++++ .../com/cloudera/sqoop/tool/SqoopTool.java | 2 + src/test/com/cloudera/sqoop/SmokeTests.java | 1 + src/test/com/cloudera/sqoop/TestMerge.java | 323 ++++++++++++++++++ 17 files changed, 1349 insertions(+), 3 deletions(-) create mode 100644 src/docs/man/sqoop-merge.txt create mode 100644 src/docs/user/merge-purpose.txt create mode 100644 src/java/com/cloudera/sqoop/mapreduce/MergeJob.java create mode 100644 src/java/com/cloudera/sqoop/mapreduce/MergeMapperBase.java create mode 100644 src/java/com/cloudera/sqoop/mapreduce/MergeRecord.java create mode 100644 src/java/com/cloudera/sqoop/mapreduce/MergeRecordMapper.java create mode 100644 src/java/com/cloudera/sqoop/mapreduce/MergeReducer.java create mode 100644 src/java/com/cloudera/sqoop/mapreduce/MergeTextMapper.java create mode 100644 src/java/com/cloudera/sqoop/tool/MergeTool.java create mode 100644 src/test/com/cloudera/sqoop/TestMerge.java diff --git a/src/docs/man/sqoop-merge.txt b/src/docs/man/sqoop-merge.txt new file mode 100644 index 00000000..841c52ce --- /dev/null +++ b/src/docs/man/sqoop-merge.txt @@ -0,0 +1,77 @@ +sqoop-merge(1) +============== + +NAME +---- +sqoop-merge - Merge a newer dataset from an incremental import onto an +older one. + +SYNOPSIS +-------- +'sqoop-merge' + +'sqoop merge' + + +DESCRIPTION +----------- + +include::../user/merge-purpose.txt[] + +OPTIONS +------- + +Merge options +~~~~~~~~~~~~~ +--class-name (class):: + Specify the name of the record-specific class to use during the merge job. + +--jar-file (file):: + Specify the name of the jar to load the record class from. + +--merge-key (col):: + Specify the name of a column to use as the merge key. + +--new-data (path):: + Specify the path of the newer dataset. + +--onto (path):: + Specify the path of the older dataset. + +--target-dir (path):: + Specify the target path for the output of the merge job. + + +Common options +~~~~~~~~~~~~~~ + +--help:: + Print usage instructions + +--verbose:: + Print more information while working + + +ENVIRONMENT +----------- + +See 'sqoop(1)' + + +//// + Licensed to Cloudera, Inc. under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + Cloudera, Inc. 
licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+////
+
diff --git a/src/docs/user/merge-purpose.txt b/src/docs/user/merge-purpose.txt
new file mode 100644
index 00000000..cb25e3d3
--- /dev/null
+++ b/src/docs/user/merge-purpose.txt
@@ -0,0 +1,26 @@
+
+////
+  Licensed to Cloudera, Inc. under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  Cloudera, Inc. licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+////
+
+The merge tool allows you to combine two datasets where entries in one
+dataset should overwrite entries of an older dataset. For example, an
+incremental import run in last-modified mode will generate multiple datasets
+in HDFS where successively newer data appears in each dataset. The +merge+
+tool will "flatten" two datasets into one, taking the newest available
+records for each primary key.
+
+
diff --git a/src/docs/user/saved-jobs.txt b/src/docs/user/saved-jobs.txt
index 6fc3e641..a0b92623 100644
--- a/src/docs/user/saved-jobs.txt
+++ b/src/docs/user/saved-jobs.txt
@@ -51,7 +51,7 @@ $ sqoop job (generic-args) (job-args) [-- [subtool-name] (subtool-args)]
 $ sqoop-job (generic-args) (job-args) [-- [subtool-name] (subtool-args)]
 ----
 
-Although the Hadoop generic arguments must preceed any export arguments,
+Although the Hadoop generic arguments must precede any job arguments,
 the job arguments can be entered in any order with respect to one
 another.
 
@@ -231,3 +231,78 @@ value +jdbc:hsqldb:hsql://<server-name>:<port>/sqoop+. For example,
 
 This metastore may be hosted on a machine within the Hadoop cluster, or
 elsewhere on the network.
+
++sqoop-merge+
+-------------
+
+Purpose
+~~~~~~~
+
+include::merge-purpose.txt[]
+
+Syntax
+~~~~~~
+
+----
+$ sqoop merge (generic-args) (merge-args)
+$ sqoop-merge (generic-args) (merge-args)
+----
+
+Although the Hadoop generic arguments must precede any merge arguments,
+the merge arguments can be entered in any order with respect to one
+another.
+
+.Merge options:
+[grid="all"]
+`---------------------------`------------------------------------------
+Argument                     Description
+-----------------------------------------------------------------------
++\--class-name <class>+      Specify the name of the record-specific \
+                             class to use during the merge job.
++\--jar-file <file>+         Specify the name of the jar to load the \
+                             record class from.
++\--merge-key <col>+         Specify the name of a column to use as \
+                             the merge key.
++\--new-data <path>+         Specify the path of the newer dataset.
++\--onto <path>+             Specify the path of the older dataset.
++\--target-dir <path>+       Specify the target path for the output \
+                             of the merge job.
+-----------------------------------------------------------------------
+
+The +merge+ tool runs a MapReduce job that takes two directories as
+input: a newer dataset, and an older one. These are specified with
++\--new-data+ and +\--onto+ respectively. The output of the MapReduce
+job will be placed in the directory in HDFS specified by +\--target-dir+.
+
+When merging the datasets, it is assumed that there is a unique primary
+key value in each record. The column for the primary key is specified
+with +\--merge-key+. Multiple rows in the same dataset should not
+have the same primary key, or else data loss may occur.
+
+To parse the dataset and extract the key column, the auto-generated
+class from a previous import must be used. You should specify the
+class name and jar file with +\--class-name+ and +\--jar-file+. If
+this is not available, you can recreate the class using the +codegen+
+tool.
+
+The merge tool is typically run after an incremental import with the
+date-last-modified mode (+sqoop import --incremental lastmodified ...+).
+
+Supposing two incremental imports were performed, where some older data
+is in an HDFS directory named +older+ and newer data is in an HDFS
+directory named +newer+, these could be merged like so:
+
+----
+$ sqoop merge --new-data newer --onto older --target-dir merged \
+    --jar-file datatypes.jar --class-name Foo --merge-key id
+----
+
+This would run a MapReduce job where the value in the +id+ column
+of each row is used to join rows; rows in the +newer+ dataset will
+be used in preference to rows in the +older+ dataset.
+
+This can be used with both SequenceFile- and text-based incremental
+imports. The file types of the newer and older datasets must be the
+same.
+
+
diff --git a/src/java/com/cloudera/sqoop/SqoopOptions.java b/src/java/com/cloudera/sqoop/SqoopOptions.java
index 3ebf22cb..edf95930 100644
--- a/src/java/com/cloudera/sqoop/SqoopOptions.java
+++ b/src/java/com/cloudera/sqoop/SqoopOptions.java
@@ -199,6 +199,14 @@ public enum IncrementalMode {
   @StoredAsProperty("incremental.last.value")
   private String incrementalLastValue;
 
+  // HDFS paths for "old" and "new" datasets in merge tool.
+  @StoredAsProperty("merge.old.path") private String mergeOldPath;
+  @StoredAsProperty("merge.new.path") private String mergeNewPath;
+
+  // "key" column for the merge operation.
+  @StoredAsProperty("merge.key.col") private String mergeKeyCol;
+
+
   // These next two fields are not serialized to the metastore.
   // If this SqoopOptions is created by reading a saved job, these will
   // be populated by the JobStorage to facilitate updating the same
@@ -1531,5 +1539,52 @@ public SqoopOptions getParent() {
   public void setParent(SqoopOptions options) {
     this.parent = options;
   }
+
+  /**
+   * Set the path name used to do an incremental import of old data
+   * which will be combined with a "new" dataset.
+   */
+  public void setMergeOldPath(String path) {
+    this.mergeOldPath = path;
+  }
+
+  /**
+   * Return the path name used to do an incremental import of old data
+   * which will be combined with a "new" dataset.
+   */
+  public String getMergeOldPath() {
+    return this.mergeOldPath;
+  }
+
+  /**
+   * Set the path name used to do an incremental import of new data
+   * which will be combined with an "old" dataset.
+   */
+  public void setMergeNewPath(String path) {
+    this.mergeNewPath = path;
+  }
+
+  /**
+   * Return the path name used to do an incremental import of new data
+   * which will be combined with an "old" dataset.
+   */
+  public String getMergeNewPath() {
+    return this.mergeNewPath;
+  }
+
+  /**
+   * Set the name of the column used to merge an old and new dataset.
+   */
+  public void setMergeKeyCol(String col) {
+    this.mergeKeyCol = col;
+  }
+
+  /**
+   * Return the name of the column used to merge an old and new dataset.
+   */
+  public String getMergeKeyCol() {
+    return this.mergeKeyCol;
+  }
+
 }
diff --git a/src/java/com/cloudera/sqoop/mapreduce/JobBase.java b/src/java/com/cloudera/sqoop/mapreduce/JobBase.java
index 4aae092a..41448f33 100644
--- a/src/java/com/cloudera/sqoop/mapreduce/JobBase.java
+++ b/src/java/com/cloudera/sqoop/mapreduce/JobBase.java
@@ -139,8 +139,10 @@ protected void cacheJars(Job job, ConnManager mgr)
     addToCache(Jars.getSqoopJarPath(), fs, localUrls);
     addToCache(Jars.getShimJarPath(), fs, localUrls);
-    addToCache(Jars.getDriverClassJar(mgr), fs, localUrls);
-    addToCache(Jars.getJarPathForClass(mgr.getClass()), fs, localUrls);
+    if (null != mgr) {
+      addToCache(Jars.getDriverClassJar(mgr), fs, localUrls);
+      addToCache(Jars.getJarPathForClass(mgr.getClass()), fs, localUrls);
+    }
 
     // Add anything in $SQOOP_HOME/lib, if this is set.
     String sqoopHome = System.getenv("SQOOP_HOME");
diff --git a/src/java/com/cloudera/sqoop/mapreduce/MergeJob.java b/src/java/com/cloudera/sqoop/mapreduce/MergeJob.java
new file mode 100644
index 00000000..b2931195
--- /dev/null
+++ b/src/java/com/cloudera/sqoop/mapreduce/MergeJob.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+import com.cloudera.sqoop.SqoopOptions;
+
+import com.cloudera.sqoop.shims.ShimLoader;
+import com.cloudera.sqoop.util.Jars;
+
+/**
+ * Run a MapReduce job that merges two datasets.
+ */
+public class MergeJob extends JobBase {
+
+  /** Configuration key specifying the path to the "old" dataset.
+   */
+  public static final String MERGE_OLD_PATH_KEY = "sqoop.merge.old.path";
+
+  /** Configuration key specifying the path to the "new" dataset. */
+  public static final String MERGE_NEW_PATH_KEY = "sqoop.merge.new.path";
+
+  /** Configuration key specifying the name of the key column for joins. */
+  public static final String MERGE_KEY_COL_KEY = "sqoop.merge.key.col";
+
+  /** Configuration key specifying the SqoopRecord class name for
+   * the records we are merging.
+   */
+  public static final String MERGE_SQOOP_RECORD_KEY = "sqoop.merge.class";
+
+  public MergeJob(final SqoopOptions opts) {
+    super(opts, null, null, null);
+  }
+
+  public boolean runMergeJob() throws IOException {
+    Configuration conf = options.getConf();
+    Job job = new Job(conf);
+
+    String userClassName = options.getClassName();
+    if (null == userClassName) {
+      // Shouldn't get here.
+      throw new IOException("Record class name not specified with "
+          + "--class-name.");
+    }
+
+    // Set the external jar to use for the job.
+    String existingJar = options.getExistingJarName();
+    if (existingJar != null) {
+      // User explicitly identified a jar path.
+      LOG.debug("Setting job jar to user-specified jar: " + existingJar);
+      job.getConfiguration().set("mapred.jar", existingJar);
+    } else {
+      // Infer it from the location of the specified class, if it's on the
+      // classpath.
+      try {
+        Class<?> userClass = conf.getClassByName(userClassName);
+        if (null != userClass) {
+          String userJar = Jars.getJarPathForClass(userClass);
+          LOG.debug("Setting job jar based on user class " + userClassName
+              + ": " + userJar);
+          job.getConfiguration().set("mapred.jar", userJar);
+        } else {
+          LOG.warn("Specified class " + userClassName + " is not in a jar. "
+              + "MapReduce may not find the class.");
+        }
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException(cnfe);
+      }
+    }
+
+    try {
+      Path oldPath = new Path(options.getMergeOldPath());
+      Path newPath = new Path(options.getMergeNewPath());
+
+      Configuration jobConf = job.getConfiguration();
+      FileSystem fs = FileSystem.get(jobConf);
+      oldPath = oldPath.makeQualified(fs);
+      newPath = newPath.makeQualified(fs);
+
+      FileInputFormat.addInputPath(job, oldPath);
+      FileInputFormat.addInputPath(job, newPath);
+
+      jobConf.set(MERGE_OLD_PATH_KEY, oldPath.toString());
+      jobConf.set(MERGE_NEW_PATH_KEY, newPath.toString());
+      jobConf.set(MERGE_KEY_COL_KEY, options.getMergeKeyCol());
+      jobConf.set(MERGE_SQOOP_RECORD_KEY, userClassName);
+
+      FileOutputFormat.setOutputPath(job, new Path(options.getTargetDir()));
+
+      if (ExportJobBase.isSequenceFiles(jobConf, newPath)) {
+        job.setInputFormatClass(SequenceFileInputFormat.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setMapperClass(MergeRecordMapper.class);
+      } else {
+        job.setMapperClass(MergeTextMapper.class);
+        job.setOutputFormatClass((Class<? extends OutputFormat>)
+            ShimLoader.getShimClass(
+            "com.cloudera.sqoop.mapreduce.RawKeyTextOutputFormat"));
+      }
+
+      jobConf.set("mapred.output.key.class", userClassName);
+      job.setOutputValueClass(NullWritable.class);
+
+      job.setReducerClass(MergeReducer.class);
+
+      // Set the intermediate data types.
+      job.setMapOutputKeyClass(Text.class);
+      job.setMapOutputValueClass(MergeRecord.class);
+
+      // Make sure Sqoop and anything else we need is on the classpath.
+      cacheJars(job, null);
+      return this.runJob(job);
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException(cnfe);
+    }
+  }
+}
+
+
diff --git a/src/java/com/cloudera/sqoop/mapreduce/MergeMapperBase.java b/src/java/com/cloudera/sqoop/mapreduce/MergeMapperBase.java
new file mode 100644
index 00000000..39226093
--- /dev/null
+++ b/src/java/com/cloudera/sqoop/mapreduce/MergeMapperBase.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Given a set of SqoopRecord instances which are from a "new" dataset
+ * or an "old" dataset, extract a key column from the record and tag
+ * each record with a bit specifying whether it is a new or old record.
+ */
+public class MergeMapperBase<INKEY, INVAL>
+    extends Mapper<INKEY, INVAL, Text, MergeRecord> {
+
+  public static final Log LOG = LogFactory.getLog(
+      MergeMapperBase.class.getName());
+
+  private String keyColName; // name of the key column.
+  private boolean isNew; // true if this split is from the new dataset.
+
+  @Override
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    keyColName = conf.get(MergeJob.MERGE_KEY_COL_KEY);
+
+    InputSplit is = context.getInputSplit();
+    FileSplit fs = (FileSplit) is;
+    Path splitPath = fs.getPath();
+
+    if (splitPath.toString().startsWith(
+        conf.get(MergeJob.MERGE_NEW_PATH_KEY))) {
+      this.isNew = true;
+    } else if (splitPath.toString().startsWith(
+        conf.get(MergeJob.MERGE_OLD_PATH_KEY))) {
+      this.isNew = false;
+    } else {
+      throw new IOException("File " + splitPath + " is not under new path "
+          + conf.get(MergeJob.MERGE_NEW_PATH_KEY) + " or old path "
+          + conf.get(MergeJob.MERGE_OLD_PATH_KEY));
+    }
+  }
+
+  protected void processRecord(SqoopRecord r, Context c)
+      throws IOException, InterruptedException {
+    MergeRecord mr = new MergeRecord(r, isNew);
+    Map<String, Object> fieldMap = r.getFieldMap();
+    if (null == fieldMap) {
+      throw new IOException("No field map in record " + r);
+    }
+    Object keyObj = fieldMap.get(keyColName);
+    if (null == keyObj) {
+      throw new IOException("Cannot join values on null key. 
" + + "Did you specify a key column that exists?"); + } else { + c.write(new Text(keyObj.toString()), mr); + } + } +} diff --git a/src/java/com/cloudera/sqoop/mapreduce/MergeRecord.java b/src/java/com/cloudera/sqoop/mapreduce/MergeRecord.java new file mode 100644 index 00000000..3ea25636 --- /dev/null +++ b/src/java/com/cloudera/sqoop/mapreduce/MergeRecord.java @@ -0,0 +1,134 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +import com.cloudera.sqoop.lib.SqoopRecord; + +/** + * Class that holds a record to be merged. This contains a SqoopRecord which + * is the "guts" of the item, and a boolean value indicating whether it is a + * "new" record or an "old" record. In the Reducer, we prefer to emit a new + * record rather than an old one, if a new one is available. + */ +public class MergeRecord implements Configurable, Writable { + private SqoopRecord sqoopRecord; + private boolean isNew; + private Configuration config; + + /** Construct an empty MergeRecord. */ + public MergeRecord() { + this.sqoopRecord = null; + this.isNew = false; + this.config = new Configuration(); + } + + /** + * Construct a MergeRecord with all fields initialized. + */ + public MergeRecord(SqoopRecord sr, boolean recordIsNew) { + this.sqoopRecord = sr; + this.isNew = recordIsNew; + this.config = new Configuration(); + } + + @Override + /** {@inheritDoc} */ + public void setConf(Configuration conf) { + this.config = conf; + } + + @Override + /** {@inheritDoc} */ + public Configuration getConf() { + return this.config; + } + + /** @return true if this record came from the "new" dataset. */ + public boolean isNewRecord() { + return isNew; + } + + /** + * Set the isNew field to 'newVal'. + */ + public void setNewRecord(boolean newVal) { + this.isNew = newVal; + } + + /** + * @return the underlying SqoopRecord we're shipping. + */ + public SqoopRecord getSqoopRecord() { + return this.sqoopRecord; + } + + /** + * Set the SqoopRecord instance we should pass from the mapper to the + * reducer. + */ + public void setSqoopRecord(SqoopRecord record) { + this.sqoopRecord = record; + } + + @Override + /** + * {@inheritDoc} + */ + public void readFields(DataInput in) throws IOException { + this.isNew = in.readBoolean(); + String className = Text.readString(in); + if (null == this.sqoopRecord) { + // If we haven't already instantiated an inner SqoopRecord, do so here. 
+      try {
+        Class<? extends SqoopRecord> recordClass =
+            (Class<? extends SqoopRecord>) config.getClassByName(className);
+        this.sqoopRecord = recordClass.newInstance();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+
+    this.sqoopRecord.readFields(in);
+  }
+
+  @Override
+  /**
+   * {@inheritDoc}
+   */
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(this.isNew);
+    Text.writeString(out, this.sqoopRecord.getClass().getName());
+    this.sqoopRecord.write(out);
+  }
+
+  @Override
+  public String toString() {
+    return "" + this.sqoopRecord;
+  }
+}
diff --git a/src/java/com/cloudera/sqoop/mapreduce/MergeRecordMapper.java b/src/java/com/cloudera/sqoop/mapreduce/MergeRecordMapper.java
new file mode 100644
index 00000000..24492e04
--- /dev/null
+++ b/src/java/com/cloudera/sqoop/mapreduce/MergeRecordMapper.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Mapper for the merge program which operates on SequenceFiles.
+ */
+public class MergeRecordMapper
+    extends MergeMapperBase<LongWritable, SqoopRecord> {
+
+  public void map(LongWritable key, SqoopRecord val, Context c)
+      throws IOException, InterruptedException {
+    processRecord(val, c);
+  }
+}
diff --git a/src/java/com/cloudera/sqoop/mapreduce/MergeReducer.java b/src/java/com/cloudera/sqoop/mapreduce/MergeReducer.java
new file mode 100644
index 00000000..f9c58db0
--- /dev/null
+++ b/src/java/com/cloudera/sqoop/mapreduce/MergeReducer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Reducer for merge tool. Given records tagged as 'old' or 'new', emit
+ * a new one if possible; otherwise, an old one.
+ */
+public class MergeReducer
+    extends Reducer<Text, MergeRecord, SqoopRecord, NullWritable> {
+
+  @Override
+  public void reduce(Text key, Iterable<MergeRecord> vals, Context c)
+      throws IOException, InterruptedException {
+    SqoopRecord bestRecord = null;
+    try {
+      for (MergeRecord val : vals) {
+        if (null == bestRecord && !val.isNewRecord()) {
+          // Use an old record if we don't have a new record.
+          bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
+        } else if (val.isNewRecord()) {
+          bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
+        }
+      }
+    } catch (CloneNotSupportedException cnse) {
+      throw new IOException(cnse);
+    }
+
+    if (null != bestRecord) {
+      c.write(bestRecord, NullWritable.get());
+    }
+  }
+}
+
diff --git a/src/java/com/cloudera/sqoop/mapreduce/MergeTextMapper.java b/src/java/com/cloudera/sqoop/mapreduce/MergeTextMapper.java
new file mode 100644
index 00000000..1fc818c7
--- /dev/null
+++ b/src/java/com/cloudera/sqoop/mapreduce/MergeTextMapper.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.cloudera.sqoop.lib.RecordParser;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Mapper for the merge program which operates on text files that we need to
+ * parse into SqoopRecord instances.
+ */
+public class MergeTextMapper extends MergeMapperBase<LongWritable, Text> {
+
+  private SqoopRecord record;
+
+  @Override
+  protected void setup(Context c) throws IOException, InterruptedException {
+    Configuration conf = c.getConfiguration();
+
+    Class<? extends SqoopRecord> recordClass =
+        (Class<? extends SqoopRecord>) conf.getClass(
+        MergeJob.MERGE_SQOOP_RECORD_KEY, SqoopRecord.class);
+    this.record = ReflectionUtils.newInstance(recordClass, conf);
+
+    super.setup(c);
+  }
+
+  public void map(LongWritable key, Text val, Context c)
+      throws IOException, InterruptedException {
+    try {
+      this.record.parse(val);
+    } catch (RecordParser.ParseError pe) {
+      throw new IOException(pe);
+    }
+
+    processRecord(this.record, c);
+  }
+}
diff --git a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
index 00854dfe..9ebd34cd 100644
--- a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
@@ -138,6 +138,12 @@ public abstract class BaseSqoopTool extends SqoopTool {
   // Arguments for the metastore.
   public static final String METASTORE_SHUTDOWN_ARG = "shutdown";
 
+
+  // Arguments for merging datasets.
+  public static final String NEW_DATASET_ARG = "new-data";
+  public static final String OLD_DATASET_ARG = "onto";
+  public static final String MERGE_KEY_ARG = "merge-key";
+
   public BaseSqoopTool() {
   }
diff --git a/src/java/com/cloudera/sqoop/tool/ImportTool.java b/src/java/com/cloudera/sqoop/tool/ImportTool.java
index ffe72f41..5a64bbec 100644
--- a/src/java/com/cloudera/sqoop/tool/ImportTool.java
+++ b/src/java/com/cloudera/sqoop/tool/ImportTool.java
@@ -253,6 +253,14 @@ private boolean initIncrementalConstraints(SqoopOptions options,
     StringBuilder sb = new StringBuilder();
     String prevEndpoint = options.getIncrementalLastValue();
 
+    if (incrementalMode == SqoopOptions.IncrementalMode.DateLastModified
+        && null != prevEndpoint && !prevEndpoint.contains("\'")) {
+      // Incremental imports based on timestamps should be 'quoted' in
+      // ANSI SQL. If the user didn't supply the single-quotes, add
+      // them around the value here.
+      prevEndpoint = "'" + prevEndpoint + "'";
+    }
+
     String checkColName = manager.escapeColName(
         options.getIncrementalTestColumn());
     LOG.info("Incremental import based on column " + checkColName);
diff --git a/src/java/com/cloudera/sqoop/tool/MergeTool.java b/src/java/com/cloudera/sqoop/tool/MergeTool.java
new file mode 100644
index 00000000..796570b4
--- /dev/null
+++ b/src/java/com/cloudera/sqoop/tool/MergeTool.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.tool;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.StringUtils;
+
+import org.apache.log4j.Category;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.cloudera.sqoop.Sqoop;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
+import com.cloudera.sqoop.cli.RelatedOptions;
+import com.cloudera.sqoop.cli.ToolOptions;
+import com.cloudera.sqoop.mapreduce.MergeJob;
+
+/**
+ * Tool that merges a more recent dataset on top of an older one.
+ */
+public class MergeTool extends BaseSqoopTool {
+
+  public static final Log LOG = LogFactory.getLog(MergeTool.class.getName());
+
+  public MergeTool() {
+    this("merge");
+  }
+
+  public MergeTool(String toolName) {
+    super(toolName);
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public int run(SqoopOptions options) {
+    try {
+      // Configure and execute a MapReduce job to merge these datasets.
+      MergeJob mergeJob = new MergeJob(options);
+      if (!mergeJob.runMergeJob()) {
+        LOG.error("MapReduce job failed!");
+        return 1;
+      }
+    } catch (IOException ioe) {
+      LOG.error("Encountered IOException running merge job: "
+          + StringUtils.stringifyException(ioe));
+      if (System.getProperty(Sqoop.SQOOP_RETHROW_PROPERTY) != null) {
+        throw new RuntimeException(ioe);
+      } else {
+        return 1;
+      }
+    }
+
+    return 0;
+  }
+
+  /**
+   * Construct the set of options that control the merge, such as the
+   * dataset paths and the merge key column.
+   * @return the RelatedOptions that can be used to parse the merge
+   * arguments.
+   */
+  protected RelatedOptions getMergeOptions() {
+    // Merge options.
+    RelatedOptions mergeOpts = new RelatedOptions("Merge arguments");
+
+    mergeOpts.addOption(OptionBuilder.withArgName("file")
+        .hasArg().withDescription("Load class from specified jar file")
+        .withLongOpt(JAR_FILE_NAME_ARG)
+        .create());
+
+    mergeOpts.addOption(OptionBuilder.withArgName("name")
+        .hasArg().withDescription("Specify record class name to load")
+        .withLongOpt(CLASS_NAME_ARG)
+        .create());
+
+    mergeOpts.addOption(OptionBuilder.withArgName("path")
+        .hasArg().withDescription("Path to the more recent data set")
+        .withLongOpt(NEW_DATASET_ARG)
+        .create());
+
+    mergeOpts.addOption(OptionBuilder.withArgName("path")
+        .hasArg().withDescription("Path to the older data set")
+        .withLongOpt(OLD_DATASET_ARG)
+        .create());
+
+    mergeOpts.addOption(OptionBuilder.withArgName("path")
+        .hasArg().withDescription("Destination path for merged results")
+        .withLongOpt(TARGET_DIR_ARG)
+        .create());
+
+    mergeOpts.addOption(OptionBuilder.withArgName("column")
+        .hasArg().withDescription("Key column to use to join results")
+        .withLongOpt(MERGE_KEY_ARG)
+        .create());
+
+    // Since the "common" options aren't used in the merge tool,
+    // add these settings here.
+    mergeOpts.addOption(OptionBuilder
+        .withDescription("Print more information while working")
+        .withLongOpt(VERBOSE_ARG)
+        .create());
+    mergeOpts.addOption(OptionBuilder
+        .withDescription("Print usage instructions")
+        .withLongOpt(HELP_ARG)
+        .create());
+
+    return mergeOpts;
+  }
+
+
+  @Override
+  /** Configure the command-line arguments we expect to receive */
+  public void configureOptions(ToolOptions toolOptions) {
+    toolOptions.addUniqueOptions(getMergeOptions());
+  }
+
+
+  @Override
+  /** {@inheritDoc} */
+  public void applyOptions(CommandLine in, SqoopOptions out)
+      throws InvalidOptionsException {
+
+    if (in.hasOption(VERBOSE_ARG)) {
+      // Immediately switch into DEBUG logging.
+      Category sqoopLogger = Logger.getLogger(
+          Sqoop.class.getName()).getParent();
+      sqoopLogger.setLevel(Level.DEBUG);
+      LOG.debug("Enabled debug logging.");
+    }
+
+    if (in.hasOption(HELP_ARG)) {
+      ToolOptions toolOpts = new ToolOptions();
+      configureOptions(toolOpts);
+      printHelp(toolOpts);
+      throw new InvalidOptionsException("");
+    }
+
+    if (in.hasOption(JAR_FILE_NAME_ARG)) {
+      out.setExistingJarName(in.getOptionValue(JAR_FILE_NAME_ARG));
+    }
+
+    if (in.hasOption(CLASS_NAME_ARG)) {
+      out.setClassName(in.getOptionValue(CLASS_NAME_ARG));
+    }
+
+    if (in.hasOption(NEW_DATASET_ARG)) {
+      out.setMergeNewPath(in.getOptionValue(NEW_DATASET_ARG));
+    }
+
+    if (in.hasOption(OLD_DATASET_ARG)) {
+      out.setMergeOldPath(in.getOptionValue(OLD_DATASET_ARG));
+    }
+
+    if (in.hasOption(TARGET_DIR_ARG)) {
+      out.setTargetDir(in.getOptionValue(TARGET_DIR_ARG));
+    }
+
+    if (in.hasOption(MERGE_KEY_ARG)) {
+      out.setMergeKeyCol(in.getOptionValue(MERGE_KEY_ARG));
+    }
+  }
+
+  /**
+   * Validate merge-specific arguments.
+   * @param options the configured SqoopOptions to check
+   */
+  protected void validateMergeOptions(SqoopOptions options)
+      throws InvalidOptionsException {
+
+    if (options.getMergeNewPath() == null) {
+      throw new InvalidOptionsException("Must set the new dataset path with --"
+          + NEW_DATASET_ARG + "." + HELP_STR);
+    }
+
+    if (options.getMergeOldPath() == null) {
+      throw new InvalidOptionsException("Must set the old dataset path with --"
+          + OLD_DATASET_ARG + "." + HELP_STR);
+    }
+
+    if (options.getMergeKeyCol() == null) {
+      throw new InvalidOptionsException("Must set the merge key column with --"
+          + MERGE_KEY_ARG + "." + HELP_STR);
+    }
+
+    if (options.getTargetDir() == null) {
+      throw new InvalidOptionsException("Must set the target directory with --"
+          + TARGET_DIR_ARG + "." + HELP_STR);
+    }
+
+    if (options.getClassName() == null) {
+      throw new InvalidOptionsException("Must set the SqoopRecord class "
+          + "implementation to use with --" + CLASS_NAME_ARG + "."
+          + HELP_STR);
+    }
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public void validateOptions(SqoopOptions options)
+      throws InvalidOptionsException {
+
+    // If extraArguments is full, check for '--' followed by args
+    // intended for a subcommand; the merge tool itself uses none.
+    options.setExtraArgs(getSubcommandArgs(extraArguments));
+    int dashPos = getDashPosition(extraArguments);
+
+    if (hasUnrecognizedArgs(extraArguments, 0, dashPos)) {
+      throw new InvalidOptionsException(HELP_STR);
+    }
+
+    validateMergeOptions(options);
+  }
+}
+
diff --git a/src/java/com/cloudera/sqoop/tool/SqoopTool.java b/src/java/com/cloudera/sqoop/tool/SqoopTool.java
index cd2fd0e2..751d2bf9 100644
--- a/src/java/com/cloudera/sqoop/tool/SqoopTool.java
+++ b/src/java/com/cloudera/sqoop/tool/SqoopTool.java
@@ -73,6 +73,8 @@ public abstract class SqoopTool {
       "List available databases on a server");
   registerTool("list-tables", ListTablesTool.class,
       "List available tables in a database");
+  registerTool("merge", MergeTool.class,
+      "Merge results of incremental imports");
   registerTool("metastore", MetastoreTool.class,
       "Run a standalone Sqoop metastore");
   registerTool("job", JobTool.class,
diff --git a/src/test/com/cloudera/sqoop/SmokeTests.java b/src/test/com/cloudera/sqoop/SmokeTests.java
index b406268a..9e68e74c 100644
--- a/src/test/com/cloudera/sqoop/SmokeTests.java
+++ b/src/test/com/cloudera/sqoop/SmokeTests.java
@@ -80,6 +80,7 @@ public static Test suite() {
     suite.addTestSuite(TestSavedJobs.class);
     suite.addTestSuite(TestNamedFifo.class);
     suite.addTestSuite(TestBooleanParser.class);
+    suite.addTestSuite(TestMerge.class);
     suite.addTest(MapreduceTests.suite());
 
     return suite;
diff --git a/src/test/com/cloudera/sqoop/TestMerge.java b/src/test/com/cloudera/sqoop/TestMerge.java
new file mode 100644
index 00000000..32a0c03b
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/TestMerge.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.sql.Connection; +import java.sql.Timestamp; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.cloudera.sqoop.SqoopOptions.IncrementalMode; +import com.cloudera.sqoop.manager.ConnManager; +import com.cloudera.sqoop.manager.HsqldbManager; +import com.cloudera.sqoop.testutil.BaseSqoopTestCase; +import com.cloudera.sqoop.tool.CodeGenTool; +import com.cloudera.sqoop.tool.ImportTool; +import com.cloudera.sqoop.tool.MergeTool; +import com.cloudera.sqoop.util.ClassLoaderStack; + +import junit.framework.TestCase; + +/** + * Test that the merge tool works. + */ +public class TestMerge extends TestCase { + + private static final Log LOG = + LogFactory.getLog(TestMerge.class.getName()); + + protected ConnManager manager; + protected Connection conn; + + public static final String SOURCE_DB_URL = "jdbc:hsqldb:mem:merge"; + + @Override + public void setUp() throws IOException, InterruptedException, SQLException { + Configuration conf = newConf(); + SqoopOptions options = getSqoopOptions(conf); + manager = new HsqldbManager(options); + conn = manager.getConnection(); + } + + @Override + public void tearDown() throws SQLException { + if (null != conn) { + this.conn.close(); + } + } + + /** Base directory for all temporary data. */ + public static final String TEMP_BASE_DIR; + + /** Where to import table data to in the local filesystem for testing. */ + public static final String LOCAL_WAREHOUSE_DIR; + + // Initializer for the above. + static { + String tmpDir = System.getProperty("test.build.data", "/tmp/"); + if (!tmpDir.endsWith(File.separator)) { + tmpDir = tmpDir + File.separator; + } + + TEMP_BASE_DIR = tmpDir; + LOCAL_WAREHOUSE_DIR = TEMP_BASE_DIR + "sqoop/warehouse"; + } + + public static final String TABLE_NAME = "MergeTable"; + + public Configuration newConf() { + Configuration conf = new Configuration(); + conf.set("fs.default.name", "file:///"); + conf.set("mapred.job.tracker", "local"); + return conf; + } + + /** + * Create a SqoopOptions to connect to the manager. 
+ */ + public SqoopOptions getSqoopOptions(Configuration conf) { + SqoopOptions options = new SqoopOptions(conf); + options.setConnectString(SOURCE_DB_URL); + + return options; + } + + protected void createTable() throws SQLException { + PreparedStatement s = conn.prepareStatement("DROP TABLE " + TABLE_NAME + + " IF EXISTS"); + try { + s.executeUpdate(); + } finally { + s.close(); + } + + s = conn.prepareStatement("CREATE TABLE " + TABLE_NAME + + " (id INT NOT NULL PRIMARY KEY, val INT, lastmod TIMESTAMP)"); + try { + s.executeUpdate(); + } finally { + s.close(); + } + + s = conn.prepareStatement("INSERT INTO " + TABLE_NAME + " VALUES (" + + "0, 0, NOW())"); + try { + s.executeUpdate(); + } finally { + s.close(); + } + + s = conn.prepareStatement("INSERT INTO " + TABLE_NAME + " VALUES (" + + "1, 42, NOW())"); + try { + s.executeUpdate(); + } finally { + s.close(); + } + + conn.commit(); + } + + public void testMerge() throws Exception { + createTable(); + + // Create a jar to use for the merging process; we'll load it + // into the current thread CL for when this runs. This needs + // to contain a different class name than used for the imports + // due to classloaderstack issues in the same JVM. + final String MERGE_CLASS_NAME = "ClassForMerging"; + SqoopOptions options = getSqoopOptions(newConf()); + options.setTableName(TABLE_NAME); + options.setClassName(MERGE_CLASS_NAME); + + CodeGenTool codeGen = new CodeGenTool(); + Sqoop codeGenerator = new Sqoop(codeGen, options.getConf(), options); + int ret = Sqoop.runSqoop(codeGenerator, new String[0]); + if (0 != ret) { + fail("Nonzero exit from codegen: " + ret); + } + + List jars = codeGen.getGeneratedJarFiles(); + String jarFileName = jars.get(0); + + // Now do the imports. + + Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR); + + options = getSqoopOptions(newConf()); + options.setTableName(TABLE_NAME); + options.setNumMappers(1); + + // Do an import of this data into the "old" dataset. + options.setTargetDir(new Path(warehouse, "merge-old").toString()); + options.setIncrementalMode(IncrementalMode.DateLastModified); + options.setIncrementalTestColumn("lastmod"); + + ImportTool importTool = new ImportTool(); + Sqoop importer = new Sqoop(importTool, options.getConf(), options); + ret = Sqoop.runSqoop(importer, new String[0]); + if (0 != ret) { + fail("Initial import failed with exit code " + ret); + } + + // Check that we got records that meet our expected values. + assertRecordStartsWith("0,0,", "merge-old"); + assertRecordStartsWith("1,42,", "merge-old"); + + long prevImportEnd = System.currentTimeMillis(); + + Thread.sleep(25); + + // Modify the data in the warehouse. + PreparedStatement s = conn.prepareStatement("UPDATE " + TABLE_NAME + + " SET val=43, lastmod=NOW() WHERE id=1"); + try { + s.executeUpdate(); + conn.commit(); + } finally { + s.close(); + } + + s = conn.prepareStatement("INSERT INTO " + TABLE_NAME + " VALUES (" + + "3,313,NOW())"); + try { + s.executeUpdate(); + conn.commit(); + } finally { + s.close(); + } + + Thread.sleep(25); + + // Do another import, into the "new" dir. 
+ options = getSqoopOptions(newConf()); + options.setTableName(TABLE_NAME); + options.setNumMappers(1); + options.setTargetDir(new Path(warehouse, "merge-new").toString()); + options.setIncrementalMode(IncrementalMode.DateLastModified); + options.setIncrementalTestColumn("lastmod"); + options.setIncrementalLastValue(new Timestamp(prevImportEnd).toString()); + + importTool = new ImportTool(); + importer = new Sqoop(importTool, options.getConf(), options); + ret = Sqoop.runSqoop(importer, new String[0]); + if (0 != ret) { + fail("Second import failed with exit code " + ret); + } + + assertRecordStartsWith("1,43,", "merge-new"); + assertRecordStartsWith("3,313,", "merge-new"); + + // Now merge the results! + ClassLoaderStack.addJarFile(jarFileName, MERGE_CLASS_NAME); + + options = getSqoopOptions(newConf()); + options.setMergeOldPath(new Path(warehouse, "merge-old").toString()); + options.setMergeNewPath(new Path(warehouse, "merge-new").toString()); + options.setMergeKeyCol("ID"); + options.setTargetDir(new Path(warehouse, "merge-final").toString()); + options.setClassName(MERGE_CLASS_NAME); + + MergeTool mergeTool = new MergeTool(); + Sqoop merger = new Sqoop(mergeTool, options.getConf(), options); + ret = Sqoop.runSqoop(merger, new String[0]); + if (0 != ret) { + fail("Merge failed with exit code " + ret); + } + + assertRecordStartsWith("0,0,", "merge-final"); + assertRecordStartsWith("1,43,", "merge-final"); + assertRecordStartsWith("3,313,", "merge-final"); + } + + /** + * @return true if the file specified by path 'p' contains a line + * that starts with 'prefix' + */ + protected boolean checkFileForLine(FileSystem fs, Path p, String prefix) + throws IOException { + BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(p))); + try { + while (true) { + String in = r.readLine(); + if (null == in) { + break; // done with the file. + } + + if (in.startsWith(prefix)) { + return true; + } + } + } finally { + r.close(); + } + + return false; + } + + /** + * Return true if there's a file in 'dirName' with a line that starts with + * 'prefix'. + */ + protected boolean recordStartsWith(String prefix, String dirName) + throws Exception { + Path warehousePath = new Path(LOCAL_WAREHOUSE_DIR); + Path targetPath = new Path(warehousePath, dirName); + + FileSystem fs = FileSystem.getLocal(new Configuration()); + FileStatus [] files = fs.listStatus(targetPath); + + if (null == files || files.length == 0) { + fail("Got no import files!"); + } + + for (FileStatus stat : files) { + Path p = stat.getPath(); + if (p.getName().startsWith("part-")) { + if (checkFileForLine(fs, p, prefix)) { + // We found the line. Nothing further to do. + return true; + } + } + } + + return false; + } + + protected void assertRecordStartsWith(String prefix, String dirName) + throws Exception { + if (!recordStartsWith(prefix, dirName)) { + fail("No record found that starts with " + prefix + " in " + dirName); + } + } +} +
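
For readers tracing the patch, the following self-contained sketch (not part
of the patch; Row and mergeAll are illustrative names, not Sqoop API)
condenses what MergeMapperBase and MergeReducer do together: each record is
tagged old or new depending on which input directory it came from, records
are grouped by the --merge-key column, and a new record always displaces an
old one with the same key.

----
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class MergeSemanticsSketch {

  /** Stand-in for a parsed SqoopRecord plus its old/new tag. */
  static final class Row {
    final String key;      // value of the merge-key column
    final boolean isNew;   // true if read from the --new-data directory
    final String payload;  // remaining column values

    Row(String key, boolean isNew, String payload) {
      this.key = key;
      this.isNew = isNew;
      this.payload = payload;
    }
  }

  /**
   * Same preference rule as MergeReducer.reduce(): keep an old row
   * only until any new row with the same key arrives.
   */
  static Map<String, Row> mergeAll(Iterable<Row> rows) {
    Map<String, Row> best = new HashMap<String, Row>();
    for (Row r : rows) {
      Row current = best.get(r.key);
      if (current == null || r.isNew) {
        best.put(r.key, r);
      }
    }
    return best;
  }

  public static void main(String[] args) {
    Map<String, Row> merged = mergeAll(Arrays.asList(
        new Row("1", false, "42"),    // old value for key 1
        new Row("1", true, "43"),     // new value for key 1: wins
        new Row("0", false, "0")));   // key 0 exists only in the old dataset
    for (Row r : merged.values()) {
      System.out.println(r.key + "," + r.payload);  // 0,0 and 1,43
    }
  }
}
----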
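A second sketch (again illustrative, not patch code) shows the wire format
MergeRecord uses between map and reduce: a boolean old/new tag, then the
generated record's class name, then the record's own fields. Serializing the
class name with every value is what lets readFields() reinstantiate the
correct generated class reflectively on the reduce side. The integer below
is a stand-in for the SqoopRecord body, and the class name is the one used
by TestMerge.

----
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Text;

public class MergeRecordWireSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeBoolean(true);                    // the isNew tag
    Text.writeString(out, "ClassForMerging");  // record class name
    out.writeInt(42);                          // stand-in for record fields

    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray()));
    boolean isNew = in.readBoolean();
    String className = Text.readString(in);    // fed to getClassByName()
    int field = in.readInt();
    System.out.println(isNew + " " + className + " " + field);
  }
}
----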
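Finally, the last-value quoting rule that ImportTool gains for lastmodified
imports reduces to a few lines. The helper name here is hypothetical; the
patch inlines this logic in initIncrementalConstraints().

----
public class LastValueQuotingSketch {
  static String quoteIfNeeded(String prevEndpoint) {
    // Timestamp literals must be single-quoted in the generated SQL.
    if (prevEndpoint != null && !prevEndpoint.contains("'")) {
      prevEndpoint = "'" + prevEndpoint + "'";
    }
    return prevEndpoint;
  }

  public static void main(String[] args) {
    // Prints: lastmod > '2011-07-22 20:04:17.0'
    System.out.println("lastmod > " + quoteIfNeeded("2011-07-22 20:04:17.0"));
  }
}
----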