From bb29ce94923734e31e70e768fcb9fa0b82b6cbcf Mon Sep 17 00:00:00 2001 From: Andrew Bayer Date: Fri, 22 Jul 2011 20:03:35 +0000 Subject: [PATCH] Support for CLOB/BLOB data in external files. CLOB/BLOB data may now be stored in additional files in HDFS which are accessible through streams if the data cannot be fully materialized in RAM. Adds tests for external large objects. Refactored large object loading into the map() method from readFields(). From: Aaron Kimball git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149866 13f79535-47bb-0310-9956-ffa450edef68 --- src/docs/Sqoop-manpage.txt | 4 + src/docs/misc-args.txt | 8 + .../org/apache/hadoop/sqoop/SqoopOptions.java | 16 + .../apache/hadoop/sqoop/hive/HiveTypes.java | 4 +- .../org/apache/hadoop/sqoop/lib/BlobRef.java | 154 ++- .../org/apache/hadoop/sqoop/lib/ClobRef.java | 147 ++- .../hadoop/sqoop/lib/JdbcWritableBridge.java | 26 +- .../hadoop/sqoop/lib/LargeObjectLoader.java | 251 ++++ .../apache/hadoop/sqoop/lib/SqoopRecord.java | 10 +- .../sqoop/mapreduce/AutoProgressMapper.java | 2 +- .../sqoop/mapreduce/DataDrivenImportJob.java | 6 +- .../mapreduce/SequenceFileImportMapper.java | 48 + .../sqoop/mapreduce/TextImportMapper.java | 19 +- .../apache/hadoop/sqoop/orm/ClassWriter.java | 56 +- .../org/apache/hadoop/sqoop/SmokeTests.java | 6 + .../apache/hadoop/sqoop/lib/TestBlobRef.java | 125 ++ .../apache/hadoop/sqoop/lib/TestClobRef.java | 134 +++ .../sqoop/lib/TestLargeObjectLoader.java | 160 +++ .../sqoop/mapreduce/TestTextImportMapper.java | 23 +- .../hadoop/sqoop/testutil/MockResultSet.java | 1012 +++++++++++++++++ 20 files changed, 2123 insertions(+), 88 deletions(-) create mode 100644 src/java/org/apache/hadoop/sqoop/lib/LargeObjectLoader.java create mode 100644 src/java/org/apache/hadoop/sqoop/mapreduce/SequenceFileImportMapper.java create mode 100644 src/test/org/apache/hadoop/sqoop/lib/TestBlobRef.java create mode 100644 src/test/org/apache/hadoop/sqoop/lib/TestClobRef.java create mode 100644 src/test/org/apache/hadoop/sqoop/lib/TestLargeObjectLoader.java create mode 100644 src/test/org/apache/hadoop/sqoop/testutil/MockResultSet.java diff --git a/src/docs/Sqoop-manpage.txt b/src/docs/Sqoop-manpage.txt index 441d7280..5e898a3e 100644 --- a/src/docs/Sqoop-manpage.txt +++ b/src/docs/Sqoop-manpage.txt @@ -123,6 +123,10 @@ Import control options When using direct mode, write to multiple files of approximately _size_ bytes each. +--inline-lob-limit (size):: + When importing LOBs, keep objects inline up to + _size_ bytes. + Export control options ~~~~~~~~~~~~~~~~~~~~~~ diff --git a/src/docs/misc-args.txt b/src/docs/misc-args.txt index 21439424..4c7b05df 100644 --- a/src/docs/misc-args.txt +++ b/src/docs/misc-args.txt @@ -34,6 +34,14 @@ Data emitted to HDFS is by default uncompressed. You can instruct Sqoop to use gzip to compress your data by providing either the +--compress+ or +-z+ argument (both are equivalent). +Small CLOB and BLOB values will be imported as string-based data inline +with the rest of their containing record. Over a size threshold (by +default, 16 MB per object), these values will not be materialized directly, +inline, and will be written to external files in HDFS; the inline records +will contain pointers to these files. The inline materialization limit can +be controlled with the +--inline-lob-limit+ argument; the limit itself is +specified in bytes. + Using +--verbose+ will instruct Sqoop to print more details about its operation; this is particularly handy if Sqoop appears to be misbehaving. diff --git a/src/java/org/apache/hadoop/sqoop/SqoopOptions.java b/src/java/org/apache/hadoop/sqoop/SqoopOptions.java index dcbdfa66..15354854 100644 --- a/src/java/org/apache/hadoop/sqoop/SqoopOptions.java +++ b/src/java/org/apache/hadoop/sqoop/SqoopOptions.java @@ -30,6 +30,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.sqoop.lib.LargeObjectLoader; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Category; import org.apache.log4j.Level; @@ -115,6 +116,9 @@ public enum FileLayout { private boolean useCompression; private long directSplitSize; // In direct mode, open a new stream every X bytes. + private long maxInlineLobSize; // Max size of an inline LOB; larger LOBs are written + // to external files on disk. + private String exportDir; // HDFS path to read from when performing an export private char inputFieldDelim; @@ -275,6 +279,8 @@ private void initDefaults(Configuration baseConfiguration) { this.useCompression = false; this.directSplitSize = 0; + this.maxInlineLobSize = LargeObjectLoader.DEFAULT_MAX_LOB_LENGTH; + if (null == baseConfiguration) { this.conf = new Configuration(); } else { @@ -328,6 +334,7 @@ public static void printUsage() { System.out.println("-z, --compress Enable compression"); System.out.println("--direct-split-size (n) Split the input stream every 'n' bytes"); System.out.println(" when importing in direct mode."); + System.out.println("--inline-lob-limit (n) Set the maximum size for an inline LOB"); System.out.println(""); System.out.println("Export options:"); System.out.println("--export-dir (dir) Export from an HDFS path into a table"); @@ -584,6 +591,8 @@ public void parse(String [] args) throws InvalidOptionsException { this.useCompression = true; } else if (args[i].equals("--direct-split-size")) { this.directSplitSize = Long.parseLong(args[++i]); + } else if (args[i].equals("--inline-lob-limit")) { + this.maxInlineLobSize = Long.parseLong(args[++i]); } else if (args[i].equals("--jar-file")) { this.existingJarFile = args[++i]; } else if (args[i].equals("--list-databases")) { @@ -1003,6 +1012,13 @@ public long getDirectSplitSize() { return this.directSplitSize; } + /** + * @return the max size of a LOB before we spill to a separate file. + */ + public long getInlineLobLimit() { + return this.maxInlineLobSize; + } + public Configuration getConf() { return conf; } diff --git a/src/java/org/apache/hadoop/sqoop/hive/HiveTypes.java b/src/java/org/apache/hadoop/sqoop/hive/HiveTypes.java index 59c6fda6..7cc83a8e 100644 --- a/src/java/org/apache/hadoop/sqoop/hive/HiveTypes.java +++ b/src/java/org/apache/hadoop/sqoop/hive/HiveTypes.java @@ -74,8 +74,10 @@ public static String toHiveType(int sqlType) { } else if (sqlType == Types.TIMESTAMP) { // unfortunate type coercion return "STRING"; + } else if (sqlType == Types.CLOB) { + return "STRING"; } else { - // TODO(aaron): Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB, + // TODO(aaron): Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, // BLOB, ARRAY, STRUCT, REF, JAVA_OBJECT. return null; } diff --git a/src/java/org/apache/hadoop/sqoop/lib/BlobRef.java b/src/java/org/apache/hadoop/sqoop/lib/BlobRef.java index 49b33b47..877fe0b6 100644 --- a/src/java/org/apache/hadoop/sqoop/lib/BlobRef.java +++ b/src/java/org/apache/hadoop/sqoop/lib/BlobRef.java @@ -18,12 +18,26 @@ package org.apache.hadoop.sqoop.lib; +import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.InputStream; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.MapContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * BlobRef is a wrapper that holds a Blob either directly, or a @@ -31,13 +45,20 @@ */ public class BlobRef implements Writable { + public static final Log LOG = LogFactory.getLog(BlobRef.class.getName()); + public BlobRef(byte [] bytes) { - this.blobFileNum = 0; + this.fileName = null; this.data = new BytesWritable(bytes); } public BlobRef() { - this.blobFileNum = 0; + this.fileName = null; + this.data = null; + } + + public BlobRef(String file) { + this.fileName = file; this.data = null; } @@ -45,15 +66,69 @@ public BlobRef() { private BytesWritable data; // If there data is too large, it's written into a file - // and the file is numbered; this number is recorded here. - // This takes precedence if this value is positive. - private long blobFileNum; + // whose path (relative to the rest of the dataset) is recorded here. + // This takes precedence if this value is non-null. + private String fileName; + + /** + * @return true if the BLOB data is in an external file; false if + * it materialized inline. + */ + public boolean isExternal() { + return fileName != null; + } + + /** + * Convenience method to access #getDataStream(Configuration, Path) + * from within a map task that read this BlobRef from a file-based + * InputSplit. + * @param mapContext the Mapper.Context instance that encapsulates + * the current map task. + * @return an InputStream to access the BLOB data. + * @throws IllegalArgumentException if it cannot find the source + * path for this BLOB based on the MapContext. + * @throws IOException if it could not read the BLOB from external storage. + */ + public InputStream getDataStream(MapContext mapContext) + throws IllegalArgumentException, IOException { + InputSplit split = mapContext.getInputSplit(); + if (split instanceof FileSplit) { + Path basePath = ((FileSplit) split).getPath().getParent(); + return getDataStream(mapContext.getConfiguration(), + basePath); + } else { + throw new IllegalArgumentException( + "Could not ascertain BLOB base path from MapContext."); + } + } + + /** + * Get access to the BLOB data itself. + * This method returns an InputStream-based representation of the + * BLOB data, accessing the filesystem for external BLOB storage + * as necessary. + * @param conf the Configuration used to access the filesystem + * @param basePath the base directory where the table records are + * stored. + * @return an InputStream used to read the BLOB data. + * @throws IOException if it could not read the BLOB from external storage. + */ + public InputStream getDataStream(Configuration conf, Path basePath) + throws IOException { + if (isExternal()) { + // use external storage. + FileSystem fs = FileSystem.get(conf); + return fs.open(new Path(basePath, fileName)); + } else { + return new ByteArrayInputStream(data.getBytes()); + } + } + public byte [] getData() { - if (blobFileNum > 0) { - // We have a numbered file. - // TODO: Implement this. - throw new RuntimeException("Unsupported: Indirect BLOBs are not supported"); + if (isExternal()) { + throw new RuntimeException( + "External BLOBs must be read via getDataStream()"); } return data.getBytes(); @@ -61,8 +136,8 @@ public BlobRef() { @Override public String toString() { - if (blobFileNum > 0) { - return "indirectBlob(" + blobFileNum + ")"; + if (isExternal()) { + return "externalBlob(" + fileName + ")"; } else { return data.toString(); } @@ -71,32 +146,67 @@ public String toString() { @Override public void readFields(DataInput in) throws IOException { // The serialization format for this object is: - // boolean isIndirect - // if true, the next field is a Long containing blobFileNum - // if false, the next field is String data. + // boolean isExternal + // if true, the next field is a String containing the file name. + // if false, the next field is a BytesWritable containing the + // actual data. - boolean isIndirect = in.readBoolean(); - if (isIndirect) { + boolean isExternal = in.readBoolean(); + if (isExternal) { this.data = null; - this.blobFileNum = in.readLong(); + this.fileName = Text.readString(in); } else { if (null == this.data) { this.data = new BytesWritable(); } this.data.readFields(in); - this.blobFileNum = 0; + this.fileName = null; } } @Override public void write(DataOutput out) throws IOException { - boolean isIndirect = blobFileNum > 0; - out.writeBoolean(isIndirect); - if (isIndirect) { - out.writeLong(blobFileNum); + out.writeBoolean(isExternal()); + if (isExternal()) { + Text.writeString(out, fileName); } else { data.write(out); } } + + private static final ThreadLocal EXTERNAL_MATCHER = + new ThreadLocal() { + @Override protected Matcher initialValue() { + Pattern externalPattern = Pattern.compile("externalBlob\\((.*)\\)"); + return externalPattern.matcher(""); + } + }; + + /** + * Create a BlobRef based on parsed data from a line of text. + * This only operates correctly on external blobs; inline blobs are simply + * returned as null. You should store BLOB data in SequenceFile format + * if reparsing is necessary. + * @param inputString the text-based input data to parse. + * @return a new BlobRef containing a reference to an external BLOB, or + * an empty BlobRef if the data to be parsed is actually inline. + */ + public static BlobRef parse(String inputString) { + // If inputString is of the form 'externalBlob(%s)', then this is an + // external BLOB stored at the filename indicated by '%s'. Otherwise, + // it is an inline BLOB, which we don't support parsing of. + + Matcher m = EXTERNAL_MATCHER.get(); + m.reset(inputString); + if (m.matches()) { + // Extract the filename component from the string. + return new BlobRef(m.group(1)); + } else { + // This is inline BLOB string data. + LOG.warn( + "Reparsing inline BLOB data is not supported; use SequenceFiles."); + return new BlobRef(); + } + } } diff --git a/src/java/org/apache/hadoop/sqoop/lib/ClobRef.java b/src/java/org/apache/hadoop/sqoop/lib/ClobRef.java index ed6c6b90..b1a969c1 100644 --- a/src/java/org/apache/hadoop/sqoop/lib/ClobRef.java +++ b/src/java/org/apache/hadoop/sqoop/lib/ClobRef.java @@ -21,9 +21,20 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.StringReader; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.MapContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; /** * ClobRef is a wrapper that holds a Clob either directly, or a @@ -32,12 +43,23 @@ public class ClobRef implements Writable { public ClobRef(String chars) { - this.clobFileNum = 0; + this.fileName = null; this.data = chars; } public ClobRef() { - this.clobFileNum = 0; + this.fileName = null; + this.data = null; + } + + /** + * Initialize a clobref to an external CLOB. + * @param file the filename to the CLOB. May be relative to the job dir. + * @param ignored is not used; this just differentiates this constructor + * from ClobRef(String chars). + */ + public ClobRef(String file, boolean ignored) { + this.fileName = file; this.data = null; } @@ -45,24 +67,74 @@ public ClobRef() { private String data; // If there data is too large, it's written into a file - // and the file is numbered; this number is recorded here. - // This takes precedence if this value is positive. - private long clobFileNum; + // whose path (relative to the rest of the dataset) is recorded here. + // This takes precedence if this value is non-null. + private String fileName; - public String getData() { - if (clobFileNum > 0) { - // We have a numbered file. - // TODO: Implement this. - throw new RuntimeException("Unsupported: Indirect CLOBs are not supported"); - } - - return data; + /** + * @return true if the CLOB data is in an external file; false if + * it is materialized inline. + */ + public boolean isExternal() { + return fileName != null; } + /** + * Convenience method to access #getDataReader(Configuration, Path) + * from within a map task that read this ClobRef from a file-based + * InputSplit. + * @param mapContext the Mapper.Context instance that encapsulates + * the current map task. + * @return a Reader to access the CLOB data. + * @throws IllegalArgumentException if it cannot find the source + * path for this CLOB based on the MapContext. + * @throws IOException if it could not read the CLOB from external storage. + */ + public Reader getDataReader(MapContext mapContext) + throws IllegalArgumentException, IOException { + InputSplit split = mapContext.getInputSplit(); + if (split instanceof FileSplit) { + Path basePath = ((FileSplit) split).getPath().getParent(); + return getDataReader(mapContext.getConfiguration(), + basePath); + } else { + throw new IllegalArgumentException( + "Could not ascertain CLOB base path from MapContext."); + } + } + + /** + * Get access to the CLOB data itself. + * This method returns a Reader-based representation of the + * CLOB data, accessing the filesystem for external CLOB storage + * as necessary. + * @param conf the Configuration used to access the filesystem + * @param basePath the base directory where the table records are + * stored. + * @return a Reader used to read the CLOB data. + * @throws IOException if it could not read the CLOB from external storage. + */ + public Reader getDataReader(Configuration conf, Path basePath) + throws IOException { + if (isExternal()) { + // use external storage. + FileSystem fs = FileSystem.get(conf); + return new InputStreamReader(fs.open(new Path(basePath, fileName))); + } else { + return new StringReader(data); + } + } + + /** + * @return a string representation of the ClobRef. If this is an + * inline clob (isExternal() returns false), it will contain the + * materialized data. Otherwise it returns a description of the + * reference. To ensure access to the data itself, {@see #getDataStream()}. + */ @Override public String toString() { - if (clobFileNum > 0) { - return "indirectClob(" + clobFileNum + ")"; + if (isExternal()) { + return "externalClob(" + fileName + ")"; } else { return data; } @@ -71,29 +143,60 @@ public String toString() { @Override public void readFields(DataInput in) throws IOException { // The serialization format for this object is: - // boolean isIndirect - // if true, the next field is a Long containing clobFileNum - // if false, the next field is String data. + // boolean isExternal + // if true, the next field is a String containing the external file name. + // if false, the next field is String containing the actual data. boolean isIndirect = in.readBoolean(); if (isIndirect) { + this.fileName = Text.readString(in); this.data = null; - this.clobFileNum = in.readLong(); } else { + this.fileName = null; this.data = Text.readString(in); - this.clobFileNum = 0; } } @Override public void write(DataOutput out) throws IOException { - boolean isIndirect = clobFileNum > 0; + boolean isIndirect = isExternal(); out.writeBoolean(isIndirect); if (isIndirect) { - out.writeLong(clobFileNum); + Text.writeString(out, fileName); } else { Text.writeString(out, data); } } + + // A pattern matcher which can recognize external CLOB data + // vs. an inline CLOB string. + private static final ThreadLocal EXTERNAL_MATCHER = + new ThreadLocal() { + @Override protected Matcher initialValue() { + Pattern externalPattern = Pattern.compile("externalClob\\((.*)\\)"); + return externalPattern.matcher(""); + } + }; + + /** + * Create a ClobRef based on parsed data from a line of text. + * @param inputString the text-based input data to parse. + * @return a ClobRef to the given data. + */ + public static ClobRef parse(String inputString) { + // If inputString is of the form 'externalClob(%s)', then this is an + // external CLOB stored at the filename indicated by '%s'. Otherwise, + // it is an inline CLOB. + + Matcher m = EXTERNAL_MATCHER.get(); + m.reset(inputString); + if (m.matches()) { + // Extract the filename component from the string. + return new ClobRef(m.group(1), true); + } else { + // This is inline CLOB string data. + return new ClobRef(inputString); + } + } } diff --git a/src/java/org/apache/hadoop/sqoop/lib/JdbcWritableBridge.java b/src/java/org/apache/hadoop/sqoop/lib/JdbcWritableBridge.java index dc0a0450..41179cbd 100644 --- a/src/java/org/apache/hadoop/sqoop/lib/JdbcWritableBridge.java +++ b/src/java/org/apache/hadoop/sqoop/lib/JdbcWritableBridge.java @@ -115,32 +115,14 @@ public static BigDecimal readBigDecimal(int colNum, ResultSet r) throws SQLExcep public static BlobRef readBlobRef(int colNum, ResultSet r) throws SQLException { - Blob b = r.getBlob(colNum); - if (null == b) { - return null; - } else if (b.length() > MAX_BLOB_LENGTH) { - // TODO: Deserialize very large BLOBs into separate files. - throw new UnsupportedOperationException("BLOB size exceeds max: " - + MAX_BLOB_LENGTH); - } else { - // This is a 1-based array. - return new BlobRef(b.getBytes(1, (int) b.length())); - } + // Loading of BLOBs is delayed; handled by LargeObjectLoader. + return null; } public static ClobRef readClobRef(int colNum, ResultSet r) throws SQLException { - Clob c = r.getClob(colNum); - if (null == c) { - return null; - } else if (c.length() > MAX_CLOB_LENGTH) { - // TODO: Deserialize very large CLOBs into separate files. - throw new UnsupportedOperationException("CLOB size exceeds max: " - + MAX_CLOB_LENGTH); - } else { - // This is a 1-based array. - return new ClobRef(c.getSubString(1, (int) c.length())); - } + // Loading of CLOBs is delayed; handled by LargeObjectLoader. + return null; } public static void writeInteger(Integer val, int paramIdx, int sqlType, PreparedStatement s) diff --git a/src/java/org/apache/hadoop/sqoop/lib/LargeObjectLoader.java b/src/java/org/apache/hadoop/sqoop/lib/LargeObjectLoader.java new file mode 100644 index 00000000..b4ec7884 --- /dev/null +++ b/src/java/org/apache/hadoop/sqoop/lib/LargeObjectLoader.java @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.lib; + +import java.io.BufferedOutputStream; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Reader; +import java.io.Writer; +import java.math.BigDecimal; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +/** + * Contains a set of methods which can read db columns from a ResultSet into + * Java types, and do serialization of these types to/from DataInput/DataOutput + * for use with Hadoop's Writable implementation. This supports null values + * for all types. + * + * This is a singleton instance class; only one may exist at a time. + * However, its lifetime is limited to the current TaskInputOutputContext's + * life. + */ +public class LargeObjectLoader { + + // Currently, cap BLOB/CLOB objects at 16 MB until we can use external storage. + public final static long DEFAULT_MAX_LOB_LENGTH = 16 * 1024 * 1024; + + public final static String MAX_INLINE_LOB_LEN_KEY = + "sqoop.inline.lob.length.max"; + + // The task context for the currently-initialized instance. + private TaskInputOutputContext context; + + private FileSystem fs; + + // Counter that is used with the current task attempt id to + // generate unique LOB file names. + private long nextLobFileId = 0; + + public LargeObjectLoader(TaskInputOutputContext context) + throws IOException { + this.context = context; + this.fs = FileSystem.get(context.getConfiguration()); + } + + /** + * @return a filename to use to put an external LOB in. + */ + private String getNextLobFileName() { + String file = "_lob/obj_" + context.getConfiguration().get( + JobContext.TASK_ID, "unknown_task_id") + nextLobFileId; + nextLobFileId++; + + return file; + } + + /** + * Copies all character data from the provided Reader to the provided + * Writer. Does not close handles when it's done. + * @param reader data source + * @param writer data sink + * @throws IOException if an I/O error occurs either reading or writing. + */ + private void copyAll(Reader reader, Writer writer) throws IOException { + int bufferSize = context.getConfiguration().getInt("io.file.buffer.size", + 4096); + char [] buf = new char[bufferSize]; + + while (true) { + int charsRead = reader.read(buf); + if (-1 == charsRead) { + break; // no more stream to read. + } + writer.write(buf, 0, charsRead); + } + } + + /** + * Copies all byte data from the provided InputStream to the provided + * OutputStream. Does not close handles when it's done. + * @param input data source + * @param output data sink + * @throws IOException if an I/O error occurs either reading or writing. + */ + private void copyAll(InputStream input, OutputStream output) + throws IOException { + int bufferSize = context.getConfiguration().getInt("io.file.buffer.size", + 4096); + byte [] buf = new byte[bufferSize]; + + while (true) { + int bytesRead = input.read(buf, 0, bufferSize); + if (-1 == bytesRead) { + break; // no more stream to read. + } + output.write(buf, 0, bytesRead); + } + } + + /** + * Actually read a BlobRef instance from the ResultSet and materialize + * the data either inline or to a file. + * + * @param colNum the column of the ResultSet's current row to read. + * @param r the ResultSet to read from. + * @return a BlobRef encapsulating the data in this field. + * @throws IOException if an error occurs writing to the FileSystem. + * @throws SQLException if an error occurs reading from the database. + */ + public BlobRef readBlobRef(int colNum, ResultSet r) + throws IOException, InterruptedException, SQLException { + + long maxInlineLobLen = context.getConfiguration().getLong( + MAX_INLINE_LOB_LEN_KEY, + DEFAULT_MAX_LOB_LENGTH); + + Blob b = r.getBlob(colNum); + if (null == b) { + return null; + } else if (b.length() > maxInlineLobLen) { + // Deserialize very large BLOBs into separate files. + String fileName = getNextLobFileName(); + Path p = new Path(FileOutputFormat.getWorkOutputPath(context), fileName); + + Path parent = p.getParent(); + if (!fs.exists(parent)) { + fs.mkdirs(parent); + } + + BufferedOutputStream bos = null; + InputStream is = null; + OutputStream os = fs.create(p); + try { + bos = new BufferedOutputStream(os); + is = b.getBinaryStream(); + copyAll(is, bos); + } finally { + if (null != bos) { + bos.close(); + os = null; // os is now closed. + } + + if (null != os) { + os.close(); + } + + if (null != is) { + is.close(); + } + } + + return new BlobRef(fileName); + } else { + // This is a 1-based array. + return new BlobRef(b.getBytes(1, (int) b.length())); + } + } + + + /** + * Actually read a ClobRef instance from the ResultSet and materialize + * the data either inline or to a file. + * + * @param colNum the column of the ResultSet's current row to read. + * @param r the ResultSet to read from. + * @return a ClobRef encapsulating the data in this field. + * @throws IOException if an error occurs writing to the FileSystem. + * @throws SQLException if an error occurs reading from the database. + */ + public ClobRef readClobRef(int colNum, ResultSet r) + throws IOException, InterruptedException, SQLException { + + long maxInlineLobLen = context.getConfiguration().getLong( + MAX_INLINE_LOB_LEN_KEY, + DEFAULT_MAX_LOB_LENGTH); + + Clob c = r.getClob(colNum); + if (null == c) { + return null; + } else if (c.length() > maxInlineLobLen) { + // Deserialize large CLOB into separate file. + String fileName = getNextLobFileName(); + Path p = new Path(FileOutputFormat.getWorkOutputPath(context), fileName); + + Path parent = p.getParent(); + if (!fs.exists(parent)) { + fs.mkdirs(parent); + } + + BufferedWriter w = null; + Reader reader = null; + OutputStream os = fs.create(p); + try { + w = new BufferedWriter(new OutputStreamWriter(os)); + reader = c.getCharacterStream(); + copyAll(reader, w); + } finally { + if (null != w) { + w.close(); + os = null; // os is now closed. + } + + if (null != os) { + os.close(); + } + + if (null != reader) { + reader.close(); + } + } + + return new ClobRef(fileName, true); + } else { + // This is a 1-based array. + return new ClobRef(c.getSubString(1, (int) c.length())); + } + } +} diff --git a/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java b/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java index 2b51083d..a3fcefd7 100644 --- a/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java +++ b/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java @@ -18,13 +18,15 @@ package org.apache.hadoop.sqoop.lib; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.sql.SQLException; + import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.lib.db.DBWritable; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; - /** * Interface implemented by the classes generated by sqoop's orm.ClassWriter. */ @@ -35,5 +37,7 @@ public interface SqoopRecord extends DBWritable, Writable { public void parse(char [] s) throws RecordParser.ParseError; public void parse(ByteBuffer s) throws RecordParser.ParseError; public void parse(CharBuffer s) throws RecordParser.ParseError; + public void loadLargeObjects(LargeObjectLoader objLoader) + throws SQLException, IOException, InterruptedException; } diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java b/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java index 8a889a89..b2c2ce1a 100644 --- a/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java +++ b/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java @@ -158,6 +158,7 @@ private final void configureAutoProgress(Configuration job) { /** * Run the mapping process for this task, wrapped in an auto-progress system. */ + @Override public void run(Context context) throws IOException, InterruptedException { configureAutoProgress(context.getConfiguration()); ProgressThread thread = this.new ProgressThread(context); @@ -182,5 +183,4 @@ public void run(Context context) throws IOException, InterruptedException { } } } - } diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java index d97d3fd3..d2f5dea8 100644 --- a/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java +++ b/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java @@ -44,6 +44,7 @@ import org.apache.hadoop.sqoop.ConnFactory; import org.apache.hadoop.sqoop.SqoopOptions; import org.apache.hadoop.sqoop.manager.ConnManager; +import org.apache.hadoop.sqoop.lib.LargeObjectLoader; import org.apache.hadoop.sqoop.orm.TableClassName; import org.apache.hadoop.sqoop.util.ClassLoaderStack; import org.apache.hadoop.sqoop.util.ImportException; @@ -85,7 +86,7 @@ protected Class getMapperClass() { if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) { return TextImportMapper.class; } else if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) { - return AutoProgressMapper.class; + return SequenceFileImportMapper.class; } return null; @@ -140,6 +141,9 @@ protected void configureInputFormat(Job job, String tableName, job.getConfiguration().set(DBConfiguration.INPUT_CLASS_PROPERTY, tableClassName); + job.getConfiguration().setLong(LargeObjectLoader.MAX_INLINE_LOB_LEN_KEY, + options.getInlineLobLimit()); + LOG.debug("Using InputFormat: " + inputFormatClass); job.setInputFormatClass(inputFormatClass); } diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/SequenceFileImportMapper.java b/src/java/org/apache/hadoop/sqoop/mapreduce/SequenceFileImportMapper.java new file mode 100644 index 00000000..d71c7a78 --- /dev/null +++ b/src/java/org/apache/hadoop/sqoop/mapreduce/SequenceFileImportMapper.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.mapreduce; + +import java.io.IOException; +import java.sql.SQLException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.Mapper.Context; +import org.apache.hadoop.sqoop.lib.LargeObjectLoader; +import org.apache.hadoop.sqoop.lib.SqoopRecord; + +/** + * Imports records by writing them to a SequenceFile. + */ +public class SequenceFileImportMapper + extends AutoProgressMapper { + + public void map(LongWritable key, SqoopRecord val, Context context) + throws IOException, InterruptedException { + + try { + // Loading of LOBs was delayed until we have a Context. + val.loadLargeObjects(new LargeObjectLoader(context)); + } catch (SQLException sqlE) { + throw new IOException(sqlE); + } + + context.write(key, val); + } +} + diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/TextImportMapper.java b/src/java/org/apache/hadoop/sqoop/mapreduce/TextImportMapper.java index 3970105d..3c683c17 100644 --- a/src/java/org/apache/hadoop/sqoop/mapreduce/TextImportMapper.java +++ b/src/java/org/apache/hadoop/sqoop/mapreduce/TextImportMapper.java @@ -19,18 +19,20 @@ package org.apache.hadoop.sqoop.mapreduce; import java.io.IOException; +import java.sql.SQLException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper.Context; -import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.hadoop.sqoop.lib.LargeObjectLoader; +import org.apache.hadoop.sqoop.lib.SqoopRecord; /** - * Converts an input record into a string representation and emit it. + * Imports records by transforming them to strings for a plain-text flat file. */ public class TextImportMapper - extends AutoProgressMapper { + extends AutoProgressMapper { private Text outkey; @@ -38,9 +40,18 @@ public TextImportMapper() { outkey = new Text(); } - public void map(LongWritable key, DBWritable val, Context context) + public void map(LongWritable key, SqoopRecord val, Context context) throws IOException, InterruptedException { + + try { + // Loading of LOBs was delayed until we have a Context. + val.loadLargeObjects(new LargeObjectLoader(context)); + } catch (SQLException sqlE) { + throw new IOException(sqlE); + } + outkey.set(val.toString()); context.write(outkey, NullWritable.get()); } } + diff --git a/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java b/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java index e62ee767..7ad1fcd4 100644 --- a/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java +++ b/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java @@ -24,6 +24,7 @@ import org.apache.hadoop.sqoop.lib.BigDecimalSerializer; import org.apache.hadoop.sqoop.lib.FieldFormatter; import org.apache.hadoop.sqoop.lib.JdbcWritableBridge; +import org.apache.hadoop.sqoop.lib.LargeObjectLoader; import org.apache.hadoop.sqoop.lib.LobSerializer; import org.apache.hadoop.sqoop.lib.RecordParser; import org.apache.hadoop.sqoop.lib.BlobRef; @@ -415,6 +416,10 @@ private void generateDbRead(Map columnTypes, String [] colNames sb.append(" public void readFields(ResultSet __dbResults) throws SQLException {\n"); + // Save ResultSet object cursor for use in LargeObjectLoader + // if necessary. + sb.append(" this.__cur_result_set = __dbResults;\n"); + int fieldNum = 0; for (String col : colNames) { @@ -440,6 +445,44 @@ private void generateDbRead(Map columnTypes, String [] colNames sb.append(" }\n"); } + /** + * Generate the loadLargeObjects() method called by the mapper to load + * delayed objects (that require the Context from the mapper). + */ + private void generateLoadLargeObjects(Map columnTypes, + String [] colNames, StringBuilder sb) { + + // This method relies on the __cur_result_set field being set by + // readFields() method generated by generateDbRead(). + + sb.append(" public void loadLargeObjects(LargeObjectLoader __loader)\n"); + sb.append(" throws SQLException, IOException, InterruptedException {\n"); + + int fieldNum = 0; + + for (String col : colNames) { + fieldNum++; + + int sqlType = columnTypes.get(col); + String javaType = connManager.toJavaType(sqlType); + if (null == javaType) { + LOG.error("No Java type for SQL type " + sqlType); + continue; + } + + String getterMethod = dbGetterForType(javaType); + if ("readClobRef".equals(getterMethod) + || "readBlobRef".equals(getterMethod)) { + // This field is a blob/clob field with delayed loading. + // Call the appropriate LargeObjectLoader method (which has the + // same name as a JdbcWritableBridge method). + sb.append(" this." + col + " = __loader." + getterMethod + + "(" + fieldNum + ", this.__cur_result_set);\n"); + } + } + sb.append(" }\n"); + } + /** * Generate the write() method used by the database @@ -638,12 +681,9 @@ private void parseColumn(String colName, int colType, StringBuilder sb) { } else if (javaType.equals("java.math.BigDecimal")) { sb.append(" this." + colName + " = new java.math.BigDecimal(__cur_str);\n"); } else if (javaType.equals(ClobRef.class.getName())) { - sb.append(" this." + colName + " = new ClobRef(__cur_str);\n"); + sb.append(" this." + colName + " = ClobRef.parse(__cur_str);\n"); } else if (javaType.equals(BlobRef.class.getName())) { - // We don't support parsing BLOB data. - // Users must store this in SequenceFiles. - LOG.warn("BLOB data cannot be reparsed from text files"); - sb.append(" this." + colName + " = new BlobRef();\n"); + sb.append(" this." + colName + " = BlobRef.parse(__cur_str);\n"); } else { LOG.error("No parser available for Java type " + javaType); } @@ -844,6 +884,7 @@ public StringBuilder generateClassForColumns(Map columnTypes, sb.append("import " + RecordParser.class.getCanonicalName() + ";\n"); sb.append("import " + BlobRef.class.getCanonicalName() + ";\n"); sb.append("import " + ClobRef.class.getCanonicalName() + ";\n"); + sb.append("import " + LargeObjectLoader.class.getCanonicalName() + ";\n"); sb.append("import " + SqoopRecord.class.getCanonicalName() + ";\n"); sb.append("import java.sql.PreparedStatement;\n"); sb.append("import java.sql.ResultSet;\n"); @@ -860,10 +901,13 @@ public StringBuilder generateClassForColumns(Map columnTypes, sb.append("import java.util.List;\n"); String className = tableNameInfo.getShortClassForTable(tableName); - sb.append("public class " + className + " implements DBWritable, SqoopRecord, Writable {\n"); + sb.append("public class " + className + + " implements DBWritable, SqoopRecord, Writable {\n"); sb.append(" public static final int PROTOCOL_VERSION = " + CLASS_WRITER_VERSION + ";\n"); + sb.append(" protected ResultSet __cur_result_set;\n"); generateFields(columnTypes, colNames, sb); generateDbRead(columnTypes, colNames, sb); + generateLoadLargeObjects(columnTypes, colNames, sb); generateDbWrite(columnTypes, colNames, sb); generateHadoopRead(columnTypes, colNames, sb); generateHadoopWrite(columnTypes, colNames, sb); diff --git a/src/test/org/apache/hadoop/sqoop/SmokeTests.java b/src/test/org/apache/hadoop/sqoop/SmokeTests.java index a48182b0..a463f4c0 100644 --- a/src/test/org/apache/hadoop/sqoop/SmokeTests.java +++ b/src/test/org/apache/hadoop/sqoop/SmokeTests.java @@ -23,6 +23,9 @@ import org.apache.hadoop.sqoop.io.TestSplittableBufferedWriter; import org.apache.hadoop.sqoop.lib.TestFieldFormatter; import org.apache.hadoop.sqoop.lib.TestRecordParser; +import org.apache.hadoop.sqoop.lib.TestBlobRef; +import org.apache.hadoop.sqoop.lib.TestClobRef; +import org.apache.hadoop.sqoop.lib.TestLargeObjectLoader; import org.apache.hadoop.sqoop.manager.TestHsqldbManager; import org.apache.hadoop.sqoop.manager.TestSqlManager; import org.apache.hadoop.sqoop.mapreduce.MapreduceTests; @@ -60,6 +63,9 @@ public static Test suite() { suite.addTestSuite(TestConnFactory.class); suite.addTestSuite(TestSplittableBufferedWriter.class); suite.addTestSuite(TestTableDefWriter.class); + suite.addTestSuite(TestBlobRef.class); + suite.addTestSuite(TestClobRef.class); + suite.addTestSuite(TestLargeObjectLoader.class); suite.addTest(MapreduceTests.suite()); return suite; diff --git a/src/test/org/apache/hadoop/sqoop/lib/TestBlobRef.java b/src/test/org/apache/hadoop/sqoop/lib/TestBlobRef.java new file mode 100644 index 00000000..c086d81b --- /dev/null +++ b/src/test/org/apache/hadoop/sqoop/lib/TestBlobRef.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.lib; + +import java.io.*; +import java.util.ArrayList; +import java.util.List; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + + +/** + * Test that the BlobRef.parse() method does the right thing. + * Note that we don't support inline parsing here; we only expect this to + * really work for external BLOBs. + */ +public class TestBlobRef extends TestCase { + + public void testEmptyStr() { + BlobRef r = BlobRef.parse(""); + assertFalse(r.isExternal()); + } + + public void testInline() throws IOException { + BlobRef r = BlobRef.parse("foo"); + assertFalse(r.isExternal()); + } + + public void testEmptyFile() { + BlobRef r = BlobRef.parse("externalBlob()"); + assertTrue(r.isExternal()); + assertEquals("externalBlob()", r.toString()); + } + + public void testInlineNearMatch() { + BlobRef r = BlobRef.parse("externalBlob(foo)bar"); + assertFalse(r.isExternal()); + } + + public void testExternal() throws IOException { + final byte [] DATA = { 1, 2, 3, 4, 5 }; + final String FILENAME = "blobdata"; + + doExternalTest(DATA, FILENAME); + } + + public void testExternalSubdir() throws IOException { + final byte [] DATA = { 1, 2, 3, 4, 5 }; + final String FILENAME = "_lob/blobdata"; + + try { + doExternalTest(DATA, FILENAME); + } finally { + // remove dir we made. + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + String tmpDir = System.getProperty("test.build.data", "/tmp/"); + Path lobDir = new Path(new Path(tmpDir), "_lob"); + fs.delete(lobDir, false); + } + } + + private void doExternalTest(final byte [] DATA, final String FILENAME) + throws IOException { + + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", "file:///"); + FileSystem fs = FileSystem.get(conf); + String tmpDir = System.getProperty("test.build.data", "/tmp/"); + + Path tmpPath = new Path(tmpDir); + + Path blobFile = new Path(tmpPath, FILENAME); + + // make any necessary parent dirs. + Path blobParent = blobFile.getParent(); + if (!fs.exists(blobParent)) { + fs.mkdirs(blobParent); + } + + OutputStream os = fs.create(blobFile); + try { + os.write(DATA, 0, DATA.length); + os.close(); + + BlobRef blob = BlobRef.parse("externalBlob(" + FILENAME + ")"); + assertTrue(blob.isExternal()); + assertEquals("externalBlob(" + FILENAME + ")", blob.toString()); + InputStream is = blob.getDataStream(conf, tmpPath); + assertNotNull(is); + + byte [] buf = new byte[4096]; + int bytes = is.read(buf, 0, 4096); + is.close(); + + assertEquals(DATA.length, bytes); + for (int i = 0; i < bytes; i++) { + assertEquals(DATA[i], buf[i]); + } + } finally { + fs.delete(blobFile, false); + } + } +} + diff --git a/src/test/org/apache/hadoop/sqoop/lib/TestClobRef.java b/src/test/org/apache/hadoop/sqoop/lib/TestClobRef.java new file mode 100644 index 00000000..afd2f051 --- /dev/null +++ b/src/test/org/apache/hadoop/sqoop/lib/TestClobRef.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.lib; + +import java.io.*; +import java.util.ArrayList; +import java.util.List; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + + +/** + * Test parsing of ClobRef objects. + */ +public class TestClobRef extends TestCase { + + public void testEmptyStr() { + ClobRef r = ClobRef.parse(""); + assertFalse(r.isExternal()); + assertEquals("", r.toString()); + } + + public void testInline() throws IOException { + ClobRef r = ClobRef.parse("foo"); + assertFalse(r.isExternal()); + assertEquals("foo", r.toString()); + + Reader reader = r.getDataReader(null, null); + assertNotNull(reader); + char [] buf = new char[4096]; + int chars = reader.read(buf, 0, 4096); + reader.close(); + + String str = new String(buf, 0, chars); + assertEquals("foo", str); + } + + public void testEmptyFile() { + ClobRef r = ClobRef.parse("externalClob()"); + assertTrue(r.isExternal()); + assertEquals("externalClob()", r.toString()); + } + + public void testInlineNearMatch() { + ClobRef r = ClobRef.parse("externalClob(foo)bar"); + assertFalse(r.isExternal()); + assertEquals("externalClob(foo)bar", r.toString()); + } + + public void testExternal() throws IOException { + final String DATA = "This is the clob data!"; + final String FILENAME = "clobdata"; + + doExternalTest(DATA, FILENAME); + } + + public void testExternalSubdir() throws IOException { + final String DATA = "This is the clob data!"; + final String FILENAME = "_lob/clobdata"; + + try { + doExternalTest(DATA, FILENAME); + } finally { + // remove dir we made. + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + String tmpDir = System.getProperty("test.build.data", "/tmp/"); + Path lobDir = new Path(new Path(tmpDir), "_lob"); + fs.delete(lobDir, false); + } + } + + private void doExternalTest(final String DATA, final String FILENAME) + throws IOException { + + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", "file:///"); + FileSystem fs = FileSystem.get(conf); + String tmpDir = System.getProperty("test.build.data", "/tmp/"); + + Path tmpPath = new Path(tmpDir); + + Path clobFile = new Path(tmpPath, FILENAME); + + // make any necessary parent dirs. + Path clobParent = clobFile.getParent(); + if (!fs.exists(clobParent)) { + fs.mkdirs(clobParent); + } + + BufferedWriter w = new BufferedWriter(new OutputStreamWriter( + fs.create(clobFile))); + try { + w.append(DATA); + w.close(); + + ClobRef clob = ClobRef.parse("externalClob(" + FILENAME + ")"); + assertTrue(clob.isExternal()); + assertEquals("externalClob(" + FILENAME + ")", clob.toString()); + Reader r = clob.getDataReader(conf, tmpPath); + assertNotNull(r); + + char [] buf = new char[4096]; + int chars = r.read(buf, 0, 4096); + r.close(); + + String str = new String(buf, 0, chars); + assertEquals(DATA, str); + } finally { + fs.delete(clobFile, false); + } + } +} + diff --git a/src/test/org/apache/hadoop/sqoop/lib/TestLargeObjectLoader.java b/src/test/org/apache/hadoop/sqoop/lib/TestLargeObjectLoader.java new file mode 100644 index 00000000..29d16163 --- /dev/null +++ b/src/test/org/apache/hadoop/sqoop/lib/TestLargeObjectLoader.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.lib; + +import java.io.*; +import java.util.ArrayList; +import java.util.List; +import java.sql.ResultSet; +import java.sql.SQLException; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.MapContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mrunit.mapreduce.mock.MockMapContext; +import org.apache.hadoop.mrunit.types.Pair; +import org.apache.hadoop.sqoop.testutil.MockResultSet; + +/** + * Test deserialization of ClobRef and BlobRef fields. + */ +public class TestLargeObjectLoader extends TestCase { + + /** + * A mock MapContext that uses FileOutputCommitter. + * This MapContext is actually serving two roles here; when writing the + * CLOB files, its OutputCommitter is used to determine where to write + * the CLOB data, as these are placed in the task output work directory. + * When reading the CLOB data back for verification, we use the + * getInputSplit() to determine where to read our source data from--the same + * directory. We are repurposing the same context for both output and input. + */ + private static class MockMapContextWithCommitter + extends MockMapContext { + private Path outputDir; + private Configuration conf; + + public MockMapContextWithCommitter(Configuration conf, Path outDir) { + super(new ArrayList>(), new Counters()); + + this.outputDir = outDir; + this.conf = conf; + } + + @Override + public OutputCommitter getOutputCommitter() { + try { + return new FileOutputCommitter(outputDir, this); + } catch (IOException ioe) { + return null; + } + } + + @Override + public InputSplit getInputSplit() { + return new FileSplit(new Path(outputDir, "inputFile"), 0, 0, new String[0]); + } + + @Override + public Configuration getConfiguration() { + return conf; + } + } + + protected Configuration conf; + protected MapContext mapContext; + protected LargeObjectLoader loader; + + public void setUp() throws IOException { + conf = new Configuration(); + conf.set("fs.defaultFS", "file:///"); + String tmpDir = System.getProperty("test.build.data", "/tmp/"); + Path outDir = new Path(new Path(tmpDir), "testLobLoader"); + FileSystem fs = FileSystem.getLocal(conf); + if (fs.exists(outDir)) { + fs.delete(outDir, true); + } + fs.mkdirs(outDir); + + mapContext = new MockMapContextWithCommitter(conf, outDir); + loader = new LargeObjectLoader(mapContext); + } + + public void testReadClobRef() + throws IOException, InterruptedException, SQLException { + // This should give us an inline CLOB. + ResultSet resultSet = new MockResultSet(); + ClobRef clob = loader.readClobRef(0, resultSet); + assertNotNull(clob); + assertFalse(clob.isExternal()); + assertEquals(MockResultSet.CLOB_DATA, clob.toString()); + + // LOBs bigger than 4 bytes are now external. + conf.setLong(LargeObjectLoader.MAX_INLINE_LOB_LEN_KEY, 4); + clob = loader.readClobRef(0, resultSet); + assertNotNull(clob); + assertTrue(clob.isExternal()); + mapContext.getOutputCommitter().commitTask(mapContext); + Reader r = clob.getDataReader(mapContext); + char [] buf = new char[4096]; + int chars = r.read(buf, 0, 4096); + r.close(); + String str = new String(buf, 0, chars); + assertEquals(MockResultSet.CLOB_DATA, str); + } + + public void testReadBlobRef() + throws IOException, InterruptedException, SQLException { + // This should give us an inline BLOB. + ResultSet resultSet = new MockResultSet(); + BlobRef blob = loader.readBlobRef(0, resultSet); + assertNotNull(blob); + assertFalse(blob.isExternal()); + byte [] data = blob.getData(); + assertEquals(MockResultSet.BLOB_DATA.length, data.length); + for (int i = 0; i < data.length; i++) { + assertEquals(MockResultSet.BLOB_DATA[i], data[i]); + } + + // LOBs bigger than 4 bytes are now external. + conf.setLong(LargeObjectLoader.MAX_INLINE_LOB_LEN_KEY, 4); + blob = loader.readBlobRef(0, resultSet); + assertNotNull(blob); + assertTrue(blob.isExternal()); + mapContext.getOutputCommitter().commitTask(mapContext); + InputStream is = blob.getDataStream(mapContext); + byte [] buf = new byte[4096]; + int bytes = is.read(buf, 0, 4096); + is.close(); + + assertEquals(MockResultSet.BLOB_DATA.length, bytes); + for (int i = 0; i < bytes; i++) { + assertEquals(MockResultSet.BLOB_DATA[i], buf[i]); + } + } +} + diff --git a/src/test/org/apache/hadoop/sqoop/mapreduce/TestTextImportMapper.java b/src/test/org/apache/hadoop/sqoop/mapreduce/TestTextImportMapper.java index ad330187..97bac5e2 100644 --- a/src/test/org/apache/hadoop/sqoop/mapreduce/TestTextImportMapper.java +++ b/src/test/org/apache/hadoop/sqoop/mapreduce/TestTextImportMapper.java @@ -21,6 +21,8 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -28,8 +30,9 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.mrunit.mapreduce.MapDriver; +import org.apache.hadoop.sqoop.lib.LargeObjectLoader; +import org.apache.hadoop.sqoop.lib.SqoopRecord; import junit.framework.TestCase; @@ -39,10 +42,10 @@ public class TestTextImportMapper extends TestCase { - static class DummyDBWritable implements DBWritable { + static class DummySqoopRecord implements SqoopRecord { long field; - public DummyDBWritable(final long val) { + public DummySqoopRecord(final long val) { this.field = val; } @@ -65,14 +68,22 @@ public void write(PreparedStatement s) throws SQLException { public String toString() { return "" + field; } + + public void loadLargeObjects(LargeObjectLoader loader) { } + public void parse(CharSequence s) { } + public void parse(Text s) { } + public void parse(byte [] s) { } + public void parse(char [] s) { } + public void parse(ByteBuffer s) { } + public void parse(CharBuffer s) { } } public void testTextImport() { TextImportMapper m = new TextImportMapper(); - MapDriver driver = - new MapDriver(m); + MapDriver driver = + new MapDriver(m); - driver.withInput(new LongWritable(0), new DummyDBWritable(42)) + driver.withInput(new LongWritable(0), new DummySqoopRecord(42)) .withOutput(new Text("42"), NullWritable.get()) .runTest(); } diff --git a/src/test/org/apache/hadoop/sqoop/testutil/MockResultSet.java b/src/test/org/apache/hadoop/sqoop/testutil/MockResultSet.java new file mode 100644 index 00000000..8b3cbb9f --- /dev/null +++ b/src/test/org/apache/hadoop/sqoop/testutil/MockResultSet.java @@ -0,0 +1,1012 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.testutil; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.StringReader; +import java.io.Reader; +import java.io.UnsupportedEncodingException; +import java.io.Writer; +import java.math.BigDecimal; +import java.net.URL; +import java.sql.Array; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.Date; +import java.sql.NClob; +import java.sql.Ref; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.RowId; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Statement; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Calendar; +import java.util.Map; + +/** + * Mock ResultSet instance that mocks Clob/Blob behavior. + */ +public class MockResultSet implements ResultSet { + + public static final byte [] BLOB_DATA = { 0x0, 0x1, 0x2, 0x3, + 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xA, 0xB, 0xC, 0xD, 0xE, 0xF }; + + public static final String CLOB_DATA = "This is the mock clob data!"; + + /** + * Read-only Blob class that returns 16 bytes 0x0 .. 0xf + */ + public static class MockBlob implements Blob { + public InputStream getBinaryStream() { + return new ByteArrayInputStream(BLOB_DATA); + } + + public InputStream getBinaryStream(long pos, long len) { + return new ByteArrayInputStream(getBytes(pos, (int) len)); + } + + public byte [] getBytes(long pos, int length) { + byte [] bytes = new byte[length]; + + int start = (int) pos - 1; // SQL uses 1-based arrays!! + for (int i = 0; i < length; i++) { + bytes[i] = BLOB_DATA[i + start]; + } + return bytes; + } + + public long length() { + return BLOB_DATA.length; + } + + + public long position(Blob pattern, long start) { return 0; } + public long position(byte[] pattern, long start) { return 0; } + public OutputStream setBinaryStream(long pos) { return null; } + public int setBytes(long pos, byte[] bytes) { return 0; } + public int setBytes(long pos, byte[] bytes, int offset, int len) { return 0; } + public void truncate(long len) { } + public void free() { } + } + + /** + * Read-only Clob class that returns the CLOB_DATA string only. + */ + public static class MockClob implements Clob { + @Override + public InputStream getAsciiStream() { + try { + return new ByteArrayInputStream(CLOB_DATA.getBytes("UTF-8")); + } catch (UnsupportedEncodingException uee) { + return null; + } + } + + @Override + public Reader getCharacterStream() { + return new StringReader(CLOB_DATA); + } + + public Reader getCharacterStream(long pos, long len) { + return new StringReader(getSubString(pos, (int) len)); + } + + @Override + public String getSubString(long pos, int length) { + long start = pos - 1; // 1-based offsets in SQL + return CLOB_DATA.substring((int) start, (int) (start + length)); + } + + @Override + public long length() { + return CLOB_DATA.length(); + } + + public long position(Clob searchstr, long start) { return 0; } + public long position(String searchstr, long start) { return 0; } + public OutputStream setAsciiStream(long pos) { return null; } + public Writer setCharacterStream(long pos) { return null; } + public int setString(long pos, String str) { return 0; } + public int setString(long pos, String str, int offset, int len) { return 0; } + public void truncate(long len) { } + public void free() { } + } + + + // Methods that return mock Blob or Clob instances. + + public InputStream getAsciiStream(int columnIndex) { + return new MockClob().getAsciiStream(); + } + public InputStream getAsciiStream(String columnName) { + return new MockClob().getAsciiStream(); + } + public InputStream getBinaryStream(int columnIndex) { + return new MockBlob().getBinaryStream(); + } + public InputStream getBinaryStream(String columnName) { + return new MockBlob().getBinaryStream(); + } + public Blob getBlob(int i) { + return new MockBlob(); + } + public Blob getBlob(String colName) { + return new MockBlob(); + } + public Reader getCharacterStream(int columnIndex) { + return new MockClob().getCharacterStream(); + } + public Reader getCharacterStream(String columnName) { + return new MockClob().getCharacterStream(); + } + public Clob getClob(int i) { + return new MockClob(); + } + public Clob getClob(String colName) { + return new MockClob(); + } + + // Methods down here just return the default value for whatever + // type they're using (usually null, 0, or false). + // These stubs were all auto-generated by Eclipse. + @Override + public boolean absolute(int row) throws SQLException { + return false; + } + + @Override + public void afterLast() throws SQLException { } + + @Override + public void beforeFirst() throws SQLException { } + + @Override + public void cancelRowUpdates() throws SQLException { } + + @Override + public void clearWarnings() throws SQLException { } + + @Override + public void close() throws SQLException { } + + @Override + public void deleteRow() throws SQLException { } + + @Override + public int findColumn(String columnLabel) throws SQLException { + return 0; + } + + @Override + public boolean first() throws SQLException { + return false; + } + + @Override + public Array getArray(int columnIndex) throws SQLException { + return null; + } + + @Override + public Array getArray(String columnLabel) throws SQLException { + return null; + } + + @Override + public BigDecimal getBigDecimal(int columnIndex) throws SQLException { + return null; + } + + @Override + public BigDecimal getBigDecimal(String columnLabel) throws SQLException { + return null; + } + + @Override + public BigDecimal getBigDecimal(int columnIndex, int scale) + throws SQLException { + return null; + } + + @Override + public BigDecimal getBigDecimal(String columnLabel, int scale) + throws SQLException { + return null; + } + @Override + public boolean getBoolean(int columnIndex) throws SQLException { + return false; + } + + @Override + public boolean getBoolean(String columnLabel) throws SQLException { + return false; + } + + @Override + public byte getByte(int columnIndex) throws SQLException { + return 0; + } + + @Override + public byte getByte(String columnLabel) throws SQLException { + return 0; + } + + @Override + public int getConcurrency() throws SQLException { + return 0; + } + + @Override + public String getCursorName() throws SQLException { + return null; + } + + @Override + public Date getDate(int columnIndex) throws SQLException { + return null; + } + + @Override + public Date getDate(String columnLabel) throws SQLException { + return null; + } + + @Override + public Date getDate(int columnIndex, Calendar cal) throws SQLException { + return null; + } + + @Override + public Date getDate(String columnLabel, Calendar cal) throws SQLException { + return null; + } + + @Override + public double getDouble(int columnIndex) throws SQLException { + return 0; + } + + @Override + public double getDouble(String columnLabel) throws SQLException { + return 0; + } + + @Override + public int getFetchDirection() throws SQLException { + return 0; + } + + @Override + public int getFetchSize() throws SQLException { + return 0; + } + + @Override + public float getFloat(int columnIndex) throws SQLException { + return 0; + } + + @Override + public float getFloat(String columnLabel) throws SQLException { + return 0; + } + + @Override + public int getHoldability() throws SQLException { + return 0; + } + + @Override + public int getInt(int columnIndex) throws SQLException { + return 0; + } + + @Override + public int getInt(String columnLabel) throws SQLException { + return 0; + } + + @Override + public long getLong(int columnIndex) throws SQLException { + return 0; + } + + @Override + public long getLong(String columnLabel) throws SQLException { + return 0; + } + + @Override + public ResultSetMetaData getMetaData() throws SQLException { + return null; + } + + @Override + public Reader getNCharacterStream(int columnIndex) throws SQLException { + return null; + } + + @Override + public Reader getNCharacterStream(String columnLabel) throws SQLException { + return null; + } + + @Override + public NClob getNClob(int columnIndex) throws SQLException { + return null; + } + + @Override + public NClob getNClob(String columnLabel) throws SQLException { + return null; + } + + @Override + public String getNString(int columnIndex) throws SQLException { + return null; + } + + @Override + public String getNString(String columnLabel) throws SQLException { + return null; + } + + @Override + public Object getObject(int columnIndex) throws SQLException { + return null; + } + + @Override + public Object getObject(String columnLabel) throws SQLException { + return null; + } + + @Override + public Object getObject(int columnIndex, Map> map) + throws SQLException { + return null; + } + + @Override + public Object getObject(String columnLabel, Map> map) + throws SQLException { + return null; + } + + @Override + public Ref getRef(int columnIndex) throws SQLException { + return null; + } + + @Override + public Ref getRef(String columnLabel) throws SQLException { + return null; + } + + @Override + public int getRow() throws SQLException { + return 0; + } + + @Override + public RowId getRowId(int columnIndex) throws SQLException { + return null; + } + + @Override + public RowId getRowId(String columnLabel) throws SQLException { + return null; + } + + @Override + public SQLXML getSQLXML(int columnIndex) throws SQLException { + return null; + } + + @Override + public SQLXML getSQLXML(String columnLabel) throws SQLException { + return null; + } + + @Override + public short getShort(int columnIndex) throws SQLException { + return 0; + } + + @Override + public short getShort(String columnLabel) throws SQLException { + return 0; + } + + @Override + public Statement getStatement() throws SQLException { + return null; + } + + @Override + public String getString(int columnIndex) throws SQLException { + return null; + } + + @Override + public String getString(String columnLabel) throws SQLException { + return null; + } + + @Override + public Time getTime(int columnIndex) throws SQLException { + return null; + } + + @Override + public Time getTime(String columnLabel) throws SQLException { + return null; + } + + @Override + public Time getTime(int columnIndex, Calendar cal) throws SQLException { + return null; + } + + @Override + public Time getTime(String columnLabel, Calendar cal) throws SQLException { + return null; + } + + @Override + public Timestamp getTimestamp(int columnIndex) throws SQLException { + return null; + } + + @Override + public Timestamp getTimestamp(String columnLabel) throws SQLException { + return null; + } + + @Override + public Timestamp getTimestamp(int columnIndex, Calendar cal) + throws SQLException { + return null; + } + + @Override + public Timestamp getTimestamp(String columnLabel, Calendar cal) + throws SQLException { + return null; + } + + @Override + public int getType() throws SQLException { + return 0; + } + + @Override + public URL getURL(int columnIndex) throws SQLException { + return null; + } + + @Override + public URL getURL(String columnLabel) throws SQLException { + return null; + } + + @Override + public InputStream getUnicodeStream(int columnIndex) throws SQLException { + return null; + } + + @Override + public InputStream getUnicodeStream(String columnLabel) throws SQLException { + return null; + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return null; + } + + @Override + public void insertRow() throws SQLException { + } + + @Override + public boolean isAfterLast() throws SQLException { + return false; + } + + @Override + public boolean isBeforeFirst() throws SQLException { + return false; + } + + @Override + public boolean isClosed() throws SQLException { + return false; + } + + @Override + public boolean isFirst() throws SQLException { + return false; + } + + @Override + public boolean isLast() throws SQLException { + return false; + } + + @Override + public boolean last() throws SQLException { + return false; + } + + @Override + public void moveToCurrentRow() throws SQLException { + } + + @Override + public void moveToInsertRow() throws SQLException { + } + + @Override + public boolean next() throws SQLException { + return false; + } + + @Override + public boolean previous() throws SQLException { + return false; + } + + @Override + public void refreshRow() throws SQLException { + } + + @Override + public boolean relative(int rows) throws SQLException { + return false; + } + + @Override + public boolean rowDeleted() throws SQLException { + return false; + } + + @Override + public boolean rowInserted() throws SQLException { + return false; + } + + @Override + public boolean rowUpdated() throws SQLException { + return false; + } + + @Override + public void setFetchDirection(int direction) throws SQLException { + } + + @Override + public void setFetchSize(int rows) throws SQLException { + } + + @Override + public void updateArray(int columnIndex, Array x) throws SQLException { + } + + @Override + public void updateArray(String columnLabel, Array x) throws SQLException { + } + + @Override + public void updateAsciiStream(int columnIndex, InputStream x) + throws SQLException { + } + + @Override + public void updateAsciiStream(String columnLabel, InputStream x) + throws SQLException { + } + + @Override + public void updateAsciiStream(int columnIndex, InputStream x, int length) + throws SQLException { + } + + @Override + public void updateAsciiStream(String columnLabel, InputStream x, int length) + throws SQLException { + } + + @Override + public void updateAsciiStream(int columnIndex, InputStream x, long length) + throws SQLException { + } + + @Override + public void updateAsciiStream(String columnLabel, InputStream x, long length) + throws SQLException { + } + + @Override + public void updateBigDecimal(int columnIndex, BigDecimal x) + throws SQLException { + } + + @Override + public void updateBigDecimal(String columnLabel, BigDecimal x) + throws SQLException { + } + + @Override + public void updateBinaryStream(int columnIndex, InputStream x) + throws SQLException { + } + + @Override + public void updateBinaryStream(String columnLabel, InputStream x) + throws SQLException { + } + + @Override + public void updateBinaryStream(int columnIndex, InputStream x, int length) + throws SQLException { + } + + @Override + public void updateBinaryStream(String columnLabel, InputStream x, int length) + throws SQLException { + } + + @Override + public void updateBinaryStream(int columnIndex, InputStream x, long length) + throws SQLException { + } + + @Override + public void updateBinaryStream(String columnLabel, InputStream x, long length) + throws SQLException { + } + + @Override + public void updateBlob(int columnIndex, Blob x) throws SQLException { + } + + @Override + public void updateBlob(String columnLabel, Blob x) throws SQLException { + } + + @Override + public void updateBlob(int columnIndex, InputStream inputStream) + throws SQLException { + } + + @Override + public void updateBlob(String columnLabel, InputStream inputStream) + throws SQLException { + } + + @Override + public void updateBlob(int columnIndex, InputStream inputStream, long length) + throws SQLException { + } + + @Override + public void updateBlob(String columnLabel, InputStream inputStream, + long length) throws SQLException { + } + + @Override + public void updateBoolean(int columnIndex, boolean x) throws SQLException { + } + + @Override + public void updateBoolean(String columnLabel, boolean x) throws SQLException { + } + + @Override + public void updateByte(int columnIndex, byte x) throws SQLException { + } + + @Override + public void updateByte(String columnLabel, byte x) throws SQLException { + } + + @Override + public void updateBytes(int columnIndex, byte[] x) throws SQLException { + } + + @Override + public void updateBytes(String columnLabel, byte[] x) throws SQLException { + } + + @Override + public void updateCharacterStream(int columnIndex, Reader x) + throws SQLException { + } + + @Override + public void updateCharacterStream(String columnLabel, Reader reader) + throws SQLException { + } + + @Override + public void updateCharacterStream(int columnIndex, Reader x, int length) + throws SQLException { + } + + @Override + public void updateCharacterStream(String columnLabel, Reader reader, + int length) throws SQLException { + } + + @Override + public void updateCharacterStream(int columnIndex, Reader x, long length) + throws SQLException { + } + + @Override + public void updateCharacterStream(String columnLabel, Reader reader, + long length) throws SQLException { + } + + @Override + public void updateClob(int columnIndex, Clob x) throws SQLException { + } + + @Override + public void updateClob(String columnLabel, Clob x) throws SQLException { + } + + @Override + public void updateClob(int columnIndex, Reader reader) throws SQLException { + } + + @Override + public void updateClob(String columnLabel, Reader reader) throws SQLException { + } + + @Override + public void updateClob(int columnIndex, Reader reader, long length) + throws SQLException { + } + + @Override + public void updateClob(String columnLabel, Reader reader, long length) + throws SQLException { + } + + @Override + public void updateDate(int columnIndex, Date x) throws SQLException { + } + + @Override + public void updateDate(String columnLabel, Date x) throws SQLException { + } + + @Override + public void updateDouble(int columnIndex, double x) throws SQLException { + } + + @Override + public void updateDouble(String columnLabel, double x) throws SQLException { + } + + @Override + public void updateFloat(int columnIndex, float x) throws SQLException { + } + + @Override + public void updateFloat(String columnLabel, float x) throws SQLException { + } + + @Override + public void updateInt(int columnIndex, int x) throws SQLException { + } + + @Override + public void updateInt(String columnLabel, int x) throws SQLException { + } + + @Override + public void updateLong(int columnIndex, long x) throws SQLException { + } + + @Override + public void updateLong(String columnLabel, long x) throws SQLException { + } + + @Override + public void updateNCharacterStream(int columnIndex, Reader x) + throws SQLException { + } + + @Override + public void updateNCharacterStream(String columnLabel, Reader reader) + throws SQLException { + } + + @Override + public void updateNCharacterStream(int columnIndex, Reader x, long length) + throws SQLException { + } + + @Override + public void updateNCharacterStream(String columnLabel, Reader reader, + long length) throws SQLException { + } + + @Override + public void updateNClob(int columnIndex, NClob clob) throws SQLException { + } + + @Override + public void updateNClob(String columnLabel, NClob clob) throws SQLException { + } + + @Override + public void updateNClob(int columnIndex, Reader reader) throws SQLException { + } + + @Override + public void updateNClob(String columnLabel, Reader reader) + throws SQLException { + } + + @Override + public void updateNClob(int columnIndex, Reader reader, long length) + throws SQLException { + } + + @Override + public void updateNClob(String columnLabel, Reader reader, long length) + throws SQLException { + } + + @Override + public void updateNString(int columnIndex, String string) throws SQLException { + } + + @Override + public void updateNString(String columnLabel, String string) + throws SQLException { + } + + @Override + public void updateNull(int columnIndex) throws SQLException { + } + + @Override + public void updateNull(String columnLabel) throws SQLException { + } + + @Override + public void updateObject(int columnIndex, Object x) throws SQLException { + } + + @Override + public void updateObject(String columnLabel, Object x) throws SQLException { + } + + @Override + public void updateObject(int columnIndex, Object x, int scaleOrLength) + throws SQLException { + } + + @Override + public void updateObject(String columnLabel, Object x, int scaleOrLength) + throws SQLException { + } + + @Override + public void updateRef(int columnIndex, Ref x) throws SQLException { + } + + @Override + public void updateRef(String columnLabel, Ref x) throws SQLException { + } + + @Override + public void updateRow() throws SQLException { + } + + @Override + public void updateRowId(int columnIndex, RowId x) throws SQLException { + } + + @Override + public void updateRowId(String columnLabel, RowId x) throws SQLException { + } + + @Override + public void updateSQLXML(int columnIndex, SQLXML xmlObject) + throws SQLException { + } + + @Override + public void updateSQLXML(String columnLabel, SQLXML xmlObject) + throws SQLException { + } + + @Override + public void updateShort(int columnIndex, short x) throws SQLException { + } + + @Override + public void updateShort(String columnLabel, short x) throws SQLException { + } + + @Override + public void updateString(int columnIndex, String x) throws SQLException { + } + + @Override + public void updateString(String columnLabel, String x) throws SQLException { + } + + @Override + public void updateTime(int columnIndex, Time x) throws SQLException { + } + + @Override + public void updateTime(String columnLabel, Time x) throws SQLException { + } + + @Override + public void updateTimestamp(int columnIndex, Timestamp x) throws SQLException { + } + + @Override + public void updateTimestamp(String columnLabel, Timestamp x) + throws SQLException { + } + + @Override + public boolean wasNull() throws SQLException { + return false; + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return false; + } + + @Override + public T unwrap(Class iface) throws SQLException { + return null; + } + + @Override + public byte[] getBytes(int columnIndex) throws SQLException { + return null; + } + + @Override + public byte[] getBytes(String columnLabel) throws SQLException { + return null; + } +} +