
SIP-3. File format for large object (LOB) storage.

Introduce LobFile format for storing large objects.
Implemented LobFile.Reader, LobFile.Writer classes.
Added a performance test of LobFile reading/writing speed.
Build system: fix cobertura build deps.
Remove unused utility classes from o.a.h.s.io.
Use LobFile for external storage in {B,C}lobRef.
Added LobReaderCache.
Converted BlobRef to read from LobFiles (through LobReaderCache).
LargeObjectLoader writes to LobFiles.
Common code from BlobRef and ClobRef factored out into LobRef abstract
base class.
Updated Test{B,C}lobRef and TestLargeObjectLoader for new external LOB storage.
Updated *ImportMappers to close LargeObjectLoaders when they're done.
Added performance tests to build.
Added script to run perf tests; factored out common logic into config script.
Fixed ivy dependency resolution to use multiple configuration inheritance.
Added LobFileStressTest.
Added readme with instructions to src/perftest directory.
Added CodecMap that abstracts compression codec classes to names.

From: Aaron Kimball <aaron@cloudera.com>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149897 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Bayer 2011-07-22 20:03:46 +00:00
parent 04fb6ee44f
commit 1eb4226230
26 changed files with 4012 additions and 585 deletions
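
A minimal usage sketch of the LobFile writer/reader API this commit introduces, based on the calls exercised by LobFilePerfTest further down in this diff; the path and record contents here are illustrative, not part of the commit:

import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.sqoop.io.LobFile;

public class LobFileUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path p = new Path("example.lob");            // illustrative path

    // Write a single BLOB record of known length, then mark it finished.
    LobFile.Writer w = LobFile.create(p, conf);
    byte[] payload = new byte[] { 1, 2, 3, 4 };
    OutputStream out = w.writeBlobRecord(payload.length);
    out.write(payload);
    out.close();
    w.finishRecord();
    w.close();

    // Read the records back sequentially.
    LobFile.Reader r = LobFile.open(p, conf);
    while (r.next()) {
      InputStream in = r.readBlobRecord();
      // ... consume the record stream here ...
      in.close();
    }
    r.close();
  }
}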

bin/configure-sqoop Executable file

@ -0,0 +1,72 @@
#!/bin/sh
#
# Licensed to Cloudera, Inc. under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# Cloudera, Inc. licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This is sourced in by bin/sqoop to set environment variables prior to
# invoking Hadoop.
prgm=`readlink -f $0`
bin=`dirname ${prgm}`
bin=`cd ${bin} && pwd`
if [ -z "$SQOOP_HOME" ]; then
export SQOOP_HOME=${bin}/..
fi
if [ -z "${HADOOP_HOME}" ]; then
# Try CDH default if the user hasn't set this.
HADOOP_HOME=/usr/lib/hadoop
fi
# Where to find the main Sqoop jar
SQOOP_JAR_DIR=$SQOOP_HOME
# Where to find the shim jars.
SQOOP_SHIM_DIR=$SQOOP_HOME/shims
# If there's a "build" subdir, override with this, so we use
# the newly-compiled copy.
if [ -d "$SQOOP_JAR_DIR/build" ]; then
SQOOP_JAR_DIR="${SQOOP_JAR_DIR}/build"
if [ -d "$SQOOP_JAR_DIR/shims" ]; then
SQOOP_SHIM_DIR="$SQOOP_JAR_DIR/shims"
fi
fi
# Add sqoop dependencies to classpath.
SQOOP_CLASSPATH=""
if [ -d "$SQOOP_HOME/lib" ]; then
for f in $SQOOP_HOME/lib/*.jar; do
SQOOP_CLASSPATH=${SQOOP_CLASSPATH}:$f;
done
fi
# If there's a build subdir, use Ivy-retrieved dependencies too.
if [ -d "$SQOOP_HOME/build/ivy/lib/sqoop" ]; then
for f in $SQOOP_HOME/build/ivy/lib/sqoop/*/*.jar; do
SQOOP_CLASSPATH=${SQOOP_CLASSPATH}:$f;
done
fi
export SQOOP_CLASSPATH
export SQOOP_JAR_DIR
export SQOOP_SHIM_DIR
export SQOOP_JAR=`ls -1 ${SQOOP_JAR_DIR}/sqoop-*.jar | head -n 1`
export HADOOP_CLASSPATH="${SQOOP_CLASSPATH}:${HADOOP_CLASSPATH}"
export HADOOP_HOME
export HADOOP_OPTS="-Dsqoop.shim.jar.dir=${SQOOP_SHIM_DIR} ${HADOOP_OPTS}"


@ -19,43 +19,6 @@ prgm=`readlink -f $0`
bin=`dirname ${prgm}`
bin=`cd ${bin} && pwd`
if [ -z "$SQOOP_HOME" ]; then
SQOOP_HOME=${bin}/..
fi
if [ -z "${HADOOP_HOME}" ]; then
# Try CDH default if the user hasn't set this.
HADOOP_HOME=/usr/lib/hadoop
fi
# Where to find the main Sqoop jar
SQOOP_JAR_DIR=$SQOOP_HOME
# Where to find the shim jars.
SQOOP_SHIM_DIR=$SQOOP_HOME/shims
# If there's a "build" subdir, override with this, so we use
# the newly-compiled copy.
if [ -d "$SQOOP_JAR_DIR/build" ]; then
SQOOP_JAR_DIR="${SQOOP_JAR_DIR}/build"
if [ -d "$SQOOP_JAR_DIR/shims" ]; then
SQOOP_SHIM_DIR="$SQOOP_JAR_DIR/shims"
fi
fi
# Add sqoop dependencies to classpath.
SQOOP_CLASSPATH=""
if [ -d "$SQOOP_HOME/lib" ]; then
for f in $SQOOP_HOME/lib/*.jar; do
SQOOP_CLASSPATH=${SQOOP_CLASSPATH}:$f;
done
fi
SQOOP_JAR=`ls -1 ${SQOOP_JAR_DIR}/sqoop-*.jar | head -n 1`
# Invoke Hadoop
HADOOP_CLASSPATH="${SQOOP_CLASSPATH}:${HADOOP_CLASSPATH}" \
HADOOP_OPTS="-Dsqoop.shim.jar.dir=${SQOOP_SHIM_DIR} ${HADOOP_OPTS}" \
${HADOOP_HOME}/bin/hadoop jar ${SQOOP_JAR} org.apache.hadoop.sqoop.Sqoop "$@"
source ${bin}/configure-sqoop
${HADOOP_HOME}/bin/hadoop jar ${SQOOP_JAR} \
org.apache.hadoop.sqoop.Sqoop "$@"


@ -39,6 +39,7 @@
<property name="src.dir" location="${base.src.dir}/java" />
<property name="test.dir" location="${base.src.dir}/test" />
<property name="shim.src.dir" location="${base.src.dir}/shims" />
<property name="perftest.src.dir" location="${base.src.dir}/perftest" />
<property name="lib.dir" location="${basedir}/lib" />
<property name="docs.src.dir" location="${base.src.dir}/docs" />
<property name="script.src.dir" location="${base.src.dir}/scripts" />
@ -50,6 +51,8 @@
<property name="build.shim.classes" location="${build.shim.dir}/classes"/>
<property name="build.test" location="${build.dir}/test"/>
<property name="build.test.classes" location="${build.test}/classes" />
<property name="build.perftest" location="${build.dir}/perftest"/>
<property name="build.perftest.classes" location="${build.perftest}/classes"/>
<property name="build.javadoc" location="${build.dir}/docs/api" />
<property name="test.log.dir" location="${build.dir}/test/logs"/>
<property name="dist.dir" location="${build.dir}/${artifact.name}" />
@ -123,7 +126,6 @@
<path id="compile.classpath">
<pathelement location="${build.classes}"/>
<path refid="lib.path"/>
<path refid="${name}.common.classpath"/>
<path refid="${name}.hadoop.classpath"/>
</path>
@ -131,7 +133,6 @@
<path id="test.classpath">
<pathelement location="${build.test.classes}" />
<path refid="${name}.hadooptest.classpath"/>
<path refid="${name}.test.classpath"/>
<path refid="compile.classpath"/>
</path>
@ -146,7 +147,7 @@
<!-- Compile a shim class so Sqoop can run with the specified hadoop.dist -->
<target name="compile-one-shim"
depends="init, ivy-retrieve-common, ivy-retrieve-hadoop">
depends="init, ivy-retrieve-hadoop">
<mkdir dir="${build.shim.classes}/${hadoop.dist}" />
<javac
encoding="${build.encoding}"
@ -178,7 +179,7 @@
<!-- Compile core classes for the project -->
<target name="compile"
depends="init, ivy-retrieve-common, ivy-retrieve-hadoop"
depends="init, ivy-retrieve-hadoop"
description="Compile core classes for the project">
<!-- don't use an out-of-date instrumented build. -->
<delete dir="${cobertura.class.dir}" />
@ -196,7 +197,7 @@
</target>
<target name="compile-test"
depends="compile, ivy-retrieve-test, ivy-retrieve-hadoop-test"
depends="compile, ivy-retrieve-hadoop-test"
description="Compile test classes">
<mkdir dir="${build.test.classes}" />
<javac
@ -211,12 +212,29 @@
</javac>
</target>
<target name="compile-perf-test"
depends="compile, ivy-retrieve-hadoop-test"
description="Compile manual performance tests">
<mkdir dir="${build.perftest.classes}" />
<javac
encoding="${build.encoding}"
srcdir="${perftest.src.dir}"
includes="**/*.java"
destdir="${build.perftest.classes}"
debug="${javac.debug}">
<classpath>
<path refid="test.classpath"/>
</classpath>
</javac>
</target>
<target name="jar" depends="compile" description="Create main jar">
<jar jarfile="${build.dir}/${dest.jar}" basedir="${build.classes}" />
</target>
<!-- Ensure that all source code can be built -->
<target name="compile-all" depends="compile,compile-test,jar-all-shims"
<target name="compile-all"
depends="compile,compile-test,jar-all-shims,compile-perf-test"
description="Compile all sources"/>
<target name="scripts" depends="jar"
@ -542,7 +560,7 @@
</target>
<target name="cobertura"
depends="check-for-cobertura,warn-cobertura-unset,jar,compile-test,test-prep"
depends="check-for-cobertura,warn-cobertura-unset,jar,compile-test,test-prep,jar-all-shims"
if="cobertura.present" description="Run Cobertura (code coverage)">
<taskdef classpathref="cobertura.classpath"
resource="tasks.properties"/>

ivy.xml

@ -33,18 +33,18 @@
extends="runtime"
description="artifacts needed to compile/test the application"/>
<conf name="apache" visibility="private"
extends="runtime"
extends="common,runtime"
description="artifacts from Apache for compile/test" />
<conf name="cloudera" visibility="private"
extends="runtime"
extends="common,runtime"
description="artifacts from Cloudera for compile/test" />
<conf name="test" visibility="private" extends="runtime"/>
<conf name="test" visibility="private" extends="common,runtime"/>
<conf name="apachetest" visibility="private"
extends="test"
extends="test,apache"
description="artifacts from Apache for testing" />
<conf name="clouderatest" visibility="private"
extends="test"
extends="test,cloudera"
description="artifacts from Cloudera for testing" />
<!-- We don't redistribute everything we depend on (e.g., Hadoop itself);


@ -0,0 +1,99 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.io;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.ReflectionUtils;
/**
* Provides a mapping from codec names to concrete implementation class names.
* This is used by LobFile.
*/
public final class CodecMap {
// Supported codec map values
public static final String NONE = "none";
public static final String DEFLATE = "deflate";
public static final String LZO = "lzo";
private static Map<String, String> codecNames;
static {
codecNames = new TreeMap<String, String>();
// Register the names of codecs we know about.
codecNames.put(NONE, null);
codecNames.put(DEFLATE, "org.apache.hadoop.io.compress.DefaultCodec");
codecNames.put(LZO, "com.hadoop.compression.lzo.LzoCodec");
}
private CodecMap() {
}
/**
* Given a codec name, return the name of the concrete class
* that implements it (or 'null' in the case of the "none" codec).
* @throws UnsupportedCodecException if a codec cannot be found
* with the supplied name.
*/
public static String getCodecClassName(String codecName)
throws UnsupportedCodecException {
if (!codecNames.containsKey(codecName)) {
throw new UnsupportedCodecException(codecName);
}
return codecNames.get(codecName);
}
/**
* Given a codec name, instantiate the concrete implementation
* class that implements it.
* @throws UnsupportedCodecException if a codec cannot be found
* with the supplied name.
*/
public static CompressionCodec getCodec(String codecName,
Configuration conf) throws UnsupportedCodecException {
String codecClassName = null;
try {
codecClassName = getCodecClassName(codecName);
if (null == codecClassName) {
return null;
}
Class<? extends CompressionCodec> codecClass =
(Class<? extends CompressionCodec>)
conf.getClassByName(codecClassName);
return (CompressionCodec) ReflectionUtils.newInstance(
codecClass, conf);
} catch (ClassNotFoundException cnfe) {
throw new UnsupportedCodecException("Cannot find codec class "
+ codecClassName + " for codec " + codecName);
}
}
/**
* Return the set of available codec names.
*/
public static Set<String> getCodecNames() {
return codecNames.keySet();
}
}
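
A minimal sketch of how a caller might use CodecMap, assuming a default Hadoop Configuration; the printed output is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.sqoop.io.CodecMap;
import org.apache.hadoop.sqoop.io.UnsupportedCodecException;

public class CodecMapSketch {
  public static void main(String[] args) throws UnsupportedCodecException {
    Configuration conf = new Configuration();
    // "deflate" resolves to org.apache.hadoop.io.compress.DefaultCodec;
    // "none" resolves to a null codec (no compression).
    CompressionCodec codec = CodecMap.getCodec(CodecMap.DEFLATE, conf);
    System.out.println("Known codec names: " + CodecMap.getCodecNames());
    System.out.println("Deflate codec class: "
        + (codec == null ? "none" : codec.getClass().getName()));
  }
}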


@ -1,74 +0,0 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.io;
import java.io.OutputStream;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* An output stream that counts how many bytes it has written.
*/
public class CountingOutputStream extends OutputStream {
public static final Log LOG = LogFactory.getLog(CountingOutputStream.class.getName());
private final OutputStream stream;
private long bytesWritten;
public CountingOutputStream(final OutputStream outputStream) {
this.stream = outputStream;
this.bytesWritten = 0;
}
/** @return the number of bytes written thus far to the stream. */
public long getBytesWritten() {
return bytesWritten;
}
/** Reset the counter of bytes written to zero. */
public void resetCount() {
this.bytesWritten = 0;
}
public void close() throws IOException {
this.stream.close();
}
public void flush() throws IOException {
this.stream.flush();
}
public void write(byte [] b) throws IOException {
this.stream.write(b);
bytesWritten += b.length;
}
public void write(byte [] b, int off, int len) throws IOException {
this.stream.write(b, off, len);
bytesWritten += len;
}
public void write(int b) throws IOException {
this.stream.write(b);
bytesWritten++;
}
}


@ -0,0 +1,90 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.io;
import java.io.FilterInputStream;
import java.io.InputStream;
import java.io.IOException;
import org.apache.commons.io.input.CloseShieldInputStream;
import org.apache.commons.io.input.CountingInputStream;
import org.apache.commons.io.input.ProxyInputStream;
/**
* Provides an InputStream that can consume a fixed maximum number of bytes
* from an underlying stream. Closing the FixedLengthInputStream does not
* close the underlying stream. After reading the maximum number of available
* bytes this acts as though EOF has been reached.
*/
public class FixedLengthInputStream extends ProxyInputStream {
private CountingInputStream countingIn;
private long maxBytes;
public FixedLengthInputStream(InputStream stream, long maxLen) {
super(new CountingInputStream(new CloseShieldInputStream(stream)));
// Save a correctly-typed reference to the underlying stream.
this.countingIn = (CountingInputStream) this.in;
this.maxBytes = maxLen;
}
/** @return the number of bytes already consumed by the client. */
private long consumed() {
return countingIn.getByteCount();
}
/**
* @return number of bytes remaining to be read before the limit
* is reached.
*/
private long toLimit() {
return maxBytes - consumed();
}
@Override
public int available() throws IOException {
return (int) Math.min(toLimit(), countingIn.available());
}
@Override
public int read() throws IOException {
if (toLimit() > 0) {
return super.read();
} else {
return -1; // EOF.
}
}
@Override
public int read(byte [] buf) throws IOException {
return read(buf, 0, buf.length);
}
@Override
public int read(byte [] buf, int start, int count) throws IOException {
long limit = toLimit();
if (limit == 0) {
return -1; // EOF.
} else {
return super.read(buf, start, (int) Math.min(count, limit));
}
}
}
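
A minimal sketch of the intended use, with an arbitrary in-memory stream standing in for the underlying source; it shows that only the first maxLen bytes are exposed and that closing the wrapper leaves the underlying stream open:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.sqoop.io.FixedLengthInputStream;

public class FixedLengthSketch {
  public static void main(String[] args) throws IOException {
    InputStream underlying =
        new ByteArrayInputStream("hello world".getBytes("UTF-8"));
    // Expose only the first five bytes ("hello") to the consumer.
    FixedLengthInputStream fixed = new FixedLengthInputStream(underlying, 5);
    int b;
    while ((b = fixed.read()) != -1) {
      System.out.print((char) b);
    }
    fixed.close(); // the close shield keeps 'underlying' open
    System.out.println();
    System.out.println("Bytes still available underneath: "
        + underlying.available());
  }
}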


@ -1,154 +0,0 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.io;
import java.io.OutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;
import java.util.Formatter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* An output stream that writes to HDFS, opening a new file after
* a specified number of bytes have been written to the current one.
*/
public class HdfsSplitOutputStream extends OutputStream {
public static final Log LOG = LogFactory.getLog(HdfsSplitOutputStream.class.getName());
private OutputStream writeStream;
private CountingOutputStream countingFilterStream;
private Configuration conf;
private Path destDir;
private String filePrefix;
private long cutoffBytes;
private boolean doGzip;
private int fileNum;
/**
* Create a new HdfsSplitOutputStream.
* @param conf the Configuration to use to interface with HDFS
* @param destDir the directory where the files will go (should already exist).
* @param filePrefix the first part of the filename, to which a number will be appended.
* This file will be placed inside destDir.
* @param cutoff the approximate number of bytes to use per file
* @param doGzip if true, then output files will be gzipped and have a .gz suffix.
*/
public HdfsSplitOutputStream(final Configuration conf, final Path destDir,
final String filePrefix, final long cutoff, final boolean doGzip) throws IOException {
this.conf = conf;
this.destDir = destDir;
this.filePrefix = filePrefix;
this.cutoffBytes = cutoff;
if (this.cutoffBytes < 0) {
this.cutoffBytes = 0; // splitting disabled.
}
this.doGzip = doGzip;
this.fileNum = 0;
openNextFile();
}
/** Initialize the OutputStream to the next file to write to.
*/
private void openNextFile() throws IOException {
FileSystem fs = FileSystem.get(conf);
StringBuffer sb = new StringBuffer();
Formatter fmt = new Formatter(sb);
fmt.format("%05d", this.fileNum++);
String filename = filePrefix + fmt.toString();
if (this.doGzip) {
filename = filename + ".gz";
}
Path destFile = new Path(destDir, filename);
LOG.debug("Opening next output file: " + destFile);
if (fs.exists(destFile)) {
Path canonicalDest = destFile.makeQualified(fs);
throw new IOException("Destination file " + canonicalDest + " already exists");
}
OutputStream fsOut = fs.create(destFile);
// Count how many actual bytes hit HDFS.
this.countingFilterStream = new CountingOutputStream(fsOut);
if (this.doGzip) {
// Wrap that in a Gzip stream.
this.writeStream = new GZIPOutputStream(this.countingFilterStream);
} else {
// Write to the counting stream directly.
this.writeStream = this.countingFilterStream;
}
}
/**
* @return true if allowSplit() would actually cause a split.
*/
public boolean wouldSplit() {
return this.cutoffBytes > 0
&& this.countingFilterStream.getBytesWritten() >= this.cutoffBytes;
}
/** If we've written more to the disk than the user's split size,
* open the next file.
*/
private void checkForNextFile() throws IOException {
if (wouldSplit()) {
LOG.debug("Starting new split");
this.writeStream.flush();
this.writeStream.close();
openNextFile();
}
}
/** Defines a point in the stream when it is acceptable to split to a new file;
e.g., the end of a record.
*/
public void allowSplit() throws IOException {
checkForNextFile();
}
public void close() throws IOException {
this.writeStream.close();
}
public void flush() throws IOException {
this.writeStream.flush();
}
public void write(byte [] b) throws IOException {
this.writeStream.write(b);
}
public void write(byte [] b, int off, int len) throws IOException {
this.writeStream.write(b, off, len);
}
public void write(int b) throws IOException {
this.writeStream.write(b);
}
}

File diff suppressed because it is too large.


@ -0,0 +1,146 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.io;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
/**
* A cache of open LobFile.Reader objects.
* This maps from filenames to the open Reader, if any. This uses the
* Singleton pattern. While nothing prevents multiple LobReaderCache
* instances, it is most useful to have a single global cache. This cache is
* internally synchronized; only one thread can insert or retrieve a reader
* from the cache at a time.
*/
public final class LobReaderCache {
public static final Log LOG = LogFactory.getLog(
LobReaderCache.class.getName());
private Map<Path, LobFile.Reader> readerMap;
private LobReaderCache() {
this.readerMap = new TreeMap<Path, LobFile.Reader>();
}
private static final LobReaderCache CACHE;
static {
CACHE = new LobReaderCache();
}
/**
* @return the singleton LobReaderCache instance.
*/
public static LobReaderCache getCache() {
return CACHE;
}
/**
* Create a fully-qualified path object.
* @param path the path to fully-qualify with its fs URI.
* @param conf the current Hadoop FS configuration.
* @return a new path representing the same location as the input 'path',
* but with a fully-qualified URI.
*/
public static Path qualify(Path path, Configuration conf)
throws IOException {
if (null == path) {
return null;
}
FileSystem fs = path.getFileSystem(conf);
if (null == fs) {
fs = FileSystem.get(conf);
}
return path.makeQualified(fs);
}
/**
* Open a LobFile for read access, returning a cached reader if one is
* available, or a new reader otherwise.
* @param path the path to the LobFile to open
* @param conf the configuration to use to access the FS.
* @throws IOException if there's an error opening the file.
*/
public LobFile.Reader get(Path path, Configuration conf)
throws IOException {
LobFile.Reader reader = null;
Path canonicalPath = qualify(path, conf);
// Look up an entry in the cache.
synchronized(this) {
reader = readerMap.remove(canonicalPath);
}
if (null != reader && !reader.isClosed()) {
// Cache hit. return it.
LOG.debug("Using cached reader for " + canonicalPath);
return reader;
}
// Cache miss; open the file.
LOG.debug("No cached reader available for " + canonicalPath);
return LobFile.open(path, conf);
}
/**
* Return a reader back to the cache. If there's already a reader for
* this path, then the current reader is closed.
* @param reader the opened reader. Any record-specific subreaders should be
* closed.
* @throws IOException if there's an error accessing the path's filesystem.
*/
public void recycle(LobFile.Reader reader) throws IOException {
Path canonicalPath = reader.getPath();
// Check if the cache has a reader for this path already. If not, add this.
boolean cached = false;
synchronized(this) {
if (readerMap.get(canonicalPath) == null) {
LOG.debug("Caching reader for path: " + canonicalPath);
readerMap.put(canonicalPath, reader);
cached = true;
}
}
if (!cached) {
LOG.debug("Reader already present for path: " + canonicalPath
+ "; closing.");
reader.close();
}
}
@Override
protected synchronized void finalize() throws Throwable {
for (LobFile.Reader r : readerMap.values()) {
r.close();
}
super.finalize();
}
}
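
A minimal sketch of the get/recycle lifecycle, assuming a LobFile already exists at the illustrative path:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.sqoop.io.LobFile;
import org.apache.hadoop.sqoop.io.LobReaderCache;

public class LobReaderCacheSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path lobPath = new Path("example.lob"); // illustrative; must already exist

    LobReaderCache cache = LobReaderCache.getCache();
    LobFile.Reader reader = cache.get(lobPath, conf);
    try {
      // ... seek to a record offset and read the record here ...
    } finally {
      // Hand the reader back; if the cache already holds a reader for this
      // path, recycle() closes the extra one instead of caching it.
      cache.recycle(reader);
    }
  }
}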


@ -0,0 +1,38 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.io;
import java.io.IOException;
/**
* Thrown when a compression codec cannot be recognized.
*/
public class UnsupportedCodecException extends IOException {
public UnsupportedCodecException() {
super("UnsupportedCodecException");
}
public UnsupportedCodecException(String msg) {
super(msg);
}
public UnsupportedCodecException(Throwable cause) {
super(cause);
}
}


@ -35,152 +35,69 @@
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.sqoop.io.LobFile;
import org.apache.hadoop.sqoop.io.LobReaderCache;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* BlobRef is a wrapper that holds a Blob either directly, or a
* reference to a file that holds the blob data.
* BlobRef is a wrapper that holds a BLOB either directly, or a
* reference to a file that holds the BLOB data.
*/
public class BlobRef implements Writable {
public class BlobRef extends LobRef<byte[], BytesWritable, InputStream> {
public static final Log LOG = LogFactory.getLog(BlobRef.class.getName());
public BlobRef(byte [] bytes) {
this.fileName = null;
this.data = new BytesWritable(bytes);
}
public BlobRef() {
this.fileName = null;
this.data = null;
super();
}
public BlobRef(String file) {
this.fileName = file;
this.data = null;
}
// If the data is 'small', it's held directly, here.
private BytesWritable data;
// If the data is too large, it's written into a file
// whose path (relative to the rest of the dataset) is recorded here.
// This takes precedence if this value is non-null.
private String fileName;
/**
* @return true if the BLOB data is in an external file; false if
* it is materialized inline.
*/
public boolean isExternal() {
return fileName != null;
public BlobRef(byte [] bytes) {
super(new BytesWritable(bytes));
}
/**
* Convenience method to access #getDataStream(Configuration, Path)
* from within a map task that read this BlobRef from a file-based
* InputSplit.
* @param mapContext the Mapper.Context instance that encapsulates
* the current map task.
* @return an InputStream to access the BLOB data.
* @throws IllegalArgumentException if it cannot find the source
* path for this BLOB based on the MapContext.
* @throws IOException if it could not read the BLOB from external storage.
* Initialize a BlobRef to an external BLOB.
* @param file the filename to the BLOB. May be relative to the job dir.
* @param offset the offset (in bytes) into the LobFile for this record.
* @param length the length of the record in bytes.
*/
public InputStream getDataStream(Mapper.Context mapContext)
throws IllegalArgumentException, IOException {
InputSplit split = mapContext.getInputSplit();
if (split instanceof FileSplit) {
Path basePath = ((FileSplit) split).getPath().getParent();
return getDataStream(mapContext.getConfiguration(),
basePath);
} else {
throw new IllegalArgumentException(
"Could not ascertain BLOB base path from MapContext.");
}
public BlobRef(String file, long offset, long length) {
super(file, offset, length);
}
/**
* Get access to the BLOB data itself.
* This method returns an InputStream-based representation of the
* BLOB data, accessing the filesystem for external BLOB storage
* as necessary.
* @param conf the Configuration used to access the filesystem
* @param basePath the base directory where the table records are
* stored.
* @return an InputStream used to read the BLOB data.
* @throws IOException if it could not read the BLOB from external storage.
*/
public InputStream getDataStream(Configuration conf, Path basePath)
@Override
protected InputStream getExternalSource(LobFile.Reader reader)
throws IOException {
if (isExternal()) {
// use external storage.
FileSystem fs = FileSystem.get(conf);
return fs.open(new Path(basePath, fileName));
} else {
return reader.readBlobRecord();
}
@Override
protected InputStream getInternalSource(BytesWritable data) {
return new ByteArrayInputStream(data.getBytes());
}
}
public byte [] getData() {
if (isExternal()) {
throw new RuntimeException(
"External BLOBs must be read via getDataStream()");
}
@Override
protected byte [] getInternalData(BytesWritable data) {
return data.getBytes();
}
@Override
public String toString() {
if (isExternal()) {
return "externalBlob(" + fileName + ")";
} else {
return data.toString();
}
}
public void readFieldsInternal(DataInput in) throws IOException {
// For internally-stored BLOBs, the data is a BytesWritable
// containing the actual data.
@Override
public void readFields(DataInput in) throws IOException {
// The serialization format for this object is:
// boolean isExternal
// if true, the next field is a String containing the file name.
// if false, the next field is a BytesWritable containing the
// actual data.
boolean isExternal = in.readBoolean();
if (isExternal) {
this.data = null;
this.fileName = Text.readString(in);
} else {
if (null == this.data) {
this.data = new BytesWritable();
}
this.data.readFields(in);
this.fileName = null;
}
}
@Override
public void write(DataOutput out) throws IOException {
out.writeBoolean(isExternal());
if (isExternal()) {
Text.writeString(out, fileName);
} else {
public void writeInternal(DataOutput out) throws IOException {
data.write(out);
}
}
private static final ThreadLocal<Matcher> EXTERNAL_MATCHER =
new ThreadLocal<Matcher>() {
@Override protected Matcher initialValue() {
Pattern externalPattern = Pattern.compile("externalBlob\\((.*)\\)");
return externalPattern.matcher("");
}
};
/**
* Create a BlobRef based on parsed data from a line of text.
@ -192,15 +109,18 @@ public void write(DataOutput out) throws IOException {
* an empty BlobRef if the data to be parsed is actually inline.
*/
public static BlobRef parse(String inputString) {
// If inputString is of the form 'externalBlob(%s)', then this is an
// external BLOB stored at the filename indicated by '%s'. Otherwise,
// it is an inline BLOB, which we don't support parsing of.
// If inputString is of the form 'externalLob(lf,%s,%d,%d)', then this is
// an external BLOB stored at the LobFile indicated by '%s' with the next
// two arguments representing its offset and length in the file.
// Otherwise, it is an inline BLOB, which we don't support parsing of.
Matcher m = EXTERNAL_MATCHER.get();
m.reset(inputString);
if (m.matches()) {
// Extract the filename component from the string.
return new BlobRef(m.group(1));
// This is a LobFile. Extract the filename, offset and len from the
// matcher.
return new BlobRef(m.group(1), Long.valueOf(m.group(2)),
Long.valueOf(m.group(3)));
} else {
// This is inline BLOB string data.
LOG.warn(


@ -35,149 +35,58 @@
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.sqoop.io.LobFile;
/**
* ClobRef is a wrapper that holds a Clob either directly, or a
* reference to a file that holds the clob data.
* ClobRef is a wrapper that holds a CLOB either directly, or a
* reference to a file that holds the CLOB data.
*/
public class ClobRef implements Writable {
public ClobRef(String chars) {
this.fileName = null;
this.data = chars;
}
public class ClobRef extends LobRef<String, String, Reader> {
public ClobRef() {
this.fileName = null;
this.data = null;
super();
}
public ClobRef(String chars) {
super(chars);
}
/**
* Initialize a ClobRef to an external CLOB.
* @param file the filename to the CLOB. May be relative to the job dir.
* @param ignored is not used; this just differentiates this constructor
* from ClobRef(String chars).
* @param offset the offset (in bytes) into the LobFile for this record.
* @param length the length of the record in characters.
*/
public ClobRef(String file, boolean ignored) {
this.fileName = file;
this.data = null;
public ClobRef(String file, long offset, long length) {
super(file, offset, length);
}
// If the data is 'small', it's held directly, here.
private String data;
// If the data is too large, it's written into a file
// whose path (relative to the rest of the dataset) is recorded here.
// This takes precedence if this value is non-null.
private String fileName;
/**
* @return true if the CLOB data is in an external file; false if
* it is materialized inline.
*/
public boolean isExternal() {
return fileName != null;
}
/**
* Convenience method to access #getDataReader(Configuration, Path)
* from within a map task that read this ClobRef from a file-based
* InputSplit.
* @param mapContext the Mapper.Context instance that encapsulates
* the current map task.
* @return a Reader to access the CLOB data.
* @throws IllegalArgumentException if it cannot find the source
* path for this CLOB based on the MapContext.
* @throws IOException if it could not read the CLOB from external storage.
*/
public Reader getDataReader(Mapper.Context mapContext)
throws IllegalArgumentException, IOException {
InputSplit split = mapContext.getInputSplit();
if (split instanceof FileSplit) {
Path basePath = ((FileSplit) split).getPath().getParent();
return getDataReader(mapContext.getConfiguration(),
basePath);
} else {
throw new IllegalArgumentException(
"Could not ascertain CLOB base path from MapContext.");
}
}
/**
* Get access to the CLOB data itself.
* This method returns a Reader-based representation of the
* CLOB data, accessing the filesystem for external CLOB storage
* as necessary.
* @param conf the Configuration used to access the filesystem
* @param basePath the base directory where the table records are
* stored.
* @return a Reader used to read the CLOB data.
* @throws IOException if it could not read the CLOB from external storage.
*/
public Reader getDataReader(Configuration conf, Path basePath)
@Override
protected Reader getExternalSource(LobFile.Reader reader)
throws IOException {
if (isExternal()) {
// use external storage.
FileSystem fs = FileSystem.get(conf);
return new InputStreamReader(fs.open(new Path(basePath, fileName)));
} else {
return reader.readClobRecord();
}
@Override
protected Reader getInternalSource(String data) {
return new StringReader(data);
}
}
@Override
/**
* @return a string representation of the ClobRef. If this is an
* inline clob (isExternal() returns false), it will contain the
* materialized data. Otherwise it returns a description of the
* reference. To ensure access to the data itself,
* {@see #getDataReader(Configuration,Path)}.
*/
public String toString() {
if (isExternal()) {
return "externalClob(" + fileName + ")";
} else {
protected String getInternalData(String data) {
return data;
}
}
@Override
public void readFields(DataInput in) throws IOException {
// The serialization format for this object is:
// boolean isExternal
// if true, the next field is a String containing the external file name.
// if false, the next field is String containing the actual data.
boolean isIndirect = in.readBoolean();
if (isIndirect) {
this.fileName = Text.readString(in);
this.data = null;
} else {
this.fileName = null;
public void readFieldsInternal(DataInput in) throws IOException {
// For internally-stored clobs, the data is written as UTF8 Text.
this.data = Text.readString(in);
}
}
@Override
public void write(DataOutput out) throws IOException {
boolean isIndirect = isExternal();
out.writeBoolean(isIndirect);
if (isIndirect) {
Text.writeString(out, fileName);
} else {
public void writeInternal(DataOutput out) throws IOException {
Text.writeString(out, data);
}
}
// A pattern matcher which can recognize external CLOB data
// vs. an inline CLOB string.
private static final ThreadLocal<Matcher> EXTERNAL_MATCHER =
new ThreadLocal<Matcher>() {
@Override protected Matcher initialValue() {
Pattern externalPattern = Pattern.compile("externalClob\\((.*)\\)");
return externalPattern.matcher("");
}
};
/**
* Create a ClobRef based on parsed data from a line of text.
@ -185,15 +94,18 @@ public void write(DataOutput out) throws IOException {
* @return a ClobRef to the given data.
*/
public static ClobRef parse(String inputString) {
// If inputString is of the form 'externalClob(%s)', then this is an
// external CLOB stored at the filename indicated by '%s'. Otherwise,
// it is an inline CLOB.
// If inputString is of the form 'externalLob(lf,%s,%d,%d)', then this is
// an external CLOB stored at the LobFile indicated by '%s' with the next
// two arguments representing its offset and length in the file.
// Otherwise, it is an inline CLOB, which we read as-is.
Matcher m = EXTERNAL_MATCHER.get();
m.reset(inputString);
if (m.matches()) {
// Extract the filename component from the string.
return new ClobRef(m.group(1), true);
// This is a LobFile. Extract the filename, offset and len from the
// matcher.
return new ClobRef(m.group(1), Long.valueOf(m.group(2)),
Long.valueOf(m.group(3)));
} else {
// This is inline CLOB string data.
return new ClobRef(inputString);


@ -20,6 +20,8 @@
import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -40,6 +42,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.sqoop.io.LobFile;
/**
* Contains a set of methods which can read db columns from a ResultSet into
@ -51,9 +54,9 @@
* However, its lifetime is limited to the current TaskInputOutputContext's
* life.
*/
public class LargeObjectLoader {
public class LargeObjectLoader implements Closeable {
// Currently, cap BLOB/CLOB objects at 16 MB until we can use external storage.
// Spill to external storage for BLOB/CLOB objects > 16 MB.
public final static long DEFAULT_MAX_LOB_LENGTH = 16 * 1024 * 1024;
public final static String MAX_INLINE_LOB_LEN_KEY =
@ -63,6 +66,10 @@ public class LargeObjectLoader {
private Path workPath;
private FileSystem fs;
// Handles to the open BLOB / CLOB file writers.
private LobFile.Writer curBlobWriter;
private LobFile.Writer curClobWriter;
// Counter that is used with the current task attempt id to
// generate unique LOB file names.
private long nextLobFileId = 0;
@ -77,19 +84,99 @@ public LargeObjectLoader(Configuration conf, Path workPath)
this.conf = conf;
this.workPath = workPath;
this.fs = FileSystem.get(conf);
this.curBlobWriter = null;
this.curClobWriter = null;
}
@Override
protected synchronized void finalize() throws Throwable {
close();
}
@Override
public void close() throws IOException {
if (null != curBlobWriter) {
curBlobWriter.close();
curBlobWriter = null;
}
if (null != curClobWriter) {
curClobWriter.close();
curClobWriter = null;
}
}
/**
* @return a filename to use to put an external LOB in.
*/
private String getNextLobFileName() {
String file = "_lob/obj_" + TaskId.get(conf, "unknown_task_id")
+ nextLobFileId;
String file = "_lob/large_obj_" + TaskId.get(conf, "unknown_task_id")
+ nextLobFileId + ".lob";
nextLobFileId++;
return file;
}
/**
* Calculates a path to a new LobFile object, creating any
* missing directories.
* @return a Path to a LobFile to write
*/
private Path getNextLobFilePath() throws IOException {
Path p = new Path(workPath, getNextLobFileName());
Path parent = p.getParent();
if (!fs.exists(parent)) {
fs.mkdirs(parent);
}
return p;
}
/**
* @return the current LobFile writer for BLOBs, creating one if necessary.
*/
private LobFile.Writer getBlobWriter() throws IOException {
if (null == this.curBlobWriter) {
this.curBlobWriter = LobFile.create(getNextLobFilePath(), conf, false);
}
return this.curBlobWriter;
}
/**
* @return the current LobFile writer for CLOBs, creating one if necessary.
*/
private LobFile.Writer getClobWriter() throws IOException {
if (null == this.curClobWriter) {
this.curClobWriter = LobFile.create(getNextLobFilePath(), conf, true);
}
return this.curClobWriter;
}
/**
* Returns the path being written to by a given LobFile.Writer, relative
* to the working directory of this LargeObjectLoader.
* @param w the LobFile.Writer whose path should be examined.
* @return the path this is writing to, relative to the current working dir.
*/
private String getRelativePath(LobFile.Writer w) {
Path writerPath = w.getPath();
String writerPathStr = writerPath.toString();
String workPathStr = workPath.toString();
if (!workPathStr.endsWith(File.separator)) {
workPathStr = workPathStr + File.separator;
}
if (writerPathStr.startsWith(workPathStr)) {
return writerPathStr.substring(workPathStr.length());
}
// Outside the working dir; return the whole thing.
return writerPathStr;
}
/**
* Copies all character data from the provided Reader to the provided
* Writer. Does not close handles when it's done.
@ -155,27 +242,16 @@ public BlobRef readBlobRef(int colNum, ResultSet r)
return null;
} else if (b.length() > maxInlineLobLen) {
// Deserialize very large BLOBs into separate files.
String fileName = getNextLobFileName();
Path p = new Path(workPath, fileName);
long len = b.length();
LobFile.Writer lobWriter = getBlobWriter();
Path parent = p.getParent();
if (!fs.exists(parent)) {
fs.mkdirs(parent);
}
BufferedOutputStream bos = null;
long recordOffset = lobWriter.tell();
InputStream is = null;
OutputStream os = fs.create(p);
OutputStream os = lobWriter.writeBlobRecord(len);
try {
bos = new BufferedOutputStream(os);
is = b.getBinaryStream();
copyAll(is, bos);
copyAll(is, os);
} finally {
if (null != bos) {
bos.close();
os = null; // os is now closed.
}
if (null != os) {
os.close();
}
@ -183,9 +259,12 @@ public BlobRef readBlobRef(int colNum, ResultSet r)
if (null != is) {
is.close();
}
// Mark the record as finished.
lobWriter.finishRecord();
}
return new BlobRef(fileName);
return new BlobRef(getRelativePath(curBlobWriter), recordOffset, len);
} else {
// This is a 1-based array.
return new BlobRef(b.getBytes(1, (int) b.length()));
@ -215,37 +294,29 @@ public ClobRef readClobRef(int colNum, ResultSet r)
return null;
} else if (c.length() > maxInlineLobLen) {
// Deserialize large CLOB into separate file.
String fileName = getNextLobFileName();
Path p = new Path(workPath, fileName);
long len = c.length();
LobFile.Writer lobWriter = getClobWriter();
Path parent = p.getParent();
if (!fs.exists(parent)) {
fs.mkdirs(parent);
}
BufferedWriter w = null;
long recordOffset = lobWriter.tell();
Reader reader = null;
OutputStream os = fs.create(p);
Writer w = lobWriter.writeClobRecord(len);
try {
w = new BufferedWriter(new OutputStreamWriter(os));
reader = c.getCharacterStream();
copyAll(reader, w);
} finally {
if (null != w) {
w.close();
os = null; // os is now closed.
}
if (null != os) {
os.close();
}
if (null != reader) {
reader.close();
}
// Mark the record as finished.
lobWriter.finishRecord();
}
return new ClobRef(fileName, true);
return new ClobRef(getRelativePath(lobWriter), recordOffset, len);
} else {
// This is a 1-based array.
return new ClobRef(c.getSubString(1, (int) c.length()));


@ -0,0 +1,295 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.lib;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.sqoop.io.LobFile;
import org.apache.hadoop.sqoop.io.LobReaderCache;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Abstract base class that holds a reference to a Blob or a Clob.
* DATATYPE is the type being held (e.g., a byte array).
* CONTAINERTYPE is the type used to hold this data (e.g., BytesWritable).
* ACCESSORTYPE is the type used to access this data in a streaming fashion
* (either an InputStream or a Reader).
*/
public abstract class LobRef<DATATYPE, CONTAINERTYPE, ACCESSORTYPE>
implements Closeable, Writable {
public static final Log LOG = LogFactory.getLog(LobRef.class.getName());
protected LobRef() {
this.fileName = null;
this.offset = 0;
this.length = 0;
this.data = null;
}
protected LobRef(CONTAINERTYPE container) {
this.fileName = null;
this.offset = 0;
this.length = 0;
this.data = container;
}
protected LobRef(String file, long offset, long length) {
this.fileName = file;
this.offset = offset;
this.length = length;
this.data = null;
}
// If the data is 'small', it's held directly, here.
protected CONTAINERTYPE data;
// If the data is too large to materialize fully, it's written into a file
// whose path (relative to the rest of the dataset) is recorded here. This
// takes precedence if the value of fileName is non-null. These records are
// currently written into LobFile-formatted files, which hold multiple
// records. The starting offset and length of the record are recorded here
// as well.
private String fileName;
private long offset;
private long length;
// If we've opened a LobFile object, track our reference to it here.
protected LobFile.Reader reader;
@Override
protected synchronized void finalize() throws Throwable {
close();
}
public void close() throws IOException {
// Discard any open LobReader.
if (null != this.reader) {
LobReaderCache.getCache().recycle(this.reader);
}
}
/**
* @return true if the LOB data is in an external file; false if
* it is materialized inline.
*/
public boolean isExternal() {
return fileName != null;
}
/**
* Convenience method to access #getDataStream(Configuration, Path)
* from within a map task that read this LobRef from a file-based
* InputSplit.
* @param mapContext the Mapper.Context instance that encapsulates
* the current map task.
* @return an object that lazily streams the record to the client.
* @throws IllegalArgumentException if it cannot find the source
* path for this LOB based on the MapContext.
* @throws IOException if it could not read the LOB from external storage.
*/
public ACCESSORTYPE getDataStream(Mapper.Context mapContext)
throws IllegalArgumentException, IOException {
InputSplit split = mapContext.getInputSplit();
if (split instanceof FileSplit) {
Path basePath = ((FileSplit) split).getPath().getParent();
return getDataStream(mapContext.getConfiguration(),
basePath);
} else {
throw new IllegalArgumentException(
"Could not ascertain LOB base path from MapContext.");
}
}
/**
* Get access to the LOB data itself.
* This method returns a lazy reader of the LOB data, accessing the
* filesystem for external LOB storage as necessary.
* @param conf the Configuration used to access the filesystem
* @param basePath the base directory where the table records are
* stored.
* @return an object that lazily streams the record to the client.
* @throws IOException if it could not read the LOB from external storage.
*/
public ACCESSORTYPE getDataStream(Configuration conf, Path basePath)
throws IOException {
if (isExternal()) {
// Read from external storage.
Path pathToRead = LobReaderCache.qualify(
new Path(basePath, fileName), conf);
LOG.debug("Retreving data stream from external path: " + pathToRead);
if (reader != null) {
// We already have a reader open to a LobFile. Is it the correct file?
if (!pathToRead.equals(reader.getPath())) {
// No. Close this reader and get the correct one.
LOG.debug("Releasing previous external reader for "
+ reader.getPath());
LobReaderCache.getCache().recycle(reader);
reader = LobReaderCache.getCache().get(pathToRead, conf);
}
} else {
reader = LobReaderCache.getCache().get(pathToRead, conf);
}
// We now have a LobFile.Reader associated with the correct file. Get to
// the correct offset and return an InputStream/Reader to the user.
if (reader.tell() != offset) {
LOG.debug("Seeking to record start offset " + offset);
reader.seek(offset);
}
if (!reader.next()) {
throw new IOException("Could not locate record at " + pathToRead
+ ":" + offset);
}
return getExternalSource(reader);
} else {
// This data is already materialized in memory; wrap it and return.
return getInternalSource(data);
}
}
/**
* Using the LobFile reader, get an accessor InputStream or Reader to the
* underlying data.
*/
protected abstract ACCESSORTYPE getExternalSource(LobFile.Reader reader)
throws IOException;
/**
* Wrap the materialized data in an InputStream or Reader.
*/
protected abstract ACCESSORTYPE getInternalSource(CONTAINERTYPE data);
/**
* @return the materialized data itself.
*/
protected abstract DATATYPE getInternalData(CONTAINERTYPE data);
public DATATYPE getData() {
if (isExternal()) {
throw new RuntimeException(
"External LOBs must be read via getDataStream()");
}
return getInternalData(data);
}
@Override
public String toString() {
if (isExternal()) {
return "externalLob(lf," + fileName + "," + Long.toString(offset)
+ "," + Long.toString(length) + ")";
} else {
return data.toString();
}
}
@Override
public void readFields(DataInput in) throws IOException {
// The serialization format for this object is:
// boolean isExternal
// if true, then:
// a string identifying the external storage type
// and external-storage-specific data.
// if false, then we use readFieldsInternal() to allow BlobRef/ClobRef
// to serialize as it sees fit.
//
// Currently the only external storage supported is LobFile, identified
// by the string "lf". This serializes with the filename (as a string),
// followed by a long-valued offset and a long-valued length.
boolean isExternal = in.readBoolean();
if (isExternal) {
this.data = null;
String storageType = Text.readString(in);
if (!storageType.equals("lf")) {
throw new IOException("Unsupported external LOB storage code: "
+ storageType);
}
// Storage type "lf" is LobFile: filename, offset, length.
this.fileName = Text.readString(in);
this.offset = in.readLong();
this.length = in.readLong();
} else {
readFieldsInternal(in);
this.fileName = null;
this.offset = 0;
this.length = 0;
}
}
/**
* Perform the readFields() operation on a fully-materializable record.
* @param in the DataInput to deserialize from.
*/
protected abstract void readFieldsInternal(DataInput in) throws IOException;
@Override
public void write(DataOutput out) throws IOException {
out.writeBoolean(isExternal());
if (isExternal()) {
Text.writeString(out, "lf"); // storage type "lf" for LobFile.
Text.writeString(out, fileName);
out.writeLong(offset);
out.writeLong(length);
} else {
writeInternal(out);
}
}
/**
* Perform the write() operation on a fully-materializable record.
* @param out the DataOutput to serialize to.
*/
protected abstract void writeInternal(DataOutput out) throws IOException;
protected static final ThreadLocal<Matcher> EXTERNAL_MATCHER =
new ThreadLocal<Matcher>() {
@Override protected Matcher initialValue() {
Pattern externalPattern = Pattern.compile(
"externalLob\\(lf,(.*),([0-9]+),([0-9]+)\\)");
return externalPattern.matcher("");
}
};
}
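
For reference, a minimal sketch of how the external string form produced by toString() round-trips through BlobRef.parse(); the file name, offset, and length are illustrative values, not output from a real import:

import org.apache.hadoop.sqoop.lib.BlobRef;

public class LobRefParseSketch {
  public static void main(String[] args) {
    // Matches the externalLob(lf,<file>,<offset>,<length>) pattern above.
    BlobRef ref = BlobRef.parse("externalLob(lf,_lob/large_obj_0.lob,100,4096)");
    System.out.println(ref.isExternal()); // true
    System.out.println(ref);              // prints the same externalLob(...) form
  }
}

Resolving the referenced bytes would then go through getDataStream(Configuration, Path), with the base path pointing at the directory that holds the LobFile.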


@ -33,18 +33,33 @@
public class SequenceFileImportMapper
extends AutoProgressMapper<LongWritable, SqoopRecord, LongWritable, SqoopRecord> {
private LargeObjectLoader lobLoader;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
this.lobLoader = new LargeObjectLoader(context.getConfiguration(),
FileOutputFormat.getWorkOutputPath(context));
}
@Override
public void map(LongWritable key, SqoopRecord val, Context context)
throws IOException, InterruptedException {
try {
// Loading of LOBs was delayed until we have a Context.
val.loadLargeObjects(new LargeObjectLoader(context.getConfiguration(),
FileOutputFormat.getWorkOutputPath(context)));
val.loadLargeObjects(lobLoader);
} catch (SQLException sqlE) {
throw new IOException(sqlE);
}
context.write(key, val);
}
@Override
protected void cleanup(Context context) throws IOException {
if (null != lobLoader) {
lobLoader.close();
}
}
}


@ -36,18 +36,25 @@ public class TextImportMapper
extends AutoProgressMapper<LongWritable, SqoopRecord, Text, NullWritable> {
private Text outkey;
private LargeObjectLoader lobLoader;
public TextImportMapper() {
outkey = new Text();
}
@Override
protected void setup(Context context) throws IOException, InterruptedException {
this.lobLoader = new LargeObjectLoader(context.getConfiguration(),
FileOutputFormat.getWorkOutputPath(context));
}
@Override
public void map(LongWritable key, SqoopRecord val, Context context)
throws IOException, InterruptedException {
try {
// Loading of LOBs was delayed until we have a Context.
val.loadLargeObjects(new LargeObjectLoader(context.getConfiguration(),
FileOutputFormat.getWorkOutputPath(context)));
val.loadLargeObjects(lobLoader);
} catch (SQLException sqlE) {
throw new IOException(sqlE);
}
@ -55,5 +62,12 @@ public void map(LongWritable key, SqoopRecord val, Context context)
outkey.set(val.toString());
context.write(outkey, NullWritable.get());
}
@Override
protected void cleanup(Context context) throws IOException {
if (null != lobLoader) {
lobLoader.close();
}
}
}


@ -0,0 +1,112 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.sqoop.io.*;
/**
* A simple benchmark to measure LobFile reader/writer speed.
* Writes out 10 GB of data to the local disk and then reads it back.
* Run with:
* HADOOP_OPTS=-agentlib:hprof=cpu=samples src/scripts/run-perftest.sh LobFilePerfTest
*/
public class LobFilePerfTest {
long recordLen = 20 * 1024 * 1024; // 20 MB records
int numRecords = 500;
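// 500 records x 20 MB per record is roughly the 10 GB noted in the class comment.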
Configuration conf;
Path p;
long startTime;
byte [] record;
public LobFilePerfTest() {
conf = new Configuration();
conf.set("fs.default.name", "file:///");
p = new Path("foo.lob");
}
private void startTiming(String s) {
System.out.println(s);
startTime = System.currentTimeMillis();
}
private void stopTiming() {
long finishTime = System.currentTimeMillis();
long delta = finishTime - startTime;
System.out.println("Finished. Time elapsed: " + delta);
}
private void makeRecordBody() {
startTiming("Allocating record");
record = new byte[(int)recordLen];
for (long i = 0; i < recordLen; i++) {
record[(int)i] = (byte) (i % 256);
}
stopTiming();
}
private void writeFile() throws Exception {
startTiming("Writing " + numRecords + " records to lob file");
LobFile.Writer w = LobFile.create(p, conf);
for (int i = 0; i < numRecords; i++) {
OutputStream out = w.writeBlobRecord(recordLen);
out.write(record);
out.close();
w.finishRecord();
}
w.close();
stopTiming();
}
private void readFile() throws Exception {
startTiming("Reading from lob file");
LobFile.Reader r = LobFile.open(p, conf);
int receivedRecs = 0;
byte [] inputBuffer = new byte[4096];
long recordSize = 0;
while (r.next()) {
receivedRecs++;
InputStream in = r.readBlobRecord();
while (true) {
int thisRead = in.read(inputBuffer);
if (-1 == thisRead) {
break;
}
recordSize += (long) thisRead;
}
}
r.close();
stopTiming();
System.out.println("Got " + receivedRecs + " records");
System.out.println("Read " + recordSize + " bytes");
}
private void run() throws Exception {
makeRecordBody();
writeFile();
readFile();
}
public static void main(String [] args) throws Exception {
LobFilePerfTest test = new LobFilePerfTest();
test.run();
}
}


@ -0,0 +1,391 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.*;
import java.util.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.sqoop.io.*;
/**
* Stress test LobFiles by writing a bunch of different files and reading
* them back in various ways.
* Run with: src/scripts/run-perftest.sh LobFileStressTest
*/
public class LobFileStressTest {
// Big records in testBigFile() are 5 GB each.
public final static long LARGE_RECORD_LEN = 5L * 1024L * 1024L * 1024L;
int numRandomTrials = 1000000;
Configuration conf;
boolean allPassed;
long lastCompressPos; // start offset of the last record in the file.
long lastRawPos;
public LobFileStressTest() {
conf = new Configuration();
conf.set("fs.default.name", "file:///");
allPassed = true;
}
private Path getPath(boolean compress) {
if (compress) {
return new Path("compressed.lob");
} else {
return new Path("integers.lob");
}
}
private long getLastRecordPos(boolean compress) {
if (compress) {
return lastCompressPos;
} else {
return lastRawPos;
}
}
private void setLastRecordPos(long pos, boolean compress) {
if (compress) {
lastCompressPos = pos;
} else {
lastRawPos = pos;
}
}
private int getNumRecords(boolean compress) {
if (!compress) {
return 40000000; // 40 million.
} else {
return 5000000; // 5 million; the compressor is just too slow for 40M.
}
}
private void writeIntegerFile(boolean compress) throws Exception {
boolean passed = false;
try {
System.out.print("Writing integers file. compress=" + compress + ". ");
Path p = getPath(compress);
FileSystem fs = FileSystem.getLocal(conf);
if (fs.exists(p)) {
fs.delete(p, false);
}
String codecName = compress ? "deflate" : null;
LobFile.Writer w = LobFile.create(p, conf, false, codecName);
int numRecords = getNumRecords(compress);
for (int i = 0; i < numRecords; i++) {
setLastRecordPos(w.tell(), compress);
OutputStream os = w.writeBlobRecord(0);
DataOutputStream dos = new DataOutputStream(os);
dos.writeInt(i);
dos.close();
os.close();
}
w.close();
System.out.println("PASS");
passed = true;
} finally {
if (!passed) {
allPassed = false;
System.out.println("FAIL");
}
}
}
private void testSequentialScan(boolean compress) throws Exception {
// Write a LobFile containing several million integers
// and read the entire resulting file, verify that they
// all appear in order.
boolean success = false;
System.out.print("Testing sequential scan. compress=" + compress + ". ");
try {
LobFile.Reader r = LobFile.open(getPath(compress), conf);
int numRecords = getNumRecords(compress);
for (int i = 0; i < numRecords; i++) {
r.next();
if (!r.isRecordAvailable()) {
throw new Exception("File ended early; i=" + i);
}
long id = r.getRecordId();
if (id != i) {
throw new Exception("record id mismatch; expected " + i
+ " got " + id);
}
InputStream is = r.readBlobRecord();
DataInputStream dis = new DataInputStream(is);
int val = dis.readInt();
if (val != i) {
throw new Exception("Unexpected value in stream; expected " + i
+ " got " + val);
}
try {
// Make sure we can't read additional data from the stream.
byte b = dis.readByte();
throw new Exception("Got unexpected extra byte : " + b);
} catch (EOFException eof) {
// Expected this. Ignore.
}
dis.close();
}
if (r.next()) {
// We should have finished reading everything already.
throw new Exception("Additional record was waiting at end of file");
}
r.close();
System.out.println("PASS");
success = true;
} finally {
if (!success) {
allPassed = false;
System.out.println("FAIL");
}
}
}
private void testSmallSeeks(boolean compress) throws Exception {
// Write a LobFile containing several million integers
// and seek to randomly selected records. Make sure we can
// access each one.
boolean success = false;
long seed = System.currentTimeMillis();
long lastRecordPos = getLastRecordPos(compress);
System.out.print("Testing random seeks. compress=" + compress + ". "
+ ". seed=" + seed + ". lastPos=" + lastRecordPos +". ");
try {
LobFile.Reader r = LobFile.open(getPath(compress), conf);
Random rnd = new Random(seed);
long curRecord = -1; // current position starts before record 0.
int numRecords = getNumRecords(compress);
for (int i = 0; i < numRandomTrials; i++) {
if (rnd.nextInt(100) < 20 && curRecord != numRecords - 1) {
// Sequentially access the next record.
if (!r.next()) {
throw new Exception("cur record is " + curRecord
+ " but sequential next() failed!");
}
curRecord++;
} else {
// Randomly find a record.
// Mask the sign bit so the randomly chosen offset is non-negative.
long targetPos = (rnd.nextLong() & Long.MAX_VALUE) % (lastRecordPos + 1);
r.seek(targetPos);
if (!r.next()) {
throw new Exception("Could not seek to record starting at "
+ targetPos);
}
curRecord = r.getRecordId();
if (curRecord < 0 || curRecord >= numRecords) {
throw new Exception("Unexpected record id " + curRecord);
}
}
// In either case, read the record and verify the data's correct.
InputStream is = r.readBlobRecord();
DataInputStream dis = new DataInputStream(is);
int val = dis.readInt();
if (val != curRecord) {
throw new Exception("Unexpected value in stream; expected "
+ curRecord + " got " + val);
}
try {
// Make sure we can't read additional data from the stream.
byte b = dis.readByte();
throw new Exception("Got unexpected extra byte : " + b);
} catch (EOFException eof) {
// Expected this. Ignore.
}
dis.close();
}
r.close();
System.out.println("PASS");
success = true;
} finally {
if (!success) {
allPassed = false;
System.out.println("FAIL");
}
}
}
private Path getBigFilePath(boolean compress) {
if (compress) {
return new Path("big-compressed.lob");
} else {
return new Path("big.lob");
}
}
/**
* Check that the bytes in buf are of the form 0 1 2 .. 254 255 0 1 2..
* but offset by the value in 'firstByte'. Returns the next byte value
* we would expect in an adjacent follow-up array of bytes.
*/
private byte checkBuf(byte [] buf, int len, byte firstByte) throws Exception {
byte b = firstByte;
for (int i = 0; i < len; i++) {
if (buf[i] != b) {
  throw new Exception("Expected value " + b + " but got " + buf[i]);
}
b++;
}
return b;
}
/**
* Check that a big record (from testBigFile) matches its expected
* contract.
* The reader should be positioned at the beginning of the record
* with id 'expectedId'.
*/
private void checkBigRecord(LobFile.Reader r, long expectedId)
throws Exception {
if (!r.isRecordAvailable()) {
throw new Exception("No record available");
}
if (r.getRecordId() != expectedId) {
throw new Exception("Expected record id=" + expectedId
+ "; got " + r.getRecordId());
}
// Read all the data in this record, check that it is the data
// we expect. The bytes in each record increase 0 1 2 .. 254 255 0 1 2..
// but each record's start byte is offset by its record id.
long received = 0;
InputStream is = r.readBlobRecord();
long expected = LARGE_RECORD_LEN;
final int BUF_SIZE = 16384;
byte [] buf = new byte[BUF_SIZE];
byte last = (byte) r.getRecordId();
while (expected > 0) {
int thisRead = is.read(buf);
if (thisRead == -1) {
break;
}
last = checkBuf(buf, thisRead, last);
expected -= thisRead;
}
if (expected > 0) {
throw new Exception("Couldn't read all the data! expected "
+ expected + " more bytes");
}
if (is.read() != -1) {
throw new Exception("Got an extra byte! Expected no more data.");
}
}
private void testBigFile(boolean compress) throws Exception {
// Write a file containing 5 GB records.
final int NUM_RECORDS = 5;
boolean passed = false;
try {
System.out.print("Testing large file operations. compress="
+ compress + ". ");
Path p = getBigFilePath(compress);
long startOffsets [] = new long[NUM_RECORDS];
// Write the file: five records, 5 GB apiece.
System.out.print("Testing write. ");
FileSystem fs = FileSystem.getLocal(conf);
if (fs.exists(p)) {
fs.delete(p, false);
}
String codecName = compress ? "deflate" : null;
System.out.println("record size: " + LARGE_RECORD_LEN);
LobFile.Writer w = LobFile.create(p, conf, false, codecName);
for (int i = 0; i < NUM_RECORDS; i++) {
startOffsets[i] = w.tell();
System.out.println("Starting record " + i + " at " + startOffsets[i]);
OutputStream os = w.writeBlobRecord(0);
for (long v = 0; v < LARGE_RECORD_LEN; v++) {
long byteVal = (((long) i) + v) & 0xFF;
os.write((int) byteVal);
}
os.close();
}
w.close();
System.out.println("PASS");
// Iterate past three records, read the fourth.
System.out.print("Testing iterated skipping. ");
LobFile.Reader r = LobFile.open(p, conf);
for (int i = 0; i < 4; i++) {
r.next();
}
checkBigRecord(r, 3);
System.out.println("PASS");
// Seek directly to record 2, read it through.
System.out.print("Testing large backward seek. ");
r.seek(startOffsets[2]);
r.next();
checkBigRecord(r, 2);
System.out.println("PASS");
passed = true;
} finally {
if (!passed) {
allPassed = false;
System.out.println("FAIL");
}
}
}
private void run() throws Exception {
writeIntegerFile(true);
writeIntegerFile(false);
testSequentialScan(false);
testSmallSeeks(false);
testSequentialScan(true);
testSmallSeeks(true);
testBigFile(false);
testBigFile(true);
if (allPassed) {
System.out.println("Tests passed.");
} else {
System.out.println("All tests did not pass!");
System.exit(1);
}
}
public static void main(String [] args) throws Exception {
LobFileStressTest test = new LobFileStressTest();
test.run();
}
}


@ -0,0 +1,40 @@
Licensed to Cloudera, Inc. under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
Cloudera, Inc. licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
= Performance and Stress Tests
The files in this directory represent performance or stress tests of aspects
of Sqoop. These are intended to be run by a developer as part of a lengthy
QA process; they are not the primary tests expected to run for every build.
== Compiling
To compile the tests in this directory, run 'ant jar compile-perf-test' in the
project root.
== Running
After compiling the performance tests, you can run them with:
$ src/scripts/run-perftest.sh <PerfTestClassName> [<args...>]
e.g.:
$ src/scripts/run-perftest.sh LobFileStressTest
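For CPU profiling, the header of LobFilePerfTest shows how to enable the HPROF sampler through HADOOP_OPTS, e.g.:
$ HADOOP_OPTS=-agentlib:hprof=cpu=samples src/scripts/run-perftest.sh LobFilePerfTest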

39
src/scripts/run-perftest.sh Executable file

@ -0,0 +1,39 @@
#!/bin/bash
#
# Licensed to Cloudera, Inc. under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# Cloudera, Inc. licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# run-perftest.sh
# USAGE:
# ./run-perftest.sh PerfTestClassName [arg arg arg...]
#
# This script will run one of the classes from src/perftest/ with the
# correct classpath set up.
bin=`dirname $0`
bin=`cd ${bin} && pwd`
# This is run in src/scripts/
SQOOP_HOME="${bin}/../../"
# Set up environment and classpath
source ${SQOOP_HOME}/bin/configure-sqoop
PERFTEST_CLASSES=${SQOOP_HOME}/build/perftest/classes
export HADOOP_CLASSPATH=${PERFTEST_CLASSES}:${SQOOP_JAR}:${HADOOP_CLASSPATH}
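# bin/hadoop treats its first argument as a class name to run, so the perf
# test class name and its arguments are passed straight through to it.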
${HADOOP_HOME}/bin/hadoop "$@"


@ -20,6 +20,7 @@
import org.apache.hadoop.sqoop.hive.TestHiveImport;
import org.apache.hadoop.sqoop.hive.TestTableDefWriter;
import org.apache.hadoop.sqoop.io.TestLobFile;
import org.apache.hadoop.sqoop.io.TestSplittableBufferedWriter;
import org.apache.hadoop.sqoop.lib.TestFieldFormatter;
import org.apache.hadoop.sqoop.lib.TestRecordParser;
@ -68,6 +69,7 @@ public static Test suite() {
suite.addTestSuite(TestClobRef.class);
suite.addTestSuite(TestLargeObjectLoader.class);
suite.addTestSuite(TestDirectImportUtils.class);
suite.addTestSuite(TestLobFile.class);
suite.addTest(MapreduceTests.suite());
return suite;


@ -0,0 +1,577 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.io;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import java.nio.CharBuffer;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* Test the LobFile reader/writer implementation.
*/
public class TestLobFile extends TestCase {
public static final Log LOG = LogFactory.getLog(
TestLobFile.class.getName());
public static final Path TEMP_BASE_DIR;
static {
String tmpDir = System.getProperty("test.build.data", "/tmp/");
if (!tmpDir.endsWith(File.separator)) {
tmpDir = tmpDir + File.separator;
}
TEMP_BASE_DIR = new Path(new Path(tmpDir), "lobtest");
}
private Configuration conf;
private FileSystem fs;
public void setUp() throws Exception {
conf = new Configuration();
conf.set("fs.default.name", "file:///");
fs = FileSystem.getLocal(conf);
fs.mkdirs(TEMP_BASE_DIR);
}
private long [] writeClobFile(Path p, String codec,
String... records) throws Exception {
if (fs.exists(p)) {
fs.delete(p, false);
}
// Memorize the offset of each record we write.
long [] offsets = new long[records.length];
// Create files with four entries per index segment.
LobFile.Writer writer = LobFile.create(p, conf, true, codec, 4);
int i = 0;
for (String r : records) {
offsets[i++] = writer.tell();
Writer w = writer.writeClobRecord(r.length());
w.write(r);
w.close();
}
writer.close();
return offsets;
}
private void verifyClobFile(Path p, String... expectedRecords) throws Exception {
LobFile.Reader reader = LobFile.open(p, conf);
int recNum = 0;
while (reader.next()) {
// We should have a record of the same length as the expected one.
String expected = expectedRecords[recNum];
assertTrue(reader.isRecordAvailable());
assertEquals(expected.length(), reader.getRecordLen());
Reader r = reader.readClobRecord();
// Read in the record and assert that we got enough characters out.
CharBuffer buf = CharBuffer.allocate(expected.length());
int bytesRead = 0;
while (bytesRead < expected.length()) {
int thisRead = r.read(buf);
LOG.info("performed read of " + thisRead + " chars");
if (-1 == thisRead) {
break;
}
bytesRead += thisRead;
}
LOG.info("Got record of " + bytesRead + " chars");
assertEquals(expected.length(), bytesRead);
char [] charData = buf.array();
String finalRecord = new String(charData);
assertEquals(expected, finalRecord);
recNum++;
}
// Check that we got everything.
assertEquals(expectedRecords.length, recNum);
reader.close();
try {
reader.next();
fail("Expected IOException calling next after close");
} catch (IOException ioe) {
// expected this.
}
// A second close shouldn't hurt anything. This should be a no-op.
reader.close();
}
private void runClobFileTest(Path p, String codec,
String... records) throws Exception {
writeClobFile(p, codec, records);
verifyClobFile(p, records);
fs.delete(p, false);
}
public void testEmptyRecord() throws Exception {
runClobFileTest(new Path(TEMP_BASE_DIR, "empty.lob"), null);
}
public void testSingleRecord() throws Exception {
runClobFileTest(new Path(TEMP_BASE_DIR, "single.lob"),
null, "this is a single record!");
}
public void testMultiRecords() throws Exception {
runClobFileTest(new Path(TEMP_BASE_DIR, "multi.lob"),
CodecMap.NONE,
"this is the first record",
"this is the second record. I assure you that this record is long.",
"yet one more record graces this file.");
}
public void testMultiIndexSegments() throws Exception {
// Test that we can use multiple IndexSegments.
runClobFileTest(new Path(TEMP_BASE_DIR, "multi-index.lob"),
null,
"this is the first record",
"this is the second record. I assure you that this record is long.",
"record number three",
"last one in first index segment",
"first in the second index segment",
"yet one more record graces this file.");
}
/**
* Run a test where we read only a fraction of the first record,
* but then read the second record completely. Verify that we
* can re-align on a record boundary correctly. This test requires
* at least 3 records.
* @param p the path to the file to create.
* @param firstLine the first line of the first record
* @param records All of the records to write to the file.
*/
private void runLineAndRecordTest(Path p, String firstLine,
String... records) throws Exception {
assertTrue("This test requires 3+ records", records.length > 2);
writeClobFile(p, null, records);
LobFile.Reader reader = LobFile.open(p, conf);
// We should not yet be aligned.
assertFalse(reader.isRecordAvailable());
assertTrue(reader.next());
// Now we should be.
assertTrue(reader.isRecordAvailable());
// Read just one line from the record.
Reader r = reader.readClobRecord();
BufferedReader br = new BufferedReader(r);
String line = br.readLine();
assertEquals(firstLine, line);
br.close();
r.close();
// We should no longer be aligned on a record start.
assertFalse(reader.isRecordAvailable());
// We should now be able to get to record two.
assertTrue(reader.next());
// This should be nicely aligned even if the first record was not
// completely consumed by a client.
r = reader.readClobRecord();
CharBuffer buf = CharBuffer.allocate(records[1].length());
r.read(buf);
r.close();
char [] chars = buf.array();
String s = new String(chars);
assertEquals(records[1], s);
// Close the reader before we consume the entire file.
reader.close();
assertFalse(reader.isRecordAvailable());
}
public void testVeryShortRead() throws Exception {
// Read only a small fraction of a record, ensure that we can
// read the next record, even when we've left more than a 16-byte
// quantity in the readahead buffer.
Path p = new Path(TEMP_BASE_DIR, "shortread.lob");
final String firstLine = "line1";
final String secondLine =
"This contains much more in the record than just one line.";
final String record2 = "here is the second record.";
final String record3 = "The 3rd record, which we won't actually read.";
runLineAndRecordTest(p, firstLine,
firstLine + "\n" + secondLine,
record2,
record3);
}
public void testIncompleteOverread() throws Exception {
// Read most of the first record so that we partially consume the
// next record start mark; make sure we realign properly.
Path p = new Path(TEMP_BASE_DIR, "longread.lob");
final String firstLine = "this is a really long line of text to read!";
final String secondLine = "not this.";
final String record2 = "Here is yet another record to verify.";
final String record3 = "Nobody cares about record 3.";
runLineAndRecordTest(p, firstLine,
firstLine + "\n" + secondLine,
record2,
record3);
}
public void testSeekToRecord() throws Exception {
// Seek past the first two records and read the third.
Path p = new Path(TEMP_BASE_DIR, "seek.lob");
String [] records = {
"this is the first record!",
"here comes record number two. It is a bit longer.",
"this is the third record. we can read it."
};
// Write the file and memorize where the third record starts.
LobFile.Writer writer = LobFile.create(p, conf, true);
int recNum = 0;
long rec3Start = 0;
for (String r : records) {
Writer w = writer.writeClobRecord(r.length());
w.write(r);
w.close();
writer.finishRecord();
if (recNum == 1) {
rec3Start = writer.tell();
LOG.info("Record three start: " + rec3Start);
}
recNum++;
}
writer.close();
// Now reopen the file for read, seek to the third record, and get it.
LobFile.Reader reader = LobFile.open(p, conf);
reader.seek(rec3Start);
assertTrue(reader.next());
assertTrue(reader.isRecordAvailable());
assertEquals(rec3Start, reader.getRecordOffset());
Reader r = reader.readClobRecord();
CharBuffer buf = CharBuffer.allocate(records[2].length());
r.read(buf);
r.close();
char [] chars = buf.array();
String s = new String(chars);
assertEquals(records[2], s);
r.close();
reader.close();
}
/** Verifies that the next record in the LobFile is the expected one. */
private void verifyNextRecord(LobFile.Reader reader, long expectedId,
String expectedRecord) throws Exception {
assertTrue(reader.next());
assertTrue(reader.isRecordAvailable());
assertEquals(expectedId, reader.getRecordId());
Reader r = reader.readClobRecord();
CharBuffer buf = CharBuffer.allocate(expectedRecord.length());
int bytesRead = 0;
while (bytesRead < expectedRecord.length()) {
int thisRead = r.read(buf);
if (-1 == thisRead) {
break;
}
bytesRead += thisRead;
}
LOG.info("Got record of " + bytesRead + " chars");
assertEquals(expectedRecord.length(), bytesRead);
char [] charData = buf.array();
String finalRecord = new String(charData);
assertEquals(expectedRecord, finalRecord);
}
public void testManySeeks() throws Exception {
// Test that we can do gymnastics with seeking between records.
Path p = new Path(TEMP_BASE_DIR, "manyseeks.lob");
String [] records = {
"first record",
"second record",
"the third record",
"rec4 is the last in IndexSeg 0",
"rec5 is first in IndexSeg 1",
"rec6 is yet another record",
"rec7 is starting to feel boring",
"rec8 is at the end of seg 1",
"rec9 is all by itself in seg 2"
};
// Write the records to a file, save their offsets.
long [] offsets = writeClobFile(p, null, records);
// Sanity check that we can stream the file.
verifyClobFile(p, records);
// Open a handle to the file.
LobFile.Reader reader = LobFile.open(p, conf);
// Seeking to offset 0 should return the first record.
reader.seek(0);
verifyNextRecord(reader, 0, records[0]);
// Seek to the last item in the first IndexSegment.
reader.seek(offsets[3]);
verifyNextRecord(reader, 3, records[3]);
// Seek to just before the start of that same record.
reader.seek(offsets[3] - 10);
verifyNextRecord(reader, 3, records[3]);
// Seek (backwards) to the first record.
reader.seek(offsets[0]);
verifyNextRecord(reader, 0, records[0]);
// Seek to first record in second IndexSegment.
reader.seek(offsets[4]);
verifyNextRecord(reader, 4, records[4]);
// Move backwards.
reader.seek(0);
// Seek to "no man's land" between last offset in first IndexSeg
// and the first offset in second IndexSegment. Result should be
// the first record in the second IndexSegment.
reader.seek(offsets[4] - 10);
verifyNextRecord(reader, 4, records[4]);
// Seek to just past the last record's start. No record should be returned.
reader.seek(offsets[8] + 4);
assertFalse("Found a record past last record start.", reader.next());
// Seek to somewhere in the middle of IndexSegment 0.
// This should recover just fine.
reader.seek(offsets[2]);
verifyNextRecord(reader, 2, records[2]);
// Seek to just before the last record in IndexSegment 0.
reader.seek(offsets[3] - 1);
verifyNextRecord(reader, 3, records[3]);
// And make sure that iteration picks up naturally from there.
verifyNextRecord(reader, 4, records[4]);
// Seek well past the end of the file. No record should be returned.
reader.seek(50000000);
assertFalse("Found a record past expected end-of-file", reader.next());
// Seek to somewhere in the index.
reader.seek(offsets[8] + 32);
assertFalse("Found a record past beginning of index", reader.next());
// Seek to the last record (exact hit). This is a singleton IndexSegment.
reader.seek(offsets[8]);
verifyNextRecord(reader, 8, records[8]);
// Seek to no-man's-land just before the last record.
reader.seek(offsets[8] - 3);
verifyNextRecord(reader, 8, records[8]);
reader.close();
}
/**
* Verifies that a record to be read from a lob file has
* as many bytes as we expect, and that the bytes are what we
* expect them to be. Assumes that the bytes are such that
* input[i] == i + offset.
* @param reader the LobFile reader to consume data from
* @param expectedDeclaredLen the size we expect the LobFile to declare
* its record length as.
* @param expectedActualLen the true number of bytes we expect to read in
* the record.
* @param offset the offset amount for each of the elements of the array.
*/
private void verifyBlobRecord(LobFile.Reader reader,
long expectedDeclaredLen, long expectedActualLen,
int offset) throws Exception {
assertTrue(reader.next());
assertTrue(reader.isRecordAvailable());
assertEquals(expectedDeclaredLen, reader.getRecordLen());
InputStream is = reader.readBlobRecord();
byte [] bytes = new byte[(int) expectedActualLen];
int numRead = is.read(bytes);
assertEquals(expectedActualLen, numRead);
for (int i = 0; i < numRead; i++) {
assertEquals(i + offset, (int) bytes[i]);
}
is.close();
}
/**
* Write a binary record to a LobFile. This allows the declared length
* of the record to disagree with the actual length (the actual length
* should be &gt;= the declared length).
* The record written will have values v[i] = i + offset.
* @param writer the LobFile writer to put the record into
* @param declaredLen the length value written into the file itself
* @param actualLen the true number of bytes to write
* @param offset an amount to adjust each record's byte values by.
*/
private void writeBlobRecord(LobFile.Writer writer, long declaredLen,
long actualLen, int offset) throws Exception {
OutputStream os = writer.writeBlobRecord(declaredLen);
for (int i = 0; i < actualLen; i++) {
os.write(i + offset);
}
os.close();
writer.finishRecord();
}
/**
* Verifies a number of records that all have the same declared
* and actual record lengths.
* @param p the path to the LobFile to open
* @param numRecords the number of records to expect
* @param declaredLen the declared length of each record in the file
* @param actualLen the true number of bytes we expect to read per record.
*/
private void verifyBlobRecords(Path p, int numRecords,
long declaredLen, long actualLen) throws Exception {
LobFile.Reader reader = LobFile.open(p, conf);
for (int i = 0; i < numRecords; i++) {
verifyBlobRecord(reader, declaredLen, actualLen, i);
}
assertFalse(reader.next());
reader.close();
}
public void testBinaryRecords() throws Exception {
// Write a BLOB file and read it all back.
final long RECORD_LEN = 32;
final int NUM_RECORDS = 2;
Path p = new Path(TEMP_BASE_DIR, "binary.lob");
LobFile.Writer writer = LobFile.create(p, conf);
for (int i = 0; i < NUM_RECORDS; i++) {
writeBlobRecord(writer, RECORD_LEN, RECORD_LEN, i);
}
writer.close();
// Now check the read-back on those records.
verifyBlobRecords(p, NUM_RECORDS, RECORD_LEN, RECORD_LEN);
}
public void testOverLengthBinaryRecord() throws Exception {
// Write a record with a declared length shorter than the
// actual length, and read it back.
final long ACTUAL_RECORD_LEN = 48;
final long DECLARED_RECORD_LEN = 32;
final int NUM_RECORDS = 2;
Path p = new Path(TEMP_BASE_DIR, "overlength.lob");
LobFile.Writer writer = LobFile.create(p, conf);
for (int i = 0; i < NUM_RECORDS; i++) {
writeBlobRecord(writer, DECLARED_RECORD_LEN, ACTUAL_RECORD_LEN, i);
}
writer.close();
// Now read them back.
verifyBlobRecords(p, NUM_RECORDS, DECLARED_RECORD_LEN, ACTUAL_RECORD_LEN);
}
private void runCompressedTest(String codec) throws Exception {
LOG.info("Testing with codec: " + codec);
Path p = new Path(TEMP_BASE_DIR, "compressed-" + codec + ".lob");
String [] records = {
"this is the first record, It should be compressed a lot!",
"record 2 record 2 record 2 record 2 2 2 2 2 2 2 2 2 2 2 2",
"and a third and a third yes this is the third"
};
runClobFileTest(p, codec, records);
}
public void testCompressedFile() throws Exception {
// Test all the various compression codecs.
// The following values for 'codec' should pass.
runCompressedTest(null);
runCompressedTest(CodecMap.NONE);
runCompressedTest(CodecMap.DEFLATE);
try {
// We expect this to throw UnsupportedCodecException
// because the LZO codec class is not bundled with our package.
runCompressedTest(CodecMap.LZO);
fail("Expected unsupported codec exception for lzo");
} catch (UnsupportedCodecException uce) {
// We pass.
LOG.info("Got unsupported codec exception for lzo; expected -- good.");
}
}
}


@ -27,7 +27,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.sqoop.io.LobFile;
/**
* Test that the BlobRef.parse() method does the right thing.
@ -47,13 +47,25 @@ public void testInline() throws IOException {
}
public void testEmptyFile() {
BlobRef r = BlobRef.parse("externalBlob()");
BlobRef r = BlobRef.parse("externalLob()");
assertFalse(r.isExternal());
r = BlobRef.parse("externalLob(lf,,0,0)");
assertTrue(r.isExternal());
assertEquals("externalBlob()", r.toString());
assertEquals("externalLob(lf,,0,0)", r.toString());
}
public void testInlineNearMatch() {
BlobRef r = BlobRef.parse("externalBlob(foo)bar");
BlobRef r = BlobRef.parse("externalLob(foo)bar");
assertFalse(r.isExternal());
r = BlobRef.parse("externalLob(foo)");
assertFalse(r.isExternal());
r = BlobRef.parse("externalLob(lf,foo)");
assertFalse(r.isExternal());
r = BlobRef.parse("externalLob(lf,foo,1,2)x");
assertFalse(r.isExternal());
}
@ -89,7 +101,6 @@ private void doExternalTest(final byte [] DATA, final String FILENAME)
String tmpDir = System.getProperty("test.build.data", "/tmp/");
Path tmpPath = new Path(tmpDir);
Path blobFile = new Path(tmpPath, FILENAME);
// make any necessary parent dirs.
@ -98,14 +109,20 @@ private void doExternalTest(final byte [] DATA, final String FILENAME)
fs.mkdirs(blobParent);
}
OutputStream os = fs.create(blobFile);
LobFile.Writer lw = LobFile.create(blobFile, conf, false);
try {
long off = lw.tell();
long len = DATA.length;
OutputStream os = lw.writeBlobRecord(len);
os.write(DATA, 0, DATA.length);
os.close();
lw.close();
BlobRef blob = BlobRef.parse("externalBlob(" + FILENAME + ")");
String refString = "externalLob(lf," + FILENAME
+ "," + off + "," + len + ")";
BlobRef blob = BlobRef.parse(refString);
assertTrue(blob.isExternal());
assertEquals("externalBlob(" + FILENAME + ")", blob.toString());
assertEquals(refString, blob.toString());
InputStream is = blob.getDataStream(conf, tmpPath);
assertNotNull(is);


@ -27,7 +27,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.sqoop.io.LobFile;
/**
* Test parsing of ClobRef objects.
@ -45,7 +45,7 @@ public void testInline() throws IOException {
assertFalse(r.isExternal());
assertEquals("foo", r.toString());
Reader reader = r.getDataReader(null, null);
Reader reader = r.getDataStream(null, null);
assertNotNull(reader);
char [] buf = new char[4096];
int chars = reader.read(buf, 0, 4096);
@ -56,15 +56,31 @@ public void testInline() throws IOException {
}
public void testEmptyFile() {
ClobRef r = ClobRef.parse("externalClob()");
ClobRef r = ClobRef.parse("externalLob()");
assertFalse(r.isExternal());
assertEquals("externalLob()", r.toString());
r = ClobRef.parse("externalLob(lf,,0,0)");
assertTrue(r.isExternal());
assertEquals("externalClob()", r.toString());
assertEquals("externalLob(lf,,0,0)", r.toString());
}
public void testInlineNearMatch() {
ClobRef r = ClobRef.parse("externalClob(foo)bar");
ClobRef r = ClobRef.parse("externalLob(foo)bar");
assertFalse(r.isExternal());
assertEquals("externalClob(foo)bar", r.toString());
assertEquals("externalLob(foo)bar", r.toString());
r = ClobRef.parse("externalLob(foo)");
assertFalse(r.isExternal());
assertEquals("externalLob(foo)", r.getData());
r = ClobRef.parse("externalLob(lf,foo)");
assertFalse(r.isExternal());
assertEquals("externalLob(lf,foo)", r.getData());
r = ClobRef.parse("externalLob(lf,foo,1,2)x");
assertFalse(r.isExternal());
assertEquals("externalLob(lf,foo,1,2)x", r.getData());
}
public void testExternal() throws IOException {
@ -99,7 +115,6 @@ private void doExternalTest(final String DATA, final String FILENAME)
String tmpDir = System.getProperty("test.build.data", "/tmp/");
Path tmpPath = new Path(tmpDir);
Path clobFile = new Path(tmpPath, FILENAME);
// make any necessary parent dirs.
@ -108,16 +123,21 @@ private void doExternalTest(final String DATA, final String FILENAME)
fs.mkdirs(clobParent);
}
BufferedWriter w = new BufferedWriter(new OutputStreamWriter(
fs.create(clobFile)));
LobFile.Writer lw = LobFile.create(clobFile, conf, true);
try {
long off = lw.tell();
long len = DATA.length();
Writer w = lw.writeClobRecord(len);
w.append(DATA);
w.close();
lw.close();
ClobRef clob = ClobRef.parse("externalClob(" + FILENAME + ")");
String refString = "externalLob(lf," + FILENAME
+ "," + off + "," + len + ")";
ClobRef clob = ClobRef.parse(refString);
assertTrue(clob.isExternal());
assertEquals("externalClob(" + FILENAME + ")", clob.toString());
Reader r = clob.getDataReader(conf, tmpPath);
assertEquals(refString, clob.toString());
Reader r = clob.getDataStream(conf, tmpPath);
assertNotNull(r);
char [] buf = new char[4096];


@ -87,8 +87,9 @@ public void testReadClobRef()
clob = loader.readClobRef(0, resultSet);
assertNotNull(clob);
assertTrue(clob.isExternal());
loader.close();
mapContext.getOutputCommitter().commitTask(mapContext);
Reader r = clob.getDataReader(conf, outDir);
Reader r = clob.getDataStream(conf, outDir);
char [] buf = new char[4096];
int chars = r.read(buf, 0, 4096);
r.close();
@ -115,6 +116,7 @@ public void testReadBlobRef()
blob = loader.readBlobRef(0, resultSet);
assertNotNull(blob);
assertTrue(blob.isExternal());
loader.close();
mapContext.getOutputCommitter().commitTask(mapContext);
InputStream is = blob.getDataStream(conf, outDir);
byte [] buf = new byte[4096];