From c3b9a87d6007b6ee424d0637710561afe0a70666 Mon Sep 17 00:00:00 2001 From: Jarek Jarcec Cecho Date: Sun, 13 Jul 2014 09:29:40 -0700 Subject: [PATCH] SQOOP-1239: Sqoop import code too large error (Abraham Elmahrek via Jarek Jarcec Cecho) --- .../org/apache/sqoop/orm/ClassWriter.java | 582 ++++++++++++++++-- .../sqoop/mapreduce/TestImportJob.java | 46 ++ 2 files changed, 571 insertions(+), 57 deletions(-) diff --git a/src/java/org/apache/sqoop/orm/ClassWriter.java b/src/java/org/apache/sqoop/orm/ClassWriter.java index df1ab727..364431d4 100644 --- a/src/java/org/apache/sqoop/orm/ClassWriter.java +++ b/src/java/org/apache/sqoop/orm/ClassWriter.java @@ -118,6 +118,9 @@ public class ClassWriter { JAVA_RESERVED_WORDS.add("while"); } + public static final String PROPERTY_CODEGEN_METHODS_MAXCOLS = + "codegen.methods.maxcols"; + /** * This version number is injected into all generated Java classes to denote * which version of the ClassWriter's output format was used to generate the @@ -129,6 +132,18 @@ public class ClassWriter { */ public static final int CLASS_WRITER_VERSION = 3; + /** + * Default maximum number of columns per method. + */ + public static final int MAX_COLUMNS_PER_METHOD_DEFAULT = 500; + + /** + * This number confines the number of allowed columns in a single method. + * It allows code generation to scale to thousands of columns without + * running into "code too large" exceptions. + */ + private int maxColumnsPerMethod; + private SqoopOptions options; private ConnManager connManager; private String tableName; @@ -152,6 +167,9 @@ public ClassWriter(final SqoopOptions opts, final ConnManager connMgr, this.bigDecimalFormatString = this.options.getConf().getBoolean( ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT, ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT); + this.maxColumnsPerMethod = this.options.getConf().getInt( + PROPERTY_CODEGEN_METHODS_MAXCOLS, + MAX_COLUMNS_PER_METHOD_DEFAULT); } /** @@ -493,6 +511,32 @@ private String rpcSetterForMaybeNull(String javaType, String outputObj, + " }\n"; } + /** + * Get the number of methods that should be generated for a particular column set. + * @param colNames + * @param size + * @return + */ + private int getNumberOfMethods(String[] colNames, int size) { + int extra = 0; + if (colNames.length % size != 0) { + extra = 1; + } + return colNames.length / size + extra; + } + + /** + * Get the top boundary when iterating through columns on a + * per method basis. + * @param colNames + * @param methodNumber + * @param size + * @return + */ + private int topBoundary(String[] colNames, int methodNumber, int size) { + return (colNames.length > (methodNumber + 1) * size) ? (methodNumber + 1) * size : colNames.length; + } + /** * Generate a member field, getter, setter and with method for each column. * @param columnTypes - mapping from column names to sql types @@ -537,7 +581,43 @@ private void generateFields(Map columnTypes, private void generateEquals(Map columnTypes, String [] colNames, String className, StringBuilder sb) { + int numberOfMethods = this.getNumberOfMethods(colNames, maxColumnsPerMethod); + sb.append(" public boolean equals(Object o) {\n"); + if (numberOfMethods > 1) { + sb.append(" boolean equal = true;\n"); + for (int i = 0; i < numberOfMethods; ++i) { + sb.append(" equal = equal && this.equals" + i + "(o);\n"); + } + sb.append(" return equal;\n"); + } else { + _generateEquals(columnTypes, colNames, className, sb, 0, maxColumnsPerMethod, false); + } + sb.append(" }\n"); + + for (int i = 0; i < numberOfMethods; ++i) { + _generateEquals(columnTypes, colNames, className, sb, i, maxColumnsPerMethod, true); + } + } + + /** + * Generate an equals method that compares the fields for each column. + * @param columnTypes - mapping from column names to sql types + * @param colNames - ordered list of column names for table + * @param className - name of the generated class + * @param sb - StringBuilder to append code to + * @param methodNumber - method number + * @param size - number of columns per method + * @param wrapInMethod - wrap body in a method. + */ + private void _generateEquals(Map columnTypes, + String [] colNames, String className, StringBuilder sb, + int methodNumber, int size, boolean wrapInMethod) { + + if (wrapInMethod) { + sb.append(" public boolean equals" + methodNumber + "(Object o) {\n"); + } + sb.append(" if (this == o) {\n"); sb.append(" return true;\n"); sb.append(" }\n"); @@ -546,7 +626,8 @@ private void generateEquals(Map columnTypes, sb.append(" }\n"); sb.append(" " + className + " that = (" + className + ") o;\n"); sb.append(" boolean equal = true;\n"); - for (String col : colNames) { + for (int i = size * methodNumber; i < topBoundary(colNames, methodNumber, size); ++i) { + String col = colNames[i]; int sqlType = columnTypes.get(col); String javaType = toJavaType(col, sqlType); if (null == javaType) { @@ -557,7 +638,10 @@ private void generateEquals(Map columnTypes, + " == null : this." + col + ".equals(that." + col + "));\n"); } sb.append(" return equal;\n"); - sb.append(" }\n"); + + if (wrapInMethod) { + sb.append(" }\n"); + } } /** @@ -569,23 +653,53 @@ private void generateEquals(Map columnTypes, private void generateDbRead(Map columnTypes, String [] colNames, StringBuilder sb) { + int numberOfMethods = this.getNumberOfMethods(colNames, maxColumnsPerMethod); + sb.append(" public void readFields(ResultSet __dbResults) "); sb.append("throws SQLException {\n"); - // Save ResultSet object cursor for use in LargeObjectLoader // if necessary. sb.append(" this.__cur_result_set = __dbResults;\n"); + if (numberOfMethods > 1) { + for (int i = 0; i < numberOfMethods; ++i) { + sb.append(" this.readFields" + i + "(__dbResults);\n"); + } + } else { + _generateDbRead(columnTypes, colNames, sb, 0, maxColumnsPerMethod, false); + } + sb.append(" }\n"); - int fieldNum = 0; + for (int i = 0; i < numberOfMethods; ++i) { + _generateDbRead(columnTypes, colNames, sb, i, maxColumnsPerMethod, true); + } + } - for (String col : colNames) { - fieldNum++; + /** + * Generate the readFields() method used by the database. + * @param columnTypes - mapping from column names to sql types + * @param colNames - ordered list of column names for table. + * @param sb - StringBuilder to append code to + * @param methodNumber - method number + * @param size - number of columns per method + * @param wrapInMethod - wrap body in a method. + */ + private void _generateDbRead(Map columnTypes, + String [] colNames, StringBuilder sb, + int methodNumber, int size, boolean wrapInMethod) { + + if (wrapInMethod) { + sb.append(" public void readFields" + methodNumber + "(ResultSet __dbResults) "); + sb.append("throws SQLException {\n"); + } + + for (int i = methodNumber * size; i < topBoundary(colNames, methodNumber, size); ++i) { + String col = colNames[i]; int sqlType = columnTypes.get(col); String javaType = toJavaType(col, sqlType); if (null == javaType) { LOG.error("No Java type for SQL type " + sqlType - + " for column " + col); + + " for column " + col); continue; } @@ -596,10 +710,12 @@ private void generateDbRead(Map columnTypes, } sb.append(" this." + col + " = JdbcWritableBridge." + getterMethod - + "(" + fieldNum + ", __dbResults);\n"); + + "(" + (i + 1) + ", __dbResults);\n"); } - sb.append(" }\n"); + if (wrapInMethod) { + sb.append(" }\n"); + } } /** @@ -609,6 +725,8 @@ private void generateDbRead(Map columnTypes, private void generateLoadLargeObjects(Map columnTypes, String [] colNames, StringBuilder sb) { + int numberOfMethods = this.getNumberOfMethods(colNames, maxColumnsPerMethod); + // This method relies on the __cur_result_set field being set by // readFields() method generated by generateDbRead(). @@ -616,16 +734,46 @@ private void generateLoadLargeObjects(Map columnTypes, sb.append(" throws SQLException, IOException, "); sb.append("InterruptedException {\n"); - int fieldNum = 0; + if (numberOfMethods > 1) { + for (int i = 0; i < numberOfMethods; ++i) { + sb.append(" this.loadLargeObjects" + i + "(__loader);\n"); + } + } else { + _generateLoadLargeObjects(columnTypes, colNames, sb, 0, maxColumnsPerMethod, false); + } - for (String col : colNames) { - fieldNum++; + sb.append(" }\n"); + + for (int i = 0; i < numberOfMethods; ++i) { + _generateLoadLargeObjects(columnTypes, colNames, sb, i, maxColumnsPerMethod, true); + } + } + + /** + * Generate the loadLargeObjects() method called by the mapper to load + * delayed objects (that require the Context from the mapper). + */ + private void _generateLoadLargeObjects(Map columnTypes, + String [] colNames, StringBuilder sb, + int methodNumber, int size, boolean wrapInMethod) { + + // This method relies on the __cur_result_set field being set by + // readFields() method generated by generateDbRead(). + + if (wrapInMethod) { + sb.append(" public void loadLargeObjects" + methodNumber + "(LargeObjectLoader __loader)\n"); + sb.append(" throws SQLException, IOException, "); + sb.append("InterruptedException {\n"); + } + + for (int i = methodNumber * size; i < topBoundary(colNames, methodNumber, size); ++i) { + String col = colNames[i]; int sqlType = columnTypes.get(col); String javaType = toJavaType(col, sqlType); if (null == javaType) { LOG.error("No Java type for SQL type " + sqlType - + " for column " + col); + + " for column " + col); continue; } @@ -636,12 +784,14 @@ private void generateLoadLargeObjects(Map columnTypes, // appropriate LargeObjectLoader method (which has the same name as a // JdbcWritableBridge method). sb.append(" this." + col + " = __loader." + getterMethod - + "(" + fieldNum + ", this.__cur_result_set);\n"); + + "(" + (i + 1) + ", this.__cur_result_set);\n"); } } - sb.append(" }\n"); - } + if (wrapInMethod) { + sb.append(" }\n"); + } + } /** * Generate the write() method used by the database. @@ -652,6 +802,8 @@ private void generateLoadLargeObjects(Map columnTypes, private void generateDbWrite(Map columnTypes, String [] colNames, StringBuilder sb) { + int numberOfMethods = this.getNumberOfMethods(colNames, maxColumnsPerMethod); + sb.append(" public void write(PreparedStatement __dbStmt) " + "throws SQLException {\n"); sb.append(" write(__dbStmt, 0);\n"); @@ -660,16 +812,48 @@ private void generateDbWrite(Map columnTypes, sb.append(" public int write(PreparedStatement __dbStmt, int __off) " + "throws SQLException {\n"); - int fieldNum = 0; + if (numberOfMethods > 1) { + for (int i = 0; i < numberOfMethods; ++i) { + sb.append(" write" + i + "(__dbStmt, __off);\n"); + } + } else { + _generateDbWrite(columnTypes, colNames, sb, 0, maxColumnsPerMethod, false); + } - for (String col : colNames) { - fieldNum++; + sb.append(" return " + colNames.length + ";\n"); + sb.append(" }\n"); + + for (int i = 0; i < numberOfMethods; ++i) { + _generateDbWrite(columnTypes, colNames, sb, i, maxColumnsPerMethod, true); + } + } + + /** + * Generate the write() method used by the database. + * @param columnTypes - mapping from column names to sql types + * @param colNames - ordered list of column names for table. + * @param sb - StringBuilder to append code to + * @param methodNumber - method number + * @param size - number of columns per method + * @param wrapInMethod - wrap body in a method. + */ + private void _generateDbWrite(Map columnTypes, + String [] colNames, StringBuilder sb, + int methodNumber, int size, boolean wrapInMethod) { + + if (wrapInMethod) { + sb.append(" public void write" + methodNumber + "(PreparedStatement __dbStmt, int __off) " + + "throws SQLException {\n"); + } + + for (int i = methodNumber * size; i < topBoundary(colNames, methodNumber, size); ++i) { + String col = colNames[i]; int sqlType = columnTypes.get(col); String javaType = toJavaType(col, sqlType); if (null == javaType) { LOG.error("No Java type for SQL type " + sqlType - + " for column " + col); + + " for column " + col); continue; } @@ -680,14 +864,14 @@ private void generateDbWrite(Map columnTypes, } sb.append(" JdbcWritableBridge." + setterMethod + "(" + col + ", " - + fieldNum + " + __off, " + sqlType + ", __dbStmt);\n"); + + (i + 1) + " + __off, " + sqlType + ", __dbStmt);\n"); } - sb.append(" return " + fieldNum + ";\n"); - sb.append(" }\n"); + if (wrapInMethod) { + sb.append(" }\n"); + } } - /** * Generate the readFields() method used by the Hadoop RPC system. * @param columnTypes - mapping from column names to sql types @@ -697,15 +881,46 @@ private void generateDbWrite(Map columnTypes, private void generateHadoopRead(Map columnTypes, String [] colNames, StringBuilder sb) { + int numberOfMethods = this.getNumberOfMethods(colNames, maxColumnsPerMethod); + sb.append(" public void readFields(DataInput __dataIn) " + "throws IOException {\n"); - for (String col : colNames) { + for (int i = 0; i < numberOfMethods; ++i) { + sb.append("this.readFields" + i + "(__dataIn);"); + } + + sb.append(" }\n"); + + for (int i = 0; i < numberOfMethods; ++i) { + _generateHadoopRead(columnTypes, colNames, sb, i, maxColumnsPerMethod, true); + } + } + + /** + * Generate the readFields() method used by the Hadoop RPC system. + * @param columnTypes - mapping from column names to sql types + * @param colNames - ordered list of column names for table. + * @param sb - StringBuilder to append code to + * @param methodNumber - method number + * @param size - number of columns per method + * @param wrapInMethod - wrap body in a method. + */ + private void _generateHadoopRead(Map columnTypes, + String [] colNames, StringBuilder sb, + int methodNumber, int size, boolean wrapInMethod) { + if (wrapInMethod) { + sb.append(" public void readFields" + methodNumber + "(DataInput __dataIn) " + + "throws IOException {\n"); + } + + for (int i = methodNumber * size; i < topBoundary(colNames, methodNumber, size); ++i) { + String col = colNames[i]; int sqlType = columnTypes.get(col); String javaType = toJavaType(col, sqlType); if (null == javaType) { LOG.error("No Java type for SQL type " + sqlType - + " for column " + col); + + " for column " + col); continue; } @@ -718,7 +933,9 @@ private void generateHadoopRead(Map columnTypes, sb.append(getterMethod); } - sb.append(" }\n"); + if (wrapInMethod) { + sb.append(" }\n"); + } } /** @@ -730,14 +947,52 @@ private void generateHadoopRead(Map columnTypes, private void generateCloneMethod(Map columnTypes, String [] colNames, StringBuilder sb) { + int numberOfMethods = this.getNumberOfMethods(colNames, maxColumnsPerMethod); + TableClassName tableNameInfo = new TableClassName(options); String className = tableNameInfo.getShortClassForTable(tableName); sb.append(" public Object clone() throws CloneNotSupportedException {\n"); sb.append(" " + className + " o = (" + className + ") super.clone();\n"); + if (numberOfMethods > 1) { + for (int i = 0; i < numberOfMethods; ++i) { + sb.append(" this.clone" + i + "(o);"); + } + } else { + _generateCloneMethod(columnTypes, colNames, sb, 0, maxColumnsPerMethod, false); + } + + sb.append(" return o;\n"); + sb.append(" }\n\n"); + + for (int i = 0; i < numberOfMethods; ++i) { + _generateCloneMethod(columnTypes, colNames, sb, i, maxColumnsPerMethod, true); + } + } + + /** + * Generate the clone() method. + * @param columnTypes - mapping from column names to sql types + * @param colNames - ordered list of column names for table. + * @param sb - StringBuilder to append code to + * @param methodNumber - method number + * @param size - number of columns per method + * @param wrapInMethod - wrap body in a method. + */ + private void _generateCloneMethod(Map columnTypes, + String [] colNames, StringBuilder sb, + int methodNumber, int size, boolean wrapInMethod) { + TableClassName tableNameInfo = new TableClassName(options); + String className = tableNameInfo.getShortClassForTable(tableName); + + if (wrapInMethod) { + sb.append(" public void clone" + methodNumber + "(" + className + " o) throws CloneNotSupportedException {\n"); + } + // For each field that is mutable, we need to perform the deep copy. - for (String colName : colNames) { + for (int i = methodNumber * size; i < topBoundary(colNames, methodNumber, size); ++i) { + String colName = colNames[i]; int sqlType = columnTypes.get(colName); String javaType = toJavaType(colName, sqlType); if (null == javaType) { @@ -748,7 +1003,7 @@ private void generateCloneMethod(Map columnTypes, || javaType.equals(ClobRef.class.getName()) || javaType.equals(BlobRef.class.getName())) { sb.append(" o." + colName + " = (o." + colName + " != null) ? (" - + javaType + ") o." + colName + ".clone() : null;\n"); + + javaType + ") o." + colName + ".clone() : null;\n"); } else if (javaType.equals(BytesWritable.class.getName())) { sb.append(" o." + colName + " = (o." + colName + " != null) ? " + "new BytesWritable(Arrays.copyOf(" + colName + ".getBytes(), " @@ -756,8 +1011,9 @@ private void generateCloneMethod(Map columnTypes, } } - sb.append(" return o;\n"); - sb.append(" }\n\n"); + if (wrapInMethod) { + sb.append(" }\n\n"); + } } /** @@ -768,10 +1024,69 @@ private void generateCloneMethod(Map columnTypes, */ private void generateSetField(Map columnTypes, String [] colNames, StringBuilder sb) { + + int numberOfMethods = this.getNumberOfMethods(colNames, maxColumnsPerMethod); + sb.append(" public void setField(String __fieldName, Object __fieldVal) " + "{\n"); + if (numberOfMethods > 1) { + boolean first = true; + for (int i = 0; i < numberOfMethods; ++i) { + if (!first) { + sb.append(" else"); + } + sb.append(" if (this.setField" + i + "(__fieldName, __fieldVal)) {\n"); + sb.append(" return;\n"); + sb.append(" }\n"); + first = false; + } + } else { + boolean first = true; + for (String colName : colNames) { + int sqlType = columnTypes.get(colName); + String javaType = toJavaType(colName, sqlType); + if (null == javaType) { + continue; + } else { + if (!first) { + sb.append(" else"); + } + + sb.append(" if (\"" + colName + "\".equals(__fieldName)) {\n"); + sb.append(" this." + colName + " = (" + javaType + + ") __fieldVal;\n"); + sb.append(" }\n"); + first = false; + } + } + } + sb.append(" else {\n"); + sb.append(" throw new RuntimeException("); + sb.append("\"No such field: \" + __fieldName);\n"); + sb.append(" }\n"); + sb.append(" }\n"); + + for (int i = 0; i < numberOfMethods; ++i) { + _generateSetField(columnTypes, colNames, sb, i, maxColumnsPerMethod); + } + } + + /** + * Generate the setField() method. + * @param columnTypes - mapping from column names to sql types + * @param colNames - ordered list of column names for table. + * @param sb - StringBuilder to append code to + * @param methodNumber - method number + * @param size - number of columns per method + */ + private void _generateSetField(Map columnTypes, + String [] colNames, StringBuilder sb, + int methodNumber, int size) { + sb.append(" public boolean setField" + methodNumber + "(String __fieldName, Object __fieldVal) {\n"); + boolean first = true; - for (String colName : colNames) { + for (int i = methodNumber * size; i < topBoundary(colNames, methodNumber, size); ++i) { + String colName = colNames[i]; int sqlType = columnTypes.get(colName); String javaType = toJavaType(colName, sqlType); if (null == javaType) { @@ -784,13 +1099,13 @@ private void generateSetField(Map columnTypes, sb.append(" if (\"" + colName + "\".equals(__fieldName)) {\n"); sb.append(" this." + colName + " = (" + javaType + ") __fieldVal;\n"); + sb.append(" return true;\n"); sb.append(" }\n"); first = false; } } sb.append(" else {\n"); - sb.append(" throw new RuntimeException("); - sb.append("\"No such field: \" + __fieldName);\n"); + sb.append(" return false;"); sb.append(" }\n"); sb.append(" }\n"); } @@ -803,15 +1118,51 @@ private void generateSetField(Map columnTypes, */ private void generateGetFieldMap(Map columnTypes, String [] colNames, StringBuilder sb) { + int numberOfMethods = this.getNumberOfMethods(colNames, maxColumnsPerMethod); + sb.append(" public Map getFieldMap() {\n"); sb.append(" Map __sqoop$field_map = " + "new TreeMap();\n"); - for (String colName : colNames) { - sb.append(" __sqoop$field_map.put(\"" + colName + "\", this." - + colName + ");\n"); + if (numberOfMethods > 1) { + for (int i = 0; i < numberOfMethods; ++i) { + sb.append(" this.getFieldMap" + i + "(__sqoop$field_map);\n"); + } + } else { + _generateGetFieldMap(columnTypes, colNames, sb, 0, maxColumnsPerMethod, false); } sb.append(" return __sqoop$field_map;\n"); sb.append(" }\n\n"); + + for (int i = 0; i < numberOfMethods; ++i) { + _generateGetFieldMap(columnTypes, colNames, sb, i, maxColumnsPerMethod, true); + } + } + + /** + * Generate the getFieldMap() method. + * @param columnTypes - mapping from column names to sql types + * @param colNames - ordered list of column names for table. + * @param sb - StringBuilder to append code to + * @param methodNumber - method number + * @param size - number of columns per method + * @param wrapInMethod - wrap body in a method. + */ + private void _generateGetFieldMap(Map columnTypes, + String [] colNames, StringBuilder sb, + int methodNumber, int size, boolean wrapInMethod) { + if (wrapInMethod) { + sb.append(" public void getFieldMap" + methodNumber + "(Map __sqoop$field_map) {\n"); + } + + for (int i = methodNumber * size; i < topBoundary(colNames, methodNumber, size); ++i) { + String colName = colNames[i]; + sb.append(" __sqoop$field_map.put(\"" + colName + "\", this." + + colName + ");\n"); + } + + if (wrapInMethod) { + sb.append(" }\n\n"); + } } /** @@ -823,6 +1174,8 @@ private void generateGetFieldMap(Map columnTypes, private void generateToString(Map columnTypes, String [] colNames, StringBuilder sb) { + int numberOfMethods = this.getNumberOfMethods(colNames, maxColumnsPerMethod); + // Save the delimiters to the class. sb.append(" private static final DelimiterSet __outputDelimiters = "); sb.append(options.getOutputDelimiters().formatConstructor() + ";\n"); @@ -855,13 +1208,56 @@ private void generateToString(Map columnTypes, sb.append(" StringBuilder __sb = new StringBuilder();\n"); sb.append(" char fieldDelim = delimiters.getFieldsTerminatedBy();\n"); + if (numberOfMethods > 1) { + for (int i = 0; i < numberOfMethods; ++i) { + sb.append(" this.toString" + i + "(delimiters, __sb, fieldDelim);\n"); + } + } else { + _generateToString(columnTypes, colNames, sb, true, 0, maxColumnsPerMethod, false); + } + + sb.append(" if (useRecordDelim) {\n"); + sb.append(" __sb.append(delimiters.getLinesTerminatedBy());\n"); + sb.append(" }\n"); + sb.append(" return __sb.toString();\n"); + sb.append(" }\n"); + boolean first = true; - for (String col : colNames) { + for (int i = 0; i < numberOfMethods; ++i) { + _generateToString(columnTypes, colNames, sb, first, i, maxColumnsPerMethod, true); + first = false; + } + } + + /** + * Generate the toString() method. + * @param columnTypes - mapping from column names to sql types + * @param colNames - ordered list of column names for table. + * @param sb - StringBuilder to append code to + * @param methodNumber - method number + * @param size - number of columns per method + * @param wrapInMethod - wrap body in a method. + */ + private void _generateToString(Map columnTypes, + String [] colNames, StringBuilder sb, + boolean first, int methodNumber, int size, + boolean wrapInMethod) { + // This toString() variant allows the user to specify delimiters, as well + // as whether or not the end-of-record delimiter should be added to the + // string. Use 'false' to do reasonable things with TextOutputFormat, + // which appends its own newline. + if (wrapInMethod) { + sb.append(" public void toString" + methodNumber + "(DelimiterSet delimiters, "); + sb.append("StringBuilder __sb, char fieldDelim) {\n"); + } + + for (int i = methodNumber * size; i < topBoundary(colNames, methodNumber, size); ++i) { + String col = colNames[i]; int sqlType = columnTypes.get(col); String javaType = toJavaType(col, sqlType); if (null == javaType) { LOG.error("No Java type for SQL type " + sqlType - + " for column " + col); + + " for column " + col); continue; } @@ -880,28 +1276,26 @@ private void generateToString(Map columnTypes, if (javaType.equals("String") && options.doHiveDropDelims()) { sb.append(" // special case for strings hive, dropping" - + "delimiters \\n,\\r,\\01 from strings\n"); + + "delimiters \\n,\\r,\\01 from strings\n"); sb.append(" __sb.append(FieldFormatter.hiveStringDropDelims(" - + stringExpr + ", delimiters));\n"); + + stringExpr + ", delimiters));\n"); } else if (javaType.equals("String") - && options.getHiveDelimsReplacement() != null) { + && options.getHiveDelimsReplacement() != null) { sb.append(" // special case for strings hive, replacing " - + "delimiters \\n,\\r,\\01 with '" - + options.getHiveDelimsReplacement() + "' from strings\n"); + + "delimiters \\n,\\r,\\01 with '" + + options.getHiveDelimsReplacement() + "' from strings\n"); sb.append(" __sb.append(FieldFormatter.hiveStringReplaceDelims(" - + stringExpr + ", \"" + options.getHiveDelimsReplacement() + "\", " - + "delimiters));\n"); + + stringExpr + ", \"" + options.getHiveDelimsReplacement() + "\", " + + "delimiters));\n"); } else { sb.append(" __sb.append(FieldFormatter.escapeAndEnclose(" + stringExpr + ", delimiters));\n"); } } - sb.append(" if (useRecordDelim) {\n"); - sb.append(" __sb.append(delimiters.getLinesTerminatedBy());\n"); - sb.append(" }\n"); - sb.append(" return __sb.toString();\n"); - sb.append(" }\n"); + if (wrapInMethod) { + sb.append(" }\n"); + } } /** @@ -1007,6 +1401,8 @@ private void parseColumn(String colName, int colType, StringBuilder sb) { private void generateParser(Map columnTypes, String [] colNames, StringBuilder sb) { + int numberOfMethods = this.getNumberOfMethods(colNames, maxColumnsPerMethod); + // Embed into the class the delimiter characters to use when parsing input // records. Note that these can differ from the delims to use as output // via toString(), if the user wants to use this class to convert one @@ -1030,17 +1426,52 @@ private void generateParser(Map columnTypes, // method is type-dependent for the fields. sb.append(" private void __loadFromFields(List fields) {\n"); sb.append(" Iterator __it = fields.listIterator();\n"); + if (numberOfMethods > 1) { + for (int i = 0; i < numberOfMethods; ++i) { + sb.append(" this.__loadFromFields" + i + "(__it);\n"); + } + } else { + _generateParser(columnTypes, colNames, sb, 0, maxColumnsPerMethod, false); + } + sb.append(" }\n\n"); + + for (int i = 0; i < numberOfMethods; ++i) { + _generateParser(columnTypes, colNames, sb, i, maxColumnsPerMethod, true); + } + } + + /** + * Generate the parse() method. + * @param columnTypes - mapping from column names to sql types + * @param colNames - ordered list of column names for table. + * @param sb - StringBuilder to append code to + * @param methodNumber - method number + * @param size - number of columns per method + * @param wrapInMethod - wrap body in a method. + */ + private void _generateParser(Map columnTypes, + String [] colNames, StringBuilder sb, + int methodNumber, int size, boolean wrapInMethod) { + // The wrapper methods call __loadFromFields() to actually interpret the + // raw field data as string, int, boolean, etc. The generation of this + // method is type-dependent for the fields. + if (wrapInMethod) { + sb.append(" private void __loadFromFields" + methodNumber + "(Iterator __it) {\n"); + } sb.append(" String __cur_str = null;\n"); sb.append(" try {\n"); - for (String colName : colNames) { + for (int i = methodNumber * size; i < topBoundary(colNames, methodNumber, size); ++i) { + String colName = colNames[i]; int colType = columnTypes.get(colName); parseColumn(colName, colType, sb); } sb.append(" } catch (RuntimeException e) {"); sb.append(" throw new RuntimeException(" - + "\"Can't parse input data: '\" + __cur_str + \"'\", e);"); + + "\"Can't parse input data: '\" + __cur_str + \"'\", e);"); sb.append(" }"); - sb.append(" }\n\n"); + if (wrapInMethod) { + sb.append(" }\n\n"); + } } /** @@ -1052,15 +1483,50 @@ private void generateParser(Map columnTypes, private void generateHadoopWrite(Map columnTypes, String [] colNames, StringBuilder sb) { + int numberOfMethods = this.getNumberOfMethods(colNames, maxColumnsPerMethod); + sb.append(" public void write(DataOutput __dataOut) " + "throws IOException {\n"); - for (String col : colNames) { + if (numberOfMethods > 1) { + for (int i = 0; i < numberOfMethods; ++i) { + sb.append(" this.write" + i + "(__dataOut);\n"); + } + } else { + _generateHadoopWrite(columnTypes, colNames, sb, 0, maxColumnsPerMethod, false); + } + + sb.append(" }\n"); + + for (int i = 0; i < numberOfMethods; ++i) { + _generateHadoopWrite(columnTypes, colNames, sb, i, maxColumnsPerMethod, true); + } + } + + /** + * Generate the write() method used by the Hadoop RPC system. + * @param columnTypes - mapping from column names to sql types + * @param colNames - ordered list of column names for table. + * @param sb - StringBuilder to append code to + * @param methodNumber - method number + * @param size - number of columns per method + * @param wrapInMethod - wrap body in a method. + */ + private void _generateHadoopWrite(Map columnTypes, + String [] colNames, StringBuilder sb, + int methodNumber, int size, boolean wrapInMethod) { + if (wrapInMethod) { + sb.append(" public void write" + methodNumber + "(DataOutput __dataOut) " + + "throws IOException {\n"); + } + + for (int i = methodNumber * size; i < topBoundary(colNames, methodNumber, size); ++i) { + String col = colNames[i]; int sqlType = columnTypes.get(col); String javaType = toJavaType(col, sqlType); if (null == javaType) { LOG.error("No Java type for SQL type " + sqlType - + " for column " + col); + + " for column " + col); continue; } @@ -1073,7 +1539,9 @@ private void generateHadoopWrite(Map columnTypes, sb.append(setterMethod); } - sb.append(" }\n"); + if (wrapInMethod) { + sb.append(" }\n"); + } } /** diff --git a/src/test/com/cloudera/sqoop/mapreduce/TestImportJob.java b/src/test/com/cloudera/sqoop/mapreduce/TestImportJob.java index 95049747..d3f55499 100644 --- a/src/test/com/cloudera/sqoop/mapreduce/TestImportJob.java +++ b/src/test/com/cloudera/sqoop/mapreduce/TestImportJob.java @@ -45,6 +45,7 @@ import com.cloudera.sqoop.testutil.InjectableManagerFactory; import com.cloudera.sqoop.testutil.InjectableConnManager; import com.cloudera.sqoop.tool.ImportTool; +import org.apache.hadoop.util.StringUtils; import org.apache.sqoop.SqoopOptions; import org.apache.sqoop.util.ClassLoaderStack; @@ -326,4 +327,49 @@ public void testDeleteTargetDir() throws Exception { LOG.info("Got exceptional return (expected: ok). msg is: " + e); } } + + public void testManyColumns() throws Exception { + int numberOfColumns = 7500; + + // Create a bunch of columns + String[] colNames = new String[numberOfColumns]; + String[] colTypes = new String[numberOfColumns]; + String[] colVals = new String[numberOfColumns]; + List testColVals = new ArrayList(numberOfColumns); + for (int i = 0; i < numberOfColumns; ++i) { + colNames[i] = BASE_COL_NAME + Integer.toString(i); + colTypes[i] = "VARCHAR(32)"; + colVals[i] = "'meep'"; + testColVals.add("meep"); + } + createTableWithColTypesAndNames(colNames, colTypes, colVals); + + Configuration conf = new Configuration(); + + // Make sure the output dir does not exist + Path outputPath = new Path(new Path(getWarehouseDir()), getTableName()); + FileSystem fs = FileSystem.getLocal(conf); + fs.delete(outputPath, true); + assertTrue(!fs.exists(outputPath)); + + String[] argv = getArgv(true, colNames, conf); + + Sqoop importer = new Sqoop(new ImportTool()); + try { + int ret = Sqoop.runSqoop(importer, argv); + assertTrue("Expected job to go through if target directory" + + " does not exist.", 0 == ret); + assertTrue(fs.exists(outputPath)); + // expecting one _SUCCESS file and one file containing data + assertTrue("Expecting two files in the directory.", + fs.listStatus(outputPath).length == 2); + String[] output = getContent(conf, outputPath); + assertEquals("Expected output and actual output should be same.", + StringUtils.join(",", testColVals) + "\n", + output[0]); + } catch (Exception e) { + // In debug mode, ImportException is wrapped in RuntimeException. + LOG.info("Got exceptional return (expected: ok). msg is: " + e); + } + } }