diff --git a/build.xml b/build.xml
index 084823cf..cd2e9e29 100644
--- a/build.xml
+++ b/build.xml
@@ -253,6 +253,15 @@
+
+
+
+
+
+
+
+
+
@@ -890,10 +899,19 @@
+
+
+
+
+
+
+
+
+
diff --git a/src/docs/user/import-mainframe.txt b/src/docs/user/import-mainframe.txt
index abeb7cde..3ecfb7e4 100644
--- a/src/docs/user/import-mainframe.txt
+++ b/src/docs/user/import-mainframe.txt
@@ -49,6 +49,7 @@ Argument Description
+\--as-sequencefile+ Imports data to SequenceFiles
+\--as-textfile+ Imports data as plain text (default)
+\--as-parquetfile+ Imports data to Parquet Files
++\--as-binaryfile+ Imports data as binary files
+\--delete-target-dir+ Delete the import target directory\
if it exists
+-m,\--num-mappers <n>+ Use 'n' map tasks to import in parallel
@@ -193,6 +194,26 @@ $ sqoop import-mainframe --dataset SomePDS --jar-file mydatatypes.jar \
This command will load the +SomePDSType+ class out of +mydatatypes.jar+.
+
+Sqoop supports importing Generation Data Group (GDG) and sequential data sets in
+addition to partitioned data sets. The data set type is specified with the
+--datasettype option followed by one of:
+'p' for partitioned dataset (default)
+'g' for generation data group dataset
+'s' for sequential dataset
+
+In the case of generation data group datasets, Sqoop will retrieve only the latest
+generation (file).
+
+In the case of sequential datasets, Sqoop will retrieve only the specified file.
+
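+For example, a sequential data set import might look like the following (the data set
+name is illustrative):
+
+----
+$ sqoop import-mainframe --connect z390 --dataset SOME.SEQ.DATASET --datasettype s \
+    --username myuser -P
+----
+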
+Data sets stored on tape volumes are supported by specifying --tape true.
+
+By default, mainframe datasets are assumed to be plain text; attempting to transfer a
+binary dataset this way will result in data corruption. Binary datasets can be imported
+by specifying --as-binaryfile, optionally together with --buffersize followed by a
+buffer size in bytes. By default, --buffersize is set to 32760 bytes. Altering the
+buffer size changes the number of records Sqoop reports to have imported, because the
+binary dataset is read in chunks of buffersize bytes: a larger buffer size yields fewer
+records. For example, a 100000 byte dataset read with the default 32760 byte buffer is
+reported as 4 records (three full buffers plus one partial buffer).
+
Additional Import Configuration Properties
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
There are some additional properties which can be configured by modifying
@@ -228,6 +249,23 @@ $ sqoop import-mainframe --connect z390 --dataset EMPLOYEES \
Enter password: (hidden)
----
+
+An import of a tape-based generation data group dataset using a password alias, writing
+to an intermediate directory (--outdir) before moving the result to (--target-dir):
+----
+$ sqoop import-mainframe --connect z390 --dataset SomeGdg --username myuser \
+    --password-alias mypasswordalias --datasettype g --tape true --outdir /tmp/imported/sqoop \
+    --target-dir /data/imported/mainframe/SomeGdg
+----
+
+An import of a tape-based binary generation data group dataset with a buffer size of
+64000 bytes, using a password alias and writing to an intermediate directory (--outdir)
+before moving the result to (--target-dir):
+----
+$ sqoop import-mainframe --connect z390 --dataset SomeGdg --username myuser \
+    --password-alias mypasswordalias --datasettype g --tape true --as-binaryfile \
+    --buffersize 64000 --outdir /tmp/imported/sqoop --target-dir /data/imported/mainframe/SomeGdg
+----
+
Controlling the import parallelism (using 8 parallel tasks):
----
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java
index f97dbfdf..f06872f9 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -87,7 +87,8 @@ public enum FileLayout {
TextFile,
SequenceFile,
AvroDataFile,
- ParquetFile
+ ParquetFile,
+ BinaryFile
}
/**
@@ -362,7 +363,12 @@ public String toString() {
// Indicates if the data set is on tape to use different FTP parser
@StoredAsProperty("mainframe.input.dataset.tape")
private String mainframeInputDatasetTape;
-
+ // Indicates if binary or ascii FTP transfer mode should be used
+ @StoredAsProperty("mainframe.ftp.transfermode")
+ private String mainframeFtpTransferMode;
+ // Buffer size to use when using binary FTP transfer mode
+ @StoredAsProperty("mainframe.ftp.buffersize")
+ private Integer bufferSize;
// Accumulo home directory
private String accumuloHome; // not serialized to metastore.
// Zookeeper home directory
@@ -1162,6 +1168,11 @@ private void initDefaults(Configuration baseConfiguration) {
this.escapeColumnMappingEnabled = true;
this.parquetConfiguratorImplementation = HADOOP;
+
+ // set default transfer mode to ascii
+ this.mainframeFtpTransferMode = MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_ASCII;
+ // set default buffer size for mainframe binary transfers
+ this.bufferSize = MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE;
}
/**
@@ -2499,6 +2510,23 @@ public Boolean getMainframeInputDatasetTape() {
public void setMainframeInputDatasetTape(String txtIsFromTape) {
mainframeInputDatasetTape = Boolean.valueOf(Boolean.parseBoolean(txtIsFromTape)).toString();
}
+ // returns the buffer size set.
+ public Integer getBufferSize() {
+ return bufferSize;
+ }
+
+ public void setMainframeFtpTransferMode(String transferMode) {
+ mainframeFtpTransferMode = transferMode;
+ }
+
+ public String getMainframeFtpTransferMode() {
+ return mainframeFtpTransferMode;
+ }
+
+ // sets the binary transfer buffer size, defaults to MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE
+ public void setBufferSize(int buf) {
+ bufferSize = buf;
+ }
public static String getAccumuloHomeDefault() {
// Set this with $ACCUMULO_HOME, but -Daccumulo.home can override.
diff --git a/src/java/org/apache/sqoop/mapreduce/ByteKeyOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/ByteKeyOutputFormat.java
new file mode 100644
index 00000000..f7427dbd
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/ByteKeyOutputFormat.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * An {@link OutputFormat} that writes binary files.
+ * Only writes the key. Does not write any delimiter/newline after the key.
+ */
+public class ByteKeyOutputFormat<K, V> extends RawKeyTextOutputFormat<K, V> {
+
+ // currently don't support compression
+ private static final String FILE_EXTENSION = "";
+
+ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+ throws IOException {
+ DataOutputStream ostream = getFSDataOutputStream(context, FILE_EXTENSION);
+ return new KeyRecordWriters.BinaryKeyRecordWriter<K, V>(ostream);
+ }
+
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/KeyRecordWriters.java b/src/java/org/apache/sqoop/mapreduce/KeyRecordWriters.java
new file mode 100644
index 00000000..9630a818
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/KeyRecordWriters.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class KeyRecordWriters {
+ /**
+ * Base RecordWriter that writes only the record key to the output stream.
+ */
+
+ public static class GenericRecordWriter<K, V> extends RecordWriter<K, V> {
+ private static final String UTF8 = "UTF-8";
+
+ protected DataOutputStream out;
+
+ /**
+ * Write the object to the byte stream, handling Text as a special
+ * case.
+ *
+ * @param o the object to print
+ * @param value the corresponding value for key o
+ * @throws IOException if the write throws, we pass it on
+ */
+ protected void writeObject(Object o,Object value) throws IOException {
+ if (o instanceof Text) {
+ Text to = (Text) o;
+ out.write(to.getBytes(), 0, to.getLength());
+ } else {
+ out.write(o.toString().getBytes(UTF8));
+ }
+ }
+
+ @Override
+ public synchronized void write(K key, V value) throws IOException, InterruptedException {
+ writeObject(key,value);
+ }
+
+ @Override
+ public synchronized void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ out.close();
+ }
+ }
+
+ public static class RawKeyRecordWriter<K, V> extends GenericRecordWriter<K, V> {
+
+ public RawKeyRecordWriter(DataOutputStream out) {
+ this.out = out;
+ }
+ }
+
+ /**
+ * RecordWriter to write raw bytes to binary files.
+ */
+ public static class BinaryKeyRecordWriter<K, V> extends GenericRecordWriter<K, V> {
+
+ public BinaryKeyRecordWriter(DataOutputStream out) {
+ this.out = out;
+ }
+
+ /**
+ * Write the object to the byte stream, handling Text as a special
+ * case.
+ * @param o the object to print
+ * @throws IOException if the write throws, we pass it on
+ */
+ @Override
+ protected void writeObject(Object o, Object value) throws IOException {
+ if (o instanceof BytesWritable) {
+ BytesWritable to = (BytesWritable) o;
+ out.write(to.getBytes(), 0, to.getLength());
+ }
+ }
+ }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/RawKeyTextOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/RawKeyTextOutputFormat.java
index fec34f21..8e81aa43 100644
--- a/src/java/org/apache/sqoop/mapreduce/RawKeyTextOutputFormat.java
+++ b/src/java/org/apache/sqoop/mapreduce/RawKeyTextOutputFormat.java
@@ -24,7 +24,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -38,47 +37,15 @@
*/
public class RawKeyTextOutputFormat<K, V> extends FileOutputFormat<K, V> {
- /**
- * RecordWriter to write to plain text files.
- */
- public static class RawKeyRecordWriter<K, V> extends RecordWriter<K, V> {
-
- private static final String UTF8 = "UTF-8";
-
- protected DataOutputStream out;
-
- public RawKeyRecordWriter(DataOutputStream out) {
- this.out = out;
- }
-
- /**
- * Write the object to the byte stream, handling Text as a special
- * case.
- * @param o the object to print
- * @throws IOException if the write throws, we pass it on
- */
- private void writeObject(Object o) throws IOException {
- if (o instanceof Text) {
- Text to = (Text) o;
- out.write(to.getBytes(), 0, to.getLength());
- } else {
- out.write(o.toString().getBytes(UTF8));
- }
- }
-
- public synchronized void write(K key, V value) throws IOException {
- writeObject(key);
- }
-
- public synchronized void close(TaskAttemptContext context)
- throws IOException {
- out.close();
- }
-
+ protected FSDataOutputStream getFSDataOutputStream(TaskAttemptContext context, String ext) throws IOException {
+ Configuration conf = context.getConfiguration();
+ Path file = getDefaultWorkFile(context, ext);
+ FileSystem fs = file.getFileSystem(conf);
+ FSDataOutputStream fileOut = fs.create(file, false);
+ return fileOut;
}
- public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
- throws IOException {
+ protected DataOutputStream getOutputStream(TaskAttemptContext context) throws IOException {
boolean isCompressed = getCompressOutput(context);
Configuration conf = context.getConfiguration();
String ext = "";
@@ -93,17 +60,18 @@ public RecordWriter getRecordWriter(TaskAttemptContext context)
ext = codec.getDefaultExtension();
}
- Path file = getDefaultWorkFile(context, ext);
- FileSystem fs = file.getFileSystem(conf);
- FSDataOutputStream fileOut = fs.create(file, false);
+ FSDataOutputStream fileOut = getFSDataOutputStream(context,ext);
DataOutputStream ostream = fileOut;
if (isCompressed) {
ostream = new DataOutputStream(codec.createOutputStream(fileOut));
}
-
- return new RawKeyRecordWriter(ostream);
+ return ostream;
}
-}
-
+ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+ throws IOException {
+ DataOutputStream ostream = getOutputStream(context);
+ return new KeyRecordWriters.RawKeyRecordWriter<K, V>(ostream);
+ }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/AbstractMainframeDatasetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/mainframe/AbstractMainframeDatasetImportMapper.java
new file mode 100644
index 00000000..9304fe2f
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/AbstractMainframeDatasetImportMapper.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.mainframe;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.sqoop.config.ConfigurationConstants;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.mapreduce.AutoProgressMapper;
+
+import java.io.IOException;
+
+public abstract class AbstractMainframeDatasetImportMapper<KEY>
+ extends AutoProgressMapper<LongWritable, SqoopRecord, KEY, NullWritable> {
+
+ private MainframeDatasetInputSplit inputSplit;
+ private MultipleOutputs<KEY, NullWritable> multiFileWriter;
+ private long numberOfRecords;
+
+ public void map(LongWritable key, SqoopRecord val, Context context)
+ throws IOException, InterruptedException {
+ String dataset = inputSplit.getCurrentDataset();
+ numberOfRecords++;
+ multiFileWriter.write(createOutKey(val), NullWritable.get(), dataset);
+ }
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ super.setup(context);
+ inputSplit = (MainframeDatasetInputSplit)context.getInputSplit();
+ multiFileWriter = new MultipleOutputs<>(context);
+ numberOfRecords = 0;
+ }
+
+ @Override
+ protected void cleanup(Context context)
+ throws IOException, InterruptedException {
+ super.cleanup(context);
+ multiFileWriter.close();
+ context.getCounter(
+ ConfigurationConstants.COUNTER_GROUP_MAPRED_TASK_COUNTERS,
+ ConfigurationConstants.COUNTER_MAP_OUTPUT_RECORDS)
+ .increment(numberOfRecords);
+ }
+
+ protected abstract KEY createOutKey(SqoopRecord sqoopRecord);
+}
\ No newline at end of file
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java
index ea54b07f..9d6a2fe7 100644
--- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java
@@ -33,4 +33,15 @@ public class MainframeConfiguration
public static final String MAINFRAME_INPUT_DATASET_TAPE = "mainframe.input.dataset.tape";
public static final String MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME = "org.apache.sqoop.mapreduce.mainframe.MainframeFTPFileEntryParser";
+
+ public static final String MAINFRAME_FTP_TRANSFER_MODE = "mainframe.ftp.transfermode";
+
+ public static final String MAINFRAME_FTP_TRANSFER_MODE_ASCII = "ascii";
+
+ public static final String MAINFRAME_FTP_TRANSFER_MODE_BINARY = "binary";
+
+ // this is the default buffer size used when doing binary ftp transfers from mainframe
+ public static final Integer MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE = 32760;
+
+ public static final String MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE = "mainframe.ftp.buffersize";
}
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetBinaryImportMapper.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetBinaryImportMapper.java
new file mode 100644
index 00000000..b2417b98
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetBinaryImportMapper.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.mainframe;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.sqoop.lib.SqoopRecord;
+
+/**
+ * Mapper that writes mainframe dataset records in binary format to multiple files
+ * based on the key, which is the index of the datasets in the input split.
+ */
+public class MainframeDatasetBinaryImportMapper extends AbstractMainframeDatasetImportMapper<BytesWritable> {
+
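+ // copies the raw bytes of the record's single field into a BytesWritable key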
+ @Override
+ protected BytesWritable createOutKey(SqoopRecord sqoopRecord) {
+ BytesWritable result = new BytesWritable();
+ byte[] bytes = (byte[]) sqoopRecord.getFieldMap().entrySet().iterator().next().getValue();
+ result.set(bytes,0, bytes.length);
+ return result;
+ }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetBinaryRecord.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetBinaryRecord.java
new file mode 100644
index 00000000..6e827988
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetBinaryRecord.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.mainframe;
+
+import org.apache.sqoop.lib.DelimiterSet;
+import org.apache.sqoop.lib.LargeObjectLoader;
+import org.apache.hadoop.io.Text;
+import org.apache.sqoop.lib.SqoopRecord;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MainframeDatasetBinaryRecord extends SqoopRecord {
+
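+ // holds the raw bytes of one buffer-sized chunk read from the mainframe dataset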
+ private byte[] field;
+
+ public Map<String, Object> getFieldMap() {
+ Map<String, Object> map = new HashMap<String, Object>();
+ map.put("fieldName", field);
+ return map;
+ }
+
+ public void setField(String fieldName, Object fieldVal) {
+ if (fieldVal instanceof byte[]) {
+ field = (byte[]) fieldVal;
+ }
+ }
+
+ public void setField(final byte[] val) {
+ this.field = val;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ in.readFully(field);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.write(field);
+ }
+
+ @Override
+ public void readFields(ResultSet rs) throws SQLException {
+ field = rs.getBytes(1);
+ }
+
+ @Override
+ public void write(PreparedStatement s) throws SQLException {
+ s.setBytes(1, field);
+ }
+
+ @Override
+ public String toString() {
+ return field.toString();
+ }
+
+ @Override
+ public int write(PreparedStatement stmt, int offset) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public String toString(DelimiterSet delimiters) {
+ return null;
+ }
+
+ @Override
+ public int getClassFormatVersion() {
+ return 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return field.hashCode();
+ }
+
+ public void parse(CharSequence s) {
+ }
+
+ public void parse(Text s) {
+ }
+
+ public void parse(byte[] s) {
+ }
+
+ public void parse(char[] s) {
+ }
+
+ public void parse(ByteBuffer s) {
+ }
+
+ public void parse(CharBuffer s) {
+ }
+
+ @Override
+ public void loadLargeObjects(LargeObjectLoader objLoader) throws SQLException, IOException, InterruptedException {
+
+ }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java
index 1f78384b..78c46656 100644
--- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java
@@ -18,9 +18,11 @@
package org.apache.sqoop.mapreduce.mainframe;
+import java.io.BufferedInputStream;
import java.io.BufferedReader;
-import java.io.InputStreamReader;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -38,6 +40,7 @@ public class MainframeDatasetFTPRecordReader
extends MainframeDatasetRecordReader<T> {
private FTPClient ftp = null;
private BufferedReader datasetReader = null;
+ private BufferedInputStream inputStream = null;
private static final Log LOG = LogFactory.getLog(
MainframeDatasetFTPRecordReader.class.getName());
@@ -50,21 +53,27 @@ public void initialize(InputSplit inputSplit,
Configuration conf = getConfiguration();
ftp = MainframeFTPClientUtils.getFTPConnection(conf);
+ initialize(ftp,conf);
+ }
+
+ public void initialize(FTPClient ftpClient, Configuration conf)
+ throws IOException {
+ ftp = ftpClient;
if (ftp != null) {
- String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
- String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
- MainframeDatasetPath p = null;
- try {
- p = new MainframeDatasetPath(dsName,conf);
- } catch (Exception e) {
- LOG.error(e.getMessage());
- LOG.error("MainframeDatasetPath helper class incorrectly initialised");
- e.printStackTrace();
- }
- if (dsType != null && p != null) {
- dsName = p.getMainframeDatasetFolder();
- }
- ftp.changeWorkingDirectory("'" + dsName + "'");
+ String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
+ String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
+ MainframeDatasetPath p = null;
+ try {
+ p = new MainframeDatasetPath(dsName,conf);
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ LOG.error("MainframeDatasetPath helper class incorrectly initialised");
+ e.printStackTrace();
+ }
+ if (dsType != null && p != null) {
+ dsName = p.getMainframeDatasetFolder();
+ }
+ ftp.changeWorkingDirectory("'" + dsName + "'");
}
}
@@ -80,6 +89,10 @@ public void close() throws IOException {
protected boolean getNextRecord(T sqoopRecord) throws IOException {
String line = null;
+ Configuration conf = getConfiguration();
+ if (MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY.equals(conf.get(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE))) {
+ return getNextBinaryRecord(sqoopRecord);
+ }
try {
do {
if (datasetReader == null) {
@@ -112,9 +125,85 @@ protected boolean getNextRecord(T sqoopRecord) throws IOException {
return false;
}
+ protected boolean getNextBinaryRecord(T sqoopRecord) throws IOException {
+ Configuration conf = getConfiguration();
+ // typical estimated max size for mainframe record
+ int BUFFER_SIZE = MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE;
+ if (conf != null) {
+ BUFFER_SIZE = conf.getInt(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE, MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE);
+ }
+ byte[] buf = new byte[BUFFER_SIZE];
+ int bytesRead = -1;
+ int cumulativeBytesRead = 0;
+ try {
+ Boolean streamInited = initInputStream(BUFFER_SIZE);
+ if (!streamInited) {
+ LOG.info("No more datasets to process.");
+ return false;
+ }
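+ // keep reading until the buffer is filled (one record) or EOF is reached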
+ do {
+ bytesRead = inputStream.read(buf,cumulativeBytesRead,BUFFER_SIZE-cumulativeBytesRead);
+ if (bytesRead == -1) {
+ // EOF
+ closeFtpInputStream();
+ LOG.info("Data transfer completed.");
+ return writeBytesToSqoopRecord(buf,cumulativeBytesRead,sqoopRecord);
+ }
+ cumulativeBytesRead += bytesRead;
+ if (cumulativeBytesRead == BUFFER_SIZE) {
+ return writeBytesToSqoopRecord(buf,cumulativeBytesRead,sqoopRecord);
+ }
+ } while (bytesRead != -1);
+ } catch (IOException ioe) {
+ throw new IOException("IOException during data transfer: " + ioe);
+ }
+ return false;
+ }
+
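+ // opens an FTP retrieve stream for the next dataset; returns false when there are no more datasets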
+ protected Boolean initInputStream(int bufferSize) throws IOException {
+ if (inputStream == null) {
+ String dsName = getNextDataset();
+ if (dsName == null) {
+ LOG.info("No more datasets to process. Returning.");
+ return false;
+ }
+ LOG.info("Attempting to retrieve file stream for: "+dsName);
+ LOG.info("Buffer size: "+bufferSize);
+ InputStream rawStream = ftp.retrieveFileStream(dsName);
+ if (rawStream == null) {
+ throw new IOException("Failed to retrieve FTP file stream.");
+ }
+ inputStream = new BufferedInputStream(rawStream);
+ }
+ return true;
+ }
+
+ protected void closeFtpInputStream() throws IOException {
+ inputStream.close();
+ inputStream = null;
+ if (!ftp.completePendingCommand()) {
+ throw new IOException("Failed to complete ftp command. FTP Response: "+ftp.getReplyString());
+ }
+ }
+
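+ // copies the bytes read so far into the record's field; returns false when nothing was read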
+ protected Boolean writeBytesToSqoopRecord(byte[] buf, int cumulativeBytesRead, SqoopRecord sqoopRecord) {
+ if (cumulativeBytesRead <= 0) {
+ return false;
+ }
+ ByteBuffer buffer = ByteBuffer.allocate(cumulativeBytesRead);
+ buffer.put(buf,0,cumulativeBytesRead);
+ convertToSqoopRecord(buffer.array(), sqoopRecord);
+ return true;
+ }
+
private void convertToSqoopRecord(String line, SqoopRecord sqoopRecord) {
String fieldName
= sqoopRecord.getFieldMap().entrySet().iterator().next().getKey();
sqoopRecord.setField(fieldName, line);
}
+
+ private void convertToSqoopRecord(byte[] buf, SqoopRecord sqoopRecord) {
+ String fieldName
+ = sqoopRecord.getFieldMap().entrySet().iterator().next().getKey();
+ sqoopRecord.setField(fieldName, buf);
+ }
}
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetImportMapper.java
index 0b7b5b85..0510e829 100644
--- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetImportMapper.java
@@ -18,61 +18,19 @@
package org.apache.sqoop.mapreduce.mainframe;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
-
-import org.apache.sqoop.config.ConfigurationConstants;
import org.apache.sqoop.lib.SqoopRecord;
-import org.apache.sqoop.mapreduce.AutoProgressMapper;
/**
* Mapper that writes mainframe dataset records in Text format to multiple files
* based on the key, which is the index of the datasets in the input split.
*/
-public class MainframeDatasetImportMapper
- extends AutoProgressMapper {
-
- private static final Log LOG = LogFactory.getLog(
- MainframeDatasetImportMapper.class.getName());
-
- private MainframeDatasetInputSplit inputSplit;
- private MultipleOutputs mos;
- private long numberOfRecords;
- private Text outkey;
-
- public void map(LongWritable key, SqoopRecord val, Context context)
- throws IOException, InterruptedException {
- String dataset = inputSplit.getCurrentDataset();
- outkey.set(val.toString());
- numberOfRecords++;
- mos.write(outkey, NullWritable.get(), dataset);
- }
+public class MainframeDatasetImportMapper extends AbstractMainframeDatasetImportMapper<Text> {
@Override
- protected void setup(Context context)
- throws IOException, InterruptedException {
- super.setup(context);
- inputSplit = (MainframeDatasetInputSplit)context.getInputSplit();
- mos = new MultipleOutputs(context);
- numberOfRecords = 0;
- outkey = new Text();
- }
-
- @Override
- protected void cleanup(Context context)
- throws IOException, InterruptedException {
- super.cleanup(context);
- mos.close();
- context.getCounter(
- ConfigurationConstants.COUNTER_GROUP_MAPRED_TASK_COUNTERS,
- ConfigurationConstants.COUNTER_MAP_OUTPUT_RECORDS)
- .increment(numberOfRecords);
+ protected Text createOutKey(SqoopRecord sqoopRecord) {
+ Text result = new Text();
+ result.set(sqoopRecord.toString());
+ return result;
}
}
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
index 8ef30d38..90dc2ddd 100644
--- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
@@ -22,14 +22,19 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
-
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.manager.ImportJobContext;
-
+import org.apache.sqoop.mapreduce.DBWritable;
import org.apache.sqoop.mapreduce.DataDrivenImportJob;
+import org.apache.sqoop.mapreduce.RawKeyTextOutputFormat;
+import org.apache.sqoop.mapreduce.ByteKeyOutputFormat;
import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
/**
@@ -46,7 +51,10 @@ public MainframeImportJob(final SqoopOptions opts, ImportJobContext context, Par
@Override
protected Class<? extends Mapper> getMapperClass() {
- if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
+ if (SqoopOptions.FileLayout.BinaryFile.equals(options.getFileLayout())) {
+ LOG.debug("Using MainframeDatasetBinaryImportMapper");
+ return MainframeDatasetBinaryImportMapper.class;
+ } else if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
return MainframeDatasetImportMapper.class;
} else {
return super.getMapperClass();
@@ -66,13 +74,58 @@ protected void configureInputFormat(Job job, String tableName,
job.getConfiguration().set(
MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE,
options.getMainframeInputDatasetTape().toString());
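+ // propagate the transfer mode and buffer size so the FTP record reader knows how to read the dataset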
+ if (SqoopOptions.FileLayout.BinaryFile == options.getFileLayout()) {
+ job.getConfiguration().set(
+ MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE,
+ MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY);
+ job.getConfiguration().setInt(
+ MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE,
+ options.getBufferSize()
+ );
+ } else {
+ job.getConfiguration().set(
+ MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE,
+ MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_ASCII);
+ }
+
}
@Override
protected void configureOutputFormat(Job job, String tableName,
String tableClassName) throws ClassNotFoundException, IOException {
super.configureOutputFormat(job, tableName, tableClassName);
+ job.getConfiguration().set(
+ MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE,
+ options.getMainframeFtpTransferMode());
+ // wrap the layout-specific output format with LazyOutputFormat so empty part files are not created
LazyOutputFormat.setOutputFormatClass(job, getOutputFormatClass());
}
+ @Override
+ protected void configureMapper(Job job, String tableName,
+ String tableClassName) throws IOException {
+ super.configureMapper(job, tableName, tableClassName);
+ if (SqoopOptions.FileLayout.BinaryFile == options.getFileLayout()) {
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(NullWritable.class);
+
+ // this is required as code generated class assumes setField method takes String
+ // and will fail with ClassCastException when a byte array is passed instead
+ // java.lang.ClassCastException: [B cannot be cast to java.lang.String
+ Configuration conf = job.getConfiguration();
+ conf.setClass(org.apache.sqoop.mapreduce.db.DBConfiguration.INPUT_CLASS_PROPERTY, MainframeDatasetBinaryRecord.class,
+ DBWritable.class);
+ }
+ }
+
+ @Override
+ protected Class<? extends OutputFormat> getOutputFormatClass() {
+ if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
+ return RawKeyTextOutputFormat.class;
+ } else if (options.getFileLayout()
+ == SqoopOptions.FileLayout.BinaryFile) {
+ return ByteKeyOutputFormat.class;
+ }
+ return null;
+ }
}
diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
index 9dcbdd59..b47be72c 100644
--- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
@@ -114,6 +114,7 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool {
public static final String FMT_TEXTFILE_ARG = "as-textfile";
public static final String FMT_AVRODATAFILE_ARG = "as-avrodatafile";
public static final String FMT_PARQUETFILE_ARG = "as-parquetfile";
+ public static final String FMT_BINARYFILE_ARG = "as-binaryfile";
public static final String HIVE_IMPORT_ARG = "hive-import";
public static final String HIVE_TABLE_ARG = "hive-table";
public static final String HIVE_DATABASE_ARG = "hive-database";
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index 478f1748..13973373 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -744,6 +744,10 @@ protected RelatedOptions getImportOptions() {
.withDescription("Imports data to Parquet files")
.withLongOpt(BaseSqoopTool.FMT_PARQUETFILE_ARG)
.create());
+ importOpts.addOption(OptionBuilder
+ .withDescription("Imports data to Binary files")
+ .withLongOpt(BaseSqoopTool.FMT_BINARYFILE_ARG)
+ .create());
importOpts.addOption(OptionBuilder.withArgName("n")
.hasArg().withDescription("Use 'n' map tasks to import in parallel")
.withLongOpt(NUM_MAPPERS_ARG)
diff --git a/src/java/org/apache/sqoop/tool/MainframeImportTool.java b/src/java/org/apache/sqoop/tool/MainframeImportTool.java
index cdd9d6d0..fbc8c3db 100644
--- a/src/java/org/apache/sqoop/tool/MainframeImportTool.java
+++ b/src/java/org/apache/sqoop/tool/MainframeImportTool.java
@@ -20,6 +20,7 @@
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.ToolRunner;
@@ -40,6 +41,7 @@ public class MainframeImportTool extends ImportTool {
public static final String DS_ARG = "dataset";
public static final String DS_TYPE_ARG = "datasettype";
public static final String DS_TAPE_ARG = "tape";
+ public static final String BUFFERSIZE_ARG = "buffersize";
public MainframeImportTool() {
super("import-mainframe", false);
@@ -82,6 +84,14 @@ protected RelatedOptions getImportOptions() {
.withDescription("Imports data as plain text (default)")
.withLongOpt(FMT_TEXTFILE_ARG)
.create());
+ importOpts.addOption(OptionBuilder
+ .withDescription("Imports data as binary")
+ .withLongOpt(FMT_BINARYFILE_ARG)
+ .create());
+ importOpts.addOption(OptionBuilder
+ .hasArg().withDescription("Sets buffer size for binary import in bytes (default=32kB)")
+ .withLongOpt(BUFFERSIZE_ARG)
+ .create());
importOpts.addOption(OptionBuilder.withArgName("n")
.hasArg().withDescription("Use 'n' map tasks to import in parallel")
.withLongOpt(NUM_MAPPERS_ARG)
@@ -168,6 +178,28 @@ public void applyOptions(CommandLine in, SqoopOptions out)
// set default tape value to false
out.setMainframeInputDatasetTape("false");
}
+ if (in.hasOption(FMT_BINARYFILE_ARG)) {
+ out.setMainframeFtpTransferMode(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY);
+ out.setFileLayout(SqoopOptions.FileLayout.BinaryFile);
+ } else {
+ // set default transfer mode to ascii
+ out.setMainframeFtpTransferMode(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_ASCII);
+ out.setFileLayout(SqoopOptions.FileLayout.TextFile);
+ }
+
+ if (in.hasOption(BUFFERSIZE_ARG)) {
+ // if --buffersize is specified, use it when it is a positive value
+ int bufSize = Integer.parseInt(in.getOptionValue(BUFFERSIZE_ARG));
+ if (bufSize > 0) {
+ out.setBufferSize(bufSize);
+ } else {
+ out.setBufferSize(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE);
+ }
+ } else {
+ // set the default buffer size (32760 bytes)
+ out.setBufferSize(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE);
+ }
}
@Override
@@ -190,6 +222,17 @@ protected void validateImportOptions(SqoopOptions options)
throw new InvalidOptionsException(
"--" + DS_TAPE_ARG + " specified is invalid. " + HELP_STR);
}
+ /* only allow FileLayout.BinaryFile to be selected for mainframe import */
+ if (SqoopOptions.FileLayout.BinaryFile.equals(options.getFileLayout()) && StringUtils.isEmpty(options.getMainframeInputDatasetName())) {
+ throw new InvalidOptionsException("--as-binaryfile should only be used with import-mainframe module.");
+ }
+
+ // only allow buffer size to be set different to default when binary file is selected
+ // in any case, if --as-binaryfile isn't selected, --buffersize parameter is harmless
+ if (!SqoopOptions.FileLayout.BinaryFile.equals(options.getFileLayout()) && !MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE.equals(options.getBufferSize())) {
+ throw new InvalidOptionsException("--buffersize should only be used with --as-binaryfile parameter.");
+ }
+
super.validateImportOptions(options);
}
}
diff --git a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
index 95bc0ecb..654721e3 100644
--- a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
+++ b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
@@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.net.PrintCommandListener;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
@@ -207,8 +208,18 @@ public static FTPClient getFTPConnection(Configuration conf)
throw new IOException("Could not login to server " + server
+ ":" + ftp.getReplyString());
}
- // set ASCII transfer mode
- ftp.setFileType(FTP.ASCII_FILE_TYPE);
+ // set transfer mode
+ String transferMode = conf.get(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE);
+ if (StringUtils.equalsIgnoreCase(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY,transferMode)) {
+ LOG.info("Setting FTP transfer mode to binary");
+ // ftp.setFileTransferMode(FTP.BINARY_FILE_TYPE) doesn't work for MVS, it throws a syntax error
+ ftp.sendCommand("TYPE I");
+ // this is important - without the binary file type, the client converts 0x0d0a to 0x0a, dropping bytes
+ ftp.setFileType(FTP.BINARY_FILE_TYPE);
+ } else {
+ LOG.info("Defaulting FTP transfer mode to ascii");
+ ftp.setFileType(FTP.ASCII_FILE_TYPE);
+ }
// Use passive mode as default.
ftp.enterLocalPassiveMode();
LOG.info("System type detected: " + ftp.getSystemType());
diff --git a/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java b/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java
index 041dfb78..3b8ed236 100644
--- a/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java
+++ b/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java
@@ -113,6 +113,42 @@ public void testImportGdgText() throws IOException {
doImportAndVerify(MainframeTestUtil.GDG_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG, files);
}
+ @Test
+ public void testImportGdgBinary() throws IOException {
+ HashMap<String, String> files = new HashMap<String, String>();
+ files.put(MainframeTestUtil.GDG_BINARY_DATASET_FILENAME, MainframeTestUtil.EXPECTED_GDG_BINARY_DATASET_MD5);
+ doImportAndVerify(MainframeTestUtil.GDG_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG, files, "--as-binaryfile");
+ }
+
+ @Test
+ public void testImportGdgBinaryWithBufferSize() throws IOException {
+ HashMap<String, String> files = new HashMap<String, String>();
+ files.put(MainframeTestUtil.GDG_BINARY_DATASET_FILENAME, MainframeTestUtil.EXPECTED_GDG_BINARY_DATASET_MD5);
+ doImportAndVerify(MainframeTestUtil.GDG_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG, files, "--as-binaryfile", "--buffersize", "64000");
+ }
+
+ @Test
+ public void testImportSequential() throws IOException {
+ // can reuse a generation of the GDG text dataset, as it is plain text
+ HashMap<String, String> files = new HashMap<String, String>();
+ files.put(MainframeTestUtil.SEQ_DATASET_FILENAME, MainframeTestUtil.EXPECTED_SEQ_DATASET_MD5);
+ doImportAndVerify(MainframeTestUtil.SEQ_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL, files);
+ }
+
+ @Test
+ public void testImportSequentialBinary() throws IOException {
+ HashMap<String, String> files = new HashMap<String, String>();
+ files.put(MainframeTestUtil.SEQ_BINARY_DATASET_FILENAME, MainframeTestUtil.EXPECTED_SEQ_BINARY_DATASET_MD5);
+ doImportAndVerify(MainframeTestUtil.SEQ_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL, files, "--as-binaryfile");
+ }
+
+ @Test
+ public void testImportSequentialBinaryWithBufferSize() throws IOException {
+ HashMap<String, String> files = new HashMap<String, String>();
+ files.put(MainframeTestUtil.SEQ_BINARY_DATASET_FILENAME, MainframeTestUtil.EXPECTED_SEQ_BINARY_DATASET_MD5);
+ doImportAndVerify(MainframeTestUtil.SEQ_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL, files, "--as-binaryfile", "--buffersize", "64000");
+ }
+
private String [] getArgv(String datasetName, String datasetType, String ... extraArgs) {
ArrayList<String> args = new ArrayList<String>();
@@ -130,7 +166,6 @@ public void testImportGdgText() throws IOException {
args.add(datasetType);
if (extraArgs.length > 0) {
- args.add("--");
for (String arg : extraArgs) {
args.add(arg);
}
diff --git a/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java b/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java
index f28ff36c..9f86f6cd 100644
--- a/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java
+++ b/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java
@@ -41,4 +41,31 @@ public class MainframeTestUtil {
public static final String EXPECTED_GDG_DATASET_MD5 = System.getProperty(
"sqoop.test.mainframe.ftp.dataset.gdg.md5",
"f0d0d171fdb8a03dbc1266ed179d7093");
+ public static final String GDG_BINARY_DATASET_NAME = System.getProperty(
+ "sqoop.test.mainframe.ftp.binary.dataset.gdg",
+ "TSODIQ1.FOLDER");
+ public static final String GDG_BINARY_DATASET_FILENAME = System.getProperty(
+ "sqoop.test.mainframe.ftp.binary.dataset.gdg.filename",
+ "G0002V45");
+ public static final String EXPECTED_GDG_BINARY_DATASET_MD5 = System.getProperty(
+ "sqoop.test.mainframe.ftp.binary.dataset.gdg.md5",
+ "43eefbe34e466dd3f65a3e867a60809a");
+ public static final String SEQ_DATASET_NAME = System.getProperty(
+ "sqoop.test.mainframe.ftp.dataset.seq",
+ "TSODIQ1.GDGTEXT.G0001V43");
+ public static final String SEQ_DATASET_FILENAME = System.getProperty(
+ "sqoop.test.mainframe.ftp.dataset.seq.filename",
+ "G0001V43");
+ public static final String EXPECTED_SEQ_DATASET_MD5 = System.getProperty(
+ "sqoop.test.mainframe.ftp.dataset.seq.md5",
+ "f0d0d171fdb8a03dbc1266ed179d7093");
+ public static final String SEQ_BINARY_DATASET_NAME = System.getProperty(
+ "sqoop.test.mainframe.ftp.binary.dataset.seq",
+ "TSODIQ1.FOLDER.FOLDERTXT");
+ public static final String SEQ_BINARY_DATASET_FILENAME = System.getProperty(
+ "sqoop.test.mainframe.ftp.binary.dataset.seq.filename",
+ "FOLDERTXT");
+ public static final String EXPECTED_SEQ_BINARY_DATASET_MD5 = System.getProperty(
+ "sqoop.test.mainframe.ftp.binary.dataset.seq.md5",
+ "1591c0fcc718fda7e9c1f3561d232b2b");
}
diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetBinaryRecord.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetBinaryRecord.java
new file mode 100644
index 00000000..b4cba28c
--- /dev/null
+++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetBinaryRecord.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.mainframe;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.doReturn;
+
+public class TestMainframeDatasetBinaryRecord {
+
+ private MainframeDatasetFTPRecordReader ftpRecordReader;
+ private InputStream is;
+ private FTPClient ftp;
+ private final String DATASET_NAME = "dummy.ds";
+ private final String DATASET_TYPE = "g";
+ private static final Log LOG = LogFactory.getLog(
+ TestMainframeDatasetBinaryRecord.class.getName());
+
+ @Before
+ public void setUp() throws IOException, InterruptedException {
+ MainframeDatasetFTPRecordReader rdr = new MainframeDatasetFTPRecordReader();
+ Configuration conf;
+ MainframeDatasetInputSplit split;
+ TaskAttemptContext context;
+ ftpRecordReader = spy(rdr);
+ is = mock(InputStream.class);
+ ftp = mock(FTPClient.class);
+ split = mock(MainframeDatasetInputSplit.class);
+ context = mock(TaskAttemptContext.class);
+ conf = new Configuration();
+ when(ftp.retrieveFileStream(any(String.class))).thenReturn(is);
+ when(ftp.changeWorkingDirectory(any(String.class))).thenReturn(true);
+ doReturn("file1").when(ftpRecordReader).getNextDataset();
+ when(split.getNextDataset()).thenReturn(DATASET_NAME);
+ when(ftpRecordReader.getNextDataset()).thenReturn(DATASET_NAME);
+ conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,DATASET_NAME);
+ conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,DATASET_TYPE);
+ conf.setInt(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE,MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE);
+ ftpRecordReader.initialize(ftp, conf);
+ }
+
+ // Answer used to mock InputStream.read, reporting the given number of bytes as read
+ protected Answer returnSqoopRecord(final int byteLength) {
+ return new Answer() {
+ public Object answer(InvocationOnMock invocation) {
+ return byteLength;
+ }
+ };
+ }
+
+ @Test
+ public void testGetNextBinaryRecordForFullRecord() {
+
+ MainframeDatasetBinaryRecord record = new MainframeDatasetBinaryRecord();
+ try {
+ when(is.read(any(byte[].class),anyInt(),anyInt()))
+ .thenAnswer(returnSqoopRecord(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE))
+ .thenReturn(-1);
+ when(ftp.completePendingCommand()).thenReturn(true);
+ Assert.assertTrue(ftpRecordReader.getNextBinaryRecord(record));
+ Assert.assertFalse(record.getFieldMap().values().isEmpty());
+ Assert.assertEquals(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE.intValue(),((byte[])record.getFieldMap().values().iterator().next()).length);
+ } catch (IOException ioe) {
+ LOG.error("Issue with reading 1 full binary buffer record", ioe);
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ @Test
+ public void testGetNextBinaryRecordForPartialRecord() {
+ int expectedBytesRead = 10;
+ MainframeDatasetBinaryRecord record = new MainframeDatasetBinaryRecord();
+ try {
+ when(is.read(any(byte[].class),anyInt(),anyInt()))
+ .thenAnswer(returnSqoopRecord(10))
+ .thenReturn(-1);
+ when(ftp.completePendingCommand()).thenReturn(true);
+ Assert.assertTrue(ftpRecordReader.getNextBinaryRecord(record));
+ Assert.assertFalse(record.getFieldMap().values().isEmpty());
+ Assert.assertEquals(expectedBytesRead,(((byte[])record.getFieldMap().values().iterator().next()).length));
+ } catch (IOException ioe) {
+ LOG.error("Issue with reading 10 byte binary record", ioe);
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ @Test
+ public void testGetNextBinaryRecordFor2Records() {
+ // test 1 full record, and 1 partial
+ int expectedBytesRead = 10;
+ MainframeDatasetBinaryRecord record = new MainframeDatasetBinaryRecord();
+ try {
+ when(is.read(any(byte[].class),anyInt(),anyInt()))
+ .thenAnswer(returnSqoopRecord(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE))
+ .thenAnswer(returnSqoopRecord(10))
+ .thenReturn(-1);
+ when(ftp.completePendingCommand()).thenReturn(true);
+ Assert.assertTrue(ftpRecordReader.getNextBinaryRecord(record));
+ Assert.assertFalse(record.getFieldMap().values().isEmpty());
+ Assert.assertTrue(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE.equals((((byte[])record.getFieldMap().values().iterator().next()).length)));
+ record = new MainframeDatasetBinaryRecord();
+ Assert.assertTrue(ftpRecordReader.getNextBinaryRecord(record));
+ Assert.assertFalse(record.getFieldMap().values().isEmpty());
+ Assert.assertEquals(expectedBytesRead,(((byte[])record.getFieldMap().values().iterator().next()).length));
+ } catch (IOException ioe) {
+ LOG.error("Issue with reading 1 full binary buffer record followed by 1 partial binary buffer record", ioe);
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ @Test
+ public void testGetNextBinaryRecordForMultipleReads() {
+ // test reading 1 record where the stream returns less than a full buffer
+ MainframeDatasetBinaryRecord record = new MainframeDatasetBinaryRecord();
+ try {
+ when(is.read(any(byte[].class),anyInt(),anyInt()))
+ .thenAnswer(returnSqoopRecord(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE /2))
+ .thenAnswer(returnSqoopRecord(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE /2))
+ .thenReturn(-1);
+ when(ftp.completePendingCommand()).thenReturn(true);
+ Assert.assertTrue(ftpRecordReader.getNextBinaryRecord(record));
+ Assert.assertFalse(record.getFieldMap().values().isEmpty());
+ Assert.assertEquals(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE.intValue(),((byte[])record.getFieldMap().values().iterator().next()).length);
+ record = new MainframeDatasetBinaryRecord();
+ Assert.assertFalse(ftpRecordReader.getNextBinaryRecord(record));
+ Assert.assertNull((((byte[])record.getFieldMap().values().iterator().next())));
+ } catch (IOException ioe) {
+ LOG.error("Issue with verifying reading partial buffer binary records", ioe);
+ throw new RuntimeException(ioe);
+ }
+ }
+}
diff --git a/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java b/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java
index 0b0c6c34..c2edc534 100644
--- a/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java
+++ b/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java
@@ -22,18 +22,18 @@
import java.lang.reflect.Method;
import org.apache.commons.cli.ParseException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.sqoop.cli.RelatedOptions;
import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.SqoopOptions.InvalidOptionsException;
import org.apache.sqoop.cli.ToolOptions;
import org.apache.sqoop.testutil.BaseSqoopTestCase;
+import org.junit.rules.ExpectedException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -43,15 +43,16 @@
public class TestMainframeImportTool extends BaseSqoopTestCase {
- private static final Log LOG = LogFactory.getLog(TestMainframeImportTool.class
- .getName());
-
private MainframeImportTool mfImportTool;
+ private ToolOptions toolOptions;
+ private SqoopOptions sqoopOption;
@Before
public void setUp() {
mfImportTool = new MainframeImportTool();
+ toolOptions = new ToolOptions();
+ sqoopOption = new SqoopOptions();
}
@After
@@ -183,4 +184,58 @@ public void testTapeOptionInvalidReturnsFalse() throws ParseException, InvalidOp
Boolean isTape = sqoopOption.getMainframeInputDatasetTape();
assert(isTape != null && isTape.toString().equals("false"));
}
+
+ @Test
+ public void testFtpTransferModeAscii() throws ParseException, InvalidOptionsException {
+ String[] args = new String[] { "--dataset", "mydatasetname", "--as-textfile" };
+ configureAndValidateOptions(args);
+ assertEquals(SqoopOptions.FileLayout.TextFile,sqoopOption.getFileLayout());
+ }
+ @Test
+ public void testFtpTransferModeDefaultsToAscii() throws ParseException, InvalidOptionsException {
+ String[] args = new String[] { "--dataset", "mydatasetname" };
+ configureAndValidateOptions(args);
+ assertEquals(SqoopOptions.FileLayout.TextFile,sqoopOption.getFileLayout());
+ }
+
+ @Test
+ public void testAsBinaryFileSetsCorrectFileLayoutAndDefaultBufferSize() throws ParseException, InvalidOptionsException {
+ String[] args = new String[] { "--dataset", "mydatasetname", "--as-binaryfile" };
+ configureAndValidateOptions(args);
+ assertEquals(SqoopOptions.FileLayout.BinaryFile,sqoopOption.getFileLayout());
+ assertEquals(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE, sqoopOption.getBufferSize());
+ }
+
+ @Test
+ public void testSetBufferSize() throws ParseException, InvalidOptionsException {
+ final Integer EXPECTED_BUFFER = 1024;
+ String[] args = new String[] { "--dataset", "mydatasetname", "--as-binaryfile", "--buffersize", EXPECTED_BUFFER.toString() };
+ configureAndValidateOptions(args);
+ assertEquals(SqoopOptions.FileLayout.BinaryFile,sqoopOption.getFileLayout());
+ assertEquals(EXPECTED_BUFFER, sqoopOption.getBufferSize());
+ }
+
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+ @Test
+ public void testBufferSizeWithoutBinaryThrowsException() throws ParseException, InvalidOptionsException {
+ final Integer EXPECTED_BUFFER = 1024;
+ String[] args = new String[] { "--buffersize", EXPECTED_BUFFER.toString() };
+ exception.expect(InvalidOptionsException.class);
+ configureAndValidateOptions(args);
+ }
+
+ @Test
+ public void testInvalidBufferSizeThrowsNumberFormatException() throws ParseException, InvalidOptionsException {
+ String[] args = new String[] { "--buffersize", "invalidinteger" };
+ exception.expect(NumberFormatException.class);
+ configureAndValidateOptions(args);
+ }
+
+ private void configureAndValidateOptions(String[] args) throws ParseException, InvalidOptionsException {
+ mfImportTool.configureOptions(toolOptions);
+ sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
+ mfImportTool.validateImportOptions(sqoopOption);
+ }
}