diff --git a/common/pom.xml b/common/pom.xml index db11b5ba..9bfa07d4 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -45,6 +45,11 @@ limitations under the License. commons-lang commons-lang + + + com.google.guava + guava + diff --git a/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java b/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java index 3e1adc70..a34dfb44 100644 --- a/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java +++ b/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java @@ -18,17 +18,32 @@ package org.apache.sqoop.etl.io; /** - * An intermediate layer for passing data from the MR framework + * An intermediate layer for passing data from the execution framework * to the ETL framework. */ public abstract class DataReader { + /** + * Read data from the execution framework as an object array. + * @return - array of objects with each column represented as an object + * @throws Exception + */ public abstract Object[] readArrayRecord() throws Exception; - public abstract String readCsvRecord() throws Exception; + /** + * Read data from execution framework as text - as a CSV record. + * public abstract Object readContent(int type) throws Exception; + * @return - CSV formatted data. + * @throws Exception + */ + public abstract String readTextRecord() throws Exception; - public abstract Object readContent(int type) throws Exception; - - public abstract void setFieldDelimiter(char fieldDelimiter); + /** + * Read data from execution framework as a native format. + * @return - the content in the native format of the intermediate data + * format being used. + * @throws Exception + */ + public abstract Object readContent() throws Exception; } diff --git a/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java b/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java index d81364ef..2166b09f 100644 --- a/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java +++ b/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java @@ -23,12 +23,24 @@ */ public abstract class DataWriter { + /** + * Write an array of objects into the execution framework + * @param array - data to be written + */ public abstract void writeArrayRecord(Object[] array); - public abstract void writeCsvRecord(String csv); + /** + * Write data into execution framework as text. The Intermediate Data Format + * may choose to convert the data to another format based on how the data + * format is implemented + * @param text - data represented as CSV text. + */ + public abstract void writeStringRecord(String text); - public abstract void writeContent(Object content, int type); - - public abstract void setFieldDelimiter(char fieldDelimiter); + /** + * Write data in the intermediate data format's native format. + * @param obj - data to be written + */ + public abstract void writeRecord(Object obj); } diff --git a/common/src/main/java/org/apache/sqoop/schema/type/Column.java b/common/src/main/java/org/apache/sqoop/schema/type/Column.java index 8b630b28..30c26a3c 100644 --- a/common/src/main/java/org/apache/sqoop/schema/type/Column.java +++ b/common/src/main/java/org/apache/sqoop/schema/type/Column.java @@ -98,4 +98,9 @@ public int hashCode() { result = 31 * result + (nullable != null ? nullable.hashCode() : 0); return result; } + + public boolean validate(Object o) { + // TODO: Implement this in all subclasses! 
+ return true; + } } diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java index e0da80f7..298288e2 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java @@ -21,6 +21,8 @@ import java.util.ResourceBundle; import org.apache.sqoop.common.VersionInfo; +import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; +import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; @@ -61,7 +63,7 @@ public String getVersion() { @Override public ResourceBundle getBundle(Locale locale) { return ResourceBundle.getBundle( - GenericJdbcConnectorConstants.RESOURCE_BUNDLE_NAME, locale); + GenericJdbcConnectorConstants.RESOURCE_BUNDLE_NAME, locale); } @Override diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java index 2b1a0ad5..c3747505 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java @@ -78,6 +78,8 @@ public enum GenericJdbcConnectorError implements ErrorCode { GENERIC_JDBC_CONNECTOR_0018("Error occurred while transferring data from " + "stage table to destination table."), + + GENERIC_JDBC_CONNECTOR_0019("Table name extraction not supported.") ; private final String message; diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java index ef39cdcb..80253be9 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java @@ -17,6 +17,9 @@ */ package org.apache.sqoop.connector.jdbc; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; import java.util.LinkedList; import java.util.List; @@ -26,9 +29,11 @@ import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; +import org.apache.sqoop.connector.jdbc.util.SqlTypesUtils; import org.apache.sqoop.job.etl.Initializer; import org.apache.sqoop.job.etl.InitializerContext; import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Column; import org.apache.sqoop.utils.ClassUtils; public class GenericJdbcExportInitializer extends Initializer { @@ -58,7 +63,53 @@ public List getJars(InitializerContext context, ConnectionConfiguration @Override public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ExportJobConfiguration 
exportJobConfiguration) { - return null; + configureJdbcProperties(context.getContext(), connectionConfiguration, exportJobConfiguration); + + String schemaName = exportJobConfiguration.table.tableName; + + if (schemaName == null) { + throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0019, + "Table name extraction not supported yet."); + } + + if(exportJobConfiguration.table.schemaName != null) { + schemaName = exportJobConfiguration.table.schemaName + "." + schemaName; + } + + Schema schema = new Schema(schemaName); + ResultSet rs = null; + ResultSetMetaData rsmt = null; + try { + rs = executor.executeQuery("SELECT * FROM " + schemaName + " WHERE 1 = 0"); + + rsmt = rs.getMetaData(); + for (int i = 1 ; i <= rsmt.getColumnCount(); i++) { + Column column = SqlTypesUtils.sqlTypeToAbstractType(rsmt.getColumnType(i)); + + String columnName = rsmt.getColumnName(i); + if (columnName == null || columnName.equals("")) { + columnName = rsmt.getColumnLabel(i); + if (null == columnName) { + columnName = "Column " + i; + } + } + + column.setName(columnName); + schema.addColumn(column); + } + + return schema; + } catch (SQLException e) { + throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e); + } finally { + if(rs != null) { + try { + rs.close(); + } catch (SQLException e) { + LOG.info("Ignoring exception while closing ResultSet", e); + } + } + } } private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) { diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java index 96818ba2..2ad3cb20 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java @@ -71,16 +71,17 @@ public Schema getSchema(InitializerContext context, ConnectionConfiguration conn String schemaName = importJobConfiguration.table.tableName; if(schemaName == null) { schemaName = "Query"; + } else if(importJobConfiguration.table.schemaName != null) { + schemaName = importJobConfiguration.table.schemaName + "." 
+ schemaName; } Schema schema = new Schema(schemaName); - ResultSet rs = null; ResultSetMetaData rsmt = null; try { rs = executor.executeQuery( - context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL) - .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0") + context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL) + .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0") ); rsmt = rs.getMetaData(); diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java index d4c4565a..fc3ddd02 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java @@ -112,11 +112,6 @@ public void testInsert() throws Exception { public class DummyReader extends DataReader { int index = 0; - @Override - public void setFieldDelimiter(char fieldDelimiter) { - // do nothing and use default delimiter - } - @Override public Object[] readArrayRecord() { if (index < numberOfRows) { @@ -132,16 +127,17 @@ public Object[] readArrayRecord() { } @Override - public String readCsvRecord() { + public String readTextRecord() { fail("This method should not be invoked."); return null; } @Override - public Object readContent(int type) { + public Object readContent() throws Exception { fail("This method should not be invoked."); return null; } + } } diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java index a7ed6bab..30d0b9a9 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java @@ -133,11 +133,6 @@ public void testSubquery() throws Exception { public class DummyWriter extends DataWriter { int indx = START; - @Override - public void setFieldDelimiter(char fieldDelimiter) { - // do nothing and use default delimiter - } - @Override public void writeArrayRecord(Object[] array) { for (int i = 0; i < array.length; i++) { @@ -153,12 +148,12 @@ public void writeArrayRecord(Object[] array) { } @Override - public void writeCsvRecord(String csv) { + public void writeStringRecord(String text) { fail("This method should not be invoked."); } @Override - public void writeContent(Object content, int type) { + public void writeRecord(Object content) { fail("This method should not be invoked."); } } diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java index a33fa363..cd05e308 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java @@ -327,7 +327,7 @@ public void testGetSchemaForTable() throws Exception { Initializer initializer = new GenericJdbcImportInitializer(); initializer.initialize(initializerContext, connConf, jobConf); Schema schema = initializer.getSchema(initializerContext, connConf, 
jobConf); - assertEquals(getSchema(tableName), schema); + assertEquals(getSchema(jobConf.table.schemaName + "." + tableName), schema); } @SuppressWarnings("unchecked") diff --git a/connector/connector-sdk/pom.xml b/connector/connector-sdk/pom.xml index 4056e149..f54837d0 100644 --- a/connector/connector-sdk/pom.xml +++ b/connector/connector-sdk/pom.xml @@ -38,6 +38,12 @@ limitations under the License. junit test + + + org.apache.sqoop + sqoop-common + + diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java new file mode 100644 index 00000000..39d48c79 --- /dev/null +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.connector.idf; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Column; +import org.apache.sqoop.schema.type.FixedPoint; +import org.apache.sqoop.schema.type.FloatingPoint; +import org.apache.sqoop.schema.type.Type; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class CSVIntermediateDataFormat extends IntermediateDataFormat { + + public static final char SEPARATOR_CHARACTER = ','; + public static final char ESCAPE_CHARACTER = '\\'; + public static final char QUOTE_CHARACTER = '\''; + + private static final Logger LOG = Logger.getLogger + (CSVIntermediateDataFormat.class); + + private static final char[] originals = { + 0x5C,0x00,0x0A,0x0D,0x1A,0x22,0x27 + }; + + private static final String[] replacements = { + new String(new char[] { ESCAPE_CHARACTER, '\\'}), + new String(new char[] { ESCAPE_CHARACTER, '0'}), + new String(new char[] { ESCAPE_CHARACTER, 'n'}), + new String(new char[] { ESCAPE_CHARACTER, 'r'}), + new String(new char[] { ESCAPE_CHARACTER, 'Z'}), + new String(new char[] { ESCAPE_CHARACTER, '\"'}), + new String(new char[] { ESCAPE_CHARACTER, '\''}) + }; + + // ISO-8859-1 is an 8-bit codec that is supported in every java implementation. 
+ public static final String BYTE_FIELD_CHARSET = "ISO-8859-1"; + + private final List stringFieldIndices = new ArrayList(); + private final List byteFieldIndices = new ArrayList(); + + private Schema schema; + + /** + * {@inheritDoc} + */ + @Override + public String getTextData() { + return data; + } + + /** + * {@inheritDoc} + */ + @Override + public void setTextData(String text) { + this.data = text; + } + + /** + * {@inheritDoc} + */ + @Override + public Schema getSchema() { + return schema; + } + + /** + * {@inheritDoc} + */ + @Override + public void setSchema(Schema schema) { + if(schema == null) { + return; + } + this.schema = schema; + List columns = schema.getColumns(); + int i = 0; + for(Column col : columns) { + if(col.getType() == Type.TEXT) { + stringFieldIndices.add(i); + } else if(col.getType() == Type.BINARY) { + byteFieldIndices.add(i); + } + i++; + } + } + + /** + * Custom CSV parser that honors quoting and escaped quotes. + * All other escaping is handled elsewhere. + * + * @return String[] + */ + private String[] getFields() { + if (data == null) { + return null; + } + + boolean quoted = false; + boolean escaped = false; + List parsedData = new LinkedList(); + StringBuffer buffer = new StringBuffer(); + for (int i = 0; i < data.length(); ++i) { + char c = data.charAt(i); + switch(c) { + case QUOTE_CHARACTER: + buffer.append(c); + if (escaped) { + escaped = false; + } else { + quoted = !quoted; + } + break; + + case ESCAPE_CHARACTER: + buffer.append(ESCAPE_CHARACTER); + escaped = !escaped; + break; + + case SEPARATOR_CHARACTER: + if (quoted) { + buffer.append(c); + } else { + parsedData.add(buffer.toString()); + buffer = new StringBuffer(); + } + break; + + default: + if (escaped) { + escaped = false; + } + buffer.append(c); + break; + } + } + parsedData.add(buffer.toString()); + + return parsedData.toArray(new String[parsedData.size()]); + } + + /** + * {@inheritDoc} + */ + @Override + public Object[] getObjectData() { + String[] fields = getFields(); + if (fields == null) { + return null; + } + + if (fields.length != schema.getColumns().size()) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005, + "The data " + getTextData() + " has the wrong number of fields."); + } + + Object[] out = new Object[fields.length]; + Column[] cols = schema.getColumns().toArray(new Column[fields.length]); + for (int i = 0; i < fields.length; i++) { + Type colType = cols[i].getType(); + if (fields[i].equals("NULL")) { + out[i] = null; + continue; + } + if (colType == Type.TEXT) { + out[i] = unescapeStrings(fields[i]); + } else if (colType == Type.BINARY) { + out[i] = unescapeByteArray(fields[i]); + } else if (colType == Type.FIXED_POINT) { + Long byteSize = ((FixedPoint) cols[i]).getByteSize(); + if (byteSize != null && byteSize <= Integer.SIZE) { + out[i] = Integer.valueOf(fields[i]); + } else { + out[i] = Long.valueOf(fields[i]); + } + } else if (colType == Type.FLOATING_POINT) { + Long byteSize = ((FloatingPoint) cols[i]).getByteSize(); + if (byteSize != null && byteSize <= Float.SIZE) { + out[i] = Float.valueOf(fields[i]); + } else { + out[i] = Double.valueOf(fields[i]); + } + } else if (colType == Type.DECIMAL) { + out[i] = new BigDecimal(fields[i]); + } else { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, "Column type from schema was not recognized for " + colType); + } + } + return out; + } + + /** + * {@inheritDoc} + */ + @VisibleForTesting + @Override + public void setObjectData(Object[] data) { + 
escapeArray(data); + this.data = StringUtils.join(data, SEPARATOR_CHARACTER); + } + + /** + * {@inheritDoc} + */ + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(this.data); + } + + /** + * {@inheritDoc} + */ + @Override + public void read(DataInput in) throws IOException { + data = in.readUTF(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean equals(Object other) { + if(this == other) { + return true; + } + if(other == null || !(other instanceof CSVIntermediateDataFormat)) { + return false; + } + return data.equals(((CSVIntermediateDataFormat)other).data); + } + + public int compareTo(IntermediateDataFormat o) { + if(this == o) { + return 0; + } + if(this.equals(o)) { + return 0; + } + if(!(o instanceof CSVIntermediateDataFormat)) { + throw new IllegalStateException("Expected Data to be instance of " + + "CSVIntermediateFormat, but was an instance of " + o.getClass() + .getName()); + } + return data.compareTo(o.getTextData()); + } + + /** + * If the incoming data is an array, parse it and return the CSV-ised version + * + * @param array + */ + private void escapeArray(Object[] array) { + for (int i : stringFieldIndices) { + array[i] = escapeStrings((String) array[i]); + } + for (int i : byteFieldIndices) { + array[i] = escapeByteArrays((byte[]) array[i]); + } + } + + private String escapeByteArrays(byte[] bytes) { + try { + return escapeStrings(new String(bytes, BYTE_FIELD_CHARSET)); + } catch (UnsupportedEncodingException e) { + // We should never hit this case. + // This character set should be distributed with Java. + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The character set " + BYTE_FIELD_CHARSET + " is not available."); + } + } + + private String getRegExp(char orig) { + return getRegExp(String.valueOf(orig)); + } + + private String getRegExp(String orig) { + return orig.replaceAll("\\\\", Matcher.quoteReplacement("\\\\")); + } + + private String escapeStrings(String orig) { + int j = 0; + String replacement = orig; + try { + for (j = 0; j < replacements.length; j++) { + replacement = replacement.replaceAll(getRegExp(originals[j]), Matcher.quoteReplacement(replacements[j])); + } + } catch (Exception e) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0002, orig + " " + replacement + " " + String.valueOf(j) + " " + e.getMessage()); + } + StringBuilder builder = new StringBuilder(); + builder.append(QUOTE_CHARACTER).append(replacement).append(QUOTE_CHARACTER); + return builder.toString(); + } + + private String unescapeStrings(String orig) { + //Remove the trailing and starting quotes. + orig = orig.substring(1, orig.length() - 1); + int j = 0; + try { + for (j = 0; j < replacements.length; j++) { + orig = orig.replaceAll(getRegExp(replacements[j]), + Matcher.quoteReplacement(String.valueOf(originals[j]))); + } + } catch (Exception e) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0003, orig + " " + String.valueOf(j) + e.getMessage()); + } + + return orig; + } + + private byte[] unescapeByteArray(String orig) { + // Always encoded in BYTE_FIELD_CHARSET. + try { + return unescapeStrings(orig).getBytes(BYTE_FIELD_CHARSET); + } catch (UnsupportedEncodingException e) { + // Should never hit this case. + // This character set should be distributed with Java. 
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The character set " + BYTE_FIELD_CHARSET + " is not available."); + } + } + + public String toString() { + return data; + } +} diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java new file mode 100644 index 00000000..91b594ee --- /dev/null +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.connector.idf; + +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Column; +import org.apache.sqoop.schema.type.Type; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Abstract class representing a pluggable intermediate data format the Sqoop + * framework will use to move data to/from the connector. All intermediate + * data formats are expected to have an internal/native implementation, + * but also should minimally be able to return a text (CSV) version of the + * data. The data format should also be able to return the data as an object + * array - each array representing one row. + *
<p/>
+ * Why a "native" internal format and then return text too? + * Imagine a connector that moves data from a system that stores data as a + * serialization format called FooFormat. If I also need the data to be + * written into HDFS as FooFormat, the additional cycles burnt in converting + * the FooFormat to text and back is useless - so plugging in an intermediate + * format that can store the data as FooFormat saves those cycles! + *
<p/>
+ * Most fast access mechanisms, like mysqldump or pgsqldump write the data + * out as CSV, and most often the destination data is also represented as CSV + * - so having a minimal CSV support is important, so we can easily pull the + * data out as text. + *
<p/>
+ * Any conversion to the final format from the native or text format is to be + * done by the connector or OutputFormat classes. + * + * @param - Each data format may have a native representation of the + * data, represented by the parameter. + */ +public abstract class IntermediateDataFormat { + + protected volatile T data; + + public int hashCode() { + return data.hashCode(); + } + + /** + * Set one row of data. If validate is set to true, the data is validated + * against the schema. + * + * @param data - A single row of data to be moved. + */ + public void setData(T data) { + this.data = data; + } + + /** + * Get one row of data. + * + * @return - One row of data, represented in the internal/native format of + * the intermediate data format implementation. + */ + public T getData() { + return data; + } + + /** + * Get one row of data as CSV. + * + * @return - String representing the data in CSV + */ + public abstract String getTextData(); + + /** + * Set one row of data as CSV. + * + */ + public abstract void setTextData(String text); + + /** + * Get one row of data as an Object array. + * + * @return - String representing the data as an Object array + */ + public abstract Object[] getObjectData(); + + /** + * Set one row of data as an Object array. + * + */ + public abstract void setObjectData(Object[] data); + + /** + * Set the schema to be used. + * + * @param schema - the schema to be used + */ + public abstract void setSchema(Schema schema); + + /** + * Get the schema of the data. + * + * @return - The schema of the data. + */ + public abstract Schema getSchema(); + + /** + * Serialize the fields of this object to out. + * + * @param out DataOuput to serialize this object into. + * @throws IOException + */ + public abstract void write(DataOutput out) throws IOException; + + /** + * Deserialize the fields of this object from in. + * + *
<p>For efficiency, implementations should attempt to re-use storage in the
+ * existing object where possible.</p>
+ * + * @param in DataInput to deseriablize this object from. + * @throws IOException + */ + public abstract void read(DataInput in) throws IOException; +} diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java new file mode 100644 index 00000000..92190744 --- /dev/null +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.connector.idf; + +import org.apache.sqoop.common.ErrorCode; + +public enum IntermediateDataFormatError implements ErrorCode { + /** An unknown error has occurred. */ + INTERMEDIATE_DATA_FORMAT_0000("An unknown error has occurred."), + + /** An encoding is missing in the Java native libraries. */ + INTERMEDIATE_DATA_FORMAT_0001("Native character set error."), + + /** Error while escaping a row. */ + INTERMEDIATE_DATA_FORMAT_0002("An error has occurred while escaping a row."), + + /** Error while escaping a row. */ + INTERMEDIATE_DATA_FORMAT_0003("An error has occurred while unescaping a row."), + + /** Column type isn't known by Intermediate Data Format. */ + INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."), + + /** Number of fields. */ + INTERMEDIATE_DATA_FORMAT_0005("Wrong number of fields.") + + ; + + private final String message; + + private IntermediateDataFormatError(String message) { + this.message = message; + } + + public String getCode() { + return name(); + } + + public String getMessage() { + return message; + } +} diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java new file mode 100644 index 00000000..df6d30f2 --- /dev/null +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.connector.idf; + +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Binary; +import org.apache.sqoop.schema.type.FixedPoint; +import org.apache.sqoop.schema.type.Text; +import org.junit.Before; +import org.junit.Test; + +import java.io.UnsupportedEncodingException; +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class CSVIntermediateDataFormatTest { + + private final String BYTE_FIELD_ENCODING = "ISO-8859-1"; + + private IntermediateDataFormat data; + + @Before + public void setUp() { + data = new CSVIntermediateDataFormat(); + } + + private String getByteFieldString(byte[] byteFieldData) { + try { + return new StringBuilder("'").append(new String(byteFieldData, BYTE_FIELD_ENCODING)).append("'").toString(); + } catch(UnsupportedEncodingException e) { + // Should never get to this point because ISO-8859-1 is a standard codec. + return null; + } + } + + @Test + public void testStringInStringOut() { + String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) + + ",'" + String.valueOf(0x0A) + "'"; + data.setTextData(testData); + assertEquals(testData, data.getTextData()); + } + + @Test + public void testNullStringInObjectOut() { + Schema schema = new Schema("test"); + schema.addColumn(new FixedPoint("1")) + .addColumn(new FixedPoint("2")) + .addColumn(new Text("3")) + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); + data.setSchema(schema); + data.setTextData(null); + + Object[] out = data.getObjectData(); + + assertNull(out); + } + + @Test(expected=SqoopException.class) + public void testEmptyStringInObjectOut() { + Schema schema = new Schema("test"); + schema.addColumn(new FixedPoint("1")) + .addColumn(new FixedPoint("2")) + .addColumn(new Text("3")) + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); + data.setSchema(schema); + data.setTextData(""); + + data.getObjectData(); + } + + @Test + public void testStringInObjectOut() { + + //byte[0] = -112, byte[1] = 54 - 2's complements + String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) + + ",'\\n'"; + Schema schema = new Schema("test"); + schema.addColumn(new FixedPoint("1")) + .addColumn(new FixedPoint("2")) + .addColumn(new Text("3")) + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); + data.setSchema(schema); + data.setTextData(testData); + + Object[] out = data.getObjectData(); + + assertEquals(new Long(10),out[0]); + assertEquals(new Long(34),out[1]); + assertEquals("54",out[2]); + assertEquals("random data",out[3]); + assertEquals(-112, ((byte[])out[4])[0]); + assertEquals(54, ((byte[])out[4])[1]); + assertEquals("\n", out[5].toString()); + } + + @Test + public void testObjectInStringOut() { + Schema schema = new Schema("test"); + schema.addColumn(new FixedPoint("1")) + .addColumn(new 
FixedPoint("2")) + .addColumn(new Text("3")) + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); + data.setSchema(schema); + + byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54}; + Object[] in = new Object[6]; + in[0] = new Long(10); + in[1] = new Long(34); + in[2] = "54"; + in[3] = "random data"; + in[4] = byteFieldData; + in[5] = new String(new char[] { 0x0A }); + + data.setObjectData(in); + + //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements + String testData = "10,34,'54','random data'," + + getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'"; + assertEquals(testData, data.getTextData()); + } + + @Test + public void testObjectInObjectOut() { + //Test escapable sequences too. + //byte[0] = -112, byte[1] = 54 - 2's complements + Schema schema = new Schema("test"); + schema.addColumn(new FixedPoint("1")) + .addColumn(new FixedPoint("2")) + .addColumn(new Text("3")) + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); + data.setSchema(schema); + + Object[] in = new Object[6]; + in[0] = new Long(10); + in[1] = new Long(34); + in[2] = "54"; + in[3] = "random data"; + in[4] = new byte[] { (byte) -112, (byte) 54}; + in[5] = new String(new char[] { 0x0A }); + Object[] inCopy = new Object[6]; + System.arraycopy(in,0,inCopy,0,in.length); + + // Modifies the input array, so we use the copy to confirm + data.setObjectData(in); + + assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); + } + + @Test + public void testStringFullRangeOfCharacters() { + Schema schema = new Schema("test"); + schema.addColumn(new Text("1")); + data.setSchema(schema); + + char[] allCharArr = new char[256]; + for(int i = 0; i < allCharArr.length; ++i) { + allCharArr[i] = (char)i; + } + String strData = new String(allCharArr); + + Object[] in = {strData}; + Object[] inCopy = new Object[1]; + System.arraycopy(in,0,inCopy,0,in.length); + + // Modifies the input array, so we use the copy to confirm + data.setObjectData(in); + + assertEquals(strData, data.getObjectData()[0]); + assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); + } + + @Test + public void testByteArrayFullRangeOfCharacters() { + Schema schema = new Schema("test"); + schema.addColumn(new Binary("1")); + data.setSchema(schema); + + byte[] allCharByteArr = new byte[256]; + for(int i = 0; i < allCharByteArr.length; ++i) { + allCharByteArr[i] = (byte)i; + } + + Object[] in = {allCharByteArr}; + Object[] inCopy = new Object[1]; + System.arraycopy(in,0,inCopy,0,in.length); + + // Modifies the input array, so we use the copy to confirm + data.setObjectData(in); + assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); + } +} diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java index e0525846..1700432c 100644 --- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java @@ -22,6 +22,7 @@ import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.ConnectorManager; import org.apache.sqoop.request.HttpEventContext; +import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.core.Reconfigurable; import org.apache.sqoop.core.SqoopConfiguration; @@ -327,6 +328,10 @@ public MSubmission submit(long jobId, HttpEventContext ctx) { request.setJobName(job.getName()); 
request.setJobId(job.getPersistenceId()); request.setNotificationUrl(notificationBaseUrl + jobId); + Class> dataFormatClass = + connector.getIntermediateDataFormat(); + request.setIntermediateDataFormat(connector.getIntermediateDataFormat()); + // Create request object // Let's register all important jars // sqoop-common @@ -343,6 +348,9 @@ public MSubmission submit(long jobId, HttpEventContext ctx) { // Extra libraries that Sqoop code requires request.addJarForClass(JSONValue.class); + // The IDF is used in the ETL process. + request.addJarForClass(dataFormatClass); + // Get connector callbacks switch (job.getType()) { case IMPORT: diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java index a138db54..7900eee0 100644 --- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java +++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java @@ -18,6 +18,7 @@ package org.apache.sqoop.framework; import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.job.etl.CallbackBase; import org.apache.sqoop.model.MJob; @@ -107,6 +108,11 @@ public class SubmissionRequest { */ Integer loaders; + /** + * The intermediate data format this submission should use. + */ + Class intermediateDataFormat; + public SubmissionRequest() { this.jars = new LinkedList(); this.connectorContext = new MutableMapContext(); @@ -252,4 +258,13 @@ public Integer getLoaders() { public void setLoaders(Integer loaders) { this.loaders = loaders; } + + public Class getIntermediateDataFormat() { + return intermediateDataFormat; + } + + public void setIntermediateDataFormat(Class intermediateDataFormat) { + this.intermediateDataFormat = intermediateDataFormat; + } + } diff --git a/execution/mapreduce/pom.xml b/execution/mapreduce/pom.xml index f9a2a0e5..b23b9055 100644 --- a/execution/mapreduce/pom.xml +++ b/execution/mapreduce/pom.xml @@ -52,6 +52,10 @@ limitations under the License. 
mockito-all test + + com.google.guava + guava + diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java index 5c0a0273..84f62135 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java @@ -34,7 +34,7 @@ import org.apache.sqoop.job.etl.HdfsSequenceImportLoader; import org.apache.sqoop.job.etl.HdfsTextImportLoader; import org.apache.sqoop.job.etl.Importer; -import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.io.SqoopWritable; import org.apache.sqoop.job.mr.SqoopFileOutputFormat; import org.apache.sqoop.job.mr.SqoopInputFormat; import org.apache.sqoop.job.mr.SqoopMapper; @@ -53,14 +53,7 @@ public SubmissionRequest createSubmissionRequest() { return new MRSubmissionRequest(); } - /** - * {@inheritDoc} - */ - @Override - public void prepareImportSubmission(SubmissionRequest gRequest) { - MRSubmissionRequest request = (MRSubmissionRequest) gRequest; - ImportJobConfiguration jobConf = (ImportJobConfiguration) request.getConfigFrameworkJob(); - + public void prepareSubmission(MRSubmissionRequest request) { // Add jar dependencies addDependencies(request); @@ -68,13 +61,35 @@ public void prepareImportSubmission(SubmissionRequest gRequest) { request.setInputFormatClass(SqoopInputFormat.class); request.setMapperClass(SqoopMapper.class); - request.setMapOutputKeyClass(Data.class); + request.setMapOutputKeyClass(SqoopWritable.class); request.setMapOutputValueClass(NullWritable.class); - request.setOutputFormatClass(SqoopFileOutputFormat.class); - request.setOutputKeyClass(Data.class); + request.setOutputFormatClass(SqoopNullOutputFormat.class); + request.setOutputKeyClass(SqoopWritable.class); request.setOutputValueClass(NullWritable.class); + // Set up framework context + MutableMapContext context = request.getFrameworkContext(); + context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT, + request.getIntermediateDataFormat().getName()); + + if(request.getExtractors() != null) { + context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors()); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void prepareImportSubmission(SubmissionRequest gRequest) { + MRSubmissionRequest request = (MRSubmissionRequest) gRequest; + + prepareSubmission(request); + request.setOutputFormatClass(SqoopFileOutputFormat.class); + + ImportJobConfiguration jobConf = (ImportJobConfiguration) request.getConfigFrameworkJob(); + Importer importer = (Importer)request.getConnectorCallbacks(); // Set up framework context @@ -83,10 +98,6 @@ public void prepareImportSubmission(SubmissionRequest gRequest) { context.setString(JobConstants.JOB_ETL_EXTRACTOR, importer.getExtractor().getName()); context.setString(JobConstants.JOB_ETL_DESTROYER, importer.getDestroyer().getName()); - if(request.getExtractors() != null) { - context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors()); - } - // TODO: This settings should be abstracted to core module at some point if(jobConf.output.outputFormat == OutputFormat.TEXT_FILE) { context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName()); @@ -137,19 +148,7 @@ public void prepareExportSubmission(SubmissionRequest gRequest) { MRSubmissionRequest request = (MRSubmissionRequest) gRequest; 
ExportJobConfiguration jobConf = (ExportJobConfiguration) request.getConfigFrameworkJob(); - // Add jar dependencies - addDependencies(request); - - // Configure map-reduce classes for import - request.setInputFormatClass(SqoopInputFormat.class); - - request.setMapperClass(SqoopMapper.class); - request.setMapOutputKeyClass(Data.class); - request.setMapOutputValueClass(NullWritable.class); - - request.setOutputFormatClass(SqoopNullOutputFormat.class); - request.setOutputKeyClass(Data.class); - request.setOutputValueClass(NullWritable.class); + prepareSubmission(request); Exporter exporter = (Exporter)request.getConnectorCallbacks(); @@ -162,10 +161,6 @@ public void prepareExportSubmission(SubmissionRequest gRequest) { // Extractor that will be able to read all supported file types context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsExportExtractor.class.getName()); context.setString(JobConstants.HADOOP_INPUTDIR, jobConf.input.inputDirectory); - - if(request.getExtractors() != null) { - context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors()); - } } /** diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java index 7fd9a017..b2fa15d5 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java @@ -18,6 +18,7 @@ package org.apache.sqoop.job; import org.apache.sqoop.core.ConfigurationConstants; +import org.apache.sqoop.framework.FrameworkConstants; public final class JobConstants extends Constants { /** @@ -66,6 +67,9 @@ public final class JobConstants extends Constants { public static final String HADOOP_COMPRESS_CODEC = "mapred.output.compression.codec"; + public static final String INTERMEDIATE_DATA_FORMAT = + FrameworkConstants.PREFIX_EXECUTION_CONFIG + "intermediate.format"; + private JobConstants() { // Disable explicit object creation } diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java index 1978ec61..43e6463a 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java @@ -36,7 +36,6 @@ import org.apache.sqoop.framework.configuration.ExportJobConfiguration; import org.apache.sqoop.job.MapreduceExecutionError; import org.apache.sqoop.job.PrefixContext; -import org.apache.sqoop.job.io.Data; /** * Extract from HDFS. 
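[Editorial sketch, not part of the patch] The extractor changed below pushes rows through the renamed DataWriter contract: writeStringRecord replaces writeCsvRecord, writeRecord replaces writeContent, and setFieldDelimiter is gone because delimiters now belong to the intermediate data format. A minimal, hypothetical implementation of that contract, similar in spirit to the DummyWriter used in the tests (class and field names here are made up for illustration):

import java.util.ArrayList;
import java.util.List;
import org.apache.sqoop.etl.io.DataWriter;

class CollectingDataWriter extends DataWriter {
  private final List<Object[]> arrayRows = new ArrayList<Object[]>();
  private final List<String> textRows = new ArrayList<String>();
  private final List<Object> nativeRows = new ArrayList<Object>();

  @Override
  public void writeArrayRecord(Object[] array) {
    arrayRows.add(array);    // one row, one object per column
  }

  @Override
  public void writeStringRecord(String text) {
    textRows.add(text);      // one row as CSV text
  }

  @Override
  public void writeRecord(Object obj) {
    nativeRows.add(obj);     // one row in the IDF's native representation
  }
}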
@@ -50,12 +49,6 @@ public class HdfsExportExtractor extends Extractor= end && filereader.syncSeen()) { @@ -173,7 +165,7 @@ private void extractTextFile(Path file, long start, long length) next = fileseeker.getPos(); } rowRead++; - dataWriter.writeCsvRecord(line.toString()); + dataWriter.writeStringRecord(line.toString()); } LOG.info("Extracting ended on position: " + fileseeker.getPos()); filestream.close(); diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java index a07c5111..d4ffb130 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java @@ -30,7 +30,6 @@ import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.MapreduceExecutionError; -import org.apache.sqoop.job.io.Data; import org.apache.sqoop.etl.io.DataReader; import org.apache.sqoop.utils.ClassUtils; @@ -38,16 +37,9 @@ public class HdfsSequenceImportLoader extends Loader { public static final String EXTENSION = ".seq"; - private final char fieldDelimiter; - - public HdfsSequenceImportLoader() { - fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER; - } - @Override public void load(LoaderContext context, Object oc, Object oj) throws Exception { DataReader reader = context.getDataReader(); - reader.setFieldDelimiter(fieldDelimiter); Configuration conf = new Configuration(); // Configuration conf = ((EtlContext)context).getConfiguration(); @@ -87,7 +79,7 @@ public void load(LoaderContext context, Object oc, Object oj) throws Exception { String csv; Text text = new Text(); - while ((csv = reader.readCsvRecord()) != null) { + while ((csv = reader.readTextRecord()) != null) { text.set(csv); filewriter.append(text, NullWritable.get()); } diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java index 46219423..7b799ca6 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.OutputStreamWriter; +import com.google.common.base.Charsets; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -36,18 +37,15 @@ public class HdfsTextImportLoader extends Loader { - private final char fieldDelimiter; private final char recordDelimiter; public HdfsTextImportLoader() { - fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER; recordDelimiter = Data.DEFAULT_RECORD_DELIMITER; } @Override public void load(LoaderContext context, Object oc, Object oj) throws Exception{ DataReader reader = context.getDataReader(); - reader.setFieldDelimiter(fieldDelimiter); Configuration conf = new Configuration(); // Configuration conf = ((EtlContext)context).getConfiguration(); @@ -81,15 +79,15 @@ public void load(LoaderContext context, Object oc, Object oj) throws Exception{ DataOutputStream filestream = fs.create(filepath, false); if (codec != null) { filewriter = new BufferedWriter(new OutputStreamWriter( - codec.createOutputStream(filestream, codec.createCompressor()), - Data.CHARSET_NAME)); + codec.createOutputStream(filestream, codec.createCompressor()), 
+ Charsets.UTF_8)); } else { filewriter = new BufferedWriter(new OutputStreamWriter( - filestream, Data.CHARSET_NAME)); + filestream, Charsets.UTF_8)); } String csv; - while ((csv = reader.readCsvRecord()) != null) { + while ((csv = reader.readTextRecord()) != null) { filewriter.write(csv + recordDelimiter); } filewriter.close(); diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java new file mode 100644 index 00000000..ed118d2c --- /dev/null +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.job.io; + +import org.apache.hadoop.io.WritableComparable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class SqoopWritable implements WritableComparable { + private String strData; + + public SqoopWritable() {} + + public void setString(String data) { + strData = data; + } + + public String getString() { + return strData; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(strData); + } + + @Override + public void readFields(DataInput in) throws IOException { + strData = in.readUTF(); + } + + @Override + public int compareTo(SqoopWritable o) { + return strData.compareTo(o.getString()); + } + + @Override + public String toString() { + return getString(); + } +} diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java index 356ae8ab..bbf73428 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java @@ -34,13 +34,13 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.log4j.Logger; import org.apache.sqoop.job.JobConstants; -import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.io.SqoopWritable; /** * An output format for MapReduce job. 
*/ public class SqoopFileOutputFormat - extends FileOutputFormat { + extends FileOutputFormat { public static final Logger LOG = Logger.getLogger(SqoopFileOutputFormat.class); @@ -49,7 +49,7 @@ public class SqoopFileOutputFormat DefaultCodec.class; @Override - public RecordWriter getRecordWriter( + public RecordWriter getRecordWriter( TaskAttemptContext context) throws IOException { Configuration conf = context.getConfiguration(); @@ -69,6 +69,7 @@ public RecordWriter getRecordWriter( return executor.getRecordWriter(); } + @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException { Path output = getOutputPath(context); return new DestroyerFileOutputCommitter(output, context); diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index 92de37e1..645dbc62 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -27,21 +27,22 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.MapreduceExecutionError; import org.apache.sqoop.job.PrefixContext; import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.etl.ExtractorContext; -import org.apache.sqoop.job.io.Data; import org.apache.sqoop.etl.io.DataWriter; import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.job.io.SqoopWritable; import org.apache.sqoop.submission.counter.SqoopCounters; import org.apache.sqoop.utils.ClassUtils; /** * A mapper to perform map function. 
*/ -public class SqoopMapper extends Mapper { +public class SqoopMapper extends Mapper { static { ConfigurationUtils.configureLogging(); @@ -52,6 +53,8 @@ public class SqoopMapper extends Mapper { +public class SqoopNullOutputFormat extends OutputFormat { public static final Logger LOG = Logger.getLogger(SqoopNullOutputFormat.class); @@ -46,7 +46,7 @@ public void checkOutputSpecs(JobContext context) { } @Override - public RecordWriter getRecordWriter( + public RecordWriter getRecordWriter( TaskAttemptContext context) { SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(context); diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index 7dedee9f..6efadf68 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -31,14 +31,16 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; +import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.MapreduceExecutionError; import org.apache.sqoop.job.PrefixContext; import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.LoaderContext; -import org.apache.sqoop.job.io.Data; import org.apache.sqoop.etl.io.DataReader; import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.job.io.SqoopWritable; import org.apache.sqoop.utils.ClassUtils; public class SqoopOutputFormatLoadExecutor { @@ -48,7 +50,7 @@ public class SqoopOutputFormatLoadExecutor { private volatile boolean readerFinished = false; private volatile boolean writerFinished = false; - private volatile Data data; + private volatile IntermediateDataFormat data; private JobContext context; private SqoopRecordWriter producer; private Future consumerFuture; @@ -60,17 +62,19 @@ public class SqoopOutputFormatLoadExecutor { SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName){ this.isTest = isTest; this.loaderName = loaderName; - data = new Data(); + data = new CSVIntermediateDataFormat(); producer = new SqoopRecordWriter(); } public SqoopOutputFormatLoadExecutor(JobContext jobctx) { - data = new Data(); context = jobctx; producer = new SqoopRecordWriter(); + data = (IntermediateDataFormat) ClassUtils.instantiate(context + .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT)); + data.setSchema(ConfigurationUtils.getConnectorSchema(context.getConfiguration())); } - public RecordWriter getRecordWriter() { + public RecordWriter getRecordWriter() { consumerFuture = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat ("OutputFormatLoader-consumer").build()).submit( new ConsumerThread()); @@ -81,14 +85,13 @@ public RecordWriter getRecordWriter() { * This is a producer-consumer problem and can be solved * with two semaphores. 
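[Editorial sketch, not part of the patch] The comment above refers to the classic single-slot producer-consumer handshake built from two semaphores, which SqoopRecordWriter (producer) and OutputFormatDataReader (consumer) implement with the free and filled semaphores. A stripped-down, self-contained version of that pattern, with simplified names and a String slot standing in for the intermediate data format:

import java.util.concurrent.Semaphore;

class SingleSlotExchange {
  private final Semaphore free = new Semaphore(1);   // permit available: slot may be written
  private final Semaphore filled = new Semaphore(0); // permit available: slot may be read
  private volatile String slot;

  void produce(String value) throws InterruptedException {
    free.acquire();     // wait until the consumer has drained the previous value
    slot = value;
    filled.release();   // hand the slot over to the consumer
  }

  String consume() throws InterruptedException {
    filled.acquire();   // wait until the producer has filled the slot
    String value = slot;
    free.release();     // give the slot back to the producer
    return value;
  }
}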
*/ - private class SqoopRecordWriter extends RecordWriter { + private class SqoopRecordWriter extends RecordWriter { @Override - public void write(Data key, NullWritable value) throws InterruptedException { + public void write(SqoopWritable key, NullWritable value) throws InterruptedException { free.acquire(); checkIfConsumerThrew(); - int type = key.getType(); - data.setContent(key.getContent(type), type); + data.setTextData(key.getString()); filled.release(); } @@ -135,23 +138,53 @@ private void waitForConsumer() { } private class OutputFormatDataReader extends DataReader { - @Override - public void setFieldDelimiter(char fieldDelimiter) { - data.setFieldDelimiter(fieldDelimiter); - } @Override public Object[] readArrayRecord() throws InterruptedException { - return (Object[])readContent(Data.ARRAY_RECORD); + acquireSema(); + // If the writer has finished, there is definitely no data remaining + if (writerFinished) { + return null; + } + try { + return data.getObjectData(); + } finally { + releaseSema(); + } } @Override - public String readCsvRecord() throws InterruptedException { - return (String)readContent(Data.CSV_RECORD); + public String readTextRecord() throws InterruptedException { + acquireSema(); + // If the writer has finished, there is definitely no data remaining + if (writerFinished) { + return null; + } + try { + return data.getTextData(); + } finally { + releaseSema(); + } } @Override - public Object readContent(int type) throws InterruptedException { + public Object readContent() throws InterruptedException { + acquireSema(); + if (writerFinished) { + return null; + } + try { + return data.getData(); + } catch (Throwable t) { + readerFinished = true; + LOG.error("Caught exception e while getting content ", t); + throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t); + } finally { + releaseSema(); + } + } + + private void acquireSema() throws InterruptedException { // Has any more data been produced after I last consumed. // If no, wait for the producer to produce. try { @@ -159,23 +192,13 @@ public Object readContent(int type) throws InterruptedException { } catch (InterruptedException ex) { //Really at this point, there is nothing to do. Just throw and get out LOG.error("Interrupted while waiting for data to be available from " + - "mapper", ex); + "mapper", ex); throw ex; } - // If the writer has finished, there is definitely no data remaining - if (writerFinished) { - return null; - } - try { - Object content = data.getContent(type); - return content; - } catch (Throwable t) { - readerFinished = true; - LOG.error("Caught exception e while getting content ", t); - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t); - } finally { - free.release(); - } + } + + private void releaseSema(){ + free.release(); } } diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java index 98a2c518..a55534a3 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java @@ -20,7 +20,7 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; import org.apache.log4j.Logger; -import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.io.SqoopWritable; import java.io.IOException; import java.util.concurrent.Executors; @@ -30,7 +30,7 @@ /** * A reducer to perform reduce function. 
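The rewritten SqoopOutputFormatLoadExecutor read path above boils down to a single-slot exchange guarded by two semaphores: the record writer may only publish when the slot is free, and the loader's DataReader may only consume when the slot is filled, so exactly one record is in flight and the loader naturally applies back-pressure on the writer. A stripped-down model of that handshake (a sketch of the pattern with invented names, not the executor class itself):

import java.util.concurrent.Semaphore;

public class SingleSlotExchange {
  private final Semaphore free = new Semaphore(1);   // slot may be written
  private final Semaphore filled = new Semaphore(0); // slot may be read
  private volatile String slot;                      // here: the IDF's text form
  private volatile boolean writerFinished = false;

  // Producer side: SqoopRecordWriter.write() follows this shape.
  public void put(String text) throws InterruptedException {
    free.acquire();     // wait until the consumer has drained the slot
    slot = text;
    filled.release();   // hand the record to the consumer
  }

  // Consumer side: OutputFormatDataReader.read*Record() follows this shape.
  public String take() throws InterruptedException {
    filled.acquire();   // wait until the producer has published a record
    if (writerFinished) {
      return null;      // no data left; do not block the loader forever
    }
    try {
      return slot;
    } finally {
      free.release();   // let the producer overwrite the slot
    }
  }

  // Called once the writer is closed so a blocked consumer can drain out.
  public void finish() {
    writerFinished = true;
    filled.release();
  }
}

Splitting the consumer's acquire and release into acquireSema()/releaseSema(), as the patch does, lets each read method deal with writerFinished and error cases in between without duplicating the semaphore bookkeeping.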
*/ -public class SqoopReducer extends Reducer { +public class SqoopReducer extends Reducer { static { ConfigurationUtils.configureLogging(); diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java index 39d1b53a..a8493944 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java @@ -18,6 +18,7 @@ package org.apache.sqoop.execution.mapreduce; import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; import org.apache.sqoop.framework.SubmissionRequest; import org.apache.sqoop.framework.configuration.ImportJobConfiguration; import org.apache.sqoop.framework.configuration.OutputCompression; @@ -71,6 +72,7 @@ private void testImportCompressionInner(OutputCompression comprssionFormat, request.setConnectorCallbacks(new Importer(Initializer.class, Partitioner.class, Extractor.class, Destroyer.class) { }); + request.setIntermediateDataFormat(CSVIntermediateDataFormat.class); executionEngine.prepareImportSubmission(request); MutableMapContext context = request.getFrameworkContext(); @@ -97,6 +99,7 @@ public void testCustomCompression() { request.setConnectorCallbacks(new Importer(Initializer.class, Partitioner.class, Extractor.class, Destroyer.class) { }); + request.setIntermediateDataFormat(CSVIntermediateDataFormat.class); executionEngine.prepareImportSubmission(request); MutableMapContext context = request.getFrameworkContext(); diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java index e21f15b8..09e5ec58 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java @@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.io.SqoopWritable; import org.apache.sqoop.job.mr.SqoopFileOutputFormat; import org.apache.sqoop.job.mr.SqoopInputFormat; import org.apache.sqoop.job.mr.SqoopMapper; @@ -44,17 +44,17 @@ public static void runJob(Configuration conf) } public static void runJob(Configuration conf, - Class> input, - Class> mapper, - Class> output) - throws IOException, InterruptedException, ClassNotFoundException { + Class> input, + Class> mapper, + Class> output) + throws IOException, InterruptedException, ClassNotFoundException { Job job = new Job(conf); job.setInputFormatClass(input); job.setMapperClass(mapper); - job.setMapOutputKeyClass(Data.class); + job.setMapOutputKeyClass(SqoopWritable.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputFormatClass(output); - job.setOutputKeyClass(Data.class); + job.setOutputKeyClass(SqoopWritable.class); job.setOutputValueClass(NullWritable.class); boolean success = job.waitForCompletion(true); diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java index b7079dd4..8061c783 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java +++ 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java @@ -30,10 +30,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.mapreduce.Job; +import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; import org.apache.sqoop.job.etl.HdfsExportExtractor; import org.apache.sqoop.job.etl.HdfsExportPartitioner; import org.apache.sqoop.job.etl.HdfsSequenceImportLoader; @@ -45,6 +47,9 @@ import org.apache.sqoop.job.mr.ConfigurationUtils; import org.apache.sqoop.job.mr.SqoopFileOutputFormat; import org.apache.sqoop.model.MJob; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.FixedPoint; +import org.apache.sqoop.schema.type.FloatingPoint; import org.junit.Test; public class TestHdfsExtract extends TestCase { @@ -53,12 +58,22 @@ public class TestHdfsExtract extends TestCase { private static final int NUMBER_OF_FILES = 5; private static final int NUMBER_OF_ROWS_PER_FILE = 1000; - private String indir; + private final String indir; public TestHdfsExtract() { indir = INPUT_ROOT + getClass().getSimpleName(); } + @Override + public void setUp() throws IOException { + FileUtils.mkdirs(indir); + } + + @Override + public void tearDown() throws IOException { + FileUtils.delete(indir); + } + /** * Test case for validating the number of partitions creation * based on input. @@ -68,12 +83,12 @@ public TestHdfsExtract() { */ @Test public void testHdfsExportPartitioner() throws Exception { - FileUtils.delete(indir); - FileUtils.mkdirs(indir); createTextInput(null); Configuration conf = new Configuration(); conf.set(JobConstants.HADOOP_INPUTDIR, indir); + conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, + CSVIntermediateDataFormat.class.getName()); HdfsExportPartitioner partitioner = new HdfsExportPartitioner(); PrefixContext prefixContext = new PrefixContext(conf, ""); int[] partitionValues = {2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 17}; @@ -87,87 +102,67 @@ public void testHdfsExportPartitioner() throws Exception { @Test public void testUncompressedText() throws Exception { - FileUtils.delete(indir); - FileUtils.mkdirs(indir); createTextInput(null); - Configuration conf = new Configuration(); - ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT); - conf.set(JobConstants.JOB_ETL_PARTITIONER, - HdfsExportPartitioner.class.getName()); - conf.set(JobConstants.JOB_ETL_EXTRACTOR, - HdfsExportExtractor.class.getName()); - conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); - conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4"); - conf.set(JobConstants.HADOOP_INPUTDIR, indir); - JobUtils.runJob(conf); + JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration()); } @Test - public void testCompressedText() throws Exception { - FileUtils.delete(indir); - FileUtils.mkdirs(indir); + public void testDefaultCompressedText() throws Exception { createTextInput(SqoopFileOutputFormat.DEFAULT_CODEC); - Configuration conf = new Configuration(); - ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT); - conf.set(JobConstants.JOB_ETL_PARTITIONER, - HdfsExportPartitioner.class.getName()); - conf.set(JobConstants.JOB_ETL_EXTRACTOR, - HdfsExportExtractor.class.getName()); - conf.set(JobConstants.JOB_ETL_LOADER, 
DummyLoader.class.getName()); - conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4"); - conf.set(JobConstants.HADOOP_INPUTDIR, indir); - JobUtils.runJob(conf); - - FileUtils.delete(indir); - FileUtils.mkdirs(indir); - createTextInput(BZip2Codec.class); - - conf.set(JobConstants.JOB_ETL_PARTITIONER, - HdfsExportPartitioner.class.getName()); - conf.set(JobConstants.JOB_ETL_EXTRACTOR, - HdfsExportExtractor.class.getName()); - conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); - conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4"); - conf.set(JobConstants.HADOOP_INPUTDIR, indir); - JobUtils.runJob(conf); + JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration()); } @Test - public void testCompressedSequence() throws Exception { - FileUtils.delete(indir); - FileUtils.mkdirs(indir); + public void testBZip2CompressedText() throws Exception { + createTextInput(BZip2Codec.class); + + JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration()); + } + + @Test + public void testDefaultCompressedSequence() throws Exception { createSequenceInput(SqoopFileOutputFormat.DEFAULT_CODEC); - Configuration conf = new Configuration(); - ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT); - conf.set(JobConstants.JOB_ETL_PARTITIONER, - HdfsExportPartitioner.class.getName()); - conf.set(JobConstants.JOB_ETL_EXTRACTOR, - HdfsExportExtractor.class.getName()); - conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); - conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4"); - conf.set(JobConstants.HADOOP_INPUTDIR, indir); - JobUtils.runJob(conf); + JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration()); } @Test public void testUncompressedSequence() throws Exception { - FileUtils.delete(indir); - FileUtils.mkdirs(indir); createSequenceInput(null); + JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration()); + } + + private Schema createSchema() { + Schema schema = new Schema("Test"); + schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) + .addColumn(new org.apache.sqoop.schema.type.Text("3")); + return schema; + } + + private Configuration createConf() { Configuration conf = new Configuration(); ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT); - conf.set(JobConstants.JOB_ETL_PARTITIONER, + conf.setIfUnset(JobConstants.JOB_ETL_PARTITIONER, HdfsExportPartitioner.class.getName()); - conf.set(JobConstants.JOB_ETL_EXTRACTOR, + conf.setIfUnset(JobConstants.JOB_ETL_EXTRACTOR, HdfsExportExtractor.class.getName()); - conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); - conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4"); - conf.set(JobConstants.HADOOP_INPUTDIR, indir); - JobUtils.runJob(conf); + conf.setIfUnset(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); + conf.setIfUnset(Constants.JOB_ETL_NUMBER_PARTITIONS, "4"); + conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT, + CSVIntermediateDataFormat.class.getName()); + conf.setIfUnset(JobConstants.HADOOP_INPUTDIR, indir); + return conf; + } + + private Job createJob(Configuration conf, Schema schema) throws Exception { + Job job = new Job(conf); + ConfigurationUtils.setConnectorSchema(job, schema); + job.getConfiguration().set(JobConstants.INTERMEDIATE_DATA_FORMAT, + CSVIntermediateDataFormat.class.getName()); + return job; } private void createTextInput(Class clz) @@ -227,11 +222,11 @@ private void createSequenceInput(Class clz) SequenceFile.Writer filewriter; if (codec != null) { filewriter = 
SequenceFile.createWriter(filepath.getFileSystem(conf), - conf, filepath, Text.class, NullWritable.class, - CompressionType.BLOCK, codec); + conf, filepath, Text.class, NullWritable.class, + CompressionType.BLOCK, codec); } else { filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf), - conf, filepath, Text.class, NullWritable.class, CompressionType.NONE); + conf, filepath, Text.class, NullWritable.class, CompressionType.NONE); } Text text = new Text(); diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java index f849aaef..721bba6f 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java @@ -26,6 +26,7 @@ import java.util.LinkedList; import java.util.List; +import com.google.common.base.Charsets; import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; @@ -33,7 +34,9 @@ import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.etl.ExtractorContext; import org.apache.sqoop.job.etl.HdfsSequenceImportLoader; @@ -45,6 +48,9 @@ import org.apache.sqoop.job.mr.ConfigurationUtils; import org.apache.sqoop.job.mr.SqoopFileOutputFormat; import org.apache.sqoop.model.MJob; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.FixedPoint; +import org.apache.sqoop.schema.type.FloatingPoint; public class TestHdfsLoad extends TestCase { @@ -68,13 +74,21 @@ public void testUncompressedText() throws Exception { conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName()); + conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, + CSVIntermediateDataFormat.class.getName()); conf.set(JobConstants.HADOOP_OUTDIR, outdir); - JobUtils.runJob(conf); + Schema schema = new Schema("Test"); + schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) + .addColumn(new org.apache.sqoop.schema.type.Text("3")); + + Job job = new Job(conf); + ConfigurationUtils.setConnectorSchema(job, schema); + JobUtils.runJob(job.getConfiguration()); String fileName = outdir + "/" + OUTPUT_FILE; InputStream filestream = FileUtils.open(fileName); BufferedReader filereader = new BufferedReader(new InputStreamReader( - filestream, Data.CHARSET_NAME)); + filestream, Charsets.UTF_8)); verifyOutputText(filereader); } @@ -86,9 +100,18 @@ public void testCompressedText() throws Exception { conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName()); + conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, + CSVIntermediateDataFormat.class.getName()); conf.set(JobConstants.HADOOP_OUTDIR, outdir); conf.setBoolean(JobConstants.HADOOP_COMPRESS, true); - JobUtils.runJob(conf); + + Schema schema = new Schema("Test"); + schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) + .addColumn(new org.apache.sqoop.schema.type.Text("3")); + + Job job = new 
Job(conf); + ConfigurationUtils.setConnectorSchema(job, schema); + JobUtils.runJob(job.getConfiguration()); Class codecClass = conf.getClass( JobConstants.HADOOP_COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC) @@ -97,7 +120,7 @@ public void testCompressedText() throws Exception { String fileName = outdir + "/" + OUTPUT_FILE + codec.getDefaultExtension(); InputStream filestream = codec.createInputStream(FileUtils.open(fileName)); BufferedReader filereader = new BufferedReader(new InputStreamReader( - filestream, Data.CHARSET_NAME)); + filestream, Charsets.UTF_8)); verifyOutputText(filereader); } @@ -108,7 +131,7 @@ private void verifyOutputText(BufferedReader reader) throws IOException { int index = START_ID*NUMBER_OF_ROWS_PER_ID; while ((actual = reader.readLine()) != null){ data.setContent(new Object[] { - index, (double) index, String.valueOf(index) }, + index, (double) index, new String(new byte[] {(byte)(index + 127)}, Charsets.ISO_8859_1) }, Data.ARRAY_RECORD); expected = data.toString(); index++; @@ -129,8 +152,17 @@ public void testUncompressedSequence() throws Exception { conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName()); + conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, + CSVIntermediateDataFormat.class.getName()); conf.set(JobConstants.HADOOP_OUTDIR, outdir); - JobUtils.runJob(conf); + + Schema schema = new Schema("Test"); + schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) + .addColumn(new org.apache.sqoop.schema.type.Text("3")); + + Job job = new Job(conf); + ConfigurationUtils.setConnectorSchema(job, schema); + JobUtils.runJob(job.getConfiguration()); Path filepath = new Path(outdir, OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION); @@ -147,10 +179,18 @@ public void testCompressedSequence() throws Exception { conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName()); + conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, + CSVIntermediateDataFormat.class.getName()); conf.set(JobConstants.HADOOP_OUTDIR, outdir); conf.setBoolean(JobConstants.HADOOP_COMPRESS, true); - JobUtils.runJob(conf); + Schema schema = new Schema("Test"); + schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) + .addColumn(new org.apache.sqoop.schema.type.Text("3")); + + Job job = new Job(conf); + ConfigurationUtils.setConnectorSchema(job, schema); + JobUtils.runJob(job.getConfiguration()); Path filepath = new Path(outdir, OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION); SequenceFile.Reader filereader = new SequenceFile.Reader(filepath.getFileSystem(conf), filepath, conf); @@ -164,7 +204,7 @@ private void verifyOutputSequence(SequenceFile.Reader reader) throws IOException Data data = new Data(); while (reader.next(actual)){ data.setContent(new Object[] { - index, (double) index, String.valueOf(index) }, + index, (double) index, new String(new byte[] {(byte)(index + 127)}, Charsets.ISO_8859_1) }, Data.ARRAY_RECORD); expected.set(data.toString()); index++; @@ -225,7 +265,7 @@ public void extract(ExtractorContext context, Object oc, Object oj, Object parti Object[] array = new Object[] { id * NUMBER_OF_ROWS_PER_ID + row, (double) (id * NUMBER_OF_ROWS_PER_ID + row), - String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row) + new 
String(new byte[]{(byte)(id * NUMBER_OF_ROWS_PER_ID + row + 127)}, Charsets.ISO_8859_1) }; context.getDataWriter().writeArrayRecord(array); } diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java index 7b264c68..ba16b3c8 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java @@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.etl.ExtractorContext; import org.apache.sqoop.job.etl.Loader; @@ -42,12 +43,17 @@ import org.apache.sqoop.job.etl.Partitioner; import org.apache.sqoop.job.etl.PartitionerContext; import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.io.SqoopWritable; import org.apache.sqoop.job.mr.ConfigurationUtils; import org.apache.sqoop.job.mr.SqoopInputFormat; import org.apache.sqoop.job.mr.SqoopMapper; import org.apache.sqoop.job.mr.SqoopNullOutputFormat; import org.apache.sqoop.job.mr.SqoopSplit; import org.apache.sqoop.model.MJob; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.FixedPoint; +import org.apache.sqoop.schema.type.FloatingPoint; +import org.apache.sqoop.schema.type.Text; public class TestMapReduce extends TestCase { @@ -59,6 +65,8 @@ public void testInputFormat() throws Exception { Configuration conf = new Configuration(); ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT); conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); + conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, + CSVIntermediateDataFormat.class.getName()); Job job = new Job(conf); SqoopInputFormat inputformat = new SqoopInputFormat(); @@ -77,8 +85,15 @@ public void testMapper() throws Exception { ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT); conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); + conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, + CSVIntermediateDataFormat.class.getName()); + Schema schema = new Schema("Test"); + schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) + .addColumn(new org.apache.sqoop.schema.type.Text("3")); - JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class, + Job job = new Job(conf); + ConfigurationUtils.setConnectorSchema(job, schema); + JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class); } @@ -88,8 +103,15 @@ public void testOutputFormat() throws Exception { conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); + conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, + CSVIntermediateDataFormat.class.getName()); + Schema schema = new Schema("Test"); + schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) + .addColumn(new Text("3")); - JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class, + Job job = new Job(conf); + ConfigurationUtils.setConnectorSchema(job, schema); + JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, 
SqoopNullOutputFormat.class); } @@ -152,14 +174,14 @@ public long getRowsRead() { } public static class DummyOutputFormat - extends OutputFormat { + extends OutputFormat { @Override public void checkOutputSpecs(JobContext context) { // do nothing } @Override - public RecordWriter getRecordWriter( + public RecordWriter getRecordWriter( TaskAttemptContext context) { return new DummyRecordWriter(); } @@ -170,12 +192,13 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) { } public static class DummyRecordWriter - extends RecordWriter { + extends RecordWriter { private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; private Data data = new Data(); @Override - public void write(Data key, NullWritable value) { + public void write(SqoopWritable key, NullWritable value) { + data.setContent(new Object[] { index, (double) index, @@ -215,22 +238,22 @@ public boolean needsTaskCommit(TaskAttemptContext taskContext) { public static class DummyLoader extends Loader { private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; private Data expected = new Data(); - private Data actual = new Data(); + private CSVIntermediateDataFormat actual = new CSVIntermediateDataFormat(); @Override public void load(LoaderContext context, Object oc, Object oj) throws Exception{ - Object[] array; - while ((array = context.getDataReader().readArrayRecord()) != null) { - actual.setContent(array, Data.ARRAY_RECORD); + String data; + while ((data = context.getDataReader().readTextRecord()) != null) { +// actual.setSchema(context.getSchema()); +// actual.setObjectData(array, false); expected.setContent(new Object[] { index, (double) index, String.valueOf(index)}, Data.ARRAY_RECORD); index++; - - assertEquals(expected.toString(), actual.toString()); + assertEquals(expected.toString(), data); } } } diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java new file mode 100644 index 00000000..b78b140f --- /dev/null +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.sqoop.job.io; + +import com.google.common.base.Charsets; +import junit.framework.Assert; +import junit.framework.TestCase; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; +import org.apache.sqoop.job.JobConstants; + +public class SqoopWritableTest extends TestCase { + + private final SqoopWritable writable = new SqoopWritable(); + + public void testStringInStringOut() { + String testData = "Live Long and prosper"; + writable.setString(testData); + Assert.assertEquals(testData,writable.getString()); + } + + public void testDataWritten() throws IOException { + String testData = "One ring to rule them all"; + byte[] testDataBytes = testData.getBytes(Charsets.UTF_8); + writable.setString(testData); + ByteArrayOutputStream ostream = new ByteArrayOutputStream(); + DataOutput out = new DataOutputStream(ostream); + writable.write(out); + byte[] written = ostream.toByteArray(); + InputStream instream = new ByteArrayInputStream(written); + DataInput in = new DataInputStream(instream); + String readData = in.readUTF(); + Assert.assertEquals(testData, readData); + } + + public void testDataRead() throws IOException { + String testData = "Brandywine Bridge - 20 miles!"; + ByteArrayOutputStream ostream = new ByteArrayOutputStream(); + DataOutput out = new DataOutputStream(ostream); + out.writeUTF(testData); + InputStream instream = new ByteArrayInputStream(ostream.toByteArray()); + DataInput in = new DataInputStream(instream); + writable.readFields(in); + Assert.assertEquals(testData, writable.getString()); + } + + public void testWriteReadUsingStream() throws IOException { + String testData = "You shall not pass"; + ByteArrayOutputStream ostream = new ByteArrayOutputStream(); + DataOutput out = new DataOutputStream(ostream); + writable.setString(testData); + writable.write(out); + byte[] written = ostream.toByteArray(); + + //Don't test what the data is, test that SqoopWritable can read it. 
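A minimal Writable consistent with the round-trip tests in this new file, offered as a sketch under those constraints rather than the committed class, carries the intermediate format's text form and serializes it with writeUTF/readUTF, which is why DataInput.readUTF() can read back what write() produced:

// Sketch only; the ordering in compareTo and the choice of WritableComparable
// (required for a map output key) are assumptions of this sketch.
package org.apache.sqoop.job.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class SqoopWritable implements WritableComparable<SqoopWritable> {

  private String strData;

  public void setString(String data) {
    this.strData = data;
  }

  public String getString() {
    return strData;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(strData);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    strData = in.readUTF();
  }

  @Override
  public int compareTo(SqoopWritable other) {
    return getString().compareTo(other.getString());
  }

  @Override
  public String toString() {
    return getString();
  }
}

Keeping the payload as a single string means the schema-aware decoding stays inside the intermediate data format on both sides of the MapReduce boundary.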
+ InputStream instream = new ByteArrayInputStream(written); + SqoopWritable newWritable = new SqoopWritable(); + DataInput in = new DataInputStream(instream); + newWritable.readFields(in); + Assert.assertEquals(testData, newWritable.getString()); + ostream.close(); + instream.close(); + } + +} diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java index bee8ab75..1f55f1b9 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java @@ -23,11 +23,13 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; +import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.LoaderContext; -import org.apache.sqoop.job.io.Data; import org.apache.sqoop.model.MJob; +import org.apache.sqoop.job.io.SqoopWritable; import org.junit.Before; import org.junit.Test; @@ -47,7 +49,7 @@ public ThrowingLoader() { @Override public void load(LoaderContext context, Object cc, Object jc) throws Exception { - context.getDataReader().readContent(Data.CSV_RECORD); + context.getDataReader().readTextRecord(); throw new BrokenBarrierException(); } } @@ -62,7 +64,7 @@ public void load(LoaderContext context, Object cc, Object jc) throws Exception { int runCount = 0; Object o; String[] arr; - while ((o = context.getDataReader().readContent(Data.CSV_RECORD)) != null) { + while ((o = context.getDataReader().readTextRecord()) != null) { arr = o.toString().split(","); Assert.assertEquals(100, arr.length); for (int i = 0; i < arr.length; i++) { @@ -84,7 +86,7 @@ public GoodLoader() { @Override public void load(LoaderContext context, Object cc, Object jc) throws Exception { - String[] arr = context.getDataReader().readContent(Data.CSV_RECORD).toString().split(","); + String[] arr = context.getDataReader().readTextRecord().toString().split(","); Assert.assertEquals(100, arr.length); for (int i = 0; i < arr.length; i++) { Assert.assertEquals(i, Integer.parseInt(arr[i])); @@ -103,7 +105,7 @@ public void load(LoaderContext context, Object cc, Object jc) throws Exception { int runCount = 0; Object o; String[] arr; - while ((o = context.getDataReader().readContent(Data.CSV_RECORD)) != null) { + while ((o = context.getDataReader().readTextRecord()) != null) { arr = o.toString().split(","); Assert.assertEquals(100, arr.length); for (int i = 0; i < arr.length; i++) { @@ -119,6 +121,7 @@ public void load(LoaderContext context, Object cc, Object jc) throws Exception { @Before public void setUp() { conf = new Configuration(); + conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); } @@ -128,12 +131,14 @@ public void testWhenLoaderThrows() throws Throwable { conf.set(JobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName()); SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName()); - RecordWriter writer = executor.getRecordWriter(); - Data data = new Data(); + RecordWriter writer = executor.getRecordWriter(); + IntermediateDataFormat data = new CSVIntermediateDataFormat(); + 
SqoopWritable writable = new SqoopWritable(); try { for (int count = 0; count < 100; count++) { - data.setContent(String.valueOf(count), Data.CSV_RECORD); - writer.write(data, null); + data.setTextData(String.valueOf(count)); + writable.setString(data.getTextData()); + writer.write(writable, null); } } catch (SqoopException ex) { throw ex.getCause(); @@ -146,8 +151,9 @@ public void testSuccessfulContinuousLoader() throws Throwable { conf.set(JobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName()); SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName()); - RecordWriter writer = executor.getRecordWriter(); - Data data = new Data(); + RecordWriter writer = executor.getRecordWriter(); + IntermediateDataFormat data = new CSVIntermediateDataFormat(); + SqoopWritable writable = new SqoopWritable(); for (int i = 0; i < 10; i++) { StringBuilder builder = new StringBuilder(); for (int count = 0; count < 100; count++) { @@ -156,8 +162,9 @@ public void testSuccessfulContinuousLoader() throws Throwable { builder.append(","); } } - data.setContent(builder.toString(), Data.CSV_RECORD); - writer.write(data, null); + data.setTextData(builder.toString()); + writable.setString(data.getTextData()); + writer.write(writable, null); } writer.close(null); } @@ -166,8 +173,9 @@ public void testSuccessfulContinuousLoader() throws Throwable { public void testSuccessfulLoader() throws Throwable { SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName()); - RecordWriter writer = executor.getRecordWriter(); - Data data = new Data(); + RecordWriter writer = executor.getRecordWriter(); + IntermediateDataFormat data = new CSVIntermediateDataFormat(); + SqoopWritable writable = new SqoopWritable(); StringBuilder builder = new StringBuilder(); for (int count = 0; count < 100; count++) { builder.append(String.valueOf(count)); @@ -175,8 +183,10 @@ public void testSuccessfulLoader() throws Throwable { builder.append(","); } } - data.setContent(builder.toString(), Data.CSV_RECORD); - writer.write(data, null); + data.setTextData(builder.toString()); + writable.setString(data.getTextData()); + writer.write(writable, null); + //Allow writer to complete. TimeUnit.SECONDS.sleep(5); writer.close(null); @@ -189,8 +199,9 @@ public void testThrowingContinuousLoader() throws Throwable { conf.set(JobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName()); SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName()); - RecordWriter writer = executor.getRecordWriter(); - Data data = new Data(); + RecordWriter writer = executor.getRecordWriter(); + IntermediateDataFormat data = new CSVIntermediateDataFormat(); + SqoopWritable writable = new SqoopWritable(); try { for (int i = 0; i < 10; i++) { StringBuilder builder = new StringBuilder(); @@ -200,8 +211,9 @@ public void testThrowingContinuousLoader() throws Throwable { builder.append(","); } } - data.setContent(builder.toString(), Data.CSV_RECORD); - writer.write(data, null); + data.setTextData(builder.toString()); + writable.setString(data.getTextData()); + writer.write(writable, null); } writer.close(null); } catch (SqoopException ex) { diff --git a/pom.xml b/pom.xml index 1e2f0057..a722c74c 100644 --- a/pom.xml +++ b/pom.xml @@ -142,12 +142,6 @@ limitations under the License. 
${commons-io.version} - - com.google.guava - guava - ${guava.version} - - org.apache.hadoop hadoop-core @@ -344,6 +338,11 @@ limitations under the License. commons-lang ${commons-lang.version} + + com.google.guava + guava + ${guava.version} + javax.servlet servlet-api diff --git a/spi/pom.xml b/spi/pom.xml index 0b240e8a..43f17d4c 100644 --- a/spi/pom.xml +++ b/spi/pom.xml @@ -36,5 +36,10 @@ limitations under the License. org.apache.sqoop sqoop-common + + + org.apache.sqoop + connector-sdk + diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java index 2becc563..50eb9402 100644 --- a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java +++ b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java @@ -20,6 +20,8 @@ import java.util.Locale; import java.util.ResourceBundle; +import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; +import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.job.etl.Exporter; import org.apache.sqoop.job.etl.Importer; import org.apache.sqoop.model.MJob; @@ -79,4 +81,14 @@ public abstract class SqoopConnector { */ public abstract MetadataUpgrader getMetadataUpgrader(); + /** + * Returns the {@linkplain IntermediateDataFormat} this connector + * can return natively in. This will support retrieving the data as text + * and an array of objects. This should never return null. + * + * @return {@linkplain IntermediateDataFormat} object + */ + public Class> getIntermediateDataFormat() { + return CSVIntermediateDataFormat.class; + } } diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java index bfc28ef8..a05274a0 100644 --- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java +++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java @@ -198,6 +198,7 @@ public boolean submit(SubmissionRequest generalRequest) { ConfigurationUtils.setConfigConnectorJob(job, request.getConfigConnectorJob()); ConfigurationUtils.setConfigFrameworkConnection(job, request.getConfigFrameworkConnection()); ConfigurationUtils.setConfigFrameworkJob(job, request.getConfigFrameworkJob()); + ConfigurationUtils.setConnectorSchema(job, request.getSummary().getConnectorSchema()); if(request.getJobName() != null) { job.setJobName("Sqoop: " + request.getJobName());
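With the SPI defaulting to CSVIntermediateDataFormat and the submission engine now propagating the connector schema into the job, the format's job is to move between the text form carried by SqoopWritable and the typed columns described by the schema. A small usage sketch (the class name CsvIdfRoundTrip is invented, and the sample value and single-quote convention are assumptions about the CSV encoding rather than something this patch asserts):

package org.apache.sqoop.example;  // hypothetical package for this sketch

import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Text;

public class CsvIdfRoundTrip {
  public static void main(String[] args) {
    // The same three-column schema the tests build: integer, floating point, text.
    Schema schema = new Schema("Test");
    schema.addColumn(new FixedPoint("1"))
        .addColumn(new FloatingPoint("2"))
        .addColumn(new Text("3"));

    IntermediateDataFormat data = new CSVIntermediateDataFormat();
    data.setSchema(schema);

    // Text in: the form SqoopWritable carries between mapper and loader.
    data.setTextData("10,34.5,'hello'");

    // Typed objects out, decoded against the schema; the concrete Java types
    // are the format's choice.
    Object[] fields = data.getObjectData();
    System.out.println(fields.length + " columns, first = " + fields[0]);

    // And back to text unchanged.
    System.out.println(data.getTextData());
  }
}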