
SQOOP-90. Tool to merge datasets imported via incremental import.

Adds 'merge' tool.
Adds MergeJob, Merge*Mapper, MergeReducer.
Merge-specific arguments added to SqoopOptions, BaseSqoopTool.
Adds TestMerge to test that this tool functions as expected.

From: Aaron Kimball <aaron@cloudera.com>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149980 13f79535-47bb-0310-9956-ffa450edef68
Andrew Bayer 2011-07-22 20:04:17 +00:00
parent ea716f5426
commit 3509b7941e
17 changed files with 1349 additions and 3 deletions


@ -0,0 +1,77 @@
sqoop-merge(1)
==============
NAME
----
sqoop-merge - Merge a newer dataset from an incremental import onto an
older one.
SYNOPSIS
--------
'sqoop-merge' <generic-options> <tool-options>
'sqoop merge' <generic-options> <tool-options>
DESCRIPTION
-----------
include::../user/merge-purpose.txt[]
OPTIONS
-------
Merge options
~~~~~~~~~~~~~
--class-name (class)::
Specify the name of the record-specific class to use during the merge job.
--jar-file (file)::
Specify the name of the jar to load the record class from.
--merge-key (col)::
Specify the name of a column to use as the merge key.
--new-data (path)::
Specify the path of the newer dataset.
--onto (path)::
Specify the path of the older dataset.
--target-dir (path)::
Specify the target path for the output of the merge job.
Common options
~~~~~~~~~~~~~~
--help::
Print usage instructions
--verbose::
Print more information while working
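EXAMPLE
-------
A typical invocation (this mirrors the example in the Sqoop User Guide; the
dataset paths, jar file, and class name are placeholders for values produced
by a previous import):
----
$ sqoop merge --new-data newer --onto older --target-dir merged \
    --jar-file datatypes.jar --class-name Foo --merge-key id
----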
ENVIRONMENT
-----------
See 'sqoop(1)'
////
Licensed to Cloudera, Inc. under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
Cloudera, Inc. licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////


@ -0,0 +1,26 @@
////
Licensed to Cloudera, Inc. under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
Cloudera, Inc. licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
The merge tool allows you to combine two datasets where entries in one
dataset should overwrite entries of an older dataset. For example, an
incremental import run in last-modified mode will generate multiple datasets
in HDFS where successively newer data appears in each dataset. The +merge+
tool will "flatten" two datasets into one, taking the newest available
records for each primary key.
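As an illustration (the row values below are purely hypothetical), suppose
records have the form +id,val,lastmod+ and +id+ is the primary key:
----
older dataset      newer dataset      merged result
0,0,...            1,43,...           0,0,...
1,42,...           3,313,...          1,43,...
                                      3,313,...
----
Row 1 is replaced by its newer copy, row 0 is carried over unchanged, and
row 3 appears only in the newer dataset.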


@ -51,7 +51,7 @@ $ sqoop job (generic-args) (job-args) [-- [subtool-name] (subtool-args)]
$ sqoop-job (generic-args) (job-args) [-- [subtool-name] (subtool-args)]
----
Although the Hadoop generic arguments must precede any export arguments,
Although the Hadoop generic arguments must precede any job arguments,
the job arguments can be entered in any order with respect to one
another.
@ -231,3 +231,78 @@ value +jdbc:hsqldb:hsql://<server-name>:<port>/sqoop+. For example,
This metastore may be hosted on a machine within the Hadoop cluster, or
elsewhere on the network.
+sqoop-merge+
-------------
Purpose
~~~~~~~
include::merge-purpose.txt[]
Syntax
~~~~~~
----
$ sqoop merge (generic-args) (merge-args)
$ sqoop-merge (generic-args) (merge-args)
----
Although the Hadoop generic arguments must precede any merge arguments,
the merge arguments can be entered in any order with respect to one
another.
.Merge options:
[grid="all"]
`---------------------------`------------------------------------------
Argument Description
-----------------------------------------------------------------------
+\--class-name <class>+ Specify the name of the record-specific \
class to use during the merge job.
+\--jar-file <file>+ Specify the name of the jar to load the \
record class from.
+\--merge-key <col>+ Specify the name of a column to use as \
the merge key.
+\--new-data <path>+ Specify the path of the newer dataset.
+\--onto <path>+ Specify the path of the older dataset.
+\--target-dir <path>+ Specify the target path for the output \
of the merge job.
-----------------------------------------------------------------------
The +merge+ tool runs a MapReduce job that takes two directories as
input: a newer dataset, and an older one. These are specified with
+\--new-data+ and +\--onto+ respectively. The output of the MapReduce
job will be placed in the directory in HDFS specified by +\--target-dir+.
When merging the datasets, it is assumed that there is a unique primary
key value in each record. The column for the primary key is specified
with +\--merge-key+. Multiple rows in the same dataset should not
have the same primary key, or else data loss may occur.
To parse the dataset and extract the key column, the auto-generated
class from a previous import must be used. You should specify the
class name and jar file with +\--class-name+ and +\--jar-file+. If
this is not available, you can recreate the class using the +codegen+
tool.
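For instance, the record class could be regenerated with an invocation along
the following lines (a sketch only; the connect string, table name, and class
name are placeholders):
----
$ sqoop codegen --connect jdbc:hsqldb:hsql://db.example.com/sqoop \
    --table SomeTable --class-name Foo
----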
The merge tool is typically run after an incremental import with the
date-last-modified mode (+sqoop import --incremental lastmodified ...+).
Supposing two incremental imports were performed, where some older data
is in an HDFS directory named +older+ and newer data is in an HDFS
directory named +newer+, these could be merged like so:
----
$ sqoop merge --new-data newer --onto older --target-dir merged \
--jar-file datatypes.jar --class-name Foo --merge-key id
----
This would run a MapReduce job where the value in the +id+ column
of each row is used to join rows; rows in the +newer+ dataset will
be used in preference to rows in the +older+ dataset.
This can be used with both SequenceFile- and text-based incremental
imports. The file types of the newer and older datasets must be the
same.


@ -199,6 +199,14 @@ public enum IncrementalMode {
@StoredAsProperty("incremental.last.value")
private String incrementalLastValue;
// HDFS paths for "old" and "new" datasets in merge tool.
@StoredAsProperty("merge.old.path") private String mergeOldPath;
@StoredAsProperty("merge.new.path") private String mergeNewPath;
// "key" column for the merge operation.
@StoredAsProperty("merge.key.col") private String mergeKeyCol;
// These next two fields are not serialized to the metastore.
// If this SqoopOptions is created by reading a saved job, these will
// be populated by the JobStorage to facilitate updating the same
@ -1531,5 +1539,52 @@ public SqoopOptions getParent() {
public void setParent(SqoopOptions options) {
this.parent = options;
}
/**
* Set the path name used to do an incremental import of old data
* which will be combined with a "new" dataset.
*/
public void setMergeOldPath(String path) {
this.mergeOldPath = path;
}
/**
* Return the path name used to do an incremental import of old data
* which will be combined with a "new" dataset.
*/
public String getMergeOldPath() {
return this.mergeOldPath;
}
/**
* Set the path name used to do an incremental import of new data
* which will be combined with an "old" dataset.
*/
public void setMergeNewPath(String path) {
this.mergeNewPath = path;
}
/**
* Return the path name used to do an incremental import of new data
* which will be combined with an "old" dataset.
*/
public String getMergeNewPath() {
return this.mergeNewPath;
}
/**
* Set the name of the column used to merge an old and new dataset.
*/
public void setMergeKeyCol(String col) {
this.mergeKeyCol = col;
}
/**
* Return the name of the column used to merge an old and new dataset.
*/
public String getMergeKeyCol() {
return this.mergeKeyCol;
}
}


@ -139,8 +139,13 @@ protected void cacheJars(Job job, ConnManager mgr)
addToCache(Jars.getSqoopJarPath(), fs, localUrls);
addToCache(Jars.getShimJarPath(), fs, localUrls);
addToCache(Jars.getDriverClassJar(mgr), fs, localUrls);
addToCache(Jars.getJarPathForClass(mgr.getClass()), fs, localUrls);
if (null != mgr) {
addToCache(Jars.getDriverClassJar(mgr), fs, localUrls);
addToCache(Jars.getJarPathForClass(mgr.getClass()), fs, localUrls);
}
// If the user specified a particular jar file name,
// Add anything in $SQOOP_HOME/lib, if this is set.
String sqoopHome = System.getenv("SQOOP_HOME");


@ -0,0 +1,152 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.shims.ShimLoader;
import com.cloudera.sqoop.util.Jars;
/**
* Run a MapReduce job that merges two datasets.
*/
public class MergeJob extends JobBase {
/** Configuration key specifying the path to the "old" dataset. */
public static final String MERGE_OLD_PATH_KEY = "sqoop.merge.old.path";
/** Configuration key specifying the path to the "new" dataset. */
public static final String MERGE_NEW_PATH_KEY = "sqoop.merge.new.path";
/** Configuration key specifying the name of the key column for joins. */
public static final String MERGE_KEY_COL_KEY = "sqoop.merge.key.col";
/** Configuration key specifying the SqoopRecord class name for
* the records we are merging.
*/
public static final String MERGE_SQOOP_RECORD_KEY = "sqoop.merge.class";
public MergeJob(final SqoopOptions opts) {
super(opts, null, null, null);
}
public boolean runMergeJob() throws IOException {
Configuration conf = options.getConf();
Job job = new Job(conf);
String userClassName = options.getClassName();
if (null == userClassName) {
// Shouldn't get here.
throw new IOException("Record class name not specified with "
+ "--class-name.");
}
// Set the external jar to use for the job.
String existingJar = options.getExistingJarName();
if (existingJar != null) {
// User explicitly identified a jar path.
LOG.debug("Setting job jar to user-specified jar: " + existingJar);
job.getConfiguration().set("mapred.jar", existingJar);
} else {
// Infer it from the location of the specified class, if it's on the
// classpath.
try {
Class<? extends Object> userClass = conf.getClassByName(userClassName);
if (null != userClass) {
String userJar = Jars.getJarPathForClass(userClass);
LOG.debug("Setting job jar based on user class " + userClassName
+ ": " + userJar);
job.getConfiguration().set("mapred.jar", userJar);
} else {
LOG.warn("Specified class " + userClassName + " is not in a jar. "
+ "MapReduce may not find the class");
}
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
}
try {
Path oldPath = new Path(options.getMergeOldPath());
Path newPath = new Path(options.getMergeNewPath());
Configuration jobConf = job.getConfiguration();
FileSystem fs = FileSystem.get(jobConf);
oldPath = oldPath.makeQualified(fs);
newPath = newPath.makeQualified(fs);
FileInputFormat.addInputPath(job, oldPath);
FileInputFormat.addInputPath(job, newPath);
jobConf.set(MERGE_OLD_PATH_KEY, oldPath.toString());
jobConf.set(MERGE_NEW_PATH_KEY, newPath.toString());
jobConf.set(MERGE_KEY_COL_KEY, options.getMergeKeyCol());
jobConf.set(MERGE_SQOOP_RECORD_KEY, userClassName);
FileOutputFormat.setOutputPath(job, new Path(options.getTargetDir()));
if (ExportJobBase.isSequenceFiles(jobConf, newPath)) {
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(MergeRecordMapper.class);
} else {
job.setMapperClass(MergeTextMapper.class);
job.setOutputFormatClass((Class<? extends OutputFormat>)
ShimLoader.getShimClass(
"com.cloudera.sqoop.mapreduce.RawKeyTextOutputFormat"));
}
jobConf.set("mapred.output.key.class", userClassName);
job.setOutputValueClass(NullWritable.class);
job.setReducerClass(MergeReducer.class);
// Set the intermediate data types.
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MergeRecord.class);
// Make sure Sqoop and anything else we need is on the classpath.
cacheJars(job, null);
return this.runJob(job);
} catch (InterruptedException ie) {
throw new IOException(ie);
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
}
}


@ -0,0 +1,89 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.mapreduce;
import java.io.IOException;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import com.cloudera.sqoop.lib.SqoopRecord;
/**
* Given a set of SqoopRecord instances which are from a "new" dataset
* or an "old" dataset, extract a key column from the record and tag
* each record with a bit specifying whether it is a new or old record.
*/
public class MergeMapperBase<INKEY, INVAL>
extends Mapper<INKEY, INVAL, Text, MergeRecord> {
public static final Log LOG = LogFactory.getLog(
MergeMapperBase.class.getName());
private String keyColName; // name of the key column.
private boolean isNew; // true if this split is from the new dataset.
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
keyColName = conf.get(MergeJob.MERGE_KEY_COL_KEY);
InputSplit is = context.getInputSplit();
FileSplit fs = (FileSplit) is;
Path splitPath = fs.getPath();
if (splitPath.toString().startsWith(
conf.get(MergeJob.MERGE_NEW_PATH_KEY))) {
this.isNew = true;
} else if (splitPath.toString().startsWith(
conf.get(MergeJob.MERGE_OLD_PATH_KEY))) {
this.isNew = false;
} else {
throw new IOException("File " + splitPath + " is not under new path "
+ conf.get(MergeJob.MERGE_NEW_PATH_KEY) + " or old path "
+ conf.get(MergeJob.MERGE_OLD_PATH_KEY));
}
}
protected void processRecord(SqoopRecord r, Context c)
throws IOException, InterruptedException {
MergeRecord mr = new MergeRecord(r, isNew);
Map<String, Object> fieldMap = r.getFieldMap();
if (null == fieldMap) {
throw new IOException("No field map in record " + r);
}
Object keyObj = fieldMap.get(keyColName);
if (null == keyObj) {
throw new IOException("Cannot join values on null key. "
+ "Did you specify a key column that exists?");
} else {
c.write(new Text(keyObj.toString()), mr);
}
}
}


@ -0,0 +1,134 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import com.cloudera.sqoop.lib.SqoopRecord;
/**
* Class that holds a record to be merged. This contains a SqoopRecord which
* is the "guts" of the item, and a boolean value indicating whether it is a
* "new" record or an "old" record. In the Reducer, we prefer to emit a new
* record rather than an old one, if a new one is available.
*/
public class MergeRecord implements Configurable, Writable {
private SqoopRecord sqoopRecord;
private boolean isNew;
private Configuration config;
/** Construct an empty MergeRecord. */
public MergeRecord() {
this.sqoopRecord = null;
this.isNew = false;
this.config = new Configuration();
}
/**
* Construct a MergeRecord with all fields initialized.
*/
public MergeRecord(SqoopRecord sr, boolean recordIsNew) {
this.sqoopRecord = sr;
this.isNew = recordIsNew;
this.config = new Configuration();
}
@Override
/** {@inheritDoc} */
public void setConf(Configuration conf) {
this.config = conf;
}
@Override
/** {@inheritDoc} */
public Configuration getConf() {
return this.config;
}
/** @return true if this record came from the "new" dataset. */
public boolean isNewRecord() {
return isNew;
}
/**
* Set the isNew field to 'newVal'.
*/
public void setNewRecord(boolean newVal) {
this.isNew = newVal;
}
/**
* @return the underlying SqoopRecord we're shipping.
*/
public SqoopRecord getSqoopRecord() {
return this.sqoopRecord;
}
/**
* Set the SqoopRecord instance we should pass from the mapper to the
* reducer.
*/
public void setSqoopRecord(SqoopRecord record) {
this.sqoopRecord = record;
}
@Override
/**
* {@inheritDoc}
*/
public void readFields(DataInput in) throws IOException {
this.isNew = in.readBoolean();
String className = Text.readString(in);
if (null == this.sqoopRecord) {
// If we haven't already instantiated an inner SqoopRecord, do so here.
try {
Class<? extends SqoopRecord> recordClass =
(Class<? extends SqoopRecord>) config.getClassByName(className);
this.sqoopRecord = recordClass.newInstance();
} catch (Exception e) {
throw new IOException(e);
}
}
this.sqoopRecord.readFields(in);
}
@Override
/**
* {@inheritDoc}
*/
public void write(DataOutput out) throws IOException {
out.writeBoolean(this.isNew);
Text.writeString(out, this.sqoopRecord.getClass().getName());
this.sqoopRecord.write(out);
}
@Override
public String toString() {
return "" + this.sqoopRecord;
}
}


@ -0,0 +1,37 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import com.cloudera.sqoop.lib.SqoopRecord;
/**
* Mapper for the merge program which operates on SequenceFiles.
*/
public class MergeRecordMapper
extends MergeMapperBase<LongWritable, SqoopRecord> {
public void map(LongWritable key, SqoopRecord val, Context c)
throws IOException, InterruptedException {
processRecord(val, c);
}
}


@ -0,0 +1,58 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.cloudera.sqoop.lib.SqoopRecord;
/**
* Reducer for merge tool. Given records tagged as 'old' or 'new', emit
* a new one if possible; otherwise, an old one.
*/
public class MergeReducer
extends Reducer<Text, MergeRecord, SqoopRecord, NullWritable> {
@Override
public void reduce(Text key, Iterable<MergeRecord> vals, Context c)
throws IOException, InterruptedException {
SqoopRecord bestRecord = null;
try {
for (MergeRecord val : vals) {
if (null == bestRecord && !val.isNewRecord()) {
// Use an old record if we don't have a new record.
bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
} else if (val.isNewRecord()) {
bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
}
}
} catch (CloneNotSupportedException cnse) {
throw new IOException(cnse);
}
if (null != bestRecord) {
c.write(bestRecord, NullWritable.get());
}
}
}


@ -0,0 +1,61 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ReflectionUtils;
import com.cloudera.sqoop.lib.RecordParser;
import com.cloudera.sqoop.lib.SqoopRecord;
/**
* Mapper for the merge program which operates on text files that we need to
* parse into SqoopRecord instances.
*/
public class MergeTextMapper extends MergeMapperBase<LongWritable, Text> {
private SqoopRecord record;
@Override
protected void setup(Context c) throws IOException, InterruptedException {
Configuration conf = c.getConfiguration();
Class<? extends SqoopRecord> recordClass =
(Class<? extends SqoopRecord>) conf.getClass(
MergeJob.MERGE_SQOOP_RECORD_KEY, SqoopRecord.class);
this.record = ReflectionUtils.newInstance(recordClass, conf);
super.setup(c);
}
public void map(LongWritable key, Text val, Context c)
throws IOException, InterruptedException {
try {
this.record.parse(val);
} catch (RecordParser.ParseError pe) {
throw new IOException(pe);
}
processRecord(this.record, c);
}
}


@ -138,6 +138,12 @@ public abstract class BaseSqoopTool extends SqoopTool {
// Arguments for the metastore.
public static final String METASTORE_SHUTDOWN_ARG = "shutdown";
// Arguments for merging datasets.
public static final String NEW_DATASET_ARG = "new-data";
public static final String OLD_DATASET_ARG = "onto";
public static final String MERGE_KEY_ARG = "merge-key";
public BaseSqoopTool() {
}


@ -253,6 +253,14 @@ private boolean initIncrementalConstraints(SqoopOptions options,
StringBuilder sb = new StringBuilder();
String prevEndpoint = options.getIncrementalLastValue();
if (incrementalMode == SqoopOptions.IncrementalMode.DateLastModified
&& null != prevEndpoint && !prevEndpoint.contains("\'")) {
// Incremental imports based on timestamps should be 'quoted' in
// ANSI SQL. If the user didn't specify single-quotes, add them
// around the value here.
prevEndpoint = "'" + prevEndpoint + "'";
}
String checkColName = manager.escapeColName(
options.getIncrementalTestColumn());
LOG.info("Incremental import based on column " + checkColName);


@ -0,0 +1,237 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.tool;
import java.io.IOException;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Category;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import com.cloudera.sqoop.Sqoop;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
import com.cloudera.sqoop.cli.RelatedOptions;
import com.cloudera.sqoop.cli.ToolOptions;
import com.cloudera.sqoop.mapreduce.MergeJob;
/**
* Tool that merges a more recent dataset on top of an older one.
*/
public class MergeTool extends BaseSqoopTool {
public static final Log LOG = LogFactory.getLog(MergeTool.class.getName());
public MergeTool() {
this("merge");
}
public MergeTool(String toolName) {
super(toolName);
}
@Override
/** {@inheritDoc} */
public int run(SqoopOptions options) {
try {
// Configure and execute a MapReduce job to merge these datasets.
MergeJob mergeJob = new MergeJob(options);
if (!mergeJob.runMergeJob()) {
LOG.error("MapReduce job failed!");
return 1;
}
} catch (IOException ioe) {
LOG.error("Encountered IOException running import job: "
+ StringUtils.stringifyException(ioe));
if (System.getProperty(Sqoop.SQOOP_RETHROW_PROPERTY) != null) {
throw new RuntimeException(ioe);
} else {
return 1;
}
}
return 0;
}
/**
* Construct the set of options that control the merge operation.
* @return the RelatedOptions that can be used to parse the merge
* arguments.
*/
protected RelatedOptions getMergeOptions() {
// Merge arguments
RelatedOptions mergeOpts = new RelatedOptions("Merge arguments");
mergeOpts.addOption(OptionBuilder.withArgName("file")
.hasArg().withDescription("Load class from specified jar file")
.withLongOpt(JAR_FILE_NAME_ARG)
.create());
mergeOpts.addOption(OptionBuilder.withArgName("name")
.hasArg().withDescription("Specify record class name to load")
.withLongOpt(CLASS_NAME_ARG)
.create());
mergeOpts.addOption(OptionBuilder.withArgName("path")
.hasArg().withDescription("Path to the more recent data set")
.withLongOpt(NEW_DATASET_ARG)
.create());
mergeOpts.addOption(OptionBuilder.withArgName("path")
.hasArg().withDescription("Path to the older data set")
.withLongOpt(OLD_DATASET_ARG)
.create());
mergeOpts.addOption(OptionBuilder.withArgName("path")
.hasArg().withDescription("Destination path for merged results")
.withLongOpt(TARGET_DIR_ARG)
.create());
mergeOpts.addOption(OptionBuilder.withArgName("column")
.hasArg().withDescription("Key column to use to join results")
.withLongOpt(MERGE_KEY_ARG)
.create());
// Since the "common" options aren't used in the merge tool,
// add these settings here.
mergeOpts.addOption(OptionBuilder
.withDescription("Print more information while working")
.withLongOpt(VERBOSE_ARG)
.create());
mergeOpts.addOption(OptionBuilder
.withDescription("Print usage instructions")
.withLongOpt(HELP_ARG)
.create());
return mergeOpts;
}
@Override
/** Configure the command-line arguments we expect to receive */
public void configureOptions(ToolOptions toolOptions) {
toolOptions.addUniqueOptions(getMergeOptions());
}
@Override
/** {@inheritDoc} */
public void applyOptions(CommandLine in, SqoopOptions out)
throws InvalidOptionsException {
if (in.hasOption(VERBOSE_ARG)) {
// Immediately switch into DEBUG logging.
Category sqoopLogger = Logger.getLogger(
Sqoop.class.getName()).getParent();
sqoopLogger.setLevel(Level.DEBUG);
LOG.debug("Enabled debug logging.");
}
if (in.hasOption(HELP_ARG)) {
ToolOptions toolOpts = new ToolOptions();
configureOptions(toolOpts);
printHelp(toolOpts);
throw new InvalidOptionsException("");
}
if (in.hasOption(JAR_FILE_NAME_ARG)) {
out.setExistingJarName(in.getOptionValue(JAR_FILE_NAME_ARG));
}
if (in.hasOption(CLASS_NAME_ARG)) {
out.setClassName(in.getOptionValue(CLASS_NAME_ARG));
}
if (in.hasOption(NEW_DATASET_ARG)) {
out.setMergeNewPath(in.getOptionValue(NEW_DATASET_ARG));
}
if (in.hasOption(OLD_DATASET_ARG)) {
out.setMergeOldPath(in.getOptionValue(OLD_DATASET_ARG));
}
if (in.hasOption(TARGET_DIR_ARG)) {
out.setTargetDir(in.getOptionValue(TARGET_DIR_ARG));
}
if (in.hasOption(MERGE_KEY_ARG)) {
out.setMergeKeyCol(in.getOptionValue(MERGE_KEY_ARG));
}
}
/**
* Validate merge-specific arguments.
* @param options the configured SqoopOptions to check
*/
protected void validateMergeOptions(SqoopOptions options)
throws InvalidOptionsException {
if (options.getMergeNewPath() == null) {
throw new InvalidOptionsException("Must set the new dataset path with --"
+ NEW_DATASET_ARG + "." + HELP_STR);
}
if (options.getMergeOldPath() == null) {
throw new InvalidOptionsException("Must set the old dataset path with --"
+ OLD_DATASET_ARG + "." + HELP_STR);
}
if (options.getMergeKeyCol() == null) {
throw new InvalidOptionsException("Must set the merge key column with --"
+ MERGE_KEY_ARG + "." + HELP_STR);
}
if (options.getTargetDir() == null) {
throw new InvalidOptionsException("Must set the target directory with --"
+ TARGET_DIR_ARG + "." + HELP_STR);
}
if (options.getClassName() == null) {
throw new InvalidOptionsException("Must set the SqoopRecord class "
+ "implementation to use with --" + CLASS_NAME_ARG + "."
+ HELP_STR);
}
}
@Override
/** {@inheritDoc} */
public void validateOptions(SqoopOptions options)
throws InvalidOptionsException {
// If extraArguments is full, check for '--' followed by args for
// mysqldump or other commands we rely on.
options.setExtraArgs(getSubcommandArgs(extraArguments));
int dashPos = getDashPosition(extraArguments);
if (hasUnrecognizedArgs(extraArguments, 0, dashPos)) {
throw new InvalidOptionsException(HELP_STR);
}
validateMergeOptions(options);
}
}


@ -73,6 +73,8 @@ public abstract class SqoopTool {
"List available databases on a server");
registerTool("list-tables", ListTablesTool.class,
"List available tables in a database");
registerTool("merge", MergeTool.class,
"Merge results of incremental imports");
registerTool("metastore", MetastoreTool.class,
"Run a standalone Sqoop metastore");
registerTool("job", JobTool.class,


@ -80,6 +80,7 @@ public static Test suite() {
suite.addTestSuite(TestSavedJobs.class);
suite.addTestSuite(TestNamedFifo.class);
suite.addTestSuite(TestBooleanParser.class);
suite.addTestSuite(TestMerge.class);
suite.addTest(MapreduceTests.suite());
return suite;


@ -0,0 +1,323 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.Timestamp;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.cloudera.sqoop.SqoopOptions.IncrementalMode;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.HsqldbManager;
import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
import com.cloudera.sqoop.tool.CodeGenTool;
import com.cloudera.sqoop.tool.ImportTool;
import com.cloudera.sqoop.tool.MergeTool;
import com.cloudera.sqoop.util.ClassLoaderStack;
import junit.framework.TestCase;
/**
* Test that the merge tool works.
*/
public class TestMerge extends TestCase {
private static final Log LOG =
LogFactory.getLog(TestMerge.class.getName());
protected ConnManager manager;
protected Connection conn;
public static final String SOURCE_DB_URL = "jdbc:hsqldb:mem:merge";
@Override
public void setUp() throws IOException, InterruptedException, SQLException {
Configuration conf = newConf();
SqoopOptions options = getSqoopOptions(conf);
manager = new HsqldbManager(options);
conn = manager.getConnection();
}
@Override
public void tearDown() throws SQLException {
if (null != conn) {
this.conn.close();
}
}
/** Base directory for all temporary data. */
public static final String TEMP_BASE_DIR;
/** Where to import table data to in the local filesystem for testing. */
public static final String LOCAL_WAREHOUSE_DIR;
// Initializer for the above.
static {
String tmpDir = System.getProperty("test.build.data", "/tmp/");
if (!tmpDir.endsWith(File.separator)) {
tmpDir = tmpDir + File.separator;
}
TEMP_BASE_DIR = tmpDir;
LOCAL_WAREHOUSE_DIR = TEMP_BASE_DIR + "sqoop/warehouse";
}
public static final String TABLE_NAME = "MergeTable";
public Configuration newConf() {
Configuration conf = new Configuration();
conf.set("fs.default.name", "file:///");
conf.set("mapred.job.tracker", "local");
return conf;
}
/**
* Create a SqoopOptions to connect to the manager.
*/
public SqoopOptions getSqoopOptions(Configuration conf) {
SqoopOptions options = new SqoopOptions(conf);
options.setConnectString(SOURCE_DB_URL);
return options;
}
protected void createTable() throws SQLException {
PreparedStatement s = conn.prepareStatement("DROP TABLE " + TABLE_NAME
+ " IF EXISTS");
try {
s.executeUpdate();
} finally {
s.close();
}
s = conn.prepareStatement("CREATE TABLE " + TABLE_NAME
+ " (id INT NOT NULL PRIMARY KEY, val INT, lastmod TIMESTAMP)");
try {
s.executeUpdate();
} finally {
s.close();
}
s = conn.prepareStatement("INSERT INTO " + TABLE_NAME + " VALUES ("
+ "0, 0, NOW())");
try {
s.executeUpdate();
} finally {
s.close();
}
s = conn.prepareStatement("INSERT INTO " + TABLE_NAME + " VALUES ("
+ "1, 42, NOW())");
try {
s.executeUpdate();
} finally {
s.close();
}
conn.commit();
}
public void testMerge() throws Exception {
createTable();
// Create a jar to use for the merging process; we'll load it
// into the current thread CL for when this runs. This needs
// to contain a different class name than used for the imports
// due to ClassLoaderStack issues in the same JVM.
final String MERGE_CLASS_NAME = "ClassForMerging";
SqoopOptions options = getSqoopOptions(newConf());
options.setTableName(TABLE_NAME);
options.setClassName(MERGE_CLASS_NAME);
CodeGenTool codeGen = new CodeGenTool();
Sqoop codeGenerator = new Sqoop(codeGen, options.getConf(), options);
int ret = Sqoop.runSqoop(codeGenerator, new String[0]);
if (0 != ret) {
fail("Nonzero exit from codegen: " + ret);
}
List<String> jars = codeGen.getGeneratedJarFiles();
String jarFileName = jars.get(0);
// Now do the imports.
Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
options = getSqoopOptions(newConf());
options.setTableName(TABLE_NAME);
options.setNumMappers(1);
// Do an import of this data into the "old" dataset.
options.setTargetDir(new Path(warehouse, "merge-old").toString());
options.setIncrementalMode(IncrementalMode.DateLastModified);
options.setIncrementalTestColumn("lastmod");
ImportTool importTool = new ImportTool();
Sqoop importer = new Sqoop(importTool, options.getConf(), options);
ret = Sqoop.runSqoop(importer, new String[0]);
if (0 != ret) {
fail("Initial import failed with exit code " + ret);
}
// Check that we got records that meet our expected values.
assertRecordStartsWith("0,0,", "merge-old");
assertRecordStartsWith("1,42,", "merge-old");
long prevImportEnd = System.currentTimeMillis();
Thread.sleep(25);
// Modify the data in the warehouse.
PreparedStatement s = conn.prepareStatement("UPDATE " + TABLE_NAME
+ " SET val=43, lastmod=NOW() WHERE id=1");
try {
s.executeUpdate();
conn.commit();
} finally {
s.close();
}
s = conn.prepareStatement("INSERT INTO " + TABLE_NAME + " VALUES ("
+ "3,313,NOW())");
try {
s.executeUpdate();
conn.commit();
} finally {
s.close();
}
Thread.sleep(25);
// Do another import, into the "new" dir.
options = getSqoopOptions(newConf());
options.setTableName(TABLE_NAME);
options.setNumMappers(1);
options.setTargetDir(new Path(warehouse, "merge-new").toString());
options.setIncrementalMode(IncrementalMode.DateLastModified);
options.setIncrementalTestColumn("lastmod");
options.setIncrementalLastValue(new Timestamp(prevImportEnd).toString());
importTool = new ImportTool();
importer = new Sqoop(importTool, options.getConf(), options);
ret = Sqoop.runSqoop(importer, new String[0]);
if (0 != ret) {
fail("Second import failed with exit code " + ret);
}
assertRecordStartsWith("1,43,", "merge-new");
assertRecordStartsWith("3,313,", "merge-new");
// Now merge the results!
ClassLoaderStack.addJarFile(jarFileName, MERGE_CLASS_NAME);
options = getSqoopOptions(newConf());
options.setMergeOldPath(new Path(warehouse, "merge-old").toString());
options.setMergeNewPath(new Path(warehouse, "merge-new").toString());
options.setMergeKeyCol("ID");
options.setTargetDir(new Path(warehouse, "merge-final").toString());
options.setClassName(MERGE_CLASS_NAME);
MergeTool mergeTool = new MergeTool();
Sqoop merger = new Sqoop(mergeTool, options.getConf(), options);
ret = Sqoop.runSqoop(merger, new String[0]);
if (0 != ret) {
fail("Merge failed with exit code " + ret);
}
assertRecordStartsWith("0,0,", "merge-final");
assertRecordStartsWith("1,43,", "merge-final");
assertRecordStartsWith("3,313,", "merge-final");
}
/**
* @return true if the file specified by path 'p' contains a line
* that starts with 'prefix'
*/
protected boolean checkFileForLine(FileSystem fs, Path p, String prefix)
throws IOException {
BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(p)));
try {
while (true) {
String in = r.readLine();
if (null == in) {
break; // done with the file.
}
if (in.startsWith(prefix)) {
return true;
}
}
} finally {
r.close();
}
return false;
}
/**
* Return true if there's a file in 'dirName' with a line that starts with
* 'prefix'.
*/
protected boolean recordStartsWith(String prefix, String dirName)
throws Exception {
Path warehousePath = new Path(LOCAL_WAREHOUSE_DIR);
Path targetPath = new Path(warehousePath, dirName);
FileSystem fs = FileSystem.getLocal(new Configuration());
FileStatus [] files = fs.listStatus(targetPath);
if (null == files || files.length == 0) {
fail("Got no import files!");
}
for (FileStatus stat : files) {
Path p = stat.getPath();
if (p.getName().startsWith("part-")) {
if (checkFileForLine(fs, p, prefix)) {
// We found the line. Nothing further to do.
return true;
}
}
}
return false;
}
protected void assertRecordStartsWith(String prefix, String dirName)
throws Exception {
if (!recordStartsWith(prefix, dirName)) {
fail("No record found that starts with " + prefix + " in " + dirName);
}
}
}