5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-04 06:41:12 +08:00

Support for CLOB/BLOB data in external files.

CLOB/BLOB data may now be stored in additional files in HDFS which are
accessible through streams if the data cannot be fully materialized in RAM.
Adds tests for external large objects.
Refactored large object loading into the map() method from readFields().

From: Aaron Kimball <aaron@cloudera.com>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149866 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Bayer 2011-07-22 20:03:35 +00:00
parent 32a67749b1
commit bb29ce9492
20 changed files with 2123 additions and 88 deletions

View File

@ -123,6 +123,10 @@ Import control options
When using direct mode, write to multiple files of
approximately _size_ bytes each.
--inline-lob-limit (size)::
When importing LOBs, keep objects inline up to
_size_ bytes.
Export control options
~~~~~~~~~~~~~~~~~~~~~~

View File

@ -34,6 +34,14 @@ Data emitted to HDFS is by default uncompressed. You can instruct
Sqoop to use gzip to compress your data by providing either the
+--compress+ or +-z+ argument (both are equivalent).
Small CLOB and BLOB values will be imported as string-based data inline
with the rest of their containing record. Over a size threshold (by
default, 16 MB per object), these values will not be materialized directly,
inline, and will be written to external files in HDFS; the inline records
will contain pointers to these files. The inline materialization limit can
be controlled with the +--inline-lob-limit+ argument; the limit itself is
specified in bytes.
Using +--verbose+ will instruct Sqoop to print more details about its
operation; this is particularly handy if Sqoop appears to be misbehaving.

View File

