5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-08 13:40:28 +08:00

SQOOP-1676: Sqoop2: clientAPI.rst changes to reflect latest code

(Veena Basavaraj via Abraham Elmahrek)
This commit is contained in:
Abraham Elmahrek 2014-11-04 18:15:22 -08:00
parent 8fa9b90311
commit 00f99559e4
3 changed files with 161 additions and 180 deletions

View File

@ -14,22 +14,29 @@
limitations under the License.
======================
Sqoop Client API Guide
======================
===========================
Sqoop Java Client API Guide
===========================
This document will explain how to use Sqoop Client API with external application. Client API allows you to execute the functions of sqoop commands. It requires Sqoop Client JAR and its dependencies.
This document will explain how to use Sqoop Java Client API with external application. Client API allows you to execute the functions of sqoop commands. It requires Sqoop Client JAR and its dependencies.
Client API is explained using Generic JDBC Connector properties. Before executing the application using the sqoop client API, check whether sqoop server is running.
The main class that provides wrapper methods for all the supported operations is the
::
public class SqoopClient {
...
}
Java Client API is explained using Generic JDBC Connector example. Before executing the application using the sqoop client API, check whether sqoop server is running.
Workflow
========
Given workflow has to be followed for executing a job in Sqoop server.
Given workflow has to be followed for executing a sqoop job in Sqoop server.
1. Create connection using Connector ID (cid) - Creates connection and returns connection ID (xid)
2. Create Job using Connection ID (xid) - Create job and returns Job ID (jid)
3. Job submission with Job ID (jid) - Submit sqoop Job to server
1. Create LINK object for a given connectorId - Creates Link object and returns linkId (lid)
2. Create a JOB for a given "from" and "to" linkId - Create Job object and returns jobId (jid)
3. Start the JOB for a given jobId - Start Job on the server and creates a submission record
Project Dependencies
====================
@ -60,203 +67,176 @@ Server URL value can be modfied by setting value to setServerUrl(String) method
client.setServerUrl(newUrl);
Connection
==========
Link
====
Connectors provide the facility to interact with many data sources and thus can be used as a means to transfer data between them in Sqoop. The registered connector implementation will provide logic to read from and/or write to a data source that it represents. A connector can have one or more links associated with it. The java client API allows you to create, update and delete a link for any registered connector. Creating or updating a link requires you to populate the Link Config for that particular connector. Hence the first thing to do is get the list of registered connectors and select the connector for which you would like to create a link. Then
you can get the list of all the config/inputs using `Display Config and Input Names For Connector`_ for that connector.
Client API allows you to create, update and delete connection. For creating or updating connection requires Connector forms and Framwork Forms. User has to retrive the connector and framework forms, then update the values.
Create Connection
-----------------
Save Link
---------
First create a new connection by invoking newConnection(cid) method with connector ID and returns a MConnection object with dummy id. Then fill the connection and framework forms as given below. Invoke create connection with updated connection object.
First create a new link by invoking ``createLink(cid)`` method with connector Id and it returns a MLink object with dummy id and the unfilled link config inputs for that connector. Then fill the config inputs with relevant values. Invoke ``saveLink`` passing it the filled MLink object.
::
//Dummy connection object
MConnection newCon = client.newConnection(1);
//Get connection and framework forms. Set name for connection
MConnectionForms conForms = newCon.getConnectorPart();
MConnectionForms frameworkForms = newCon.getFrameworkPart();
newCon.setName("MyConnection");
//Set connection forms values
conForms.getStringInput("connection.connectionString").setValue("jdbc:mysql://localhost/my");
conForms.getStringInput("connection.jdbcDriver").setValue("com.mysql.jdbc.Driver");
conForms.getStringInput("connection.username").setValue("root");
conForms.getStringInput("connection.password").setValue("root");
frameworkForms.getIntegerInput("security.maxConnections").setValue(0);
Status status = client.createConnection(newCon);
// create a placeholder for link
long connectorId = 1;
MLink link = client.createLink(connectorId);
link.setName("Vampire");
link.setCreationUser("Buffy");
MLinkConfig linkConfig = link.getConnectorLinkConfig();
// fill in the link config values
linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://localhost/my");
linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
linkConfig.getStringInput("linkConfig.username").setValue("root");
linkConfig.getStringInput("linkConfig.password").setValue("root");
// save the link object that was filled
Status status = client.saveLink(link);
if(status.canProceed()) {
System.out.println("Created. New Connection ID : " +newCon.getPersistenceId());
System.out.println("Created Link with Link Id : " + link.getPersistenceId());
} else {
System.out.println("Check for status and forms error ");
System.out.println("Something went wrong creating the link");
}
status.canProceed() returns true if status is FINE or ACCEPTABLE. Above code has given status after validation of connector and framework forms.
``status.canProceed()`` returns true if status is OK or a WARNING. Before sending the status, the link config values are validated using the corresponding validator associated with th link config inputs.
On successful execution, new connection ID is assigned for the connection. getPersistenceId() method returns ID.
User can retrieve a connection using below methods
On successful execution of the saveLink method, new link Id is assigned to the link object else an exception is thrown. ``link.getPersistenceId()`` method returns the unique Id for this object persisted in the sqoop repository.
User can retrieve a link using the following methods
+----------------------------+--------------------------------------+
| Method | Description |
+============================+======================================+
| ``getConnection(xid)`` | Returns a connection object. |
| ``getLink(lid)`` | Returns a link by id |
+----------------------------+--------------------------------------+
| ``getConnections()`` | Returns list of connection object |
| ``getLinks()`` | Returns list of links in the sqoop |
+----------------------------+--------------------------------------+
List of status code
-------------------
Job
===
A sqoop job holds the ``From`` and ``To`` parts for transferring data from the ``From`` data source to the ``To`` data source. Both the ``From`` and the ``To`` are uniquely identified by their corresponding connector Link Ids. i.e when creating a job we have to specifiy the ``FromLinkId`` and the ``ToLinkId``. Thus the pre-requisite for creating a job is to first create the links as described above.
Once the linkIds for the ``From`` and ``To`` are given, then the job configs for the associated connector for the link object have to be filled. You can get the list of all the from and to job config/inputs using `Display Config and Input Names For Connector`_ for that connector. A connector can have one or more links. We then use the links in the ``From`` and ``To`` direction to populate the corresponding ``MFromConfig`` and ``MToConfig`` respectively.
In addition to filling the job configs for the ``From`` and the ``To`` representing the link, we also need to fill the driver configs that control the job execution engine environment. For example, if the job execution engine happens to be the MapReduce we will specifiy the number of mappers to be used in reading data from the ``From`` data source.
Save Job
---------
Here is the code to create and then save a job
::
String url = "http://localhost:12000/sqoop/";
SqoopClient client = new SqoopClient(url);
//Creating dummy job object
long fromLinkId = 1;// for jdbc connector
long toLinkId = 2; // for HDFS connector
MJob job = client.createJob(fromLinkId, toLinkId);
job.setName("Vampire");
job.setCreationUser("Buffy");
// set the "FROM" link job config values
MFromConfig fromJobConfig = job.getFromJobConfig();
fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("sqoop");
fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("sqoop");
fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");
// set the "TO" link job config values
MToConfig toJobConfig = job.getToJobConfig();
toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/usr/tmp");
// set the driver config values
MDriverConfig driverConfig = job.getDriverConfig();
driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3");
Status status = client.saveJob(job);
if(status.canProceed()) {
System.out.println("Created Job with Job Id: "+ job.getPersistenceId());
} else {
System.out.println("Something went wrong creating the job");
}
User can retrieve a job using the following methods
+----------------------------+--------------------------------------+
| Method | Description |
+============================+======================================+
| ``getJob(jid)`` | Returns a job by id |
+----------------------------+--------------------------------------+
| ``getJobs()`` | Returns list of jobs in the sqoop |
+----------------------------+--------------------------------------+
List of status codes
--------------------
+------------------+------------------------------------------------------------------------------------------------------------+
| Function | Description |
+==================+============================================================================================================+
| ``OK`` | There are no issues, no warnings. |
+------------------+------------------------------------------------------------------------------------------------------------+
| ``WARNING`` | Validated entity is correct enough to be processed. There might be some warnings, but no errors. |
| ``WARNING`` | Validated entity is correct enough to be proceed. Not a fatal error |
+------------------+------------------------------------------------------------------------------------------------------------+
| ``ERROR`` | There are serious issues with validated entity. We can't proceed until reported issues will be resolved. |
+------------------+------------------------------------------------------------------------------------------------------------+
View Error or Warning message
-----------------------------
View Error or Warning valdiation message
----------------------------------------
In case of any UNACCEPTABLE AND ACCEPTABLE status, user has to iterate the connector part forms and framework part forms for getting actual error or warning message. Below piece of code describe how to itereate over the forms for input message.
In case of any WARNING AND ERROR status, user has to iterate the list of validation messages.
::
printMessage(newCon.getConnectorPart().getForms());
printMessage(newCon.getFrameworkPart().getForms());
printMessage(link.getConnectorLinkConfig().getConfigs());
private static void printMessage(List<MForm> formList) {
for(MForm form : formList) {
List<MInput<?>> inputlist = form.getInputs();
if (form.getValidationMessage() != null) {
System.out.println("Form message: " + form.getValidationMessage());
private static void printMessage(List<MConfig> configs) {
for(MConfig config : configs) {
List<MInput<?>> inputlist = config.getInputs();
if (config.getValidationMessages() != null) {
// print every validation message
for(Message message : config.getValidationMessages()) {
System.out.println("Config validation message: " + message.getMessage());
}
}
for (MInput minput : inputlist) {
if (minput.getValidationStatus() == Status.ACCEPTABLE) {
System.out.println("Warning:" + minput.getValidationMessage());
} else if (minput.getValidationStatus() == Status.UNACCEPTABLE) {
System.out.println("Error:" + minput.getValidationMessage());
if (minput.getValidationStatus() == Status.WARNING) {
for(Message message : config.getValidationMessages()) {
System.out.println("Config Input Validation Warning: " + message.getMessage());
}
}
}
}
else if (minput.getValidationStatus() == Status.ERROR) {
for(Message message : config.getValidationMessages()) {
System.out.println("Config Input Validation Error: " + message.getMessage());
}
}
}
}
Job
===
A job object holds database configurations, input or output configurations and resources required for executing as a hadoop job. Create job object requires filling connector part and framework part forms.
Below given code shows how to create a import job
::
String url = "http://localhost:12000/sqoop/";
SqoopClient client = new SqoopClient(url);
//Creating dummy job object
MJob newjob = client.newJob(1, org.apache.sqoop.model.MJob.Type.IMPORT);
MJobForms connectorForm = newjob.getConnectorPart();
MJobForms frameworkForm = newjob.getFrameworkPart();
newjob.setName("ImportJob");
//Database configuration
connectorForm.getStringInput("table.schemaName").setValue("");
//Input either table name or sql
connectorForm.getStringInput("table.tableName").setValue("table");
//connectorForm.getStringInput("table.sql").setValue("select id,name from table where ${CONDITIONS}");
connectorForm.getStringInput("table.columns").setValue("id,name");
connectorForm.getStringInput("table.partitionColumn").setValue("id");
//Set boundary value only if required
//connectorForm.getStringInput("table.boundaryQuery").setValue("");
//Output configurations
frameworkForm.getEnumInput("output.storageType").setValue("HDFS");
frameworkForm.getEnumInput("output.outputFormat").setValue("TEXT_FILE");//Other option: SEQUENCE_FILE
frameworkForm.getStringInput("output.outputDirectory").setValue("/output");
//Job resources
frameworkForm.getIntegerInput("throttling.extractors").setValue(1);
frameworkForm.getIntegerInput("throttling.loaders").setValue(1);
Status status = client.createJob(newjob);
if(status.canProceed()) {
System.out.println("New Job ID: "+ newjob.getPersistenceId());
} else {
System.out.println("Check for status and forms error ");
}
//Print errors or warnings
printMessage(newjob.getConnectorPart().getForms());
printMessage(newjob.getFrameworkPart().getForms());
Export job creation is same as import job, but only few input configuration changes
::
String url = "http://localhost:12000/sqoop/";
SqoopClient client = new SqoopClient(url);
MJob newjob = client.newJob(1, org.apache.sqoop.model.MJob.Type.EXPORT);
MJobForms connectorForm = newjob.getConnectorPart();
MJobForms frameworkForm = newjob.getFrameworkPart();
newjob.setName("ExportJob");
//Database configuration
connectorForm.getStringInput("table.schemaName").setValue("");
//Input either table name or sql
connectorForm.getStringInput("table.tableName").setValue("table");
//connectorForm.getStringInput("table.sql").setValue("select id,name from table where ${CONDITIONS}");
connectorForm.getStringInput("table.columns").setValue("id,name");
//Input configurations
frameworkForm.getStringInput("input.inputDirectory").setValue("/input");
//Job resources
frameworkForm.getIntegerInput("throttling.extractors").setValue(1);
frameworkForm.getIntegerInput("throttling.loaders").setValue(1);
Status status = client.createJob(newjob);
if(status.canProceed()) {
System.out.println("New Job ID: "+ newjob.getPersistenceId());
} else {
System.out.println("Check for status and forms error ");
}
//Print errors or warnings
printMessage(newjob.getConnectorPart().getForms());
printMessage(newjob.getFrameworkPart().getForms());
Managing connection and job
---------------------------
After creating connection or job object, you can update or delete a connection or job using given functions
Updating link and job
---------------------
After creating link or job in the repository, you can update or delete a link or job using the following functions
+----------------------------------+------------------------------------------------------------------------------------+
| Method | Description |
+==================================+====================================================================================+
| ``updateConnection(connection)`` | Invoke update with connection object and check status for any errors or warnings |
| ``updateLink(link)`` | Invoke update with link and check status for any errors or warnings |
+----------------------------------+------------------------------------------------------------------------------------+
| ``deleteConnection(xid)`` | Delete connection. Deletes only if specified connection is used by any job |
| ``deleteLink(lid)`` | Delete link. Deletes only if specified link is not used by any job |
+----------------------------------+------------------------------------------------------------------------------------+
| ``updateJob(job)`` | Invoke update with job object and check status for any errors or warnings |
| ``updateJob(job)`` | Invoke update with job and check status for any errors or warnings |
+----------------------------------+------------------------------------------------------------------------------------+
| ``deleteJob(jid)`` | Delete job |
+----------------------------------+------------------------------------------------------------------------------------+
Job Submission
Job Start
==============
Job submission requires a job id. On successful submission, getStatus() method returns "BOOTING" or "RUNNING".
Starting a job requires a job id. On successful start, getStatus() method returns "BOOTING" or "RUNNING".
::
//Job submission start
MSubmission submission = client.startSubmission(1);
System.out.println("Status : " + submission.getStatus());
//Job start
long jobId = 1;
MSubmission submission = client.startJob(jobId);
System.out.println("Job Submission Status : " + submission.getStatus());
if(submission.getStatus().isRunning() && submission.getProgress() != -1) {
System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));
}
@ -281,42 +261,44 @@ Job submission requires a job id. On successful submission, getStatus() method r
}
//Check job status
MSubmission submission = client.getSubmissionStatus(1);
//Check job status for a running job
MSubmission submission = client.getJobStatus(jobId);
if(submission.getStatus().isRunning() && submission.getProgress() != -1) {
System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));
}
//Stop a running job
submission.stopSubmission(jid);
submission.stopJob(jobId);
Above code block, job submission is asynchronous. For synchronous job submission, use startSubmission(jid, callback, pollTime) method. If user is not interested in getting submission status, then invoke method with null for callback parameter and returns final submission status. Polltime is request interval for getting submission status from sqoop server and value should be greater than zero. Frequently hit the sqoop server if the low value is set to pollTime.
When a synchronous job is submission started with callback, first invokes the callback's submitted(MSubmission) method on successful submission, after every poll time interval invokes updated(MSubmission) and finally on finished executing the job invokes finished(MSubmission) method.
Above code block, job start is asynchronous. For synchronous job start, use ``startJob(jid, callback, pollTime)`` method. If you are not interested in getting the job status, then invoke the same method with "null" as the value for the callback parameter and this returns the final job status. ``pollTime`` is the request interval for getting the job status from sqoop server and the value should be greater than zero. We will frequently hit the sqoop server if a low value is given for the ``pollTime``. When a synchronous job is started with a non null callback, it first invokes the callback's ``submitted(MSubmission)`` method on successful start, after every poll time interval, it then invokes the ``updated(MSubmission)`` method on the callback API and finally on finishing the job executuon it invokes the ``finished(MSubmission)`` method on the callback API.
Describe Forms
==========================
Display Config and Input Names For Connector
============================================
You can view the connection or job forms input values with labels of built-in resource bundle.
You can view the config/input names for the link and job config types per connector
::
String url = "http://localhost:12000/sqoop/";
SqoopClient client = new SqoopClient(url);
//Use getJob(jid) for describing job.
//While printing connection forms, pass connector id to getResourceBundle(cid).
describe(client.getConnection(1).getConnectorPart().getForms(), client.getResourceBundle(1));
describe(client.getConnection(1).getFrameworkPart().getForms(), client.getFrameworkResourceBundle());
long connectorId = 1;
// link config for connector
describe(client.getConnector(connectorId).getLinkConfig().getConfigs(), client.getConnectorConfigBundle(connectorId));
// from job config for connector
describe(client.getConnector(connectorId).getFromConfig().getConfigs(), client.getConnectorConfigBundle(connectorId));
// to job config for the connector
describe(client.getConnector(connectorId).getToConfig().getConfigs(), client.getConnectorConfigBundle(connectorId));
void describe(List<MForm> forms, ResourceBundle resource) {
for (MForm mf : forms) {
System.out.println(resource.getString(mf.getLabelKey())+":");
List<MInput<?>> mis = mf.getInputs();
for (MInput mi : mis) {
System.out.println(resource.getString(mi.getLabelKey()) + " : " + mi.getValue());
void describe(List<MConfig> configs, ResourceBundle resource) {
for (MConfig config : configs) {
System.out.println(resource.getString(config.getLabelKey())+":");
List<MInput<?>> inputs = config.getInputs();
for (MInput input : inputs) {
System.out.println(resource.getString(input.getLabelKey()) + " : " + input.getValue());
}
System.out.println();
}
}
Above Sqoop 2 Client API tutorial explained you how to create connection, create job and submit job.
Above Sqoop 2 Client API tutorial explained how to create a link, create job and and then start the job.

View File

@ -26,7 +26,7 @@ different data sources. Each connector will primarily focus on a particular data
What is a Sqoop Connector?
++++++++++++++++++++++++++
The connector provides the facilities to interact with varied data sources that can be used as a means to transfer between them. The connector implementation will provide logic to read from and/or write to a data source that it represents. For instance the ( ``GenericJdbcConnector`` ) encapsulates the logic to read from and/or write to jdbc enabled relational data sources. The connector part that enables reading from a data source and transferring this data to internal Sqoop format is called the FROM and the part that enables writng data to a data source by transferring data from Sqoop format is called TO. In order to interact with these data sources, the connector will provide one or many config classes and input fields within it.
Connectors provide the facility to interact with many data sources and thus can be used as a means to transfer data between them in Sqoop. The connector implementation will provide logic to read from and/or write to a data source that it represents. For instance the ( ``GenericJdbcConnector`` ) encapsulates the logic to read from and/or write to jdbc enabled relational data sources. The connector part that enables reading from a data source and transferring this data to internal Sqoop format is called the FROM and the part that enables writng data to a data source by transferring data from Sqoop format is called TO. In order to interact with these data sources, the connector will provide one or many config classes and input fields within it.
Broadly we support two main config types for connectors, link type represented by the enum ``ConfigType.LINK`` and job type represented by the enum ``ConfigType.JOB``. Link config represents the properties to physically connect to the data source. Job config represent the properties that are required to invoke reading from and/or writing to particular dataset in the data source it connects to. If a connector supports both reading from and writing to, it will provide the ``FromJobConfig`` and ``ToJobConfig`` objects. Each of these config objects are custom to each connector and can have one or more inputs associated with each of the Link, FromJob and ToJob config types. Hence we call the connectors as configurables i.e an entity that can provide configs for interacting with the data source it represents. As the connectors evolve over time to support new features in their data sources, the configs and inputs will change as well. Thus the connector API also provides methods for upgrading the config and input names and data related to these data sources across different versions.
@ -96,9 +96,8 @@ The ``getFrom`` method returns From_ instance which is a ``Transferable`` entity
GenericJdbcPartitioner.class,
GenericJdbcExtractor.class,
GenericJdbcFromDestroyer.class);
...
@Override
public From getFrom() {
return FROM;
@ -117,7 +116,7 @@ Initializer is instantiated before the submission of sqoop job to the execution
public List<String> getJars(InitializerContext context, LinkConfiguration linkConfiguration,
JobConfiguration jobConfiguration);
public abstract Schema getSchema(InitializerContext context, LinkConfiguration linkConfiguration,
JobConfiguration jobConfiguration);
@ -168,7 +167,7 @@ Extractor (E for ETL) extracts data from a given data source
The ``extract`` method extracts data from the data source using the link and job configuration properties and writes it to the ``DataWriter`` (provided by the extractor context) as the default `Intermediate representation`_ .
Extractors use Writer's provided by the ExtractorContext to send a record through the sqoop system.
Extractors use Writer's provided by the ExtractorContext to send a record through the sqoop system.
::
context.getDataWriter().writeArrayRecord(array);
@ -193,9 +192,8 @@ The ``getTo`` method returns ``TO`` instance which is a ``Transferable`` entity
GenericJdbcToInitializer.class,
GenericJdbcLoader.class,
GenericJdbcToDestroyer.class);
...
@Override
public To getTo() {
return TO;
@ -348,7 +346,7 @@ Validators validate the config objects and the inputs associated with the config
@Input(size = 128, validators = {@Validator(value = StartsWith.class, strArg = "jdbc:")} )
@Input(size = 255, validators = { @Validator(NotEmpty.class) })
Sqoop 2 provides a list of standard input validators that can be used by different connectors for the link and job type configuration inputs.
::

View File

@ -24,6 +24,7 @@ In order to perform the maintenance task each tool is suppose to do, they need t
.. note:: Running tools while the Sqoop Server is also running is not recommended as it might lead to a data corruption and service disruption.
List of available tools:
* verify