5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-05 04:50:47 +08:00

SQOOP-777: Sqoop2: Implement intermediate data format representation policy

(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2014-07-26 11:37:50 -07:00
parent 17c7219b94
commit 3c93930bf3
41 changed files with 1412 additions and 272 deletions

View File

@ -45,6 +45,11 @@ limitations under the License.
<groupId>commons-lang</groupId> <groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId> <artifactId>commons-lang</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -18,17 +18,32 @@
package org.apache.sqoop.etl.io; package org.apache.sqoop.etl.io;
/** /**
* An intermediate layer for passing data from the MR framework * An intermediate layer for passing data from the execution framework
* to the ETL framework. * to the ETL framework.
*/ */
public abstract class DataReader { public abstract class DataReader {
/**
* Read data from the execution framework as an object array.
* @return - array of objects with each column represented as an object
* @throws Exception
*/
public abstract Object[] readArrayRecord() throws Exception; public abstract Object[] readArrayRecord() throws Exception;
public abstract String readCsvRecord() throws Exception; /**
* Read data from execution framework as text - as a CSV record.
* public abstract Object readContent(int type) throws Exception;
* @return - CSV formatted data.
* @throws Exception
*/
public abstract String readTextRecord() throws Exception;
public abstract Object readContent(int type) throws Exception; /**
* Read data from execution framework as a native format.
public abstract void setFieldDelimiter(char fieldDelimiter); * @return - the content in the native format of the intermediate data
* format being used.
* @throws Exception
*/
public abstract Object readContent() throws Exception;
} }

View File

@ -23,12 +23,24 @@
*/ */
public abstract class DataWriter { public abstract class DataWriter {
/**
* Write an array of objects into the execution framework
* @param array - data to be written
*/
public abstract void writeArrayRecord(Object[] array); public abstract void writeArrayRecord(Object[] array);
public abstract void writeCsvRecord(String csv); /**
* Write data into execution framework as text. The Intermediate Data Format
* may choose to convert the data to another format based on how the data
* format is implemented
* @param text - data represented as CSV text.
*/
public abstract void writeStringRecord(String text);
public abstract void writeContent(Object content, int type); /**
* Write data in the intermediate data format's native format.
public abstract void setFieldDelimiter(char fieldDelimiter); * @param obj - data to be written
*/
public abstract void writeRecord(Object obj);
} }

View File

@ -98,4 +98,9 @@ public int hashCode() {
result = 31 * result + (nullable != null ? nullable.hashCode() : 0); result = 31 * result + (nullable != null ? nullable.hashCode() : 0);
return result; return result;
} }
public boolean validate(Object o) {
// TODO: Implement this in all subclasses!
return true;
}
} }

View File

@ -21,6 +21,8 @@
import java.util.ResourceBundle; import java.util.ResourceBundle;
import org.apache.sqoop.common.VersionInfo; import org.apache.sqoop.common.VersionInfo;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
@ -61,7 +63,7 @@ public String getVersion() {
@Override @Override
public ResourceBundle getBundle(Locale locale) { public ResourceBundle getBundle(Locale locale) {
return ResourceBundle.getBundle( return ResourceBundle.getBundle(
GenericJdbcConnectorConstants.RESOURCE_BUNDLE_NAME, locale); GenericJdbcConnectorConstants.RESOURCE_BUNDLE_NAME, locale);
} }
@Override @Override

View File

@ -78,6 +78,8 @@ public enum GenericJdbcConnectorError implements ErrorCode {
GENERIC_JDBC_CONNECTOR_0018("Error occurred while transferring data from " + GENERIC_JDBC_CONNECTOR_0018("Error occurred while transferring data from " +
"stage table to destination table."), "stage table to destination table."),
GENERIC_JDBC_CONNECTOR_0019("Table name extraction not supported.")
; ;
private final String message; private final String message;

View File

@ -17,6 +17,9 @@
*/ */
package org.apache.sqoop.connector.jdbc; package org.apache.sqoop.connector.jdbc;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -26,9 +29,11 @@
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
import org.apache.sqoop.connector.jdbc.util.SqlTypesUtils;
import org.apache.sqoop.job.etl.Initializer; import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext; import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.schema.Schema; import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.utils.ClassUtils; import org.apache.sqoop.utils.ClassUtils;
public class GenericJdbcExportInitializer extends Initializer<ConnectionConfiguration, ExportJobConfiguration> { public class GenericJdbcExportInitializer extends Initializer<ConnectionConfiguration, ExportJobConfiguration> {
@ -58,7 +63,53 @@ public List<String> getJars(InitializerContext context, ConnectionConfiguration
@Override @Override
public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ExportJobConfiguration exportJobConfiguration) { public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ExportJobConfiguration exportJobConfiguration) {
return null; configureJdbcProperties(context.getContext(), connectionConfiguration, exportJobConfiguration);
String schemaName = exportJobConfiguration.table.tableName;
if (schemaName == null) {
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0019,
"Table name extraction not supported yet.");
}
if(exportJobConfiguration.table.schemaName != null) {
schemaName = exportJobConfiguration.table.schemaName + "." + schemaName;
}
Schema schema = new Schema(schemaName);
ResultSet rs = null;
ResultSetMetaData rsmt = null;
try {
rs = executor.executeQuery("SELECT * FROM " + schemaName + " WHERE 1 = 0");
rsmt = rs.getMetaData();
for (int i = 1 ; i <= rsmt.getColumnCount(); i++) {
Column column = SqlTypesUtils.sqlTypeToAbstractType(rsmt.getColumnType(i));
String columnName = rsmt.getColumnName(i);
if (columnName == null || columnName.equals("")) {
columnName = rsmt.getColumnLabel(i);
if (null == columnName) {
columnName = "Column " + i;
}
}
column.setName(columnName);
schema.addColumn(column);
}
return schema;
} catch (SQLException e) {
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e);
} finally {
if(rs != null) {
try {
rs.close();
} catch (SQLException e) {
LOG.info("Ignoring exception while closing ResultSet", e);
}
}
}
} }
private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) { private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) {

View File

@ -71,16 +71,17 @@ public Schema getSchema(InitializerContext context, ConnectionConfiguration conn
String schemaName = importJobConfiguration.table.tableName; String schemaName = importJobConfiguration.table.tableName;
if(schemaName == null) { if(schemaName == null) {
schemaName = "Query"; schemaName = "Query";
} else if(importJobConfiguration.table.schemaName != null) {
schemaName = importJobConfiguration.table.schemaName + "." + schemaName;
} }
Schema schema = new Schema(schemaName); Schema schema = new Schema(schemaName);
ResultSet rs = null; ResultSet rs = null;
ResultSetMetaData rsmt = null; ResultSetMetaData rsmt = null;
try { try {
rs = executor.executeQuery( rs = executor.executeQuery(
context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL) context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL)
.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0") .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0")
); );
rsmt = rs.getMetaData(); rsmt = rs.getMetaData();

View File

@ -112,11 +112,6 @@ public void testInsert() throws Exception {
public class DummyReader extends DataReader { public class DummyReader extends DataReader {
int index = 0; int index = 0;
@Override
public void setFieldDelimiter(char fieldDelimiter) {
// do nothing and use default delimiter
}
@Override @Override
public Object[] readArrayRecord() { public Object[] readArrayRecord() {
if (index < numberOfRows) { if (index < numberOfRows) {
@ -132,16 +127,17 @@ public Object[] readArrayRecord() {
} }
@Override @Override
public String readCsvRecord() { public String readTextRecord() {
fail("This method should not be invoked."); fail("This method should not be invoked.");
return null; return null;
} }
@Override @Override
public Object readContent(int type) { public Object readContent() throws Exception {
fail("This method should not be invoked."); fail("This method should not be invoked.");
return null; return null;
} }
} }
} }

View File

@ -133,11 +133,6 @@ public void testSubquery() throws Exception {
public class DummyWriter extends DataWriter { public class DummyWriter extends DataWriter {
int indx = START; int indx = START;
@Override
public void setFieldDelimiter(char fieldDelimiter) {
// do nothing and use default delimiter
}
@Override @Override
public void writeArrayRecord(Object[] array) { public void writeArrayRecord(Object[] array) {
for (int i = 0; i < array.length; i++) { for (int i = 0; i < array.length; i++) {
@ -153,12 +148,12 @@ public void writeArrayRecord(Object[] array) {
} }
@Override @Override
public void writeCsvRecord(String csv) { public void writeStringRecord(String text) {
fail("This method should not be invoked."); fail("This method should not be invoked.");
} }
@Override @Override
public void writeContent(Object content, int type) { public void writeRecord(Object content) {
fail("This method should not be invoked."); fail("This method should not be invoked.");
} }
} }

View File

@ -327,7 +327,7 @@ public void testGetSchemaForTable() throws Exception {
Initializer initializer = new GenericJdbcImportInitializer(); Initializer initializer = new GenericJdbcImportInitializer();
initializer.initialize(initializerContext, connConf, jobConf); initializer.initialize(initializerContext, connConf, jobConf);
Schema schema = initializer.getSchema(initializerContext, connConf, jobConf); Schema schema = initializer.getSchema(initializerContext, connConf, jobConf);
assertEquals(getSchema(tableName), schema); assertEquals(getSchema(jobConf.table.schemaName + "." + tableName), schema);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -38,6 +38,12 @@ limitations under the License.
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-common</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -0,0 +1,355 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sqoop.connector.idf;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Type;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
public static final char SEPARATOR_CHARACTER = ',';
public static final char ESCAPE_CHARACTER = '\\';
public static final char QUOTE_CHARACTER = '\'';
private static final Logger LOG = Logger.getLogger
(CSVIntermediateDataFormat.class);
private static final char[] originals = {
0x5C,0x00,0x0A,0x0D,0x1A,0x22,0x27
};
private static final String[] replacements = {
new String(new char[] { ESCAPE_CHARACTER, '\\'}),
new String(new char[] { ESCAPE_CHARACTER, '0'}),
new String(new char[] { ESCAPE_CHARACTER, 'n'}),
new String(new char[] { ESCAPE_CHARACTER, 'r'}),
new String(new char[] { ESCAPE_CHARACTER, 'Z'}),
new String(new char[] { ESCAPE_CHARACTER, '\"'}),
new String(new char[] { ESCAPE_CHARACTER, '\''})
};
// ISO-8859-1 is an 8-bit codec that is supported in every java implementation.
public static final String BYTE_FIELD_CHARSET = "ISO-8859-1";
private final List<Integer> stringFieldIndices = new ArrayList<Integer>();
private final List<Integer> byteFieldIndices = new ArrayList<Integer>();
private Schema schema;
/**
* {@inheritDoc}
*/
@Override
public String getTextData() {
return data;
}
/**
* {@inheritDoc}
*/
@Override
public void setTextData(String text) {
this.data = text;
}
/**
* {@inheritDoc}
*/
@Override
public Schema getSchema() {
return schema;
}
/**
* {@inheritDoc}
*/
@Override
public void setSchema(Schema schema) {
if(schema == null) {
return;
}
this.schema = schema;
List<Column> columns = schema.getColumns();
int i = 0;
for(Column col : columns) {
if(col.getType() == Type.TEXT) {
stringFieldIndices.add(i);
} else if(col.getType() == Type.BINARY) {
byteFieldIndices.add(i);
}
i++;
}
}
/**
* Custom CSV parser that honors quoting and escaped quotes.
* All other escaping is handled elsewhere.
*
* @return String[]
*/
private String[] getFields() {
if (data == null) {
return null;
}
boolean quoted = false;
boolean escaped = false;
List<String> parsedData = new LinkedList<String>();
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < data.length(); ++i) {
char c = data.charAt(i);
switch(c) {
case QUOTE_CHARACTER:
buffer.append(c);
if (escaped) {
escaped = false;
} else {
quoted = !quoted;
}
break;
case ESCAPE_CHARACTER:
buffer.append(ESCAPE_CHARACTER);
escaped = !escaped;
break;
case SEPARATOR_CHARACTER:
if (quoted) {
buffer.append(c);
} else {
parsedData.add(buffer.toString());
buffer = new StringBuffer();
}
break;
default:
if (escaped) {
escaped = false;
}
buffer.append(c);
break;
}
}
parsedData.add(buffer.toString());
return parsedData.toArray(new String[parsedData.size()]);
}
/**
* {@inheritDoc}
*/
@Override
public Object[] getObjectData() {
String[] fields = getFields();
if (fields == null) {
return null;
}
if (fields.length != schema.getColumns().size()) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
"The data " + getTextData() + " has the wrong number of fields.");
}
Object[] out = new Object[fields.length];
Column[] cols = schema.getColumns().toArray(new Column[fields.length]);
for (int i = 0; i < fields.length; i++) {
Type colType = cols[i].getType();
if (fields[i].equals("NULL")) {
out[i] = null;
continue;
}
if (colType == Type.TEXT) {
out[i] = unescapeStrings(fields[i]);
} else if (colType == Type.BINARY) {
out[i] = unescapeByteArray(fields[i]);
} else if (colType == Type.FIXED_POINT) {
Long byteSize = ((FixedPoint) cols[i]).getByteSize();
if (byteSize != null && byteSize <= Integer.SIZE) {
out[i] = Integer.valueOf(fields[i]);
} else {
out[i] = Long.valueOf(fields[i]);
}
} else if (colType == Type.FLOATING_POINT) {
Long byteSize = ((FloatingPoint) cols[i]).getByteSize();
if (byteSize != null && byteSize <= Float.SIZE) {
out[i] = Float.valueOf(fields[i]);
} else {
out[i] = Double.valueOf(fields[i]);
}
} else if (colType == Type.DECIMAL) {
out[i] = new BigDecimal(fields[i]);
} else {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, "Column type from schema was not recognized for " + colType);
}
}
return out;
}
/**
* {@inheritDoc}
*/
@VisibleForTesting
@Override
public void setObjectData(Object[] data) {
escapeArray(data);
this.data = StringUtils.join(data, SEPARATOR_CHARACTER);
}
/**
* {@inheritDoc}
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.data);
}
/**
* {@inheritDoc}
*/
@Override
public void read(DataInput in) throws IOException {
data = in.readUTF();
}
/**
* {@inheritDoc}
*/
@Override
public boolean equals(Object other) {
if(this == other) {
return true;
}
if(other == null || !(other instanceof CSVIntermediateDataFormat)) {
return false;
}
return data.equals(((CSVIntermediateDataFormat)other).data);
}
public int compareTo(IntermediateDataFormat<?> o) {
if(this == o) {
return 0;
}
if(this.equals(o)) {
return 0;
}
if(!(o instanceof CSVIntermediateDataFormat)) {
throw new IllegalStateException("Expected Data to be instance of " +
"CSVIntermediateFormat, but was an instance of " + o.getClass()
.getName());
}
return data.compareTo(o.getTextData());
}
/**
* If the incoming data is an array, parse it and return the CSV-ised version
*
* @param array
*/
private void escapeArray(Object[] array) {
for (int i : stringFieldIndices) {
array[i] = escapeStrings((String) array[i]);
}
for (int i : byteFieldIndices) {
array[i] = escapeByteArrays((byte[]) array[i]);
}
}
private String escapeByteArrays(byte[] bytes) {
try {
return escapeStrings(new String(bytes, BYTE_FIELD_CHARSET));
} catch (UnsupportedEncodingException e) {
// We should never hit this case.
// This character set should be distributed with Java.
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The character set " + BYTE_FIELD_CHARSET + " is not available.");
}
}
private String getRegExp(char orig) {
return getRegExp(String.valueOf(orig));
}
private String getRegExp(String orig) {
return orig.replaceAll("\\\\", Matcher.quoteReplacement("\\\\"));
}
private String escapeStrings(String orig) {
int j = 0;
String replacement = orig;
try {
for (j = 0; j < replacements.length; j++) {
replacement = replacement.replaceAll(getRegExp(originals[j]), Matcher.quoteReplacement(replacements[j]));
}
} catch (Exception e) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0002, orig + " " + replacement + " " + String.valueOf(j) + " " + e.getMessage());
}
StringBuilder builder = new StringBuilder();
builder.append(QUOTE_CHARACTER).append(replacement).append(QUOTE_CHARACTER);
return builder.toString();
}
private String unescapeStrings(String orig) {
//Remove the trailing and starting quotes.
orig = orig.substring(1, orig.length() - 1);
int j = 0;
try {
for (j = 0; j < replacements.length; j++) {
orig = orig.replaceAll(getRegExp(replacements[j]),
Matcher.quoteReplacement(String.valueOf(originals[j])));
}
} catch (Exception e) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0003, orig + " " + String.valueOf(j) + e.getMessage());
}
return orig;
}
private byte[] unescapeByteArray(String orig) {
// Always encoded in BYTE_FIELD_CHARSET.
try {
return unescapeStrings(orig).getBytes(BYTE_FIELD_CHARSET);
} catch (UnsupportedEncodingException e) {
// Should never hit this case.
// This character set should be distributed with Java.
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The character set " + BYTE_FIELD_CHARSET + " is not available.");
}
}
public String toString() {
return data;
}
}

