From 0d6c455e5bbfc092d4a90f352eb262f347758132 Mon Sep 17 00:00:00 2001 From: Szabolcs Vasas Date: Thu, 23 Aug 2018 17:08:00 +0200 Subject: [PATCH] SQOOP-3224: Mainframe FTP transfer should have an option to use binary mode for transfer (Chris Teoh via Szabolcs Vasas) --- build.xml | 18 ++ src/docs/user/import-mainframe.txt | 38 ++++ src/java/org/apache/sqoop/SqoopOptions.java | 32 +++- .../sqoop/mapreduce/ByteKeyOutputFormat.java | 42 +++++ .../sqoop/mapreduce/KeyRecordWriters.java | 97 +++++++++++ .../mapreduce/RawKeyTextOutputFormat.java | 62 ++----- .../AbstractMainframeDatasetImportMapper.java | 65 +++++++ .../mainframe/MainframeConfiguration.java | 11 ++ .../MainframeDatasetBinaryImportMapper.java | 37 ++++ .../MainframeDatasetBinaryRecord.java | 123 +++++++++++++ .../MainframeDatasetFTPRecordReader.java | 119 +++++++++++-- .../MainframeDatasetImportMapper.java | 52 +----- .../mainframe/MainframeImportJob.java | 59 ++++++- .../org/apache/sqoop/tool/BaseSqoopTool.java | 1 + .../org/apache/sqoop/tool/ImportTool.java | 4 + .../sqoop/tool/MainframeImportTool.java | 43 +++++ .../sqoop/util/MainframeFTPClientUtils.java | 15 +- .../mainframe/MainframeManagerImportTest.java | 37 +++- .../manager/mainframe/MainframeTestUtil.java | 27 +++ .../TestMainframeDatasetBinaryRecord.java | 164 ++++++++++++++++++ .../sqoop/tool/TestMainframeImportTool.java | 65 ++++++- 21 files changed, 989 insertions(+), 122 deletions(-) create mode 100644 src/java/org/apache/sqoop/mapreduce/ByteKeyOutputFormat.java create mode 100644 src/java/org/apache/sqoop/mapreduce/KeyRecordWriters.java create mode 100644 src/java/org/apache/sqoop/mapreduce/mainframe/AbstractMainframeDatasetImportMapper.java create mode 100644 src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetBinaryImportMapper.java create mode 100644 src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetBinaryRecord.java create mode 100644 src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetBinaryRecord.java diff --git a/build.xml b/build.xml index 084823cf..cd2e9e29 100644 --- a/build.xml +++ b/build.xml @@ -253,6 +253,15 @@ + + + + + + + + + @@ -890,10 +899,19 @@ + + + + + + + + + diff --git a/src/docs/user/import-mainframe.txt b/src/docs/user/import-mainframe.txt index abeb7cde..3ecfb7e4 100644 --- a/src/docs/user/import-mainframe.txt +++ b/src/docs/user/import-mainframe.txt @@ -49,6 +49,7 @@ Argument Description +\--as-sequencefile+ Imports data to SequenceFiles +\--as-textfile+ Imports data as plain text (default) +\--as-parquetfile+ Imports data to Parquet Files ++\--as-binaryfile+ Imports data as binary files +\--delete-target-dir+ Delete the import target directory\ if it exists +-m,\--num-mappers + Use 'n' map tasks to import in parallel @@ -193,6 +194,26 @@ $ sqoop import-mainframe --dataset SomePDS --jar-file mydatatypes.jar \ This command will load the +SomePDSType+ class out of +mydatatypes.jar+. +Support for Generation Data Group and Sequential data sets. +This can be specified with the --datasettype option followed by one of: +'p' for partitioned dataset (default) +'g' for generation data group dataset +'s' for sequential dataset + +In the case of generation data group datasets, Sqoop will retrieve just the last or +latest file (or generation). + +In the case of sequential datasets, Sqoop will retrieve just the file specified. + +Support of datasets that are stored on tape volumes by specifying --tape true. + +By default, mainframe datasets are assumed to be plain text. 
Attempting to transfer +binary datasets using this method will result in data corruption. +Support for binary datasets by specifying --as-binaryfile and optionally --buffersize followed by +buffer size specified in bytes. By default, --buffersize is set to 32760 bytes. Altering buffersize +will alter the number of records Sqoop reports to have imported. This is because it reads the +binary dataset in chunks specified by buffersize. Larger buffer size means lower number of records. + Additional Import Configuration Properties ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ There are some additional properties which can be configured by modifying @@ -228,6 +249,23 @@ $ sqoop import-mainframe --connect z390 --dataset EMPLOYEES \ Enter password: (hidden) ---- +Import of a tape based generation data group dataset using a password alias and writing out to +an intermediate directory (--outdir) before moving it to (--target-dir). +---- +$ sqoop import-mainframe --dataset SomeGdg --connect --username myuser --password-alias \ + mypasswordalias --datasettype g --tape true --outdir /tmp/imported/sqoop \ + --target-dir /data/imported/mainframe/SomeGdg +---- + +Import of a tape based binary generation data group dataset with a buffer size of 64000 using a +password alias and writing out to an intermediate directory (--outdir) before moving it +to (--target-dir). +---- +$ sqoop import-mainframe --dataset SomeGdg --connect --username myuser --password-alias \ + mypasswordalias --datasettype g --tape true --as-binaryfile --buffersize 64000 --outdir /tmp/imported/sqoop \ + --target-dir /data/imported/mainframe/SomeGdg +---- + Controlling the import parallelism (using 8 parallel tasks): ---- diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index f97dbfdf..f06872f9 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -87,7 +87,8 @@ public enum FileLayout { TextFile, SequenceFile, AvroDataFile, - ParquetFile + ParquetFile, + BinaryFile } /** @@ -362,7 +363,12 @@ public String toString() { // Indicates if the data set is on tape to use different FTP parser @StoredAsProperty("mainframe.input.dataset.tape") private String mainframeInputDatasetTape; - + // Indicates if binary or ascii FTP transfer mode should be used + @StoredAsProperty("mainframe.ftp.transfermode") + private String mainframeFtpTransferMode; + // Buffer size to use when using binary FTP transfer mode + @StoredAsProperty("mainframe.ftp.buffersize") + private Integer bufferSize; // Accumulo home directory private String accumuloHome; // not serialized to metastore. // Zookeeper home directory @@ -1162,6 +1168,11 @@ private void initDefaults(Configuration baseConfiguration) { this.escapeColumnMappingEnabled = true; this.parquetConfiguratorImplementation = HADOOP; + + // set default transfer mode to ascii + this.mainframeFtpTransferMode = MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_ASCII; + // set default buffer size for mainframe binary transfers + this.bufferSize = MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE; } /** @@ -2499,6 +2510,23 @@ public Boolean getMainframeInputDatasetTape() { public void setMainframeInputDatasetTape(String txtIsFromTape) { mainframeInputDatasetTape = Boolean.valueOf(Boolean.parseBoolean(txtIsFromTape)).toString(); } + // returns the buffer size set. 
+ public Integer getBufferSize() { + return bufferSize; + } + + public void setMainframeFtpTransferMode(String transferMode) { + mainframeFtpTransferMode = transferMode; + } + + public String getMainframeFtpTransferMode() { + return mainframeFtpTransferMode; + } + + // sets the binary transfer buffer size, defaults to MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE + public void setBufferSize(int buf) { + bufferSize = buf; + } public static String getAccumuloHomeDefault() { // Set this with $ACCUMULO_HOME, but -Daccumulo.home can override. diff --git a/src/java/org/apache/sqoop/mapreduce/ByteKeyOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/ByteKeyOutputFormat.java new file mode 100644 index 00000000..f7427dbd --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/ByteKeyOutputFormat.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce; + +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * An {@link OutputFormat} that writes binary files. + * Only writes the key. Does not write any delimiter/newline after the key. + */ +public class ByteKeyOutputFormat extends RawKeyTextOutputFormat { + + // currently don't support compression + private static final String FILE_EXTENSION = ""; + + public RecordWriter getRecordWriter(TaskAttemptContext context) + throws IOException { + DataOutputStream ostream = getFSDataOutputStream(context,FILE_EXTENSION); + return new KeyRecordWriters.BinaryKeyRecordWriter(ostream); + } + +} diff --git a/src/java/org/apache/sqoop/mapreduce/KeyRecordWriters.java b/src/java/org/apache/sqoop/mapreduce/KeyRecordWriters.java new file mode 100644 index 00000000..9630a818 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/KeyRecordWriters.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.mapreduce; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.DataOutputStream; +import java.io.IOException; + +public class KeyRecordWriters { + /** + * RecordWriter to write to plain text files. + */ + + public static class GenericRecordWriter extends RecordWriter { + private static final String UTF8 = "UTF-8"; + + protected DataOutputStream out; + + /** + * Write the object to the byte stream, handling Text as a special + * case. + * + * @param o the object to print + * @param value the corresponding value for key o + * @throws IOException if the write throws, we pass it on + */ + protected void writeObject(Object o,Object value) throws IOException { + if (o instanceof Text) { + Text to = (Text) o; + out.write(to.getBytes(), 0, to.getLength()); + } else { + out.write(o.toString().getBytes(UTF8)); + } + } + + @Override + public synchronized void write(K key, V value) throws IOException, InterruptedException { + writeObject(key,value); + } + + @Override + public synchronized void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + out.close(); + } + } + + public static class RawKeyRecordWriter extends GenericRecordWriter { + + public RawKeyRecordWriter(DataOutputStream out) { + this.out = out; + } + } + + /** + * RecordWriter to write to plain text files. + */ + public static class BinaryKeyRecordWriter extends GenericRecordWriter { + + public BinaryKeyRecordWriter(DataOutputStream out) { + this.out = out; + } + + /** + * Write the object to the byte stream, handling Text as a special + * case. + * @param o the object to print + * @throws IOException if the write throws, we pass it on + */ + @Override + protected void writeObject(Object o, Object value) throws IOException { + if (o instanceof BytesWritable) { + BytesWritable to = (BytesWritable) o; + out.write(to.getBytes(), 0, to.getLength()); + } + } + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/RawKeyTextOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/RawKeyTextOutputFormat.java index fec34f21..8e81aa43 100644 --- a/src/java/org/apache/sqoop/mapreduce/RawKeyTextOutputFormat.java +++ b/src/java/org/apache/sqoop/mapreduce/RawKeyTextOutputFormat.java @@ -24,7 +24,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; @@ -38,47 +37,15 @@ */ public class RawKeyTextOutputFormat extends FileOutputFormat { - /** - * RecordWriter to write to plain text files. - */ - public static class RawKeyRecordWriter extends RecordWriter { - - private static final String UTF8 = "UTF-8"; - - protected DataOutputStream out; - - public RawKeyRecordWriter(DataOutputStream out) { - this.out = out; - } - - /** - * Write the object to the byte stream, handling Text as a special - * case. 
- * @param o the object to print - * @throws IOException if the write throws, we pass it on - */ - private void writeObject(Object o) throws IOException { - if (o instanceof Text) { - Text to = (Text) o; - out.write(to.getBytes(), 0, to.getLength()); - } else { - out.write(o.toString().getBytes(UTF8)); - } - } - - public synchronized void write(K key, V value) throws IOException { - writeObject(key); - } - - public synchronized void close(TaskAttemptContext context) - throws IOException { - out.close(); - } - + protected FSDataOutputStream getFSDataOutputStream(TaskAttemptContext context, String ext) throws IOException { + Configuration conf = context.getConfiguration(); + Path file = getDefaultWorkFile(context, ext); + FileSystem fs = file.getFileSystem(conf); + FSDataOutputStream fileOut = fs.create(file, false); + return fileOut; } - public RecordWriter getRecordWriter(TaskAttemptContext context) - throws IOException { + protected DataOutputStream getOutputStream(TaskAttemptContext context) throws IOException { boolean isCompressed = getCompressOutput(context); Configuration conf = context.getConfiguration(); String ext = ""; @@ -93,17 +60,18 @@ public RecordWriter getRecordWriter(TaskAttemptContext context) ext = codec.getDefaultExtension(); } - Path file = getDefaultWorkFile(context, ext); - FileSystem fs = file.getFileSystem(conf); - FSDataOutputStream fileOut = fs.create(file, false); + FSDataOutputStream fileOut = getFSDataOutputStream(context,ext); DataOutputStream ostream = fileOut; if (isCompressed) { ostream = new DataOutputStream(codec.createOutputStream(fileOut)); } - - return new RawKeyRecordWriter(ostream); + return ostream; } -} - + public RecordWriter getRecordWriter(TaskAttemptContext context) + throws IOException { + DataOutputStream ostream = getOutputStream(context); + return new KeyRecordWriters.RawKeyRecordWriter(ostream); + } +} \ No newline at end of file diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/AbstractMainframeDatasetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/mainframe/AbstractMainframeDatasetImportMapper.java new file mode 100644 index 00000000..9304fe2f --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/AbstractMainframeDatasetImportMapper.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.mapreduce.mainframe; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; +import org.apache.sqoop.config.ConfigurationConstants; +import org.apache.sqoop.lib.SqoopRecord; +import org.apache.sqoop.mapreduce.AutoProgressMapper; + +import java.io.IOException; + +public abstract class AbstractMainframeDatasetImportMapper + extends AutoProgressMapper { + + private MainframeDatasetInputSplit inputSplit; + private MultipleOutputs multiFileWriter; + private long numberOfRecords; + + public void map(LongWritable key, SqoopRecord val, Context context) + throws IOException, InterruptedException { + String dataset = inputSplit.getCurrentDataset(); + numberOfRecords++; + multiFileWriter.write(createOutKey(val), NullWritable.get(), dataset); + } + + @Override + protected void setup(Context context) + throws IOException, InterruptedException { + super.setup(context); + inputSplit = (MainframeDatasetInputSplit)context.getInputSplit(); + multiFileWriter = new MultipleOutputs<>(context); + numberOfRecords = 0; + } + + @Override + protected void cleanup(Context context) + throws IOException, InterruptedException { + super.cleanup(context); + multiFileWriter.close(); + context.getCounter( + ConfigurationConstants.COUNTER_GROUP_MAPRED_TASK_COUNTERS, + ConfigurationConstants.COUNTER_MAP_OUTPUT_RECORDS) + .increment(numberOfRecords); + } + + protected abstract KEY createOutKey(SqoopRecord sqoopRecord); +} \ No newline at end of file diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java index ea54b07f..9d6a2fe7 100644 --- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java @@ -33,4 +33,15 @@ public class MainframeConfiguration public static final String MAINFRAME_INPUT_DATASET_TAPE = "mainframe.input.dataset.tape"; public static final String MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME = "org.apache.sqoop.mapreduce.mainframe.MainframeFTPFileEntryParser"; + + public static final String MAINFRAME_FTP_TRANSFER_MODE = "mainframe.ftp.transfermode"; + + public static final String MAINFRAME_FTP_TRANSFER_MODE_ASCII = "ascii"; + + public static final String MAINFRAME_FTP_TRANSFER_MODE_BINARY = "binary"; + + // this is the default buffer size used when doing binary ftp transfers from mainframe + public static final Integer MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE = 32760; + + public static final String MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE = "mainframe.ftp.buffersize"; } diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetBinaryImportMapper.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetBinaryImportMapper.java new file mode 100644 index 00000000..b2417b98 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetBinaryImportMapper.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.mainframe; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.sqoop.lib.SqoopRecord; + +/** + * Mapper that writes mainframe dataset records in binary format to multiple files + * based on the key, which is the index of the datasets in the input split. + */ +public class MainframeDatasetBinaryImportMapper extends AbstractMainframeDatasetImportMapper { + + @Override + protected BytesWritable createOutKey(SqoopRecord sqoopRecord) { + BytesWritable result = new BytesWritable(); + byte[] bytes = (byte[]) sqoopRecord.getFieldMap().entrySet().iterator().next().getValue(); + result.set(bytes,0, bytes.length); + return result; + } +} \ No newline at end of file diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetBinaryRecord.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetBinaryRecord.java new file mode 100644 index 00000000..6e827988 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetBinaryRecord.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.mapreduce.mainframe; + +import org.apache.sqoop.lib.DelimiterSet; +import org.apache.sqoop.lib.LargeObjectLoader; +import org.apache.hadoop.io.Text; +import org.apache.sqoop.lib.SqoopRecord; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +public class MainframeDatasetBinaryRecord extends SqoopRecord { + + private byte[] field; + + public Map getFieldMap() { + Map map = new HashMap(); + map.put("fieldName", field); + return map; + } + + public void setField(String fieldName, Object fieldVal) { + if (fieldVal instanceof byte[]) { + field = (byte[]) fieldVal; + } + } + + public void setField(final byte[] val) { + this.field = val; + } + + @Override + public void readFields(DataInput in) throws IOException { + in.readFully(field); + } + + @Override + public void write(DataOutput out) throws IOException { + out.write(field); + } + + @Override + public void readFields(ResultSet rs) throws SQLException { + field = rs.getBytes(1); + } + + @Override + public void write(PreparedStatement s) throws SQLException { + s.setBytes(1, field); + } + + @Override + public String toString() { + return field.toString(); + } + + @Override + public int write(PreparedStatement stmt, int offset) throws SQLException { + return 0; + } + + @Override + public String toString(DelimiterSet delimiters) { + return null; + } + + @Override + public int getClassFormatVersion() { + return 0; + } + + @Override + public int hashCode() { + return field.hashCode(); + } + + public void parse(CharSequence s) { + } + + public void parse(Text s) { + } + + public void parse(byte[] s) { + } + + public void parse(char[] s) { + } + + public void parse(ByteBuffer s) { + } + + public void parse(CharBuffer s) { + } + + @Override + public void loadLargeObjects(LargeObjectLoader objLoader) throws SQLException, IOException, InterruptedException { + + } +} \ No newline at end of file diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java index 1f78384b..78c46656 100644 --- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java @@ -18,9 +18,11 @@ package org.apache.sqoop.mapreduce.mainframe; +import java.io.BufferedInputStream; import java.io.BufferedReader; -import java.io.InputStreamReader; import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.ByteBuffer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,6 +40,7 @@ public class MainframeDatasetFTPRecordReader extends MainframeDatasetRecordReader { private FTPClient ftp = null; private BufferedReader datasetReader = null; + private BufferedInputStream inputStream = null; private static final Log LOG = LogFactory.getLog( MainframeDatasetFTPRecordReader.class.getName()); @@ -50,21 +53,27 @@ public void initialize(InputSplit inputSplit, Configuration conf = getConfiguration(); ftp = MainframeFTPClientUtils.getFTPConnection(conf); + initialize(ftp,conf); + } + + public void initialize(FTPClient ftpClient, Configuration conf) + throws IOException { + ftp = ftpClient; if (ftp != null) { - String dsName = 
conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME); - String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE); - MainframeDatasetPath p = null; - try { - p = new MainframeDatasetPath(dsName,conf); - } catch (Exception e) { - LOG.error(e.getMessage()); - LOG.error("MainframeDatasetPath helper class incorrectly initialised"); - e.printStackTrace(); - } - if (dsType != null && p != null) { - dsName = p.getMainframeDatasetFolder(); - } - ftp.changeWorkingDirectory("'" + dsName + "'"); + String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME); + String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE); + MainframeDatasetPath p = null; + try { + p = new MainframeDatasetPath(dsName,conf); + } catch (Exception e) { + LOG.error(e.getMessage()); + LOG.error("MainframeDatasetPath helper class incorrectly initialised"); + e.printStackTrace(); + } + if (dsType != null && p != null) { + dsName = p.getMainframeDatasetFolder(); + } + ftp.changeWorkingDirectory("'" + dsName + "'"); } } @@ -80,6 +89,10 @@ public void close() throws IOException { protected boolean getNextRecord(T sqoopRecord) throws IOException { String line = null; + Configuration conf = getConfiguration(); + if (MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY.equals(conf.get(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE))) { + return getNextBinaryRecord(sqoopRecord); + } try { do { if (datasetReader == null) { @@ -112,9 +125,85 @@ protected boolean getNextRecord(T sqoopRecord) throws IOException { return false; } + protected boolean getNextBinaryRecord(T sqoopRecord) throws IOException { + Configuration conf = getConfiguration(); + // typical estimated max size for mainframe record + int BUFFER_SIZE = MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE; + if (conf != null) { + BUFFER_SIZE = conf.getInt(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE, MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE); + } + byte[] buf = new byte[BUFFER_SIZE]; + int bytesRead = -1; + int cumulativeBytesRead = 0; + try { + Boolean streamInited = initInputStream(BUFFER_SIZE); + if (!streamInited) { + LOG.info("No more datasets to process."); + return false; + } + do { + bytesRead = inputStream.read(buf,cumulativeBytesRead,BUFFER_SIZE-cumulativeBytesRead); + if (bytesRead == -1) { + // EOF + closeFtpInputStream(); + LOG.info("Data transfer completed."); + return writeBytesToSqoopRecord(buf,cumulativeBytesRead,sqoopRecord); + } + cumulativeBytesRead += bytesRead; + if (cumulativeBytesRead == BUFFER_SIZE) { + return writeBytesToSqoopRecord(buf,cumulativeBytesRead,sqoopRecord); + } + } while (bytesRead != -1); + } catch (IOException ioe) { + throw new IOException("IOException during data transfer: " + ioe); + } + return false; + } + + protected Boolean initInputStream(int bufferSize) throws IOException { + if (inputStream == null) { + String dsName = getNextDataset(); + if (dsName == null) { + LOG.info("No more datasets to process. 
Returning."); + return false; + } + LOG.info("Attempting to retrieve file stream for: "+dsName); + LOG.info("Buffer size: "+bufferSize); + inputStream = new BufferedInputStream(ftp.retrieveFileStream(dsName)); + if (inputStream == null) { + throw new IOException("Failed to retrieve FTP file stream."); + } + } + return true; + } + + protected void closeFtpInputStream() throws IOException { + inputStream.close(); + inputStream = null; + if (!ftp.completePendingCommand()) { + throw new IOException("Failed to complete ftp command. FTP Response: "+ftp.getReplyString()); + } + } + + protected Boolean writeBytesToSqoopRecord(byte[] buf, int cumulativeBytesRead, SqoopRecord sqoopRecord) { + if (cumulativeBytesRead <= 0) { + return false; + } + ByteBuffer buffer = ByteBuffer.allocate(cumulativeBytesRead); + buffer.put(buf,0,cumulativeBytesRead); + convertToSqoopRecord(buffer.array(), sqoopRecord); + return true; + } + private void convertToSqoopRecord(String line, SqoopRecord sqoopRecord) { String fieldName = sqoopRecord.getFieldMap().entrySet().iterator().next().getKey(); sqoopRecord.setField(fieldName, line); } + + private void convertToSqoopRecord(byte[] buf, SqoopRecord sqoopRecord) { + String fieldName + = sqoopRecord.getFieldMap().entrySet().iterator().next().getKey(); + sqoopRecord.setField(fieldName, buf); + } } diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetImportMapper.java index 0b7b5b85..0510e829 100644 --- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetImportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetImportMapper.java @@ -18,61 +18,19 @@ package org.apache.sqoop.mapreduce.mainframe; -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; - -import org.apache.sqoop.config.ConfigurationConstants; import org.apache.sqoop.lib.SqoopRecord; -import org.apache.sqoop.mapreduce.AutoProgressMapper; /** * Mapper that writes mainframe dataset records in Text format to multiple files * based on the key, which is the index of the datasets in the input split. 
*/ -public class MainframeDatasetImportMapper - extends AutoProgressMapper { - - private static final Log LOG = LogFactory.getLog( - MainframeDatasetImportMapper.class.getName()); - - private MainframeDatasetInputSplit inputSplit; - private MultipleOutputs mos; - private long numberOfRecords; - private Text outkey; - - public void map(LongWritable key, SqoopRecord val, Context context) - throws IOException, InterruptedException { - String dataset = inputSplit.getCurrentDataset(); - outkey.set(val.toString()); - numberOfRecords++; - mos.write(outkey, NullWritable.get(), dataset); - } +public class MainframeDatasetImportMapper extends AbstractMainframeDatasetImportMapper { @Override - protected void setup(Context context) - throws IOException, InterruptedException { - super.setup(context); - inputSplit = (MainframeDatasetInputSplit)context.getInputSplit(); - mos = new MultipleOutputs(context); - numberOfRecords = 0; - outkey = new Text(); - } - - @Override - protected void cleanup(Context context) - throws IOException, InterruptedException { - super.cleanup(context); - mos.close(); - context.getCounter( - ConfigurationConstants.COUNTER_GROUP_MAPRED_TASK_COUNTERS, - ConfigurationConstants.COUNTER_MAP_OUTPUT_RECORDS) - .increment(numberOfRecords); + protected Text createOutKey(SqoopRecord sqoopRecord) { + Text result = new Text(); + result.set(sqoopRecord.toString()); + return result; } } diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java index 8ef30d38..90dc2ddd 100644 --- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java @@ -22,14 +22,19 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; - import org.apache.sqoop.SqoopOptions; import org.apache.sqoop.manager.ImportJobContext; - +import org.apache.sqoop.mapreduce.DBWritable; import org.apache.sqoop.mapreduce.DataDrivenImportJob; +import org.apache.sqoop.mapreduce.RawKeyTextOutputFormat; +import org.apache.sqoop.mapreduce.ByteKeyOutputFormat; import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator; /** @@ -46,7 +51,10 @@ public MainframeImportJob(final SqoopOptions opts, ImportJobContext context, Par @Override protected Class getMapperClass() { - if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) { + if (SqoopOptions.FileLayout.BinaryFile.equals(options.getFileLayout())) { + LOG.debug("Using MainframeDatasetBinaryImportMapper"); + return MainframeDatasetBinaryImportMapper.class; + } else if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) { return MainframeDatasetImportMapper.class; } else { return super.getMapperClass(); @@ -66,13 +74,58 @@ protected void configureInputFormat(Job job, String tableName, job.getConfiguration().set( MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE, options.getMainframeInputDatasetTape().toString()); + if (SqoopOptions.FileLayout.BinaryFile == options.getFileLayout()) { + job.getConfiguration().set( + MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE, + 
MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY); + job.getConfiguration().setInt( + MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE, + options.getBufferSize() + ); + } else { + job.getConfiguration().set( + MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE, + MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_ASCII); + } + } @Override protected void configureOutputFormat(Job job, String tableName, String tableClassName) throws ClassNotFoundException, IOException { super.configureOutputFormat(job, tableName, tableClassName); + job.getConfiguration().set( + MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE, + options.getMainframeFtpTransferMode()); + // use the default outputformat LazyOutputFormat.setOutputFormatClass(job, getOutputFormatClass()); } + @Override + protected void configureMapper(Job job, String tableName, + String tableClassName) throws IOException { + super.configureMapper(job, tableName, tableClassName); + if (SqoopOptions.FileLayout.BinaryFile == options.getFileLayout()) { + job.setOutputKeyClass(BytesWritable.class); + job.setOutputValueClass(NullWritable.class); + + // this is required as code generated class assumes setField method takes String + // and will fail with ClassCastException when a byte array is passed instead + // java.lang.ClassCastException: [B cannot be cast to java.lang.String + Configuration conf = job.getConfiguration(); + conf.setClass(org.apache.sqoop.mapreduce.db.DBConfiguration.INPUT_CLASS_PROPERTY, MainframeDatasetBinaryRecord.class, + DBWritable.class); + } + } + + @Override + protected Class getOutputFormatClass() { + if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) { + return RawKeyTextOutputFormat.class; + } else if (options.getFileLayout() + == SqoopOptions.FileLayout.BinaryFile) { + return ByteKeyOutputFormat.class; + } + return null; + } } diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java index 9dcbdd59..b47be72c 100644 --- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java +++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java @@ -114,6 +114,7 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool { public static final String FMT_TEXTFILE_ARG = "as-textfile"; public static final String FMT_AVRODATAFILE_ARG = "as-avrodatafile"; public static final String FMT_PARQUETFILE_ARG = "as-parquetfile"; + public static final String FMT_BINARYFILE_ARG = "as-binaryfile"; public static final String HIVE_IMPORT_ARG = "hive-import"; public static final String HIVE_TABLE_ARG = "hive-table"; public static final String HIVE_DATABASE_ARG = "hive-database"; diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java index 478f1748..13973373 100644 --- a/src/java/org/apache/sqoop/tool/ImportTool.java +++ b/src/java/org/apache/sqoop/tool/ImportTool.java @@ -744,6 +744,10 @@ protected RelatedOptions getImportOptions() { .withDescription("Imports data to Parquet files") .withLongOpt(BaseSqoopTool.FMT_PARQUETFILE_ARG) .create()); + importOpts.addOption(OptionBuilder + .withDescription("Imports data to Binary files") + .withLongOpt(BaseSqoopTool.FMT_BINARYFILE_ARG) + .create()); importOpts.addOption(OptionBuilder.withArgName("n") .hasArg().withDescription("Use 'n' map tasks to import in parallel") .withLongOpt(NUM_MAPPERS_ARG) diff --git a/src/java/org/apache/sqoop/tool/MainframeImportTool.java b/src/java/org/apache/sqoop/tool/MainframeImportTool.java index cdd9d6d0..fbc8c3db 100644 
--- a/src/java/org/apache/sqoop/tool/MainframeImportTool.java +++ b/src/java/org/apache/sqoop/tool/MainframeImportTool.java @@ -20,6 +20,7 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.ToolRunner; @@ -40,6 +41,7 @@ public class MainframeImportTool extends ImportTool { public static final String DS_ARG = "dataset"; public static final String DS_TYPE_ARG = "datasettype"; public static final String DS_TAPE_ARG = "tape"; + public static final String BUFFERSIZE_ARG = "buffersize"; public MainframeImportTool() { super("import-mainframe", false); @@ -82,6 +84,14 @@ protected RelatedOptions getImportOptions() { .withDescription("Imports data as plain text (default)") .withLongOpt(FMT_TEXTFILE_ARG) .create()); + importOpts.addOption(OptionBuilder + .withDescription("Imports data as binary") + .withLongOpt(FMT_BINARYFILE_ARG) + .create()); + importOpts.addOption(OptionBuilder + .hasArg().withDescription("Sets buffer size for binary import in bytes (default=32kB)") + .withLongOpt(BUFFERSIZE_ARG) + .create()); importOpts.addOption(OptionBuilder.withArgName("n") .hasArg().withDescription("Use 'n' map tasks to import in parallel") .withLongOpt(NUM_MAPPERS_ARG) @@ -168,6 +178,28 @@ public void applyOptions(CommandLine in, SqoopOptions out) // set default tape value to false out.setMainframeInputDatasetTape("false"); } + if (in.hasOption(FMT_BINARYFILE_ARG)) { + out.setMainframeFtpTransferMode(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY); + out.setFileLayout(SqoopOptions.FileLayout.BinaryFile); + } else { + // set default transfer mode to ascii + out.setMainframeFtpTransferMode(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_ASCII); + out.setFileLayout(SqoopOptions.FileLayout.TextFile); + } + + if (in.hasOption(BUFFERSIZE_ARG)) { + // if we specify --buffersize set the buffer size + int bufSize = Integer.valueOf(in.getOptionValue(BUFFERSIZE_ARG)); + if (bufSize > 0) { + out.setBufferSize(bufSize); + } + else { + out.setBufferSize(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE); + } + } else { + // set the default buffer size to 32kB + out.setBufferSize(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE); + } } @Override @@ -190,6 +222,17 @@ protected void validateImportOptions(SqoopOptions options) throw new InvalidOptionsException( "--" + DS_TAPE_ARG + " specified is invalid. 
" + HELP_STR); } + /* only allow FileLayout.BinaryFile to be selected for mainframe import */ + if (SqoopOptions.FileLayout.BinaryFile.equals(options.getFileLayout()) && StringUtils.isEmpty(options.getMainframeInputDatasetName())) { + throw new InvalidOptionsException("--as-binaryfile should only be used with import-mainframe module."); + } + + // only allow buffer size to be set different to default when binary file is selected + // in any case, if --as-binaryfile isn't selected, --buffersize parameter is harmless + if (!SqoopOptions.FileLayout.BinaryFile.equals(options.getFileLayout()) && !MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE.equals(options.getBufferSize())) { + throw new InvalidOptionsException("--buffersize should only be used with --as-binaryfile parameter."); + } + super.validateImportOptions(options); } } diff --git a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java index 95bc0ecb..654721e3 100644 --- a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java +++ b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.net.PrintCommandListener; import org.apache.commons.net.ftp.FTP; import org.apache.commons.net.ftp.FTPClient; @@ -207,8 +208,18 @@ public static FTPClient getFTPConnection(Configuration conf) throw new IOException("Could not login to server " + server + ":" + ftp.getReplyString()); } - // set ASCII transfer mode - ftp.setFileType(FTP.ASCII_FILE_TYPE); + // set transfer mode + String transferMode = conf.get(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE); + if (StringUtils.equalsIgnoreCase(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY,transferMode)) { + LOG.info("Setting FTP transfer mode to binary"); + // ftp.setFileTransferMode(FTP.BINARY_FILE_TYPE) doesn't work for MVS, it throws a syntax error + ftp.sendCommand("TYPE I"); + // this is IMPORTANT - otherwise it will convert 0x0d0a to 0x0a = dropping bytes + ftp.setFileType(FTP.BINARY_FILE_TYPE); + } else { + LOG.info("Defaulting FTP transfer mode to ascii"); + ftp.setFileTransferMode(FTP.ASCII_FILE_TYPE); + } // Use passive mode as default. 
ftp.enterLocalPassiveMode(); LOG.info("System type detected: " + ftp.getSystemType()); diff --git a/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java b/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java index 041dfb78..3b8ed236 100644 --- a/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java +++ b/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java @@ -113,6 +113,42 @@ public void testImportGdgText() throws IOException { doImportAndVerify(MainframeTestUtil.GDG_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG, files); } + @Test + public void testImportGdgBinary() throws IOException { + HashMap files = new HashMap(); + files.put(MainframeTestUtil.GDG_BINARY_DATASET_FILENAME, MainframeTestUtil.EXPECTED_GDG_BINARY_DATASET_MD5); + doImportAndVerify(MainframeTestUtil.GDG_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG, files, "--as-binaryfile"); + } + + @Test + public void testImportGdgBinaryWithBufferSize() throws IOException { + HashMap files = new HashMap(); + files.put(MainframeTestUtil.GDG_BINARY_DATASET_FILENAME, MainframeTestUtil.EXPECTED_GDG_BINARY_DATASET_MD5); + doImportAndVerify(MainframeTestUtil.GDG_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG, files, "--as-binaryfile", "--buffersize", "64000"); + } + + @Test + public void testImportSequential() throws IOException { + // can reuse the same dataset as binary as the dataset is plain text + HashMap files = new HashMap(); + files.put(MainframeTestUtil.SEQ_DATASET_FILENAME, MainframeTestUtil.EXPECTED_SEQ_DATASET_MD5); + doImportAndVerify(MainframeTestUtil.SEQ_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL, files); + } + + @Test + public void testImportSequentialBinary() throws IOException { + HashMap files = new HashMap(); + files.put(MainframeTestUtil.SEQ_BINARY_DATASET_FILENAME, MainframeTestUtil.EXPECTED_SEQ_BINARY_DATASET_MD5); + doImportAndVerify(MainframeTestUtil.SEQ_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL, files, "--as-binaryfile"); + } + + @Test + public void testImportSequentialBinaryWithBufferSize() throws IOException { + HashMap files = new HashMap(); + files.put(MainframeTestUtil.SEQ_BINARY_DATASET_FILENAME, MainframeTestUtil.EXPECTED_SEQ_BINARY_DATASET_MD5); + doImportAndVerify(MainframeTestUtil.SEQ_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL, files, "--as-binaryfile", "--buffersize", "64000"); + } + private String [] getArgv(String datasetName, String datasetType, String ... 
extraArgs) { ArrayList args = new ArrayList(); @@ -130,7 +166,6 @@ public void testImportGdgText() throws IOException { args.add(datasetType); if (extraArgs.length > 0) { - args.add("--"); for (String arg : extraArgs) { args.add(arg); } diff --git a/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java b/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java index f28ff36c..9f86f6cd 100644 --- a/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java +++ b/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java @@ -41,4 +41,31 @@ public class MainframeTestUtil { public static final String EXPECTED_GDG_DATASET_MD5 = System.getProperty( "sqoop.test.mainframe.ftp.dataset.gdg.md5", "f0d0d171fdb8a03dbc1266ed179d7093"); + public static final String GDG_BINARY_DATASET_NAME = System.getProperty( + "sqoop.test.mainframe.ftp.binary.dataset.gdg", + "TSODIQ1.FOLDER"); + public static final String GDG_BINARY_DATASET_FILENAME = System.getProperty( + "sqoop.test.mainframe.ftp.binary.dataset.gdg.filename", + "G0002V45"); + public static final String EXPECTED_GDG_BINARY_DATASET_MD5 = System.getProperty( + "sqoop.test.mainframe.ftp.binary.dataset.gdg.md5", + "43eefbe34e466dd3f65a3e867a60809a"); + public static final String SEQ_DATASET_NAME = System.getProperty( + "sqoop.test.mainframe.ftp.dataset.seq", + "TSODIQ1.GDGTEXT.G0001V43"); + public static final String SEQ_DATASET_FILENAME = System.getProperty( + "sqoop.test.mainframe.ftp.dataset.seq.filename", + "G0001V43"); + public static final String EXPECTED_SEQ_DATASET_MD5 = System.getProperty( + "sqoop.test.mainframe.ftp.dataset.seq.md5", + "f0d0d171fdb8a03dbc1266ed179d7093"); + public static final String SEQ_BINARY_DATASET_NAME = System.getProperty( + "sqoop.test.mainframe.ftp.binary.dataset.seq", + "TSODIQ1.FOLDER.FOLDERTXT"); + public static final String SEQ_BINARY_DATASET_FILENAME = System.getProperty( + "sqoop.test.mainframe.ftp.binary.dataset.seq.filename", + "FOLDERTXT"); + public static final String EXPECTED_SEQ_BINARY_DATASET_MD5 = System.getProperty( + "sqoop.test.mainframe.ftp.binary.dataset.seq.md5", + "1591c0fcc718fda7e9c1f3561d232b2b"); } diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetBinaryRecord.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetBinaryRecord.java new file mode 100644 index 00000000..b4cba28c --- /dev/null +++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetBinaryRecord.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.mapreduce.mainframe; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.net.ftp.FTPClient; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.io.InputStream; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.doReturn; + +public class TestMainframeDatasetBinaryRecord { + + private MainframeDatasetFTPRecordReader ftpRecordReader; + private InputStream is; + private FTPClient ftp; + private final String DATASET_NAME = "dummy.ds"; + private final String DATASET_TYPE = "g"; + private static final Log LOG = LogFactory.getLog( + TestMainframeDatasetBinaryRecord.class.getName()); + + @Before + public void setUp() throws IOException, InterruptedException { + MainframeDatasetFTPRecordReader rdr = new MainframeDatasetFTPRecordReader(); + Configuration conf; + MainframeDatasetInputSplit split; + TaskAttemptContext context; + ftpRecordReader = spy(rdr); + is = mock(InputStream.class); + ftp = mock(FTPClient.class); + split = mock(MainframeDatasetInputSplit.class); + context = mock(TaskAttemptContext.class); + conf = new Configuration(); + when(ftp.retrieveFileStream(any(String.class))).thenReturn(is); + when(ftp.changeWorkingDirectory(any(String.class))).thenReturn(true); + doReturn("file1").when(ftpRecordReader).getNextDataset(); + when(split.getNextDataset()).thenReturn(DATASET_NAME); + when(ftpRecordReader.getNextDataset()).thenReturn(DATASET_NAME); + conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,DATASET_NAME); + conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,DATASET_TYPE); + conf.setInt(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE,MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE); + ftpRecordReader.initialize(ftp, conf); + } + + // Mock the inputstream.read method and manipulate the function parameters + protected Answer returnSqoopRecord(final int byteLength) { + return new Answer() { + public Object answer(InvocationOnMock invocation) { + return byteLength; + } + }; + } + + @Test + public void testGetNextBinaryRecordForFullRecord() { + + MainframeDatasetBinaryRecord record = new MainframeDatasetBinaryRecord(); + try { + when(is.read(any(byte[].class),anyInt(),anyInt())) + .thenAnswer(returnSqoopRecord(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE)) + .thenReturn(-1); + when(ftp.completePendingCommand()).thenReturn(true); + Assert.assertTrue(ftpRecordReader.getNextBinaryRecord(record)); + Assert.assertFalse(record.getFieldMap().values().isEmpty()); + Assert.assertEquals(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE.intValue(),((byte[])record.getFieldMap().values().iterator().next()).length); + } catch (IOException ioe) { + LOG.error("Issue with reading 1 full binary buffer record", ioe); + throw new RuntimeException(ioe); + } + } + + @Test + public void testGetNextBinaryRecordForPartialRecord() { + int expectedBytesRead = 10; + MainframeDatasetBinaryRecord record = new MainframeDatasetBinaryRecord(); + try { + 
when(is.read(any(byte[].class),anyInt(),anyInt())) + .thenAnswer(returnSqoopRecord(10)) + .thenReturn(-1); + when(ftp.completePendingCommand()).thenReturn(true); + Assert.assertTrue(ftpRecordReader.getNextBinaryRecord(record)); + Assert.assertFalse(record.getFieldMap().values().isEmpty()); + Assert.assertEquals(expectedBytesRead,(((byte[])record.getFieldMap().values().iterator().next()).length)); + } catch (IOException ioe) { + LOG.error("Issue with reading 10 byte binary record", ioe); + throw new RuntimeException(ioe); + } + } + + @Test + public void testGetNextBinaryRecordFor2Records() { + // test 1 full record, and 1 partial + int expectedBytesRead = 10; + MainframeDatasetBinaryRecord record = new MainframeDatasetBinaryRecord(); + try { + when(is.read(any(byte[].class),anyInt(),anyInt())) + .thenAnswer(returnSqoopRecord(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE)) + .thenAnswer(returnSqoopRecord(10)) + .thenReturn(-1); + when(ftp.completePendingCommand()).thenReturn(true); + Assert.assertTrue(ftpRecordReader.getNextBinaryRecord(record)); + Assert.assertFalse(record.getFieldMap().values().isEmpty()); + Assert.assertTrue(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE.equals((((byte[])record.getFieldMap().values().iterator().next()).length))); + record = new MainframeDatasetBinaryRecord(); + Assert.assertTrue(ftpRecordReader.getNextBinaryRecord(record)); + Assert.assertFalse(record.getFieldMap().values().isEmpty()); + Assert.assertEquals(expectedBytesRead,(((byte[])record.getFieldMap().values().iterator().next()).length)); + } catch (IOException ioe) { + LOG.error("Issue with reading 1 full binary buffer record followed by 1 partial binary buffer record", ioe); + throw new RuntimeException(ioe); + } + } + + @Test + public void testGetNextBinaryRecordForMultipleReads() { + // test reading 1 record where the stream returns less than a full buffer + MainframeDatasetBinaryRecord record = new MainframeDatasetBinaryRecord(); + try { + when(is.read(any(byte[].class),anyInt(),anyInt())) + .thenAnswer(returnSqoopRecord(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE /2)) + .thenAnswer(returnSqoopRecord(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE /2)) + .thenReturn(-1); + when(ftp.completePendingCommand()).thenReturn(true); + Assert.assertTrue(ftpRecordReader.getNextBinaryRecord(record)); + Assert.assertFalse(record.getFieldMap().values().isEmpty()); + Assert.assertEquals(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE.intValue(),((byte[])record.getFieldMap().values().iterator().next()).length); + record = new MainframeDatasetBinaryRecord(); + Assert.assertFalse(ftpRecordReader.getNextBinaryRecord(record)); + Assert.assertNull((((byte[])record.getFieldMap().values().iterator().next()))); + } catch (IOException ioe) { + LOG.error("Issue with verifying reading partial buffer binary records", ioe); + throw new RuntimeException(ioe); + } + } +} diff --git a/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java b/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java index 0b0c6c34..c2edc534 100644 --- a/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java +++ b/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java @@ -22,18 +22,18 @@ import java.lang.reflect.Method; import org.apache.commons.cli.ParseException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.sqoop.cli.RelatedOptions; import 
org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.apache.sqoop.SqoopOptions; import org.apache.sqoop.SqoopOptions.InvalidOptionsException; import org.apache.sqoop.cli.ToolOptions; import org.apache.sqoop.testutil.BaseSqoopTestCase; +import org.junit.rules.ExpectedException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -43,15 +43,16 @@ public class TestMainframeImportTool extends BaseSqoopTestCase { - private static final Log LOG = LogFactory.getLog(TestMainframeImportTool.class - .getName()); - private MainframeImportTool mfImportTool; + private ToolOptions toolOptions; + private SqoopOptions sqoopOption; @Before public void setUp() { mfImportTool = new MainframeImportTool(); + toolOptions = new ToolOptions(); + sqoopOption = new SqoopOptions(); } @After @@ -183,4 +184,58 @@ public void testTapeOptionInvalidReturnsFalse() throws ParseException, InvalidOp Boolean isTape = sqoopOption.getMainframeInputDatasetTape(); assert(isTape != null && isTape.toString().equals("false")); } + + @Test + public void testFtpTransferModeAscii() throws ParseException, InvalidOptionsException { + String[] args = new String[] { "--dataset", "mydatasetname", "--as-textfile" }; + configureAndValidateOptions(args); + assertEquals(SqoopOptions.FileLayout.TextFile,sqoopOption.getFileLayout()); + } + @Test + public void testFtpTransferModeDefaultsToAscii() throws ParseException, InvalidOptionsException { + String[] args = new String[] { "--dataset", "mydatasetname" }; + configureAndValidateOptions(args); + assertEquals(SqoopOptions.FileLayout.TextFile,sqoopOption.getFileLayout()); + } + + @Test + public void testAsBinaryFileSetsCorrectFileLayoutAndDefaultBufferSize() throws ParseException, InvalidOptionsException { + String[] args = new String[] { "--dataset", "mydatasetname", "--as-binaryfile" }; + configureAndValidateOptions(args); + assertEquals(SqoopOptions.FileLayout.BinaryFile,sqoopOption.getFileLayout()); + assertEquals(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE, sqoopOption.getBufferSize()); + } + + @Test + public void testSetBufferSize() throws ParseException, InvalidOptionsException { + final Integer EXPECTED_BUFFER = 1024; + String[] args = new String[] { "--dataset", "mydatasetname", "--as-binaryfile", "--buffersize", EXPECTED_BUFFER.toString() }; + configureAndValidateOptions(args); + assertEquals(SqoopOptions.FileLayout.BinaryFile,sqoopOption.getFileLayout()); + assertEquals(EXPECTED_BUFFER, sqoopOption.getBufferSize()); + } + + @Rule + public final ExpectedException exception = ExpectedException.none(); + + @Test + public void testBufferSizeWithoutBinaryThrowsException() throws ParseException, InvalidOptionsException { + final Integer EXPECTED_BUFFER = 1024; + String[] args = new String[] { "--buffersize", EXPECTED_BUFFER.toString() }; + exception.expect(InvalidOptionsException.class); + configureAndValidateOptions(args); + } + + @Test + public void testInvalidBufferSizeThrowsNumberFormatException() throws ParseException, InvalidOptionsException { + String[] args = new String[] { "--buffersize", "invalidinteger" }; + exception.expect(NumberFormatException.class); + configureAndValidateOptions(args); + } + + private void configureAndValidateOptions(String[] args) throws ParseException, InvalidOptionsException { + mfImportTool.configureOptions(toolOptions); + sqoopOption = mfImportTool.parseArguments(args, null, 
sqoopOption, false); + mfImportTool.validateImportOptions(sqoopOption); + } }
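
Reviewer note (not part of the patch): the documentation hunk above states that with --as-binaryfile the number of records Sqoop reports depends on --buffersize, because the FTP record reader fills a buffer of that size per record. A minimal stand-alone Java sketch of that chunking behaviour follows; the class and method names are illustrative only and do not exist in Sqoop. Each filled buffer, plus any trailing partial buffer, counts as one record, so a 100 000-byte dataset yields 4 records at the default 32760-byte buffer but only 2 records at --buffersize 64000.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Illustrative sketch of the buffer-per-record counting described in the docs hunk.
public class BufferChunkCountSketch {
  static int countChunks(InputStream in, int bufferSize) throws IOException {
    byte[] buf = new byte[bufferSize];
    int records = 0;
    int filled = 0;
    int read;
    while ((read = in.read(buf, filled, bufferSize - filled)) != -1) {
      filled += read;
      if (filled == bufferSize) {   // a full buffer is emitted as one record
        records++;
        filled = 0;
      }
    }
    if (filled > 0) {               // a trailing partial buffer is also one record
      records++;
    }
    return records;
  }

  public static void main(String[] args) throws IOException {
    byte[] dataset = new byte[100_000];
    System.out.println(countChunks(new ByteArrayInputStream(dataset), 32760)); // 4 records
    System.out.println(countChunks(new ByteArrayInputStream(dataset), 64000)); // 2 records
  }
}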