@ -30,6 +30,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.sqoop.lib.LargeObjectLoader;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Category;
import org.apache.log4j.Level;
@ -115,6 +116,9 @@ public enum FileLayout {
private boolean useCompression;
private long directSplitSize; // In direct mode, open a new stream every X bytes.
private long maxInlineLobSize; // Max size of an inline LOB; larger LOBs are written
// to external files on disk.
private String exportDir; // HDFS path to read from when performing an export
private char inputFieldDelim;
@ -275,6 +279,8 @@ private void initDefaults(Configuration baseConfiguration) {
this.useCompression = false;
this.directSplitSize = 0;
this.maxInlineLobSize = LargeObjectLoader.DEFAULT_MAX_LOB_LENGTH;
if (null == baseConfiguration) {
this.conf = new Configuration();
} else {
@ -328,6 +334,7 @@ public static void printUsage() {
System.out.println("-z, --compress Enable compression");
System.out.println("--direct-split-size (n) Split the input stream every 'n' bytes");
System.out.println(" when importing in direct mode.");
System.out.println("--inline-lob-limit (n) Set the maximum size for an inline LOB");
System.out.println("");
System.out.println("Export options:");
System.out.println("--export-dir (dir) Export from an HDFS path into a table");
@ -584,6 +591,8 @@ public void parse(String [] args) throws InvalidOptionsException {
this.useCompression = true;
} else if (args[i].equals("--direct-split-size")) {
this.directSplitSize = Long.parseLong(args[++i]);
} else if (args[i].equals("--inline-lob-limit")) {
this.maxInlineLobSize = Long.parseLong(args[++i]);
} else if (args[i].equals("--jar-file")) {
this.existingJarFile = args[++i];
} else if (args[i].equals("--list-databases")) {
@ -1003,6 +1012,13 @@ public long getDirectSplitSize() {
return this.directSplitSize;
}
/**
* @return the max size of a LOB before we spill to a separate file.
*/
public long getInlineLobLimit() {
return this.maxInlineLobSize;
}
public Configuration getConf() {
return conf;
}

View File

@ -74,8 +74,10 @@ public static String toHiveType(int sqlType) {
} else if (sqlType == Types.TIMESTAMP) {
// unfortunate type coercion
return "STRING";
} else if (sqlType == Types.CLOB) {
return "STRING";
} else {
// TODO(aaron): Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB,
// TODO(aaron): Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT,
// BLOB, ARRAY, STRUCT, REF, JAVA_OBJECT.
return null;
}

View File

@ -18,12 +18,26 @@
package org.apache.hadoop.sqoop.lib;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* BlobRef is a wrapper that holds a Blob either directly, or a
@ -31,13 +45,20 @@
*/
public class BlobRef implements Writable {
public static final Log LOG = LogFactory.getLog(BlobRef.class.getName());
public BlobRef(byte [] bytes) {
this.blobFileNum = 0;
this.fileName = null;
this.data = new BytesWritable(bytes);
}
public BlobRef() {
this.blobFileNum = 0;
this.fileName = null;
this.data = null;
}
public BlobRef(String file) {
this.fileName = file;
this.data = null;
}
@ -45,15 +66,69 @@ public BlobRef() {
private BytesWritable data;
// If there data is too large, it's written into a file
// and the file is numbered; this number is recorded here.
// This takes precedence if this value is positive.
private long blobFileNum;
// whose path (relative to the rest of the dataset) is recorded here.
// This takes precedence if this value is non-null.
private String fileName;
/**
* @return true if the BLOB data is in an external file; false if
* it materialized inline.
*/
public boolean isExternal() {
return fileName != null;
}
/**
* Convenience method to access #getDataStream(Configuration, Path)
* from within a map task that read this BlobRef from a file-based
* InputSplit.
* @param mapContext the Mapper.Context instance that encapsulates
* the current map task.
* @return an InputStream to access the BLOB data.
* @throws IllegalArgumentException if it cannot find the source
* path for this BLOB based on the MapContext.
* @throws IOException if it could not read the BLOB from external storage.
*/
public InputStream getDataStream(MapContext mapContext)
throws IllegalArgumentException, IOException {
InputSplit split = mapContext.getInputSplit();
if (split instanceof FileSplit) {
Path basePath = ((FileSplit) split).getPath().getParent();
return getDataStream(mapContext.getConfiguration(),
basePath);
} else {
throw new IllegalArgumentException(
"Could not ascertain BLOB base path from MapContext.");
}
}
/**
* Get access to the BLOB data itself.
* This method returns an InputStream-based representation of the
* BLOB data, accessing the filesystem for external BLOB storage
* as necessary.
* @param conf the Configuration used to access the filesystem
* @param basePath the base directory where the table records are
* stored.
* @return an InputStream used to read the BLOB data.
* @throws IOException if it could not read the BLOB from external storage.
*/
public InputStream getDataStream(Configuration conf, Path basePath)
throws IOException {
if (isExternal()) {
// use external storage.
FileSystem fs = FileSystem.get(conf);
return fs.open(new Path(basePath, fileName));
} else {
return new ByteArrayInputStream(data.getBytes());
}
}
public byte [] getData() {
if (blobFileNum > 0) {
// We have a numbered file.
// TODO: Implement this.
throw new RuntimeException("Unsupported: Indirect BLOBs are not supported");
if (isExternal()) {
throw new RuntimeException(
"External BLOBs must be read via getDataStream()");
}
return data.getBytes();
@ -61,8 +136,8 @@ public BlobRef() {
@Override
public String toString() {
if (blobFileNum > 0) {
return "indirectBlob(" + blobFileNum + ")";
if (isExternal()) {
return "externalBlob(" + fileName + ")";
} else {
return data.toString();
}
@ -71,32 +146,67 @@ public String toString() {
@Override
public void readFields(DataInput in) throws IOException {
// The serialization format for this object is:
// boolean isIndirect
// if true, the next field is a Long containing blobFileNum
// if false, the next field is String data.
// boolean isExternal
// if true, the next field is a String containing the file name.
// if false, the next field is a BytesWritable containing the
// actual data.
boolean isIndirect = in.readBoolean();
if (isIndirect) {
boolean isExternal = in.readBoolean();
if (isExternal) {
this.data = null;
this.blobFileNum = in.readLong();
this.fileName = Text.readString(in);
} else {
if (null == this.data) {
this.data = new BytesWritable();
}
this.data.readFields(in);
this.blobFileNum = 0;
this.fileName = null;
}
}
@Override
public void write(DataOutput out) throws IOException {
boolean isIndirect = blobFileNum > 0;
out.writeBoolean(isIndirect);
if (isIndirect) {
out.writeLong(blobFileNum);
out.writeBoolean(isExternal());
if (isExternal()) {
Text.writeString(out, fileName);
} else {
data.write(out);
}
}
private static final ThreadLocal<Matcher> EXTERNAL_MATCHER =
new ThreadLocal<Matcher>() {
@Override protected Matcher initialValue() {
Pattern externalPattern = Pattern.compile("externalBlob\\((.*)\\)");
return externalPattern.matcher("");
}
};
/**
* Create a BlobRef based on parsed data from a line of text.
* This only operates correctly on external blobs; inline blobs are simply
* returned as null. You should store BLOB data in SequenceFile format
* if reparsing is necessary.
* @param inputString the text-based input data to parse.
* @return a new BlobRef containing a reference to an external BLOB, or
* an empty BlobRef if the data to be parsed is actually inline.
*/
public static BlobRef parse(String inputString) {
// If inputString is of the form 'externalBlob(%s)', then this is an
// external BLOB stored at the filename indicated by '%s'. Otherwise,
// it is an inline BLOB, which we don't support parsing of.
Matcher m = EXTERNAL_MATCHER.get();
m.reset(inputString);
if (m.matches()) {
// Extract the filename component from the string.
return new BlobRef(m.group(1));
} else {
// This is inline BLOB string data.
LOG.warn(
"Reparsing inline BLOB data is not supported; use SequenceFiles.");
return new BlobRef();
}
}
}

View File

@ -21,9 +21,20 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringReader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
* ClobRef is a wrapper that holds a Clob either directly, or a
@ -32,12 +43,23 @@
public class ClobRef implements Writable {
public ClobRef(String chars) {
this.clobFileNum = 0;
this.fileName = null;
this.data = chars;
}
public ClobRef() {
this.clobFileNum = 0;
this.fileName = null;
this.data = null;
}
/**
* Initialize a clobref to an external CLOB.
* @param file the filename to the CLOB. May be relative to the job dir.
* @param ignored is not used; this just differentiates this constructor
* from ClobRef(String chars).
*/
public ClobRef(String file, boolean ignored) {
this.fileName = file;
this.data = null;
}
@ -45,24 +67,74 @@ public ClobRef() {
private String data;
// If there data is too large, it's written into a file
// and the file is numbered; this number is recorded here.
// This takes precedence if this value is positive.
private long clobFileNum;
// whose path (relative to the rest of the dataset) is recorded here.
// This takes precedence if this value is non-null.
private String fileName;
public String getData() {
if (clobFileNum > 0) {
// We have a numbered file.
// TODO: Implement this.
throw new RuntimeException("Unsupported: Indirect CLOBs are not supported");
}
return data;
/**
* @return true if the CLOB data is in an external file; false if
* it is materialized inline.
*/
public boolean isExternal() {
return fileName != null;
}
/**
* Convenience method to access #getDataReader(Configuration, Path)
* from within a map task that read this ClobRef from a file-based
* InputSplit.
* @param mapContext the Mapper.Context instance that encapsulates
* the current map task.
* @return a Reader to access the CLOB data.
* @throws IllegalArgumentException if it cannot find the source
* path for this CLOB based on the MapContext.
* @throws IOException if it could not read the CLOB from external storage.
*/
public Reader getDataReader(MapContext mapContext)
throws IllegalArgumentException, IOException {
InputSplit split = mapContext.getInputSplit();
if (split instanceof FileSplit) {
Path basePath = ((FileSplit) split).getPath().getParent();
return getDataReader(mapContext.getConfiguration(),
basePath);
} else {
throw new IllegalArgumentException(
"Could not ascertain CLOB base path from MapContext.");
}
}
/**
* Get access to the CLOB data itself.
* This method returns a Reader-based representation of the
* CLOB data, accessing the filesystem for external CLOB storage
* as necessary.
* @param conf the Configuration used to access the filesystem
* @param basePath the base directory where the table records are
* stored.
* @return a Reader used to read the CLOB data.
* @throws IOException if it could not read the CLOB from external storage.
*/
public Reader getDataReader(Configuration conf, Path basePath)
throws IOException {
if (isExternal()) {
// use external storage.
FileSystem fs = FileSystem.get(conf);
return new InputStreamReader(fs.open(new Path(basePath, fileName)));
} else {
return new StringReader(data);
}
}
/**
* @return a string representation of the ClobRef. If this is an
* inline clob (isExternal() returns false), it will contain the
* materialized data. Otherwise it returns a description of the
* reference. To ensure access to the data itself, {@see #getDataStream()}.
*/
@Override
public String toString() {
if (clobFileNum > 0) {
return "indirectClob(" + clobFileNum + ")";
if (isExternal()) {
return "externalClob(" + fileName + ")";
} else {
return data;
}
@ -71,29 +143,60 @@ public String toString() {
@Override
public void readFields(DataInput in) throws IOException {
// The serialization format for this object is:
// boolean isIndirect
// if true, the next field is a Long containing clobFileNum
// if false, the next field is String data.
// boolean isExternal
// if true, the next field is a String containing the external file name.
// if false, the next field is String containing the actual data.
boolean isIndirect = in.readBoolean();
if (isIndirect) {
this.fileName = Text.readString(in);
this.data = null;
this.clobFileNum = in.readLong();
} else {
this.fileName = null;
this.data = Text.readString(in);
this.clobFileNum = 0;
}
}
@Override
public void write(DataOutput out) throws IOException {
boolean isIndirect = clobFileNum > 0;
boolean isIndirect = isExternal();
out.writeBoolean(isIndirect);
if (isIndirect) {
out.writeLong(clobFileNum);
Text.writeString(out, fileName);
} else {
Text.writeString(out, data);
}
}
// A pattern matcher which can recognize external CLOB data
// vs. an inline CLOB string.
private static final ThreadLocal<Matcher> EXTERNAL_MATCHER =
new ThreadLocal<Matcher>() {
@Override protected Matcher initialValue() {
Pattern externalPattern = Pattern.compile("externalClob\\((.*)\\)");
return externalPattern.matcher("");
}
};
/**
* Create a ClobRef based on parsed data from a line of text.
* @param inputString the text-based input data to parse.
* @return a ClobRef to the given data.
*/
public static ClobRef parse(String inputString) {
// If inputString is of the form 'externalClob(%s)', then this is an
// external CLOB stored at the filename indicated by '%s'. Otherwise,
// it is an inline CLOB.
Matcher m = EXTERNAL_MATCHER.get();
m.reset(inputString);
if (m.matches()) {
// Extract the filename component from the string.
return new ClobRef(m.group(1), true);
} else {
// This is inline CLOB string data.
return new ClobRef(inputString);
}
}
}

View File

@ -115,32 +115,14 @@ public static BigDecimal readBigDecimal(int colNum, ResultSet r) throws SQLExcep
public static BlobRef readBlobRef(int colNum, ResultSet r)
throws SQLException {
Blob b = r.getBlob(colNum);
if (null == b) {
return null;
} else if (b.length() > MAX_BLOB_LENGTH) {
// TODO: Deserialize very large BLOBs into separate files.
throw new UnsupportedOperationException("BLOB size exceeds max: "
+ MAX_BLOB_LENGTH);
} else {
// This is a 1-based array.
return new BlobRef(b.getBytes(1, (int) b.length()));
}
// Loading of BLOBs is delayed; handled by LargeObjectLoader.
return null;
}
public static ClobRef readClobRef(int colNum, ResultSet r)
throws SQLException {
Clob c = r.getClob(colNum);
if (null == c) {
return null;
} else if (c.length() > MAX_CLOB_LENGTH) {
// TODO: Deserialize very large CLOBs into separate files.
throw new UnsupportedOperationException("CLOB size exceeds max: "
+ MAX_CLOB_LENGTH);
} else {
// This is a 1-based array.
return new ClobRef(c.getSubString(1, (int) c.length()));
}
// Loading of CLOBs is delayed; handled by LargeObjectLoader.
return null;
}
public static void writeInteger(Integer val, int paramIdx, int sqlType, PreparedStatement s)

View File

@ -0,0 +1,251 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.lib;
import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Writer;
import java.math.BigDecimal;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* Contains a set of methods which can read db columns from a ResultSet into
* Java types, and do serialization of these types to/from DataInput/DataOutput
* for use with Hadoop's Writable implementation. This supports null values
* for all types.
*
* This is a singleton instance class; only one may exist at a time.
* However, its lifetime is limited to the current TaskInputOutputContext's
* life.
*/
public class LargeObjectLoader {
// Currently, cap BLOB/CLOB objects at 16 MB until we can use external storage.
public final static long DEFAULT_MAX_LOB_LENGTH = 16 * 1024 * 1024;
public final static String MAX_INLINE_LOB_LEN_KEY =
"sqoop.inline.lob.length.max";
// The task context for the currently-initialized instance.
private TaskInputOutputContext context;
private FileSystem fs;
// Counter that is used with the current task attempt id to
// generate unique LOB file names.
private long nextLobFileId = 0;
public LargeObjectLoader(TaskInputOutputContext context)
throws IOException {
this.context = context;
this.fs = FileSystem.get(context.getConfiguration());
}
/**
* @return a filename to use to put an external LOB in.
*/
private String getNextLobFileName() {
String file = "_lob/obj_" + context.getConfiguration().get(
JobContext.TASK_ID, "unknown_task_id") + nextLobFileId;
nextLobFileId++;
return file;
}
/**
* Copies all character data from the provided Reader to the provided
* Writer. Does not close handles when it's done.
* @param reader data source
* @param writer data sink
* @throws IOException if an I/O error occurs either reading or writing.
*/
private void copyAll(Reader reader, Writer writer) throws IOException {
int bufferSize = context.getConfiguration().getInt("io.file.buffer.size",
4096);
char [] buf = new char[bufferSize];
while (true) {
int charsRead = reader.read(buf);
if (-1 == charsRead) {
break; // no more stream to read.
}
writer.write(buf, 0, charsRead);
}
}
/**
* Copies all byte data from the provided InputStream to the provided
* OutputStream. Does not close handles when it's done.
* @param input data source
* @param output data sink
* @throws IOException if an I/O error occurs either reading or writing.
*/
private void copyAll(InputStream input, OutputStream output)
throws IOException {
int bufferSize = context.getConfiguration().getInt("io.file.buffer.size",
4096);
byte [] buf = new byte[bufferSize];
while (true) {
int bytesRead = input.read(buf, 0, bufferSize);
if (-1 == bytesRead) {
break; // no more stream to read.
}
output.write(buf, 0, bytesRead);
}
}
/**
* Actually read a BlobRef instance from the ResultSet and materialize
* the data either inline or to a file.
*
* @param colNum the column of the ResultSet's current row to read.
* @param r the ResultSet to read from.
* @return a BlobRef encapsulating the data in this field.
* @throws IOException if an error occurs writing to the FileSystem.
* @throws SQLException if an error occurs reading from the database.
*/
public BlobRef readBlobRef(int colNum, ResultSet r)
throws IOException, InterruptedException, SQLException {
long maxInlineLobLen = context.getConfiguration().getLong(
MAX_INLINE_LOB_LEN_KEY,
DEFAULT_MAX_LOB_LENGTH);
Blob b = r.getBlob(colNum);
if (null == b) {
return null;
} else if (b.length() > maxInlineLobLen) {
// Deserialize very large BLOBs into separate files.
String fileName = getNextLobFileName();
Path p = new Path(FileOutputFormat.getWorkOutputPath(context), fileName);
Path parent = p.getParent();
if (!fs.exists(parent)) {
fs.mkdirs(parent);
}
BufferedOutputStream bos = null;
InputStream is = null;
OutputStream os = fs.create(p);
try {
bos = new BufferedOutputStream(os);
is = b.getBinaryStream();
copyAll(is, bos);
} finally {
if (null != bos) {
bos.close();
os = null; // os is now closed.
}
if (null != os) {
os.close();
}
if (null != is) {
is.close();
}
}
return new BlobRef(fileName);
} else {
// This is a 1-based array.
return new BlobRef(b.getBytes(1, (int) b.length()));
}
}
/**
* Actually read a ClobRef instance from the ResultSet and materialize
* the data either inline or to a file.
*
* @param colNum the column of the ResultSet's current row to read.
* @param r the ResultSet to read from.
* @return a ClobRef encapsulating the data in this field.
* @throws IOException if an error occurs writing to the FileSystem.
* @throws SQLException if an error occurs reading from the database.
*/
public ClobRef readClobRef(int colNum, ResultSet r)
throws IOException, InterruptedException, SQLException {
long maxInlineLobLen = context.getConfiguration().getLong(
MAX_INLINE_LOB_LEN_KEY,
DEFAULT_MAX_LOB_LENGTH);
Clob c = r.getClob(colNum);
if (null == c) {
return null;
} else if (c.length() > maxInlineLobLen) {
// Deserialize large CLOB into separate file.
String fileName = getNextLobFileName();
Path p = new Path(FileOutputFormat.getWorkOutputPath(context), fileName);
Path parent = p.getParent();
if (!fs.exists(parent)) {
fs.mkdirs(parent);
}
BufferedWriter w = null;
Reader reader = null;
OutputStream os = fs.create(p);
try {
w = new BufferedWriter(new OutputStreamWriter(os));
reader = c.getCharacterStream();
copyAll(reader, w);
} finally {
if (null != w) {
w.close();
os = null; // os is now closed.
}
if (null != os) {
os.close();
}
if (null != reader) {
reader.close();
}
}
return new ClobRef(fileName, true);
} else {
// This is a 1-based array.
return new ClobRef(c.getSubString(1, (int) c.length()));
}
}
}

View File

@ -18,13 +18,15 @@
package org.apache.hadoop.sqoop.lib;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.sql.SQLException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
/**
* Interface implemented by the classes generated by sqoop's orm.ClassWriter.
*/
@ -35,5 +37,7 @@ public interface SqoopRecord extends DBWritable, Writable {
public void parse(char [] s) throws RecordParser.ParseError;
public void parse(ByteBuffer s) throws RecordParser.ParseError;
public void parse(CharBuffer s) throws RecordParser.ParseError;
public void loadLargeObjects(LargeObjectLoader objLoader)
throws SQLException, IOException, InterruptedException;
}

View File

@ -158,6 +158,7 @@ private final void configureAutoProgress(Configuration job) {
/**
* Run the mapping process for this task, wrapped in an auto-progress system.
*/
@Override
public void run(Context context) throws IOException, InterruptedException {
configureAutoProgress(context.getConfiguration());
ProgressThread thread = this.new ProgressThread(context);
@ -182,5 +183,4 @@ public void run(Context context) throws IOException, InterruptedException {
}
}
}
}

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.sqoop.ConnFactory;
import org.apache.hadoop.sqoop.SqoopOptions;
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.lib.LargeObjectLoader;
import org.apache.hadoop.sqoop.orm.TableClassName;
import org.apache.hadoop.sqoop.util.ClassLoaderStack;
import org.apache.hadoop.sqoop.util.ImportException;
@ -85,7 +86,7 @@ protected Class<? extends Mapper> getMapperClass() {
if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
return TextImportMapper.class;
} else if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
return AutoProgressMapper.class;
return SequenceFileImportMapper.class;
}
return null;
@ -140,6 +141,9 @@ protected void configureInputFormat(Job job, String tableName,
job.getConfiguration().set(DBConfiguration.INPUT_CLASS_PROPERTY,
tableClassName);
job.getConfiguration().setLong(LargeObjectLoader.MAX_INLINE_LOB_LEN_KEY,
options.getInlineLobLimit());
LOG.debug("Using InputFormat: " + inputFormatClass);
job.setInputFormatClass(inputFormatClass);
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.mapreduce;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.sqoop.lib.LargeObjectLoader;
import org.apache.hadoop.sqoop.lib.SqoopRecord;
/**
* Imports records by writing them to a SequenceFile.
*/
public class SequenceFileImportMapper
extends AutoProgressMapper<LongWritable, SqoopRecord, LongWritable, SqoopRecord> {
public void map(LongWritable key, SqoopRecord val, Context context)
throws IOException, InterruptedException {
try {
// Loading of LOBs was delayed until we have a Context.
val.loadLargeObjects(new LargeObjectLoader(context));
} catch (SQLException sqlE) {
throw new IOException(sqlE);
}
context.write(key, val);
}
}

View File

@ -19,18 +19,20 @@
package org.apache.hadoop.sqoop.mapreduce;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.sqoop.lib.LargeObjectLoader;
import org.apache.hadoop.sqoop.lib.SqoopRecord;
/**
* Converts an input record into a string representation and emit it.
* Imports records by transforming them to strings for a plain-text flat file.
*/
public class TextImportMapper
extends AutoProgressMapper<LongWritable, DBWritable, Text, NullWritable> {
extends AutoProgressMapper<LongWritable, SqoopRecord, Text, NullWritable> {
private Text outkey;
@ -38,9 +40,18 @@ public TextImportMapper() {
outkey = new Text();
}
public void map(LongWritable key, DBWritable val, Context context)
public void map(LongWritable key, SqoopRecord val, Context context)
throws IOException, InterruptedException {
try {
// Loading of LOBs was delayed until we have a Context.
val.loadLargeObjects(new LargeObjectLoader(context));
} catch (SQLException sqlE) {
throw new IOException(sqlE);
}
outkey.set(val.toString());
context.write(outkey, NullWritable.get());
}
}

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.sqoop.lib.BigDecimalSerializer;
import org.apache.hadoop.sqoop.lib.FieldFormatter;
import org.apache.hadoop.sqoop.lib.JdbcWritableBridge;
import org.apache.hadoop.sqoop.lib.LargeObjectLoader;
import org.apache.hadoop.sqoop.lib.LobSerializer;
import org.apache.hadoop.sqoop.lib.RecordParser;
import org.apache.hadoop.sqoop.lib.BlobRef;
@ -415,6 +416,10 @@ private void generateDbRead(Map<String, Integer> columnTypes, String [] colNames
sb.append(" public void readFields(ResultSet __dbResults) throws SQLException {\n");
// Save ResultSet object cursor for use in LargeObjectLoader
// if necessary.
sb.append(" this.__cur_result_set = __dbResults;\n");
int fieldNum = 0;
for (String col : colNames) {
@ -440,6 +445,44 @@ private void generateDbRead(Map<String, Integer> columnTypes, String [] colNames
sb.append(" }\n");
}
/**
* Generate the loadLargeObjects() method called by the mapper to load
* delayed objects (that require the Context from the mapper).
*/
private void generateLoadLargeObjects(Map<String, Integer> columnTypes,
String [] colNames, StringBuilder sb) {
// This method relies on the __cur_result_set field being set by
// readFields() method generated by generateDbRead().
sb.append(" public void loadLargeObjects(LargeObjectLoader __loader)\n");
sb.append(" throws SQLException, IOException, InterruptedException {\n");
int fieldNum = 0;
for (String col : colNames) {
fieldNum++;
int sqlType = columnTypes.get(col);
String javaType = connManager.toJavaType(sqlType);
if (null == javaType) {
LOG.error("No Java type for SQL type " + sqlType);
continue;
}
String getterMethod = dbGetterForType(javaType);
if ("readClobRef".equals(getterMethod)
|| "readBlobRef".equals(getterMethod)) {
// This field is a blob/clob field with delayed loading.
// Call the appropriate LargeObjectLoader method (which has the
// same name as a JdbcWritableBridge method).
sb.append(" this." + col + " = __loader." + getterMethod
+ "(" + fieldNum + ", this.__cur_result_set);\n");
}
}
sb.append(" }\n");
}
/**
* Generate the write() method used by the database
@ -638,12 +681,9 @@ private void parseColumn(String colName, int colType, StringBuilder sb) {
} else if (javaType.equals("java.math.BigDecimal")) {
sb.append(" this." + colName + " = new java.math.BigDecimal(__cur_str);\n");
} else if (javaType.equals(ClobRef.class.getName())) {
sb.append(" this." + colName + " = new ClobRef(__cur_str);\n");
sb.append(" this." + colName + " = ClobRef.parse(__cur_str);\n");
} else if (javaType.equals(BlobRef.class.getName())) {
// We don't support parsing BLOB data.
// Users must store this in SequenceFiles.
LOG.warn("BLOB data cannot be reparsed from text files");
sb.append(" this." + colName + " = new BlobRef();\n");
sb.append(" this." + colName + " = BlobRef.parse(__cur_str);\n");
} else {
LOG.error("No parser available for Java type " + javaType);
}
@ -844,6 +884,7 @@ public StringBuilder generateClassForColumns(Map<String, Integer> columnTypes,
sb.append("import " + RecordParser.class.getCanonicalName() + ";\n");
sb.append("import " + BlobRef.class.getCanonicalName() + ";\n");
sb.append("import " + ClobRef.class.getCanonicalName() + ";\n");
sb.append("import " + LargeObjectLoader.class.getCanonicalName() + ";\n");
sb.append("import " + SqoopRecord.class.getCanonicalName() + ";\n");
sb.append("import java.sql.PreparedStatement;\n");
sb.append("import java.sql.ResultSet;\n");
@ -860,10 +901,13 @@ public StringBuilder generateClassForColumns(Map<String, Integer> columnTypes,
sb.append("import java.util.List;\n");
String className = tableNameInfo.getShortClassForTable(tableName);
sb.append("public class " + className + " implements DBWritable, SqoopRecord, Writable {\n");
sb.append("public class " + className
+ " implements DBWritable, SqoopRecord, Writable {\n");
sb.append(" public static final int PROTOCOL_VERSION = " + CLASS_WRITER_VERSION + ";\n");
sb.append(" protected ResultSet __cur_result_set;\n");
generateFields(columnTypes, colNames, sb);
generateDbRead(columnTypes, colNames, sb);
generateLoadLargeObjects(columnTypes, colNames, sb);
generateDbWrite(columnTypes, colNames, sb);
generateHadoopRead(columnTypes, colNames, sb);
generateHadoopWrite(columnTypes, colNames, sb);

View File

@ -23,6 +23,9 @@
import org.apache.hadoop.sqoop.io.TestSplittableBufferedWriter;
import org.apache.hadoop.sqoop.lib.TestFieldFormatter;
import org.apache.hadoop.sqoop.lib.TestRecordParser;
import org.apache.hadoop.sqoop.lib.TestBlobRef;
import org.apache.hadoop.sqoop.lib.TestClobRef;
import org.apache.hadoop.sqoop.lib.TestLargeObjectLoader;
import org.apache.hadoop.sqoop.manager.TestHsqldbManager;
import org.apache.hadoop.sqoop.manager.TestSqlManager;
import org.apache.hadoop.sqoop.mapreduce.MapreduceTests;
@ -60,6 +63,9 @@ public static Test suite() {
suite.addTestSuite(TestConnFactory.class);
suite.addTestSuite(TestSplittableBufferedWriter.class);
suite.addTestSuite(TestTableDefWriter.class);
suite.addTestSuite(TestBlobRef.class);
suite.addTestSuite(TestClobRef.class);
suite.addTestSuite(TestLargeObjectLoader.class);
suite.addTest(MapreduceTests.suite());
return suite;

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.lib;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* Test that the BlobRef.parse() method does the right thing.
* Note that we don't support inline parsing here; we only expect this to
* really work for external BLOBs.
*/
public class TestBlobRef extends TestCase {
public void testEmptyStr() {
BlobRef r = BlobRef.parse("");
assertFalse(r.isExternal());
}
public void testInline() throws IOException {
BlobRef r = BlobRef.parse("foo");
assertFalse(r.isExternal());
}
public void testEmptyFile() {
BlobRef r = BlobRef.parse("externalBlob()");
assertTrue(r.isExternal());
assertEquals("externalBlob()", r.toString());
}
public void testInlineNearMatch() {
BlobRef r = BlobRef.parse("externalBlob(foo)bar");
assertFalse(r.isExternal());
}
public void testExternal() throws IOException {
final byte [] DATA = { 1, 2, 3, 4, 5 };
final String FILENAME = "blobdata";
doExternalTest(DATA, FILENAME);
}
public void testExternalSubdir() throws IOException {
final byte [] DATA = { 1, 2, 3, 4, 5 };
final String FILENAME = "_lob/blobdata";
try {
doExternalTest(DATA, FILENAME);
} finally {
// remove dir we made.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
String tmpDir = System.getProperty("test.build.data", "/tmp/");
Path lobDir = new Path(new Path(tmpDir), "_lob");
fs.delete(lobDir, false);
}
}
private void doExternalTest(final byte [] DATA, final String FILENAME)
throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "file:///");
FileSystem fs = FileSystem.get(conf);
String tmpDir = System.getProperty("test.build.data", "/tmp/");
Path tmpPath = new Path(tmpDir);
Path blobFile = new Path(tmpPath, FILENAME);
// make any necessary parent dirs.
Path blobParent = blobFile.getParent();
if (!fs.exists(blobParent)) {
fs.mkdirs(blobParent);
}
OutputStream os = fs.create(blobFile);
try {
os.write(DATA, 0, DATA.length);
os.close();
BlobRef blob = BlobRef.parse("externalBlob(" + FILENAME + ")");
assertTrue(blob.isExternal());
assertEquals("externalBlob(" + FILENAME + ")", blob.toString());
InputStream is = blob.getDataStream(conf, tmpPath);
assertNotNull(is);
byte [] buf = new byte[4096];
int bytes = is.read(buf, 0, 4096);
is.close();
assertEquals(DATA.length, bytes);
for (int i = 0; i < bytes; i++) {
assertEquals(DATA[i], buf[i]);
}
} finally {
fs.delete(blobFile, false);
}
}
}

View File

@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.lib;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* Test parsing of ClobRef objects.
*/
public class TestClobRef extends TestCase {
public void testEmptyStr() {
ClobRef r = ClobRef.parse("");
assertFalse(r.isExternal());
assertEquals("", r.toString());
}
public void testInline() throws IOException {
ClobRef r = ClobRef.parse("foo");
assertFalse(r.isExternal());
assertEquals("foo", r.toString());
Reader reader = r.getDataReader(null, null);
assertNotNull(reader);
char [] buf = new char[4096];
int chars = reader.read(buf, 0, 4096);
reader.close();
String str = new String(buf, 0, chars);
assertEquals("foo", str);
}
public void testEmptyFile() {
ClobRef r = ClobRef.parse("externalClob()");
assertTrue(r.isExternal());
assertEquals("externalClob()", r.toString());
}
public void testInlineNearMatch() {
ClobRef r = ClobRef.parse("externalClob(foo)bar");
assertFalse(r.isExternal());
assertEquals("externalClob(foo)bar", r.toString());
}
public void testExternal() throws IOException {
final String DATA = "This is the clob data!";
final String FILENAME = "clobdata";
doExternalTest(DATA, FILENAME);
}
public void testExternalSubdir() throws IOException {
final String DATA = "This is the clob data!";
final String FILENAME = "_lob/clobdata";
try {
doExternalTest(DATA, FILENAME);
} finally {
// remove dir we made.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
String tmpDir = System.getProperty("test.build.data", "/tmp/");
Path lobDir = new Path(new Path(tmpDir), "_lob");
fs.delete(lobDir, false);
}
}
private void doExternalTest(final String DATA, final String FILENAME)
throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "file:///");
FileSystem fs = FileSystem.get(conf);
String tmpDir = System.getProperty("test.build.data", "/tmp/");
Path tmpPath = new Path(tmpDir);
Path clobFile = new Path(tmpPath, FILENAME);
// make any necessary parent dirs.
Path clobParent = clobFile.getParent();
if (!fs.exists(clobParent)) {
fs.mkdirs(clobParent);
}
BufferedWriter w = new BufferedWriter(new OutputStreamWriter(
fs.create(clobFile)));
try {
w.append(DATA);
w.close();
ClobRef clob = ClobRef.parse("externalClob(" + FILENAME + ")");
assertTrue(clob.isExternal());
assertEquals("externalClob(" + FILENAME + ")", clob.toString());
Reader r = clob.getDataReader(conf, tmpPath);
assertNotNull(r);
char [] buf = new char[4096];
int chars = r.read(buf, 0, 4096);
r.close();
String str = new String(buf, 0, chars);
assertEquals(DATA, str);
} finally {
fs.delete(clobFile, false);
}
}
}

View File

@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.lib;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.sql.ResultSet;
import java.sql.SQLException;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mrunit.mapreduce.mock.MockMapContext;
import org.apache.hadoop.mrunit.types.Pair;
import org.apache.hadoop.sqoop.testutil.MockResultSet;
/**
* Test deserialization of ClobRef and BlobRef fields.
*/
public class TestLargeObjectLoader extends TestCase {
/**
* A mock MapContext that uses FileOutputCommitter.
* This MapContext is actually serving two roles here; when writing the
* CLOB files, its OutputCommitter is used to determine where to write
* the CLOB data, as these are placed in the task output work directory.
* When reading the CLOB data back for verification, we use the
* getInputSplit() to determine where to read our source data from--the same
* directory. We are repurposing the same context for both output and input.
*/
private static class MockMapContextWithCommitter<K1, V1, K2, V2>
extends MockMapContext<K1, V1, K2, V2> {
private Path outputDir;
private Configuration conf;
public MockMapContextWithCommitter(Configuration conf, Path outDir) {
super(new ArrayList<Pair<K1, V1>>(), new Counters());
this.outputDir = outDir;
this.conf = conf;
}
@Override
public OutputCommitter getOutputCommitter() {
try {
return new FileOutputCommitter(outputDir, this);
} catch (IOException ioe) {
return null;
}
}
@Override
public InputSplit getInputSplit() {
return new FileSplit(new Path(outputDir, "inputFile"), 0, 0, new String[0]);
}
@Override
public Configuration getConfiguration() {
return conf;
}
}
protected Configuration conf;
protected MapContext mapContext;
protected LargeObjectLoader loader;
public void setUp() throws IOException {
conf = new Configuration();
conf.set("fs.defaultFS", "file:///");
String tmpDir = System.getProperty("test.build.data", "/tmp/");
Path outDir = new Path(new Path(tmpDir), "testLobLoader");
FileSystem fs = FileSystem.getLocal(conf);
if (fs.exists(outDir)) {
fs.delete(outDir, true);
}
fs.mkdirs(outDir);
mapContext = new MockMapContextWithCommitter(conf, outDir);
loader = new LargeObjectLoader(mapContext);
}
public void testReadClobRef()
throws IOException, InterruptedException, SQLException {
// This should give us an inline CLOB.
ResultSet resultSet = new MockResultSet();
ClobRef clob = loader.readClobRef(0, resultSet);
assertNotNull(clob);
assertFalse(clob.isExternal());
assertEquals(MockResultSet.CLOB_DATA, clob.toString());
// LOBs bigger than 4 bytes are now external.
conf.setLong(LargeObjectLoader.MAX_INLINE_LOB_LEN_KEY, 4);
clob = loader.readClobRef(0, resultSet);
assertNotNull(clob);
assertTrue(clob.isExternal());
mapContext.getOutputCommitter().commitTask(mapContext);
Reader r = clob.getDataReader(mapContext);
char [] buf = new char[4096];
int chars = r.read(buf, 0, 4096);
r.close();
String str = new String(buf, 0, chars);
assertEquals(MockResultSet.CLOB_DATA, str);
}
public void testReadBlobRef()
throws IOException, InterruptedException, SQLException {
// This should give us an inline BLOB.
ResultSet resultSet = new MockResultSet();
BlobRef blob = loader.readBlobRef(0, resultSet);
assertNotNull(blob);
assertFalse(blob.isExternal());
byte [] data = blob.getData();
assertEquals(MockResultSet.BLOB_DATA.length, data.length);
for (int i = 0; i < data.length; i++) {
assertEquals(MockResultSet.BLOB_DATA[i], data[i]);
}
// LOBs bigger than 4 bytes are now external.
conf.setLong(LargeObjectLoader.MAX_INLINE_LOB_LEN_KEY, 4);
blob = loader.readBlobRef(0, resultSet);
assertNotNull(blob);
assertTrue(blob.isExternal());
mapContext.getOutputCommitter().commitTask(mapContext);
InputStream is = blob.getDataStream(mapContext);
byte [] buf = new byte[4096];
int bytes = is.read(buf, 0, 4096);
is.close();
assertEquals(MockResultSet.BLOB_DATA.length, bytes);
for (int i = 0; i < bytes; i++) {
assertEquals(MockResultSet.BLOB_DATA[i], buf[i]);
}
}
}

View File

@ -21,6 +21,8 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@ -28,8 +30,9 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.sqoop.lib.LargeObjectLoader;
import org.apache.hadoop.sqoop.lib.SqoopRecord;
import junit.framework.TestCase;
@ -39,10 +42,10 @@
public class TestTextImportMapper extends TestCase {
static class DummyDBWritable implements DBWritable {
static class DummySqoopRecord implements SqoopRecord {
long field;
public DummyDBWritable(final long val) {
public DummySqoopRecord(final long val) {
this.field = val;
}
@ -65,14 +68,22 @@ public void write(PreparedStatement s) throws SQLException {
public String toString() {
return "" + field;
}
public void loadLargeObjects(LargeObjectLoader loader) { }
public void parse(CharSequence s) { }
public void parse(Text s) { }
public void parse(byte [] s) { }
public void parse(char [] s) { }
public void parse(ByteBuffer s) { }
public void parse(CharBuffer s) { }
}
public void testTextImport() {
TextImportMapper m = new TextImportMapper();
MapDriver<LongWritable, DBWritable, Text, NullWritable> driver =
new MapDriver<LongWritable, DBWritable, Text, NullWritable>(m);
MapDriver<LongWritable, SqoopRecord, Text, NullWritable> driver =
new MapDriver<LongWritable, SqoopRecord, Text, NullWritable>(m);
driver.withInput(new LongWritable(0), new DummyDBWritable(42))
driver.withInput(new LongWritable(0), new DummySqoopRecord(42))
.withOutput(new Text("42"), NullWritable.get())
.runTest();
}

File diff suppressed because it is too large Load Diff