diff --git a/ivy.xml b/ivy.xml index 6335e012..27597555 100644 --- a/ivy.xml +++ b/ivy.xml @@ -170,10 +170,14 @@ under the License. rev="${commons-cli.version}" conf="common->default"/> + + diff --git a/ivy/libraries.properties b/ivy/libraries.properties index 6818b3e6..8f987394 100644 --- a/ivy/libraries.properties +++ b/ivy/libraries.properties @@ -29,12 +29,14 @@ commons-collections.version=3.1 commons-io.version=1.4 commons-lang.version=2.4 commons-logging.version=1.0.4 +commons-net.version=3.1 hsqldb.version=1.8.0.10 ivy.version=2.3.0 junit.version=4.11 +mockito-all.version=1.9.5 h2.version=1.3.170 diff --git a/src/docs/man/common-args.txt b/src/docs/man/common-args.txt index e8d1f17e..24741f51 100644 --- a/src/docs/man/common-args.txt +++ b/src/docs/man/common-args.txt @@ -1,4 +1,3 @@ - //// Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file @@ -7,9 +6,9 @@ to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,45 +16,16 @@ limitations under the License. //// - Database connection and common options ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ --connect (jdbc-uri):: Specify JDBC connect string (required) ---connection-manager (class-name):: - Specify connection manager class name (optional) - --driver (class-name):: Manually specify JDBC driver class to use --connection-param-file (filename):: Optional properties file that provides connection parameters ---hadoop-mapred-home (dir):: - Override $HADOOP_MAPRED_HOME - ---help:: - Print usage instructions - ---password-file (file containing the password):: - Set authentication password in a file on the users home - directory with 400 permissions - (Note: This is very secure and a preferred way of entering credentials) - ---password (password):: - Set authentication password - (Note: This is very insecure. You should use -P instead.) - --P:: - Prompt for user password - ---username (username):: - Set authentication username - ---verbose:: - Print more information while working - ---hadoop-home (dir):: - Deprecated. Override $HADOOP_HOME +include::database-independent-args.txt[] diff --git a/src/docs/man/database-independent-args.txt b/src/docs/man/database-independent-args.txt new file mode 100644 index 00000000..d481ac6e --- /dev/null +++ b/src/docs/man/database-independent-args.txt @@ -0,0 +1,47 @@ +//// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+//// + +--connection-manager (class-name):: + Specify connection manager class name (optional) + +--hadoop-mapred-home (dir):: + Override $HADOOP_MAPRED_HOME + +--help:: + Print usage instructions + +--password-file (file containing the password):: + Set authentication password in a file on the users home + directory with 400 permissions + (Note: This is very secure and a preferred way of entering credentials) + +--password (password):: + Set authentication password + (Note: This is very insecure. You should use -P instead.) + +-P:: + Prompt for user password + +--username (username):: + Set authentication username + +--verbose:: + Print more information while working + +--hadoop-home (dir):: + Deprecated. Override $HADOOP_HOME diff --git a/src/docs/man/hbase-args.txt b/src/docs/man/hbase-args.txt index 456bc143..afd5c5b2 100644 --- a/src/docs/man/hbase-args.txt +++ b/src/docs/man/hbase-args.txt @@ -1,4 +1,3 @@ - //// Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file @@ -7,9 +6,9 @@ to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -31,7 +30,8 @@ HBase options Specifies which input column to use as the row key If input table contains composite key, value of (col) must be a comma-separated list of composite - key attributes + key attributes. For mainframe dataset, this should be the + input field name --hbase-table (table-name):: Specifies an HBase table to use as the target instead of HDFS diff --git a/src/docs/man/hive-args.txt b/src/docs/man/hive-args.txt index dd77c3ec..7d9e4276 100644 --- a/src/docs/man/hive-args.txt +++ b/src/docs/man/hive-args.txt @@ -1,4 +1,3 @@ - //// Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file @@ -7,9 +6,9 @@ to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -24,7 +23,7 @@ Hive options Override $HIVE_HOME --hive-import:: - If set, then import the table into Hive + If set, then import the table or mainframe dataset into Hive --hive-overwrite:: Overwrites existing data in the hive table if it exists. @@ -37,4 +36,4 @@ Hive options When used with --hive-import, overrides the destination table name --map-column-hive (mapping):: - Override default mapping for SQL types into Hive types for configured columns + Override default mapping for SQL types or input field types into Hive types for configured columns diff --git a/src/docs/man/import-args.txt b/src/docs/man/import-args.txt index 2bb69ba1..93f65bab 100644 --- a/src/docs/man/import-args.txt +++ b/src/docs/man/import-args.txt @@ -1,4 +1,3 @@ - //// Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE file @@ -7,9 +6,9 @@ to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,25 +16,11 @@ limitations under the License. //// - -Import control options -~~~~~~~~~~~~~~~~~~~~~~ +include::import-common-args.txt[] --append:: Append data to an existing HDFS dataset ---as-avrodatafile:: - Imports data to Avro Data Files - ---as-sequencefile:: - Imports data to SequenceFiles - ---as-textfile:: - Imports data as plain text (default) - ---as-parquetfile:: - Imports data to Parquet Files - --boundary-query (query):: Using following query to select minimal and maximal value of '--split-by' column for creating splits @@ -51,10 +36,6 @@ Import control options --inline-lob-limit (n):: Set the maximum size for an inline LOB ---num-mappers (n):: --m:: - Use 'n' map tasks to import in parallel - --query (statement):: Imports the results of +statement+ instead of a table @@ -64,24 +45,10 @@ Import control options --table (table-name):: The table to import ---target-dir (dir):: - Explicit HDFS target directory for the import. - ---warehouse-dir (dir):: - Tables are uploaded to the HDFS path +/warehouse/dir/(tablename)/+ - --where (clause):: Import only the rows for which _clause_ is true. e.g.: `--where "user_id > 400 AND hidden == 0"` ---compress:: --z:: - Uses gzip (or, alternatively, the codec specified by +\--compression-codec+, - if set) to compress data as it is written to HDFS - ---compression-codec (codec):: - Uses the Hadoop +codec+ class to compress data as it is written to HDFS. - --null-string:: The string to be written for a null value for string columns diff --git a/src/docs/man/import-common-args.txt b/src/docs/man/import-common-args.txt new file mode 100644 index 00000000..22e3448e --- /dev/null +++ b/src/docs/man/import-common-args.txt @@ -0,0 +1,51 @@ +//// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +//// + + +Import control options +~~~~~~~~~~~~~~~~~~~~~~ + +--as-textfile:: + Imports data as plain text (default) + +--as-avrodatafile:: + Imports data to Avro Data Files + +--as-sequencefile:: + Imports data to SequenceFiles + +--as-parquetfile:: + Imports data to Parquet Files + +--num-mappers (n):: +-m:: + Use 'n' map tasks to import in parallel + +--target-dir (dir):: + Explicit HDFS target directory for the import. 
+ +--warehouse-dir (dir):: + Tables are uploaded to the HDFS path +/warehouse/dir/(tablename)/+ + +--compress:: +-z:: + Uses gzip (or, alternatively, the codec specified by +\--compression-codec+, + if set) to compress data as it is written to HDFS + +--compression-codec (codec):: + Uses the Hadoop +codec+ class to compress data as it is written to HDFS. diff --git a/src/docs/man/mainframe-connection-args.txt b/src/docs/man/mainframe-connection-args.txt new file mode 100644 index 00000000..04e33c7d --- /dev/null +++ b/src/docs/man/mainframe-connection-args.txt @@ -0,0 +1,25 @@ +//// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +//// + +.Connection arguments + +--connect (hostname):: + Specify mainframe host name (required) + +--dataset (partitioned dataset name):: + Specify a partitioned dataset name diff --git a/src/docs/man/sqoop-import-mainframe.txt b/src/docs/man/sqoop-import-mainframe.txt new file mode 100644 index 00000000..0e1ea815 --- /dev/null +++ b/src/docs/man/sqoop-import-mainframe.txt @@ -0,0 +1,66 @@ +//// + Copyright 2011 The Apache Software Foundation + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +//// + +sqoop-import-mainframe(1) +========================= + +NAME +---- +sqoop-import-mainframe - Import mainframe sequential datasets to HDFS + +SYNOPSIS +-------- +'sqoop-import-mainframe' + +'sqoop import-mainframe' + +DESCRIPTION +----------- + +include::../user/import-mainframe-purpose.txt[] + +OPTIONS +------- + +The +--connect+ and +--dataset+ options are required. 
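For illustration, a minimal invocation that supplies both required options (reusing the +z390+ host and +EMPLOYEES+ dataset names that appear as sample values elsewhere in this patch):

----
$ sqoop import-mainframe --connect z390 --dataset EMPLOYEES --username someuser -P
----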
+ +Mainframe connection and common options +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +include::mainframe-connection-args.txt[] + +include::database-independent-args.txt[] + +include::import-common-args.txt[] + +include::hive-args.txt[] + +include::hbase-args.txt[] + +include::input-args.txt[] + +include::output-args.txt[] + +include::codegen-args.txt[] + +ENVIRONMENT +----------- + +See 'sqoop(1)' diff --git a/src/docs/man/sqoop.txt b/src/docs/man/sqoop.txt index febe8278..7977e130 100644 --- a/src/docs/man/sqoop.txt +++ b/src/docs/man/sqoop.txt @@ -12,7 +12,8 @@ SYNOPSIS DESCRIPTION ----------- Sqoop is a tool designed to help users of large data import existing -relational databases into their Hadoop clusters. Sqoop interfaces with +relational databases or mainframe datasets into their Hadoop clusters. +Sqoop interfaces with databases via JDBC, importing the contents of tables into HDFS while generating Java classes so users can interpret the table's schema. Sqoop can also run in reverse, exporting records back from HDFS to a @@ -66,7 +67,7 @@ installed from a tarball. //// Copyright 2011 The Apache Software Foundation - + Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information @@ -74,9 +75,9 @@ installed from a tarball. to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/src/docs/user/SqoopUserGuide.txt b/src/docs/user/SqoopUserGuide.txt index 2e888879..8d9c12d1 100644 --- a/src/docs/user/SqoopUserGuide.txt +++ b/src/docs/user/SqoopUserGuide.txt @@ -1,4 +1,3 @@ - //// Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file @@ -7,9 +6,9 @@ to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -30,9 +29,9 @@ Sqoop User Guide (v{revnumber}) to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -52,6 +51,8 @@ include::import.txt[] include::import-all-tables.txt[] +include::import-mainframe.txt[] + include::export.txt[] include::validation.txt[] diff --git a/src/docs/user/basics.txt b/src/docs/user/basics.txt index 7e5a76a9..362283f3 100644 --- a/src/docs/user/basics.txt +++ b/src/docs/user/basics.txt @@ -1,4 +1,3 @@ - //// Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE file @@ -7,9 +6,9 @@ to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,10 +19,12 @@ Basic Usage ----------- -With Sqoop, you can _import_ data from a relational database system into -HDFS. The input to the import process is a database table. Sqoop -will read the table row-by-row into HDFS. The output of this import -process is a set of files containing a copy of the imported table. +With Sqoop, you can _import_ data from a relational database system or a +mainframe into HDFS. The input to the import process is either database table +or mainframe datasets. For databases, Sqoop will read the table row-by-row +into HDFS. For mainframe datasets, Sqoop will read records from each mainframe +dataset into HDFS. The output of this import process is a set of files +containing a copy of the imported table or datasets. The import process is performed in parallel. For this reason, the output will be in multiple files. These files may be delimited text files (for example, with commas or tabs separating each field), or @@ -54,9 +55,9 @@ within a schema (with the +sqoop-list-tables+ tool). Sqoop also includes a primitive SQL execution shell (the +sqoop-eval+ tool). Most aspects of the import, code generation, and export processes can -be customized. You can control the specific row range or columns imported. -You can specify particular delimiters and escape characters for the -file-based representation of the data, as well as the file format +be customized. For databases, you can control the specific row range or +columns imported. You can specify particular delimiters and escape characters +for the file-based representation of the data, as well as the file format used. You can also control the class or package names used in generated code. Subsequent sections of this document explain how to specify these and other arguments to Sqoop. diff --git a/src/docs/user/connecting-to-mainframe.txt b/src/docs/user/connecting-to-mainframe.txt new file mode 100644 index 00000000..5b3f0629 --- /dev/null +++ b/src/docs/user/connecting-to-mainframe.txt @@ -0,0 +1,66 @@ +//// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +//// + +Connecting to a Mainframe +^^^^^^^^^^^^^^^^^^^^^^^^^ + +Sqoop is designed to import mainframe datasets into HDFS. To do +so, you must specify a mainframe host name in the Sqoop +\--connect+ argument. 
+
+----
+$ sqoop import-mainframe --connect z390
+----
+
+This will connect to the mainframe host z390 via ftp.
+
+You might need to authenticate against the mainframe host to
+access it. You can use the +\--username+ argument to supply a username to the
+mainframe. Sqoop provides a couple of different ways to supply a password,
+secure and non-secure, to the mainframe, as detailed below.
+
+.Secure way of supplying a password to the mainframe
+Save the password in a file in the user's home directory with 400
+permissions and specify the path to that file using the *+--password-file+*
+argument; this is the preferred method of entering credentials. Sqoop will
+then read the password from the file and pass it to the MapReduce cluster
+using secure means without exposing the password in the job configuration.
+The file containing the password can be either on the local FS or in HDFS.
+
+Example:
+
+----
+$ sqoop import-mainframe --connect z390 \
+    --username david --password-file ${user.home}/.password
+----
+
+Another way of supplying a password is using the +-P+ argument, which will
+read the password from a console prompt.
+
+.Non-secure way of passing a password
+
+WARNING: The +\--password+ parameter is insecure, as other users may
+be able to read your password from the command-line arguments via
+the output of programs such as `ps`. The *+-P+* argument is the preferred
+method over using the +\--password+ argument. Credentials may still be
+transferred between nodes of the MapReduce cluster using insecure means.
+
+Example:
+
+----
+$ sqoop import-mainframe --connect z390 --username david --password 12345
+----
diff --git a/src/docs/user/distributed-cache.txt b/src/docs/user/distributed-cache.txt
new file mode 100644
index 00000000..5b4dc510
--- /dev/null
+++ b/src/docs/user/distributed-cache.txt
@@ -0,0 +1,30 @@
+////
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+////
+
+Controlling Distributed Cache
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Sqoop will copy the jars in the $SQOOP_HOME/lib folder to the job cache every
+time a Sqoop job is started. When launched by Oozie this is unnecessary,
+since Oozie uses its own Sqoop share lib, which keeps the Sqoop dependencies
+in the distributed cache. Oozie localizes the Sqoop dependencies on each
+worker node only once, during the first Sqoop job, and reuses the jars on
+the worker nodes for subsequent jobs. Passing the +--skip-dist-cache+ option
+in the Sqoop command when launched by Oozie will skip the step in which
+Sqoop copies its dependencies to the job cache, saving
+massive I/O.
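For illustration, an invocation that passes +--skip-dist-cache+ when the job is launched by Oozie might look as follows (the connect string and table name are hypothetical):

----
$ sqoop import --connect jdbc:mysql://db.example.com/corp --table EMPLOYEES \
    --skip-dist-cache
----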
diff --git a/src/docs/user/import-mainframe-purpose.txt b/src/docs/user/import-mainframe-purpose.txt new file mode 100644 index 00000000..90cc2c76 --- /dev/null +++ b/src/docs/user/import-mainframe-purpose.txt @@ -0,0 +1,23 @@ +//// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +//// + +The +import-mainframe+ tool imports all sequential datasets +in a partitioned dataset(PDS) on a mainframe to HDFS. A PDS is +akin to a directory on the open systems. +The records in a dataset can contain only character data. +Records will be stored with the entire record as a single text field. diff --git a/src/docs/user/import-mainframe.txt b/src/docs/user/import-mainframe.txt new file mode 100644 index 00000000..abeb7cde --- /dev/null +++ b/src/docs/user/import-mainframe.txt @@ -0,0 +1,243 @@ +//// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +//// + ++sqoop-import-mainframe+ +------------------------ + +Purpose +~~~~~~~ + +include::import-mainframe-purpose.txt[] + +Syntax +~~~~~~ + +---- +$ sqoop import-mainframe (generic-args) (import-args) +$ sqoop-import-mainframe (generic-args) (import-args) +---- + +While the Hadoop generic arguments must precede any import arguments, +you can type the import arguments in any order with respect to one +another. 
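As a sketch, generic Hadoop arguments (such as +-D+ properties) go before the tool arguments, which may then appear in any order relative to each other (the property shown is only an example):

----
$ sqoop import-mainframe -D mapreduce.job.name=mainframe-import \
    --dataset EMPLOYEES --connect z390
----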
+
+include::mainframe-common-args.txt[]
+
+include::connecting-to-mainframe.txt[]
+
+.Import control arguments:
+[grid="all"]
+`---------------------------------`--------------------------------------
+Argument                          Description
+--------------------------------------------------------------------------
++\--as-avrodatafile+              Imports data to Avro Data Files
++\--as-sequencefile+              Imports data to SequenceFiles
++\--as-textfile+                  Imports data as plain text (default)
++\--as-parquetfile+               Imports data to Parquet Files
++\--delete-target-dir+            Delete the import target directory\
+                                  if it exists
++-m,\--num-mappers <n>+           Use 'n' map tasks to import in parallel
++\--target-dir <dir>+             HDFS destination dir
++\--warehouse-dir <dir>+          HDFS parent for table destination
++-z,\--compress+                  Enable compression
++\--compression-codec <c>+        Use Hadoop codec (default gzip)
+--------------------------------------------------------------------------
+
+Selecting the Files to Import
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+You can use the +\--dataset+ argument to specify a partitioned dataset name.
+All sequential datasets in the partitioned dataset will be imported.
+
+Controlling Parallelism
+^^^^^^^^^^^^^^^^^^^^^^^
+
+Sqoop imports data in parallel by making multiple ftp connections to the
+mainframe to transfer multiple files simultaneously. You can specify the
+number of map tasks (parallel processes) to use to perform the import by
+using the +-m+ or +\--num-mappers+ argument. Each of these arguments
+takes an integer value which corresponds to the degree of parallelism
+to employ. By default, four tasks are used. You can adjust this value to
+maximize the data transfer rate from the mainframe.
+
+include::distributed-cache.txt[]
+
+Controlling the Import Process
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+By default, Sqoop will import all sequential files in a partitioned dataset
++pds+ to a directory named +pds+ inside your home directory in HDFS. For
+example, if your username is +someuser+, then the import tool will write to
++/user/someuser/pds/(files)+. You can adjust the parent directory of
+the import with the +\--warehouse-dir+ argument. For example:
+
+----
+$ sqoop import-mainframe --connect <host> --dataset foo --warehouse-dir /shared \
+    ...
+----
+
+This command would write to a set of files in the +/shared/pds/+ directory.
+
+You can also explicitly choose the target directory, like so:
+
+----
+$ sqoop import-mainframe --connect <host> --dataset foo --target-dir /dest \
+    ...
+----
+
+This will import the files into the +/dest+ directory. +\--target-dir+ is
+incompatible with +\--warehouse-dir+.
+
+By default, imports go to a new target location. If the destination directory
+already exists in HDFS, Sqoop will refuse to import and overwrite that
+directory's contents.
+
+File Formats
+^^^^^^^^^^^^
+
+By default, each record in a dataset is stored
+as a text record with a newline at the end. Each record is assumed to contain
+a single text field with the name DEFAULT_COLUMN.
+When Sqoop imports data to HDFS, it generates a Java class which can
+reinterpret the text files that it creates.
+
+You can also import mainframe records to Sequence, Avro, or Parquet files.
+
+By default, data is not compressed. You can compress your data by
+using the deflate (gzip) algorithm with the +-z+ or +\--compress+
+argument, or specify any Hadoop compression codec using the
++\--compression-codec+ argument.
+
+include::output-args.txt[]
+
+Since a mainframe record contains only one field, importing to delimited files
+will not produce any field delimiters.
However, the field may be enclosed with +enclosing character or escaped by an escaping character. + +include::input-args.txt[] + +When Sqoop imports data to HDFS, it generates a Java class which can +reinterpret the text files that it creates when doing a +delimited-format import. The delimiters are chosen with arguments such +as +\--fields-terminated-by+; this controls both how the data is +written to disk, and how the generated +parse()+ method reinterprets +this data. The delimiters used by the +parse()+ method can be chosen +independently of the output arguments, by using ++\--input-fields-terminated-by+, and so on. This is useful, for example, to +generate classes which can parse records created with one set of +delimiters, and emit the records to a different set of files using a +separate set of delimiters. + +include::hive-args.txt[] + +include::hive.txt[] + +include::hbase-args.txt[] +include::hbase.txt[] + +include::accumulo-args.txt[] +include::accumulo.txt[] + +include::codegen-args.txt[] + +As mentioned earlier, a byproduct of importing a table to HDFS is a +class which can manipulate the imported data. +You should use this class in your subsequent +MapReduce processing of the data. + +The class is typically named after the partitioned dataset name; a +partitioned dataset named +foo+ will +generate a class named +foo+. You may want to override this class +name. For example, if your partitioned dataset +is named +EMPLOYEES+, you may want to +specify +\--class-name Employee+ instead. Similarly, you can specify +just the package name with +\--package-name+. The following import +generates a class named +com.foocorp.SomePDS+: + +---- +$ sqoop import-mainframe --connect --dataset SomePDS --package-name com.foocorp +---- + +The +.java+ source file for your class will be written to the current +working directory when you run +sqoop+. You can control the output +directory with +\--outdir+. For example, +\--outdir src/generated/+. + +The import process compiles the source into +.class+ and +.jar+ files; +these are ordinarily stored under +/tmp+. You can select an alternate +target directory with +\--bindir+. For example, +\--bindir /scratch+. + +If you already have a compiled class that can be used to perform the +import and want to suppress the code-generation aspect of the import +process, you can use an existing jar and class by +providing the +\--jar-file+ and +\--class-name+ options. For example: + +---- +$ sqoop import-mainframe --dataset SomePDS --jar-file mydatatypes.jar \ + --class-name SomePDSType +---- + +This command will load the +SomePDSType+ class out of +mydatatypes.jar+. + +Additional Import Configuration Properties +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +There are some additional properties which can be configured by modifying ++conf/sqoop-site.xml+. Properties can be specified the same as in Hadoop +configuration files, for example: + +---- + + property.name + property.value + +---- + +They can also be specified on the command line in the generic arguments, for +example: + +---- +sqoop import -D property.name=property.value ... +---- + +Example Invocations +~~~~~~~~~~~~~~~~~~~ + +The following examples illustrate how to use the import tool in a variety +of situations. 
+ +A basic import of all sequential files in a partitioned dataset named ++EMPLOYEES+ in the mainframe host z390: + +---- +$ sqoop import-mainframe --connect z390 --dataset EMPLOYEES \ + --username SomeUser -P +Enter password: (hidden) +---- + +Controlling the import parallelism (using 8 parallel tasks): + +---- +$ sqoop import-mainframe --connect z390 --dataset EMPLOYEES \ + --username SomeUser --password-file mypassword -m 8 +---- + +Importing the data to Hive: + +---- +$ sqoop import-mainframe --connect z390 --dataset EMPLOYEES \ + --hive-import +---- diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt index c5ffa50f..a6b23dfa 100644 --- a/src/docs/user/import.txt +++ b/src/docs/user/import.txt @@ -6,9 +6,9 @@ to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -206,18 +206,7 @@ multi-column indices. If your table has no index column, or has a multi-column key, then you must also manually choose a splitting column. -Controlling Distributed Cache -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -Sqoop will copy the jars in $SQOOP_HOME/lib folder to job cache every -time when start a Sqoop job. When launched by Oozie this is unnecessary -since Oozie use its own Sqoop share lib which keeps Sqoop dependencies -in the distributed cache. Oozie will do the localization on each -worker node for the Sqoop dependencies only once during the first Sqoop -job and reuse the jars on worker node for subsquencial jobs. Using -option +--skip-dist-cache+ in Sqoop command when launched by Oozie will -skip the step which Sqoop copies its dependencies to job cache and save -massive I/O. +include::distributed-cache.txt[] Controlling the Import Process ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/src/docs/user/intro.txt b/src/docs/user/intro.txt index 99cd4757..73f7598b 100644 --- a/src/docs/user/intro.txt +++ b/src/docs/user/intro.txt @@ -1,4 +1,3 @@ - //// Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file @@ -7,9 +6,9 @@ to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,9 +21,9 @@ Introduction ------------ Sqoop is a tool designed to transfer data between Hadoop and -relational databases. You can use Sqoop to import data from a -relational database management system (RDBMS) such as MySQL or Oracle -into the Hadoop Distributed File System (HDFS), +relational databases or mainframes. You can use Sqoop to import data from a +relational database management system (RDBMS) such as MySQL or Oracle or a +mainframe into the Hadoop Distributed File System (HDFS), transform the data in Hadoop MapReduce, and then export the data back into an RDBMS. @@ -34,9 +33,9 @@ to import and export the data, which provides parallel operation as well as fault tolerance. 
This document describes how to get started using Sqoop to move data -between databases and Hadoop and provides reference information for -the operation of the Sqoop command-line tool suite. This document is -intended for: +between databases and Hadoop or mainframe to Hadoop and provides reference +information for the operation of the Sqoop command-line tool suite. This +document is intended for: - System and application programmers - System administrators diff --git a/src/docs/user/mainframe-common-args.txt b/src/docs/user/mainframe-common-args.txt new file mode 100644 index 00000000..8263dae4 --- /dev/null +++ b/src/docs/user/mainframe-common-args.txt @@ -0,0 +1,37 @@ +//// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +//// + +.Common arguments +[grid="all"] +`----------------------------------------`------------------------------------- +Argument Description +------------------------------------------------------------------------------- ++\--connect + Specify mainframe host to connect ++\--connection-manager + Specify connection manager class to\ + use ++\--hadoop-mapred-home + Override $HADOOP_MAPRED_HOME ++\--help+ Print usage instructions ++\--password-file+ Set path for a file containing the\ + authentication password ++-P+ Read password from console ++\--password + Set authentication password ++\--username + Set authentication username ++\--verbose+ Print more information while working ++\--connection-param-file + Optional properties file that\ + provides connection parameters +------------------------------------------------------------------------------- diff --git a/src/docs/user/tools.txt b/src/docs/user/tools.txt index 7d977d41..e0b336a1 100644 --- a/src/docs/user/tools.txt +++ b/src/docs/user/tools.txt @@ -1,4 +1,3 @@ - //// Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file @@ -7,9 +6,9 @@ to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -58,6 +57,7 @@ Available commands: help List available commands import Import a table from a database to HDFS import-all-tables Import tables from a database to HDFS + import-mainframe Import mainframe datasets to HDFS list-databases List available databases on a server list-tables List available tables in a database version Display version information @@ -69,7 +69,7 @@ You can display help for a specific tool by entering: +sqoop help (tool-name)+; for example, +sqoop help import+. 
You can also add the +\--help+ argument to any command: +sqoop import -\--help+. +\--help+. Using Command Aliases ~~~~~~~~~~~~~~~~~~~~~ @@ -128,16 +128,16 @@ usage: sqoop import [GENERIC-ARGS] [TOOL-ARGS] Common arguments: --connect Specify JDBC connect string - --connect-manager Specify connection manager class to use + --connect-manager Specify connection manager class to use --driver Manually specify JDBC driver class to use - --hadoop-mapred-home + Override $HADOOP_MAPRED_HOME + --hadoop-mapred-home Override $HADOOP_MAPRED_HOME --help Print usage instructions --password-file Set path for file containing authentication password -P Read password from console --password Set authentication password --username Set authentication username --verbose Print more information while working - --hadoop-home + Deprecated. Override $HADOOP_HOME + --hadoop-home Deprecated. Override $HADOOP_HOME [...] diff --git a/src/docs/user/validation-args.txt b/src/docs/user/validation-args.txt index 3cb5f660..4f05dc75 100644 --- a/src/docs/user/validation-args.txt +++ b/src/docs/user/validation-args.txt @@ -1,4 +1,3 @@ - //// Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file @@ -19,14 +18,14 @@ .Validation arguments <> [grid="all"] -`----------------------------------------`------------------------------------- -Argument Description -------------------------------------------------------------------------------- -+\--validate+ Enable validation of data copied, \ - supports single table copy only. \ -+\--validator + Specify validator class to use. -+\--validation-threshold + Specify validation threshold class \ - to use. +`-------------------------------------------`------------------------------------- +Argument Description +---------------------------------------------------------------------------------- ++\--validate+ Enable validation of data copied, \ + supports single table copy only. ++\--validator + Specify validator class to use. ++\--validation-threshold + Specify validation threshold class \ + to use. +\--validation-failurehandler + Specify validation failure \ - handler class to use. -------------------------------------------------------------------------------- + handler class to use. +---------------------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index 3ef5a973..d16ccb39 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -262,6 +262,10 @@ public String toString() { // "key" column for the merge operation. @StoredAsProperty("merge.key.col") private String mergeKeyCol; + // Dataset name for mainframe import tool + @StoredAsProperty("mainframe.input.dataset.name") + private String mainframeInputDatasetName; + // Accumulo home directory private String accumuloHome; // not serialized to metastore. // Zookeeper home directory @@ -2164,6 +2168,21 @@ public String getMergeKeyCol() { return this.mergeKeyCol; } + /** + * Set the mainframe dataset name. + */ + public void setMainframeInputDatasetName(String name) { + mainframeInputDatasetName = name; + tableName = name; + } + + /** + * Return the mainframe dataset name. + */ + public String getMainframeInputDatasetName() { + return mainframeInputDatasetName; + } + public static String getAccumuloHomeDefault() { // Set this with $ACCUMULO_HOME, but -Daccumulo.home can override. 
String accumuloHome = System.getenv("ACCUMULO_HOME"); diff --git a/src/java/org/apache/sqoop/manager/MainframeManager.java b/src/java/org/apache/sqoop/manager/MainframeManager.java new file mode 100644 index 00000000..101f3cea --- /dev/null +++ b/src/java/org/apache/sqoop/manager/MainframeManager.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.manager; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sqoop.accumulo.AccumuloUtil; +import org.apache.sqoop.hbase.HBaseUtil; +import org.apache.sqoop.mapreduce.AccumuloImportJob; +import org.apache.sqoop.mapreduce.HBaseBulkImportJob; +import org.apache.sqoop.mapreduce.HBaseImportJob; +import org.apache.sqoop.mapreduce.ImportJobBase; +import org.apache.sqoop.mapreduce.mainframe.MainframeDatasetInputFormat; +import org.apache.sqoop.mapreduce.mainframe.MainframeImportJob; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.util.ImportException; + + +/** + * ConnManager implementation for mainframe datasets. + */ +public class MainframeManager extends com.cloudera.sqoop.manager.ConnManager { + public static final String DEFAULT_DATASET_COLUMN_NAME = "DEFAULT_COLUMN"; + protected SqoopOptions options; + private static final Log LOG + = LogFactory.getLog(MainframeManager.class.getName()); + + /** + * Constructs the MainframeManager. + * @param opts the SqoopOptions describing the user's requested action. + */ + public MainframeManager(final SqoopOptions opts) { + this.options = opts; + } + + /** + * Launch a MapReduce job via MainframeImportJob to read the + * partitioned dataset with MainframeDatasetInputFormat. 
+ */ + @Override + public void importTable(com.cloudera.sqoop.manager.ImportJobContext context) + throws IOException, ImportException { + String pdsName = context.getTableName(); + String jarFile = context.getJarFile(); + SqoopOptions opts = context.getOptions(); + + context.setConnManager(this); + + ImportJobBase importer; + if (opts.getHBaseTable() != null) { + if (!HBaseUtil.isHBaseJarPresent()) { + throw new ImportException("HBase jars are not present in " + + "classpath, cannot import to HBase!"); + } + if (!opts.isBulkLoadEnabled()) { + importer = new HBaseImportJob(opts, context); + } else { + importer = new HBaseBulkImportJob(opts, context); + } + } else if (opts.getAccumuloTable() != null) { + if (!AccumuloUtil.isAccumuloJarPresent()) { + throw new ImportException("Accumulo jars are not present in " + + "classpath, cannot import to Accumulo!"); + } + importer = new AccumuloImportJob(opts, context); + } else { + // Import to HDFS. + importer = new MainframeImportJob(opts, context); + } + + importer.setInputFormatClass(MainframeDatasetInputFormat.class); + importer.runImport(pdsName, jarFile, null, opts.getConf()); + } + + @Override + public String[] getColumnNames(String tableName) { + // default is one column for the whole record + String[] colNames = new String[1]; + colNames[0] = DEFAULT_DATASET_COLUMN_NAME; + return colNames; + } + + @Override + public Map getColumnTypes(String tableName) { + Map colTypes = new HashMap(); + String[] colNames = getColumnNames(tableName); + colTypes.put(colNames[0], Types.VARCHAR); + return colTypes; + } + + @Override + public void discardConnection(boolean doClose) { + // do nothing + } + + @Override + public String[] listDatabases() { + LOG.error("MainframeManager.listDatabases() not supported"); + return null; + } + + @Override + public String[] listTables() { + LOG.error("MainframeManager.listTables() not supported"); + return null; + } + + @Override + public String getPrimaryKey(String tableName) { + return null; + } + + @Override + public ResultSet readTable(String tableName, String[] columns) + throws SQLException { + return null; + } + + @Override + public Connection getConnection() throws SQLException { + return null; + } + + @Override + public void close() throws SQLException { + release(); + } + + @Override + public void release() { + } + + @Override + public String getDriverClass(){ + return ""; + } + + @Override + public void execAndPrint(String s) { + } + +} diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java new file mode 100644 index 00000000..f8894353 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.mapreduce.mainframe; + +public class MainframeConfiguration +{ + public static final String MAINFRAME_INPUT_DATASET_NAME + = "mapreduce.mainframe.input.dataset.name"; + +} diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java new file mode 100644 index 00000000..7c368428 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.mainframe; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.net.ftp.FTPClient; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import org.apache.sqoop.lib.SqoopRecord; +import org.apache.sqoop.util.MainframeFTPClientUtils; + +/** + * A RecordReader that returns a record from a mainframe dataset. 
+ */ +public class MainframeDatasetFTPRecordReader + extends MainframeDatasetRecordReader { + private FTPClient ftp = null; + private BufferedReader datasetReader = null; + + private static final Log LOG = LogFactory.getLog( + MainframeDatasetFTPRecordReader.class.getName()); + + @Override + public void initialize(InputSplit inputSplit, + TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { + super.initialize(inputSplit, taskAttemptContext); + + Configuration conf = getConfiguration(); + ftp = MainframeFTPClientUtils.getFTPConnection(conf); + if (ftp != null) { + String dsName + = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME); + ftp.changeWorkingDirectory("'" + dsName + "'"); + } + } + + @Override + public void close() throws IOException { + if (datasetReader != null) { + datasetReader.close(); + } + if (ftp != null) { + MainframeFTPClientUtils.closeFTPConnection(ftp); + } + } + + protected boolean getNextRecord(T sqoopRecord) throws IOException { + String line = null; + try { + do { + if (datasetReader == null) { + String dsName = getNextDataset(); + if (dsName == null) { + break; + } + datasetReader = new BufferedReader(new InputStreamReader( + ftp.retrieveFileStream(dsName))); + } + line = datasetReader.readLine(); + if (line == null) { + datasetReader.close(); + datasetReader = null; + if (!ftp.completePendingCommand()) { + throw new IOException("Failed to complete ftp command."); + } else { + LOG.info("Data transfer completed."); + } + } + } while(line == null); + } catch (IOException ioe) { + throw new IOException("IOException during data transfer: " + + ioe.toString()); + } + if (line != null) { + convertToSqoopRecord(line, (SqoopRecord)sqoopRecord); + return true; + } + return false; + } + + private void convertToSqoopRecord(String line, SqoopRecord sqoopRecord) { + String fieldName + = sqoopRecord.getFieldMap().entrySet().iterator().next().getKey(); + sqoopRecord.setField(fieldName, line); + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetImportMapper.java new file mode 100644 index 00000000..0b7b5b85 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetImportMapper.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.mapreduce.mainframe; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; + +import org.apache.sqoop.config.ConfigurationConstants; +import org.apache.sqoop.lib.SqoopRecord; +import org.apache.sqoop.mapreduce.AutoProgressMapper; + +/** + * Mapper that writes mainframe dataset records in Text format to multiple files + * based on the key, which is the index of the datasets in the input split. + */ +public class MainframeDatasetImportMapper + extends AutoProgressMapper { + + private static final Log LOG = LogFactory.getLog( + MainframeDatasetImportMapper.class.getName()); + + private MainframeDatasetInputSplit inputSplit; + private MultipleOutputs mos; + private long numberOfRecords; + private Text outkey; + + public void map(LongWritable key, SqoopRecord val, Context context) + throws IOException, InterruptedException { + String dataset = inputSplit.getCurrentDataset(); + outkey.set(val.toString()); + numberOfRecords++; + mos.write(outkey, NullWritable.get(), dataset); + } + + @Override + protected void setup(Context context) + throws IOException, InterruptedException { + super.setup(context); + inputSplit = (MainframeDatasetInputSplit)context.getInputSplit(); + mos = new MultipleOutputs(context); + numberOfRecords = 0; + outkey = new Text(); + } + + @Override + protected void cleanup(Context context) + throws IOException, InterruptedException { + super.cleanup(context); + mos.close(); + context.getCounter( + ConfigurationConstants.COUNTER_GROUP_MAPRED_TASK_COUNTERS, + ConfigurationConstants.COUNTER_MAP_OUTPUT_RECORDS) + .increment(numberOfRecords); + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetInputFormat.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetInputFormat.java new file mode 100644 index 00000000..045bbd21 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetInputFormat.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.mapreduce.mainframe; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import org.apache.sqoop.config.ConfigurationHelper; +import org.apache.sqoop.lib.SqoopRecord; +import org.apache.sqoop.util.MainframeFTPClientUtils; + +/** + * A InputFormat that retrieves a list of sequential dataset names in + * a mainframe partitioned dataset. It then creates splits containing one or + * more dataset names. + */ +public class MainframeDatasetInputFormat + extends InputFormat { + + private static final Log LOG = + LogFactory.getLog(MainframeDatasetInputFormat.class); + + @Override + public RecordReader createRecordReader( + InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { + return new MainframeDatasetFTPRecordReader(); + } + + @Override + public List getSplits(JobContext job) throws IOException { + List splits = new ArrayList(); + Configuration conf = job.getConfiguration(); + String dsName + = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME); + LOG.info("Datasets to transfer from: " + dsName); + List datasets = retrieveDatasets(dsName, conf); + if (datasets.isEmpty()) { + throw new IOException ("No sequential datasets retrieved from " + dsName); + } else { + int count = datasets.size(); + int chunks = Math.min(count, ConfigurationHelper.getJobNumMaps(job)); + for (int i = 0; i < chunks; i++) { + splits.add(new MainframeDatasetInputSplit()); + } + + int j = 0; + while(j < count) { + for (InputSplit sp : splits) { + if (j == count) { + break; + } + ((MainframeDatasetInputSplit)sp).addDataset(datasets.get(j)); + j++; + } + } + } + return splits; + } + + protected List retrieveDatasets(String dsName, Configuration conf) + throws IOException { + return MainframeFTPClientUtils.listSequentialDatasets(dsName, conf); + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetInputSplit.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetInputSplit.java new file mode 100644 index 00000000..ccb4c687 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetInputSplit.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.mapreduce.mainframe; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; + +/** + * A collection of mainframe datasets. + * + */ +public class MainframeDatasetInputSplit extends InputSplit implements Writable { + private List mainframeDatasets; + private String currentDataset; + private int currentIndex; + + public MainframeDatasetInputSplit() { + mainframeDatasets = new ArrayList(); + currentDataset = null; + currentIndex = -1; + } + + public void addDataset(String mainframeDataset) { + mainframeDatasets.add(mainframeDataset); + } + + public String getCurrentDataset() { + return currentDataset; + } + + public String getNextDataset() { + if (hasMore()) { + currentIndex++; + currentDataset = mainframeDatasets.get(currentIndex); + } else { + currentDataset = null; + } + return currentDataset; + } + + public boolean hasMore() { + return currentIndex < (mainframeDatasets.size() -1); + } + + @Override + public long getLength() throws IOException, InterruptedException { + return mainframeDatasets.size(); + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return new String[0]; // No locations + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + dataOutput.writeInt(mainframeDatasets.size()); + for (String ds : mainframeDatasets) { + dataOutput.writeUTF(ds); + } + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + int numberOfDatasets = dataInput.readInt(); + for (int i = 0; i < numberOfDatasets; i++) { + mainframeDatasets.add(dataInput.readUTF()); + } + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetRecordReader.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetRecordReader.java new file mode 100644 index 00000000..c56bfa93 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetRecordReader.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
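+ *
+ * Note on the abstract record reader below: the value class is instantiated
+ * reflectively from DBConfiguration.INPUT_CLASS_PROPERTY (the generated
+ * SqoopRecord), keys are a running 1-based record count across all datasets
+ * in the split, and getProgress() advances per dataset processed rather than
+ * per record, since record counts are not known in advance.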
+ */ + +package org.apache.sqoop.mapreduce.mainframe; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.ReflectionUtils; + +import org.apache.sqoop.mapreduce.DBWritable; + +import org.apache.sqoop.mapreduce.db.DBConfiguration; + +/** + * A RecordReader that reads records from a mainframe dataset. + * Emits LongWritables containing the record number as key and DBWritables as + * value. + */ +public abstract class MainframeDatasetRecordReader + extends RecordReader { + private Class inputClass; + private Configuration conf; + private MainframeDatasetInputSplit split; + private LongWritable key; + private T datasetRecord; + private long numberRecordRead; + private int datasetProcessed; + + private static final Log LOG = LogFactory.getLog( + MainframeDatasetRecordReader.class.getName()); + + @Override + public void initialize(InputSplit inputSplit, + TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { + + split = (MainframeDatasetInputSplit)inputSplit; + conf = taskAttemptContext.getConfiguration(); + inputClass = (Class) (conf.getClass( + DBConfiguration.INPUT_CLASS_PROPERTY, null)); + key = null; + datasetRecord = null; + numberRecordRead = 0; + datasetProcessed = 0; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (key == null) { + key = new LongWritable(); + } + if (datasetRecord == null) { + datasetRecord = ReflectionUtils.newInstance(inputClass, conf); + } + if (getNextRecord(datasetRecord)) { + numberRecordRead++; + key.set(numberRecordRead); + return true; + } + return false; + } + + @Override + public LongWritable getCurrentKey() throws IOException, InterruptedException { + return key; + } + + @Override + public T getCurrentValue() throws IOException, InterruptedException { + return datasetRecord; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return datasetProcessed / (float)split.getLength(); + } + + protected String getNextDataset() { + String datasetName = split.getNextDataset(); + if (datasetName != null) { + datasetProcessed++; + LOG.info("Starting transfer of " + datasetName); + } + return datasetName; + } + + protected Configuration getConfiguration() { + return conf; + } + + protected abstract boolean getNextRecord (T datasetRecord) throws IOException; + +} diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java new file mode 100644 index 00000000..16c2f75c --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.mainframe; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.manager.ImportJobContext; + +import org.apache.sqoop.mapreduce.DataDrivenImportJob; + +/** + * Import data from a mainframe dataset, using MainframeDatasetInputFormat. + */ +public class MainframeImportJob extends DataDrivenImportJob { + + private static final Log LOG = LogFactory.getLog( + MainframeImportJob.class.getName()); + + public MainframeImportJob(final SqoopOptions opts, ImportJobContext context) { + super(opts, MainframeDatasetInputFormat.class, context); + } + + @Override + protected Class getMapperClass() { + if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) { + return MainframeDatasetImportMapper.class; + } else { + return super.getMapperClass(); + } + } + + @Override + protected void configureInputFormat(Job job, String tableName, + String tableClassName, String splitByCol) throws IOException { + super.configureInputFormat(job, tableName, tableClassName, splitByCol); + job.getConfiguration().set( + MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME, + options.getMainframeInputDatasetName()); + } + + @Override + protected void configureOutputFormat(Job job, String tableName, + String tableClassName) throws ClassNotFoundException, IOException { + super.configureOutputFormat(job, tableName, tableClassName); + LazyOutputFormat.setOutputFormatClass(job, getOutputFormatClass()); + } + +} diff --git a/src/java/org/apache/sqoop/tool/MainframeImportTool.java b/src/java/org/apache/sqoop/tool/MainframeImportTool.java new file mode 100644 index 00000000..bc4ae6c0 --- /dev/null +++ b/src/java/org/apache/sqoop/tool/MainframeImportTool.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.tool; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.ToolRunner; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException; +import com.cloudera.sqoop.cli.RelatedOptions; +import com.cloudera.sqoop.cli.ToolOptions; + +/** + * Tool that performs mainframe dataset imports to HDFS. + */ +public class MainframeImportTool extends ImportTool { + private static final Log LOG + = LogFactory.getLog(MainframeImportTool.class.getName()); + public static final String DS_ARG = "dataset"; + + public MainframeImportTool() { + super("import-mainframe", false); + } + + @Override + @SuppressWarnings("static-access") + protected RelatedOptions getImportOptions() { + // Imports + RelatedOptions importOpts + = new RelatedOptions("Import mainframe control arguments"); + importOpts.addOption(OptionBuilder.withArgName("Dataset name") + .hasArg().withDescription("Datasets to import") + .withLongOpt(DS_ARG) + .create()); + importOpts.addOption(OptionBuilder + .withDescription("Imports data in delete mode") + .withLongOpt(DELETE_ARG) + .create()); + importOpts.addOption(OptionBuilder.withArgName("dir") + .hasArg().withDescription("HDFS plain file destination") + .withLongOpt(TARGET_DIR_ARG) + .create()); + + addValidationOpts(importOpts); + + importOpts.addOption(OptionBuilder.withArgName("dir") + .hasArg().withDescription("HDFS parent for file destination") + .withLongOpt(WAREHOUSE_DIR_ARG) + .create()); + importOpts.addOption(OptionBuilder + .withDescription("Imports data as plain text (default)") + .withLongOpt(FMT_TEXTFILE_ARG) + .create()); + importOpts.addOption(OptionBuilder.withArgName("n") + .hasArg().withDescription("Use 'n' map tasks to import in parallel") + .withLongOpt(NUM_MAPPERS_ARG) + .create(NUM_MAPPERS_SHORT_ARG)); + importOpts.addOption(OptionBuilder.withArgName("name") + .hasArg().withDescription("Set name for generated mapreduce job") + .withLongOpt(MAPREDUCE_JOB_NAME) + .create()); + importOpts.addOption(OptionBuilder + .withDescription("Enable compression") + .withLongOpt(COMPRESS_ARG) + .create(COMPRESS_SHORT_ARG)); + importOpts.addOption(OptionBuilder.withArgName("codec") + .hasArg() + .withDescription("Compression codec to use for import") + .withLongOpt(COMPRESSION_CODEC_ARG) + .create()); + + return importOpts; + } + + @Override + public void configureOptions(ToolOptions toolOptions) { + toolOptions.addUniqueOptions(getCommonOptions()); + toolOptions.addUniqueOptions(getImportOptions()); + toolOptions.addUniqueOptions(getOutputFormatOptions()); + toolOptions.addUniqueOptions(getInputFormatOptions()); + toolOptions.addUniqueOptions(getHiveOptions(true)); + toolOptions.addUniqueOptions(getHBaseOptions()); + toolOptions.addUniqueOptions(getHCatalogOptions()); + toolOptions.addUniqueOptions(getHCatImportOnlyOptions()); + toolOptions.addUniqueOptions(getAccumuloOptions()); + + // get common codegen opts. 
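+ // Mainframe imports still go through Sqoop's code generation: each record
+ // is a generated SqoopRecord with a single text field, so the common
+ // codegen options (and the --jar-file override added below) apply here too.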
+ RelatedOptions codeGenOpts = getCodeGenOpts(false); + + // add import-specific codegen opts: + codeGenOpts.addOption(OptionBuilder.withArgName("file") + .hasArg() + .withDescription("Disable code generation; use specified jar") + .withLongOpt(JAR_FILE_NAME_ARG) + .create()); + + toolOptions.addUniqueOptions(codeGenOpts); + } + + @Override + public void printHelp(ToolOptions toolOptions) { + System.out.println("usage: sqoop " + getToolName() + + " [GENERIC-ARGS] [TOOL-ARGS]\n"); + + toolOptions.printHelp(); + + System.out.println("\nGeneric Hadoop command-line arguments:"); + System.out.println("(must preceed any tool-specific arguments)"); + ToolRunner.printGenericCommandUsage(System.out); + System.out.println( + "\nAt minimum, you must specify --connect and --" + DS_ARG); + } + + @Override + public void applyOptions(CommandLine in, SqoopOptions out) + throws InvalidOptionsException { + super.applyOptions(in, out); + + if (!in.hasOption(CONN_MANAGER_CLASS_NAME)) { + // set default ConnManager + out.setConnManagerClassName("org.apache.sqoop.manager.MainframeManager"); + } + if (in.hasOption(DS_ARG)) { + out.setMainframeInputDatasetName(in.getOptionValue(DS_ARG)); + } + } + + @Override + protected void validateImportOptions(SqoopOptions options) + throws InvalidOptionsException { + if (options.getMainframeInputDatasetName() == null) { + throw new InvalidOptionsException( + "--" + DS_ARG + " is required for mainframe import. " + HELP_STR); + } + super.validateImportOptions(options); + } +} diff --git a/src/java/org/apache/sqoop/tool/SqoopTool.java b/src/java/org/apache/sqoop/tool/SqoopTool.java index dbe429ac..5b8453d1 100644 --- a/src/java/org/apache/sqoop/tool/SqoopTool.java +++ b/src/java/org/apache/sqoop/tool/SqoopTool.java @@ -84,6 +84,8 @@ public abstract class SqoopTool { "Import a table from a database to HDFS"); registerTool("import-all-tables", ImportAllTablesTool.class, "Import tables from a database to HDFS"); + registerTool("import-mainframe", MainframeImportTool.class, + "Import datasets from a mainframe server to HDFS"); registerTool("help", HelpTool.class, "List available commands"); registerTool("list-databases", ListDatabasesTool.class, "List available databases on a server"); diff --git a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java new file mode 100644 index 00000000..eae7a633 --- /dev/null +++ b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
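+ *
+ * Note on the helpers below: the FTP connection is built from the standard
+ * Sqoop connection properties. The connect string is treated as host or
+ * host:port (for example, a hypothetical mainframe.example.com:2121); with no
+ * username configured an anonymous login is attempted. The client is set up
+ * for an MVS server (FTPClientConfig.SYST_MVS) with ASCII transfer mode and
+ * local passive mode, and sequential datasets are listed as the members of
+ * the given partitioned dataset.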
+ */ + +package org.apache.sqoop.util; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.net.PrintCommandListener; +import org.apache.commons.net.ftp.FTP; +import org.apache.commons.net.ftp.FTPClient; +import org.apache.commons.net.ftp.FTPClientConfig; +import org.apache.commons.net.ftp.FTPConnectionClosedException; +import org.apache.commons.net.ftp.FTPFile; +import org.apache.commons.net.ftp.FTPReply; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; + +import org.apache.sqoop.mapreduce.JobBase; +import org.apache.sqoop.mapreduce.db.DBConfiguration; + +/** + * Utility methods used when accessing a mainframe server through FTP client. + */ +public final class MainframeFTPClientUtils { + private static final Log LOG = LogFactory.getLog( + MainframeFTPClientUtils.class.getName()); + + private static FTPClient mockFTPClient = null; // Used for unit testing + + private MainframeFTPClientUtils() { + } + + public static List listSequentialDatasets( + String pdsName, Configuration conf) throws IOException { + List datasets = new ArrayList(); + FTPClient ftp = null; + try { + ftp = getFTPConnection(conf); + if (ftp != null) { + ftp.changeWorkingDirectory("'" + pdsName + "'"); + FTPFile[] ftpFiles = ftp.listFiles(); + for (FTPFile f : ftpFiles) { + if (f.getType() == FTPFile.FILE_TYPE) { + datasets.add(f.getName()); + } + } + } + } catch(IOException ioe) { + throw new IOException ("Could not list datasets from " + pdsName + ":" + + ioe.toString()); + } finally { + if (ftp != null) { + closeFTPConnection(ftp); + } + } + return datasets; + } + + public static FTPClient getFTPConnection(Configuration conf) + throws IOException { + FTPClient ftp = null; + try { + String username = conf.get(DBConfiguration.USERNAME_PROPERTY); + String password; + if (username == null) { + username = "anonymous"; + password = ""; + } + else { + password = DBConfiguration.getPassword((JobConf) conf); + } + + String connectString = conf.get(DBConfiguration.URL_PROPERTY); + String server = connectString; + int port = 0; + String[] parts = connectString.split(":"); + if (parts.length == 2) { + server = parts[0]; + try { + port = Integer.parseInt(parts[1]); + } catch(NumberFormatException e) { + LOG.warn("Invalid port number: " + e.toString()); + } + } + + if (null != mockFTPClient) { + ftp = mockFTPClient; + } else { + ftp = new FTPClient(); + } + + FTPClientConfig config = new FTPClientConfig(FTPClientConfig.SYST_MVS); + ftp.configure(config); + + if (conf.getBoolean(JobBase.PROPERTY_VERBOSE, false)) { + ftp.addProtocolCommandListener(new PrintCommandListener( + new PrintWriter(System.out), true)); + } + try { + if (port > 0) { + ftp.connect(server, port); + } else { + ftp.connect(server); + } + } catch(IOException ioexp) { + throw new IOException("Could not connect to server " + server, ioexp); + } + + int reply = ftp.getReplyCode(); + if (!FTPReply.isPositiveCompletion(reply)) { + throw new IOException("FTP server " + server + + " refused connection:" + ftp.getReplyString()); + } + LOG.info("Connected to " + server + " on " + + (port>0 ? 
port : ftp.getDefaultPort())); + if (!ftp.login(username, password)) { + ftp.logout(); + throw new IOException("Could not login to server " + server + + ":" + ftp.getReplyString()); + } + // set ASCII transfer mode + ftp.setFileType(FTP.ASCII_FILE_TYPE); + // Use passive mode as default. + ftp.enterLocalPassiveMode(); + } catch(IOException ioe) { + if (ftp != null && ftp.isConnected()) { + try { + ftp.disconnect(); + } catch(IOException f) { + // do nothing + } + } + ftp = null; + throw ioe; + } + return ftp; + } + + public static boolean closeFTPConnection(FTPClient ftp) { + boolean success = true; + try { + ftp.noop(); // check that control connection is working OK + ftp.logout(); + } catch(FTPConnectionClosedException e) { + success = false; + LOG.warn("Server closed connection: " + e.toString()); + } catch(IOException e) { + success = false; + LOG.warn("Server closed connection: " + e.toString()); + } finally { + if (ftp.isConnected()) { + try { + ftp.disconnect(); + } catch(IOException f) { + success = false; + } + } + } + return success; + } + + // Used for testing only + public static void setMockFTPClient(FTPClient FTPClient) { + mockFTPClient = FTPClient; + } + +} diff --git a/src/test/org/apache/sqoop/manager/TestMainframeManager.java b/src/test/org/apache/sqoop/manager/TestMainframeManager.java new file mode 100644 index 00000000..79cbcb14 --- /dev/null +++ b/src/test/org/apache/sqoop/manager/TestMainframeManager.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.manager; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; +import org.apache.sqoop.accumulo.AccumuloUtil; +import org.apache.sqoop.hbase.HBaseUtil; +import org.apache.sqoop.tool.MainframeImportTool; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.cloudera.sqoop.ConnFactory; +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.manager.ConnManager; +import com.cloudera.sqoop.manager.ImportJobContext; +import com.cloudera.sqoop.metastore.JobData; +import com.cloudera.sqoop.testutil.BaseSqoopTestCase; +import com.cloudera.sqoop.util.ImportException; + +/** + * Test methods of the generic SqlManager implementation. 
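+ * In this case the manager under test is
+ * org.apache.sqoop.manager.MainframeManager; the tests only exercise its
+ * metadata stubs (column names/types, tables, primary key, connection) and
+ * the HBase/Accumulo guard checks, so no live FTP connection is required.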
+ */ +public class TestMainframeManager extends BaseSqoopTestCase { + + private static final Log LOG = LogFactory.getLog(TestMainframeManager.class + .getName()); + + private ConnManager manager; + + private SqoopOptions opts; + + private ImportJobContext context; + + @Before + public void setUp() { + Configuration conf = getConf(); + opts = getSqoopOptions(conf); + opts.setConnectString("dummy.server"); + opts.setTableName("dummy.pds"); + opts.setConnManagerClassName("org.apache.sqoop.manager.MainframeManager"); + context = new ImportJobContext(getTableName(), null, opts, null); + ConnFactory f = new ConnFactory(conf); + try { + this.manager = f.getManager(new JobData(opts, new MainframeImportTool())); + } catch (IOException ioe) { + fail("IOException instantiating manager: " + + StringUtils.stringifyException(ioe)); + } + } + + @After + public void tearDown() { + try { + manager.close(); + } catch (SQLException sqlE) { + LOG.error("Got SQLException: " + sqlE.toString()); + fail("Got SQLException: " + sqlE.toString()); + } + } + + @Test + public void testListColNames() { + String[] colNames = manager.getColumnNames(getTableName()); + assertNotNull("manager should return a column list", colNames); + assertEquals("Column list should be length 1", 1, colNames.length); + assertEquals(MainframeManager.DEFAULT_DATASET_COLUMN_NAME, colNames[0]); + } + + @Test + public void testListColTypes() { + Map types = manager.getColumnTypes(getTableName()); + assertNotNull("manager should return a column types map", types); + assertEquals("Column types map should be size 1", 1, types.size()); + assertEquals(types.get(MainframeManager.DEFAULT_DATASET_COLUMN_NAME) + .intValue(), Types.VARCHAR); + } + + @Test + public void testImportTableNoHBaseJarPresent() { + HBaseUtil.setAlwaysNoHBaseJarMode(true); + opts.setHBaseTable("dummy_table"); + try { + manager.importTable(context); + fail("An ImportException should be thrown: " + + "HBase jars are not present in classpath, cannot import to HBase!"); + } catch (ImportException e) { + assertEquals(e.toString(), + "HBase jars are not present in classpath, cannot import to HBase!"); + } catch (IOException e) { + fail("No IOException should be thrown!"); + } finally { + opts.setHBaseTable(null); + } + } + + @Test + public void testImportTableNoAccumuloJarPresent() { + AccumuloUtil.setAlwaysNoAccumuloJarMode(true); + opts.setAccumuloTable("dummy_table"); + try { + manager.importTable(context); + fail("An ImportException should be thrown: " + + "Accumulo jars are not present in classpath, cannot import to " + + "Accumulo!"); + } catch (ImportException e) { + assertEquals(e.toString(), + "Accumulo jars are not present in classpath, cannot import to " + + "Accumulo!"); + } catch (IOException e) { + fail("No IOException should be thrown!"); + } finally { + opts.setAccumuloTable(null); + } + } + + @Test + public void testListTables() { + String[] tables = manager.listTables(); + assertNull("manager should not return a list of tables", tables); + } + + @Test + public void testListDatabases() { + String[] databases = manager.listDatabases(); + assertNull("manager should not return a list of databases", databases); + } + + @Test + public void testGetPrimaryKey() { + String primaryKey = manager.getPrimaryKey(getTableName()); + assertNull("manager should not return a primary key", primaryKey); + } + + @Test + public void testReadTable() { + String[] colNames = manager.getColumnNames(getTableName()); + try { + ResultSet table = manager.readTable(getTableName(), colNames); + 
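+      // MainframeManager is not JDBC-backed, so readTable() is expected to
+      // return null rather than a ResultSet.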
assertNull("manager should not read a table", table); + } catch (SQLException sqlE) { + fail("Got SQLException: " + sqlE.toString()); + } + } + + @Test + public void testGetConnection() { + try { + Connection con = manager.getConnection(); + assertNull("manager should not return a connection", con); + } catch (SQLException sqlE) { + fail("Got SQLException: " + sqlE.toString()); + } + } + + @Test + public void testGetDriverClass() { + String driverClass = manager.getDriverClass(); + assertNotNull("manager should return a driver class", driverClass); + assertEquals("manager should return an empty driver class", "", + driverClass); + } +} diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetFTPRecordReader.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetFTPRecordReader.java new file mode 100644 index 00000000..613ee7ad --- /dev/null +++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetFTPRecordReader.java @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.mapreduce.mainframe; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.commons.net.ftp.FTPClient; +import org.apache.commons.net.ftp.FTPFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.sqoop.lib.SqoopRecord; +import org.apache.sqoop.mapreduce.DBWritable; +import org.apache.sqoop.mapreduce.db.DBConfiguration; +import org.apache.sqoop.util.MainframeFTPClientUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.cloudera.sqoop.lib.DelimiterSet; +import com.cloudera.sqoop.lib.LargeObjectLoader; + +public class TestMainframeDatasetFTPRecordReader { + + private MainframeImportJob mfImportJob; + + private MainframeImportJob avroImportJob; + + private MainframeDatasetInputSplit mfDIS; + + private TaskAttemptContext context; + + private MainframeDatasetRecordReader mfDRR; + + private MainframeDatasetFTPRecordReader mfDFTPRR; + + private FTPClient mockFTPClient; + + public static class DummySqoopRecord extends SqoopRecord { + private String field; + + public Map getFieldMap() { + Map map = new HashMap(); + map.put("fieldName", field); + return map; + } + + public void setField(String fieldName, Object fieldVal) { + if (fieldVal instanceof String) { + field = (String) fieldVal; + } + } + + public void setField(final String val) { + this.field = val; + } + + @Override + public void readFields(DataInput in) throws IOException { + field = in.readUTF(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(field); + } + + @Override + public void readFields(ResultSet rs) throws SQLException { + field = rs.getString(1); + } + + @Override + public void write(PreparedStatement s) throws SQLException { + s.setString(1, field); + } + + @Override + public String toString() { + return field; + } + + @Override + public int write(PreparedStatement stmt, int offset) throws SQLException { + return 0; + } + + @Override + public String toString(DelimiterSet delimiters) { + return null; + } + + @Override + public int getClassFormatVersion() { + return 0; + } + + @Override + public int hashCode() { + return Integer.parseInt(field); + } + + public void loadLargeObjects(LargeObjectLoader loader) { + } + + public void parse(CharSequence s) { + } + + public void parse(Text s) { + } + + public void parse(byte[] s) { + } + + public void parse(char[] s) { + } + + public void parse(ByteBuffer s) { + } + + public void parse(CharBuffer s) { + } + + } + + @Before + public void setUp() throws IOException { + mockFTPClient = mock(FTPClient.class); + MainframeFTPClientUtils.setMockFTPClient(mockFTPClient); + try { + when(mockFTPClient.login("user", "pssword")).thenReturn(true); + when(mockFTPClient.logout()).thenReturn(true); + when(mockFTPClient.isConnected()).thenReturn(true); + when(mockFTPClient.completePendingCommand()).thenReturn(true); + 
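+      // Stub the rest of the FTP session: directory changes, reply codes,
+      // the two PDS members ("test1", "test2") and their record streams,
+      // plus a "NotComplete" member used to simulate a failed transfer.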
when(mockFTPClient.changeWorkingDirectory(anyString())).thenReturn(true); + when(mockFTPClient.getReplyCode()).thenReturn(200); + when(mockFTPClient.noop()).thenReturn(200); + when(mockFTPClient.setFileType(anyInt())).thenReturn(true); + + FTPFile ftpFile1 = new FTPFile(); + ftpFile1.setType(FTPFile.FILE_TYPE); + ftpFile1.setName("test1"); + FTPFile ftpFile2 = new FTPFile(); + ftpFile2.setType(FTPFile.FILE_TYPE); + ftpFile2.setName("test2"); + FTPFile[] ftpFiles = { ftpFile1, ftpFile2 }; + when(mockFTPClient.listFiles()).thenReturn(ftpFiles); + + when(mockFTPClient.retrieveFileStream("test1")).thenReturn( + new ByteArrayInputStream("123\n456\n".getBytes())); + when(mockFTPClient.retrieveFileStream("test2")).thenReturn( + new ByteArrayInputStream("789\n".getBytes())); + when(mockFTPClient.retrieveFileStream("NotComplete")).thenReturn( + new ByteArrayInputStream("NotComplete\n".getBytes())); + } catch (IOException e) { + fail("No IOException should be thrown!"); + } + + JobConf conf = new JobConf(); + conf.set(DBConfiguration.URL_PROPERTY, "localhost:" + "11111"); + conf.set(DBConfiguration.USERNAME_PROPERTY, "user"); + conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword"); + // set the password in the secure credentials object + Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY); + conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, + "pssword".getBytes()); + conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, DummySqoopRecord.class, + DBWritable.class); + + Job job = Job.getInstance(conf); + mfDIS = new MainframeDatasetInputSplit(); + mfDIS.addDataset("test1"); + mfDIS.addDataset("test2"); + context = mock(TaskAttemptContext.class); + when(context.getConfiguration()).thenReturn(job.getConfiguration()); + mfDFTPRR = new MainframeDatasetFTPRecordReader(); + } + + @After + public void tearDown() { + try { + mfDFTPRR.close(); + } catch (IOException ioe) { + fail("Got IOException: " + ioe.toString()); + } + MainframeFTPClientUtils.setMockFTPClient(null); + } + + @Test + public void testReadAllData() { + try { + mfDFTPRR.initialize(mfDIS, context); + Assert.assertTrue("Retrieve of dataset", mfDFTPRR.nextKeyValue()); + Assert.assertEquals("Key should increase by records", 1, mfDFTPRR + .getCurrentKey().get()); + Assert.assertEquals("Read value by line and by dataset", "123", mfDFTPRR + .getCurrentValue().toString()); + Assert.assertEquals("Get progress according to left dataset", + mfDFTPRR.getProgress(), (float) 0.5, 0.02); + Assert.assertTrue("Retrieve of dataset", mfDFTPRR.nextKeyValue()); + Assert.assertEquals("Key should increase by records", 2, mfDFTPRR + .getCurrentKey().get()); + Assert.assertEquals("Read value by line and by dataset", "456", mfDFTPRR + .getCurrentValue().toString()); + Assert.assertEquals("Get progress according to left dataset", + mfDFTPRR.getProgress(), (float) 0.5, 0.02); + Assert.assertTrue("Retrieve of dataset", mfDFTPRR.nextKeyValue()); + Assert.assertEquals("Key should increase by records", 3, mfDFTPRR + .getCurrentKey().get()); + Assert.assertEquals("Read value by line and by dataset", "789", mfDFTPRR + .getCurrentValue().toString()); + Assert.assertEquals("Get progress according to left dataset", + mfDFTPRR.getProgress(), (float) 1, 0.02); + Assert.assertFalse("End of dataset", mfDFTPRR.nextKeyValue()); + } catch (IOException ioe) { + fail("Got IOException: " + ioe.toString()); + } catch (InterruptedException ie) { + fail("Got InterruptedException: " + ie.toString()); + } + } + + @Test + public void testReadPartOfData() { + try { + 
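+      // Read just the first record of the first dataset; progress should
+      // stay at 0.5 because only one of the two datasets in the split has
+      // been started.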
mfDFTPRR.initialize(mfDIS, context); + Assert.assertTrue("Retrieve of dataset", mfDFTPRR.nextKeyValue()); + Assert.assertEquals("Key should increase by records", 1, mfDFTPRR + .getCurrentKey().get()); + Assert.assertEquals("Read value by line and by dataset", "123", mfDFTPRR + .getCurrentValue().toString()); + Assert.assertEquals("Get progress according to left dataset", + mfDFTPRR.getProgress(), (float) 0.5, 0.02); + } catch (IOException ioe) { + fail("Got IOException: " + ioe.toString()); + } catch (InterruptedException ie) { + fail("Got InterruptedException: " + ie.toString()); + } + } + + @Test + public void testFTPNotComplete() { + try { + mfDIS = new MainframeDatasetInputSplit(); + mfDIS.addDataset("NotComplete"); + mfDFTPRR.initialize(mfDIS, context); + Assert.assertTrue("Retrieve of dataset", mfDFTPRR.nextKeyValue()); + when(mockFTPClient.completePendingCommand()).thenReturn(false); + mfDFTPRR.nextKeyValue(); + } catch (IOException ioe) { + Assert.assertEquals( + "java.io.IOException: IOException during data transfer: " + + "java.io.IOException: Failed to complete ftp command.", + ioe.toString()); + } catch (InterruptedException ie) { + fail("Got InterruptedException: " + ie.toString()); + } + } +} diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputFormat.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputFormat.java new file mode 100644 index 00000000..70958e09 --- /dev/null +++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputFormat.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.mapreduce.mainframe; + +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.net.ftp.FTPClient; +import org.apache.commons.net.ftp.FTPFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.sqoop.lib.SqoopRecord; +import org.apache.sqoop.mapreduce.db.DBConfiguration; +import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration; +import org.apache.sqoop.util.MainframeFTPClientUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestMainframeDatasetInputFormat { + + private MainframeDatasetInputFormat format; + + private FTPClient mockFTPClient; + + @Before + public void setUp() { + format = new MainframeDatasetInputFormat(); + mockFTPClient = mock(FTPClient.class); + MainframeFTPClientUtils.setMockFTPClient(mockFTPClient); + try { + when(mockFTPClient.login("user", "pssword")).thenReturn(true); + when(mockFTPClient.logout()).thenReturn(true); + when(mockFTPClient.isConnected()).thenReturn(true); + when(mockFTPClient.completePendingCommand()).thenReturn(true); + when(mockFTPClient.changeWorkingDirectory(anyString())).thenReturn(true); + when(mockFTPClient.getReplyCode()).thenReturn(200); + when(mockFTPClient.getReplyString()).thenReturn(""); + when(mockFTPClient.noop()).thenReturn(200); + when(mockFTPClient.setFileType(anyInt())).thenReturn(true); + + FTPFile ftpFile1 = new FTPFile(); + ftpFile1.setType(FTPFile.FILE_TYPE); + ftpFile1.setName("test1"); + FTPFile ftpFile2 = new FTPFile(); + ftpFile2.setType(FTPFile.FILE_TYPE); + ftpFile2.setName("test2"); + FTPFile[] ftpFiles = { ftpFile1, ftpFile2 }; + when(mockFTPClient.listFiles()).thenReturn(ftpFiles); + } catch (IOException e) { + fail("No IOException should be thrown!"); + } + } + + @After + public void tearDown() { + MainframeFTPClientUtils.setMockFTPClient(null); + } + + @Test + public void testRetrieveDatasets() throws IOException { + JobConf conf = new JobConf(); + conf.set(DBConfiguration.URL_PROPERTY, "localhost:12345"); + conf.set(DBConfiguration.USERNAME_PROPERTY, "user"); + conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword"); + // set the password in the secure credentials object + Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY); + conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, + "pssword".getBytes()); + + String dsName = "dsName1"; + conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME, dsName); + Job job = Job.getInstance(conf); + format.getSplits(job); + + List splits = new ArrayList(); + splits = ((MainframeDatasetInputFormat) format).getSplits(job); + Assert.assertEquals("test1", ((MainframeDatasetInputSplit) splits.get(0)) + .getNextDataset().toString()); + Assert.assertEquals("test2", ((MainframeDatasetInputSplit) splits.get(1)) + .getNextDataset().toString()); + } +} diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputSplit.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputSplit.java new file mode 100644 index 00000000..5d92f6d5 --- /dev/null +++ 
b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputSplit.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.mainframe; + +import java.io.IOException; + +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestMainframeDatasetInputSplit { + + private MainframeDatasetInputSplit mfDatasetInputSplit; + + @Before + public void setUp() { + mfDatasetInputSplit = new MainframeDatasetInputSplit(); + } + + @Test + public void testGetCurrentDataset() { + String currentDataset = mfDatasetInputSplit.getCurrentDataset(); + Assert.assertNull(currentDataset); + } + + @Test + public void testGetNextDatasetWithNull() { + String currentDataset = mfDatasetInputSplit.getNextDataset(); + Assert.assertNull(currentDataset); + } + + @Test + public void testGetNextDataset() { + String mainframeDataset = "test"; + mfDatasetInputSplit.addDataset(mainframeDataset); + String currentDataset = mfDatasetInputSplit.getNextDataset(); + Assert.assertEquals("test", currentDataset); + } + + @Test + public void testHasMoreWithFalse() { + boolean retVal = mfDatasetInputSplit.hasMore(); + Assert.assertFalse(retVal); + } + + @Test + public void testHasMoreWithTrue() { + String mainframeDataset = "test"; + mfDatasetInputSplit.addDataset(mainframeDataset); + boolean retVal = mfDatasetInputSplit.hasMore(); + Assert.assertTrue(retVal); + } + + @Test + public void testGetLength() { + String mainframeDataset = "test"; + mfDatasetInputSplit.addDataset(mainframeDataset); + try { + long retVal = mfDatasetInputSplit.getLength(); + Assert.assertEquals(1, retVal); + } catch (IOException ioe) { + Assert.fail("No IOException should be thrown!"); + } catch (InterruptedException ie) { + Assert.fail("No InterruptedException should be thrown!"); + } + } + + @Test + public void testGetLocations() { + try { + String[] retVal = mfDatasetInputSplit.getLocations(); + Assert.assertNotNull(retVal); + } catch (IOException ioe) { + Assert.fail("No IOException should be thrown!"); + } catch (InterruptedException ie) { + Assert.fail("No InterruptedException should be thrown!"); + } + } + + @Test + public void testWriteRead() { + mfDatasetInputSplit.addDataset("dataSet1"); + mfDatasetInputSplit.addDataset("dataSet2"); + DataOutputBuffer dob = new DataOutputBuffer(); + DataInputBuffer dib = new DataInputBuffer(); + MainframeDatasetInputSplit mfReader = new MainframeDatasetInputSplit(); + try { + mfDatasetInputSplit.write(dob); + dib.reset(dob.getData(), dob.getLength()); + mfReader.readFields(dib); + Assert.assertNotNull("MFReader get data from tester", mfReader); + Assert.assertEquals(2, 
mfReader.getLength()); + Assert.assertEquals("dataSet1", mfReader.getNextDataset()); + Assert.assertEquals("dataSet2", mfReader.getNextDataset()); + } catch (IOException ioe) { + Assert.fail("No IOException should be thrown!"); + } catch (InterruptedException ie) { + Assert.fail("No InterruptedException should be thrown!"); + } + } +} diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java new file mode 100644 index 00000000..ecaa8d50 --- /dev/null +++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.mainframe; + +import static org.junit.Assert.assertEquals; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Mapper; +import org.junit.Before; +import org.junit.Test; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.manager.ImportJobContext; + +public class TestMainframeImportJob { + + private MainframeImportJob mfImportJob; + + private MainframeImportJob avroImportJob; + + private SqoopOptions options; + + @Before + public void setUp() { + options = new SqoopOptions(); + } + + @Test + public void testGetMainframeDatasetImportMapperClass() + throws SecurityException, NoSuchMethodException, + IllegalArgumentException, IllegalAccessException, + InvocationTargetException { + String jarFile = "dummyJarFile"; + String tableName = "dummyTableName"; + Path path = new Path("dummyPath"); + ImportJobContext context = new ImportJobContext(tableName, jarFile, + options, path); + mfImportJob = new MainframeImportJob(options, context); + + // To access protected method by means of reflection + Class[] types = {}; + Method m_getMapperClass = MainframeImportJob.class.getDeclaredMethod( + "getMapperClass", types); + m_getMapperClass.setAccessible(true); + Class mapper = (Class) m_getMapperClass + .invoke(mfImportJob); + assertEquals(mapper, + org.apache.sqoop.mapreduce.mainframe.MainframeDatasetImportMapper.class); + } + + @Test + public void testSuperMapperClass() throws SecurityException, + NoSuchMethodException, IllegalArgumentException, IllegalAccessException, + InvocationTargetException { + String jarFile = "dummyJarFile"; + String tableName = "dummyTableName"; + Path path = new Path("dummyPath"); + options.setFileLayout(SqoopOptions.FileLayout.AvroDataFile); + ImportJobContext context = new ImportJobContext(tableName, jarFile, + options, path); + avroImportJob = new MainframeImportJob(options, context); + + // To access protected method by means of reflection + Class[] types = {}; + 
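+    // With a non-text layout (Avro), getMapperClass() should fall back to
+    // the DataDrivenImportJob behaviour and return AvroImportMapper.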
Method m_getMapperClass = MainframeImportJob.class.getDeclaredMethod( + "getMapperClass", types); + m_getMapperClass.setAccessible(true); + Class mapper = (Class) m_getMapperClass + .invoke(avroImportJob); + assertEquals(mapper, org.apache.sqoop.mapreduce.AvroImportMapper.class); + } +} diff --git a/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java b/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java new file mode 100644 index 00000000..936afd30 --- /dev/null +++ b/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.tool; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import org.apache.commons.cli.ParseException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sqoop.cli.RelatedOptions; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException; +import com.cloudera.sqoop.cli.ToolOptions; +import com.cloudera.sqoop.testutil.BaseSqoopTestCase; + +public class TestMainframeImportTool extends BaseSqoopTestCase { + + private static final Log LOG = LogFactory.getLog(TestMainframeImportTool.class + .getName()); + + private MainframeImportTool mfImportTool; + + private final ByteArrayOutputStream outContent = new ByteArrayOutputStream(); + + @Before + public void setUp() { + + mfImportTool = new MainframeImportTool(); + System.setOut(new PrintStream(outContent)); + } + + @After + public void tearDown() { + System.setOut(null); + } + + @Test + public void testPrintHelp() { + ToolOptions toolOptions = new ToolOptions(); + String separator = System.getProperty("line.separator"); + mfImportTool.printHelp(toolOptions); + String outputMsg = "usage: sqoop " + + mfImportTool.getToolName() + + " [GENERIC-ARGS] [TOOL-ARGS]" + + separator + + "" + + separator + + "" + + separator + + "Generic Hadoop command-line arguments:" + + separator + + "(must preceed any tool-specific arguments)" + + separator + + "Generic options supported are" + + separator + + "-conf specify an application configuration file" + + separator + + "-D use value for given property" + + separator + + "-fs specify a namenode" + + separator + + "-jt specify a job tracker" + + separator + + "-files " + + "specify comma separated files to be copied to the map reduce cluster" + + separator + + "-libjars " + + "specify comma separated jar files to include in the classpath." 
+ + separator + + "-archives " + + "specify comma separated archives to be unarchived on the compute machines.\n" + + separator + "The general command line syntax is" + separator + + "bin/hadoop command [genericOptions] [commandOptions]\n" + separator + + "" + separator + "At minimum, you must specify --connect and --" + + MainframeImportTool.DS_ARG + separator; + assertEquals(outputMsg, outContent.toString()); + } + + @SuppressWarnings("deprecation") + @Test + public void testGetImportOptions() throws SecurityException, + NoSuchMethodException, IllegalArgumentException, IllegalAccessException, + InvocationTargetException { + // To access protected method by means of reflection + Class[] types = {}; + Object[] params = {}; + Method m_getImportOptions = MainframeImportTool.class.getDeclaredMethod( + "getImportOptions", types); + m_getImportOptions.setAccessible(true); + RelatedOptions rOptions = (RelatedOptions) m_getImportOptions.invoke( + mfImportTool, params); + assertNotNull("It should return a RelatedOptions", rOptions); + assertTrue(rOptions.hasOption(MainframeImportTool.DS_ARG)); + assertTrue(rOptions.hasOption(MainframeImportTool.DELETE_ARG)); + assertTrue(rOptions.hasOption(MainframeImportTool.TARGET_DIR_ARG)); + assertTrue(rOptions.hasOption(MainframeImportTool.WAREHOUSE_DIR_ARG)); + assertTrue(rOptions.hasOption(MainframeImportTool.FMT_TEXTFILE_ARG)); + assertTrue(rOptions.hasOption(MainframeImportTool.NUM_MAPPERS_ARG)); + assertTrue(rOptions.hasOption(MainframeImportTool.MAPREDUCE_JOB_NAME)); + assertTrue(rOptions.hasOption(MainframeImportTool.COMPRESS_ARG)); + assertTrue(rOptions.hasOption(MainframeImportTool.COMPRESSION_CODEC_ARG)); + } + + @Test + public void testApplyOptions() + throws InvalidOptionsException, ParseException { + String[] args = { "--" + MainframeImportTool.DS_ARG, "dummy_ds" }; + ToolOptions toolOptions = new ToolOptions(); + SqoopOptions sqoopOption = new SqoopOptions(); + mfImportTool.configureOptions(toolOptions); + sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false); + assertEquals(sqoopOption.getConnManagerClassName(), + "org.apache.sqoop.manager.MainframeManager"); + assertEquals(sqoopOption.getTableName(), "dummy_ds"); + } + + @Test + public void testNotApplyOptions() throws ParseException, + InvalidOptionsException { + String[] args = new String[] { "--connection-manager=dummy_ClassName" }; + ToolOptions toolOptions = new ToolOptions(); + SqoopOptions sqoopOption = new SqoopOptions(); + mfImportTool.configureOptions(toolOptions); + sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false); + assertEquals(sqoopOption.getConnManagerClassName(), "dummy_ClassName"); + assertNull(sqoopOption.getTableName()); + } +} diff --git a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java new file mode 100644 index 00000000..6b895021 --- /dev/null +++ b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.util;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sqoop.mapreduce.JobBase;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMainframeFTPClientUtils {
+
+  private JobConf conf;
+
+  private FTPClient mockFTPClient;
+
+  @Before
+  public void setUp() {
+    conf = new JobConf();
+    mockFTPClient = mock(FTPClient.class);
+    when(mockFTPClient.getReplyString()).thenReturn("");
+    MainframeFTPClientUtils.setMockFTPClient(mockFTPClient);
+  }
+
+  @After
+  public void tearDown() {
+    MainframeFTPClientUtils.setMockFTPClient(null);
+  }
+
+  @Test
+  public void testAnonymous_VERBOSE_IllegalPort() {
+    try {
+      when(mockFTPClient.login("anonymous", "")).thenReturn(true);
+      when(mockFTPClient.logout()).thenReturn(true);
+      when(mockFTPClient.isConnected()).thenReturn(false);
+      when(mockFTPClient.getReplyCode()).thenReturn(200);
+    } catch (IOException e) {
+      fail("No IOException should be thrown!");
+    }
+
+    conf.set(DBConfiguration.URL_PROPERTY, "localhost:testPort");
+    conf.setBoolean(JobBase.PROPERTY_VERBOSE, true);
+
+    FTPClient ftp = null;
+    boolean success = false;
+    try {
+      ftp = MainframeFTPClientUtils.getFTPConnection(conf);
+    } catch (IOException ioe) {
+      fail("No IOException should be thrown!");
+    } finally {
+      success = MainframeFTPClientUtils.closeFTPConnection(ftp);
+    }
+    Assert.assertTrue(success);
+  }
+
+  @Test
+  public void testCannotConnect() {
+    try {
+      when(mockFTPClient.login("testUser", "")).thenReturn(false);
+    } catch (IOException ioe) {
+      fail("No IOException should be thrown!");
+    }
+
+    conf.set(DBConfiguration.URL_PROPERTY, "testUser:11111");
+    try {
+      MainframeFTPClientUtils.getFTPConnection(conf);
+    } catch (IOException ioe) {
+      Assert.assertEquals(
+          "java.io.IOException: FTP server testUser refused connection:",
+          ioe.toString());
+    }
+  }
+
+  @Test
+  public void testWrongUsername() {
+    try {
+      when(mockFTPClient.login("user", "pssword")).thenReturn(true);
+      when(mockFTPClient.logout()).thenReturn(true);
+      when(mockFTPClient.isConnected()).thenReturn(false);
+      when(mockFTPClient.getReplyCode()).thenReturn(200);
+    } catch (IOException e) {
+      fail("No IOException should be thrown!");
+    }
+
+    FTPClient ftp = null;
+    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
+    conf.set(DBConfiguration.USERNAME_PROPERTY, "userr");
+    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    // set the password in the secure credentials object
+    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
+    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
+        "pssword".getBytes());
+
+    try {
+      ftp = MainframeFTPClientUtils.getFTPConnection(conf);
+    } catch (IOException ioe) {
+      Assert.assertEquals(
+          "java.io.IOException: Could not login to server localhost:",
+          ioe.toString());
+    }
+    Assert.assertNull(ftp);
+  }
+
+  @Test
+  public void testNotListDatasets() {
+    try {
+      when(mockFTPClient.login("user", "pssword")).thenReturn(true);
+      when(mockFTPClient.logout()).thenReturn(true);
+      when(mockFTPClient.isConnected()).thenReturn(false);
+      when(mockFTPClient.getReplyCode()).thenReturn(200);
+    } catch (IOException e) {
+      fail("No IOException should be thrown!");
+    }
+
+    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
+    conf.set(DBConfiguration.USERNAME_PROPERTY, "userr");
+    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    // set the password in the secure credentials object
+    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
+    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
+        "pssword".getBytes());
+
+    try {
+      MainframeFTPClientUtils.listSequentialDatasets("pdsName", conf);
+    } catch (IOException ioe) {
+      Assert.assertEquals("java.io.IOException: "
+          + "Could not list datasets from pdsName:"
+          + "java.io.IOException: Could not login to server localhost:",
+          ioe.toString());
+    }
+  }
+}
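
Background note on the FTP tests above: they never open a real connection. MainframeFTPClientUtils is handed a Mockito mock of commons-net's FTPClient through setMockFTPClient(), and the calls the code under test makes are stubbed with when(...).thenReturn(...). A minimal, self-contained sketch of that stubbing pattern follows; the FtpMockSketch class name and the printed usage are illustrative only and are not part of this patch, but the sketch relies only on the mockito-all 1.9.5 and commons-net 3.1 APIs that the patch adds to ivy.

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import org.apache.commons.net.ftp.FTPClient;

    public class FtpMockSketch {
      public static void main(String[] args) throws Exception {
        // A mock FTPClient: no socket is opened; every method returns a
        // default value unless it is explicitly stubbed below.
        FTPClient ftp = mock(FTPClient.class);

        // Stub the calls the code under test is expected to make.
        when(ftp.login("anonymous", "")).thenReturn(true);
        when(ftp.getReplyCode()).thenReturn(200);
        when(ftp.getReplyString()).thenReturn("");

        // The stubbed answers come back without any network I/O.
        System.out.println(ftp.login("anonymous", ""));  // prints: true
        System.out.println(ftp.getReplyCode());          // prints: 200
      }
    }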