5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-08 11:42:27 +08:00

SQOOP-1965: Sqoop2: Naming support for Connector, Link, Job instances

(Banmeet Singh via Abraham Elmahrek)
This commit is contained in:
Abraham Elmahrek 2015-06-22 11:27:20 +02:00
parent d593a550aa
commit d16566ace2
24 changed files with 321 additions and 142 deletions

View File

@ -296,6 +296,7 @@ public MLink createLink(String connectorName) {
return createLink(connector.getPersistenceId());
}
/**
* Retrieve link for given id.
*
@ -303,7 +304,18 @@ public MLink createLink(String connectorName) {
* @return
*/
public MLink getLink(long linkId) {
return resourceRequests.readLink(linkId).getLinks().get(0);
//Cast long to string and pass (retained to prevent other functionality from breaking)
return resourceRequests.readLink(String.valueOf(linkId)).getLinks().get(0);
}
/**
* Retrieve link for given name.
*
* @param linkName Link name
* @return
*/
public MLink getLink(String linkName) {
return resourceRequests.readLink(linkName).getLinks().get(0);
}
/**
@ -335,6 +347,16 @@ public Status updateLink(MLink link) {
return applyLinkValidations(resourceRequests.updateLink(link), link);
}
/**
* Enable/disable link with given name
*
* @param linkName link name
* @param enabled Enable or disable
*/
public void enableLink(String linkName, boolean enabled) {
resourceRequests.enableLink(linkName, enabled);
}
/**
* Enable/disable link with given id
*
@ -342,7 +364,16 @@ public Status updateLink(MLink link) {
* @param enabled Enable or disable
*/
public void enableLink(long linkId, boolean enabled) {
resourceRequests.enableLink(linkId, enabled);
resourceRequests.enableLink(String.valueOf(linkId), enabled);
}
/**
* Delete link with given name
*
* @param linkName link name
*/
public void deleteLink(String linkName) {
resourceRequests.deleteLink(linkName);
}
/**
@ -351,7 +382,29 @@ public void enableLink(long linkId, boolean enabled) {
* @param linkId link id
*/
public void deleteLink(long linkId) {
resourceRequests.deleteLink(linkId);
resourceRequests.deleteLink(String.valueOf(linkId));
}
/**
* Create new job the for given links.
*
* @param fromLinkName From link name
* @param toLinkName To link name
* @return
*/
public MJob createJob(String fromLinkName, String toLinkName) {
MLink fromLink = getLink(fromLinkName);
MLink toLink = getLink(toLinkName);
return new MJob(
fromLink.getConnectorId(),
toLink.getConnectorId(),
fromLink.getPersistenceId(),
toLink.getPersistenceId(),
getConnector(fromLink.getConnectorId()).getFromConfig(),
getConnector(toLink.getConnectorId()).getToConfig(),
getDriverConfig()
);
}
/**
@ -366,13 +419,13 @@ public MJob createJob(long fromLinkId, long toLinkId) {
MLink toLink = getLink(toLinkId);
return new MJob(
fromLink.getConnectorId(),
toLink.getConnectorId(),
fromLink.getPersistenceId(),
toLink.getPersistenceId(),
getConnector(fromLink.getConnectorId()).getFromConfig(),
getConnector(toLink.getConnectorId()).getToConfig(),
getDriverConfig()
fromLink.getConnectorId(),
toLink.getConnectorId(),
fromLink.getPersistenceId(),
toLink.getPersistenceId(),
getConnector(fromLink.getConnectorId()).getFromConfig(),
getConnector(toLink.getConnectorId()).getToConfig(),
getDriverConfig()
);
}
@ -383,7 +436,18 @@ public MJob createJob(long fromLinkId, long toLinkId) {
* @return
*/
public MJob getJob(long jobId) {
return resourceRequests.readJob(jobId).getJobs().get(0);
//Cast long to string and pass (retained to prevent other functionality from breaking)
return resourceRequests.readJob(String.valueOf(jobId)).getJobs().get(0);
}
/**
* Retrieve job for given name.
*
* @param jobName Job name
* @return
*/
public MJob getJob(String jobName) {
return resourceRequests.readJob(jobName).getJobs().get(0);
}
/**
@ -396,12 +460,21 @@ public List<MJob> getJobs() {
}
/**
* Retrieve list of all jobs by connector
* Retrieve list of all jobs by connector id
*
* @return
*/
public List<MJob> getJobsByConnector(long cId) {
return resourceRequests.readJobsByConnector(cId).getJobs();
return resourceRequests.readJobsByConnector(String.valueOf(cId)).getJobs();
}
/**
* Retrieve list of all jobs by connector name
*
* @return
*/
public List<MJob> getJobsByConnector(String cName) {
return resourceRequests.readJobsByConnector(cName).getJobs();
}
/**
@ -424,22 +497,51 @@ public Status updateJob(MJob job) {
}
/**
* Enable/disable job with given id
* Enable/disable job with given name
*
* @param jid Job that is going to be enabled/disabled
* @param jName Job that is going to be enabled/disabled
* @param enabled Enable or disable
*/
public void enableJob(long jid, boolean enabled) {
resourceRequests.enableJob(jid, enabled);
public void enableJob(String jName, boolean enabled) {
resourceRequests.enableJob(jName, enabled);
}
/**
* Delete job with given id.
* Enable/disable job with given id
*
* @param jId Job that is going to be enabled/disabled
* @param enabled Enable or disable
*/
public void enableJob(long jId, boolean enabled) {
resourceRequests.enableJob(String.valueOf(jId), enabled);
}
/**
* Delete job with given name.
*
* @param jobName Job name
*/
public void deleteJob(String jobName) {
resourceRequests.deleteJob(jobName);
}
/**
* Delete job with given id
*
* @param jobId Job id
*/
public void deleteJob(long jobId) {
resourceRequests.deleteJob(jobId);
resourceRequests.deleteJob(String.valueOf(jobId));
}
/**
* Start job with given name.
*
* @param jobName Job name
* @return
*/
public MSubmission startJob(String jobName) {
return resourceRequests.startJob(jobName).getSubmissions().get(0);
}
/**
@ -449,20 +551,20 @@ public void deleteJob(long jobId) {
* @return
*/
public MSubmission startJob(long jobId) {
return resourceRequests.startJob(jobId).getSubmissions().get(0);
return resourceRequests.startJob(String.valueOf(jobId)).getSubmissions().get(0);
}
/**
* Method used for synchronous job submission.
* Pass null to callback parameter if submission status is not required and after completion
* job execution returns MSubmission which contains final status of submission.
* @param jobId - Job ID
* @param jobName - Job name
* @param callback - User may set null if submission status is not required, else callback methods invoked
* @param pollTime - Server poll time
* @return MSubmission - Final status of job submission
* @throws InterruptedException
*/
public MSubmission startJob(long jobId, SubmissionCallback callback, long pollTime)
public MSubmission startJob(String jobName, SubmissionCallback callback, long pollTime)
throws InterruptedException {
if(pollTime <= 0) {
throw new SqoopException(ClientError.CLIENT_0002);
@ -470,7 +572,7 @@ public MSubmission startJob(long jobId, SubmissionCallback callback, long pollTi
//TODO(https://issues.apache.org/jira/browse/SQOOP-1652): address the submit/start/first terminology difference
// What does first even mean in s distributed client/server model?
boolean first = true;
MSubmission submission = resourceRequests.startJob(jobId).getSubmissions().get(0);
MSubmission submission = resourceRequests.startJob(jobName).getSubmissions().get(0);
// what happens when the server fails, do we just say finished?
while(submission.getStatus().isRunning()) {
if(first) {
@ -480,12 +582,19 @@ public MSubmission startJob(long jobId, SubmissionCallback callback, long pollTi
invokeSubmissionCallback(callback, submission, SubmissionStatus.UPDATED);
}
Thread.sleep(pollTime);
submission = getJobStatus(jobId);
//Works with both name as well as id (in string form) as argument
submission = getJobStatus(jobName);
}
invokeSubmissionCallback(callback, submission, SubmissionStatus.FINISHED);
return submission;
}
public MSubmission startJob(long jobId, SubmissionCallback callback, long pollTime)
throws InterruptedException {
return startJob(String.valueOf(jobId), callback, pollTime);
}
/**
* Invokes the callback's methods with MSubmission object
* based on SubmissionStatus. If callback is null, no operation performed.
@ -513,13 +622,33 @@ private void invokeSubmissionCallback(SubmissionCallback callback, MSubmission s
}
/**
* stop job with given id.
* stop job with given name.
*
* @param jid Job id
* @param jName Job name
* @return
*/
public MSubmission stopJob(long jid) {
return resourceRequests.stopJob(jid).getSubmissions().get(0);
public MSubmission stopJob(String jName) {
return resourceRequests.stopJob(jName).getSubmissions().get(0);
}
/**
* stop job with given id.
*
* @param jId Job id
* @return
*/
public MSubmission stopJob(long jId) {
return resourceRequests.stopJob(String.valueOf(jId)).getSubmissions().get(0);
}
/**
* Get status for given job name.
*
* @param jName Job name
* @return
*/
public MSubmission getJobStatus(String jName) {
return resourceRequests.getJobStatus(jName).getSubmissions().get(0);
}
/**
@ -529,7 +658,7 @@ public MSubmission stopJob(long jid) {
* @return
*/
public MSubmission getJobStatus(long jid) {
return resourceRequests.getJobStatus(jid).getSubmissions().get(0);
return resourceRequests.getJobStatus(String.valueOf(jid)).getSubmissions().get(0);
}
/**
@ -548,7 +677,17 @@ public List<MSubmission> getSubmissions() {
* @return
*/
public List<MSubmission> getSubmissionsForJob(long jobId) {
return resourceRequests.readSubmission(jobId).getSubmissions();
return resourceRequests.readSubmission(String.valueOf(jobId)).getSubmissions();
}
/**
* Retrieve list of submissions for given job name.
*
* @param jobName Job name
* @return
*/
public List<MSubmission> getSubmissionsForJob(String jobName) {
return resourceRequests.readSubmission(jobName).getSubmissions();
}
/**

View File

@ -48,27 +48,27 @@ public JobResourceRequest(DelegationTokenAuthenticatedURL.Token token){
super(token);
}
public JobBean readByConnector(String serverUrl, Long cId) {
public JobBean readByConnector(String serverUrl, String cArg) {
JobsBean bean = new JobsBean();
if (cId != null) {
String response = super.get(serverUrl + RESOURCE + "?cname=" + cId);
if (cArg != null) {
String response = super.get(serverUrl + RESOURCE + "?cname=" + cArg);
JSONObject jsonObject = JSONUtils.parse(response);
bean.restore(jsonObject);
}
return bean;
}
public JobBean read(String serverUrl, Long jobId) {
public JobBean read(String serverUrl, String jobArg) {
String response;
if (jobId == null) {
if (jobArg == null) {
response = super.get(serverUrl + RESOURCE + "all");
} else {
response = super.get(serverUrl + RESOURCE + jobId);
response = super.get(serverUrl + RESOURCE + jobArg);
}
JSONObject jsonObject = JSONUtils.parse(response);
// defaults to all
JobBean bean = new JobsBean();
if (jobId != null) {
if (jobArg != null) {
bean = new JobBean();
}
bean.restore(jsonObject);
@ -96,30 +96,30 @@ public ValidationResultBean update(String serverUrl, MJob job) {
return validationBean;
}
public void delete(String serverUrl, Long jobId) {
super.delete(serverUrl + RESOURCE + jobId);
public void delete(String serverUrl, String jobArg) {
super.delete(serverUrl + RESOURCE + jobArg);
}
public void enable(String serverUrl, Long jobId, Boolean enabled) {
public void enable(String serverUrl, String jobArg, Boolean enabled) {
if (enabled) {
super.put(serverUrl + RESOURCE + jobId + ENABLE, null);
super.put(serverUrl + RESOURCE + jobArg + ENABLE, null);
} else {
super.put(serverUrl + RESOURCE + jobId + DISABLE, null);
super.put(serverUrl + RESOURCE + jobArg + DISABLE, null);
}
}
public SubmissionBean start(String serverUrl, Long jobId) {
String response = super.put(serverUrl + RESOURCE + jobId + START, null);
public SubmissionBean start(String serverUrl, String jobArg) {
String response = super.put(serverUrl + RESOURCE + jobArg + START, null);
return createJobSubmissionResponse(response);
}
public SubmissionBean stop(String serverUrl, Long jobId) {
String response = super.put(serverUrl + RESOURCE + jobId + STOP, null);
public SubmissionBean stop(String serverUrl, String jobArg) {
String response = super.put(serverUrl + RESOURCE + jobArg + STOP, null);
return createJobSubmissionResponse(response);
}
public SubmissionBean status(String serverUrl, Long jobId) {
String response = super.get(serverUrl + RESOURCE + jobId + STATUS);
public SubmissionBean status(String serverUrl, String jobArg) {
String response = super.get(serverUrl + RESOURCE + jobArg + STATUS);
return createJobSubmissionResponse(response);
}

View File

@ -44,17 +44,17 @@ public LinkResourceRequest(DelegationTokenAuthenticatedURL.Token token){
super(token);
}
public LinkBean read(String serverUrl, Long linkId) {
public LinkBean read(String serverUrl, String linkArg) {
String response;
if (linkId == null) {
if (linkArg == null) {
response = super.get(serverUrl + LINK_RESOURCE + "all");
} else {
response = super.get(serverUrl + LINK_RESOURCE + linkId);
response = super.get(serverUrl + LINK_RESOURCE + linkArg);
}
JSONObject jsonObject = JSONUtils.parse(response);
// defaults to all
LinkBean bean = new LinksBean();
if (linkId != null) {
if (linkArg != null) {
bean = new LinkBean();
}
bean.restore(jsonObject);
@ -81,15 +81,15 @@ public ValidationResultBean update(String serverUrl, MLink link) {
return validationBean;
}
public void delete(String serverUrl, Long id) {
super.delete(serverUrl + LINK_RESOURCE + id);
public void delete(String serverUrl, String arg) {
super.delete(serverUrl + LINK_RESOURCE + arg);
}
public void enable(String serverUrl, Long id, Boolean enabled) {
public void enable(String serverUrl, String lArg, Boolean enabled) {
if (enabled) {
super.put(serverUrl + LINK_RESOURCE + id + ENABLE, null);
super.put(serverUrl + LINK_RESOURCE + lArg + ENABLE, null);
} else {
super.put(serverUrl + LINK_RESOURCE + id + DISABLE, null);
super.put(serverUrl + LINK_RESOURCE + lArg + DISABLE, null);
}
}
}

View File

@ -109,60 +109,60 @@ public ValidationResultBean saveLink(MLink link) {
return getLinkResourceRequest().create(serverUrl, link);
}
public LinkBean readLink(Long linkId) {
return getLinkResourceRequest().read(serverUrl, linkId);
public LinkBean readLink(String linkArg) {
return getLinkResourceRequest().read(serverUrl, linkArg);
}
public ValidationResultBean updateLink(MLink link) {
return getLinkResourceRequest().update(serverUrl, link);
}
public void enableLink(Long lid, Boolean enabled) {
getLinkResourceRequest().enable(serverUrl, lid, enabled);
public void enableLink(String lArg, Boolean enabled) {
getLinkResourceRequest().enable(serverUrl, lArg, enabled);
}
public void deleteLink(Long lid) {
getLinkResourceRequest().delete(serverUrl, lid);
public void deleteLink(String lArg) {
getLinkResourceRequest().delete(serverUrl, lArg);
}
public ValidationResultBean saveJob(MJob job) {
return getJobResourceRequest().create(serverUrl, job);
}
public JobBean readJob(Long jobId) {
return getJobResourceRequest().read(serverUrl, jobId);
public JobBean readJob(String jobArg) {
return getJobResourceRequest().read(serverUrl, jobArg);
}
public JobBean readJobsByConnector(Long cId) {
return getJobResourceRequest().readByConnector(serverUrl, cId);
public JobBean readJobsByConnector(String cArg) {
return getJobResourceRequest().readByConnector(serverUrl, cArg);
}
public ValidationResultBean updateJob(MJob job) {
return getJobResourceRequest().update(serverUrl, job);
}
public void enableJob(Long jid, Boolean enabled) {
getJobResourceRequest().enable(serverUrl, jid, enabled);
public void enableJob(String jArg, Boolean enabled) {
getJobResourceRequest().enable(serverUrl, jArg, enabled);
}
public void deleteJob(Long jid) {
getJobResourceRequest().delete(serverUrl, jid);
public void deleteJob(String jArg) {
getJobResourceRequest().delete(serverUrl, jArg);
}
public SubmissionBean getJobStatus(Long jid) {
return getJobResourceRequest().status(serverUrl, jid);
public SubmissionBean getJobStatus(String jArg) {
return getJobResourceRequest().status(serverUrl, jArg);
}
public SubmissionBean startJob(Long jid) {
return getJobResourceRequest().start(serverUrl, jid);
public SubmissionBean startJob(String jArg) {
return getJobResourceRequest().start(serverUrl, jArg);
}
public SubmissionBean stopJob(Long jid) {
return getJobResourceRequest().stop(serverUrl, jid);
public SubmissionBean stopJob(String jArg) {
return getJobResourceRequest().stop(serverUrl, jArg);
}
public SubmissionsBean readSubmission(Long jid) {
return getSubmissionResourceRequest().read(serverUrl, jid);
public SubmissionsBean readSubmission(String jArg) {
return getSubmissionResourceRequest().read(serverUrl, jArg);
}
public RolesBean readRoles() {

View File

@ -38,14 +38,14 @@ public SubmissionResourceRequest(DelegationTokenAuthenticatedURL.Token token){
super(token);
}
public SubmissionsBean read(String serverUrl, Long jid) {
public SubmissionsBean read(String serverUrl, String jArg) {
String response;
if (jid == null) {
if (jArg == null) {
// all submissions
response = super.get(serverUrl + RESOURCE);
} else {
// submission per job ( name preferred, we fall back to id)
response = super.get(serverUrl + RESOURCE + "?jname=" + jid);
response = super.get(serverUrl + RESOURCE + "?jname=" + jArg);
}
JSONObject jsonObject = JSONUtils.parse(response);
SubmissionsBean submissionBean = new SubmissionsBean();

View File

@ -52,15 +52,15 @@ public CloneJobFunction() {
@SuppressWarnings("unchecked")
public Object executeFunction(CommandLine line, boolean isInteractive) throws IOException {
return cloneJob(getLong(line, Constants.OPT_JID), line.getArgList(), isInteractive);
return cloneJob(line.getOptionValue(Constants.OPT_JID), line.getArgList(), isInteractive);
}
private Status cloneJob(Long jobId, List<String> args, boolean isInteractive) throws IOException {
printlnResource(Constants.RES_CLONE_CLONING_JOB, jobId);
private Status cloneJob(String jobArg, List<String> args, boolean isInteractive) throws IOException {
printlnResource(Constants.RES_CLONE_CLONING_JOB, jobArg);
ConsoleReader reader = new ConsoleReader();
MJob job = client.getJob(jobId);
MJob job = client.getJob(jobArg);
job.setPersistenceId(MPersistableEntity.PERSISTANCE_ID_DEFAULT);
ResourceBundle fromConnectorBundle = client.getConnectorConfigBundle(

View File

@ -53,15 +53,15 @@ public CloneLinkFunction() {
@Override
@SuppressWarnings("unchecked")
public Object executeFunction(CommandLine line, boolean isInteractive) throws IOException {
return cloneLink(getLong(line, Constants.OPT_LID), line.getArgList(), isInteractive);
return cloneLink(line.getOptionValue(Constants.OPT_LID), line.getArgList(), isInteractive);
}
private Status cloneLink(Long connectionId, List<String> args, boolean isInteractive) throws IOException {
printlnResource(Constants.RES_CLONE_CLONING_LINK, connectionId);
private Status cloneLink(String linkArg, List<String> args, boolean isInteractive) throws IOException {
printlnResource(Constants.RES_CLONE_CLONING_LINK, linkArg);
ConsoleReader reader = new ConsoleReader();
MLink connection = client.getLink(connectionId);
MLink connection = client.getLink(linkArg);
// Remove persistent id as we're making a clone
connection.setPersistenceId(MPersistableEntity.PERSISTANCE_ID_DEFAULT);

View File

@ -62,17 +62,17 @@ public CreateJobFunction() {
@Override
@SuppressWarnings("unchecked")
public Object executeFunction(CommandLine line, boolean isInteractive) throws IOException {
return createJob(getLong(line, Constants.OPT_FROM),
getLong(line, Constants.OPT_TO),
return createJob(line.getOptionValue(Constants.OPT_FROM),
line.getOptionValue(Constants.OPT_TO),
line.getArgList(),
isInteractive);
}
private Status createJob(Long fromLinkId, Long toLinkId, List<String> args, boolean isInteractive) throws IOException {
printlnResource(Constants.RES_CREATE_CREATING_JOB, fromLinkId, toLinkId);
private Status createJob(String fromLinkArg, String toLinkArg, List<String> args, boolean isInteractive) throws IOException {
printlnResource(Constants.RES_CREATE_CREATING_JOB, fromLinkArg, toLinkArg);
ConsoleReader reader = new ConsoleReader();
MJob job = client.createJob(fromLinkId, toLinkId);
MJob job = client.createJob(fromLinkArg, toLinkArg);
MConnector fromConnector = client.getConnector(job.getFromConnectorId());
if (!fromConnector.getSupportedDirections().isDirectionSupported(Direction.FROM)) {

View File

@ -20,6 +20,7 @@
import jline.ConsoleReader;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.OptionBuilder;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.shell.core.Constants;
import org.apache.sqoop.shell.utils.LinkDynamicConfigOptions;
@ -52,17 +53,40 @@ public CreateLinkFunction() {
@Override
@SuppressWarnings("unchecked")
public Object executeFunction(CommandLine line, boolean isInteractive) throws IOException {
return createLink(getLong(line, Constants.OPT_CID), line.getArgList(), isInteractive);
return createLink(line, line.getArgList(), isInteractive);
}
private Status createLink(long connectorId, List<String> args, boolean isInteractive) throws IOException {
printlnResource(Constants.RES_CREATE_CREATING_LINK, connectorId);
private Status createLink(CommandLine line, List<String> args, boolean isInteractive) throws IOException {
//Check if the command argument is a connector name
MLink link = null;
Long cid;
String connectorName = line.getOptionValue(Constants.OPT_CID);
MConnector connector = client.getConnector(connectorName);
if (null == connector) {
//Now check if command line argument is a connector id
//This works as getConnector(String...) does not throw an exception
cid = getLong(line, Constants.OPT_CID);
client.getConnector(cid);
//Would have thrown an exception before this if input was neither a valid name nor an id
//This will do an extra getConnector() call again inside createLink()
//but should not matter as connectors are cached
link = client.createLink(cid);
printlnResource(Constants.RES_CREATE_CREATING_LINK, cid);
}
else {
//Command line had connector name
//This will do an extra getConnector() call again inside createLink() but
//should not matter as connectors are cached
cid = connector.getPersistenceId();
link = client.createLink(connectorName);
printlnResource(Constants.RES_CREATE_CREATING_LINK, connectorName);
}
ConsoleReader reader = new ConsoleReader();
MLink link = client.createLink(connectorId);
ResourceBundle connectorConfigBundle = client.getConnectorConfigBundle(connectorId);
ResourceBundle connectorConfigBundle = client.getConnectorConfigBundle(cid);
Status status = Status.OK;
if (isInteractive) {
@ -85,8 +109,8 @@ private Status createLink(long connectorId, List<String> args, boolean isInterac
} else {
LinkDynamicConfigOptions options = new LinkDynamicConfigOptions();
options.prepareOptions(link);
CommandLine line = ConfigOptions.parseOptions(options, 0, args, false);
if (fillLink(line, link)) {
CommandLine linkoptsline = ConfigOptions.parseOptions(options, 0, args, false);
if (fillLink(linkoptsline, link)) {
status = client.saveLink(link);
if (!status.canProceed()) {
printLinkValidationMessages(link);

View File

@ -42,7 +42,7 @@ public DeleteJobFunction() {
@Override
public Object executeFunction(CommandLine line, boolean isInteractive) {
client.deleteJob(getLong(line, Constants.OPT_JID));
client.deleteJob(line.getOptionValue(Constants.OPT_JID));
return Status.OK;
}
}

View File

@ -41,7 +41,7 @@ public DeleteLinkFunction() {
@Override
public Object executeFunction(CommandLine line, boolean isInteractive) {
client.deleteLink(getLong(line, Constants.OPT_LID));
client.deleteLink(line.getOptionValue(Constants.OPT_LID));
return Status.OK;
}
}

View File

@ -50,7 +50,7 @@ public boolean validateArgs(CommandLine line) {
@Override
public Object executeFunction(CommandLine line, boolean isInteractive) {
client.enableJob(getLong(line, Constants.OPT_JID), false);
client.enableJob(line.getOptionValue(Constants.OPT_JID), false);
return Status.OK;
}
}

View File

@ -41,7 +41,7 @@ public DisableLinkFunction() {
@Override
public Object executeFunction(CommandLine line, boolean isInteractive) {
client.enableLink(getLong(line, Constants.OPT_LID), false);
client.enableLink(line.getOptionValue(Constants.OPT_LID), false);
return Status.OK;
}
}

View File

@ -42,7 +42,7 @@ public EnableJobFunction() {
@Override
public Object executeFunction(CommandLine line, boolean isInteractive) {
client.enableJob(getLong(line, Constants.OPT_JID), true);
client.enableJob(line.getOptionValue(Constants.OPT_JID), true);
return Status.OK;
}
}

View File

@ -41,7 +41,7 @@ public EnableLinkFunction() {
@Override
public Object executeFunction(CommandLine line, boolean isInteractive) {
client.enableLink(getLong(line, Constants.OPT_LID), true);
client.enableLink(line.getOptionValue(Constants.OPT_LID), true);
return Status.OK;
}
}

View File

@ -51,7 +51,7 @@ public Object executeFunction(CommandLine line, boolean isInteractive) {
if (line.hasOption(Constants.OPT_ALL)) {
showConnectors();
} else if (line.hasOption(Constants.OPT_CID)) {
showConnector(getLong(line, Constants.OPT_CID));
showConnector(line);
} else {
showSummary();
}
@ -96,8 +96,19 @@ private void showConnectors() {
}
}
private void showConnector(Long cid) {
MConnector connector = client.getConnector(cid);
private void showConnector(CommandLine line) {
//Check if the command argument is a connector name
String connectorName = line.getOptionValue(Constants.OPT_CID);
MConnector connector = client.getConnector(connectorName);
if (null == connector) {
//Now check if command line argument is a connector id
//This works as getConnector(String...) does not throw an exception
Long cid = getLong(line, Constants.OPT_CID);
connector = client.getConnector(cid);
}
//No null checks here - as before. This is because getConnector(long...)
//throws an exception if connector is not found.
printlnResource(Constants.RES_SHOW_PROMPT_CONNECTORS_TO_SHOW, 1);

View File

@ -60,9 +60,11 @@ public Object executeFunction(CommandLine line, boolean isInteractive) {
if (line.hasOption(Constants.OPT_ALL)) {
showJobs(null);
} else if (line.hasOption(Constants.OPT_CID)) {
showJobs(getLong(line, Constants.OPT_CID));
//showJobs(getLong(line, Constants.OPT_CID));
showJobs(line.getOptionValue(Constants.OPT_CID));
} else if (line.hasOption(Constants.OPT_JID)) {
showJob(getLong(line, Constants.OPT_JID));
//showJob(getLong(line, Constants.OPT_JID));
showJob(line.getOptionValue(Constants.OPT_JID));
} else {
showSummary();
}
@ -121,12 +123,12 @@ private void showSummary() {
TableDisplayer.display(header, ids, names, fromConnectors, toConnectors, availabilities);
}
private void showJobs(Long id) {
private void showJobs(String jArg) {
List<MJob> jobs;
if (id == null) {
if (jArg == null) {
jobs = client.getJobs();
} else {
jobs = client.getJobsByConnector(id);
jobs = client.getJobsByConnector(jArg);
}
printlnResource(Constants.RES_SHOW_PROMPT_JOBS_TO_SHOW, jobs.size());
@ -135,8 +137,8 @@ private void showJobs(Long id) {
}
}
private void showJob(Long jid) {
MJob job = client.getJob(jid);
private void showJob(String jobArg) {
MJob job = client.getJob(jobArg);
printlnResource(Constants.RES_SHOW_PROMPT_JOBS_TO_SHOW, 1);
displayJob(job);

View File

@ -42,7 +42,7 @@ public ShowJobStatusFunction() {
@Override
public Object executeFunction(CommandLine line, boolean isInteractive) {
if (line.hasOption(Constants.OPT_JID)) {
MSubmission submission = client.getJobStatus(getLong(line, Constants.OPT_JID));
MSubmission submission = client.getJobStatus(line.getOptionValue(Constants.OPT_JID));
if(submission.getStatus().isFailure() || submission.getStatus().equals(SubmissionStatus.SUCCEEDED)) {
SubmissionDisplayer.displayHeader(submission);
SubmissionDisplayer.displayFooter(submission);

View File

@ -58,7 +58,7 @@ public Object executeFunction(CommandLine line, boolean isInteractive) {
if (line.hasOption(Constants.OPT_ALL)) {
showLinks();
} else if (line.hasOption(Constants.OPT_LID)) {
showLink(getLong(line, Constants.OPT_LID));
showLink(line.getOptionValue(Constants.OPT_LID));
} else {
showSummary();
}
@ -103,8 +103,8 @@ private void showLinks() {
}
}
private void showLink(Long xid) {
MLink link = client.getLink(xid);
private void showLink(String linkArg) {
MLink link = client.getLink(linkArg);
printlnResource(Constants.RES_SHOW_PROMPT_LINKS_TO_SHOW, 1);

View File

@ -48,13 +48,13 @@ public ShowSubmissionFunction() {
public Object executeFunction(CommandLine line, boolean isInteractive) {
if (line.hasOption(Constants.OPT_DETAIL)) {
if (line.hasOption(Constants.OPT_JID)) {
showSubmissions(getLong(line, Constants.OPT_JID));
showSubmissions(line.getOptionValue(Constants.OPT_JID));
} else {
showSubmissions(null);
}
} else {
if (line.hasOption(Constants.OPT_JID)) {
showSummary(getLong(line, Constants.OPT_JID));
showSummary(line.getOptionValue(Constants.OPT_JID));
} else {
showSummary(null);
}
@ -63,12 +63,12 @@ public Object executeFunction(CommandLine line, boolean isInteractive) {
return Status.OK;
}
private void showSummary(Long jid) {
private void showSummary(String jArg) {
List<MSubmission> submissions;
if (jid == null) {
if (jArg == null) {
submissions = client.getSubmissions();
} else {
submissions = client.getSubmissionsForJob(jid);
submissions = client.getSubmissionsForJob(jArg);
}
List<String> header = new LinkedList<String>();
@ -92,12 +92,12 @@ private void showSummary(Long jid) {
TableDisplayer.display(header, jids, eids, status, dates);
}
private void showSubmissions(Long jid) {
private void showSubmissions(String jArg) {
List<MSubmission> submissions;
if (jid == null) {
if (jArg == null) {
submissions = client.getSubmissions();
} else {
submissions = client.getSubmissionsForJob(jid);
submissions = client.getSubmissionsForJob(jArg);
}
for (MSubmission submission : submissions) {

View File

@ -72,12 +72,14 @@ public void finished(MSubmission submission) {
};
try {
client.startJob(getLong(line, Constants.OPT_JID), callback, pollTimeout);
//client.startJob(getLong(line, Constants.OPT_JID), callback, pollTimeout);
client.startJob(line.getOptionValue(Constants.OPT_JID), callback, pollTimeout);
} catch (InterruptedException e) {
throw new SqoopException(ShellError.SHELL_0007, e);
}
} else if (line.hasOption(Constants.OPT_JID)) {
MSubmission submission = client.startJob(getLong(line, Constants.OPT_JID));
//MSubmission submission = client.startJob(getLong(line, Constants.OPT_JID));
MSubmission submission = client.startJob(line.getOptionValue(Constants.OPT_JID));
if(submission.getStatus().isFailure()) {
SubmissionDisplayer.displayFooter(submission);
} else {

View File

@ -40,7 +40,8 @@ public StopJobFunction() {
@Override
public Object executeFunction(CommandLine line, boolean isInteractive) {
if (line.hasOption(Constants.OPT_JID)) {
MSubmission submission = client.stopJob(getLong(line, Constants.OPT_JID));
//MSubmission submission = client.stopJob(getLong(line, Constants.OPT_JID));
MSubmission submission = client.stopJob(line.getOptionValue(Constants.OPT_JID));
if(submission.getStatus().isFailure()) {
SubmissionDisplayer.displayFooter(submission);
} else {

View File

@ -53,16 +53,16 @@ public UpdateJobFunction() {
@Override
@SuppressWarnings("unchecked")
public Object executeFunction(CommandLine line, boolean isInteractive) throws IOException {
return updateJob(getLong(line, Constants.OPT_JID), line.getArgList(), isInteractive);
return updateJob(line.getOptionValue(Constants.OPT_JID), line.getArgList(), isInteractive);
}
private Status updateJob(Long jobId, List<String> args, boolean isInteractive) throws IOException {
printlnResource(Constants.RES_SQOOP_UPDATING_JOB, jobId);
private Status updateJob(String jobArg, List<String> args, boolean isInteractive) throws IOException {
printlnResource(Constants.RES_SQOOP_UPDATING_JOB, jobArg);
ConsoleReader reader = new ConsoleReader();
// TODO(SQOOP-1634): using from/to and driver config id, this call can be avoided
MJob job = client.getJob(jobId);
MJob job = client.getJob(jobArg);
ResourceBundle fromConnectorBundle = client.getConnectorConfigBundle(
job.getFromConnectorId());

View File

@ -52,16 +52,16 @@ public UpdateLinkFunction() {
@Override
@SuppressWarnings("unchecked")
public Object executeFunction(CommandLine line, boolean isInteractive) throws IOException {
return updateLink(getLong(line, Constants.OPT_LID), line.getArgList(), isInteractive);
return updateLink(line.getOptionValue(Constants.OPT_LID), line.getArgList(), isInteractive);
}
private Status updateLink(Long linkId, List<String> args, boolean isInteractive) throws IOException {
printlnResource(Constants.RES_SQOOP_UPDATING_LINK, linkId);
private Status updateLink(String linkArg, List<String> args, boolean isInteractive) throws IOException {
printlnResource(Constants.RES_SQOOP_UPDATING_LINK, linkArg);
ConsoleReader reader = new ConsoleReader();
// TODO(SQOOP-1634): using link config id, this call can be avoided
MLink link = client.getLink(linkId);
MLink link = client.getLink(linkArg);
ResourceBundle connectorLinkConfigBundle = client.getConnectorConfigBundle(link.getConnectorId());