5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-11 22:41:50 +08:00

SQOOP-2760: Sqoop2: Fix update command with name for link and job

(Colin Ma via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2016-01-04 06:07:15 -08:00
parent 8d97bb3e28
commit 0495bc7b4d
15 changed files with 119 additions and 53 deletions

View File

@ -313,8 +313,8 @@ public Status saveLink(MLink link) {
* @param link link that should be updated
* @return
*/
public Status updateLink(MLink link) {
return applyLinkValidations(resourceRequests.updateLink(link), link);
public Status updateLink(MLink link, String oldLinkName) {
return applyLinkValidations(resourceRequests.updateLink(link, oldLinkName), link);
}
/**
@ -403,8 +403,8 @@ public Status saveJob(MJob job) {
* @param job Job that should be updated
* @return
*/
public Status updateJob(MJob job) {
return applyJobValidations(resourceRequests.updateJob(job), job);
public Status updateJob(MJob job, String oldJobName) {
return applyJobValidations(resourceRequests.updateJob(job, oldJobName), job);
}
/**

View File

@ -86,11 +86,11 @@ public ValidationResultBean create(String serverUrl, MJob job) {
return validationResultBean;
}
public ValidationResultBean update(String serverUrl, MJob job) {
public ValidationResultBean update(String serverUrl, MJob job, String oldJobName) {
JobBean jobBean = new JobBean(job);
// Extract all config inputs including sensitive inputs
JSONObject jobJson = jobBean.extract(false);
String response = super.put(serverUrl + RESOURCE + UrlSafeUtils.urlPathEncode(job.getName()),
String response = super.put(serverUrl + RESOURCE + UrlSafeUtils.urlPathEncode(oldJobName),
jobJson.toJSONString());
ValidationResultBean validationBean = new ValidationResultBean();
validationBean.restore(JSONUtils.parse(response));

View File

@ -72,11 +72,11 @@ public ValidationResultBean create(String serverUrl, MLink link) {
return validationBean;
}
public ValidationResultBean update(String serverUrl, MLink link) {
public ValidationResultBean update(String serverUrl, MLink link, String oldLinkName) {
LinkBean linkBean = new LinkBean(link);
// Extract all config inputs including sensitive inputs
JSONObject linkJson = linkBean.extract(false);
String response = super.put(serverUrl + LINK_RESOURCE + UrlSafeUtils.urlPathEncode(link.getName()),
String response = super.put(serverUrl + LINK_RESOURCE + UrlSafeUtils.urlPathEncode(oldLinkName),
linkJson.toJSONString());
ValidationResultBean validationBean = new ValidationResultBean();
validationBean.restore(JSONUtils.parse(response));

View File

@ -126,8 +126,8 @@ public LinkBean readLink(String linkArg) {
return getLinkResourceRequest().read(serverUrl, linkArg);
}
public ValidationResultBean updateLink(MLink link) {
return getLinkResourceRequest().update(serverUrl, link);
public ValidationResultBean updateLink(MLink link, String oldLinkName) {
return getLinkResourceRequest().update(serverUrl, link, oldLinkName);
}
public void enableLink(String lArg, Boolean enabled) {
@ -150,8 +150,8 @@ public JobBean readJobsByConnector(String cArg) {
return getJobResourceRequest().readByConnector(serverUrl, cArg);
}
public ValidationResultBean updateJob(MJob job) {
return getJobResourceRequest().update(serverUrl, job);
public ValidationResultBean updateJob(MJob job, String oldJobName) {
return getJobResourceRequest().update(serverUrl, job, oldJobName);
}
public void enableJob(String jArg, Boolean enabled) {

View File

@ -293,9 +293,8 @@ public Object doIt(Connection conn) {
if (!link.hasPersistenceId()) {
throw new SqoopException(RepositoryError.JDBCREPO_0016);
}
if (!handler.existsLink(link.getName(), conn)) {
throw new SqoopException(RepositoryError.JDBCREPO_0017, "Invalid name: "
+ link.getName());
if (!handler.existsLink(link.getPersistenceId(), conn)) {
throw new SqoopException(RepositoryError.JDBCREPO_0016);
}
handler.updateLink(link, conn);
@ -438,9 +437,8 @@ public Object doIt(Connection conn) {
if(!job.hasPersistenceId()) {
throw new SqoopException(RepositoryError.JDBCREPO_0019);
}
if(!handler.existsJob(job.getName(), conn)) {
throw new SqoopException(RepositoryError.JDBCREPO_0020,
"Invalid id: " + job.getPersistenceId());
if(!handler.existsJob(job.getPersistenceId(), conn)) {
throw new SqoopException(RepositoryError.JDBCREPO_0019);
}
handler.updateJob(job, conn);

View File

@ -232,6 +232,15 @@ public abstract class JdbcRepositoryHandler {
*/
public abstract boolean existsLink(String linkName, Connection conn);
/**
* Check if given link exists in repository.
*
* @param linkId Link id
* @param conn Connection to the repository
* @return True if the link exists
*/
public abstract boolean existsLink(long linkId, Connection conn);
/**
* Check if given link is referenced somewhere and thus can't
* be removed.
@ -322,6 +331,15 @@ public abstract class JdbcRepositoryHandler {
*/
public abstract boolean existsJob(String jobName, Connection conn);
/**
* Check if given job exists in the repository.
*
* @param jobId Job id
* @param conn Connection to the repository
* @return True if the job exists
*/
public abstract boolean existsJob(long jobId, Connection conn);
/**
* Check if given job is referenced somewhere and thus can't
* be removed.

View File

@ -558,8 +558,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateLinkError() {
doNothing().when(repoHandlerMock).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
doReturn(true).when(repoHandlerMock).existsLink(anyString(), any(Connection.class));
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
"update link error.");
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0016);
doThrow(exception).when(repoHandlerMock).updateLink(any(MLink.class), any(Connection.class));
try {
@ -571,9 +570,6 @@ public void testConnectorConfigUpgradeHandlerWithUpdateLinkError() {
verify(repoHandlerMock, times(2)).deleteJobInputs(anyString(), any(Connection.class));
verify(repoHandlerMock, times(2)).deleteLinkInputs(anyString(), any(Connection.class));
verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
verify(repoHandlerMock, times(1)).existsLink(anyString(), any(Connection.class));
verify(repoHandlerMock, times(1)).updateLink(any(MLink.class), any(Connection.class));
verifyNoMoreInteractions(repoHandlerMock);
return ;
}
@ -607,8 +603,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateJobError() {
doReturn(true).when(repoHandlerMock).existsLink(anyString(), any(Connection.class));
doReturn(true).when(repoHandlerMock).existsJob(anyString(), any(Connection.class));
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
"update job error.");
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0016);
doThrow(exception).when(repoHandlerMock).updateJob(any(MJob.class), any(Connection.class));
try {
@ -620,10 +615,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateJobError() {
verify(repoHandlerMock, times(2)).deleteJobInputs(anyString(), any(Connection.class));
verify(repoHandlerMock, times(2)).deleteLinkInputs(anyString(), any(Connection.class));
verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
verify(repoHandlerMock, times(2)).existsLink(anyString(), any(Connection.class));
verify(repoHandlerMock, times(2)).updateLink(any(MLink.class), any(Connection.class));
verify(repoHandlerMock, times(1)).existsJob(anyString(), any(Connection.class));
verify(repoHandlerMock, times(1)).updateJob(any(MJob.class), any(Connection.class));
verify(repoHandlerMock, times(1)).existsLink(anyLong(), any(Connection.class));
verifyNoMoreInteractions(repoHandlerMock);
return ;
}
@ -740,8 +732,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateJobError() {
doNothing().when(repoHandlerMock).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class));
doReturn(true).when(repoHandlerMock).existsJob(anyString(), any(Connection.class));
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
"update job error.");
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0019);
doThrow(exception).when(repoHandlerMock).updateJob(any(MJob.class), any(Connection.class));
try {
@ -751,9 +742,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateJobError() {
verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
verify(repoHandlerMock, times(2)).deleteJobInputs(anyString(), any(Connection.class));
verify(repoHandlerMock, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class));
verify(repoHandlerMock, times(1)).existsJob(anyString(), any(Connection.class));
verify(repoHandlerMock, times(1)).updateJob(any(MJob.class), any(Connection.class));
verifyNoMoreInteractions(repoHandlerMock);
verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class));
return ;
}

View File

@ -462,6 +462,26 @@ public boolean existsLink(String linkName, Connection conn) {
}
}
/**
* {@inheritDoc}
*/
@Override
public boolean existsLink(long linkId, Connection conn) {
try (PreparedStatement stmt = conn.prepareStatement(crudQueries.getStmtSelectLinkCheckById())) {
stmt.setLong(1, linkId);
try (ResultSet rs = stmt.executeQuery()) {
// Should be always valid in query with count
rs.next();
return rs.getLong(1) == 1;
}
} catch (SQLException ex) {
logException(ex, linkId);
throw new SqoopException(CommonRepositoryError.COMMON_0022, ex);
}
}
/**
* {@inheritDoc}
*/
@ -751,6 +771,26 @@ public void updateJob(MJob job, Connection conn) {
}
}
/**
* {@inheritDoc}
*/
@Override
public boolean existsJob(long jobId, Connection conn) {
try (PreparedStatement stmt = conn.prepareStatement(crudQueries.getStmtSelectJobCheckById())) {
stmt.setLong(1, jobId);
try (ResultSet rs = stmt.executeQuery()) {
// Should be always valid in query with count
rs.next();
return rs.getLong(1) == 1;
}
} catch (SQLException ex) {
logException(ex, jobId);
throw new SqoopException(CommonRepositoryError.COMMON_0026, ex);
}
}
/**
* {@inheritDoc}
*/

View File

@ -364,6 +364,11 @@ public class CommonRepositoryInsertUpdateDeleteSelectQuery {
"SELECT count(*) FROM " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_LINK_NAME)
+ " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNK_NAME) + " = ?";
// DML: Check if given link exists
private static final String STMT_SELECT_LINK_CHECK_BY_ID =
"SELECT count(*) FROM " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_LINK_NAME)
+ " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNK_ID) + " = ?";
/**
* *******JOB TABLE *************
*/
@ -419,6 +424,11 @@ public class CommonRepositoryInsertUpdateDeleteSelectQuery {
"SELECT count(*) FROM " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_JOB_NAME)
+ " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQB_NAME) + " = ?";
// DML: Check if given job exists
private static final String STMT_SELECT_JOB_CHECK_BY_ID =
"SELECT count(*) FROM " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_JOB_NAME)
+ " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQB_ID) + " = ?";
// DML: Check if there are jobs for given link
private static final String STMT_SELECT_JOBS_FOR_LINK_CHECK =
"SELECT SUM(CNT) FROM ("
@ -841,6 +851,10 @@ public String getStmtSelectLinkCheckByName() {
return STMT_SELECT_LINK_CHECK_BY_NAME;
}
public String getStmtSelectLinkCheckById() {
return STMT_SELECT_LINK_CHECK_BY_ID;
}
public String getStmtInsertJob() {
return STMT_INSERT_JOB;
}
@ -869,6 +883,10 @@ public String getStmtSelectJobCheckByName() {
return STMT_SELECT_JOB_CHECK_BY_NAME;
}
public String getStmtSelectJobCheckById() {
return STMT_SELECT_JOB_CHECK_BY_ID;
}
public String getStmtSelectJobsForLinkCheck() {
return STMT_SELECT_JOBS_FOR_LINK_CHECK;
}

View File

@ -179,13 +179,14 @@ private JsonBean createUpdateJob(RequestContext ctx, boolean create) {
// Job object
MJob postedJob = jobs.get(0);
String oldJobName = ctx.getLastURLElement();
// Authorization check
if (create) {
AuthorizationEngine.createJob(ctx.getUserName(), postedJob.getFromLinkName(), postedJob.getToLinkName());
} else {
AuthorizationEngine.updateJob(ctx.getUserName(), postedJob.getFromLinkName(), postedJob.getToLinkName(),
postedJob.getName());
oldJobName);
}
// Verify that user is not trying to spoof us
@ -203,8 +204,7 @@ private JsonBean createUpdateJob(RequestContext ctx, boolean create) {
// if update get the job id from the request URI
if (!create) {
String jobIdentifier = ctx.getLastURLElement();
MJob existingJob = HandlerUtils.getJobFromIdentifier(jobIdentifier);
MJob existingJob = HandlerUtils.getJobFromIdentifier(oldJobName);
if (postedJob.getPersistenceId() == MPersistableEntity.PERSISTANCE_ID_DEFAULT) {
postedJob.setPersistenceId(existingJob.getPersistenceId());
}

View File

@ -131,6 +131,7 @@ private JsonBean createUpdateLink(RequestContext ctx, boolean create) {
MLink postedLink = links.get(0);
MConnector mConnector = HandlerUtils.getConnectorFromConnectorName(postedLink.getConnectorName());
String oldLinkName = ctx.getLastURLElement();
// Authorization check
if (create) {
@ -138,7 +139,7 @@ private JsonBean createUpdateLink(RequestContext ctx, boolean create) {
mConnector.getUniqueName());
} else {
AuthorizationEngine.updateLink(ctx.getUserName(), mConnector.getUniqueName(),
postedLink.getName());
oldLinkName);
}
MLinkConfig linkConfig = ConnectorManager.getInstance()
@ -148,8 +149,7 @@ private JsonBean createUpdateLink(RequestContext ctx, boolean create) {
}
// if update get the link id from the request URI
if (!create) {
String linkName = ctx.getLastURLElement();
MLink existingLink = repository.findLink(linkName);
MLink existingLink = repository.findLink(oldLinkName);
if (postedLink.getPersistenceId() == MPersistableEntity.PERSISTANCE_ID_DEFAULT) {
postedLink.setPersistenceId(existingLink.getPersistenceId());
}

View File

@ -67,7 +67,7 @@ private Status updateJob(String jobArg, List<String> args, boolean isInteractive
// TODO(SQOOP-1634): using from/to and driver config id, this call can be avoided
MJob job = client.getJob(jobArg);
String oldJobName = job.getName();
ResourceBundle fromConnectorBundle = client.getConnectorConfigBundle(
job.getFromConnectorName());
ResourceBundle toConnectorBundle = client.getConnectorConfigBundle(
@ -91,14 +91,14 @@ private Status updateJob(String jobArg, List<String> args, boolean isInteractive
}
// Try to create
status = client.updateJob(job);
status = client.updateJob(job, oldJobName);
} while(!status.canProceed());
} else {
JobDynamicConfigOptions options = new JobDynamicConfigOptions();
options.prepareOptions(job);
CommandLine line = ConfigOptions.parseOptions(options, 0, args, false);
if (fillJob(line, job)) {
status = client.updateJob(job);
status = client.updateJob(job, oldJobName);
if (!status.canProceed()) {
printJobValidationMessages(job);
return status;

View File

@ -65,6 +65,7 @@ private Status updateLink(String linkArg, List<String> args, boolean isInteracti
// TODO(SQOOP-1634): using link config id, this call can be avoided
MLink link = client.getLink(linkArg);
String oldLinkName = link.getName();
ResourceBundle connectorLinkConfigBundle = client.getConnectorConfigBundle(link.getConnectorName());
Status status = Status.OK;
@ -84,14 +85,14 @@ private Status updateLink(String linkArg, List<String> args, boolean isInteracti
}
// Try to create
status = client.updateLink(link);
status = client.updateLink(link, oldLinkName);
} while(!status.canProceed());
} else {
LinkDynamicConfigOptions options = new LinkDynamicConfigOptions();
options.prepareOptions(link);
CommandLine line = ConfigOptions.parseOptions(options, 0, args, false);
if (fillLink(line, link)) {
status = client.updateLink(link);
status = client.updateLink(link, oldLinkName);
if (!status.canProceed()) {
printLinkValidationMessages(link);
return null;

View File

@ -19,6 +19,7 @@
package org.apache.sqoop.shell;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
@ -108,7 +109,7 @@ public void testUpdateLink() throws InterruptedException {
MLink link = new MLink("connector_test", new MLinkConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()));
when(client.getLink("link_test")).thenReturn(link);
when(client.getConnectorConfigBundle("connector_test")).thenReturn(new MapResourceBundle(new HashMap()));
when(client.updateLink(any(MLink.class))).thenReturn(Status.OK);
when(client.updateLink(any(MLink.class), anyString())).thenReturn(Status.OK);
// update link -name link_test
Status status = (Status) updateCmd.execute(Arrays.asList(Constants.FN_LINK, "-name", "link_test"));
@ -140,7 +141,7 @@ public void testUpdateLinkInteractive() throws Exception {
when(client.getConnector("connector_test")).thenReturn(new MConnector("", "", "", null, null, null));
MLink link = new MLink("connector_test", new MLinkConfig(getConfig("CONFIGFROMNAME"), new ArrayList<MValidator>()));
when(client.getLink("link_test")).thenReturn(link);
when(client.updateLink(any(MLink.class))).thenReturn(Status.OK);
when(client.updateLink(any(MLink.class), anyString())).thenReturn(Status.OK);
when(client.getConnectorConfigBundle(any(String.class))).thenReturn(resourceBundle);
// update link -name link_test
@ -177,11 +178,12 @@ public void testUpdateJob() throws InterruptedException {
new MFromConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()),
new MToConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()),
new MDriverConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()));
job.setName("job_test");
when(client.getJob("job_test")).thenReturn(job);
when(client.getConnector(any(String.class))).thenReturn(new MConnector("connect_test", "", "", null, null, null));
when(client.getConnectorConfigBundle(any(String.class))).thenReturn(new MapResourceBundle(new HashMap()));
when(client.getDriverConfigBundle()).thenReturn(new MapResourceBundle(new HashMap()));
when(client.updateJob(job)).thenReturn(Status.OK);
when(client.updateJob(job, "job_test")).thenReturn(Status.OK);
// update job -name job_test
Status status = (Status) updateCmd.execute(Arrays.asList(Constants.FN_JOB, "-name", "job_test"));
@ -218,10 +220,10 @@ public void testUpdateJobInteractive() {
when(client.getConnector(any(String.class))).thenReturn(new MConnector("connect_test", "", "", null, null, null));
when(client.getConnectorConfigBundle(any(String.class))).thenReturn(resourceBundle);
when(client.getDriverConfigBundle()).thenReturn(resourceBundle);
when(client.updateJob(job)).thenReturn(Status.OK);
when(client.updateJob(any(MJob.class), any(String.class))).thenReturn(Status.OK);
// update job -name job_test
initData("jobname\r" + // job name
initData("job_test\r" + // job name
// From job config
"abc\r" + // for input with name "String"
"12345\r" + // for input with name "Integer"
@ -253,7 +255,7 @@ public void testUpdateJobInteractive() {
"7654321\r"); // for input with name "DateTime"
Status status = (Status) updateCmd.execute(Arrays.asList(Constants.FN_JOB, "-name", "job_test"));
assertTrue(status != null && status == Status.OK);
assertEquals(job.getName(), "jobname");
assertEquals(job.getName(), "job_test");
// check from job config
assertEquals(job.getFromJobConfig().getStringInput("fromJobConfig.String").getValue(), "abc");
assertEquals(job.getFromJobConfig().getIntegerInput("fromJobConfig.Integer").getValue().intValue(), 12345);

View File

@ -93,7 +93,7 @@ private void verifyActionsForLink(String linkName) {
assertEquals(rdbmsLink, repositoryLink);
// update link
getClient().updateLink(rdbmsLink);
getClient().updateLink(rdbmsLink, rdbmsLink.getName());
// enable link
getClient().enableLink(linkName, true);
@ -138,7 +138,7 @@ private void verifyActionsForJob(String jobName) throws Exception {
assertEquals(job, repositoryJob);
// update job
getClient().updateJob(job);
getClient().updateJob(job, job.getName());
// enable job
getClient().enableJob(jobName, true);