diff --git a/src/docs/Sqoop-manpage.txt b/src/docs/Sqoop-manpage.txt index 441d7280..5e898a3e 100644 --- a/src/docs/Sqoop-manpage.txt +++ b/src/docs/Sqoop-manpage.txt @@ -123,6 +123,10 @@ Import control options When using direct mode, write to multiple files of approximately _size_ bytes each. +--inline-lob-limit (size):: + When importing LOBs, keep objects inline up to + _size_ bytes. + Export control options ~~~~~~~~~~~~~~~~~~~~~~ diff --git a/src/docs/misc-args.txt b/src/docs/misc-args.txt index 21439424..4c7b05df 100644 --- a/src/docs/misc-args.txt +++ b/src/docs/misc-args.txt @@ -34,6 +34,14 @@ Data emitted to HDFS is by default uncompressed. You can instruct Sqoop to use gzip to compress your data by providing either the +--compress+ or +-z+ argument (both are equivalent). +Small CLOB and BLOB values will be imported as string-based data inline +with the rest of their containing record. Over a size threshold (by +default, 16 MB per object), these values will not be materialized inline; +instead, they will be written to external files in HDFS, and the inline +records will contain pointers to these files. The inline materialization +limit can be controlled with the +--inline-lob-limit+ argument; the limit +itself is specified in bytes. + Using +--verbose+ will instruct Sqoop to print more details about its operation; this is particularly handy if Sqoop appears to be misbehaving. diff --git a/src/java/org/apache/hadoop/sqoop/SqoopOptions.java b/src/java/org/apache/hadoop/sqoop/SqoopOptions.java index dcbdfa66..15354854 100644 --- a/src/java/org/apache/hadoop/sqoop/SqoopOptions.java +++ b/src/java/org/apache/hadoop/sqoop/SqoopOptions.java @@ -30,6 +30,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.sqoop.lib.LargeObjectLoader; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Category; import org.apache.log4j.Level; @@ -115,6 +116,9 @@ public enum FileLayout { private boolean useCompression; private long directSplitSize; // In direct mode, open a new stream every X bytes. + private long maxInlineLobSize; // Max size of an inline LOB; larger LOBs are written + // to external files on disk.
+ private String exportDir; // HDFS path to read from when performing an export private char inputFieldDelim; @@ -275,6 +279,8 @@ private void initDefaults(Configuration baseConfiguration) { this.useCompression = false; this.directSplitSize = 0; + this.maxInlineLobSize = LargeObjectLoader.DEFAULT_MAX_LOB_LENGTH; + if (null == baseConfiguration) { this.conf = new Configuration(); } else { @@ -328,6 +334,7 @@ public static void printUsage() { System.out.println("-z, --compress Enable compression"); System.out.println("--direct-split-size (n) Split the input stream every 'n' bytes"); System.out.println(" when importing in direct mode."); + System.out.println("--inline-lob-limit (n) Set the maximum size for an inline LOB"); System.out.println(""); System.out.println("Export options:"); System.out.println("--export-dir (dir) Export from an HDFS path into a table"); @@ -584,6 +591,8 @@ public void parse(String [] args) throws InvalidOptionsException { this.useCompression = true; } else if (args[i].equals("--direct-split-size")) { this.directSplitSize = Long.parseLong(args[++i]); + } else if (args[i].equals("--inline-lob-limit")) { + this.maxInlineLobSize = Long.parseLong(args[++i]); } else if (args[i].equals("--jar-file")) { this.existingJarFile = args[++i]; } else if (args[i].equals("--list-databases")) { @@ -1003,6 +1012,13 @@ public long getDirectSplitSize() { return this.directSplitSize; } + /** + * @return the max size of a LOB before we spill to a separate file. + */ + public long getInlineLobLimit() { + return this.maxInlineLobSize; + } + public Configuration getConf() { return conf; } diff --git a/src/java/org/apache/hadoop/sqoop/hive/HiveTypes.java b/src/java/org/apache/hadoop/sqoop/hive/HiveTypes.java index 59c6fda6..7cc83a8e 100644 --- a/src/java/org/apache/hadoop/sqoop/hive/HiveTypes.java +++ b/src/java/org/apache/hadoop/sqoop/hive/HiveTypes.java @@ -74,8 +74,10 @@ public static String toHiveType(int sqlType) { } else if (sqlType == Types.TIMESTAMP) { // unfortunate type coercion return "STRING"; + } else if (sqlType == Types.CLOB) { + return "STRING"; } else { - // TODO(aaron): Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB, + // TODO(aaron): Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, // BLOB, ARRAY, STRUCT, REF, JAVA_OBJECT. 
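Taken together, the new +--inline-lob-limit+ flag, SqoopOptions.getInlineLobLimit(), and the LargeObjectLoader constants define the inline ceiling for an import job. The following is a minimal, hedged sketch of wiring them up by hand; it assumes SqoopOptions' no-argument constructor and a Hadoop Job instance, and omits the other arguments a real import would need.

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.sqoop.SqoopOptions;
    import org.apache.hadoop.sqoop.SqoopOptions.InvalidOptionsException;
    import org.apache.hadoop.sqoop.lib.LargeObjectLoader;

    public class InlineLobLimitSketch {
      // Parse the flag, then hand the limit to the MapReduce job the same
      // way DataDrivenImportJob.configureInputFormat() does in this patch.
      public static void configure(Job job, String[] args)
          throws InvalidOptionsException {
        SqoopOptions options = new SqoopOptions();
        options.parse(args); // e.g. { "--inline-lob-limit", "1048576" }
        job.getConfiguration().setLong(
            LargeObjectLoader.MAX_INLINE_LOB_LEN_KEY,
            options.getInlineLobLimit());
      }
    }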
return null; } diff --git a/src/java/org/apache/hadoop/sqoop/lib/BlobRef.java b/src/java/org/apache/hadoop/sqoop/lib/BlobRef.java index 49b33b47..877fe0b6 100644 --- a/src/java/org/apache/hadoop/sqoop/lib/BlobRef.java +++ b/src/java/org/apache/hadoop/sqoop/lib/BlobRef.java @@ -18,12 +18,26 @@ package org.apache.hadoop.sqoop.lib; +import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.InputStream; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.MapContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * BlobRef is a wrapper that holds a Blob either directly, or a @@ -31,13 +45,20 @@ */ public class BlobRef implements Writable { + public static final Log LOG = LogFactory.getLog(BlobRef.class.getName()); + public BlobRef(byte [] bytes) { - this.blobFileNum = 0; + this.fileName = null; this.data = new BytesWritable(bytes); } public BlobRef() { - this.blobFileNum = 0; + this.fileName = null; + this.data = null; + } + + public BlobRef(String file) { + this.fileName = file; this.data = null; } @@ -45,15 +66,69 @@ public BlobRef() { private BytesWritable data; // If there data is too large, it's written into a file - // and the file is numbered; this number is recorded here. - // This takes precedence if this value is positive. - private long blobFileNum; + // whose path (relative to the rest of the dataset) is recorded here. + // This takes precedence if this value is non-null. + private String fileName; + + /** + * @return true if the BLOB data is in an external file; false if + * it materialized inline. + */ + public boolean isExternal() { + return fileName != null; + } + + /** + * Convenience method to access #getDataStream(Configuration, Path) + * from within a map task that read this BlobRef from a file-based + * InputSplit. + * @param mapContext the Mapper.Context instance that encapsulates + * the current map task. + * @return an InputStream to access the BLOB data. + * @throws IllegalArgumentException if it cannot find the source + * path for this BLOB based on the MapContext. + * @throws IOException if it could not read the BLOB from external storage. + */ + public InputStream getDataStream(MapContext mapContext) + throws IllegalArgumentException, IOException { + InputSplit split = mapContext.getInputSplit(); + if (split instanceof FileSplit) { + Path basePath = ((FileSplit) split).getPath().getParent(); + return getDataStream(mapContext.getConfiguration(), + basePath); + } else { + throw new IllegalArgumentException( + "Could not ascertain BLOB base path from MapContext."); + } + } + + /** + * Get access to the BLOB data itself. + * This method returns an InputStream-based representation of the + * BLOB data, accessing the filesystem for external BLOB storage + * as necessary. + * @param conf the Configuration used to access the filesystem + * @param basePath the base directory where the table records are + * stored. + * @return an InputStream used to read the BLOB data. + * @throws IOException if it could not read the BLOB from external storage. 
+ */ + public InputStream getDataStream(Configuration conf, Path basePath) + throws IOException { + if (isExternal()) { + // use external storage. + FileSystem fs = FileSystem.get(conf); + return fs.open(new Path(basePath, fileName)); + } else { + return new ByteArrayInputStream(data.getBytes()); + } + } + public byte [] getData() { - if (blobFileNum > 0) { - // We have a numbered file. - // TODO: Implement this. - throw new RuntimeException("Unsupported: Indirect BLOBs are not supported"); + if (isExternal()) { + throw new RuntimeException( + "External BLOBs must be read via getDataStream()"); } return data.getBytes(); @@ -61,8 +136,8 @@ public BlobRef() { @Override public String toString() { - if (blobFileNum > 0) { - return "indirectBlob(" + blobFileNum + ")"; + if (isExternal()) { + return "externalBlob(" + fileName + ")"; } else { return data.toString(); } @@ -71,32 +146,67 @@ public String toString() { @Override public void readFields(DataInput in) throws IOException { // The serialization format for this object is: - // boolean isIndirect - // if true, the next field is a Long containing blobFileNum - // if false, the next field is String data. + // boolean isExternal + // if true, the next field is a String containing the file name. + // if false, the next field is a BytesWritable containing the + // actual data. - boolean isIndirect = in.readBoolean(); - if (isIndirect) { + boolean isExternal = in.readBoolean(); + if (isExternal) { this.data = null; - this.blobFileNum = in.readLong(); + this.fileName = Text.readString(in); } else { if (null == this.data) { this.data = new BytesWritable(); } this.data.readFields(in); - this.blobFileNum = 0; + this.fileName = null; } } @Override public void write(DataOutput out) throws IOException { - boolean isIndirect = blobFileNum > 0; - out.writeBoolean(isIndirect); - if (isIndirect) { - out.writeLong(blobFileNum); + out.writeBoolean(isExternal()); + if (isExternal()) { + Text.writeString(out, fileName); } else { data.write(out); } } + + private static final ThreadLocal EXTERNAL_MATCHER = + new ThreadLocal() { + @Override protected Matcher initialValue() { + Pattern externalPattern = Pattern.compile("externalBlob\\((.*)\\)"); + return externalPattern.matcher(""); + } + }; + + /** + * Create a BlobRef based on parsed data from a line of text. + * This only operates correctly on external blobs; inline blobs are simply + * returned as null. You should store BLOB data in SequenceFile format + * if reparsing is necessary. + * @param inputString the text-based input data to parse. + * @return a new BlobRef containing a reference to an external BLOB, or + * an empty BlobRef if the data to be parsed is actually inline. + */ + public static BlobRef parse(String inputString) { + // If inputString is of the form 'externalBlob(%s)', then this is an + // external BLOB stored at the filename indicated by '%s'. Otherwise, + // it is an inline BLOB, which we don't support parsing of. + + Matcher m = EXTERNAL_MATCHER.get(); + m.reset(inputString); + if (m.matches()) { + // Extract the filename component from the string. + return new BlobRef(m.group(1)); + } else { + // This is inline BLOB string data. 
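Taken together, isExternal(), getDataStream(MapContext), and parse() let a downstream map task read a BLOB column without caring whether the bytes were kept inline or spilled to a file. A hedged consumer sketch follows; MyTableRecord and its get_msg() accessor are hypothetical stand-ins for a class generated by Sqoop's ClassWriter.

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.sqoop.lib.BlobRef;

    public class BlobConsumerMapper
        extends Mapper<LongWritable, MyTableRecord, NullWritable, NullWritable> {
      @Override
      protected void map(LongWritable key, MyTableRecord val, Context context)
          throws IOException, InterruptedException {
        BlobRef blob = val.get_msg();   // hypothetical generated accessor
        // Resolves an external LOB file relative to this split's directory,
        // or wraps the inline bytes in a ByteArrayInputStream.
        InputStream in = blob.getDataStream(context);
        try {
          // ... consume the BLOB bytes ...
        } finally {
          in.close();
        }
      }
    }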
+ LOG.warn( + "Reparsing inline BLOB data is not supported; use SequenceFiles."); + return new BlobRef(); + } + } } diff --git a/src/java/org/apache/hadoop/sqoop/lib/ClobRef.java b/src/java/org/apache/hadoop/sqoop/lib/ClobRef.java index ed6c6b90..b1a969c1 100644 --- a/src/java/org/apache/hadoop/sqoop/lib/ClobRef.java +++ b/src/java/org/apache/hadoop/sqoop/lib/ClobRef.java @@ -21,9 +21,20 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.StringReader; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.MapContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; /** * ClobRef is a wrapper that holds a Clob either directly, or a @@ -32,12 +43,23 @@ public class ClobRef implements Writable { public ClobRef(String chars) { - this.clobFileNum = 0; + this.fileName = null; this.data = chars; } public ClobRef() { - this.clobFileNum = 0; + this.fileName = null; + this.data = null; + } + + /** + * Initialize a clobref to an external CLOB. + * @param file the filename to the CLOB. May be relative to the job dir. + * @param ignored is not used; this just differentiates this constructor + * from ClobRef(String chars). + */ + public ClobRef(String file, boolean ignored) { + this.fileName = file; this.data = null; } @@ -45,24 +67,74 @@ public ClobRef() { private String data; // If there data is too large, it's written into a file - // and the file is numbered; this number is recorded here. - // This takes precedence if this value is positive. - private long clobFileNum; + // whose path (relative to the rest of the dataset) is recorded here. + // This takes precedence if this value is non-null. + private String fileName; - public String getData() { - if (clobFileNum > 0) { - // We have a numbered file. - // TODO: Implement this. - throw new RuntimeException("Unsupported: Indirect CLOBs are not supported"); - } - - return data; + /** + * @return true if the CLOB data is in an external file; false if + * it is materialized inline. + */ + public boolean isExternal() { + return fileName != null; } + /** + * Convenience method to access #getDataReader(Configuration, Path) + * from within a map task that read this ClobRef from a file-based + * InputSplit. + * @param mapContext the Mapper.Context instance that encapsulates + * the current map task. + * @return a Reader to access the CLOB data. + * @throws IllegalArgumentException if it cannot find the source + * path for this CLOB based on the MapContext. + * @throws IOException if it could not read the CLOB from external storage. + */ + public Reader getDataReader(MapContext mapContext) + throws IllegalArgumentException, IOException { + InputSplit split = mapContext.getInputSplit(); + if (split instanceof FileSplit) { + Path basePath = ((FileSplit) split).getPath().getParent(); + return getDataReader(mapContext.getConfiguration(), + basePath); + } else { + throw new IllegalArgumentException( + "Could not ascertain CLOB base path from MapContext."); + } + } + + /** + * Get access to the CLOB data itself. + * This method returns a Reader-based representation of the + * CLOB data, accessing the filesystem for external CLOB storage + * as necessary. 
+ * @param conf the Configuration used to access the filesystem + * @param basePath the base directory where the table records are + * stored. + * @return a Reader used to read the CLOB data. + * @throws IOException if it could not read the CLOB from external storage. + */ + public Reader getDataReader(Configuration conf, Path basePath) + throws IOException { + if (isExternal()) { + // use external storage. + FileSystem fs = FileSystem.get(conf); + return new InputStreamReader(fs.open(new Path(basePath, fileName))); + } else { + return new StringReader(data); + } + } + + /** + * @return a string representation of the ClobRef. If this is an + * inline clob (isExternal() returns false), it will contain the + * materialized data. Otherwise it returns a description of the + * reference. To ensure access to the data itself, {@see #getDataStream()}. + */ @Override public String toString() { - if (clobFileNum > 0) { - return "indirectClob(" + clobFileNum + ")"; + if (isExternal()) { + return "externalClob(" + fileName + ")"; } else { return data; } @@ -71,29 +143,60 @@ public String toString() { @Override public void readFields(DataInput in) throws IOException { // The serialization format for this object is: - // boolean isIndirect - // if true, the next field is a Long containing clobFileNum - // if false, the next field is String data. + // boolean isExternal + // if true, the next field is a String containing the external file name. + // if false, the next field is String containing the actual data. boolean isIndirect = in.readBoolean(); if (isIndirect) { + this.fileName = Text.readString(in); this.data = null; - this.clobFileNum = in.readLong(); } else { + this.fileName = null; this.data = Text.readString(in); - this.clobFileNum = 0; } } @Override public void write(DataOutput out) throws IOException { - boolean isIndirect = clobFileNum > 0; + boolean isIndirect = isExternal(); out.writeBoolean(isIndirect); if (isIndirect) { - out.writeLong(clobFileNum); + Text.writeString(out, fileName); } else { Text.writeString(out, data); } } + + // A pattern matcher which can recognize external CLOB data + // vs. an inline CLOB string. + private static final ThreadLocal EXTERNAL_MATCHER = + new ThreadLocal() { + @Override protected Matcher initialValue() { + Pattern externalPattern = Pattern.compile("externalClob\\((.*)\\)"); + return externalPattern.matcher(""); + } + }; + + /** + * Create a ClobRef based on parsed data from a line of text. + * @param inputString the text-based input data to parse. + * @return a ClobRef to the given data. + */ + public static ClobRef parse(String inputString) { + // If inputString is of the form 'externalClob(%s)', then this is an + // external CLOB stored at the filename indicated by '%s'. Otherwise, + // it is an inline CLOB. + + Matcher m = EXTERNAL_MATCHER.get(); + m.reset(inputString); + if (m.matches()) { + // Extract the filename component from the string. + return new ClobRef(m.group(1), true); + } else { + // This is inline CLOB string data. 
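Because text-file imports replace an oversized CLOB with an externalClob(...) pointer, reading such a column back means reparsing it and then streaming the characters through getDataReader(). A hedged sketch using only the API added in this patch; the import directory and the LOB file name are illustrative.

    import java.io.IOException;
    import java.io.Reader;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.sqoop.lib.ClobRef;

    public class ClobReparseSketch {
      public static void dump(Configuration conf) throws IOException {
        // Column value as it appears in a text import (illustrative name).
        ClobRef clob = ClobRef.parse("externalClob(_lob/obj_example_0)");
        if (clob.isExternal()) {
          // basePath is the directory holding the imported record files.
          Reader r = clob.getDataReader(conf,
              new Path("/user/imports/mytable"));
          try {
            char[] buf = new char[4096];
            int n;
            while ((n = r.read(buf)) != -1) {
              System.out.print(new String(buf, 0, n));
            }
          } finally {
            r.close();
          }
        } else {
          System.out.print(clob.toString()); // inline CLOBs carry their data
        }
      }
    }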
+ return new ClobRef(inputString); + } + } } diff --git a/src/java/org/apache/hadoop/sqoop/lib/JdbcWritableBridge.java b/src/java/org/apache/hadoop/sqoop/lib/JdbcWritableBridge.java index dc0a0450..41179cbd 100644 --- a/src/java/org/apache/hadoop/sqoop/lib/JdbcWritableBridge.java +++ b/src/java/org/apache/hadoop/sqoop/lib/JdbcWritableBridge.java @@ -115,32 +115,14 @@ public static BigDecimal readBigDecimal(int colNum, ResultSet r) throws SQLExcep public static BlobRef readBlobRef(int colNum, ResultSet r) throws SQLException { - Blob b = r.getBlob(colNum); - if (null == b) { - return null; - } else if (b.length() > MAX_BLOB_LENGTH) { - // TODO: Deserialize very large BLOBs into separate files. - throw new UnsupportedOperationException("BLOB size exceeds max: " - + MAX_BLOB_LENGTH); - } else { - // This is a 1-based array. - return new BlobRef(b.getBytes(1, (int) b.length())); - } + // Loading of BLOBs is delayed; handled by LargeObjectLoader. + return null; } public static ClobRef readClobRef(int colNum, ResultSet r) throws SQLException { - Clob c = r.getClob(colNum); - if (null == c) { - return null; - } else if (c.length() > MAX_CLOB_LENGTH) { - // TODO: Deserialize very large CLOBs into separate files. - throw new UnsupportedOperationException("CLOB size exceeds max: " - + MAX_CLOB_LENGTH); - } else { - // This is a 1-based array. - return new ClobRef(c.getSubString(1, (int) c.length())); - } + // Loading of CLOBs is delayed; handled by LargeObjectLoader. + return null; } public static void writeInteger(Integer val, int paramIdx, int sqlType, PreparedStatement s) diff --git a/src/java/org/apache/hadoop/sqoop/lib/LargeObjectLoader.java b/src/java/org/apache/hadoop/sqoop/lib/LargeObjectLoader.java new file mode 100644 index 00000000..b4ec7884 --- /dev/null +++ b/src/java/org/apache/hadoop/sqoop/lib/LargeObjectLoader.java @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.sqoop.lib; + +import java.io.BufferedOutputStream; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Reader; +import java.io.Writer; +import java.math.BigDecimal; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +/** + * Contains a set of methods which can read db columns from a ResultSet into + * Java types, and do serialization of these types to/from DataInput/DataOutput + * for use with Hadoop's Writable implementation. This supports null values + * for all types. + * + * This is a singleton instance class; only one may exist at a time. + * However, its lifetime is limited to the current TaskInputOutputContext's + * life. + */ +public class LargeObjectLoader { + + // Currently, cap BLOB/CLOB objects at 16 MB until we can use external storage. + public final static long DEFAULT_MAX_LOB_LENGTH = 16 * 1024 * 1024; + + public final static String MAX_INLINE_LOB_LEN_KEY = + "sqoop.inline.lob.length.max"; + + // The task context for the currently-initialized instance. + private TaskInputOutputContext context; + + private FileSystem fs; + + // Counter that is used with the current task attempt id to + // generate unique LOB file names. + private long nextLobFileId = 0; + + public LargeObjectLoader(TaskInputOutputContext context) + throws IOException { + this.context = context; + this.fs = FileSystem.get(context.getConfiguration()); + } + + /** + * @return a filename to use to put an external LOB in. + */ + private String getNextLobFileName() { + String file = "_lob/obj_" + context.getConfiguration().get( + JobContext.TASK_ID, "unknown_task_id") + nextLobFileId; + nextLobFileId++; + + return file; + } + + /** + * Copies all character data from the provided Reader to the provided + * Writer. Does not close handles when it's done. + * @param reader data source + * @param writer data sink + * @throws IOException if an I/O error occurs either reading or writing. + */ + private void copyAll(Reader reader, Writer writer) throws IOException { + int bufferSize = context.getConfiguration().getInt("io.file.buffer.size", + 4096); + char [] buf = new char[bufferSize]; + + while (true) { + int charsRead = reader.read(buf); + if (-1 == charsRead) { + break; // no more stream to read. + } + writer.write(buf, 0, charsRead); + } + } + + /** + * Copies all byte data from the provided InputStream to the provided + * OutputStream. Does not close handles when it's done. + * @param input data source + * @param output data sink + * @throws IOException if an I/O error occurs either reading or writing. + */ + private void copyAll(InputStream input, OutputStream output) + throws IOException { + int bufferSize = context.getConfiguration().getInt("io.file.buffer.size", + 4096); + byte [] buf = new byte[bufferSize]; + + while (true) { + int bytesRead = input.read(buf, 0, bufferSize); + if (-1 == bytesRead) { + break; // no more stream to read. 
+ } + output.write(buf, 0, bytesRead); + } + } + + /** + * Actually read a BlobRef instance from the ResultSet and materialize + * the data either inline or to a file. + * + * @param colNum the column of the ResultSet's current row to read. + * @param r the ResultSet to read from. + * @return a BlobRef encapsulating the data in this field. + * @throws IOException if an error occurs writing to the FileSystem. + * @throws SQLException if an error occurs reading from the database. + */ + public BlobRef readBlobRef(int colNum, ResultSet r) + throws IOException, InterruptedException, SQLException { + + long maxInlineLobLen = context.getConfiguration().getLong( + MAX_INLINE_LOB_LEN_KEY, + DEFAULT_MAX_LOB_LENGTH); + + Blob b = r.getBlob(colNum); + if (null == b) { + return null; + } else if (b.length() > maxInlineLobLen) { + // Deserialize very large BLOBs into separate files. + String fileName = getNextLobFileName(); + Path p = new Path(FileOutputFormat.getWorkOutputPath(context), fileName); + + Path parent = p.getParent(); + if (!fs.exists(parent)) { + fs.mkdirs(parent); + } + + BufferedOutputStream bos = null; + InputStream is = null; + OutputStream os = fs.create(p); + try { + bos = new BufferedOutputStream(os); + is = b.getBinaryStream(); + copyAll(is, bos); + } finally { + if (null != bos) { + bos.close(); + os = null; // os is now closed. + } + + if (null != os) { + os.close(); + } + + if (null != is) { + is.close(); + } + } + + return new BlobRef(fileName); + } else { + // This is a 1-based array. + return new BlobRef(b.getBytes(1, (int) b.length())); + } + } + + + /** + * Actually read a ClobRef instance from the ResultSet and materialize + * the data either inline or to a file. + * + * @param colNum the column of the ResultSet's current row to read. + * @param r the ResultSet to read from. + * @return a ClobRef encapsulating the data in this field. + * @throws IOException if an error occurs writing to the FileSystem. + * @throws SQLException if an error occurs reading from the database. + */ + public ClobRef readClobRef(int colNum, ResultSet r) + throws IOException, InterruptedException, SQLException { + + long maxInlineLobLen = context.getConfiguration().getLong( + MAX_INLINE_LOB_LEN_KEY, + DEFAULT_MAX_LOB_LENGTH); + + Clob c = r.getClob(colNum); + if (null == c) { + return null; + } else if (c.length() > maxInlineLobLen) { + // Deserialize large CLOB into separate file. + String fileName = getNextLobFileName(); + Path p = new Path(FileOutputFormat.getWorkOutputPath(context), fileName); + + Path parent = p.getParent(); + if (!fs.exists(parent)) { + fs.mkdirs(parent); + } + + BufferedWriter w = null; + Reader reader = null; + OutputStream os = fs.create(p); + try { + w = new BufferedWriter(new OutputStreamWriter(os)); + reader = c.getCharacterStream(); + copyAll(reader, w); + } finally { + if (null != w) { + w.close(); + os = null; // os is now closed. + } + + if (null != os) { + os.close(); + } + + if (null != reader) { + reader.close(); + } + } + + return new ClobRef(fileName, true); + } else { + // This is a 1-based array. 
+ return new ClobRef(c.getSubString(1, (int) c.length())); + } + } +} diff --git a/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java b/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java index 2b51083d..a3fcefd7 100644 --- a/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java +++ b/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java @@ -18,13 +18,15 @@ package org.apache.hadoop.sqoop.lib; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.sql.SQLException; + import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.lib.db.DBWritable; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; - /** * Interface implemented by the classes generated by sqoop's orm.ClassWriter. */ @@ -35,5 +37,7 @@ public interface SqoopRecord extends DBWritable, Writable { public void parse(char [] s) throws RecordParser.ParseError; public void parse(ByteBuffer s) throws RecordParser.ParseError; public void parse(CharBuffer s) throws RecordParser.ParseError; + public void loadLargeObjects(LargeObjectLoader objLoader) + throws SQLException, IOException, InterruptedException; } diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java b/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java index 8a889a89..b2c2ce1a 100644 --- a/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java +++ b/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java @@ -158,6 +158,7 @@ private final void configureAutoProgress(Configuration job) { /** * Run the mapping process for this task, wrapped in an auto-progress system. */ + @Override public void run(Context context) throws IOException, InterruptedException { configureAutoProgress(context.getConfiguration()); ProgressThread thread = this.new ProgressThread(context); @@ -182,5 +183,4 @@ public void run(Context context) throws IOException, InterruptedException { } } } - } diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java index d97d3fd3..d2f5dea8 100644 --- a/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java +++ b/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java @@ -44,6 +44,7 @@ import org.apache.hadoop.sqoop.ConnFactory; import org.apache.hadoop.sqoop.SqoopOptions; import org.apache.hadoop.sqoop.manager.ConnManager; +import org.apache.hadoop.sqoop.lib.LargeObjectLoader; import org.apache.hadoop.sqoop.orm.TableClassName; import org.apache.hadoop.sqoop.util.ClassLoaderStack; import org.apache.hadoop.sqoop.util.ImportException; @@ -85,7 +86,7 @@ protected Class getMapperClass() { if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) { return TextImportMapper.class; } else if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) { - return AutoProgressMapper.class; + return SequenceFileImportMapper.class; } return null; @@ -140,6 +141,9 @@ protected void configureInputFormat(Job job, String tableName, job.getConfiguration().set(DBConfiguration.INPUT_CLASS_PROPERTY, tableClassName); + job.getConfiguration().setLong(LargeObjectLoader.MAX_INLINE_LOB_LEN_KEY, + options.getInlineLobLimit()); + LOG.debug("Using InputFormat: " + inputFormatClass); job.setInputFormatClass(inputFormatClass); } diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/SequenceFileImportMapper.java b/src/java/org/apache/hadoop/sqoop/mapreduce/SequenceFileImportMapper.java new file mode 100644 index 
00000000..d71c7a78 --- /dev/null +++ b/src/java/org/apache/hadoop/sqoop/mapreduce/SequenceFileImportMapper.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.mapreduce; + +import java.io.IOException; +import java.sql.SQLException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.Mapper.Context; +import org.apache.hadoop.sqoop.lib.LargeObjectLoader; +import org.apache.hadoop.sqoop.lib.SqoopRecord; + +/** + * Imports records by writing them to a SequenceFile. + */ +public class SequenceFileImportMapper + extends AutoProgressMapper { + + public void map(LongWritable key, SqoopRecord val, Context context) + throws IOException, InterruptedException { + + try { + // Loading of LOBs was delayed until we have a Context. + val.loadLargeObjects(new LargeObjectLoader(context)); + } catch (SQLException sqlE) { + throw new IOException(sqlE); + } + + context.write(key, val); + } +} + diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/TextImportMapper.java b/src/java/org/apache/hadoop/sqoop/mapreduce/TextImportMapper.java index 3970105d..3c683c17 100644 --- a/src/java/org/apache/hadoop/sqoop/mapreduce/TextImportMapper.java +++ b/src/java/org/apache/hadoop/sqoop/mapreduce/TextImportMapper.java @@ -19,18 +19,20 @@ package org.apache.hadoop.sqoop.mapreduce; import java.io.IOException; +import java.sql.SQLException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper.Context; -import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.hadoop.sqoop.lib.LargeObjectLoader; +import org.apache.hadoop.sqoop.lib.SqoopRecord; /** - * Converts an input record into a string representation and emit it. + * Imports records by transforming them to strings for a plain-text flat file. */ public class TextImportMapper - extends AutoProgressMapper { + extends AutoProgressMapper { private Text outkey; @@ -38,9 +40,18 @@ public TextImportMapper() { outkey = new Text(); } - public void map(LongWritable key, DBWritable val, Context context) + public void map(LongWritable key, SqoopRecord val, Context context) throws IOException, InterruptedException { + + try { + // Loading of LOBs was delayed until we have a Context. 
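The loadLargeObjects() call below is implemented per-table by ClassWriter.generateLoadLargeObjects() later in this patch. As a hedged illustration only, for a hypothetical table whose first column is a BLOB named msg, the generated body would look roughly like:

    // Illustrative generated code; the real method is emitted by ClassWriter
    // for the actual column list, and "msg" is a hypothetical column.
    public void loadLargeObjects(LargeObjectLoader __loader)
        throws SQLException, IOException, InterruptedException {
      this.msg = __loader.readBlobRef(1, this.__cur_result_set);
    }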
+ val.loadLargeObjects(new LargeObjectLoader(context)); + } catch (SQLException sqlE) { + throw new IOException(sqlE); + } + outkey.set(val.toString()); context.write(outkey, NullWritable.get()); } } + diff --git a/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java b/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java index e62ee767..7ad1fcd4 100644 --- a/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java +++ b/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java @@ -24,6 +24,7 @@ import org.apache.hadoop.sqoop.lib.BigDecimalSerializer; import org.apache.hadoop.sqoop.lib.FieldFormatter; import org.apache.hadoop.sqoop.lib.JdbcWritableBridge; +import org.apache.hadoop.sqoop.lib.LargeObjectLoader; import org.apache.hadoop.sqoop.lib.LobSerializer; import org.apache.hadoop.sqoop.lib.RecordParser; import org.apache.hadoop.sqoop.lib.BlobRef; @@ -415,6 +416,10 @@ private void generateDbRead(Map columnTypes, String [] colNames sb.append(" public void readFields(ResultSet __dbResults) throws SQLException {\n"); + // Save ResultSet object cursor for use in LargeObjectLoader + // if necessary. + sb.append(" this.__cur_result_set = __dbResults;\n"); + int fieldNum = 0; for (String col : colNames) { @@ -440,6 +445,44 @@ private void generateDbRead(Map columnTypes, String [] colNames sb.append(" }\n"); } + /** + * Generate the loadLargeObjects() method called by the mapper to load + * delayed objects (that require the Context from the mapper). + */ + private void generateLoadLargeObjects(Map columnTypes, + String [] colNames, StringBuilder sb) { + + // This method relies on the __cur_result_set field being set by + // readFields() method generated by generateDbRead(). + + sb.append(" public void loadLargeObjects(LargeObjectLoader __loader)\n"); + sb.append(" throws SQLException, IOException, InterruptedException {\n"); + + int fieldNum = 0; + + for (String col : colNames) { + fieldNum++; + + int sqlType = columnTypes.get(col); + String javaType = connManager.toJavaType(sqlType); + if (null == javaType) { + LOG.error("No Java type for SQL type " + sqlType); + continue; + } + + String getterMethod = dbGetterForType(javaType); + if ("readClobRef".equals(getterMethod) + || "readBlobRef".equals(getterMethod)) { + // This field is a blob/clob field with delayed loading. + // Call the appropriate LargeObjectLoader method (which has the + // same name as a JdbcWritableBridge method). + sb.append(" this." + col + " = __loader." + getterMethod + + "(" + fieldNum + ", this.__cur_result_set);\n"); + } + } + sb.append(" }\n"); + } + /** * Generate the write() method used by the database @@ -638,12 +681,9 @@ private void parseColumn(String colName, int colType, StringBuilder sb) { } else if (javaType.equals("java.math.BigDecimal")) { sb.append(" this." + colName + " = new java.math.BigDecimal(__cur_str);\n"); } else if (javaType.equals(ClobRef.class.getName())) { - sb.append(" this." + colName + " = new ClobRef(__cur_str);\n"); + sb.append(" this." + colName + " = ClobRef.parse(__cur_str);\n"); } else if (javaType.equals(BlobRef.class.getName())) { - // We don't support parsing BLOB data. - // Users must store this in SequenceFiles. - LOG.warn("BLOB data cannot be reparsed from text files"); - sb.append(" this." + colName + " = new BlobRef();\n"); + sb.append(" this." 
+ colName + " = BlobRef.parse(__cur_str);\n"); } else { LOG.error("No parser available for Java type " + javaType); } @@ -844,6 +884,7 @@ public StringBuilder generateClassForColumns(Map columnTypes, sb.append("import " + RecordParser.class.getCanonicalName() + ";\n"); sb.append("import " + BlobRef.class.getCanonicalName() + ";\n"); sb.append("import " + ClobRef.class.getCanonicalName() + ";\n"); + sb.append("import " + LargeObjectLoader.class.getCanonicalName() + ";\n"); sb.append("import " + SqoopRecord.class.getCanonicalName() + ";\n"); sb.append("import java.sql.PreparedStatement;\n"); sb.append("import java.sql.ResultSet;\n"); @@ -860,10 +901,13 @@ public StringBuilder generateClassForColumns(Map columnTypes, sb.append("import java.util.List;\n"); String className = tableNameInfo.getShortClassForTable(tableName); - sb.append("public class " + className + " implements DBWritable, SqoopRecord, Writable {\n"); + sb.append("public class " + className + + " implements DBWritable, SqoopRecord, Writable {\n"); sb.append(" public static final int PROTOCOL_VERSION = " + CLASS_WRITER_VERSION + ";\n"); + sb.append(" protected ResultSet __cur_result_set;\n"); generateFields(columnTypes, colNames, sb); generateDbRead(columnTypes, colNames, sb); + generateLoadLargeObjects(columnTypes, colNames, sb); generateDbWrite(columnTypes, colNames, sb); generateHadoopRead(columnTypes, colNames, sb); generateHadoopWrite(columnTypes, colNames, sb); diff --git a/src/test/org/apache/hadoop/sqoop/SmokeTests.java b/src/test/org/apache/hadoop/sqoop/SmokeTests.java index a48182b0..a463f4c0 100644 --- a/src/test/org/apache/hadoop/sqoop/SmokeTests.java +++ b/src/test/org/apache/hadoop/sqoop/SmokeTests.java @@ -23,6 +23,9 @@ import org.apache.hadoop.sqoop.io.TestSplittableBufferedWriter; import org.apache.hadoop.sqoop.lib.TestFieldFormatter; import org.apache.hadoop.sqoop.lib.TestRecordParser; +import org.apache.hadoop.sqoop.lib.TestBlobRef; +import org.apache.hadoop.sqoop.lib.TestClobRef; +import org.apache.hadoop.sqoop.lib.TestLargeObjectLoader; import org.apache.hadoop.sqoop.manager.TestHsqldbManager; import org.apache.hadoop.sqoop.manager.TestSqlManager; import org.apache.hadoop.sqoop.mapreduce.MapreduceTests; @@ -60,6 +63,9 @@ public static Test suite() { suite.addTestSuite(TestConnFactory.class); suite.addTestSuite(TestSplittableBufferedWriter.class); suite.addTestSuite(TestTableDefWriter.class); + suite.addTestSuite(TestBlobRef.class); + suite.addTestSuite(TestClobRef.class); + suite.addTestSuite(TestLargeObjectLoader.class); suite.addTest(MapreduceTests.suite()); return suite; diff --git a/src/test/org/apache/hadoop/sqoop/lib/TestBlobRef.java b/src/test/org/apache/hadoop/sqoop/lib/TestBlobRef.java new file mode 100644 index 00000000..c086d81b --- /dev/null +++ b/src/test/org/apache/hadoop/sqoop/lib/TestBlobRef.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.lib; + +import java.io.*; +import java.util.ArrayList; +import java.util.List; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + + +/** + * Test that the BlobRef.parse() method does the right thing. + * Note that we don't support inline parsing here; we only expect this to + * really work for external BLOBs. + */ +public class TestBlobRef extends TestCase { + + public void testEmptyStr() { + BlobRef r = BlobRef.parse(""); + assertFalse(r.isExternal()); + } + + public void testInline() throws IOException { + BlobRef r = BlobRef.parse("foo"); + assertFalse(r.isExternal()); + } + + public void testEmptyFile() { + BlobRef r = BlobRef.parse("externalBlob()"); + assertTrue(r.isExternal()); + assertEquals("externalBlob()", r.toString()); + } + + public void testInlineNearMatch() { + BlobRef r = BlobRef.parse("externalBlob(foo)bar"); + assertFalse(r.isExternal()); + } + + public void testExternal() throws IOException { + final byte [] DATA = { 1, 2, 3, 4, 5 }; + final String FILENAME = "blobdata"; + + doExternalTest(DATA, FILENAME); + } + + public void testExternalSubdir() throws IOException { + final byte [] DATA = { 1, 2, 3, 4, 5 }; + final String FILENAME = "_lob/blobdata"; + + try { + doExternalTest(DATA, FILENAME); + } finally { + // remove dir we made. + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + String tmpDir = System.getProperty("test.build.data", "/tmp/"); + Path lobDir = new Path(new Path(tmpDir), "_lob"); + fs.delete(lobDir, false); + } + } + + private void doExternalTest(final byte [] DATA, final String FILENAME) + throws IOException { + + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", "file:///"); + FileSystem fs = FileSystem.get(conf); + String tmpDir = System.getProperty("test.build.data", "/tmp/"); + + Path tmpPath = new Path(tmpDir); + + Path blobFile = new Path(tmpPath, FILENAME); + + // make any necessary parent dirs. 
+ Path blobParent = blobFile.getParent(); + if (!fs.exists(blobParent)) { + fs.mkdirs(blobParent); + } + + OutputStream os = fs.create(blobFile); + try { + os.write(DATA, 0, DATA.length); + os.close(); + + BlobRef blob = BlobRef.parse("externalBlob(" + FILENAME + ")"); + assertTrue(blob.isExternal()); + assertEquals("externalBlob(" + FILENAME + ")", blob.toString()); + InputStream is = blob.getDataStream(conf, tmpPath); + assertNotNull(is); + + byte [] buf = new byte[4096]; + int bytes = is.read(buf, 0, 4096); + is.close(); + + assertEquals(DATA.length, bytes); + for (int i = 0; i < bytes; i++) { + assertEquals(DATA[i], buf[i]); + } + } finally { + fs.delete(blobFile, false); + } + } +} + diff --git a/src/test/org/apache/hadoop/sqoop/lib/TestClobRef.java b/src/test/org/apache/hadoop/sqoop/lib/TestClobRef.java new file mode 100644 index 00000000..afd2f051 --- /dev/null +++ b/src/test/org/apache/hadoop/sqoop/lib/TestClobRef.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.lib; + +import java.io.*; +import java.util.ArrayList; +import java.util.List; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + + +/** + * Test parsing of ClobRef objects. + */ +public class TestClobRef extends TestCase { + + public void testEmptyStr() { + ClobRef r = ClobRef.parse(""); + assertFalse(r.isExternal()); + assertEquals("", r.toString()); + } + + public void testInline() throws IOException { + ClobRef r = ClobRef.parse("foo"); + assertFalse(r.isExternal()); + assertEquals("foo", r.toString()); + + Reader reader = r.getDataReader(null, null); + assertNotNull(reader); + char [] buf = new char[4096]; + int chars = reader.read(buf, 0, 4096); + reader.close(); + + String str = new String(buf, 0, chars); + assertEquals("foo", str); + } + + public void testEmptyFile() { + ClobRef r = ClobRef.parse("externalClob()"); + assertTrue(r.isExternal()); + assertEquals("externalClob()", r.toString()); + } + + public void testInlineNearMatch() { + ClobRef r = ClobRef.parse("externalClob(foo)bar"); + assertFalse(r.isExternal()); + assertEquals("externalClob(foo)bar", r.toString()); + } + + public void testExternal() throws IOException { + final String DATA = "This is the clob data!"; + final String FILENAME = "clobdata"; + + doExternalTest(DATA, FILENAME); + } + + public void testExternalSubdir() throws IOException { + final String DATA = "This is the clob data!"; + final String FILENAME = "_lob/clobdata"; + + try { + doExternalTest(DATA, FILENAME); + } finally { + // remove dir we made. 
+ Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + String tmpDir = System.getProperty("test.build.data", "/tmp/"); + Path lobDir = new Path(new Path(tmpDir), "_lob"); + fs.delete(lobDir, false); + } + } + + private void doExternalTest(final String DATA, final String FILENAME) + throws IOException { + + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", "file:///"); + FileSystem fs = FileSystem.get(conf); + String tmpDir = System.getProperty("test.build.data", "/tmp/"); + + Path tmpPath = new Path(tmpDir); + + Path clobFile = new Path(tmpPath, FILENAME); + + // make any necessary parent dirs. + Path clobParent = clobFile.getParent(); + if (!fs.exists(clobParent)) { + fs.mkdirs(clobParent); + } + + BufferedWriter w = new BufferedWriter(new OutputStreamWriter( + fs.create(clobFile))); + try { + w.append(DATA); + w.close(); + + ClobRef clob = ClobRef.parse("externalClob(" + FILENAME + ")"); + assertTrue(clob.isExternal()); + assertEquals("externalClob(" + FILENAME + ")", clob.toString()); + Reader r = clob.getDataReader(conf, tmpPath); + assertNotNull(r); + + char [] buf = new char[4096]; + int chars = r.read(buf, 0, 4096); + r.close(); + + String str = new String(buf, 0, chars); + assertEquals(DATA, str); + } finally { + fs.delete(clobFile, false); + } + } +} + diff --git a/src/test/org/apache/hadoop/sqoop/lib/TestLargeObjectLoader.java b/src/test/org/apache/hadoop/sqoop/lib/TestLargeObjectLoader.java new file mode 100644 index 00000000..29d16163 --- /dev/null +++ b/src/test/org/apache/hadoop/sqoop/lib/TestLargeObjectLoader.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.lib; + +import java.io.*; +import java.util.ArrayList; +import java.util.List; +import java.sql.ResultSet; +import java.sql.SQLException; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.MapContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mrunit.mapreduce.mock.MockMapContext; +import org.apache.hadoop.mrunit.types.Pair; +import org.apache.hadoop.sqoop.testutil.MockResultSet; + +/** + * Test deserialization of ClobRef and BlobRef fields. + */ +public class TestLargeObjectLoader extends TestCase { + + /** + * A mock MapContext that uses FileOutputCommitter. 
+ * This MapContext is actually serving two roles here; when writing the + * CLOB files, its OutputCommitter is used to determine where to write + * the CLOB data, as these are placed in the task output work directory. + * When reading the CLOB data back for verification, we use the + * getInputSplit() to determine where to read our source data from--the same + * directory. We are repurposing the same context for both output and input. + */ + private static class MockMapContextWithCommitter + extends MockMapContext { + private Path outputDir; + private Configuration conf; + + public MockMapContextWithCommitter(Configuration conf, Path outDir) { + super(new ArrayList>(), new Counters()); + + this.outputDir = outDir; + this.conf = conf; + } + + @Override + public OutputCommitter getOutputCommitter() { + try { + return new FileOutputCommitter(outputDir, this); + } catch (IOException ioe) { + return null; + } + } + + @Override + public InputSplit getInputSplit() { + return new FileSplit(new Path(outputDir, "inputFile"), 0, 0, new String[0]); + } + + @Override + public Configuration getConfiguration() { + return conf; + } + } + + protected Configuration conf; + protected MapContext mapContext; + protected LargeObjectLoader loader; + + public void setUp() throws IOException { + conf = new Configuration(); + conf.set("fs.defaultFS", "file:///"); + String tmpDir = System.getProperty("test.build.data", "/tmp/"); + Path outDir = new Path(new Path(tmpDir), "testLobLoader"); + FileSystem fs = FileSystem.getLocal(conf); + if (fs.exists(outDir)) { + fs.delete(outDir, true); + } + fs.mkdirs(outDir); + + mapContext = new MockMapContextWithCommitter(conf, outDir); + loader = new LargeObjectLoader(mapContext); + } + + public void testReadClobRef() + throws IOException, InterruptedException, SQLException { + // This should give us an inline CLOB. + ResultSet resultSet = new MockResultSet(); + ClobRef clob = loader.readClobRef(0, resultSet); + assertNotNull(clob); + assertFalse(clob.isExternal()); + assertEquals(MockResultSet.CLOB_DATA, clob.toString()); + + // LOBs bigger than 4 bytes are now external. + conf.setLong(LargeObjectLoader.MAX_INLINE_LOB_LEN_KEY, 4); + clob = loader.readClobRef(0, resultSet); + assertNotNull(clob); + assertTrue(clob.isExternal()); + mapContext.getOutputCommitter().commitTask(mapContext); + Reader r = clob.getDataReader(mapContext); + char [] buf = new char[4096]; + int chars = r.read(buf, 0, 4096); + r.close(); + String str = new String(buf, 0, chars); + assertEquals(MockResultSet.CLOB_DATA, str); + } + + public void testReadBlobRef() + throws IOException, InterruptedException, SQLException { + // This should give us an inline BLOB. + ResultSet resultSet = new MockResultSet(); + BlobRef blob = loader.readBlobRef(0, resultSet); + assertNotNull(blob); + assertFalse(blob.isExternal()); + byte [] data = blob.getData(); + assertEquals(MockResultSet.BLOB_DATA.length, data.length); + for (int i = 0; i < data.length; i++) { + assertEquals(MockResultSet.BLOB_DATA[i], data[i]); + } + + // LOBs bigger than 4 bytes are now external. 
+ conf.setLong(LargeObjectLoader.MAX_INLINE_LOB_LEN_KEY, 4); + blob = loader.readBlobRef(0, resultSet); + assertNotNull(blob); + assertTrue(blob.isExternal()); + mapContext.getOutputCommitter().commitTask(mapContext); + InputStream is = blob.getDataStream(mapContext); + byte [] buf = new byte[4096]; + int bytes = is.read(buf, 0, 4096); + is.close(); + + assertEquals(MockResultSet.BLOB_DATA.length, bytes); + for (int i = 0; i < bytes; i++) { + assertEquals(MockResultSet.BLOB_DATA[i], buf[i]); + } + } +} + diff --git a/src/test/org/apache/hadoop/sqoop/mapreduce/TestTextImportMapper.java b/src/test/org/apache/hadoop/sqoop/mapreduce/TestTextImportMapper.java index ad330187..97bac5e2 100644 --- a/src/test/org/apache/hadoop/sqoop/mapreduce/TestTextImportMapper.java +++ b/src/test/org/apache/hadoop/sqoop/mapreduce/TestTextImportMapper.java @@ -21,6 +21,8 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -28,8 +30,9 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.mrunit.mapreduce.MapDriver; +import org.apache.hadoop.sqoop.lib.LargeObjectLoader; +import org.apache.hadoop.sqoop.lib.SqoopRecord; import junit.framework.TestCase; @@ -39,10 +42,10 @@ public class TestTextImportMapper extends TestCase { - static class DummyDBWritable implements DBWritable { + static class DummySqoopRecord implements SqoopRecord { long field; - public DummyDBWritable(final long val) { + public DummySqoopRecord(final long val) { this.field = val; } @@ -65,14 +68,22 @@ public void write(PreparedStatement s) throws SQLException { public String toString() { return "" + field; } + + public void loadLargeObjects(LargeObjectLoader loader) { } + public void parse(CharSequence s) { } + public void parse(Text s) { } + public void parse(byte [] s) { } + public void parse(char [] s) { } + public void parse(ByteBuffer s) { } + public void parse(CharBuffer s) { } } public void testTextImport() { TextImportMapper m = new TextImportMapper(); - MapDriver driver = - new MapDriver(m); + MapDriver driver = + new MapDriver(m); - driver.withInput(new LongWritable(0), new DummyDBWritable(42)) + driver.withInput(new LongWritable(0), new DummySqoopRecord(42)) .withOutput(new Text("42"), NullWritable.get()) .runTest(); } diff --git a/src/test/org/apache/hadoop/sqoop/testutil/MockResultSet.java b/src/test/org/apache/hadoop/sqoop/testutil/MockResultSet.java new file mode 100644 index 00000000..8b3cbb9f --- /dev/null +++ b/src/test/org/apache/hadoop/sqoop/testutil/MockResultSet.java @@ -0,0 +1,1012 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.testutil; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.StringReader; +import java.io.Reader; +import java.io.UnsupportedEncodingException; +import java.io.Writer; +import java.math.BigDecimal; +import java.net.URL; +import java.sql.Array; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.Date; +import java.sql.NClob; +import java.sql.Ref; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.RowId; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Statement; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Calendar; +import java.util.Map; + +/** + * Mock ResultSet instance that mocks Clob/Blob behavior. + */ +public class MockResultSet implements ResultSet { + + public static final byte [] BLOB_DATA = { 0x0, 0x1, 0x2, 0x3, + 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xA, 0xB, 0xC, 0xD, 0xE, 0xF }; + + public static final String CLOB_DATA = "This is the mock clob data!"; + + /** + * Read-only Blob class that returns 16 bytes 0x0 .. 0xf + */ + public static class MockBlob implements Blob { + public InputStream getBinaryStream() { + return new ByteArrayInputStream(BLOB_DATA); + } + + public InputStream getBinaryStream(long pos, long len) { + return new ByteArrayInputStream(getBytes(pos, (int) len)); + } + + public byte [] getBytes(long pos, int length) { + byte [] bytes = new byte[length]; + + int start = (int) pos - 1; // SQL uses 1-based arrays!! + for (int i = 0; i < length; i++) { + bytes[i] = BLOB_DATA[i + start]; + } + return bytes; + } + + public long length() { + return BLOB_DATA.length; + } + + + public long position(Blob pattern, long start) { return 0; } + public long position(byte[] pattern, long start) { return 0; } + public OutputStream setBinaryStream(long pos) { return null; } + public int setBytes(long pos, byte[] bytes) { return 0; } + public int setBytes(long pos, byte[] bytes, int offset, int len) { return 0; } + public void truncate(long len) { } + public void free() { } + } + + /** + * Read-only Clob class that returns the CLOB_DATA string only. 
+ */ + public static class MockClob implements Clob { + @Override + public InputStream getAsciiStream() { + try { + return new ByteArrayInputStream(CLOB_DATA.getBytes("UTF-8")); + } catch (UnsupportedEncodingException uee) { + return null; + } + } + + @Override + public Reader getCharacterStream() { + return new StringReader(CLOB_DATA); + } + + public Reader getCharacterStream(long pos, long len) { + return new StringReader(getSubString(pos, (int) len)); + } + + @Override + public String getSubString(long pos, int length) { + long start = pos - 1; // 1-based offsets in SQL + return CLOB_DATA.substring((int) start, (int) (start + length)); + } + + @Override + public long length() { + return CLOB_DATA.length(); + } + + public long position(Clob searchstr, long start) { return 0; } + public long position(String searchstr, long start) { return 0; } + public OutputStream setAsciiStream(long pos) { return null; } + public Writer setCharacterStream(long pos) { return null; } + public int setString(long pos, String str) { return 0; } + public int setString(long pos, String str, int offset, int len) { return 0; } + public void truncate(long len) { } + public void free() { } + } + + + // Methods that return mock Blob or Clob instances. + + public InputStream getAsciiStream(int columnIndex) { + return new MockClob().getAsciiStream(); + } + public InputStream getAsciiStream(String columnName) { + return new MockClob().getAsciiStream(); + } + public InputStream getBinaryStream(int columnIndex) { + return new MockBlob().getBinaryStream(); + } + public InputStream getBinaryStream(String columnName) { + return new MockBlob().getBinaryStream(); + } + public Blob getBlob(int i) { + return new MockBlob(); + } + public Blob getBlob(String colName) { + return new MockBlob(); + } + public Reader getCharacterStream(int columnIndex) { + return new MockClob().getCharacterStream(); + } + public Reader getCharacterStream(String columnName) { + return new MockClob().getCharacterStream(); + } + public Clob getClob(int i) { + return new MockClob(); + } + public Clob getClob(String colName) { + return new MockClob(); + } + + // Methods down here just return the default value for whatever + // type they're using (usually null, 0, or false). + // These stubs were all auto-generated by Eclipse. 
+ @Override + public boolean absolute(int row) throws SQLException { + return false; + } + + @Override + public void afterLast() throws SQLException { } + + @Override + public void beforeFirst() throws SQLException { } + + @Override + public void cancelRowUpdates() throws SQLException { } + + @Override + public void clearWarnings() throws SQLException { } + + @Override + public void close() throws SQLException { } + + @Override + public void deleteRow() throws SQLException { } + + @Override + public int findColumn(String columnLabel) throws SQLException { + return 0; + } + + @Override + public boolean first() throws SQLException { + return false; + } + + @Override + public Array getArray(int columnIndex) throws SQLException { + return null; + } + + @Override + public Array getArray(String columnLabel) throws SQLException { + return null; + } + + @Override + public BigDecimal getBigDecimal(int columnIndex) throws SQLException { + return null; + } + + @Override + public BigDecimal getBigDecimal(String columnLabel) throws SQLException { + return null; + } + + @Override + public BigDecimal getBigDecimal(int columnIndex, int scale) + throws SQLException { + return null; + } + + @Override + public BigDecimal getBigDecimal(String columnLabel, int scale) + throws SQLException { + return null; + } + @Override + public boolean getBoolean(int columnIndex) throws SQLException { + return false; + } + + @Override + public boolean getBoolean(String columnLabel) throws SQLException { + return false; + } + + @Override + public byte getByte(int columnIndex) throws SQLException { + return 0; + } + + @Override + public byte getByte(String columnLabel) throws SQLException { + return 0; + } + + @Override + public int getConcurrency() throws SQLException { + return 0; + } + + @Override + public String getCursorName() throws SQLException { + return null; + } + + @Override + public Date getDate(int columnIndex) throws SQLException { + return null; + } + + @Override + public Date getDate(String columnLabel) throws SQLException { + return null; + } + + @Override + public Date getDate(int columnIndex, Calendar cal) throws SQLException { + return null; + } + + @Override + public Date getDate(String columnLabel, Calendar cal) throws SQLException { + return null; + } + + @Override + public double getDouble(int columnIndex) throws SQLException { + return 0; + } + + @Override + public double getDouble(String columnLabel) throws SQLException { + return 0; + } + + @Override + public int getFetchDirection() throws SQLException { + return 0; + } + + @Override + public int getFetchSize() throws SQLException { + return 0; + } + + @Override + public float getFloat(int columnIndex) throws SQLException { + return 0; + } + + @Override + public float getFloat(String columnLabel) throws SQLException { + return 0; + } + + @Override + public int getHoldability() throws SQLException { + return 0; + } + + @Override + public int getInt(int columnIndex) throws SQLException { + return 0; + } + + @Override + public int getInt(String columnLabel) throws SQLException { + return 0; + } + + @Override + public long getLong(int columnIndex) throws SQLException { + return 0; + } + + @Override + public long getLong(String columnLabel) throws SQLException { + return 0; + } + + @Override + public ResultSetMetaData getMetaData() throws SQLException { + return null; + } + + @Override + public Reader getNCharacterStream(int columnIndex) throws SQLException { + return null; + } + + @Override + public Reader getNCharacterStream(String columnLabel) throws 
SQLException { + return null; + } + + @Override + public NClob getNClob(int columnIndex) throws SQLException { + return null; + } + + @Override + public NClob getNClob(String columnLabel) throws SQLException { + return null; + } + + @Override + public String getNString(int columnIndex) throws SQLException { + return null; + } + + @Override + public String getNString(String columnLabel) throws SQLException { + return null; + } + + @Override + public Object getObject(int columnIndex) throws SQLException { + return null; + } + + @Override + public Object getObject(String columnLabel) throws SQLException { + return null; + } + + @Override + public Object getObject(int columnIndex, Map> map) + throws SQLException { + return null; + } + + @Override + public Object getObject(String columnLabel, Map> map) + throws SQLException { + return null; + } + + @Override + public Ref getRef(int columnIndex) throws SQLException { + return null; + } + + @Override + public Ref getRef(String columnLabel) throws SQLException { + return null; + } + + @Override + public int getRow() throws SQLException { + return 0; + } + + @Override + public RowId getRowId(int columnIndex) throws SQLException { + return null; + } + + @Override + public RowId getRowId(String columnLabel) throws SQLException { + return null; + } + + @Override + public SQLXML getSQLXML(int columnIndex) throws SQLException { + return null; + } + + @Override + public SQLXML getSQLXML(String columnLabel) throws SQLException { + return null; + } + + @Override + public short getShort(int columnIndex) throws SQLException { + return 0; + } + + @Override + public short getShort(String columnLabel) throws SQLException { + return 0; + } + + @Override + public Statement getStatement() throws SQLException { + return null; + } + + @Override + public String getString(int columnIndex) throws SQLException { + return null; + } + + @Override + public String getString(String columnLabel) throws SQLException { + return null; + } + + @Override + public Time getTime(int columnIndex) throws SQLException { + return null; + } + + @Override + public Time getTime(String columnLabel) throws SQLException { + return null; + } + + @Override + public Time getTime(int columnIndex, Calendar cal) throws SQLException { + return null; + } + + @Override + public Time getTime(String columnLabel, Calendar cal) throws SQLException { + return null; + } + + @Override + public Timestamp getTimestamp(int columnIndex) throws SQLException { + return null; + } + + @Override + public Timestamp getTimestamp(String columnLabel) throws SQLException { + return null; + } + + @Override + public Timestamp getTimestamp(int columnIndex, Calendar cal) + throws SQLException { + return null; + } + + @Override + public Timestamp getTimestamp(String columnLabel, Calendar cal) + throws SQLException { + return null; + } + + @Override + public int getType() throws SQLException { + return 0; + } + + @Override + public URL getURL(int columnIndex) throws SQLException { + return null; + } + + @Override + public URL getURL(String columnLabel) throws SQLException { + return null; + } + + @Override + public InputStream getUnicodeStream(int columnIndex) throws SQLException { + return null; + } + + @Override + public InputStream getUnicodeStream(String columnLabel) throws SQLException { + return null; + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return null; + } + + @Override + public void insertRow() throws SQLException { + } + + @Override + public boolean isAfterLast() throws SQLException { 
+ return false; + } + + @Override + public boolean isBeforeFirst() throws SQLException { + return false; + } + + @Override + public boolean isClosed() throws SQLException { + return false; + } + + @Override + public boolean isFirst() throws SQLException { + return false; + } + + @Override + public boolean isLast() throws SQLException { + return false; + } + + @Override + public boolean last() throws SQLException { + return false; + } + + @Override + public void moveToCurrentRow() throws SQLException { + } + + @Override + public void moveToInsertRow() throws SQLException { + } + + @Override + public boolean next() throws SQLException { + return false; + } + + @Override + public boolean previous() throws SQLException { + return false; + } + + @Override + public void refreshRow() throws SQLException { + } + + @Override + public boolean relative(int rows) throws SQLException { + return false; + } + + @Override + public boolean rowDeleted() throws SQLException { + return false; + } + + @Override + public boolean rowInserted() throws SQLException { + return false; + } + + @Override + public boolean rowUpdated() throws SQLException { + return false; + } + + @Override + public void setFetchDirection(int direction) throws SQLException { + } + + @Override + public void setFetchSize(int rows) throws SQLException { + } + + @Override + public void updateArray(int columnIndex, Array x) throws SQLException { + } + + @Override + public void updateArray(String columnLabel, Array x) throws SQLException { + } + + @Override + public void updateAsciiStream(int columnIndex, InputStream x) + throws SQLException { + } + + @Override + public void updateAsciiStream(String columnLabel, InputStream x) + throws SQLException { + } + + @Override + public void updateAsciiStream(int columnIndex, InputStream x, int length) + throws SQLException { + } + + @Override + public void updateAsciiStream(String columnLabel, InputStream x, int length) + throws SQLException { + } + + @Override + public void updateAsciiStream(int columnIndex, InputStream x, long length) + throws SQLException { + } + + @Override + public void updateAsciiStream(String columnLabel, InputStream x, long length) + throws SQLException { + } + + @Override + public void updateBigDecimal(int columnIndex, BigDecimal x) + throws SQLException { + } + + @Override + public void updateBigDecimal(String columnLabel, BigDecimal x) + throws SQLException { + } + + @Override + public void updateBinaryStream(int columnIndex, InputStream x) + throws SQLException { + } + + @Override + public void updateBinaryStream(String columnLabel, InputStream x) + throws SQLException { + } + + @Override + public void updateBinaryStream(int columnIndex, InputStream x, int length) + throws SQLException { + } + + @Override + public void updateBinaryStream(String columnLabel, InputStream x, int length) + throws SQLException { + } + + @Override + public void updateBinaryStream(int columnIndex, InputStream x, long length) + throws SQLException { + } + + @Override + public void updateBinaryStream(String columnLabel, InputStream x, long length) + throws SQLException { + } + + @Override + public void updateBlob(int columnIndex, Blob x) throws SQLException { + } + + @Override + public void updateBlob(String columnLabel, Blob x) throws SQLException { + } + + @Override + public void updateBlob(int columnIndex, InputStream inputStream) + throws SQLException { + } + + @Override + public void updateBlob(String columnLabel, InputStream inputStream) + throws SQLException { + } + + @Override + public void 
updateBlob(int columnIndex, InputStream inputStream, long length) + throws SQLException { + } + + @Override + public void updateBlob(String columnLabel, InputStream inputStream, + long length) throws SQLException { + } + + @Override + public void updateBoolean(int columnIndex, boolean x) throws SQLException { + } + + @Override + public void updateBoolean(String columnLabel, boolean x) throws SQLException { + } + + @Override + public void updateByte(int columnIndex, byte x) throws SQLException { + } + + @Override + public void updateByte(String columnLabel, byte x) throws SQLException { + } + + @Override + public void updateBytes(int columnIndex, byte[] x) throws SQLException { + } + + @Override + public void updateBytes(String columnLabel, byte[] x) throws SQLException { + } + + @Override + public void updateCharacterStream(int columnIndex, Reader x) + throws SQLException { + } + + @Override + public void updateCharacterStream(String columnLabel, Reader reader) + throws SQLException { + } + + @Override + public void updateCharacterStream(int columnIndex, Reader x, int length) + throws SQLException { + } + + @Override + public void updateCharacterStream(String columnLabel, Reader reader, + int length) throws SQLException { + } + + @Override + public void updateCharacterStream(int columnIndex, Reader x, long length) + throws SQLException { + } + + @Override + public void updateCharacterStream(String columnLabel, Reader reader, + long length) throws SQLException { + } + + @Override + public void updateClob(int columnIndex, Clob x) throws SQLException { + } + + @Override + public void updateClob(String columnLabel, Clob x) throws SQLException { + } + + @Override + public void updateClob(int columnIndex, Reader reader) throws SQLException { + } + + @Override + public void updateClob(String columnLabel, Reader reader) throws SQLException { + } + + @Override + public void updateClob(int columnIndex, Reader reader, long length) + throws SQLException { + } + + @Override + public void updateClob(String columnLabel, Reader reader, long length) + throws SQLException { + } + + @Override + public void updateDate(int columnIndex, Date x) throws SQLException { + } + + @Override + public void updateDate(String columnLabel, Date x) throws SQLException { + } + + @Override + public void updateDouble(int columnIndex, double x) throws SQLException { + } + + @Override + public void updateDouble(String columnLabel, double x) throws SQLException { + } + + @Override + public void updateFloat(int columnIndex, float x) throws SQLException { + } + + @Override + public void updateFloat(String columnLabel, float x) throws SQLException { + } + + @Override + public void updateInt(int columnIndex, int x) throws SQLException { + } + + @Override + public void updateInt(String columnLabel, int x) throws SQLException { + } + + @Override + public void updateLong(int columnIndex, long x) throws SQLException { + } + + @Override + public void updateLong(String columnLabel, long x) throws SQLException { + } + + @Override + public void updateNCharacterStream(int columnIndex, Reader x) + throws SQLException { + } + + @Override + public void updateNCharacterStream(String columnLabel, Reader reader) + throws SQLException { + } + + @Override + public void updateNCharacterStream(int columnIndex, Reader x, long length) + throws SQLException { + } + + @Override + public void updateNCharacterStream(String columnLabel, Reader reader, + long length) throws SQLException { + } + + @Override + public void updateNClob(int columnIndex, NClob clob) 
throws SQLException { + } + + @Override + public void updateNClob(String columnLabel, NClob clob) throws SQLException { + } + + @Override + public void updateNClob(int columnIndex, Reader reader) throws SQLException { + } + + @Override + public void updateNClob(String columnLabel, Reader reader) + throws SQLException { + } + + @Override + public void updateNClob(int columnIndex, Reader reader, long length) + throws SQLException { + } + + @Override + public void updateNClob(String columnLabel, Reader reader, long length) + throws SQLException { + } + + @Override + public void updateNString(int columnIndex, String string) throws SQLException { + } + + @Override + public void updateNString(String columnLabel, String string) + throws SQLException { + } + + @Override + public void updateNull(int columnIndex) throws SQLException { + } + + @Override + public void updateNull(String columnLabel) throws SQLException { + } + + @Override + public void updateObject(int columnIndex, Object x) throws SQLException { + } + + @Override + public void updateObject(String columnLabel, Object x) throws SQLException { + } + + @Override + public void updateObject(int columnIndex, Object x, int scaleOrLength) + throws SQLException { + } + + @Override + public void updateObject(String columnLabel, Object x, int scaleOrLength) + throws SQLException { + } + + @Override + public void updateRef(int columnIndex, Ref x) throws SQLException { + } + + @Override + public void updateRef(String columnLabel, Ref x) throws SQLException { + } + + @Override + public void updateRow() throws SQLException { + } + + @Override + public void updateRowId(int columnIndex, RowId x) throws SQLException { + } + + @Override + public void updateRowId(String columnLabel, RowId x) throws SQLException { + } + + @Override + public void updateSQLXML(int columnIndex, SQLXML xmlObject) + throws SQLException { + } + + @Override + public void updateSQLXML(String columnLabel, SQLXML xmlObject) + throws SQLException { + } + + @Override + public void updateShort(int columnIndex, short x) throws SQLException { + } + + @Override + public void updateShort(String columnLabel, short x) throws SQLException { + } + + @Override + public void updateString(int columnIndex, String x) throws SQLException { + } + + @Override + public void updateString(String columnLabel, String x) throws SQLException { + } + + @Override + public void updateTime(int columnIndex, Time x) throws SQLException { + } + + @Override + public void updateTime(String columnLabel, Time x) throws SQLException { + } + + @Override + public void updateTimestamp(int columnIndex, Timestamp x) throws SQLException { + } + + @Override + public void updateTimestamp(String columnLabel, Timestamp x) + throws SQLException { + } + + @Override + public boolean wasNull() throws SQLException { + return false; + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return false; + } + + @Override + public T unwrap(Class iface) throws SQLException { + return null; + } + + @Override + public byte[] getBytes(int columnIndex) throws SQLException { + return null; + } + + @Override + public byte[] getBytes(String columnLabel) throws SQLException { + return null; + } +} +
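Note (not part of the patch above): a minimal usage sketch of the new MockResultSet. It exercises only the surface shown in this diff -- getClob()/getBlob() always return read-only mocks backed by CLOB_DATA and BLOB_DATA, with SQL's 1-based offsets. The class name MockResultSetSketch is hypothetical and exists only for illustration.

----
import java.sql.Blob;
import java.sql.Clob;

import org.apache.hadoop.sqoop.testutil.MockResultSet;

public class MockResultSetSketch {
  public static void main(String[] args) throws Exception {
    MockResultSet rs = new MockResultSet();

    // The column argument is ignored; every column yields the same mock CLOB.
    Clob clob = rs.getClob(1);
    String text = clob.getSubString(1, (int) clob.length()); // pos is 1-based
    System.out.println(text.equals(MockResultSet.CLOB_DATA)); // true

    // Likewise, the mock BLOB always holds the 16 bytes 0x0..0xF.
    Blob blob = rs.getBlob(1);
    byte[] data = blob.getBytes(1, (int) blob.length());
    System.out.println(data.length == MockResultSet.BLOB_DATA.length); // true
  }
}
----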