
SQOOP-1272: Support importing mainframe sequential datasets

(Mariappan Asokan via Venkat Ranganathan)
This commit is contained in:
Venkat Ranganathan 2014-09-09 23:26:24 -07:00
parent 666700d33f
commit 268299ee52
41 changed files with 2882 additions and 138 deletions

View File

@ -170,10 +170,14 @@ under the License.
rev="${commons-cli.version}" conf="common->default"/>
<dependency org="commons-logging" name="commons-logging"
rev="${commons-logging.version}" conf="common->default"/>
<dependency org="commons-net" name="commons-net"
rev="${commons-net.version}" conf="common->default"/>
<dependency org="log4j" name="log4j" rev="${log4j.version}"
conf="common->master" />
<dependency org="junit" name="junit" rev="${junit.version}"
conf="test->default"/>
<dependency org="org.mockito" name="mockito-all"
rev="${mockito-all.version}" conf="test->default"/>
<!-- We're only using H2 for tests as it supports stored
procedures; once we move to HSQLDB 2.x we can drop
this -->

View File

@ -29,12 +29,14 @@ commons-collections.version=3.1
commons-io.version=1.4
commons-lang.version=2.4
commons-logging.version=1.0.4
commons-net.version=3.1
hsqldb.version=1.8.0.10
ivy.version=2.3.0
junit.version=4.11
mockito-all.version=1.9.5
h2.version=1.3.170

View File

@ -1,4 +1,3 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@ -7,9 +6,9 @@
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -17,45 +16,16 @@
limitations under the License.
////
Database connection and common options
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
--connect (jdbc-uri)::
Specify JDBC connect string (required)
--connection-manager (class-name)::
Specify connection manager class name (optional)
--driver (class-name)::
Manually specify JDBC driver class to use
--connection-param-file (filename)::
Optional properties file that provides connection parameters
--hadoop-mapred-home (dir)::
Override $HADOOP_MAPRED_HOME
--help::
Print usage instructions
--password-file (file containing the password)::
Set authentication password in a file in the user's home
directory with 400 permissions
(Note: This is very secure and a preferred way of entering credentials)
--password (password)::
Set authentication password
(Note: This is very insecure. You should use -P instead.)
-P::
Prompt for user password
--username (username)::
Set authentication username
--verbose::
Print more information while working
--hadoop-home (dir)::
Deprecated. Override $HADOOP_HOME
include::database-independent-args.txt[]

View File

@ -0,0 +1,47 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
--connection-manager (class-name)::
Specify connection manager class name (optional)
--hadoop-mapred-home (dir)::
Override $HADOOP_MAPRED_HOME
--help::
Print usage instructions
--password-file (file containing the password)::
Set authentication password in a file in the user's home
directory with 400 permissions
(Note: This is very secure and a preferred way of entering credentials)
--password (password)::
Set authentication password
(Note: This is very insecure. You should use -P instead.)
-P::
Prompt for user password
--username (username)::
Set authentication username
--verbose::
Print more information while working
--hadoop-home (dir)::
Deprecated. Override $HADOOP_HOME

View File

@ -1,4 +1,3 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@ -7,9 +6,9 @@
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -31,7 +30,8 @@ HBase options
Specifies which input column to use as the row key
If input table contains composite key, value of
(col) must be a comma-separated list of composite
key attributes
key attributes. For a mainframe dataset, this should be the
input field name
--hbase-table (table-name)::
Specifies an HBase table to use as the target instead of HDFS

View File

@ -1,4 +1,3 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@ -7,9 +6,9 @@
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -24,7 +23,7 @@ Hive options
Override $HIVE_HOME
--hive-import::
If set, then import the table into Hive
If set, then import the table or mainframe dataset into Hive
--hive-overwrite::
Overwrites existing data in the hive table if it exists.
@ -37,4 +36,4 @@ Hive options
When used with --hive-import, overrides the destination table name
--map-column-hive (mapping)::
Override default mapping for SQL types into Hive types for configured columns
Override default mapping for SQL types or input field types into Hive types for configured columns

View File

@ -1,4 +1,3 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@ -7,9 +6,9 @@
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -17,25 +16,11 @@
limitations under the License.
////
Import control options
~~~~~~~~~~~~~~~~~~~~~~
include::import-common-args.txt[]
--append::
Append data to an existing HDFS dataset
--as-avrodatafile::
Imports data to Avro Data Files
--as-sequencefile::
Imports data to SequenceFiles
--as-textfile::
Imports data as plain text (default)
--as-parquetfile::
Imports data to Parquet Files
--boundary-query (query)::
Use the given query to select the minimal and maximal values of the '--split-by' column for creating splits
@ -51,10 +36,6 @@ Import control options
--inline-lob-limit (n)::
Set the maximum size for an inline LOB
--num-mappers (n)::
-m::
Use 'n' map tasks to import in parallel
--query (statement)::
Imports the results of +statement+ instead of a table
@ -64,24 +45,10 @@ Import control options
--table (table-name)::
The table to import
--target-dir (dir)::
Explicit HDFS target directory for the import.
--warehouse-dir (dir)::
Tables are uploaded to the HDFS path +/warehouse/dir/(tablename)/+
--where (clause)::
Import only the rows for which _clause_ is true.
e.g.: `--where "user_id > 400 AND hidden == 0"`
--compress::
-z::
Uses gzip (or, alternatively, the codec specified by +\--compression-codec+,
if set) to compress data as it is written to HDFS
--compression-codec (codec)::
Uses the Hadoop +codec+ class to compress data as it is written to HDFS.
--null-string::
The string to be written for a null value for string columns

View File

@ -0,0 +1,51 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
Import control options
~~~~~~~~~~~~~~~~~~~~~~
--as-textfile::
Imports data as plain text (default)
--as-avrodatafile::
Imports data to Avro Data Files
--as-sequencefile::
Imports data to SequenceFiles
--as-parquetfile::
Imports data to Parquet Files
--num-mappers (n)::
-m::
Use 'n' map tasks to import in parallel
--target-dir (dir)::
Explicit HDFS target directory for the import.
--warehouse-dir (dir)::
Tables are uploaded to the HDFS path +/warehouse/dir/(tablename)/+
--compress::
-z::
Uses gzip (or, alternatively, the codec specified by +\--compression-codec+,
if set) to compress data as it is written to HDFS
--compression-codec (codec)::
Uses the Hadoop +codec+ class to compress data as it is written to HDFS.

View File

@ -0,0 +1,25 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
.Connection arguments
--connect (hostname)::
Specify mainframe host name (required)
--dataset (partitioned dataset name)::
Specify a partitioned dataset name
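A minimal invocation using just these two arguments might look like the following sketch (the host and PDS names are placeholders):
----
$ sqoop import-mainframe --connect z390 --dataset SOME.PDS
----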

View File

@ -0,0 +1,66 @@
////
Copyright 2011 The Apache Software Foundation
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
sqoop-import-mainframe(1)
=========================
NAME
----
sqoop-import-mainframe - Import mainframe sequential datasets to HDFS
SYNOPSIS
--------
'sqoop-import-mainframe' <generic-options> <tool-options>
'sqoop import-mainframe' <generic-options> <tool-options>
DESCRIPTION
-----------
include::../user/import-mainframe-purpose.txt[]
OPTIONS
-------
The +--connect+ and +--dataset+ options are required.
Mainframe connection and common options
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
include::mainframe-connection-args.txt[]
include::database-independent-args.txt[]
include::import-common-args.txt[]
include::hive-args.txt[]
include::hbase-args.txt[]
include::input-args.txt[]
include::output-args.txt[]
include::codegen-args.txt[]
ENVIRONMENT
-----------
See 'sqoop(1)'

View File

@ -12,7 +12,8 @@ SYNOPSIS
DESCRIPTION
-----------
Sqoop is a tool designed to help users of large data import existing
relational databases into their Hadoop clusters. Sqoop interfaces with
relational databases or mainframe datasets into their Hadoop clusters.
Sqoop interfaces with
databases via JDBC, importing the contents of tables into HDFS while
generating Java classes so users can interpret the table's schema.
Sqoop can also run in reverse, exporting records back from HDFS to a
@ -66,7 +67,7 @@ installed from a tarball.
////
Copyright 2011 The Apache Software Foundation
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
@ -74,9 +75,9 @@ installed from a tarball.
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

View File

@ -1,4 +1,3 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@ -7,9 +6,9 @@
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -30,9 +29,9 @@ Sqoop User Guide (v{revnumber})
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -52,6 +51,8 @@ include::import.txt[]
include::import-all-tables.txt[]
include::import-mainframe.txt[]
include::export.txt[]
include::validation.txt[]

View File

@ -1,4 +1,3 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@ -7,9 +6,9 @@
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -20,10 +19,12 @@
Basic Usage
-----------
With Sqoop, you can _import_ data from a relational database system into
HDFS. The input to the import process is a database table. Sqoop
will read the table row-by-row into HDFS. The output of this import
process is a set of files containing a copy of the imported table.
With Sqoop, you can _import_ data from a relational database system or a
mainframe into HDFS. The input to the import process is either a database table
or mainframe datasets. For databases, Sqoop will read the table row-by-row
into HDFS. For mainframe datasets, Sqoop will read records from each mainframe
dataset into HDFS. The output of this import process is a set of files
containing a copy of the imported table or datasets.
The import process is performed in parallel. For this reason, the
output will be in multiple files. These files may be delimited text
files (for example, with commas or tabs separating each field), or
@ -54,9 +55,9 @@ within a schema (with the +sqoop-list-tables+ tool). Sqoop also
includes a primitive SQL execution shell (the +sqoop-eval+ tool).
Most aspects of the import, code generation, and export processes can
be customized. You can control the specific row range or columns imported.
You can specify particular delimiters and escape characters for the
file-based representation of the data, as well as the file format
be customized. For databases, you can control the specific row range or
columns imported. You can specify particular delimiters and escape characters
for the file-based representation of the data, as well as the file format
used. You can also control the class or package names used in
generated code. Subsequent sections of this document explain how to
specify these and other arguments to Sqoop.

View File

@ -0,0 +1,66 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
Connecting to a Mainframe
^^^^^^^^^^^^^^^^^^^^^^^^^
Sqoop is designed to import mainframe datasets into HDFS. To do
so, you must specify a mainframe host name in the Sqoop +\--connect+ argument.
----
$ sqoop import-mainframe --connect z390
----
This will connect to the mainframe host z390 via ftp.
You might need to authenticate against the mainframe host to
access it. You can use the +\--username+ argument to supply a username to the mainframe.
Sqoop provides a couple of different ways to supply a password,
secure and non-secure, to the mainframe, as detailed below.
.Secure way of supplying password to the mainframe
Save the password in a file in the user's home directory with 400
permissions and specify the path to that file using the *+--password-file+*
argument; this is the preferred method of entering credentials. Sqoop will
then read the password from the file and pass it to the MapReduce cluster
using secure means without exposing the password in the job configuration.
The file containing the password can be either on the local filesystem or on HDFS.
Example:
----
$ sqoop import-mainframe --connect z390 \
--username david --password-file ${user.home}/.password
----
Another way of supplying a password is the +-P+ argument, which will
read the password from a console prompt.
.Non-secure way of passing password
WARNING: The +\--password+ parameter is insecure, as other users may
be able to read your password from the command-line arguments via
the output of programs such as `ps`. The *+-P+* argument is the preferred
method over using the +\--password+ argument. Credentials may still be
transferred between nodes of the MapReduce cluster using insecure means.
Example:
----
$ sqoop import-mainframe --connect z390 --username david --password 12345
----

View File

@ -0,0 +1,30 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
Controlling Distributed Cache
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Sqoop will copy the jars in the $SQOOP_HOME/lib folder to the job cache every
time it starts a Sqoop job. When launched by Oozie this is unnecessary
since Oozie uses its own Sqoop share lib, which keeps the Sqoop dependencies
in the distributed cache. Oozie localizes the Sqoop dependencies on each
worker node only once, during the first Sqoop job, and reuses the jars on
the worker nodes for subsequent jobs. Adding the option +--skip-dist-cache+
to the Sqoop command when launched by Oozie skips the step in which Sqoop
copies its dependencies to the job cache and saves massive I/O.
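For example, a job launched through Oozie might add the option as follows (the connect string and table name are placeholders):
----
$ sqoop import --connect jdbc:mysql://db.example.com/corp --table EMPLOYEES \
    --skip-dist-cache
----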

View File

@ -0,0 +1,23 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
The +import-mainframe+ tool imports all sequential datasets
in a partitioned dataset (PDS) on a mainframe to HDFS. A PDS is
akin to a directory on open systems.
The records in a dataset can contain only character data.
Records will be stored with the entire record as a single text field.

View File

@ -0,0 +1,243 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
+sqoop-import-mainframe+
------------------------
Purpose
~~~~~~~
include::import-mainframe-purpose.txt[]
Syntax
~~~~~~
----
$ sqoop import-mainframe (generic-args) (import-args)
$ sqoop-import-mainframe (generic-args) (import-args)
----
While the Hadoop generic arguments must precede any import arguments,
you can type the import arguments in any order with respect to one
another.
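For example, a Hadoop generic +-D+ option must appear before the tool arguments, while the tool arguments themselves may be reordered freely (the property and names below are only illustrative):
----
$ sqoop import-mainframe -D mapreduce.job.queuename=default \
    --connect z390 --dataset EMPLOYEES --username SomeUser -P
----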
include::mainframe-common-args.txt[]
include::connecting-to-mainframe.txt[]
.Import control arguments:
[grid="all"]
`---------------------------------`--------------------------------------
Argument Description
-------------------------------------------------------------------------
+\--as-avrodatafile+ Imports data to Avro Data Files
+\--as-sequencefile+ Imports data to SequenceFiles
+\--as-textfile+ Imports data as plain text (default)
+\--as-parquetfile+ Imports data to Parquet Files
+\--delete-target-dir+ Delete the import target directory\
if it exists
+-m,\--num-mappers <n>+ Use 'n' map tasks to import in parallel
+\--target-dir <dir>+ HDFS destination dir
+\--warehouse-dir <dir>+ HDFS parent for table destination
+-z,\--compress+ Enable compression
+\--compression-codec <c>+ Use Hadoop codec (default gzip)
-------------------------------------------------------------------------
Selecting the Files to Import
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
You can use the +\--dataset+ argument to specify a partitioned dataset name.
All sequential datasets in the partitioned dataset will be imported.
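For example, the following sketch imports every sequential dataset in a partitioned dataset named +EMPLOYEES+ (host, user, and dataset names are placeholders):
----
$ sqoop import-mainframe --connect z390 --dataset EMPLOYEES \
    --username SomeUser -P
----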
Controlling Parallelism
^^^^^^^^^^^^^^^^^^^^^^^
Sqoop imports data in parallel by making multiple ftp connections to the
mainframe to transfer multiple files simultaneously. You can specify the
number of map tasks (parallel processes) to use to perform the import by
using the +-m+ or +\--num-mappers+ argument. Each of these arguments
takes an integer value which corresponds to the degree of parallelism
to employ. By default, four tasks are used. You can adjust this value to
maximize the data transfer rate from the mainframe.
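For instance, the following sketch requests eight parallel transfers (names are placeholders):
----
$ sqoop import-mainframe --connect z390 --dataset EMPLOYEES \
    --username SomeUser -P --num-mappers 8
----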
include::distributed-cache.txt[]
Controlling the Import Process
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
By default, Sqoop will import all sequential files in a partitioned dataset
+pds+ to a directory named +pds+ inside your home directory in HDFS. For
example, if your username is +someuser+, then the import tool will write to
+/user/someuser/pds/(files)+. You can adjust the parent directory of
the import with the +\--warehouse-dir+ argument. For example:
----
$ sqoop import-mainframe --connect <host> --dataset foo --warehouse-dir /shared \
...
----
This command would write to a set of files in the +/shared/pds/+ directory.
You can also explicitly choose the target directory, like so:
----
$ sqoop import-mainframe --connect <host> --dataset foo --target-dir /dest \
...
----
This will import the files into the +/dest+ directory. +\--target-dir+ is
incompatible with +\--warehouse-dir+.
By default, imports go to a new target location. If the destination directory
already exists in HDFS, Sqoop will refuse to import and overwrite that
directory's contents.
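If you need to re-run an import into the same location, the +\--delete-target-dir+ option listed in the import control arguments above removes the existing directory first, for example (placeholder names):
----
$ sqoop import-mainframe --connect z390 --dataset EMPLOYEES \
    --target-dir /dest --delete-target-dir
----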
File Formats
^^^^^^^^^^^^
By default, each record in a dataset is stored
as a text record with a newline at the end. Each record is assumed to contain
a single text field with the name DEFAULT_COLUMN.
When Sqoop imports data to HDFS, it generates a Java class which can
reinterpret the text files that it creates.
You can also import mainframe records to Sequence, Avro, or Parquet files.
By default, data is not compressed. You can compress your data by
using the deflate (gzip) algorithm with the +-z+ or +\--compress+
argument, or specify any Hadoop compression codec using the
+\--compression-codec+ argument.
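For example, to store the records as SequenceFiles and compress them with the default gzip codec (placeholder names):
----
$ sqoop import-mainframe --connect z390 --dataset EMPLOYEES \
    --as-sequencefile --compress
----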
include::output-args.txt[]
Since a mainframe record contains only one field, importing to delimited files
will not involve any field delimiter. However, the field may be enclosed with
an enclosing character or escaped by an escaping character.
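For example, the following sketch encloses the single imported field in double quotes using the standard output-line-formatting argument (assumed to behave as it does for database imports):
----
$ sqoop import-mainframe --connect z390 --dataset EMPLOYEES \
    --enclosed-by '\"'
----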
include::input-args.txt[]
When Sqoop imports data to HDFS, it generates a Java class which can
reinterpret the text files that it creates when doing a
delimited-format import. The delimiters are chosen with arguments such
as +\--fields-terminated-by+; this controls both how the data is
written to disk, and how the generated +parse()+ method reinterprets
this data. The delimiters used by the +parse()+ method can be chosen
independently of the output arguments, by using
+\--input-fields-terminated-by+, and so on. This is useful, for example, to
generate classes which can parse records created with one set of
delimiters, and emit the records to a different set of files using a
separate set of delimiters.
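A sketch combining the two kinds of delimiter arguments (the values are only illustrative): the generated class will emit tab-separated output while its +parse()+ method expects comma-separated input.
----
$ sqoop import-mainframe --connect z390 --dataset EMPLOYEES \
    --fields-terminated-by '\t' --input-fields-terminated-by ','
----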
include::hive-args.txt[]
include::hive.txt[]
include::hbase-args.txt[]
include::hbase.txt[]
include::accumulo-args.txt[]
include::accumulo.txt[]
include::codegen-args.txt[]
As mentioned earlier, a byproduct of importing a table to HDFS is a
class which can manipulate the imported data.
You should use this class in your subsequent
MapReduce processing of the data.
The class is typically named after the partitioned dataset name; a
partitioned dataset named +foo+ will
generate a class named +foo+. You may want to override this class
name. For example, if your partitioned dataset
is named +EMPLOYEES+, you may want to
specify +\--class-name Employee+ instead. Similarly, you can specify
just the package name with +\--package-name+. The following import
generates a class named +com.foocorp.SomePDS+:
----
$ sqoop import-mainframe --connect <host> --dataset SomePDS --package-name com.foocorp
----
The +.java+ source file for your class will be written to the current
working directory when you run +sqoop+. You can control the output
directory with +\--outdir+. For example, +\--outdir src/generated/+.
The import process compiles the source into +.class+ and +.jar+ files;
these are ordinarily stored under +/tmp+. You can select an alternate
target directory with +\--bindir+. For example, +\--bindir /scratch+.
If you already have a compiled class that can be used to perform the
import and want to suppress the code-generation aspect of the import
process, you can use an existing jar and class by
providing the +\--jar-file+ and +\--class-name+ options. For example:
----
$ sqoop import-mainframe --dataset SomePDS --jar-file mydatatypes.jar \
--class-name SomePDSType
----
This command will load the +SomePDSType+ class out of +mydatatypes.jar+.
Additional Import Configuration Properties
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
There are some additional properties which can be configured by modifying
+conf/sqoop-site.xml+. Properties can be specified the same as in Hadoop
configuration files, for example:
----
<property>
<name>property.name</name>
<value>property.value</value>
</property>
----
They can also be specified on the command line in the generic arguments, for
example:
----
sqoop import -D property.name=property.value ...
----
Example Invocations
~~~~~~~~~~~~~~~~~~~
The following examples illustrate how to use the import tool in a variety
of situations.
A basic import of all sequential files in a partitioned dataset named
+EMPLOYEES+ in the mainframe host z390:
----
$ sqoop import-mainframe --connect z390 --dataset EMPLOYEES \
--username SomeUser -P
Enter password: (hidden)
----
Controlling the import parallelism (using 8 parallel tasks):
----
$ sqoop import-mainframe --connect z390 --dataset EMPLOYEES \
--username SomeUser --password-file mypassword -m 8
----
Importing the data to Hive:
----
$ sqoop import-mainframe --connect z390 --dataset EMPLOYEES \
--hive-import
----

View File

@ -6,9 +6,9 @@
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -206,18 +206,7 @@ multi-column indices. If your table has no index column, or has a
multi-column key, then you must also manually choose a splitting
column.
Controlling Distributed Cache
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Sqoop will copy the jars in the $SQOOP_HOME/lib folder to the job cache every
time it starts a Sqoop job. When launched by Oozie this is unnecessary
since Oozie uses its own Sqoop share lib, which keeps the Sqoop dependencies
in the distributed cache. Oozie localizes the Sqoop dependencies on each
worker node only once, during the first Sqoop job, and reuses the jars on
the worker nodes for subsequent jobs. Adding the option +--skip-dist-cache+
to the Sqoop command when launched by Oozie skips the step in which Sqoop
copies its dependencies to the job cache and saves massive I/O.
include::distributed-cache.txt[]
Controlling the Import Process
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

View File

@ -1,4 +1,3 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@ -7,9 +6,9 @@
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -22,9 +21,9 @@ Introduction
------------
Sqoop is a tool designed to transfer data between Hadoop and
relational databases. You can use Sqoop to import data from a
relational database management system (RDBMS) such as MySQL or Oracle
into the Hadoop Distributed File System (HDFS),
relational databases or mainframes. You can use Sqoop to import data from a
relational database management system (RDBMS) such as MySQL or Oracle or a
mainframe into the Hadoop Distributed File System (HDFS),
transform the data in Hadoop MapReduce, and then export the data back
into an RDBMS.
@ -34,9 +33,9 @@ to import and export the data, which provides parallel operation as
well as fault tolerance.
This document describes how to get started using Sqoop to move data
between databases and Hadoop and provides reference information for
the operation of the Sqoop command-line tool suite. This document is
intended for:
between databases or mainframes and Hadoop, and provides reference
information for the operation of the Sqoop command-line tool suite. This
document is intended for:
- System and application programmers
- System administrators

View File

@ -0,0 +1,37 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
.Common arguments
[grid="all"]
`----------------------------------------`-------------------------------------
Argument Description
-------------------------------------------------------------------------------
+\--connect <hostname>+ Specify mainframe host to connect
+\--connection-manager <class-name>+ Specify connection manager class to\
use
+\--hadoop-mapred-home <dir>+ Override $HADOOP_MAPRED_HOME
+\--help+ Print usage instructions
+\--password-file+ Set path for a file containing the\
authentication password
+-P+ Read password from console
+\--password <password>+ Set authentication password
+\--username <username>+ Set authentication username
+\--verbose+ Print more information while working
+\--connection-param-file <filename>+ Optional properties file that\
provides connection parameters
-------------------------------------------------------------------------------

View File

@ -1,4 +1,3 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@ -7,9 +6,9 @@
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -58,6 +57,7 @@ Available commands:
help List available commands
import Import a table from a database to HDFS
import-all-tables Import tables from a database to HDFS
import-mainframe Import mainframe datasets to HDFS
list-databases List available databases on a server
list-tables List available tables in a database
version Display version information
@ -69,7 +69,7 @@ You can display help for a specific tool by entering: +sqoop help
(tool-name)+; for example, +sqoop help import+.
You can also add the +\--help+ argument to any command: +sqoop import
\--help+.
Using Command Aliases
~~~~~~~~~~~~~~~~~~~~~
@ -128,16 +128,16 @@ usage: sqoop import [GENERIC-ARGS] [TOOL-ARGS]
Common arguments:
--connect <jdbc-uri> Specify JDBC connect string
--connect-manager <jdbc-uri> Specify connection manager class to use
--connect-manager <class-name> Specify connection manager class to use
--driver <class-name> Manually specify JDBC driver class to use
--hadoop-mapred-home <dir>+ Override $HADOOP_MAPRED_HOME
--hadoop-mapred-home <dir> Override $HADOOP_MAPRED_HOME
--help Print usage instructions
--password-file Set path for file containing authentication password
-P Read password from console
--password <password> Set authentication password
--username <username> Set authentication username
--verbose Print more information while working
--hadoop-home <dir>+ Deprecated. Override $HADOOP_HOME
--hadoop-home <dir> Deprecated. Override $HADOOP_HOME
[...]

View File

@ -1,4 +1,3 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@ -19,14 +18,14 @@
.Validation arguments <<validation,More Details>>
[grid="all"]
`----------------------------------------`-------------------------------------
Argument Description
-------------------------------------------------------------------------------
+\--validate+ Enable validation of data copied, \
supports single table copy only. \
+\--validator <class-name>+ Specify validator class to use.
+\--validation-threshold <class-name>+ Specify validation threshold class \
to use.
`-------------------------------------------`-------------------------------------
Argument Description
----------------------------------------------------------------------------------
+\--validate+ Enable validation of data copied, \
supports single table copy only.
+\--validator <class-name>+ Specify validator class to use.
+\--validation-threshold <class-name>+ Specify validation threshold class \
to use.
+\--validation-failurehandler <class-name>+ Specify validation failure \
handler class to use.
-------------------------------------------------------------------------------
handler class to use.
----------------------------------------------------------------------------------

View File

@ -262,6 +262,10 @@ public String toString() {
// "key" column for the merge operation.
@StoredAsProperty("merge.key.col") private String mergeKeyCol;
// Dataset name for mainframe import tool
@StoredAsProperty("mainframe.input.dataset.name")
private String mainframeInputDatasetName;
// Accumulo home directory
private String accumuloHome; // not serialized to metastore.
// Zookeeper home directory
@ -2164,6 +2168,21 @@ public String getMergeKeyCol() {
return this.mergeKeyCol;
}
/**
* Set the mainframe dataset name.
*/
public void setMainframeInputDatasetName(String name) {
mainframeInputDatasetName = name;
tableName = name;
}
/**
* Return the mainframe dataset name.
*/
public String getMainframeInputDatasetName() {
return mainframeInputDatasetName;
}
public static String getAccumuloHomeDefault() {
// Set this with $ACCUMULO_HOME, but -Daccumulo.home can override.
String accumuloHome = System.getenv("ACCUMULO_HOME");

View File

@ -0,0 +1,167 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.manager;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sqoop.accumulo.AccumuloUtil;
import org.apache.sqoop.hbase.HBaseUtil;
import org.apache.sqoop.mapreduce.AccumuloImportJob;
import org.apache.sqoop.mapreduce.HBaseBulkImportJob;
import org.apache.sqoop.mapreduce.HBaseImportJob;
import org.apache.sqoop.mapreduce.ImportJobBase;
import org.apache.sqoop.mapreduce.mainframe.MainframeDatasetInputFormat;
import org.apache.sqoop.mapreduce.mainframe.MainframeImportJob;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.util.ImportException;
/**
* ConnManager implementation for mainframe datasets.
*/
public class MainframeManager extends com.cloudera.sqoop.manager.ConnManager {
public static final String DEFAULT_DATASET_COLUMN_NAME = "DEFAULT_COLUMN";
protected SqoopOptions options;
private static final Log LOG
= LogFactory.getLog(MainframeManager.class.getName());
/**
* Constructs the MainframeManager.
* @param opts the SqoopOptions describing the user's requested action.
*/
public MainframeManager(final SqoopOptions opts) {
this.options = opts;
}
/**
* Launch a MapReduce job via MainframeImportJob to read the
* partitioned dataset with MainframeDatasetInputFormat.
*/
@Override
public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
throws IOException, ImportException {
String pdsName = context.getTableName();
String jarFile = context.getJarFile();
SqoopOptions opts = context.getOptions();
context.setConnManager(this);
ImportJobBase importer;
if (opts.getHBaseTable() != null) {
if (!HBaseUtil.isHBaseJarPresent()) {
throw new ImportException("HBase jars are not present in "
+ "classpath, cannot import to HBase!");
}
if (!opts.isBulkLoadEnabled()) {
importer = new HBaseImportJob(opts, context);
} else {
importer = new HBaseBulkImportJob(opts, context);
}
} else if (opts.getAccumuloTable() != null) {
if (!AccumuloUtil.isAccumuloJarPresent()) {
throw new ImportException("Accumulo jars are not present in "
+ "classpath, cannot import to Accumulo!");
}
importer = new AccumuloImportJob(opts, context);
} else {
// Import to HDFS.
importer = new MainframeImportJob(opts, context);
}
importer.setInputFormatClass(MainframeDatasetInputFormat.class);
importer.runImport(pdsName, jarFile, null, opts.getConf());
}
@Override
public String[] getColumnNames(String tableName) {
// default is one column for the whole record
String[] colNames = new String[1];
colNames[0] = DEFAULT_DATASET_COLUMN_NAME;
return colNames;
}
@Override
public Map<String, Integer> getColumnTypes(String tableName) {
Map<String, Integer> colTypes = new HashMap<String, Integer>();
String[] colNames = getColumnNames(tableName);
colTypes.put(colNames[0], Types.VARCHAR);
return colTypes;
}
@Override
public void discardConnection(boolean doClose) {
// do nothing
}
@Override
public String[] listDatabases() {
LOG.error("MainframeManager.listDatabases() not supported");
return null;
}
@Override
public String[] listTables() {
LOG.error("MainframeManager.listTables() not supported");
return null;
}
@Override
public String getPrimaryKey(String tableName) {
return null;
}
@Override
public ResultSet readTable(String tableName, String[] columns)
throws SQLException {
return null;
}
@Override
public Connection getConnection() throws SQLException {
return null;
}
@Override
public void close() throws SQLException {
release();
}
@Override
public void release() {
}
@Override
public String getDriverClass(){
return "";
}
@Override
public void execAndPrint(String s) {
}
}

View File

@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.mainframe;
public class MainframeConfiguration
{
public static final String MAINFRAME_INPUT_DATASET_NAME
= "mapreduce.mainframe.input.dataset.name";
}

View File

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.mainframe;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.util.MainframeFTPClientUtils;
/**
* A RecordReader that returns a record from a mainframe dataset.
*/
public class MainframeDatasetFTPRecordReader <T extends SqoopRecord>
extends MainframeDatasetRecordReader<T> {
private FTPClient ftp = null;
private BufferedReader datasetReader = null;
private static final Log LOG = LogFactory.getLog(
MainframeDatasetFTPRecordReader.class.getName());
@Override
public void initialize(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
super.initialize(inputSplit, taskAttemptContext);
Configuration conf = getConfiguration();
ftp = MainframeFTPClientUtils.getFTPConnection(conf);
if (ftp != null) {
String dsName
= conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
ftp.changeWorkingDirectory("'" + dsName + "'");
}
}
@Override
public void close() throws IOException {
if (datasetReader != null) {
datasetReader.close();
}
if (ftp != null) {
MainframeFTPClientUtils.closeFTPConnection(ftp);
}
}
protected boolean getNextRecord(T sqoopRecord) throws IOException {
String line = null;
try {
do {
if (datasetReader == null) {
String dsName = getNextDataset();
if (dsName == null) {
break;
}
datasetReader = new BufferedReader(new InputStreamReader(
ftp.retrieveFileStream(dsName)));
}
line = datasetReader.readLine();
if (line == null) {
datasetReader.close();
datasetReader = null;
if (!ftp.completePendingCommand()) {
throw new IOException("Failed to complete ftp command.");
} else {
LOG.info("Data transfer completed.");
}
}
} while(line == null);
} catch (IOException ioe) {
throw new IOException("IOException during data transfer: " +
ioe.toString());
}
if (line != null) {
convertToSqoopRecord(line, (SqoopRecord)sqoopRecord);
return true;
}
return false;
}
private void convertToSqoopRecord(String line, SqoopRecord sqoopRecord) {
String fieldName
= sqoopRecord.getFieldMap().entrySet().iterator().next().getKey();
sqoopRecord.setField(fieldName, line);
}
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.mainframe;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.sqoop.config.ConfigurationConstants;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.mapreduce.AutoProgressMapper;
/**
* Mapper that writes mainframe dataset records in Text format to multiple files
* based on the key, which is the index of the datasets in the input split.
*/
public class MainframeDatasetImportMapper
extends AutoProgressMapper<LongWritable, SqoopRecord, Text, NullWritable> {
private static final Log LOG = LogFactory.getLog(
MainframeDatasetImportMapper.class.getName());
private MainframeDatasetInputSplit inputSplit;
private MultipleOutputs<Text, NullWritable> mos;
private long numberOfRecords;
private Text outkey;
public void map(LongWritable key, SqoopRecord val, Context context)
throws IOException, InterruptedException {
String dataset = inputSplit.getCurrentDataset();
outkey.set(val.toString());
numberOfRecords++;
mos.write(outkey, NullWritable.get(), dataset);
}
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
super.setup(context);
inputSplit = (MainframeDatasetInputSplit)context.getInputSplit();
mos = new MultipleOutputs<Text, NullWritable>(context);
numberOfRecords = 0;
outkey = new Text();
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
super.cleanup(context);
mos.close();
context.getCounter(
ConfigurationConstants.COUNTER_GROUP_MAPRED_TASK_COUNTERS,
ConfigurationConstants.COUNTER_MAP_OUTPUT_RECORDS)
.increment(numberOfRecords);
}
}

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.mainframe;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.sqoop.config.ConfigurationHelper;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.util.MainframeFTPClientUtils;
/**
* An InputFormat that retrieves a list of sequential dataset names in
* a mainframe partitioned dataset. It then creates splits containing one or
* more dataset names.
*/
public class MainframeDatasetInputFormat<T extends SqoopRecord>
extends InputFormat<LongWritable, T> {
private static final Log LOG =
LogFactory.getLog(MainframeDatasetInputFormat.class);
@Override
public RecordReader<LongWritable, T> createRecordReader(
InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
return new MainframeDatasetFTPRecordReader<T>();
}
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
List<InputSplit> splits = new ArrayList<InputSplit>();
Configuration conf = job.getConfiguration();
String dsName
= conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
LOG.info("Datasets to transfer from: " + dsName);
List<String> datasets = retrieveDatasets(dsName, conf);
if (datasets.isEmpty()) {
throw new IOException ("No sequential datasets retrieved from " + dsName);
} else {
int count = datasets.size();
int chunks = Math.min(count, ConfigurationHelper.getJobNumMaps(job));
for (int i = 0; i < chunks; i++) {
splits.add(new MainframeDatasetInputSplit());
}
int j = 0;
while(j < count) {
for (InputSplit sp : splits) {
if (j == count) {
break;
}
((MainframeDatasetInputSplit)sp).addDataset(datasets.get(j));
j++;
}
}
}
return splits;
}
protected List<String> retrieveDatasets(String dsName, Configuration conf)
throws IOException {
return MainframeFTPClientUtils.listSequentialDatasets(dsName, conf);
}
}

View File

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.mainframe;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
/**
* A collection of mainframe datasets.
*
*/
public class MainframeDatasetInputSplit extends InputSplit implements Writable {
private List<String> mainframeDatasets;
private String currentDataset;
private int currentIndex;
public MainframeDatasetInputSplit() {
mainframeDatasets = new ArrayList<String>();
currentDataset = null;
currentIndex = -1;
}
public void addDataset(String mainframeDataset) {
mainframeDatasets.add(mainframeDataset);
}
public String getCurrentDataset() {
return currentDataset;
}
public String getNextDataset() {
if (hasMore()) {
currentIndex++;
currentDataset = mainframeDatasets.get(currentIndex);
} else {
currentDataset = null;
}
return currentDataset;
}
public boolean hasMore() {
return currentIndex < (mainframeDatasets.size() -1);
}
@Override
public long getLength() throws IOException, InterruptedException {
return mainframeDatasets.size();
}
@Override
public String[] getLocations() throws IOException, InterruptedException {
return new String[0]; // No locations
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(mainframeDatasets.size());
for (String ds : mainframeDatasets) {
dataOutput.writeUTF(ds);
}
}
@Override
public void readFields(DataInput dataInput) throws IOException {
int numberOfDatasets = dataInput.readInt();
for (int i = 0; i < numberOfDatasets; i++) {
mainframeDatasets.add(dataInput.readUTF());
}
}
}
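
A brief usage sketch, not part of the patch, of the intended life cycle of this split: dataset names are added and serialized on the driver side, then deserialized and iterated on the task side. The class name and dataset names are placeholders; the buffer classes are the same ones the unit test for this class uses.

import java.io.IOException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.sqoop.mapreduce.mainframe.MainframeDatasetInputSplit;

public class SplitRoundTripSketch {
  public static void main(String[] args) throws IOException {
    MainframeDatasetInputSplit split = new MainframeDatasetInputSplit();
    split.addDataset("USER.TEST.DS1");
    split.addDataset("USER.TEST.DS2");

    DataOutputBuffer out = new DataOutputBuffer();
    split.write(out);                            // driver side: serialize

    MainframeDatasetInputSplit copy = new MainframeDatasetInputSplit();
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    copy.readFields(in);                         // task side: deserialize

    while (copy.hasMore()) {
      System.out.println(copy.getNextDataset()); // DS1, then DS2
    }
  }
}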

View File

@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.mainframe;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.sqoop.mapreduce.DBWritable;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
/**
* A RecordReader that reads records from a mainframe dataset.
* Emits LongWritables containing the record number as key and DBWritables as
* value.
*/
public abstract class MainframeDatasetRecordReader <T extends DBWritable>
extends RecordReader<LongWritable, T> {
private Class<T> inputClass;
private Configuration conf;
private MainframeDatasetInputSplit split;
private LongWritable key;
private T datasetRecord;
private long numberRecordRead;
private int datasetProcessed;
private static final Log LOG = LogFactory.getLog(
MainframeDatasetRecordReader.class.getName());
@Override
public void initialize(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
split = (MainframeDatasetInputSplit)inputSplit;
conf = taskAttemptContext.getConfiguration();
inputClass = (Class<T>) (conf.getClass(
DBConfiguration.INPUT_CLASS_PROPERTY, null));
key = null;
datasetRecord = null;
numberRecordRead = 0;
datasetProcessed = 0;
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (key == null) {
key = new LongWritable();
}
if (datasetRecord == null) {
datasetRecord = ReflectionUtils.newInstance(inputClass, conf);
}
if (getNextRecord(datasetRecord)) {
numberRecordRead++;
key.set(numberRecordRead);
return true;
}
return false;
}
@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public T getCurrentValue() throws IOException, InterruptedException {
return datasetRecord;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return datasetProcessed / (float)split.getLength();
}
protected String getNextDataset() {
String datasetName = split.getNextDataset();
if (datasetName != null) {
datasetProcessed++;
LOG.info("Starting transfer of " + datasetName);
}
return datasetName;
}
protected Configuration getConfiguration() {
return conf;
}
protected abstract boolean getNextRecord (T datasetRecord) throws IOException;
}
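
A concrete reader has to supply getNextRecord() to populate each record, plus close(), which RecordReader leaves abstract. The FTP-backed reader this patch ships streams real dataset contents; the class below is only a hypothetical minimal subclass, assuming SqoopRecord values and an illustrative field name, to show the contract.

import java.io.IOException;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.mapreduce.mainframe.MainframeDatasetRecordReader;

// Hypothetical sketch only: emits each dataset's name as a single one-field record.
public class DatasetNameRecordReader<T extends SqoopRecord>
    extends MainframeDatasetRecordReader<T> {

  @Override
  protected boolean getNextRecord(T record) throws IOException {
    String dataset = getNextDataset();   // advances to the next dataset, or null
    if (dataset == null) {
      return false;                      // no datasets left in this split
    }
    record.setField("DEFAULT_COLUMN", dataset);  // illustrative field name
    return true;
  }

  @Override
  public void close() throws IOException {
    // nothing to release in this sketch
  }
}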

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.mainframe;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ImportJobContext;
import org.apache.sqoop.mapreduce.DataDrivenImportJob;
/**
* Import data from a mainframe dataset, using MainframeDatasetInputFormat.
*/
public class MainframeImportJob extends DataDrivenImportJob {
private static final Log LOG = LogFactory.getLog(
MainframeImportJob.class.getName());
public MainframeImportJob(final SqoopOptions opts, ImportJobContext context) {
super(opts, MainframeDatasetInputFormat.class, context);
}
@Override
protected Class<? extends Mapper> getMapperClass() {
if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
return MainframeDatasetImportMapper.class;
} else {
return super.getMapperClass();
}
}
@Override
protected void configureInputFormat(Job job, String tableName,
String tableClassName, String splitByCol) throws IOException {
super.configureInputFormat(job, tableName, tableClassName, splitByCol);
job.getConfiguration().set(
MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,
options.getMainframeInputDatasetName());
}
@Override
protected void configureOutputFormat(Job job, String tableName,
String tableClassName) throws ClassNotFoundException, IOException {
super.configureOutputFormat(job, tableName, tableClassName);
LazyOutputFormat.setOutputFormatClass(job, getOutputFormatClass());
}
}

View File

@ -0,0 +1,156 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.tool;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.ToolRunner;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
import com.cloudera.sqoop.cli.RelatedOptions;
import com.cloudera.sqoop.cli.ToolOptions;
/**
* Tool that performs mainframe dataset imports to HDFS.
*/
public class MainframeImportTool extends ImportTool {
private static final Log LOG
= LogFactory.getLog(MainframeImportTool.class.getName());
public static final String DS_ARG = "dataset";
public MainframeImportTool() {
super("import-mainframe", false);
}
@Override
@SuppressWarnings("static-access")
protected RelatedOptions getImportOptions() {
// Imports
RelatedOptions importOpts
= new RelatedOptions("Import mainframe control arguments");
importOpts.addOption(OptionBuilder.withArgName("Dataset name")
.hasArg().withDescription("Datasets to import")
.withLongOpt(DS_ARG)
.create());
importOpts.addOption(OptionBuilder
.withDescription("Imports data in delete mode")
.withLongOpt(DELETE_ARG)
.create());
importOpts.addOption(OptionBuilder.withArgName("dir")
.hasArg().withDescription("HDFS plain file destination")
.withLongOpt(TARGET_DIR_ARG)
.create());
addValidationOpts(importOpts);
importOpts.addOption(OptionBuilder.withArgName("dir")
.hasArg().withDescription("HDFS parent for file destination")
.withLongOpt(WAREHOUSE_DIR_ARG)
.create());
importOpts.addOption(OptionBuilder
.withDescription("Imports data as plain text (default)")
.withLongOpt(FMT_TEXTFILE_ARG)
.create());
importOpts.addOption(OptionBuilder.withArgName("n")
.hasArg().withDescription("Use 'n' map tasks to import in parallel")
.withLongOpt(NUM_MAPPERS_ARG)
.create(NUM_MAPPERS_SHORT_ARG));
importOpts.addOption(OptionBuilder.withArgName("name")
.hasArg().withDescription("Set name for generated mapreduce job")
.withLongOpt(MAPREDUCE_JOB_NAME)
.create());
importOpts.addOption(OptionBuilder
.withDescription("Enable compression")
.withLongOpt(COMPRESS_ARG)
.create(COMPRESS_SHORT_ARG));
importOpts.addOption(OptionBuilder.withArgName("codec")
.hasArg()
.withDescription("Compression codec to use for import")
.withLongOpt(COMPRESSION_CODEC_ARG)
.create());
return importOpts;
}
@Override
public void configureOptions(ToolOptions toolOptions) {
toolOptions.addUniqueOptions(getCommonOptions());
toolOptions.addUniqueOptions(getImportOptions());
toolOptions.addUniqueOptions(getOutputFormatOptions());
toolOptions.addUniqueOptions(getInputFormatOptions());
toolOptions.addUniqueOptions(getHiveOptions(true));
toolOptions.addUniqueOptions(getHBaseOptions());
toolOptions.addUniqueOptions(getHCatalogOptions());
toolOptions.addUniqueOptions(getHCatImportOnlyOptions());
toolOptions.addUniqueOptions(getAccumuloOptions());
// get common codegen opts.
RelatedOptions codeGenOpts = getCodeGenOpts(false);
// add import-specific codegen opts:
codeGenOpts.addOption(OptionBuilder.withArgName("file")
.hasArg()
.withDescription("Disable code generation; use specified jar")
.withLongOpt(JAR_FILE_NAME_ARG)
.create());
toolOptions.addUniqueOptions(codeGenOpts);
}
@Override
public void printHelp(ToolOptions toolOptions) {
System.out.println("usage: sqoop " + getToolName()
+ " [GENERIC-ARGS] [TOOL-ARGS]\n");
toolOptions.printHelp();
System.out.println("\nGeneric Hadoop command-line arguments:");
System.out.println("(must preceed any tool-specific arguments)");
ToolRunner.printGenericCommandUsage(System.out);
System.out.println(
"\nAt minimum, you must specify --connect and --" + DS_ARG);
}
@Override
public void applyOptions(CommandLine in, SqoopOptions out)
throws InvalidOptionsException {
super.applyOptions(in, out);
if (!in.hasOption(CONN_MANAGER_CLASS_NAME)) {
// set default ConnManager
out.setConnManagerClassName("org.apache.sqoop.manager.MainframeManager");
}
if (in.hasOption(DS_ARG)) {
out.setMainframeInputDatasetName(in.getOptionValue(DS_ARG));
}
}
@Override
protected void validateImportOptions(SqoopOptions options)
throws InvalidOptionsException {
if (options.getMainframeInputDatasetName() == null) {
throw new InvalidOptionsException(
"--" + DS_ARG + " is required for mainframe import. " + HELP_STR);
}
super.validateImportOptions(options);
}
}
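
As printHelp() and validateImportOptions() above indicate, a minimal invocation of this tool needs only a connect string and a dataset name, along the lines of sqoop import-mainframe --connect <mainframe-host> --dataset <pds-name>; the remaining options registered above (target/warehouse directory, number of mappers, compression, and so on) behave as for the regular import tool. The placeholder values here are illustrative only.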

View File

@ -84,6 +84,8 @@ public abstract class SqoopTool {
"Import a table from a database to HDFS");
registerTool("import-all-tables", ImportAllTablesTool.class,
"Import tables from a database to HDFS");
registerTool("import-mainframe", MainframeImportTool.class,
"Import datasets from a mainframe server to HDFS");
registerTool("help", HelpTool.class, "List available commands");
registerTool("list-databases", ListDatabasesTool.class,
"List available databases on a server");

View File

@ -0,0 +1,189 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.util;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.net.PrintCommandListener;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPClientConfig;
import org.apache.commons.net.ftp.FTPConnectionClosedException;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sqoop.mapreduce.JobBase;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
/**
 * Utility methods for accessing a mainframe server through an FTP client.
*/
public final class MainframeFTPClientUtils {
private static final Log LOG = LogFactory.getLog(
MainframeFTPClientUtils.class.getName());
private static FTPClient mockFTPClient = null; // Used for unit testing
private MainframeFTPClientUtils() {
}
public static List<String> listSequentialDatasets(
String pdsName, Configuration conf) throws IOException {
List<String> datasets = new ArrayList<String>();
FTPClient ftp = null;
try {
ftp = getFTPConnection(conf);
if (ftp != null) {
ftp.changeWorkingDirectory("'" + pdsName + "'");
FTPFile[] ftpFiles = ftp.listFiles();
for (FTPFile f : ftpFiles) {
if (f.getType() == FTPFile.FILE_TYPE) {
datasets.add(f.getName());
}
}
}
} catch(IOException ioe) {
throw new IOException ("Could not list datasets from " + pdsName + ":"
+ ioe.toString());
} finally {
if (ftp != null) {
closeFTPConnection(ftp);
}
}
return datasets;
}
public static FTPClient getFTPConnection(Configuration conf)
throws IOException {
FTPClient ftp = null;
try {
String username = conf.get(DBConfiguration.USERNAME_PROPERTY);
String password;
if (username == null) {
username = "anonymous";
password = "";
}
else {
password = DBConfiguration.getPassword((JobConf) conf);
}
String connectString = conf.get(DBConfiguration.URL_PROPERTY);
String server = connectString;
int port = 0;
String[] parts = connectString.split(":");
if (parts.length == 2) {
server = parts[0];
try {
port = Integer.parseInt(parts[1]);
} catch(NumberFormatException e) {
LOG.warn("Invalid port number: " + e.toString());
}
}
if (null != mockFTPClient) {
ftp = mockFTPClient;
} else {
ftp = new FTPClient();
}
FTPClientConfig config = new FTPClientConfig(FTPClientConfig.SYST_MVS);
ftp.configure(config);
if (conf.getBoolean(JobBase.PROPERTY_VERBOSE, false)) {
ftp.addProtocolCommandListener(new PrintCommandListener(
new PrintWriter(System.out), true));
}
try {
if (port > 0) {
ftp.connect(server, port);
} else {
ftp.connect(server);
}
} catch(IOException ioexp) {
throw new IOException("Could not connect to server " + server, ioexp);
}
int reply = ftp.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
throw new IOException("FTP server " + server
+ " refused connection:" + ftp.getReplyString());
}
LOG.info("Connected to " + server + " on " +
(port>0 ? port : ftp.getDefaultPort()));
if (!ftp.login(username, password)) {
ftp.logout();
throw new IOException("Could not login to server " + server
+ ":" + ftp.getReplyString());
}
// set ASCII transfer mode
ftp.setFileType(FTP.ASCII_FILE_TYPE);
// Use passive mode as default.
ftp.enterLocalPassiveMode();
} catch(IOException ioe) {
if (ftp != null && ftp.isConnected()) {
try {
ftp.disconnect();
} catch(IOException f) {
// do nothing
}
}
ftp = null;
throw ioe;
}
return ftp;
}
public static boolean closeFTPConnection(FTPClient ftp) {
boolean success = true;
try {
ftp.noop(); // check that control connection is working OK
ftp.logout();
} catch(FTPConnectionClosedException e) {
success = false;
LOG.warn("Server closed connection: " + e.toString());
} catch(IOException e) {
success = false;
LOG.warn("Server closed connection: " + e.toString());
} finally {
if (ftp.isConnected()) {
try {
ftp.disconnect();
} catch(IOException f) {
success = false;
}
}
}
return success;
}
// Used for testing only
public static void setMockFTPClient(FTPClient FTPClient) {
mockFTPClient = FTPClient;
}
}
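
A minimal caller sketch, not part of the patch, mirroring how MainframeDatasetInputFormat obtains its dataset list; the class name, host, credentials and dataset name below are placeholders.

import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.apache.sqoop.util.MainframeFTPClientUtils;

public class ListDatasetsSketch {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    conf.set(DBConfiguration.URL_PROPERTY, "mainframe.example.com:21");
    conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
    conf.set(DBConfiguration.PASSWORD_PROPERTY, "secret");
    // getFTPConnection() reads the password through the job credentials.
    conf.getCredentials().addSecretKey(
        new Text(DBConfiguration.PASSWORD_PROPERTY), "secret".getBytes());
    List<String> members =
        MainframeFTPClientUtils.listSequentialDatasets("USER.TEST.PDS", conf);
    for (String member : members) {
      System.out.println(member);      // one line per member of the PDS
    }
  }
}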

View File

@ -0,0 +1,189 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.manager;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.sqoop.accumulo.AccumuloUtil;
import org.apache.sqoop.hbase.HBaseUtil;
import org.apache.sqoop.tool.MainframeImportTool;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.cloudera.sqoop.ConnFactory;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.ImportJobContext;
import com.cloudera.sqoop.metastore.JobData;
import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
import com.cloudera.sqoop.util.ImportException;
/**
 * Test methods of the MainframeManager implementation.
*/
public class TestMainframeManager extends BaseSqoopTestCase {
private static final Log LOG = LogFactory.getLog(TestMainframeManager.class
.getName());
private ConnManager manager;
private SqoopOptions opts;
private ImportJobContext context;
@Before
public void setUp() {
Configuration conf = getConf();
opts = getSqoopOptions(conf);
opts.setConnectString("dummy.server");
opts.setTableName("dummy.pds");
opts.setConnManagerClassName("org.apache.sqoop.manager.MainframeManager");
context = new ImportJobContext(getTableName(), null, opts, null);
ConnFactory f = new ConnFactory(conf);
try {
this.manager = f.getManager(new JobData(opts, new MainframeImportTool()));
} catch (IOException ioe) {
fail("IOException instantiating manager: "
+ StringUtils.stringifyException(ioe));
}
}
@After
public void tearDown() {
try {
manager.close();
} catch (SQLException sqlE) {
LOG.error("Got SQLException: " + sqlE.toString());
fail("Got SQLException: " + sqlE.toString());
}
}
@Test
public void testListColNames() {
String[] colNames = manager.getColumnNames(getTableName());
assertNotNull("manager should return a column list", colNames);
assertEquals("Column list should be length 1", 1, colNames.length);
assertEquals(MainframeManager.DEFAULT_DATASET_COLUMN_NAME, colNames[0]);
}
@Test
public void testListColTypes() {
Map<String, Integer> types = manager.getColumnTypes(getTableName());
assertNotNull("manager should return a column types map", types);
assertEquals("Column types map should be size 1", 1, types.size());
assertEquals(types.get(MainframeManager.DEFAULT_DATASET_COLUMN_NAME)
.intValue(), Types.VARCHAR);
}
@Test
public void testImportTableNoHBaseJarPresent() {
HBaseUtil.setAlwaysNoHBaseJarMode(true);
opts.setHBaseTable("dummy_table");
try {
manager.importTable(context);
fail("An ImportException should be thrown: "
+ "HBase jars are not present in classpath, cannot import to HBase!");
} catch (ImportException e) {
assertEquals(e.toString(),
"HBase jars are not present in classpath, cannot import to HBase!");
} catch (IOException e) {
fail("No IOException should be thrown!");
} finally {
opts.setHBaseTable(null);
}
}
@Test
public void testImportTableNoAccumuloJarPresent() {
AccumuloUtil.setAlwaysNoAccumuloJarMode(true);
opts.setAccumuloTable("dummy_table");
try {
manager.importTable(context);
fail("An ImportException should be thrown: "
+ "Accumulo jars are not present in classpath, cannot import to "
+ "Accumulo!");
} catch (ImportException e) {
assertEquals(e.toString(),
"Accumulo jars are not present in classpath, cannot import to "
+ "Accumulo!");
} catch (IOException e) {
fail("No IOException should be thrown!");
} finally {
opts.setAccumuloTable(null);
}
}
@Test
public void testListTables() {
String[] tables = manager.listTables();
assertNull("manager should not return a list of tables", tables);
}
@Test
public void testListDatabases() {
String[] databases = manager.listDatabases();
assertNull("manager should not return a list of databases", databases);
}
@Test
public void testGetPrimaryKey() {
String primaryKey = manager.getPrimaryKey(getTableName());
assertNull("manager should not return a primary key", primaryKey);
}
@Test
public void testReadTable() {
String[] colNames = manager.getColumnNames(getTableName());
try {
ResultSet table = manager.readTable(getTableName(), colNames);
assertNull("manager should not read a table", table);
} catch (SQLException sqlE) {
fail("Got SQLException: " + sqlE.toString());
}
}
@Test
public void testGetConnection() {
try {
Connection con = manager.getConnection();
assertNull("manager should not return a connection", con);
} catch (SQLException sqlE) {
fail("Got SQLException: " + sqlE.toString());
}
}
@Test
public void testGetDriverClass() {
String driverClass = manager.getDriverClass();
assertNotNull("manager should return a driver class", driverClass);
assertEquals("manager should return an empty driver class", "",
driverClass);
}
}

View File

@ -0,0 +1,292 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.mainframe;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.mapreduce.DBWritable;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.apache.sqoop.util.MainframeFTPClientUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.cloudera.sqoop.lib.DelimiterSet;
import com.cloudera.sqoop.lib.LargeObjectLoader;
public class TestMainframeDatasetFTPRecordReader {
private MainframeImportJob mfImportJob;
private MainframeImportJob avroImportJob;
private MainframeDatasetInputSplit mfDIS;
private TaskAttemptContext context;
private MainframeDatasetRecordReader mfDRR;
private MainframeDatasetFTPRecordReader mfDFTPRR;
private FTPClient mockFTPClient;
public static class DummySqoopRecord extends SqoopRecord {
private String field;
public Map<String, Object> getFieldMap() {
Map<String, Object> map = new HashMap<String, Object>();
map.put("fieldName", field);
return map;
}
public void setField(String fieldName, Object fieldVal) {
if (fieldVal instanceof String) {
field = (String) fieldVal;
}
}
public void setField(final String val) {
this.field = val;
}
@Override
public void readFields(DataInput in) throws IOException {
field = in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(field);
}
@Override
public void readFields(ResultSet rs) throws SQLException {
field = rs.getString(1);
}
@Override
public void write(PreparedStatement s) throws SQLException {
s.setString(1, field);
}
@Override
public String toString() {
return field;
}
@Override
public int write(PreparedStatement stmt, int offset) throws SQLException {
return 0;
}
@Override
public String toString(DelimiterSet delimiters) {
return null;
}
@Override
public int getClassFormatVersion() {
return 0;
}
@Override
public int hashCode() {
return Integer.parseInt(field);
}
public void loadLargeObjects(LargeObjectLoader loader) {
}
public void parse(CharSequence s) {
}
public void parse(Text s) {
}
public void parse(byte[] s) {
}
public void parse(char[] s) {
}
public void parse(ByteBuffer s) {
}
public void parse(CharBuffer s) {
}
}
@Before
public void setUp() throws IOException {
mockFTPClient = mock(FTPClient.class);
MainframeFTPClientUtils.setMockFTPClient(mockFTPClient);
try {
when(mockFTPClient.login("user", "pssword")).thenReturn(true);
when(mockFTPClient.logout()).thenReturn(true);
when(mockFTPClient.isConnected()).thenReturn(true);
when(mockFTPClient.completePendingCommand()).thenReturn(true);
when(mockFTPClient.changeWorkingDirectory(anyString())).thenReturn(true);
when(mockFTPClient.getReplyCode()).thenReturn(200);
when(mockFTPClient.noop()).thenReturn(200);
when(mockFTPClient.setFileType(anyInt())).thenReturn(true);
FTPFile ftpFile1 = new FTPFile();
ftpFile1.setType(FTPFile.FILE_TYPE);
ftpFile1.setName("test1");
FTPFile ftpFile2 = new FTPFile();
ftpFile2.setType(FTPFile.FILE_TYPE);
ftpFile2.setName("test2");
FTPFile[] ftpFiles = { ftpFile1, ftpFile2 };
when(mockFTPClient.listFiles()).thenReturn(ftpFiles);
when(mockFTPClient.retrieveFileStream("test1")).thenReturn(
new ByteArrayInputStream("123\n456\n".getBytes()));
when(mockFTPClient.retrieveFileStream("test2")).thenReturn(
new ByteArrayInputStream("789\n".getBytes()));
when(mockFTPClient.retrieveFileStream("NotComplete")).thenReturn(
new ByteArrayInputStream("NotComplete\n".getBytes()));
} catch (IOException e) {
fail("No IOException should be thrown!");
}
JobConf conf = new JobConf();
conf.set(DBConfiguration.URL_PROPERTY, "localhost:" + "11111");
conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
// set the password in the secure credentials object
Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
"pssword".getBytes());
conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, DummySqoopRecord.class,
DBWritable.class);
Job job = Job.getInstance(conf);
mfDIS = new MainframeDatasetInputSplit();
mfDIS.addDataset("test1");
mfDIS.addDataset("test2");
context = mock(TaskAttemptContext.class);
when(context.getConfiguration()).thenReturn(job.getConfiguration());
mfDFTPRR = new MainframeDatasetFTPRecordReader();
}
@After
public void tearDown() {
try {
mfDFTPRR.close();
} catch (IOException ioe) {
fail("Got IOException: " + ioe.toString());
}
MainframeFTPClientUtils.setMockFTPClient(null);
}
@Test
public void testReadAllData() {
try {
mfDFTPRR.initialize(mfDIS, context);
Assert.assertTrue("Retrieve of dataset", mfDFTPRR.nextKeyValue());
Assert.assertEquals("Key should increase by records", 1, mfDFTPRR
.getCurrentKey().get());
Assert.assertEquals("Read value by line and by dataset", "123", mfDFTPRR
.getCurrentValue().toString());
Assert.assertEquals("Get progress according to left dataset",
mfDFTPRR.getProgress(), (float) 0.5, 0.02);
Assert.assertTrue("Retrieve of dataset", mfDFTPRR.nextKeyValue());
Assert.assertEquals("Key should increase by records", 2, mfDFTPRR
.getCurrentKey().get());
Assert.assertEquals("Read value by line and by dataset", "456", mfDFTPRR
.getCurrentValue().toString());
Assert.assertEquals("Get progress according to left dataset",
mfDFTPRR.getProgress(), (float) 0.5, 0.02);
Assert.assertTrue("Retrieve of dataset", mfDFTPRR.nextKeyValue());
Assert.assertEquals("Key should increase by records", 3, mfDFTPRR
.getCurrentKey().get());
Assert.assertEquals("Read value by line and by dataset", "789", mfDFTPRR
.getCurrentValue().toString());
Assert.assertEquals("Get progress according to left dataset",
mfDFTPRR.getProgress(), (float) 1, 0.02);
Assert.assertFalse("End of dataset", mfDFTPRR.nextKeyValue());
} catch (IOException ioe) {
fail("Got IOException: " + ioe.toString());
} catch (InterruptedException ie) {
fail("Got InterruptedException: " + ie.toString());
}
}
@Test
public void testReadPartOfData() {
try {
mfDFTPRR.initialize(mfDIS, context);
Assert.assertTrue("Retrieve of dataset", mfDFTPRR.nextKeyValue());
Assert.assertEquals("Key should increase by records", 1, mfDFTPRR
.getCurrentKey().get());
Assert.assertEquals("Read value by line and by dataset", "123", mfDFTPRR
.getCurrentValue().toString());
Assert.assertEquals("Get progress according to left dataset",
mfDFTPRR.getProgress(), (float) 0.5, 0.02);
} catch (IOException ioe) {
fail("Got IOException: " + ioe.toString());
} catch (InterruptedException ie) {
fail("Got InterruptedException: " + ie.toString());
}
}
@Test
public void testFTPNotComplete() {
try {
mfDIS = new MainframeDatasetInputSplit();
mfDIS.addDataset("NotComplete");
mfDFTPRR.initialize(mfDIS, context);
Assert.assertTrue("Retrieve of dataset", mfDFTPRR.nextKeyValue());
when(mockFTPClient.completePendingCommand()).thenReturn(false);
mfDFTPRR.nextKeyValue();
} catch (IOException ioe) {
Assert.assertEquals(
"java.io.IOException: IOException during data transfer: "
+ "java.io.IOException: Failed to complete ftp command.",
ioe.toString());
} catch (InterruptedException ie) {
fail("Got InterruptedException: " + ie.toString());
}
}
}

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.mainframe;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
import org.apache.sqoop.util.MainframeFTPClientUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestMainframeDatasetInputFormat {
private MainframeDatasetInputFormat<SqoopRecord> format;
private FTPClient mockFTPClient;
@Before
public void setUp() {
format = new MainframeDatasetInputFormat<SqoopRecord>();
mockFTPClient = mock(FTPClient.class);
MainframeFTPClientUtils.setMockFTPClient(mockFTPClient);
try {
when(mockFTPClient.login("user", "pssword")).thenReturn(true);
when(mockFTPClient.logout()).thenReturn(true);
when(mockFTPClient.isConnected()).thenReturn(true);
when(mockFTPClient.completePendingCommand()).thenReturn(true);
when(mockFTPClient.changeWorkingDirectory(anyString())).thenReturn(true);
when(mockFTPClient.getReplyCode()).thenReturn(200);
when(mockFTPClient.getReplyString()).thenReturn("");
when(mockFTPClient.noop()).thenReturn(200);
when(mockFTPClient.setFileType(anyInt())).thenReturn(true);
FTPFile ftpFile1 = new FTPFile();
ftpFile1.setType(FTPFile.FILE_TYPE);
ftpFile1.setName("test1");
FTPFile ftpFile2 = new FTPFile();
ftpFile2.setType(FTPFile.FILE_TYPE);
ftpFile2.setName("test2");
FTPFile[] ftpFiles = { ftpFile1, ftpFile2 };
when(mockFTPClient.listFiles()).thenReturn(ftpFiles);
} catch (IOException e) {
fail("No IOException should be thrown!");
}
}
@After
public void tearDown() {
MainframeFTPClientUtils.setMockFTPClient(null);
}
@Test
public void testRetrieveDatasets() throws IOException {
JobConf conf = new JobConf();
conf.set(DBConfiguration.URL_PROPERTY, "localhost:12345");
conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
// set the password in the secure credentials object
Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
"pssword".getBytes());
String dsName = "dsName1";
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME, dsName);
Job job = Job.getInstance(conf);
format.getSplits(job);
List<InputSplit> splits = new ArrayList<InputSplit>();
splits = ((MainframeDatasetInputFormat<SqoopRecord>) format).getSplits(job);
Assert.assertEquals("test1", ((MainframeDatasetInputSplit) splits.get(0))
.getNextDataset().toString());
Assert.assertEquals("test2", ((MainframeDatasetInputSplit) splits.get(1))
.getNextDataset().toString());
}
}

View File

@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.mainframe;
import java.io.IOException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestMainframeDatasetInputSplit {
private MainframeDatasetInputSplit mfDatasetInputSplit;
@Before
public void setUp() {
mfDatasetInputSplit = new MainframeDatasetInputSplit();
}
@Test
public void testGetCurrentDataset() {
String currentDataset = mfDatasetInputSplit.getCurrentDataset();
Assert.assertNull(currentDataset);
}
@Test
public void testGetNextDatasetWithNull() {
String currentDataset = mfDatasetInputSplit.getNextDataset();
Assert.assertNull(currentDataset);
}
@Test
public void testGetNextDataset() {
String mainframeDataset = "test";
mfDatasetInputSplit.addDataset(mainframeDataset);
String currentDataset = mfDatasetInputSplit.getNextDataset();
Assert.assertEquals("test", currentDataset);
}
@Test
public void testHasMoreWithFalse() {
boolean retVal = mfDatasetInputSplit.hasMore();
Assert.assertFalse(retVal);
}
@Test
public void testHasMoreWithTrue() {
String mainframeDataset = "test";
mfDatasetInputSplit.addDataset(mainframeDataset);
boolean retVal = mfDatasetInputSplit.hasMore();
Assert.assertTrue(retVal);
}
@Test
public void testGetLength() {
String mainframeDataset = "test";
mfDatasetInputSplit.addDataset(mainframeDataset);
try {
long retVal = mfDatasetInputSplit.getLength();
Assert.assertEquals(1, retVal);
} catch (IOException ioe) {
Assert.fail("No IOException should be thrown!");
} catch (InterruptedException ie) {
Assert.fail("No InterruptedException should be thrown!");
}
}
@Test
public void testGetLocations() {
try {
String[] retVal = mfDatasetInputSplit.getLocations();
Assert.assertNotNull(retVal);
} catch (IOException ioe) {
Assert.fail("No IOException should be thrown!");
} catch (InterruptedException ie) {
Assert.fail("No InterruptedException should be thrown!");
}
}
@Test
public void testWriteRead() {
mfDatasetInputSplit.addDataset("dataSet1");
mfDatasetInputSplit.addDataset("dataSet2");
DataOutputBuffer dob = new DataOutputBuffer();
DataInputBuffer dib = new DataInputBuffer();
MainframeDatasetInputSplit mfReader = new MainframeDatasetInputSplit();
try {
mfDatasetInputSplit.write(dob);
dib.reset(dob.getData(), dob.getLength());
mfReader.readFields(dib);
Assert.assertNotNull("MFReader get data from tester", mfReader);
Assert.assertEquals(2, mfReader.getLength());
Assert.assertEquals("dataSet1", mfReader.getNextDataset());
Assert.assertEquals("dataSet2", mfReader.getNextDataset());
} catch (IOException ioe) {
Assert.fail("No IOException should be thrown!");
} catch (InterruptedException ie) {
Assert.fail("No InterruptedException should be thrown!");
}
}
}

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.mainframe;
import static org.junit.Assert.assertEquals;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Mapper;
import org.junit.Before;
import org.junit.Test;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ImportJobContext;
public class TestMainframeImportJob {
private MainframeImportJob mfImportJob;
private MainframeImportJob avroImportJob;
private SqoopOptions options;
@Before
public void setUp() {
options = new SqoopOptions();
}
@Test
public void testGetMainframeDatasetImportMapperClass()
throws SecurityException, NoSuchMethodException,
IllegalArgumentException, IllegalAccessException,
InvocationTargetException {
String jarFile = "dummyJarFile";
String tableName = "dummyTableName";
Path path = new Path("dummyPath");
ImportJobContext context = new ImportJobContext(tableName, jarFile,
options, path);
mfImportJob = new MainframeImportJob(options, context);
// To access protected method by means of reflection
Class[] types = {};
Method m_getMapperClass = MainframeImportJob.class.getDeclaredMethod(
"getMapperClass", types);
m_getMapperClass.setAccessible(true);
Class<? extends Mapper> mapper = (Class<? extends Mapper>) m_getMapperClass
.invoke(mfImportJob);
assertEquals(mapper,
org.apache.sqoop.mapreduce.mainframe.MainframeDatasetImportMapper.class);
}
@Test
public void testSuperMapperClass() throws SecurityException,
NoSuchMethodException, IllegalArgumentException, IllegalAccessException,
InvocationTargetException {
String jarFile = "dummyJarFile";
String tableName = "dummyTableName";
Path path = new Path("dummyPath");
options.setFileLayout(SqoopOptions.FileLayout.AvroDataFile);
ImportJobContext context = new ImportJobContext(tableName, jarFile,
options, path);
avroImportJob = new MainframeImportJob(options, context);
// To access protected method by means of reflection
Class[] types = {};
Method m_getMapperClass = MainframeImportJob.class.getDeclaredMethod(
"getMapperClass", types);
m_getMapperClass.setAccessible(true);
Class<? extends Mapper> mapper = (Class<? extends Mapper>) m_getMapperClass
.invoke(avroImportJob);
assertEquals(mapper, org.apache.sqoop.mapreduce.AvroImportMapper.class);
}
}

View File

@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.tool;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sqoop.cli.RelatedOptions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
import com.cloudera.sqoop.cli.ToolOptions;
import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
public class TestMainframeImportTool extends BaseSqoopTestCase {
private static final Log LOG = LogFactory.getLog(TestMainframeImportTool.class
.getName());
private MainframeImportTool mfImportTool;
private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
@Before
public void setUp() {
mfImportTool = new MainframeImportTool();
System.setOut(new PrintStream(outContent));
}
@After
public void tearDown() {
System.setOut(null);
}
@Test
public void testPrintHelp() {
ToolOptions toolOptions = new ToolOptions();
String separator = System.getProperty("line.separator");
mfImportTool.printHelp(toolOptions);
String outputMsg = "usage: sqoop "
+ mfImportTool.getToolName()
+ " [GENERIC-ARGS] [TOOL-ARGS]"
+ separator
+ ""
+ separator
+ ""
+ separator
+ "Generic Hadoop command-line arguments:"
+ separator
+ "(must preceed any tool-specific arguments)"
+ separator
+ "Generic options supported are"
+ separator
+ "-conf <configuration file> specify an application configuration file"
+ separator
+ "-D <property=value> use value for given property"
+ separator
+ "-fs <local|namenode:port> specify a namenode"
+ separator
+ "-jt <local|jobtracker:port> specify a job tracker"
+ separator
+ "-files <comma separated list of files> "
+ "specify comma separated files to be copied to the map reduce cluster"
+ separator
+ "-libjars <comma separated list of jars> "
+ "specify comma separated jar files to include in the classpath."
+ separator
+ "-archives <comma separated list of archives> "
+ "specify comma separated archives to be unarchived on the compute machines.\n"
+ separator + "The general command line syntax is" + separator
+ "bin/hadoop command [genericOptions] [commandOptions]\n" + separator
+ "" + separator + "At minimum, you must specify --connect and --"
+ MainframeImportTool.DS_ARG + separator;
assertEquals(outputMsg, outContent.toString());
}
@SuppressWarnings("deprecation")
@Test
public void testGetImportOptions() throws SecurityException,
NoSuchMethodException, IllegalArgumentException, IllegalAccessException,
InvocationTargetException {
// To access protected method by means of reflection
Class[] types = {};
Object[] params = {};
Method m_getImportOptions = MainframeImportTool.class.getDeclaredMethod(
"getImportOptions", types);
m_getImportOptions.setAccessible(true);
RelatedOptions rOptions = (RelatedOptions) m_getImportOptions.invoke(
mfImportTool, params);
assertNotNull("It should return a RelatedOptions", rOptions);
assertTrue(rOptions.hasOption(MainframeImportTool.DS_ARG));
assertTrue(rOptions.hasOption(MainframeImportTool.DELETE_ARG));
assertTrue(rOptions.hasOption(MainframeImportTool.TARGET_DIR_ARG));
assertTrue(rOptions.hasOption(MainframeImportTool.WAREHOUSE_DIR_ARG));
assertTrue(rOptions.hasOption(MainframeImportTool.FMT_TEXTFILE_ARG));
assertTrue(rOptions.hasOption(MainframeImportTool.NUM_MAPPERS_ARG));
assertTrue(rOptions.hasOption(MainframeImportTool.MAPREDUCE_JOB_NAME));
assertTrue(rOptions.hasOption(MainframeImportTool.COMPRESS_ARG));
assertTrue(rOptions.hasOption(MainframeImportTool.COMPRESSION_CODEC_ARG));
}
@Test
public void testApplyOptions()
throws InvalidOptionsException, ParseException {
String[] args = { "--" + MainframeImportTool.DS_ARG, "dummy_ds" };
ToolOptions toolOptions = new ToolOptions();
SqoopOptions sqoopOption = new SqoopOptions();
mfImportTool.configureOptions(toolOptions);
sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
assertEquals(sqoopOption.getConnManagerClassName(),
"org.apache.sqoop.manager.MainframeManager");
assertEquals(sqoopOption.getTableName(), "dummy_ds");
}
@Test
public void testNotApplyOptions() throws ParseException,
InvalidOptionsException {
String[] args = new String[] { "--connection-manager=dummy_ClassName" };
ToolOptions toolOptions = new ToolOptions();
SqoopOptions sqoopOption = new SqoopOptions();
mfImportTool.configureOptions(toolOptions);
sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
assertEquals(sqoopOption.getConnManagerClassName(), "dummy_ClassName");
assertNull(sqoopOption.getTableName());
}
}

View File

@ -0,0 +1,158 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.util;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sqoop.mapreduce.JobBase;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestMainframeFTPClientUtils {
private JobConf conf;
private FTPClient mockFTPClient;
@Before
public void setUp() {
conf = new JobConf();
mockFTPClient = mock(FTPClient.class);
when(mockFTPClient.getReplyString()).thenReturn("");
MainframeFTPClientUtils.setMockFTPClient(mockFTPClient);
}
@After
public void tearDown() {
MainframeFTPClientUtils.setMockFTPClient(null);
}
@Test
  public void testAnonymous_VERBOSE_IllegalPort() {
try {
when(mockFTPClient.login("anonymous", "")).thenReturn(true);
when(mockFTPClient.logout()).thenReturn(true);
when(mockFTPClient.isConnected()).thenReturn(false);
when(mockFTPClient.getReplyCode()).thenReturn(200);
} catch (IOException e) {
fail("No IOException should be thrown!");
}
conf.set(DBConfiguration.URL_PROPERTY, "localhost:testPort");
conf.setBoolean(JobBase.PROPERTY_VERBOSE, true);
FTPClient ftp = null;
boolean success = false;
try {
ftp = MainframeFTPClientUtils.getFTPConnection(conf);
} catch (IOException ioe) {
fail("No IOException should be thrown!");
} finally {
success = MainframeFTPClientUtils.closeFTPConnection(ftp);
}
Assert.assertTrue(success);
}
@Test
public void testCannotConnect() {
try {
when(mockFTPClient.login("testUser", "")).thenReturn(false);
} catch (IOException ioe) {
fail("No IOException should be thrown!");
}
conf.set(DBConfiguration.URL_PROPERTY, "testUser:11111");
try {
MainframeFTPClientUtils.getFTPConnection(conf);
} catch (IOException ioe) {
Assert.assertEquals(
"java.io.IOException: FTP server testUser refused connection:",
ioe.toString());
}
}
@Test
public void testWrongUsername() {
try {
when(mockFTPClient.login("user", "pssword")).thenReturn(true);
when(mockFTPClient.logout()).thenReturn(true);
when(mockFTPClient.isConnected()).thenReturn(false);
when(mockFTPClient.getReplyCode()).thenReturn(200);
} catch (IOException e) {
fail("No IOException should be thrown!");
}
FTPClient ftp = null;
conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
conf.set(DBConfiguration.USERNAME_PROPERTY, "userr");
conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
// set the password in the secure credentials object
Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
"pssword".getBytes());
try {
ftp = MainframeFTPClientUtils.getFTPConnection(conf);
} catch (IOException ioe) {
Assert.assertEquals(
"java.io.IOException: Could not login to server localhost:",
ioe.toString());
}
Assert.assertNull(ftp);
}
@Test
public void testNotListDatasets() {
try {
when(mockFTPClient.login("user", "pssword")).thenReturn(true);
when(mockFTPClient.logout()).thenReturn(true);
when(mockFTPClient.isConnected()).thenReturn(false);
when(mockFTPClient.getReplyCode()).thenReturn(200);
} catch (IOException e) {
fail("No IOException should be thrown!");
}
conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
conf.set(DBConfiguration.USERNAME_PROPERTY, "userr");
conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
// set the password in the secure credentials object
Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
"pssword".getBytes());
try {
MainframeFTPClientUtils.listSequentialDatasets("pdsName", conf);
} catch (IOException ioe) {
Assert.assertEquals("java.io.IOException: "
+ "Could not list datasets from pdsName:"
+ "java.io.IOException: Could not login to server localhost:",
ioe.toString());
}
}
}