View File

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sqoop.connector.idf;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.Type;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Abstract class representing a pluggable intermediate data format the Sqoop
* framework will use to move data to/from the connector. All intermediate
* data formats are expected to have an internal/native implementation,
* but also should minimally be able to return a text (CSV) version of the
* data. The data format should also be able to return the data as an object
* array - each array representing one row.
* <p/>
* Why a "native" internal format and then return text too?
* Imagine a connector that moves data from a system that stores data as a
* serialization format called FooFormat. If I also need the data to be
* written into HDFS as FooFormat, the additional cycles burnt in converting
* the FooFormat to text and back is useless - so plugging in an intermediate
* format that can store the data as FooFormat saves those cycles!
* <p/>
* Most fast access mechanisms, like mysqldump or pgsqldump write the data
* out as CSV, and most often the destination data is also represented as CSV
* - so having a minimal CSV support is important, so we can easily pull the
* data out as text.
* <p/>
* Any conversion to the final format from the native or text format is to be
* done by the connector or OutputFormat classes.
*
* @param <T> - Each data format may have a native representation of the
* data, represented by the parameter.
*/
public abstract class IntermediateDataFormat<T> {
protected volatile T data;
public int hashCode() {
return data.hashCode();
}
/**
* Set one row of data. If validate is set to true, the data is validated
* against the schema.
*
* @param data - A single row of data to be moved.
*/
public void setData(T data) {
this.data = data;
}
/**
* Get one row of data.
*
* @return - One row of data, represented in the internal/native format of
* the intermediate data format implementation.
*/
public T getData() {
return data;
}
/**
* Get one row of data as CSV.
*
* @return - String representing the data in CSV
*/
public abstract String getTextData();
/**
* Set one row of data as CSV.
*
*/
public abstract void setTextData(String text);
/**
* Get one row of data as an Object array.
*
* @return - String representing the data as an Object array
*/
public abstract Object[] getObjectData();
/**
* Set one row of data as an Object array.
*
*/
public abstract void setObjectData(Object[] data);
/**
* Set the schema to be used.
*
* @param schema - the schema to be used
*/
public abstract void setSchema(Schema schema);
/**
* Get the schema of the data.
*
* @return - The schema of the data.
*/
public abstract Schema getSchema();
/**
* Serialize the fields of this object to <code>out</code>.
*
* @param out <code>DataOuput</code> to serialize this object into.
* @throws IOException
*/
public abstract void write(DataOutput out) throws IOException;
/**
* Deserialize the fields of this object from <code>in</code>.
*
* <p>For efficiency, implementations should attempt to re-use storage in the
* existing object where possible.</p>
*
* @param in <code>DataInput</code> to deseriablize this object from.
* @throws IOException
*/
public abstract void read(DataInput in) throws IOException;
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sqoop.connector.idf;
import org.apache.sqoop.common.ErrorCode;
public enum IntermediateDataFormatError implements ErrorCode {
/** An unknown error has occurred. */
INTERMEDIATE_DATA_FORMAT_0000("An unknown error has occurred."),
/** An encoding is missing in the Java native libraries. */
INTERMEDIATE_DATA_FORMAT_0001("Native character set error."),
/** Error while escaping a row. */
INTERMEDIATE_DATA_FORMAT_0002("An error has occurred while escaping a row."),
/** Error while escaping a row. */
INTERMEDIATE_DATA_FORMAT_0003("An error has occurred while unescaping a row."),
/** Column type isn't known by Intermediate Data Format. */
INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."),
/** Number of fields. */
INTERMEDIATE_DATA_FORMAT_0005("Wrong number of fields.")
;
private final String message;
private IntermediateDataFormatError(String message) {
this.message = message;
}
public String getCode() {
return name();
}
public String getMessage() {
return message;
}
}

View File

@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sqoop.connector.idf;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Binary;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;
import org.junit.Before;
import org.junit.Test;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class CSVIntermediateDataFormatTest {
private final String BYTE_FIELD_ENCODING = "ISO-8859-1";
private IntermediateDataFormat<?> data;
@Before
public void setUp() {
data = new CSVIntermediateDataFormat();
}
private String getByteFieldString(byte[] byteFieldData) {
try {
return new StringBuilder("'").append(new String(byteFieldData, BYTE_FIELD_ENCODING)).append("'").toString();
} catch(UnsupportedEncodingException e) {
// Should never get to this point because ISO-8859-1 is a standard codec.
return null;
}
}
@Test
public void testStringInStringOut() {
String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+ ",'" + String.valueOf(0x0A) + "'";
data.setTextData(testData);
assertEquals(testData, data.getTextData());
}
@Test
public void testNullStringInObjectOut() {
Schema schema = new Schema("test");
schema.addColumn(new FixedPoint("1"))
.addColumn(new FixedPoint("2"))
.addColumn(new Text("3"))
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setSchema(schema);
data.setTextData(null);
Object[] out = data.getObjectData();
assertNull(out);
}
@Test(expected=SqoopException.class)
public void testEmptyStringInObjectOut() {
Schema schema = new Schema("test");
schema.addColumn(new FixedPoint("1"))
.addColumn(new FixedPoint("2"))
.addColumn(new Text("3"))
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setSchema(schema);
data.setTextData("");
data.getObjectData();
}
@Test
public void testStringInObjectOut() {
//byte[0] = -112, byte[1] = 54 - 2's complements
String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+ ",'\\n'";
Schema schema = new Schema("test");
schema.addColumn(new FixedPoint("1"))
.addColumn(new FixedPoint("2"))
.addColumn(new Text("3"))
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setSchema(schema);
data.setTextData(testData);
Object[] out = data.getObjectData();
assertEquals(new Long(10),out[0]);
assertEquals(new Long(34),out[1]);
assertEquals("54",out[2]);
assertEquals("random data",out[3]);
assertEquals(-112, ((byte[])out[4])[0]);
assertEquals(54, ((byte[])out[4])[1]);
assertEquals("\n", out[5].toString());
}
@Test
public void testObjectInStringOut() {
Schema schema = new Schema("test");
schema.addColumn(new FixedPoint("1"))
.addColumn(new FixedPoint("2"))
.addColumn(new Text("3"))
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setSchema(schema);
byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
Object[] in = new Object[6];
in[0] = new Long(10);
in[1] = new Long(34);
in[2] = "54";
in[3] = "random data";
in[4] = byteFieldData;
in[5] = new String(new char[] { 0x0A });
data.setObjectData(in);
//byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
String testData = "10,34,'54','random data'," +
getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'";
assertEquals(testData, data.getTextData());
}
@Test
public void testObjectInObjectOut() {
//Test escapable sequences too.
//byte[0] = -112, byte[1] = 54 - 2's complements
Schema schema = new Schema("test");
schema.addColumn(new FixedPoint("1"))
.addColumn(new FixedPoint("2"))
.addColumn(new Text("3"))
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setSchema(schema);
Object[] in = new Object[6];
in[0] = new Long(10);
in[1] = new Long(34);
in[2] = "54";
in[3] = "random data";
in[4] = new byte[] { (byte) -112, (byte) 54};
in[5] = new String(new char[] { 0x0A });
Object[] inCopy = new Object[6];
System.arraycopy(in,0,inCopy,0,in.length);
// Modifies the input array, so we use the copy to confirm
data.setObjectData(in);
assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
}
@Test
public void testStringFullRangeOfCharacters() {
Schema schema = new Schema("test");
schema.addColumn(new Text("1"));
data.setSchema(schema);
char[] allCharArr = new char[256];
for(int i = 0; i < allCharArr.length; ++i) {
allCharArr[i] = (char)i;
}
String strData = new String(allCharArr);
Object[] in = {strData};
Object[] inCopy = new Object[1];
System.arraycopy(in,0,inCopy,0,in.length);
// Modifies the input array, so we use the copy to confirm
data.setObjectData(in);
assertEquals(strData, data.getObjectData()[0]);
assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
}
@Test
public void testByteArrayFullRangeOfCharacters() {
Schema schema = new Schema("test");
schema.addColumn(new Binary("1"));
data.setSchema(schema);
byte[] allCharByteArr = new byte[256];
for(int i = 0; i < allCharByteArr.length; ++i) {
allCharByteArr[i] = (byte)i;
}
Object[] in = {allCharByteArr};
Object[] inCopy = new Object[1];
System.arraycopy(in,0,inCopy,0,in.length);
// Modifies the input array, so we use the copy to confirm
data.setObjectData(in);
assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
}
}

View File

@ -22,6 +22,7 @@
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager; import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.request.HttpEventContext; import org.apache.sqoop.request.HttpEventContext;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.core.Reconfigurable; import org.apache.sqoop.core.Reconfigurable;
import org.apache.sqoop.core.SqoopConfiguration; import org.apache.sqoop.core.SqoopConfiguration;
@ -327,6 +328,10 @@ public MSubmission submit(long jobId, HttpEventContext ctx) {
request.setJobName(job.getName()); request.setJobName(job.getName());
request.setJobId(job.getPersistenceId()); request.setJobId(job.getPersistenceId());
request.setNotificationUrl(notificationBaseUrl + jobId); request.setNotificationUrl(notificationBaseUrl + jobId);
Class<? extends IntermediateDataFormat<?>> dataFormatClass =
connector.getIntermediateDataFormat();
request.setIntermediateDataFormat(connector.getIntermediateDataFormat());
// Create request object
// Let's register all important jars // Let's register all important jars
// sqoop-common // sqoop-common
@ -343,6 +348,9 @@ public MSubmission submit(long jobId, HttpEventContext ctx) {
// Extra libraries that Sqoop code requires // Extra libraries that Sqoop code requires
request.addJarForClass(JSONValue.class); request.addJarForClass(JSONValue.class);
// The IDF is used in the ETL process.
request.addJarForClass(dataFormatClass);
// Get connector callbacks // Get connector callbacks
switch (job.getType()) { switch (job.getType()) {
case IMPORT: case IMPORT:

View File

@ -18,6 +18,7 @@
package org.apache.sqoop.framework; package org.apache.sqoop.framework;
import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.job.etl.CallbackBase; import org.apache.sqoop.job.etl.CallbackBase;
import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MJob;
@ -107,6 +108,11 @@ public class SubmissionRequest {
*/ */
Integer loaders; Integer loaders;
/**
* The intermediate data format this submission should use.
*/
Class<? extends IntermediateDataFormat> intermediateDataFormat;
public SubmissionRequest() { public SubmissionRequest() {
this.jars = new LinkedList<String>(); this.jars = new LinkedList<String>();
this.connectorContext = new MutableMapContext(); this.connectorContext = new MutableMapContext();
@ -252,4 +258,13 @@ public Integer getLoaders() {
public void setLoaders(Integer loaders) { public void setLoaders(Integer loaders) {
this.loaders = loaders; this.loaders = loaders;
} }
public Class<? extends IntermediateDataFormat> getIntermediateDataFormat() {
return intermediateDataFormat;
}
public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat> intermediateDataFormat) {
this.intermediateDataFormat = intermediateDataFormat;
}
} }

View File

@ -52,6 +52,10 @@ limitations under the License.
<artifactId>mockito-all</artifactId> <artifactId>mockito-all</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<!-- See profiles for Hadoop specific dependencies --> <!-- See profiles for Hadoop specific dependencies -->

View File

@ -34,7 +34,7 @@
import org.apache.sqoop.job.etl.HdfsSequenceImportLoader; import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
import org.apache.sqoop.job.etl.HdfsTextImportLoader; import org.apache.sqoop.job.etl.HdfsTextImportLoader;
import org.apache.sqoop.job.etl.Importer; import org.apache.sqoop.job.etl.Importer;
import org.apache.sqoop.job.io.Data; import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.mr.SqoopFileOutputFormat; import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
import org.apache.sqoop.job.mr.SqoopInputFormat; import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper; import org.apache.sqoop.job.mr.SqoopMapper;
@ -53,14 +53,7 @@ public SubmissionRequest createSubmissionRequest() {
return new MRSubmissionRequest(); return new MRSubmissionRequest();
} }
/** public void prepareSubmission(MRSubmissionRequest request) {
* {@inheritDoc}
*/
@Override
public void prepareImportSubmission(SubmissionRequest gRequest) {
MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
ImportJobConfiguration jobConf = (ImportJobConfiguration) request.getConfigFrameworkJob();
// Add jar dependencies // Add jar dependencies
addDependencies(request); addDependencies(request);
@ -68,13 +61,35 @@ public void prepareImportSubmission(SubmissionRequest gRequest) {
request.setInputFormatClass(SqoopInputFormat.class); request.setInputFormatClass(SqoopInputFormat.class);
request.setMapperClass(SqoopMapper.class); request.setMapperClass(SqoopMapper.class);
request.setMapOutputKeyClass(Data.class); request.setMapOutputKeyClass(SqoopWritable.class);
request.setMapOutputValueClass(NullWritable.class); request.setMapOutputValueClass(NullWritable.class);
request.setOutputFormatClass(SqoopFileOutputFormat.class); request.setOutputFormatClass(SqoopNullOutputFormat.class);
request.setOutputKeyClass(Data.class); request.setOutputKeyClass(SqoopWritable.class);
request.setOutputValueClass(NullWritable.class); request.setOutputValueClass(NullWritable.class);
// Set up framework context
MutableMapContext context = request.getFrameworkContext();
context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT,
request.getIntermediateDataFormat().getName());
if(request.getExtractors() != null) {
context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
}
}
/**
* {@inheritDoc}
*/
@Override
public void prepareImportSubmission(SubmissionRequest gRequest) {
MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
prepareSubmission(request);
request.setOutputFormatClass(SqoopFileOutputFormat.class);
ImportJobConfiguration jobConf = (ImportJobConfiguration) request.getConfigFrameworkJob();
Importer importer = (Importer)request.getConnectorCallbacks(); Importer importer = (Importer)request.getConnectorCallbacks();
// Set up framework context // Set up framework context
@ -83,10 +98,6 @@ public void prepareImportSubmission(SubmissionRequest gRequest) {
context.setString(JobConstants.JOB_ETL_EXTRACTOR, importer.getExtractor().getName()); context.setString(JobConstants.JOB_ETL_EXTRACTOR, importer.getExtractor().getName());
context.setString(JobConstants.JOB_ETL_DESTROYER, importer.getDestroyer().getName()); context.setString(JobConstants.JOB_ETL_DESTROYER, importer.getDestroyer().getName());
if(request.getExtractors() != null) {
context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
}
// TODO: This settings should be abstracted to core module at some point // TODO: This settings should be abstracted to core module at some point
if(jobConf.output.outputFormat == OutputFormat.TEXT_FILE) { if(jobConf.output.outputFormat == OutputFormat.TEXT_FILE) {
context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName()); context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
@ -137,19 +148,7 @@ public void prepareExportSubmission(SubmissionRequest gRequest) {
MRSubmissionRequest request = (MRSubmissionRequest) gRequest; MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
ExportJobConfiguration jobConf = (ExportJobConfiguration) request.getConfigFrameworkJob(); ExportJobConfiguration jobConf = (ExportJobConfiguration) request.getConfigFrameworkJob();
// Add jar dependencies prepareSubmission(request);
addDependencies(request);
// Configure map-reduce classes for import
request.setInputFormatClass(SqoopInputFormat.class);
request.setMapperClass(SqoopMapper.class);
request.setMapOutputKeyClass(Data.class);
request.setMapOutputValueClass(NullWritable.class);
request.setOutputFormatClass(SqoopNullOutputFormat.class);
request.setOutputKeyClass(Data.class);
request.setOutputValueClass(NullWritable.class);
Exporter exporter = (Exporter)request.getConnectorCallbacks(); Exporter exporter = (Exporter)request.getConnectorCallbacks();
@ -162,10 +161,6 @@ public void prepareExportSubmission(SubmissionRequest gRequest) {
// Extractor that will be able to read all supported file types // Extractor that will be able to read all supported file types
context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsExportExtractor.class.getName()); context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsExportExtractor.class.getName());
context.setString(JobConstants.HADOOP_INPUTDIR, jobConf.input.inputDirectory); context.setString(JobConstants.HADOOP_INPUTDIR, jobConf.input.inputDirectory);
if(request.getExtractors() != null) {
context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
}
} }
/** /**

View File

@ -18,6 +18,7 @@
package org.apache.sqoop.job; package org.apache.sqoop.job;
import org.apache.sqoop.core.ConfigurationConstants; import org.apache.sqoop.core.ConfigurationConstants;
import org.apache.sqoop.framework.FrameworkConstants;
public final class JobConstants extends Constants { public final class JobConstants extends Constants {
/** /**
@ -66,6 +67,9 @@ public final class JobConstants extends Constants {
public static final String HADOOP_COMPRESS_CODEC = public static final String HADOOP_COMPRESS_CODEC =
"mapred.output.compression.codec"; "mapred.output.compression.codec";
public static final String INTERMEDIATE_DATA_FORMAT =
FrameworkConstants.PREFIX_EXECUTION_CONFIG + "intermediate.format";
private JobConstants() { private JobConstants() {
// Disable explicit object creation // Disable explicit object creation
} }

View File

@ -36,7 +36,6 @@
import org.apache.sqoop.framework.configuration.ExportJobConfiguration; import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
import org.apache.sqoop.job.MapreduceExecutionError; import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext; import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.io.Data;
/** /**
* Extract from HDFS. * Extract from HDFS.
@ -50,12 +49,6 @@ public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, Expo
private DataWriter dataWriter; private DataWriter dataWriter;
private long rowRead = 0; private long rowRead = 0;
private final char fieldDelimiter;
public HdfsExportExtractor() {
fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
}
@Override @Override
public void extract(ExtractorContext context, public void extract(ExtractorContext context,
ConnectionConfiguration connectionConfiguration, ConnectionConfiguration connectionConfiguration,
@ -63,7 +56,6 @@ public void extract(ExtractorContext context,
conf = ((PrefixContext) context.getContext()).getConfiguration(); conf = ((PrefixContext) context.getContext()).getConfiguration();
dataWriter = context.getDataWriter(); dataWriter = context.getDataWriter();
dataWriter.setFieldDelimiter(fieldDelimiter);
try { try {
HdfsExportPartition p = partition; HdfsExportPartition p = partition;
@ -113,7 +105,7 @@ private void extractSequenceFile(Path file, long start, long length)
boolean hasNext = filereader.next(line); boolean hasNext = filereader.next(line);
while (hasNext) { while (hasNext) {
rowRead++; rowRead++;
dataWriter.writeCsvRecord(line.toString()); dataWriter.writeStringRecord(line.toString());
line = new Text(); line = new Text();
hasNext = filereader.next(line); hasNext = filereader.next(line);
if (filereader.getPosition() >= end && filereader.syncSeen()) { if (filereader.getPosition() >= end && filereader.syncSeen()) {
@ -173,7 +165,7 @@ private void extractTextFile(Path file, long start, long length)
next = fileseeker.getPos(); next = fileseeker.getPos();
} }
rowRead++; rowRead++;
dataWriter.writeCsvRecord(line.toString()); dataWriter.writeStringRecord(line.toString());
} }
LOG.info("Extracting ended on position: " + fileseeker.getPos()); LOG.info("Extracting ended on position: " + fileseeker.getPos());
filestream.close(); filestream.close();

View File

@ -30,7 +30,6 @@
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError; import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.etl.io.DataReader; import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.utils.ClassUtils; import org.apache.sqoop.utils.ClassUtils;
@ -38,16 +37,9 @@ public class HdfsSequenceImportLoader extends Loader {
public static final String EXTENSION = ".seq"; public static final String EXTENSION = ".seq";
private final char fieldDelimiter;
public HdfsSequenceImportLoader() {
fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
}
@Override @Override
public void load(LoaderContext context, Object oc, Object oj) throws Exception { public void load(LoaderContext context, Object oc, Object oj) throws Exception {
DataReader reader = context.getDataReader(); DataReader reader = context.getDataReader();
reader.setFieldDelimiter(fieldDelimiter);
Configuration conf = new Configuration(); Configuration conf = new Configuration();
// Configuration conf = ((EtlContext)context).getConfiguration(); // Configuration conf = ((EtlContext)context).getConfiguration();
@ -87,7 +79,7 @@ public void load(LoaderContext context, Object oc, Object oj) throws Exception {
String csv; String csv;
Text text = new Text(); Text text = new Text();
while ((csv = reader.readCsvRecord()) != null) { while ((csv = reader.readTextRecord()) != null) {
text.set(csv); text.set(csv);
filewriter.append(text, NullWritable.get()); filewriter.append(text, NullWritable.get());
} }

View File

@ -22,6 +22,7 @@
import java.io.IOException; import java.io.IOException;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import com.google.common.base.Charsets;
import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -36,18 +37,15 @@
public class HdfsTextImportLoader extends Loader { public class HdfsTextImportLoader extends Loader {
private final char fieldDelimiter;
private final char recordDelimiter; private final char recordDelimiter;
public HdfsTextImportLoader() { public HdfsTextImportLoader() {
fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
recordDelimiter = Data.DEFAULT_RECORD_DELIMITER; recordDelimiter = Data.DEFAULT_RECORD_DELIMITER;
} }
@Override @Override
public void load(LoaderContext context, Object oc, Object oj) throws Exception{ public void load(LoaderContext context, Object oc, Object oj) throws Exception{
DataReader reader = context.getDataReader(); DataReader reader = context.getDataReader();
reader.setFieldDelimiter(fieldDelimiter);
Configuration conf = new Configuration(); Configuration conf = new Configuration();
// Configuration conf = ((EtlContext)context).getConfiguration(); // Configuration conf = ((EtlContext)context).getConfiguration();
@ -81,15 +79,15 @@ public void load(LoaderContext context, Object oc, Object oj) throws Exception{
DataOutputStream filestream = fs.create(filepath, false); DataOutputStream filestream = fs.create(filepath, false);
if (codec != null) { if (codec != null) {
filewriter = new BufferedWriter(new OutputStreamWriter( filewriter = new BufferedWriter(new OutputStreamWriter(
codec.createOutputStream(filestream, codec.createCompressor()), codec.createOutputStream(filestream, codec.createCompressor()),
Data.CHARSET_NAME)); Charsets.UTF_8));
} else { } else {
filewriter = new BufferedWriter(new OutputStreamWriter( filewriter = new BufferedWriter(new OutputStreamWriter(
filestream, Data.CHARSET_NAME)); filestream, Charsets.UTF_8));
} }
String csv; String csv;
while ((csv = reader.readCsvRecord()) != null) { while ((csv = reader.readTextRecord()) != null) {
filewriter.write(csv + recordDelimiter); filewriter.write(csv + recordDelimiter);
} }
filewriter.close(); filewriter.close();

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sqoop.job.io;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class SqoopWritable implements WritableComparable<SqoopWritable> {
private String strData;
public SqoopWritable() {}
public void setString(String data) {
strData = data;
}
public String getString() {
return strData;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(strData);
}
@Override
public void readFields(DataInput in) throws IOException {
strData = in.readUTF();
}
@Override
public int compareTo(SqoopWritable o) {
return strData.compareTo(o.getString());
}
@Override
public String toString() {
return getString();
}
}

View File

@ -34,13 +34,13 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.io.Data; import org.apache.sqoop.job.io.SqoopWritable;
/** /**
* An output format for MapReduce job. * An output format for MapReduce job.
*/ */
public class SqoopFileOutputFormat public class SqoopFileOutputFormat
extends FileOutputFormat<Data, NullWritable> { extends FileOutputFormat<SqoopWritable, NullWritable> {
public static final Logger LOG = public static final Logger LOG =
Logger.getLogger(SqoopFileOutputFormat.class); Logger.getLogger(SqoopFileOutputFormat.class);
@ -49,7 +49,7 @@ public class SqoopFileOutputFormat
DefaultCodec.class; DefaultCodec.class;
@Override @Override
public RecordWriter<Data, NullWritable> getRecordWriter( public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
TaskAttemptContext context) throws IOException { TaskAttemptContext context) throws IOException {
Configuration conf = context.getConfiguration(); Configuration conf = context.getConfiguration();
@ -69,6 +69,7 @@ public RecordWriter<Data, NullWritable> getRecordWriter(
return executor.getRecordWriter(); return executor.getRecordWriter();
} }
@Override
public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException { public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
Path output = getOutputPath(context); Path output = getOutputPath(context);
return new DestroyerFileOutputCommitter(output, context); return new DestroyerFileOutputCommitter(output, context);

View File

@ -27,21 +27,22 @@
import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError; import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext; import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext; import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.etl.io.DataWriter; import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.schema.Schema; import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.submission.counter.SqoopCounters; import org.apache.sqoop.submission.counter.SqoopCounters;
import org.apache.sqoop.utils.ClassUtils; import org.apache.sqoop.utils.ClassUtils;
/** /**
* A mapper to perform map function. * A mapper to perform map function.
*/ */
public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, Data, NullWritable> { public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable> {
static { static {
ConfigurationUtils.configureLogging(); ConfigurationUtils.configureLogging();
@ -52,6 +53,8 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, Data, NullWrit
* Service for reporting progress to mapreduce. * Service for reporting progress to mapreduce.
*/ */
private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor(); private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor();
private IntermediateDataFormat data = null;
private SqoopWritable dataOut = null;
@Override @Override
public void run(Context context) throws IOException, InterruptedException { public void run(Context context) throws IOException, InterruptedException {
@ -60,6 +63,12 @@ public void run(Context context) throws IOException, InterruptedException {
String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR); String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName); Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
String intermediateDataFormatName = conf.get(JobConstants
.INTERMEDIATE_DATA_FORMAT);
data = (IntermediateDataFormat) ClassUtils.instantiate(intermediateDataFormatName);
data.setSchema(ConfigurationUtils.getConnectorSchema(conf));
dataOut = new SqoopWritable();
// Objects that should be pass to the Executor execution // Objects that should be pass to the Executor execution
PrefixContext subContext = null; PrefixContext subContext = null;
Object configConnection = null; Object configConnection = null;
@ -109,46 +118,38 @@ public void run(Context context) throws IOException, InterruptedException {
} }
} }
public class MapDataWriter extends DataWriter { private class MapDataWriter extends DataWriter {
private Context context; private Context context;
private Data data;
public MapDataWriter(Context context) { public MapDataWriter(Context context) {
this.context = context; this.context = context;
} }
@Override
public void setFieldDelimiter(char fieldDelimiter) {
if (data == null) {
data = new Data();
}
data.setFieldDelimiter(fieldDelimiter);
}
@Override @Override
public void writeArrayRecord(Object[] array) { public void writeArrayRecord(Object[] array) {
writeContent(array, Data.ARRAY_RECORD); data.setObjectData(array);
writeContent();
} }
@Override @Override
public void writeCsvRecord(String csv) { public void writeStringRecord(String text) {
writeContent(csv, Data.CSV_RECORD); data.setTextData(text);
writeContent();
} }
@Override @Override
public void writeContent(Object content, int type) { public void writeRecord(Object obj) {
if (data == null) { data.setData(obj.toString());
data = new Data(); writeContent();
} }
data.setContent(content, type); private void writeContent() {
try { try {
context.write(data, NullWritable.get()); dataOut.setString(data.getTextData());
context.write(dataOut, NullWritable.get());
} catch (Exception e) { } catch (Exception e) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0013, e); throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0013, e);
} }
} }
} }
} }

View File

@ -28,14 +28,14 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.io.Data; import org.apache.sqoop.job.io.SqoopWritable;
import java.io.IOException; import java.io.IOException;
/** /**
* An output format for MapReduce job. * An output format for MapReduce job.
*/ */
public class SqoopNullOutputFormat extends OutputFormat<Data, NullWritable> { public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWritable> {
public static final Logger LOG = public static final Logger LOG =
Logger.getLogger(SqoopNullOutputFormat.class); Logger.getLogger(SqoopNullOutputFormat.class);
@ -46,7 +46,7 @@ public void checkOutputSpecs(JobContext context) {
} }
@Override @Override
public RecordWriter<Data, NullWritable> getRecordWriter( public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
TaskAttemptContext context) { TaskAttemptContext context) {
SqoopOutputFormatLoadExecutor executor = SqoopOutputFormatLoadExecutor executor =
new SqoopOutputFormatLoadExecutor(context); new SqoopOutputFormatLoadExecutor(context);

View File

@ -31,14 +31,16 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError; import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext; import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext; import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.etl.io.DataReader; import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.schema.Schema; import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.utils.ClassUtils; import org.apache.sqoop.utils.ClassUtils;
public class SqoopOutputFormatLoadExecutor { public class SqoopOutputFormatLoadExecutor {
@ -48,7 +50,7 @@ public class SqoopOutputFormatLoadExecutor {
private volatile boolean readerFinished = false; private volatile boolean readerFinished = false;
private volatile boolean writerFinished = false; private volatile boolean writerFinished = false;
private volatile Data data; private volatile IntermediateDataFormat data;
private JobContext context; private JobContext context;
private SqoopRecordWriter producer; private SqoopRecordWriter producer;
private Future<?> consumerFuture; private Future<?> consumerFuture;
@ -60,17 +62,19 @@ public class SqoopOutputFormatLoadExecutor {
SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName){ SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName){
this.isTest = isTest; this.isTest = isTest;
this.loaderName = loaderName; this.loaderName = loaderName;
data = new Data(); data = new CSVIntermediateDataFormat();
producer = new SqoopRecordWriter(); producer = new SqoopRecordWriter();
} }
public SqoopOutputFormatLoadExecutor(JobContext jobctx) { public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
data = new Data();
context = jobctx; context = jobctx;
producer = new SqoopRecordWriter(); producer = new SqoopRecordWriter();
data = (IntermediateDataFormat) ClassUtils.instantiate(context
.getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
data.setSchema(ConfigurationUtils.getConnectorSchema(context.getConfiguration()));
} }
public RecordWriter<Data, NullWritable> getRecordWriter() { public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
consumerFuture = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat consumerFuture = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat
("OutputFormatLoader-consumer").build()).submit( ("OutputFormatLoader-consumer").build()).submit(
new ConsumerThread()); new ConsumerThread());
@ -81,14 +85,13 @@ public RecordWriter<Data, NullWritable> getRecordWriter() {
* This is a producer-consumer problem and can be solved * This is a producer-consumer problem and can be solved
* with two semaphores. * with two semaphores.
*/ */
private class SqoopRecordWriter extends RecordWriter<Data, NullWritable> { private class SqoopRecordWriter extends RecordWriter<SqoopWritable, NullWritable> {
@Override @Override
public void write(Data key, NullWritable value) throws InterruptedException { public void write(SqoopWritable key, NullWritable value) throws InterruptedException {
free.acquire(); free.acquire();
checkIfConsumerThrew(); checkIfConsumerThrew();
int type = key.getType(); data.setTextData(key.getString());
data.setContent(key.getContent(type), type);
filled.release(); filled.release();
} }
@ -135,23 +138,53 @@ private void waitForConsumer() {
} }
private class OutputFormatDataReader extends DataReader { private class OutputFormatDataReader extends DataReader {
@Override
public void setFieldDelimiter(char fieldDelimiter) {
data.setFieldDelimiter(fieldDelimiter);
}
@Override @Override
public Object[] readArrayRecord() throws InterruptedException { public Object[] readArrayRecord() throws InterruptedException {
return (Object[])readContent(Data.ARRAY_RECORD); acquireSema();
// If the writer has finished, there is definitely no data remaining
if (writerFinished) {
return null;
}
try {
return data.getObjectData();
} finally {
releaseSema();
}
} }
@Override @Override
public String readCsvRecord() throws InterruptedException { public String readTextRecord() throws InterruptedException {
return (String)readContent(Data.CSV_RECORD); acquireSema();
// If the writer has finished, there is definitely no data remaining
if (writerFinished) {
return null;
}
try {
return data.getTextData();
} finally {
releaseSema();
}
} }
@Override @Override
public Object readContent(int type) throws InterruptedException { public Object readContent() throws InterruptedException {
acquireSema();
if (writerFinished) {
return null;
}
try {
return data.getData();
} catch (Throwable t) {
readerFinished = true;
LOG.error("Caught exception e while getting content ", t);
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
} finally {
releaseSema();
}
}
private void acquireSema() throws InterruptedException {
// Has any more data been produced after I last consumed. // Has any more data been produced after I last consumed.
// If no, wait for the producer to produce. // If no, wait for the producer to produce.
try { try {
@ -159,23 +192,13 @@ public Object readContent(int type) throws InterruptedException {
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
//Really at this point, there is nothing to do. Just throw and get out //Really at this point, there is nothing to do. Just throw and get out
LOG.error("Interrupted while waiting for data to be available from " + LOG.error("Interrupted while waiting for data to be available from " +
"mapper", ex); "mapper", ex);
throw ex; throw ex;
} }
// If the writer has finished, there is definitely no data remaining }
if (writerFinished) {
return null; private void releaseSema(){
} free.release();
try {
Object content = data.getContent(type);
return content;
} catch (Throwable t) {
readerFinished = true;
LOG.error("Caught exception e while getting content ", t);
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
} finally {
free.release();
}
} }
} }

View File

@ -20,7 +20,7 @@
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.sqoop.job.io.Data; import org.apache.sqoop.job.io.SqoopWritable;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -30,7 +30,7 @@
/** /**
* A reducer to perform reduce function. * A reducer to perform reduce function.
*/ */
public class SqoopReducer extends Reducer<Data, NullWritable, Data, NullWritable> { public class SqoopReducer extends Reducer<SqoopWritable, NullWritable, SqoopWritable, NullWritable> {
static { static {
ConfigurationUtils.configureLogging(); ConfigurationUtils.configureLogging();

View File

@ -18,6 +18,7 @@
package org.apache.sqoop.execution.mapreduce; package org.apache.sqoop.execution.mapreduce;
import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.framework.SubmissionRequest; import org.apache.sqoop.framework.SubmissionRequest;
import org.apache.sqoop.framework.configuration.ImportJobConfiguration; import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
import org.apache.sqoop.framework.configuration.OutputCompression; import org.apache.sqoop.framework.configuration.OutputCompression;
@ -71,6 +72,7 @@ private void testImportCompressionInner(OutputCompression comprssionFormat,
request.setConnectorCallbacks(new Importer(Initializer.class, request.setConnectorCallbacks(new Importer(Initializer.class,
Partitioner.class, Extractor.class, Destroyer.class) { Partitioner.class, Extractor.class, Destroyer.class) {
}); });
request.setIntermediateDataFormat(CSVIntermediateDataFormat.class);
executionEngine.prepareImportSubmission(request); executionEngine.prepareImportSubmission(request);
MutableMapContext context = request.getFrameworkContext(); MutableMapContext context = request.getFrameworkContext();
@ -97,6 +99,7 @@ public void testCustomCompression() {
request.setConnectorCallbacks(new Importer(Initializer.class, request.setConnectorCallbacks(new Importer(Initializer.class,
Partitioner.class, Extractor.class, Destroyer.class) { Partitioner.class, Extractor.class, Destroyer.class) {
}); });
request.setIntermediateDataFormat(CSVIntermediateDataFormat.class);
executionEngine.prepareImportSubmission(request); executionEngine.prepareImportSubmission(request);
MutableMapContext context = request.getFrameworkContext(); MutableMapContext context = request.getFrameworkContext();

View File

@ -27,7 +27,7 @@
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.sqoop.job.io.Data; import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.mr.SqoopFileOutputFormat; import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
import org.apache.sqoop.job.mr.SqoopInputFormat; import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper; import org.apache.sqoop.job.mr.SqoopMapper;
@ -44,17 +44,17 @@ public static void runJob(Configuration conf)
} }
public static void runJob(Configuration conf, public static void runJob(Configuration conf,
Class<? extends InputFormat<SqoopSplit, NullWritable>> input, Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
Class<? extends Mapper<SqoopSplit, NullWritable, Data, NullWritable>> mapper, Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable>> mapper,
Class<? extends OutputFormat<Data, NullWritable>> output) Class<? extends OutputFormat<SqoopWritable, NullWritable>> output)
throws IOException, InterruptedException, ClassNotFoundException { throws IOException, InterruptedException, ClassNotFoundException {
Job job = new Job(conf); Job job = new Job(conf);
job.setInputFormatClass(input); job.setInputFormatClass(input);
job.setMapperClass(mapper); job.setMapperClass(mapper);
job.setMapOutputKeyClass(Data.class); job.setMapOutputKeyClass(SqoopWritable.class);
job.setMapOutputValueClass(NullWritable.class); job.setMapOutputValueClass(NullWritable.class);
job.setOutputFormatClass(output); job.setOutputFormatClass(output);
job.setOutputKeyClass(Data.class); job.setOutputKeyClass(SqoopWritable.class);
job.setOutputValueClass(NullWritable.class); job.setOutputValueClass(NullWritable.class);
boolean success = job.waitForCompletion(true); boolean success = job.waitForCompletion(true);

View File

@ -30,10 +30,12 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.job.etl.HdfsExportExtractor; import org.apache.sqoop.job.etl.HdfsExportExtractor;
import org.apache.sqoop.job.etl.HdfsExportPartitioner; import org.apache.sqoop.job.etl.HdfsExportPartitioner;
import org.apache.sqoop.job.etl.HdfsSequenceImportLoader; import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
@ -45,6 +47,9 @@
import org.apache.sqoop.job.mr.ConfigurationUtils; import org.apache.sqoop.job.mr.ConfigurationUtils;
import org.apache.sqoop.job.mr.SqoopFileOutputFormat; import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MJob;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.junit.Test; import org.junit.Test;
public class TestHdfsExtract extends TestCase { public class TestHdfsExtract extends TestCase {
@ -53,12 +58,22 @@ public class TestHdfsExtract extends TestCase {
private static final int NUMBER_OF_FILES = 5; private static final int NUMBER_OF_FILES = 5;
private static final int NUMBER_OF_ROWS_PER_FILE = 1000; private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
private String indir; private final String indir;
public TestHdfsExtract() { public TestHdfsExtract() {
indir = INPUT_ROOT + getClass().getSimpleName(); indir = INPUT_ROOT + getClass().getSimpleName();
} }
@Override
public void setUp() throws IOException {
FileUtils.mkdirs(indir);
}
@Override
public void tearDown() throws IOException {
FileUtils.delete(indir);
}
/** /**
* Test case for validating the number of partitions creation * Test case for validating the number of partitions creation
* based on input. * based on input.
@ -68,12 +83,12 @@ public TestHdfsExtract() {
*/ */
@Test @Test
public void testHdfsExportPartitioner() throws Exception { public void testHdfsExportPartitioner() throws Exception {
FileUtils.delete(indir);
FileUtils.mkdirs(indir);
createTextInput(null); createTextInput(null);
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set(JobConstants.HADOOP_INPUTDIR, indir); conf.set(JobConstants.HADOOP_INPUTDIR, indir);
conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
HdfsExportPartitioner partitioner = new HdfsExportPartitioner(); HdfsExportPartitioner partitioner = new HdfsExportPartitioner();
PrefixContext prefixContext = new PrefixContext(conf, ""); PrefixContext prefixContext = new PrefixContext(conf, "");
int[] partitionValues = {2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 17}; int[] partitionValues = {2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 17};
@ -87,87 +102,67 @@ public void testHdfsExportPartitioner() throws Exception {
@Test @Test
public void testUncompressedText() throws Exception { public void testUncompressedText() throws Exception {
FileUtils.delete(indir);
FileUtils.mkdirs(indir);
createTextInput(null); createTextInput(null);
Configuration conf = new Configuration(); JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
conf.set(JobConstants.JOB_ETL_PARTITIONER,
HdfsExportPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR,
HdfsExportExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
conf.set(JobConstants.HADOOP_INPUTDIR, indir);
JobUtils.runJob(conf);
} }
@Test @Test
public void testCompressedText() throws Exception { public void testDefaultCompressedText() throws Exception {
FileUtils.delete(indir);
FileUtils.mkdirs(indir);
createTextInput(SqoopFileOutputFormat.DEFAULT_CODEC); createTextInput(SqoopFileOutputFormat.DEFAULT_CODEC);
Configuration conf = new Configuration(); JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
conf.set(JobConstants.JOB_ETL_PARTITIONER,
HdfsExportPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR,
HdfsExportExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
conf.set(JobConstants.HADOOP_INPUTDIR, indir);
JobUtils.runJob(conf);
FileUtils.delete(indir);
FileUtils.mkdirs(indir);
createTextInput(BZip2Codec.class);
conf.set(JobConstants.JOB_ETL_PARTITIONER,
HdfsExportPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR,
HdfsExportExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
conf.set(JobConstants.HADOOP_INPUTDIR, indir);
JobUtils.runJob(conf);
} }
@Test @Test
public void testCompressedSequence() throws Exception { public void testBZip2CompressedText() throws Exception {
FileUtils.delete(indir); createTextInput(BZip2Codec.class);
FileUtils.mkdirs(indir);
JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
}
@Test
public void testDefaultCompressedSequence() throws Exception {
createSequenceInput(SqoopFileOutputFormat.DEFAULT_CODEC); createSequenceInput(SqoopFileOutputFormat.DEFAULT_CODEC);
Configuration conf = new Configuration(); JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
conf.set(JobConstants.JOB_ETL_PARTITIONER,
HdfsExportPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR,
HdfsExportExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
conf.set(JobConstants.HADOOP_INPUTDIR, indir);
JobUtils.runJob(conf);
} }
@Test @Test
public void testUncompressedSequence() throws Exception { public void testUncompressedSequence() throws Exception {
FileUtils.delete(indir);
FileUtils.mkdirs(indir);
createSequenceInput(null); createSequenceInput(null);
JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
}
private Schema createSchema() {
Schema schema = new Schema("Test");
schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
.addColumn(new org.apache.sqoop.schema.type.Text("3"));
return schema;
}
private Configuration createConf() {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT); ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
conf.set(JobConstants.JOB_ETL_PARTITIONER, conf.setIfUnset(JobConstants.JOB_ETL_PARTITIONER,
HdfsExportPartitioner.class.getName()); HdfsExportPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, conf.setIfUnset(JobConstants.JOB_ETL_EXTRACTOR,
HdfsExportExtractor.class.getName()); HdfsExportExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); conf.setIfUnset(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4"); conf.setIfUnset(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
conf.set(JobConstants.HADOOP_INPUTDIR, indir); conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT,
JobUtils.runJob(conf); CSVIntermediateDataFormat.class.getName());
conf.setIfUnset(JobConstants.HADOOP_INPUTDIR, indir);
return conf;
}
private Job createJob(Configuration conf, Schema schema) throws Exception {
Job job = new Job(conf);
ConfigurationUtils.setConnectorSchema(job, schema);
job.getConfiguration().set(JobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
return job;
} }
private void createTextInput(Class<? extends CompressionCodec> clz) private void createTextInput(Class<? extends CompressionCodec> clz)
@ -227,11 +222,11 @@ private void createSequenceInput(Class<? extends CompressionCodec> clz)
SequenceFile.Writer filewriter; SequenceFile.Writer filewriter;
if (codec != null) { if (codec != null) {
filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf), filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
conf, filepath, Text.class, NullWritable.class, conf, filepath, Text.class, NullWritable.class,
CompressionType.BLOCK, codec); CompressionType.BLOCK, codec);
} else { } else {
filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf), filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
conf, filepath, Text.class, NullWritable.class, CompressionType.NONE); conf, filepath, Text.class, NullWritable.class, CompressionType.NONE);
} }
Text text = new Text(); Text text = new Text();

View File

@ -26,6 +26,7 @@
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import com.google.common.base.Charsets;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -33,7 +34,9 @@
import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext; import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.job.etl.HdfsSequenceImportLoader; import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
@ -45,6 +48,9 @@
import org.apache.sqoop.job.mr.ConfigurationUtils; import org.apache.sqoop.job.mr.ConfigurationUtils;
import org.apache.sqoop.job.mr.SqoopFileOutputFormat; import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MJob;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
public class TestHdfsLoad extends TestCase { public class TestHdfsLoad extends TestCase {
@ -68,13 +74,21 @@ public void testUncompressedText() throws Exception {
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName()); conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
conf.set(JobConstants.HADOOP_OUTDIR, outdir); conf.set(JobConstants.HADOOP_OUTDIR, outdir);
JobUtils.runJob(conf); Schema schema = new Schema("Test");
schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
.addColumn(new org.apache.sqoop.schema.type.Text("3"));
Job job = new Job(conf);
ConfigurationUtils.setConnectorSchema(job, schema);
JobUtils.runJob(job.getConfiguration());
String fileName = outdir + "/" + OUTPUT_FILE; String fileName = outdir + "/" + OUTPUT_FILE;
InputStream filestream = FileUtils.open(fileName); InputStream filestream = FileUtils.open(fileName);
BufferedReader filereader = new BufferedReader(new InputStreamReader( BufferedReader filereader = new BufferedReader(new InputStreamReader(
filestream, Data.CHARSET_NAME)); filestream, Charsets.UTF_8));
verifyOutputText(filereader); verifyOutputText(filereader);
} }
@ -86,9 +100,18 @@ public void testCompressedText() throws Exception {
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName()); conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
conf.set(JobConstants.HADOOP_OUTDIR, outdir); conf.set(JobConstants.HADOOP_OUTDIR, outdir);
conf.setBoolean(JobConstants.HADOOP_COMPRESS, true); conf.setBoolean(JobConstants.HADOOP_COMPRESS, true);
JobUtils.runJob(conf);
Schema schema = new Schema("Test");
schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
.addColumn(new org.apache.sqoop.schema.type.Text("3"));
Job job = new Job(conf);
ConfigurationUtils.setConnectorSchema(job, schema);
JobUtils.runJob(job.getConfiguration());
Class<? extends CompressionCodec> codecClass = conf.getClass( Class<? extends CompressionCodec> codecClass = conf.getClass(
JobConstants.HADOOP_COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC) JobConstants.HADOOP_COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC)
@ -97,7 +120,7 @@ public void testCompressedText() throws Exception {
String fileName = outdir + "/" + OUTPUT_FILE + codec.getDefaultExtension(); String fileName = outdir + "/" + OUTPUT_FILE + codec.getDefaultExtension();
InputStream filestream = codec.createInputStream(FileUtils.open(fileName)); InputStream filestream = codec.createInputStream(FileUtils.open(fileName));
BufferedReader filereader = new BufferedReader(new InputStreamReader( BufferedReader filereader = new BufferedReader(new InputStreamReader(
filestream, Data.CHARSET_NAME)); filestream, Charsets.UTF_8));
verifyOutputText(filereader); verifyOutputText(filereader);
} }
@ -108,7 +131,7 @@ private void verifyOutputText(BufferedReader reader) throws IOException {
int index = START_ID*NUMBER_OF_ROWS_PER_ID; int index = START_ID*NUMBER_OF_ROWS_PER_ID;
while ((actual = reader.readLine()) != null){ while ((actual = reader.readLine()) != null){
data.setContent(new Object[] { data.setContent(new Object[] {
index, (double) index, String.valueOf(index) }, index, (double) index, new String(new byte[] {(byte)(index + 127)}, Charsets.ISO_8859_1) },
Data.ARRAY_RECORD); Data.ARRAY_RECORD);
expected = data.toString(); expected = data.toString();
index++; index++;
@ -129,8 +152,17 @@ public void testUncompressedSequence() throws Exception {
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName()); conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
conf.set(JobConstants.HADOOP_OUTDIR, outdir); conf.set(JobConstants.HADOOP_OUTDIR, outdir);
JobUtils.runJob(conf);
Schema schema = new Schema("Test");
schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
.addColumn(new org.apache.sqoop.schema.type.Text("3"));
Job job = new Job(conf);
ConfigurationUtils.setConnectorSchema(job, schema);
JobUtils.runJob(job.getConfiguration());
Path filepath = new Path(outdir, Path filepath = new Path(outdir,
OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION); OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
@ -147,10 +179,18 @@ public void testCompressedSequence() throws Exception {
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName()); conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
conf.set(JobConstants.HADOOP_OUTDIR, outdir); conf.set(JobConstants.HADOOP_OUTDIR, outdir);
conf.setBoolean(JobConstants.HADOOP_COMPRESS, true); conf.setBoolean(JobConstants.HADOOP_COMPRESS, true);
JobUtils.runJob(conf);
Schema schema = new Schema("Test");
schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
.addColumn(new org.apache.sqoop.schema.type.Text("3"));
Job job = new Job(conf);
ConfigurationUtils.setConnectorSchema(job, schema);
JobUtils.runJob(job.getConfiguration());
Path filepath = new Path(outdir, Path filepath = new Path(outdir,
OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION); OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
SequenceFile.Reader filereader = new SequenceFile.Reader(filepath.getFileSystem(conf), filepath, conf); SequenceFile.Reader filereader = new SequenceFile.Reader(filepath.getFileSystem(conf), filepath, conf);
@ -164,7 +204,7 @@ private void verifyOutputSequence(SequenceFile.Reader reader) throws IOException
Data data = new Data(); Data data = new Data();
while (reader.next(actual)){ while (reader.next(actual)){
data.setContent(new Object[] { data.setContent(new Object[] {
index, (double) index, String.valueOf(index) }, index, (double) index, new String(new byte[] {(byte)(index + 127)}, Charsets.ISO_8859_1) },
Data.ARRAY_RECORD); Data.ARRAY_RECORD);
expected.set(data.toString()); expected.set(data.toString());
index++; index++;
@ -225,7 +265,7 @@ public void extract(ExtractorContext context, Object oc, Object oj, Object parti
Object[] array = new Object[] { Object[] array = new Object[] {
id * NUMBER_OF_ROWS_PER_ID + row, id * NUMBER_OF_ROWS_PER_ID + row,
(double) (id * NUMBER_OF_ROWS_PER_ID + row), (double) (id * NUMBER_OF_ROWS_PER_ID + row),
String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row) new String(new byte[]{(byte)(id * NUMBER_OF_ROWS_PER_ID + row + 127)}, Charsets.ISO_8859_1)
}; };
context.getDataWriter().writeArrayRecord(array); context.getDataWriter().writeArrayRecord(array);
} }

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext; import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.Loader;
@ -42,12 +43,17 @@
import org.apache.sqoop.job.etl.Partitioner; import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext; import org.apache.sqoop.job.etl.PartitionerContext;
import org.apache.sqoop.job.io.Data; import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.mr.ConfigurationUtils; import org.apache.sqoop.job.mr.ConfigurationUtils;
import org.apache.sqoop.job.mr.SqoopInputFormat; import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper; import org.apache.sqoop.job.mr.SqoopMapper;
import org.apache.sqoop.job.mr.SqoopNullOutputFormat; import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
import org.apache.sqoop.job.mr.SqoopSplit; import org.apache.sqoop.job.mr.SqoopSplit;
import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MJob;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Text;
public class TestMapReduce extends TestCase { public class TestMapReduce extends TestCase {
@ -59,6 +65,8 @@ public void testInputFormat() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT); ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
Job job = new Job(conf); Job job = new Job(conf);
SqoopInputFormat inputformat = new SqoopInputFormat(); SqoopInputFormat inputformat = new SqoopInputFormat();
@ -77,8 +85,15 @@ public void testMapper() throws Exception {
ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT); ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
Schema schema = new Schema("Test");
schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
.addColumn(new org.apache.sqoop.schema.type.Text("3"));
JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class, Job job = new Job(conf);
ConfigurationUtils.setConnectorSchema(job, schema);
JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
DummyOutputFormat.class); DummyOutputFormat.class);
} }
@ -88,8 +103,15 @@ public void testOutputFormat() throws Exception {
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
Schema schema = new Schema("Test");
schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
.addColumn(new Text("3"));
JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class, Job job = new Job(conf);
ConfigurationUtils.setConnectorSchema(job, schema);
JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
SqoopNullOutputFormat.class); SqoopNullOutputFormat.class);
} }
@ -152,14 +174,14 @@ public long getRowsRead() {
} }
public static class DummyOutputFormat public static class DummyOutputFormat
extends OutputFormat<Data, NullWritable> { extends OutputFormat<SqoopWritable, NullWritable> {
@Override @Override
public void checkOutputSpecs(JobContext context) { public void checkOutputSpecs(JobContext context) {
// do nothing // do nothing
} }
@Override @Override
public RecordWriter<Data, NullWritable> getRecordWriter( public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
TaskAttemptContext context) { TaskAttemptContext context) {
return new DummyRecordWriter(); return new DummyRecordWriter();
} }
@ -170,12 +192,13 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
} }
public static class DummyRecordWriter public static class DummyRecordWriter
extends RecordWriter<Data, NullWritable> { extends RecordWriter<SqoopWritable, NullWritable> {
private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
private Data data = new Data(); private Data data = new Data();
@Override @Override
public void write(Data key, NullWritable value) { public void write(SqoopWritable key, NullWritable value) {
data.setContent(new Object[] { data.setContent(new Object[] {
index, index,
(double) index, (double) index,
@ -215,22 +238,22 @@ public boolean needsTaskCommit(TaskAttemptContext taskContext) {
public static class DummyLoader extends Loader { public static class DummyLoader extends Loader {
private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
private Data expected = new Data(); private Data expected = new Data();
private Data actual = new Data(); private CSVIntermediateDataFormat actual = new CSVIntermediateDataFormat();
@Override @Override
public void load(LoaderContext context, Object oc, Object oj) throws Exception{ public void load(LoaderContext context, Object oc, Object oj) throws Exception{
Object[] array; String data;
while ((array = context.getDataReader().readArrayRecord()) != null) { while ((data = context.getDataReader().readTextRecord()) != null) {
actual.setContent(array, Data.ARRAY_RECORD);
// actual.setSchema(context.getSchema());
// actual.setObjectData(array, false);
expected.setContent(new Object[] { expected.setContent(new Object[] {
index, index,
(double) index, (double) index,
String.valueOf(index)}, String.valueOf(index)},
Data.ARRAY_RECORD); Data.ARRAY_RECORD);
index++; index++;
assertEquals(expected.toString(), data);
assertEquals(expected.toString(), actual.toString());
} }
} }
} }

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sqoop.job.io;
import com.google.common.base.Charsets;
import junit.framework.Assert;
import junit.framework.TestCase;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.job.JobConstants;
public class SqoopWritableTest extends TestCase {
private final SqoopWritable writable = new SqoopWritable();
public void testStringInStringOut() {
String testData = "Live Long and prosper";
writable.setString(testData);
Assert.assertEquals(testData,writable.getString());
}
public void testDataWritten() throws IOException {
String testData = "One ring to rule them all";
byte[] testDataBytes = testData.getBytes(Charsets.UTF_8);
writable.setString(testData);
ByteArrayOutputStream ostream = new ByteArrayOutputStream();
DataOutput out = new DataOutputStream(ostream);
writable.write(out);
byte[] written = ostream.toByteArray();
InputStream instream = new ByteArrayInputStream(written);
DataInput in = new DataInputStream(instream);
String readData = in.readUTF();
Assert.assertEquals(testData, readData);
}
public void testDataRead() throws IOException {
String testData = "Brandywine Bridge - 20 miles!";
ByteArrayOutputStream ostream = new ByteArrayOutputStream();
DataOutput out = new DataOutputStream(ostream);
out.writeUTF(testData);
InputStream instream = new ByteArrayInputStream(ostream.toByteArray());
DataInput in = new DataInputStream(instream);
writable.readFields(in);
Assert.assertEquals(testData, writable.getString());
}
public void testWriteReadUsingStream() throws IOException {
String testData = "You shall not pass";
ByteArrayOutputStream ostream = new ByteArrayOutputStream();
DataOutput out = new DataOutputStream(ostream);
writable.setString(testData);
writable.write(out);
byte[] written = ostream.toByteArray();
//Don't test what the data is, test that SqoopWritable can read it.
InputStream instream = new ByteArrayInputStream(written);
SqoopWritable newWritable = new SqoopWritable();
DataInput in = new DataInputStream(instream);
newWritable.readFields(in);
Assert.assertEquals(testData, newWritable.getString());
ostream.close();
instream.close();
}
}

View File

@ -23,11 +23,13 @@
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext; import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MJob;
import org.apache.sqoop.job.io.SqoopWritable;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -47,7 +49,7 @@ public ThrowingLoader() {
@Override @Override
public void load(LoaderContext context, Object cc, Object jc) throws Exception { public void load(LoaderContext context, Object cc, Object jc) throws Exception {
context.getDataReader().readContent(Data.CSV_RECORD); context.getDataReader().readTextRecord();
throw new BrokenBarrierException(); throw new BrokenBarrierException();
} }
} }
@ -62,7 +64,7 @@ public void load(LoaderContext context, Object cc, Object jc) throws Exception {
int runCount = 0; int runCount = 0;
Object o; Object o;
String[] arr; String[] arr;
while ((o = context.getDataReader().readContent(Data.CSV_RECORD)) != null) { while ((o = context.getDataReader().readTextRecord()) != null) {
arr = o.toString().split(","); arr = o.toString().split(",");
Assert.assertEquals(100, arr.length); Assert.assertEquals(100, arr.length);
for (int i = 0; i < arr.length; i++) { for (int i = 0; i < arr.length; i++) {
@ -84,7 +86,7 @@ public GoodLoader() {
@Override @Override
public void load(LoaderContext context, Object cc, Object jc) throws Exception { public void load(LoaderContext context, Object cc, Object jc) throws Exception {
String[] arr = context.getDataReader().readContent(Data.CSV_RECORD).toString().split(","); String[] arr = context.getDataReader().readTextRecord().toString().split(",");
Assert.assertEquals(100, arr.length); Assert.assertEquals(100, arr.length);
for (int i = 0; i < arr.length; i++) { for (int i = 0; i < arr.length; i++) {
Assert.assertEquals(i, Integer.parseInt(arr[i])); Assert.assertEquals(i, Integer.parseInt(arr[i]));
@ -103,7 +105,7 @@ public void load(LoaderContext context, Object cc, Object jc) throws Exception {
int runCount = 0; int runCount = 0;
Object o; Object o;
String[] arr; String[] arr;
while ((o = context.getDataReader().readContent(Data.CSV_RECORD)) != null) { while ((o = context.getDataReader().readTextRecord()) != null) {
arr = o.toString().split(","); arr = o.toString().split(",");
Assert.assertEquals(100, arr.length); Assert.assertEquals(100, arr.length);
for (int i = 0; i < arr.length; i++) { for (int i = 0; i < arr.length; i++) {
@ -119,6 +121,7 @@ public void load(LoaderContext context, Object cc, Object jc) throws Exception {
@Before @Before
public void setUp() { public void setUp() {
conf = new Configuration(); conf = new Configuration();
conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
} }
@ -128,12 +131,14 @@ public void testWhenLoaderThrows() throws Throwable {
conf.set(JobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName()); conf.set(JobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName());
SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName()); SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName());
RecordWriter<Data, NullWritable> writer = executor.getRecordWriter(); RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
Data data = new Data(); IntermediateDataFormat data = new CSVIntermediateDataFormat();
SqoopWritable writable = new SqoopWritable();
try { try {
for (int count = 0; count < 100; count++) { for (int count = 0; count < 100; count++) {
data.setContent(String.valueOf(count), Data.CSV_RECORD); data.setTextData(String.valueOf(count));
writer.write(data, null); writable.setString(data.getTextData());
writer.write(writable, null);
} }
} catch (SqoopException ex) { } catch (SqoopException ex) {
throw ex.getCause(); throw ex.getCause();
@ -146,8 +151,9 @@ public void testSuccessfulContinuousLoader() throws Throwable {
conf.set(JobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName()); conf.set(JobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName());
SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName()); SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName());
RecordWriter<Data, NullWritable> writer = executor.getRecordWriter(); RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
Data data = new Data(); IntermediateDataFormat data = new CSVIntermediateDataFormat();
SqoopWritable writable = new SqoopWritable();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
for (int count = 0; count < 100; count++) { for (int count = 0; count < 100; count++) {
@ -156,8 +162,9 @@ public void testSuccessfulContinuousLoader() throws Throwable {
builder.append(","); builder.append(",");
} }
} }
data.setContent(builder.toString(), Data.CSV_RECORD); data.setTextData(builder.toString());
writer.write(data, null); writable.setString(data.getTextData());
writer.write(writable, null);
} }
writer.close(null); writer.close(null);
} }
@ -166,8 +173,9 @@ public void testSuccessfulContinuousLoader() throws Throwable {
public void testSuccessfulLoader() throws Throwable { public void testSuccessfulLoader() throws Throwable {
SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName()); SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName());
RecordWriter<Data, NullWritable> writer = executor.getRecordWriter(); RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
Data data = new Data(); IntermediateDataFormat data = new CSVIntermediateDataFormat();
SqoopWritable writable = new SqoopWritable();
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
for (int count = 0; count < 100; count++) { for (int count = 0; count < 100; count++) {
builder.append(String.valueOf(count)); builder.append(String.valueOf(count));
@ -175,8 +183,10 @@ public void testSuccessfulLoader() throws Throwable {
builder.append(","); builder.append(",");
} }
} }
data.setContent(builder.toString(), Data.CSV_RECORD); data.setTextData(builder.toString());
writer.write(data, null); writable.setString(data.getTextData());
writer.write(writable, null);
//Allow writer to complete. //Allow writer to complete.
TimeUnit.SECONDS.sleep(5); TimeUnit.SECONDS.sleep(5);
writer.close(null); writer.close(null);
@ -189,8 +199,9 @@ public void testThrowingContinuousLoader() throws Throwable {
conf.set(JobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName()); conf.set(JobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName());
SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName()); SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName());
RecordWriter<Data, NullWritable> writer = executor.getRecordWriter(); RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
Data data = new Data(); IntermediateDataFormat data = new CSVIntermediateDataFormat();
SqoopWritable writable = new SqoopWritable();
try { try {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
@ -200,8 +211,9 @@ public void testThrowingContinuousLoader() throws Throwable {
builder.append(","); builder.append(",");
} }
} }
data.setContent(builder.toString(), Data.CSV_RECORD); data.setTextData(builder.toString());
writer.write(data, null); writable.setString(data.getTextData());
writer.write(writable, null);
} }
writer.close(null); writer.close(null);
} catch (SqoopException ex) { } catch (SqoopException ex) {

11
pom.xml
View File

@ -142,12 +142,6 @@ limitations under the License.
<version>${commons-io.version}</version> <version>${commons-io.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId> <artifactId>hadoop-core</artifactId>
@ -344,6 +338,11 @@ limitations under the License.
<artifactId>commons-lang</artifactId> <artifactId>commons-lang</artifactId>
<version>${commons-lang.version}</version> <version>${commons-lang.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency> <dependency>
<groupId>javax.servlet</groupId> <groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId> <artifactId>servlet-api</artifactId>

View File

@ -36,5 +36,10 @@ limitations under the License.
<groupId>org.apache.sqoop</groupId> <groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-common</artifactId> <artifactId>sqoop-common</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>connector-sdk</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -20,6 +20,8 @@
import java.util.Locale; import java.util.Locale;
import java.util.ResourceBundle; import java.util.ResourceBundle;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.job.etl.Exporter; import org.apache.sqoop.job.etl.Exporter;
import org.apache.sqoop.job.etl.Importer; import org.apache.sqoop.job.etl.Importer;
import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MJob;
@ -79,4 +81,14 @@ public abstract class SqoopConnector {
*/ */
public abstract MetadataUpgrader getMetadataUpgrader(); public abstract MetadataUpgrader getMetadataUpgrader();
/**
* Returns the {@linkplain IntermediateDataFormat} this connector
* can return natively in. This will support retrieving the data as text
* and an array of objects. This should never return null.
*
* @return {@linkplain IntermediateDataFormat} object
*/
public Class<? extends IntermediateDataFormat<?>> getIntermediateDataFormat() {
return CSVIntermediateDataFormat.class;
}
} }

View File

@ -198,6 +198,7 @@ public boolean submit(SubmissionRequest generalRequest) {
ConfigurationUtils.setConfigConnectorJob(job, request.getConfigConnectorJob()); ConfigurationUtils.setConfigConnectorJob(job, request.getConfigConnectorJob());
ConfigurationUtils.setConfigFrameworkConnection(job, request.getConfigFrameworkConnection()); ConfigurationUtils.setConfigFrameworkConnection(job, request.getConfigFrameworkConnection());
ConfigurationUtils.setConfigFrameworkJob(job, request.getConfigFrameworkJob()); ConfigurationUtils.setConfigFrameworkJob(job, request.getConfigFrameworkJob());
ConfigurationUtils.setConnectorSchema(job, request.getSummary().getConnectorSchema());
if(request.getJobName() != null) { if(request.getJobName() != null) {
job.setJobName("Sqoop: " + request.getJobName()); job.setJobName("Sqoop: " + request.getJobName());