
SQOOP-3224: Mainframe FTP transfer should have an option to use binary mode for transfer

(Chris Teoh via Szabolcs Vasas)
Szabolcs Vasas 2018-08-23 17:08:00 +02:00
parent f5eda1e208
commit 0d6c455e5b
21 changed files with 989 additions and 122 deletions

View File

@ -253,6 +253,15 @@
<property name="sqoop.test.mainframe.ftp.dataset.gdg" value="TSODIQ1.GDGTEXT" />
<property name="sqoop.test.mainframe.ftp.dataset.gdg.filename" value="G0001V43" />
<property name="sqoop.test.mainframe.ftp.dataset.gdg.md5" value="f0d0d171fdb8a03dbc1266ed179d7093" />
<property name="sqoop.test.mainframe.ftp.binary.dataset.gdg" value="TSODIQ1.FOLDER" />
<property name="sqoop.test.mainframe.ftp.binary.dataset.gdg.filename" value="G0002V45" />
<property name="sqoop.test.mainframe.ftp.binary.dataset.gdg.md5" value="43eefbe34e466dd3f65a3e867a60809a" />
<property name="sqoop.test.mainframe.ftp.dataset.seq" value="TSODIQ1.GDGTEXT.G0001V43" />
<property name="sqoop.test.mainframe.ftp.dataset.seq.filename" value="G0001V43" />
<property name="sqoop.test.mainframe.ftp.dataset.seq.md5" value="f0d0d171fdb8a03dbc1266ed179d7093" />
<property name="sqoop.test.mainframe.ftp.binary.dataset.seq" value="TSODIQ1.FOLDER.FOLDERTXT" />
<property name="sqoop.test.mainframe.ftp.binary.dataset.seq.filename" value="FOLDERTXT" />
<property name="sqoop.test.mainframe.ftp.binary.dataset.seq.md5" value="1591c0fcc718fda7e9c1f3561d232b2b" />
<property name="s3.bucket.url" value="" />
<property name="s3.generator.command" value="" />
@ -890,10 +899,19 @@
<sysproperty key="sqoop.test.mainframe.ftp.dataset.gdg" value="${sqoop.test.mainframe.ftp.dataset.gdg}" />
<sysproperty key="sqoop.test.mainframe.ftp.dataset.gdg.filename" value="${sqoop.test.mainframe.ftp.dataset.gdg.filename}" />
<sysproperty key="sqoop.test.mainframe.ftp.dataset.gdg.md5" value="${sqoop.test.mainframe.ftp.dataset.gdg.md5}" />
<sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.gdg" value="${sqoop.test.mainframe.ftp.binary.dataset.gdg}" />
<sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.gdg.filename" value="${sqoop.test.mainframe.ftp.binary.dataset.gdg.filename}" />
<sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.gdg.md5" value="${sqoop.test.mainframe.ftp.binary.dataset.gdg.md5}" />
<sysproperty key="s3.bucket.url" value="${s3.bucket.url}" />
<sysproperty key="s3.generator.command" value="${s3.generator.command}" />
<sysproperty key="sqoop.test.mainframe.ftp.dataset.seq" value="${sqoop.test.mainframe.ftp.dataset.seq}" />
<sysproperty key="sqoop.test.mainframe.ftp.dataset.seq.filename" value="${sqoop.test.mainframe.ftp.dataset.seq.filename}" />
<sysproperty key="sqoop.test.mainframe.ftp.dataset.seq.md5" value="${sqoop.test.mainframe.ftp.dataset.seq.md5}" />
<sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.seq" value="${sqoop.test.mainframe.ftp.binary.dataset.seq}" />
<sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.seq.filename" value="${sqoop.test.mainframe.ftp.binary.dataset.seq.filename}" />
<sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.seq.md5" value="${sqoop.test.mainframe.ftp.binary.dataset.seq.md5}" />
<!-- Location of Hive logs -->
<!--<sysproperty key="hive.log.dir"
value="${test.build.data}/sqoop/logs"/> -->

View File

@ -49,6 +49,7 @@ Argument Description
+\--as-sequencefile+ Imports data to SequenceFiles
+\--as-textfile+ Imports data as plain text (default)
+\--as-parquetfile+ Imports data to Parquet Files
+\--as-binaryfile+ Imports data as binary files
+\--delete-target-dir+ Delete the import target directory\
if it exists
+-m,\--num-mappers <n>+ Use 'n' map tasks to import in parallel
@ -193,6 +194,26 @@ $ sqoop import-mainframe --dataset SomePDS --jar-file mydatatypes.jar \
This command will load the +SomePDSType+ class out of +mydatatypes.jar+.
Sqoop supports Generation Data Group (GDG) and sequential data sets in addition to partitioned
data sets. The dataset type is specified with the --datasettype option followed by one of:
'p' for a partitioned dataset (default)
'g' for a generation data group dataset
's' for a sequential dataset
For generation data group datasets, Sqoop retrieves only the latest generation (file).
For sequential datasets, Sqoop retrieves only the file specified.
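For example, a single sequential dataset could be imported with a command along these lines
(an illustrative sketch; the host and dataset name are placeholders):
----
$ sqoop import-mainframe --connect <host> --username myuser --dataset SOME.SEQ.DATASET \
    --datasettype s --target-dir /data/imported/mainframe/SomeSeq
----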
Datasets stored on tape volumes are supported by specifying --tape true.
By default, mainframe datasets are assumed to be plain text; attempting to transfer a binary
dataset in text mode will corrupt the data.
Binary datasets are supported by specifying --as-binaryfile and, optionally, --buffersize followed
by a buffer size in bytes. By default, --buffersize is set to 32760 bytes. Changing the buffer size
changes the number of records Sqoop reports as imported, because the binary dataset is read in
chunks of buffersize bytes; a larger buffer size therefore yields a lower record count.
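As an illustration (hypothetical figures, not from a real transfer), a 100,000 byte binary dataset
would be reported as ceil(100000 / 32760) = 4 records with the default buffer size, but only
ceil(100000 / 64000) = 2 records with --buffersize 64000; the imported bytes are identical in both cases.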
Additional Import Configuration Properties
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
There are some additional properties which can be configured by modifying
@ -228,6 +249,23 @@ $ sqoop import-mainframe --connect z390 --dataset EMPLOYEES \
Enter password: (hidden)
----
Import of a tape-based generation data group dataset using a password alias, writing to
an intermediate directory (--outdir) before moving it to the target directory (--target-dir).
----
$ sqoop import-mainframe --dataset SomeGdg --connect <host> --username myuser --password-alias \
mypasswordalias --datasettype g --tape true --outdir /tmp/imported/sqoop \
--target-dir /data/imported/mainframe/SomeGdg
----
Import of a tape-based binary generation data group dataset with a buffer size of 64000 bytes,
using a password alias and writing to an intermediate directory (--outdir) before moving it
to the target directory (--target-dir).
----
$ sqoop import-mainframe --dataset SomeGdg --connect <host> --username myuser --password-alias \
mypasswordalias --datasettype g --tape true --as-binaryfile --buffersize 64000 --outdir /tmp/imported/sqoop \
--target-dir /data/imported/mainframe/SomeGdg
----
Controlling the import parallelism (using 8 parallel tasks):
----

View File

@ -87,7 +87,8 @@ public enum FileLayout {
TextFile,
SequenceFile,
AvroDataFile,
ParquetFile
ParquetFile,
BinaryFile
}
/**
@ -362,7 +363,12 @@ public String toString() {
// Indicates if the data set is on tape to use different FTP parser
@StoredAsProperty("mainframe.input.dataset.tape")
private String mainframeInputDatasetTape;
// Indicates if binary or ascii FTP transfer mode should be used
@StoredAsProperty("mainframe.ftp.transfermode")
private String mainframeFtpTransferMode;
// Buffer size to use when using binary FTP transfer mode
@StoredAsProperty("mainframe.ftp.buffersize")
private Integer bufferSize;
// Accumulo home directory
private String accumuloHome; // not serialized to metastore.
// Zookeeper home directory
@ -1162,6 +1168,11 @@ private void initDefaults(Configuration baseConfiguration) {
this.escapeColumnMappingEnabled = true;
this.parquetConfiguratorImplementation = HADOOP;
// set default transfer mode to ascii
this.mainframeFtpTransferMode = MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_ASCII;
// set default buffer size for mainframe binary transfers
this.bufferSize = MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE;
}
/**
@ -2499,6 +2510,23 @@ public Boolean getMainframeInputDatasetTape() {
public void setMainframeInputDatasetTape(String txtIsFromTape) {
mainframeInputDatasetTape = Boolean.valueOf(Boolean.parseBoolean(txtIsFromTape)).toString();
}
// returns the buffer size set.
public Integer getBufferSize() {
return bufferSize;
}
public void setMainframeFtpTransferMode(String transferMode) {
mainframeFtpTransferMode = transferMode;
}
public String getMainframeFtpTransferMode() {
return mainframeFtpTransferMode;
}
// sets the binary transfer buffer size, defaults to MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE
public void setBufferSize(int buf) {
bufferSize = buf;
}
public static String getAccumuloHomeDefault() {
// Set this with $ACCUMULO_HOME, but -Daccumulo.home can override.

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
* An {@link OutputFormat} that writes binary files.
* Only writes the key. Does not write any delimiter/newline after the key.
*/
public class ByteKeyOutputFormat<K, V> extends RawKeyTextOutputFormat<K, V> {
// currently don't support compression
private static final String FILE_EXTENSION = "";
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
throws IOException {
DataOutputStream ostream = getFSDataOutputStream(context,FILE_EXTENSION);
return new KeyRecordWriters.BinaryKeyRecordWriter<K,V>(ostream);
}
}
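For context, a minimal sketch of how this output format is wired into a job for binary imports
(a hypothetical standalone snippet mirroring what MainframeImportJob does for --as-binaryfile;
the class name BinaryOutputWiringSketch is made up for illustration):
----
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.sqoop.mapreduce.ByteKeyOutputFormat;

public class BinaryOutputWiringSketch {
  public static void main(String[] args) throws Exception {
    // Keys carry the raw record bytes; nothing is written after the key.
    Job job = Job.getInstance(new Configuration(), "mainframe-binary-import-sketch");
    job.setOutputKeyClass(BytesWritable.class);
    job.setOutputValueClass(NullWritable.class);
    // LazyOutputFormat defers file creation until the first record is written.
    LazyOutputFormat.setOutputFormatClass(job, ByteKeyOutputFormat.class);
  }
}
----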

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.DataOutputStream;
import java.io.IOException;
public class KeyRecordWriters {
/**
* Base RecordWriter that writes keys as plain text to an output stream.
*/
public static class GenericRecordWriter<K, V> extends RecordWriter<K, V> {
private static final String UTF8 = "UTF-8";
protected DataOutputStream out;
/**
* Write the object to the byte stream, handling Text as a special
* case.
*
* @param o the object to print
* @param value the corresponding value for key o
* @throws IOException if the write throws, we pass it on
*/
protected void writeObject(Object o,Object value) throws IOException {
if (o instanceof Text) {
Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(o.toString().getBytes(UTF8));
}
}
@Override
public synchronized void write(K key, V value) throws IOException, InterruptedException {
writeObject(key,value);
}
@Override
public synchronized void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
out.close();
}
}
public static class RawKeyRecordWriter<K, V> extends GenericRecordWriter<K, V> {
public RawKeyRecordWriter(DataOutputStream out) {
this.out = out;
}
}
/**
* RecordWriter that writes raw binary keys, with no delimiter between records.
*/
public static class BinaryKeyRecordWriter<K, V> extends GenericRecordWriter<K, V> {
public BinaryKeyRecordWriter(DataOutputStream out) {
this.out = out;
}
/**
* Write the object to the byte stream, handling Text as a special
* case.
* @param o the object to print
* @throws IOException if the write throws, we pass it on
*/
@Override
protected void writeObject(Object o, Object value) throws IOException {
if (o instanceof BytesWritable) {
BytesWritable to = (BytesWritable) o;
out.write(to.getBytes(), 0, to.getLength());
}
}
}
}

View File

@ -24,7 +24,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@ -38,47 +37,15 @@
*/
public class RawKeyTextOutputFormat<K, V> extends FileOutputFormat<K, V> {
/**
* RecordWriter to write to plain text files.
*/
public static class RawKeyRecordWriter<K, V> extends RecordWriter<K, V> {
private static final String UTF8 = "UTF-8";
protected DataOutputStream out;
public RawKeyRecordWriter(DataOutputStream out) {
this.out = out;
}
/**
* Write the object to the byte stream, handling Text as a special
* case.
* @param o the object to print
* @throws IOException if the write throws, we pass it on
*/
private void writeObject(Object o) throws IOException {
if (o instanceof Text) {
Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(o.toString().getBytes(UTF8));
}
}
public synchronized void write(K key, V value) throws IOException {
writeObject(key);
}
public synchronized void close(TaskAttemptContext context)
throws IOException {
out.close();
}
protected FSDataOutputStream getFSDataOutputStream(TaskAttemptContext context, String ext) throws IOException {
Configuration conf = context.getConfiguration();
Path file = getDefaultWorkFile(context, ext);
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileOut = fs.create(file, false);
return fileOut;
}
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
throws IOException {
protected DataOutputStream getOutputStream(TaskAttemptContext context) throws IOException {
boolean isCompressed = getCompressOutput(context);
Configuration conf = context.getConfiguration();
String ext = "";
@ -93,17 +60,18 @@ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
ext = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(context, ext);
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileOut = fs.create(file, false);
FSDataOutputStream fileOut = getFSDataOutputStream(context,ext);
DataOutputStream ostream = fileOut;
if (isCompressed) {
ostream = new DataOutputStream(codec.createOutputStream(fileOut));
}
return new RawKeyRecordWriter<K, V>(ostream);
return ostream;
}
}
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
throws IOException {
DataOutputStream ostream = getOutputStream(context);
return new KeyRecordWriters.RawKeyRecordWriter<K, V>(ostream);
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.mainframe;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.sqoop.config.ConfigurationConstants;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.mapreduce.AutoProgressMapper;
import java.io.IOException;
public abstract class AbstractMainframeDatasetImportMapper<KEY>
extends AutoProgressMapper<LongWritable, SqoopRecord, KEY, NullWritable> {
private MainframeDatasetInputSplit inputSplit;
private MultipleOutputs<KEY, NullWritable> multiFileWriter;
private long numberOfRecords;
public void map(LongWritable key, SqoopRecord val, Context context)
throws IOException, InterruptedException {
String dataset = inputSplit.getCurrentDataset();
numberOfRecords++;
multiFileWriter.write(createOutKey(val), NullWritable.get(), dataset);
}
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
super.setup(context);
inputSplit = (MainframeDatasetInputSplit)context.getInputSplit();
multiFileWriter = new MultipleOutputs<>(context);
numberOfRecords = 0;
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
super.cleanup(context);
multiFileWriter.close();
context.getCounter(
ConfigurationConstants.COUNTER_GROUP_MAPRED_TASK_COUNTERS,
ConfigurationConstants.COUNTER_MAP_OUTPUT_RECORDS)
.increment(numberOfRecords);
}
protected abstract KEY createOutKey(SqoopRecord sqoopRecord);
}

View File

@ -33,4 +33,15 @@ public class MainframeConfiguration
public static final String MAINFRAME_INPUT_DATASET_TAPE = "mainframe.input.dataset.tape";
public static final String MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME = "org.apache.sqoop.mapreduce.mainframe.MainframeFTPFileEntryParser";
public static final String MAINFRAME_FTP_TRANSFER_MODE = "mainframe.ftp.transfermode";
public static final String MAINFRAME_FTP_TRANSFER_MODE_ASCII = "ascii";
public static final String MAINFRAME_FTP_TRANSFER_MODE_BINARY = "binary";
// this is the default buffer size used when doing binary ftp transfers from mainframe
public static final Integer MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE = 32760;
public static final String MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE = "mainframe.ftp.buffersize";
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.mainframe;
import org.apache.hadoop.io.BytesWritable;
import org.apache.sqoop.lib.SqoopRecord;
/**
* Mapper that writes mainframe dataset records in binary format to multiple files
* based on the key, which is the index of the datasets in the input split.
*/
public class MainframeDatasetBinaryImportMapper extends AbstractMainframeDatasetImportMapper<BytesWritable> {
@Override
protected BytesWritable createOutKey(SqoopRecord sqoopRecord) {
BytesWritable result = new BytesWritable();
byte[] bytes = (byte[]) sqoopRecord.getFieldMap().entrySet().iterator().next().getValue();
result.set(bytes,0, bytes.length);
return result;
}
}

View File

@ -0,0 +1,123 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.mainframe;
import org.apache.sqoop.lib.DelimiterSet;
import org.apache.sqoop.lib.LargeObjectLoader;
import org.apache.hadoop.io.Text;
import org.apache.sqoop.lib.SqoopRecord;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
public class MainframeDatasetBinaryRecord extends SqoopRecord {
private byte[] field;
public Map<String, Object> getFieldMap() {
Map<String, Object> map = new HashMap<String, Object>();
map.put("fieldName", field);
return map;
}
public void setField(String fieldName, Object fieldVal) {
if (fieldVal instanceof byte[]) {
field = (byte[]) fieldVal;
}
}
public void setField(final byte[] val) {
this.field = val;
}
@Override
public void readFields(DataInput in) throws IOException {
in.readFully(field);
}
@Override
public void write(DataOutput out) throws IOException {
out.write(field);
}
@Override
public void readFields(ResultSet rs) throws SQLException {
field = rs.getBytes(1);
}
@Override
public void write(PreparedStatement s) throws SQLException {
s.setBytes(1, field);
}
@Override
public String toString() {
return field.toString();
}
@Override
public int write(PreparedStatement stmt, int offset) throws SQLException {
return 0;
}
@Override
public String toString(DelimiterSet delimiters) {
return null;
}
@Override
public int getClassFormatVersion() {
return 0;
}
@Override
public int hashCode() {
return field.hashCode();
}
public void parse(CharSequence s) {
}
public void parse(Text s) {
}
public void parse(byte[] s) {
}
public void parse(char[] s) {
}
public void parse(ByteBuffer s) {
}
public void parse(CharBuffer s) {
}
@Override
public void loadLargeObjects(LargeObjectLoader objLoader) throws SQLException, IOException, InterruptedException {
}
}

View File

@ -18,9 +18,11 @@
package org.apache.sqoop.mapreduce.mainframe;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -38,6 +40,7 @@ public class MainframeDatasetFTPRecordReader <T extends SqoopRecord>
extends MainframeDatasetRecordReader<T> {
private FTPClient ftp = null;
private BufferedReader datasetReader = null;
private BufferedInputStream inputStream = null;
private static final Log LOG = LogFactory.getLog(
MainframeDatasetFTPRecordReader.class.getName());
@ -50,21 +53,27 @@ public void initialize(InputSplit inputSplit,
Configuration conf = getConfiguration();
ftp = MainframeFTPClientUtils.getFTPConnection(conf);
initialize(ftp,conf);
}
public void initialize(FTPClient ftpClient, Configuration conf)
throws IOException {
ftp = ftpClient;
if (ftp != null) {
String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
MainframeDatasetPath p = null;
try {
p = new MainframeDatasetPath(dsName,conf);
} catch (Exception e) {
LOG.error(e.getMessage());
LOG.error("MainframeDatasetPath helper class incorrectly initialised");
e.printStackTrace();
}
if (dsType != null && p != null) {
dsName = p.getMainframeDatasetFolder();
}
ftp.changeWorkingDirectory("'" + dsName + "'");
String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
MainframeDatasetPath p = null;
try {
p = new MainframeDatasetPath(dsName,conf);
} catch (Exception e) {
LOG.error(e.getMessage());
LOG.error("MainframeDatasetPath helper class incorrectly initialised");
e.printStackTrace();
}
if (dsType != null && p != null) {
dsName = p.getMainframeDatasetFolder();
}
ftp.changeWorkingDirectory("'" + dsName + "'");
}
}
@ -80,6 +89,10 @@ public void close() throws IOException {
protected boolean getNextRecord(T sqoopRecord) throws IOException {
String line = null;
Configuration conf = getConfiguration();
if (MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY.equals(conf.get(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE))) {
return getNextBinaryRecord(sqoopRecord);
}
try {
do {
if (datasetReader == null) {
@ -112,9 +125,85 @@ protected boolean getNextRecord(T sqoopRecord) throws IOException {
return false;
}
protected boolean getNextBinaryRecord(T sqoopRecord) throws IOException {
Configuration conf = getConfiguration();
// typical estimated max size for mainframe record
int BUFFER_SIZE = MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE;
if (conf != null) {
BUFFER_SIZE = conf.getInt(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE, MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE);
}
byte[] buf = new byte[BUFFER_SIZE];
int bytesRead = -1;
int cumulativeBytesRead = 0;
try {
Boolean streamInited = initInputStream(BUFFER_SIZE);
if (!streamInited) {
LOG.info("No more datasets to process.");
return false;
}
do {
bytesRead = inputStream.read(buf,cumulativeBytesRead,BUFFER_SIZE-cumulativeBytesRead);
if (bytesRead == -1) {
// EOF
closeFtpInputStream();
LOG.info("Data transfer completed.");
return writeBytesToSqoopRecord(buf,cumulativeBytesRead,sqoopRecord);
}
cumulativeBytesRead += bytesRead;
if (cumulativeBytesRead == BUFFER_SIZE) {
return writeBytesToSqoopRecord(buf,cumulativeBytesRead,sqoopRecord);
}
} while (bytesRead != -1);
} catch (IOException ioe) {
throw new IOException("IOException during data transfer: " + ioe);
}
return false;
}
protected Boolean initInputStream(int bufferSize) throws IOException {
if (inputStream == null) {
String dsName = getNextDataset();
if (dsName == null) {
LOG.info("No more datasets to process. Returning.");
return false;
}
LOG.info("Attempting to retrieve file stream for: "+dsName);
LOG.info("Buffer size: "+bufferSize);
// retrieveFileStream returns null on failure, so validate it before wrapping it
java.io.InputStream rawStream = ftp.retrieveFileStream(dsName);
if (rawStream == null) {
throw new IOException("Failed to retrieve FTP file stream.");
}
inputStream = new BufferedInputStream(rawStream);
}
return true;
}
protected void closeFtpInputStream() throws IOException {
inputStream.close();
inputStream = null;
if (!ftp.completePendingCommand()) {
throw new IOException("Failed to complete ftp command. FTP Response: "+ftp.getReplyString());
}
}
protected Boolean writeBytesToSqoopRecord(byte[] buf, int cumulativeBytesRead, SqoopRecord sqoopRecord) {
if (cumulativeBytesRead <= 0) {
return false;
}
ByteBuffer buffer = ByteBuffer.allocate(cumulativeBytesRead);
buffer.put(buf,0,cumulativeBytesRead);
convertToSqoopRecord(buffer.array(), sqoopRecord);
return true;
}
private void convertToSqoopRecord(String line, SqoopRecord sqoopRecord) {
String fieldName
= sqoopRecord.getFieldMap().entrySet().iterator().next().getKey();
sqoopRecord.setField(fieldName, line);
}
private void convertToSqoopRecord(byte[] buf, SqoopRecord sqoopRecord) {
String fieldName
= sqoopRecord.getFieldMap().entrySet().iterator().next().getKey();
sqoopRecord.setField(fieldName, buf);
}
}

View File

@ -18,61 +18,19 @@
package org.apache.sqoop.mapreduce.mainframe;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.sqoop.config.ConfigurationConstants;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.mapreduce.AutoProgressMapper;
/**
* Mapper that writes mainframe dataset records in Text format to multiple files
* based on the key, which is the index of the datasets in the input split.
*/
public class MainframeDatasetImportMapper
extends AutoProgressMapper<LongWritable, SqoopRecord, Text, NullWritable> {
private static final Log LOG = LogFactory.getLog(
MainframeDatasetImportMapper.class.getName());
private MainframeDatasetInputSplit inputSplit;
private MultipleOutputs<Text, NullWritable> mos;
private long numberOfRecords;
private Text outkey;
public void map(LongWritable key, SqoopRecord val, Context context)
throws IOException, InterruptedException {
String dataset = inputSplit.getCurrentDataset();
outkey.set(val.toString());
numberOfRecords++;
mos.write(outkey, NullWritable.get(), dataset);
}
public class MainframeDatasetImportMapper extends AbstractMainframeDatasetImportMapper<Text> {
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
super.setup(context);
inputSplit = (MainframeDatasetInputSplit)context.getInputSplit();
mos = new MultipleOutputs<Text, NullWritable>(context);
numberOfRecords = 0;
outkey = new Text();
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
super.cleanup(context);
mos.close();
context.getCounter(
ConfigurationConstants.COUNTER_GROUP_MAPRED_TASK_COUNTERS,
ConfigurationConstants.COUNTER_MAP_OUTPUT_RECORDS)
.increment(numberOfRecords);
protected Text createOutKey(SqoopRecord sqoopRecord) {
Text result = new Text();
result.set(sqoopRecord.toString());
return result;
}
}

View File

@ -22,14 +22,19 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.manager.ImportJobContext;
import org.apache.sqoop.mapreduce.DBWritable;
import org.apache.sqoop.mapreduce.DataDrivenImportJob;
import org.apache.sqoop.mapreduce.RawKeyTextOutputFormat;
import org.apache.sqoop.mapreduce.ByteKeyOutputFormat;
import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
/**
@ -46,7 +51,10 @@ public MainframeImportJob(final SqoopOptions opts, ImportJobContext context, Par
@Override
protected Class<? extends Mapper> getMapperClass() {
if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
if (SqoopOptions.FileLayout.BinaryFile.equals(options.getFileLayout())) {
LOG.debug("Using MainframeDatasetBinaryImportMapper");
return MainframeDatasetBinaryImportMapper.class;
} else if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
return MainframeDatasetImportMapper.class;
} else {
return super.getMapperClass();
@ -66,13 +74,58 @@ protected void configureInputFormat(Job job, String tableName,
job.getConfiguration().set(
MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE,
options.getMainframeInputDatasetTape().toString());
if (SqoopOptions.FileLayout.BinaryFile == options.getFileLayout()) {
job.getConfiguration().set(
MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE,
MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY);
job.getConfiguration().setInt(
MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE,
options.getBufferSize()
);
} else {
job.getConfiguration().set(
MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE,
MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_ASCII);
}
}
@Override
protected void configureOutputFormat(Job job, String tableName,
String tableClassName) throws ClassNotFoundException, IOException {
super.configureOutputFormat(job, tableName, tableClassName);
job.getConfiguration().set(
MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE,
options.getMainframeFtpTransferMode());
// wrap the layout-specific output format in LazyOutputFormat so empty part files are not created
LazyOutputFormat.setOutputFormatClass(job, getOutputFormatClass());
}
@Override
protected void configureMapper(Job job, String tableName,
String tableClassName) throws IOException {
super.configureMapper(job, tableName, tableClassName);
if (SqoopOptions.FileLayout.BinaryFile == options.getFileLayout()) {
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(NullWritable.class);
// this is required as code generated class assumes setField method takes String
// and will fail with ClassCastException when a byte array is passed instead
// java.lang.ClassCastException: [B cannot be cast to java.lang.String
Configuration conf = job.getConfiguration();
conf.setClass(org.apache.sqoop.mapreduce.db.DBConfiguration.INPUT_CLASS_PROPERTY, MainframeDatasetBinaryRecord.class,
DBWritable.class);
}
}
@Override
protected Class<? extends OutputFormat> getOutputFormatClass() {
if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
return RawKeyTextOutputFormat.class;
} else if (options.getFileLayout()
== SqoopOptions.FileLayout.BinaryFile) {
return ByteKeyOutputFormat.class;
}
return null;
}
}

View File

@ -114,6 +114,7 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool {
public static final String FMT_TEXTFILE_ARG = "as-textfile";
public static final String FMT_AVRODATAFILE_ARG = "as-avrodatafile";
public static final String FMT_PARQUETFILE_ARG = "as-parquetfile";
public static final String FMT_BINARYFILE_ARG = "as-binaryfile";
public static final String HIVE_IMPORT_ARG = "hive-import";
public static final String HIVE_TABLE_ARG = "hive-table";
public static final String HIVE_DATABASE_ARG = "hive-database";

View File

@ -744,6 +744,10 @@ protected RelatedOptions getImportOptions() {
.withDescription("Imports data to Parquet files")
.withLongOpt(BaseSqoopTool.FMT_PARQUETFILE_ARG)
.create());
importOpts.addOption(OptionBuilder
.withDescription("Imports data to Binary files")
.withLongOpt(BaseSqoopTool.FMT_BINARYFILE_ARG)
.create());
importOpts.addOption(OptionBuilder.withArgName("n")
.hasArg().withDescription("Use 'n' map tasks to import in parallel")
.withLongOpt(NUM_MAPPERS_ARG)

View File

@ -20,6 +20,7 @@
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.ToolRunner;
@ -40,6 +41,7 @@ public class MainframeImportTool extends ImportTool {
public static final String DS_ARG = "dataset";
public static final String DS_TYPE_ARG = "datasettype";
public static final String DS_TAPE_ARG = "tape";
public static final String BUFFERSIZE_ARG = "buffersize";
public MainframeImportTool() {
super("import-mainframe", false);
@ -82,6 +84,14 @@ protected RelatedOptions getImportOptions() {
.withDescription("Imports data as plain text (default)")
.withLongOpt(FMT_TEXTFILE_ARG)
.create());
importOpts.addOption(OptionBuilder
.withDescription("Imports data as binary")
.withLongOpt(FMT_BINARYFILE_ARG)
.create());
importOpts.addOption(OptionBuilder
.hasArg().withDescription("Sets buffer size for binary import in bytes (default=32kB)")
.withLongOpt(BUFFERSIZE_ARG)
.create());
importOpts.addOption(OptionBuilder.withArgName("n")
.hasArg().withDescription("Use 'n' map tasks to import in parallel")
.withLongOpt(NUM_MAPPERS_ARG)
@ -168,6 +178,28 @@ public void applyOptions(CommandLine in, SqoopOptions out)
// set default tape value to false
out.setMainframeInputDatasetTape("false");
}
if (in.hasOption(FMT_BINARYFILE_ARG)) {
out.setMainframeFtpTransferMode(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY);
out.setFileLayout(SqoopOptions.FileLayout.BinaryFile);
} else {
// set default transfer mode to ascii
out.setMainframeFtpTransferMode(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_ASCII);
out.setFileLayout(SqoopOptions.FileLayout.TextFile);
}
if (in.hasOption(BUFFERSIZE_ARG)) {
// if we specify --buffersize set the buffer size
int bufSize = Integer.valueOf(in.getOptionValue(BUFFERSIZE_ARG));
if (bufSize > 0) {
out.setBufferSize(bufSize);
}
else {
out.setBufferSize(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE);
}
} else {
// set the default buffer size (32760 bytes)
out.setBufferSize(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE);
}
}
@Override
@ -190,6 +222,17 @@ protected void validateImportOptions(SqoopOptions options)
throw new InvalidOptionsException(
"--" + DS_TAPE_ARG + " specified is invalid. " + HELP_STR);
}
/* only allow FileLayout.BinaryFile to be selected for mainframe import */
if (SqoopOptions.FileLayout.BinaryFile.equals(options.getFileLayout()) && StringUtils.isEmpty(options.getMainframeInputDatasetName())) {
throw new InvalidOptionsException("--as-binaryfile should only be used with import-mainframe module.");
}
// --buffersize is only meaningful for binary transfers, so reject a non-default
// buffer size when --as-binaryfile has not been selected
if (!SqoopOptions.FileLayout.BinaryFile.equals(options.getFileLayout()) && !MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE.equals(options.getBufferSize())) {
throw new InvalidOptionsException("--buffersize should only be used with --as-binaryfile parameter.");
}
super.validateImportOptions(options);
}
}

View File

@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.net.PrintCommandListener;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
@ -207,8 +208,18 @@ public static FTPClient getFTPConnection(Configuration conf)
throw new IOException("Could not login to server " + server
+ ":" + ftp.getReplyString());
}
// set ASCII transfer mode
ftp.setFileType(FTP.ASCII_FILE_TYPE);
// set transfer mode
String transferMode = conf.get(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE);
if (StringUtils.equalsIgnoreCase(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY,transferMode)) {
LOG.info("Setting FTP transfer mode to binary");
// ftp.setFileTransferMode(FTP.BINARY_FILE_TYPE) doesn't work for MVS, it throws a syntax error
ftp.sendCommand("TYPE I");
// this is IMPORTANT - otherwise it will convert 0x0d0a to 0x0a = dropping bytes
ftp.setFileType(FTP.BINARY_FILE_TYPE);
} else {
LOG.info("Defaulting FTP transfer mode to ascii");
ftp.setFileType(FTP.ASCII_FILE_TYPE); // ASCII_FILE_TYPE is a file type constant, so use setFileType rather than setFileTransferMode
}
// Use passive mode as default.
ftp.enterLocalPassiveMode();
LOG.info("System type detected: " + ftp.getSystemType());

View File

@ -113,6 +113,42 @@ public void testImportGdgText() throws IOException {
doImportAndVerify(MainframeTestUtil.GDG_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG, files);
}
@Test
public void testImportGdgBinary() throws IOException {
HashMap<String,String> files = new HashMap<String,String>();
files.put(MainframeTestUtil.GDG_BINARY_DATASET_FILENAME, MainframeTestUtil.EXPECTED_GDG_BINARY_DATASET_MD5);
doImportAndVerify(MainframeTestUtil.GDG_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG, files, "--as-binaryfile");
}
@Test
public void testImportGdgBinaryWithBufferSize() throws IOException {
HashMap<String,String> files = new HashMap<String,String>();
files.put(MainframeTestUtil.GDG_BINARY_DATASET_FILENAME, MainframeTestUtil.EXPECTED_GDG_BINARY_DATASET_MD5);
doImportAndVerify(MainframeTestUtil.GDG_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG, files, "--as-binaryfile", "--buffersize", "64000");
}
@Test
public void testImportSequential() throws IOException {
// reuses the GDG text dataset's generation, since a single generation is readable as a sequential dataset
HashMap<String,String> files = new HashMap<String,String>();
files.put(MainframeTestUtil.SEQ_DATASET_FILENAME, MainframeTestUtil.EXPECTED_SEQ_DATASET_MD5);
doImportAndVerify(MainframeTestUtil.SEQ_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL, files);
}
@Test
public void testImportSequentialBinary() throws IOException {
HashMap<String,String> files = new HashMap<String,String>();
files.put(MainframeTestUtil.SEQ_BINARY_DATASET_FILENAME, MainframeTestUtil.EXPECTED_SEQ_BINARY_DATASET_MD5);
doImportAndVerify(MainframeTestUtil.SEQ_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL, files, "--as-binaryfile");
}
@Test
public void testImportSequentialBinaryWithBufferSize() throws IOException {
HashMap<String,String> files = new HashMap<String,String>();
files.put(MainframeTestUtil.SEQ_BINARY_DATASET_FILENAME, MainframeTestUtil.EXPECTED_SEQ_BINARY_DATASET_MD5);
doImportAndVerify(MainframeTestUtil.SEQ_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL, files, "--as-binaryfile", "--buffersize", "64000");
}
private String [] getArgv(String datasetName, String datasetType, String ... extraArgs) {
ArrayList<String> args = new ArrayList<String>();
@ -130,7 +166,6 @@ public void testImportGdgText() throws IOException {
args.add(datasetType);
if (extraArgs.length > 0) {
args.add("--");
for (String arg : extraArgs) {
args.add(arg);
}

View File

@ -41,4 +41,31 @@ public class MainframeTestUtil {
public static final String EXPECTED_GDG_DATASET_MD5 = System.getProperty(
"sqoop.test.mainframe.ftp.dataset.gdg.md5",
"f0d0d171fdb8a03dbc1266ed179d7093");
public static final String GDG_BINARY_DATASET_NAME = System.getProperty(
"sqoop.test.mainframe.ftp.binary.dataset.gdg",
"TSODIQ1.FOLDER");
public static final String GDG_BINARY_DATASET_FILENAME = System.getProperty(
"sqoop.test.mainframe.ftp.binary.dataset.gdg.filename",
"G0002V45");
public static final String EXPECTED_GDG_BINARY_DATASET_MD5 = System.getProperty(
"sqoop.test.mainframe.ftp.binary.dataset.gdg.md5",
"43eefbe34e466dd3f65a3e867a60809a");
public static final String SEQ_DATASET_NAME = System.getProperty(
"sqoop.test.mainframe.ftp.dataset.seq",
"TSODIQ1.GDGTEXT.G0001V43");
public static final String SEQ_DATASET_FILENAME = System.getProperty(
"sqoop.test.mainframe.ftp.dataset.seq.filename",
"G0001V43");
public static final String EXPECTED_SEQ_DATASET_MD5 = System.getProperty(
"sqoop.test.mainframe.ftp.dataset.seq.md5",
"f0d0d171fdb8a03dbc1266ed179d7093");
public static final String SEQ_BINARY_DATASET_NAME = System.getProperty(
"sqoop.test.mainframe.ftp.binary.dataset.seq",
"TSODIQ1.FOLDER.FOLDERTXT");
public static final String SEQ_BINARY_DATASET_FILENAME = System.getProperty(
"sqoop.test.mainframe.ftp.binary.dataset.seq.filename",
"FOLDERTXT");
public static final String EXPECTED_SEQ_BINARY_DATASET_MD5 = System.getProperty(
"sqoop.test.mainframe.ftp.binary.dataset.seq.md5",
"1591c0fcc718fda7e9c1f3561d232b2b");
}

View File

@ -0,0 +1,164 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.mainframe;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.io.InputStream;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.doReturn;
public class TestMainframeDatasetBinaryRecord {
private MainframeDatasetFTPRecordReader ftpRecordReader;
private InputStream is;
private FTPClient ftp;
private final String DATASET_NAME = "dummy.ds";
private final String DATASET_TYPE = "g";
private static final Log LOG = LogFactory.getLog(
TestMainframeDatasetBinaryRecord.class.getName());
@Before
public void setUp() throws IOException, InterruptedException {
MainframeDatasetFTPRecordReader rdr = new MainframeDatasetFTPRecordReader();
Configuration conf;
MainframeDatasetInputSplit split;
TaskAttemptContext context;
ftpRecordReader = spy(rdr);
is = mock(InputStream.class);
ftp = mock(FTPClient.class);
split = mock(MainframeDatasetInputSplit.class);
context = mock(TaskAttemptContext.class);
conf = new Configuration();
when(ftp.retrieveFileStream(any(String.class))).thenReturn(is);
when(ftp.changeWorkingDirectory(any(String.class))).thenReturn(true);
doReturn("file1").when(ftpRecordReader).getNextDataset();
when(split.getNextDataset()).thenReturn(DATASET_NAME);
when(ftpRecordReader.getNextDataset()).thenReturn(DATASET_NAME);
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,DATASET_NAME);
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,DATASET_TYPE);
conf.setInt(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE,MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE);
ftpRecordReader.initialize(ftp, conf);
}
// Answer used to mock InputStream.read: reports byteLength bytes as having been read
protected Answer returnSqoopRecord(final int byteLength) {
return new Answer() {
public Object answer(InvocationOnMock invocation) {
return byteLength;
}
};
}
@Test
public void testGetNextBinaryRecordForFullRecord() {
MainframeDatasetBinaryRecord record = new MainframeDatasetBinaryRecord();
try {
when(is.read(any(byte[].class),anyInt(),anyInt()))
.thenAnswer(returnSqoopRecord(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE))
.thenReturn(-1);
when(ftp.completePendingCommand()).thenReturn(true);
Assert.assertTrue(ftpRecordReader.getNextBinaryRecord(record));
Assert.assertFalse(record.getFieldMap().values().isEmpty());
Assert.assertEquals(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE.intValue(),((byte[])record.getFieldMap().values().iterator().next()).length);
} catch (IOException ioe) {
LOG.error("Issue with reading 1 full binary buffer record", ioe);
throw new RuntimeException(ioe);
}
}
@Test
public void testGetNextBinaryRecordForPartialRecord() {
int expectedBytesRead = 10;
MainframeDatasetBinaryRecord record = new MainframeDatasetBinaryRecord();
try {
when(is.read(any(byte[].class),anyInt(),anyInt()))
.thenAnswer(returnSqoopRecord(10))
.thenReturn(-1);
when(ftp.completePendingCommand()).thenReturn(true);
Assert.assertTrue(ftpRecordReader.getNextBinaryRecord(record));
Assert.assertFalse(record.getFieldMap().values().isEmpty());
Assert.assertEquals(expectedBytesRead,(((byte[])record.getFieldMap().values().iterator().next()).length));
} catch (IOException ioe) {
LOG.error("Issue with reading 10 byte binary record", ioe);
throw new RuntimeException(ioe);
}
}
@Test
public void testGetNextBinaryRecordFor2Records() {
// test 1 full record, and 1 partial
int expectedBytesRead = 10;
MainframeDatasetBinaryRecord record = new MainframeDatasetBinaryRecord();
try {
when(is.read(any(byte[].class),anyInt(),anyInt()))
.thenAnswer(returnSqoopRecord(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE))
.thenAnswer(returnSqoopRecord(10))
.thenReturn(-1);
when(ftp.completePendingCommand()).thenReturn(true);
Assert.assertTrue(ftpRecordReader.getNextBinaryRecord(record));
Assert.assertFalse(record.getFieldMap().values().isEmpty());
Assert.assertTrue(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE.equals((((byte[])record.getFieldMap().values().iterator().next()).length)));
record = new MainframeDatasetBinaryRecord();
Assert.assertTrue(ftpRecordReader.getNextBinaryRecord(record));
Assert.assertFalse(record.getFieldMap().values().isEmpty());
Assert.assertEquals(expectedBytesRead,(((byte[])record.getFieldMap().values().iterator().next()).length));
} catch (IOException ioe) {
LOG.error("Issue with reading 1 full binary buffer record followed by 1 partial binary buffer record", ioe);
throw new RuntimeException(ioe);
}
}
@Test
public void testGetNextBinaryRecordForMultipleReads() {
// test reading 1 record where the stream returns less than a full buffer
MainframeDatasetBinaryRecord record = new MainframeDatasetBinaryRecord();
try {
when(is.read(any(byte[].class),anyInt(),anyInt()))
.thenAnswer(returnSqoopRecord(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE /2))
.thenAnswer(returnSqoopRecord(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE /2))
.thenReturn(-1);
when(ftp.completePendingCommand()).thenReturn(true);
Assert.assertTrue(ftpRecordReader.getNextBinaryRecord(record));
Assert.assertFalse(record.getFieldMap().values().isEmpty());
Assert.assertEquals(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE.intValue(),((byte[])record.getFieldMap().values().iterator().next()).length);
record = new MainframeDatasetBinaryRecord();
Assert.assertFalse(ftpRecordReader.getNextBinaryRecord(record));
Assert.assertNull((((byte[])record.getFieldMap().values().iterator().next())));
} catch (IOException ioe) {
LOG.error("Issue with verifying reading partial buffer binary records", ioe);
throw new RuntimeException(ioe);
}
}
}

View File

@ -22,18 +22,18 @@
import java.lang.reflect.Method;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sqoop.cli.RelatedOptions;
import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.SqoopOptions.InvalidOptionsException;
import org.apache.sqoop.cli.ToolOptions;
import org.apache.sqoop.testutil.BaseSqoopTestCase;
import org.junit.rules.ExpectedException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -43,15 +43,16 @@
public class TestMainframeImportTool extends BaseSqoopTestCase {
private static final Log LOG = LogFactory.getLog(TestMainframeImportTool.class
.getName());
private MainframeImportTool mfImportTool;
private ToolOptions toolOptions;
private SqoopOptions sqoopOption;
@Before
public void setUp() {
mfImportTool = new MainframeImportTool();
toolOptions = new ToolOptions();
sqoopOption = new SqoopOptions();
}
@After
@ -183,4 +184,58 @@ public void testTapeOptionInvalidReturnsFalse() throws ParseException, InvalidOp
Boolean isTape = sqoopOption.getMainframeInputDatasetTape();
assert(isTape != null && isTape.toString().equals("false"));
}
@Test
public void testFtpTransferModeAscii() throws ParseException, InvalidOptionsException {
String[] args = new String[] { "--dataset", "mydatasetname", "--as-textfile" };
configureAndValidateOptions(args);
assertEquals(SqoopOptions.FileLayout.TextFile,sqoopOption.getFileLayout());
}
@Test
public void testFtpTransferModeDefaultsToAscii() throws ParseException, InvalidOptionsException {
String[] args = new String[] { "--dataset", "mydatasetname" };
configureAndValidateOptions(args);
assertEquals(SqoopOptions.FileLayout.TextFile,sqoopOption.getFileLayout());
}
@Test
public void testAsBinaryFileSetsCorrectFileLayoutAndDefaultBufferSize() throws ParseException, InvalidOptionsException {
String[] args = new String[] { "--dataset", "mydatasetname", "--as-binaryfile" };
configureAndValidateOptions(args);
assertEquals(SqoopOptions.FileLayout.BinaryFile,sqoopOption.getFileLayout());
assertEquals(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE, sqoopOption.getBufferSize());
}
@Test
public void testSetBufferSize() throws ParseException, InvalidOptionsException {
final Integer EXPECTED_BUFFER = 1024;
String[] args = new String[] { "--dataset", "mydatasetname", "--as-binaryfile", "--buffersize", EXPECTED_BUFFER.toString() };
configureAndValidateOptions(args);
assertEquals(SqoopOptions.FileLayout.BinaryFile,sqoopOption.getFileLayout());
assertEquals(EXPECTED_BUFFER, sqoopOption.getBufferSize());
}
@Rule
public final ExpectedException exception = ExpectedException.none();
@Test
public void testBufferSizeWithoutBinaryThrowsException() throws ParseException, InvalidOptionsException {
final Integer EXPECTED_BUFFER = 1024;
String[] args = new String[] { "--buffersize", EXPECTED_BUFFER.toString() };
exception.expect(InvalidOptionsException.class);
configureAndValidateOptions(args);
}
@Test
public void testInvalidBufferSizeThrowsNumberFormatException() throws ParseException, InvalidOptionsException {
String[] args = new String[] { "--buffersize", "invalidinteger" };
exception.expect(NumberFormatException.class);
configureAndValidateOptions(args);
}
private void configureAndValidateOptions(String[] args) throws ParseException, InvalidOptionsException {
mfImportTool.configureOptions(toolOptions);
sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
mfImportTool.validateImportOptions(sqoopOption);
}
}