mirror of
https://github.com/apache/sqoop.git
synced 2025-05-06 18:59:35 +08:00
SQOOP-2703: Sqoop2: Use connector name in MJob
(Colin Ma via Jarek Jarcec Cecho)
This commit is contained in:
parent
6c4743de15
commit
e70b9c259a
@ -393,8 +393,8 @@ public MJob createJob(String fromLinkName, String toLinkName) {
|
||||
MConnector connectorForToLink = getConnector(toLink.getConnectorName());
|
||||
|
||||
return new MJob(
|
||||
connectorForFromLink.getPersistenceId(),
|
||||
connectorForToLink.getPersistenceId(),
|
||||
connectorForFromLink.getUniqueName(),
|
||||
connectorForToLink.getUniqueName(),
|
||||
fromLink.getPersistenceId(),
|
||||
toLink.getPersistenceId(),
|
||||
connectorForFromLink.getFromConfig().clone(false),
|
||||
|
@ -50,8 +50,8 @@ public class JobBean implements JsonBean {
|
||||
|
||||
static final String FROM_LINK_ID = "from-link-id";
|
||||
static final String TO_LINK_ID = "to-link-id";
|
||||
static final String FROM_CONNECTOR_ID = "from-connector-id";
|
||||
static final String TO_CONNECTOR_ID = "to-connector-id";
|
||||
static final String FROM_CONNECTOR_NAME = "from-connector-name";
|
||||
static final String TO_CONNECTOR_NAME = "to-connector-name";
|
||||
static final String FROM_CONFIG_VALUES = "from-config-values";
|
||||
static final String TO_CONFIG_VALUES = "to-config-values";
|
||||
static final String DRIVER_CONFIG_VALUES = "driver-config-values";
|
||||
@ -61,7 +61,7 @@ public class JobBean implements JsonBean {
|
||||
private List<MJob> jobs;
|
||||
|
||||
// Optional
|
||||
private Map<Long, ResourceBundle> connectorConfigBundles;
|
||||
private Map<String, ResourceBundle> connectorConfigBundles;
|
||||
private ResourceBundle driverConfigBundle;
|
||||
|
||||
// For "extract"
|
||||
@ -78,27 +78,27 @@ public JobBean(List<MJob> jobs) {
|
||||
|
||||
// For "restore"
|
||||
public JobBean() {
|
||||
connectorConfigBundles = new HashMap<Long, ResourceBundle>();
|
||||
connectorConfigBundles = new HashMap<String, ResourceBundle>();
|
||||
}
|
||||
|
||||
public void setDriverConfigBundle(ResourceBundle driverConfigBundle) {
|
||||
this.driverConfigBundle = driverConfigBundle;
|
||||
}
|
||||
|
||||
public void addConnectorConfigBundle(Long id, ResourceBundle connectorConfigBundle) {
|
||||
connectorConfigBundles.put(id, connectorConfigBundle);
|
||||
public void addConnectorConfigBundle(String connectorName, ResourceBundle connectorConfigBundle) {
|
||||
connectorConfigBundles.put(connectorName, connectorConfigBundle);
|
||||
}
|
||||
|
||||
public boolean hasConnectorConfigBundle(Long id) {
|
||||
return connectorConfigBundles.containsKey(id);
|
||||
public boolean hasConnectorConfigBundle(String connectorName) {
|
||||
return connectorConfigBundles.containsKey(connectorName);
|
||||
}
|
||||
|
||||
public List<MJob> getJobs() {
|
||||
return jobs;
|
||||
}
|
||||
|
||||
public ResourceBundle getConnectorConfigBundle(Long id) {
|
||||
return connectorConfigBundles.get(id);
|
||||
public ResourceBundle getConnectorConfigBundle(String connectorName) {
|
||||
return connectorConfigBundles.get(connectorName);
|
||||
}
|
||||
|
||||
public ResourceBundle getDriverConfigBundle() {
|
||||
@ -134,8 +134,8 @@ private JSONObject extractJob(boolean skipSensitive, MJob job) {
|
||||
object.put(UPDATE_DATE, job.getLastUpdateDate().getTime());
|
||||
// job link associated connectors
|
||||
// TODO(SQOOP-1634): fix not to require the connectorIds in the post data
|
||||
object.put(FROM_CONNECTOR_ID, job.getFromConnectorId());
|
||||
object.put(TO_CONNECTOR_ID, job.getToConnectorId());
|
||||
object.put(FROM_CONNECTOR_NAME, job.getFromConnectorName());
|
||||
object.put(TO_CONNECTOR_NAME, job.getToConnectorName());
|
||||
// job associated links
|
||||
object.put(FROM_LINK_ID, job.getFromLinkId());
|
||||
object.put(TO_LINK_ID, job.getToLinkId());
|
||||
@ -168,8 +168,8 @@ protected void restoreJobs(JSONArray array) {
|
||||
|
||||
private MJob restoreJob(Object obj) {
|
||||
JSONObject object = (JSONObject) obj;
|
||||
long fromConnectorId = JSONUtils.getLong(object, FROM_CONNECTOR_ID);
|
||||
long toConnectorId = JSONUtils.getLong(object, TO_CONNECTOR_ID);
|
||||
String fromConnectorName = JSONUtils.getString(object, FROM_CONNECTOR_NAME);
|
||||
String toConnectorName = JSONUtils.getString(object, TO_CONNECTOR_NAME);
|
||||
long fromConnectionId = JSONUtils.getLong(object, FROM_LINK_ID);
|
||||
long toConnectionId = JSONUtils.getLong(object, TO_LINK_ID);
|
||||
JSONObject fromConfigJson = JSONUtils.getJSONObject(object, FROM_CONFIG_VALUES);
|
||||
@ -186,8 +186,8 @@ private MJob restoreJob(Object obj) {
|
||||
List<MValidator> driverValidators = restoreValidator(JSONUtils.getJSONArray(driverConfigJson, ConfigInputConstants.CONFIG_VALIDATORS));
|
||||
|
||||
MJob job = new MJob(
|
||||
fromConnectorId,
|
||||
toConnectorId,
|
||||
fromConnectorName,
|
||||
toConnectorName,
|
||||
fromConnectionId,
|
||||
toConnectionId,
|
||||
new MFromConfig(fromConfigs, fromValidators),
|
||||
|
@ -35,8 +35,8 @@ public class MJob extends MAccountableEntity implements MClonable {
|
||||
* dependency through link object, but having this dependency explicitly
|
||||
* carried along helps with not having to make the DB call everytime
|
||||
*/
|
||||
private final long fromConnectorId;
|
||||
private final long toConnectorId;
|
||||
private final String fromConnectorName;
|
||||
private final String toConnectorName;
|
||||
private final long fromLinkId;
|
||||
private final long toLinkId;
|
||||
|
||||
@ -55,15 +55,15 @@ public class MJob extends MAccountableEntity implements MClonable {
|
||||
* @param toConfig TO job config
|
||||
* @param driverConfig driver config
|
||||
*/
|
||||
public MJob(long fromConnectorId,
|
||||
long toConnectorId,
|
||||
public MJob(String fromConnectorName,
|
||||
String toConnectorName,
|
||||
long fromLinkId,
|
||||
long toLinkId,
|
||||
MFromConfig fromConfig,
|
||||
MToConfig toConfig,
|
||||
MDriverConfig driverConfig) {
|
||||
this.fromConnectorId = fromConnectorId;
|
||||
this.toConnectorId = toConnectorId;
|
||||
this.fromConnectorName = fromConnectorName;
|
||||
this.toConnectorName = toConnectorName;
|
||||
this.fromLinkId = fromLinkId;
|
||||
this.toLinkId = toLinkId;
|
||||
this.fromConfig = fromConfig;
|
||||
@ -97,8 +97,8 @@ public MJob(MJob other) {
|
||||
public MJob(MJob other, MFromConfig fromConfig, MToConfig toConfig, MDriverConfig driverConfig) {
|
||||
super(other);
|
||||
|
||||
this.fromConnectorId = other.getFromConnectorId();
|
||||
this.toConnectorId = other.getToConnectorId();
|
||||
this.fromConnectorName = other.getFromConnectorName();
|
||||
this.toConnectorName = other.getToConnectorName();
|
||||
this.fromLinkId = other.getFromLinkId();
|
||||
this.toLinkId = other.getToLinkId();
|
||||
this.fromConfig = fromConfig;
|
||||
@ -125,12 +125,12 @@ public long getToLinkId() {
|
||||
return toLinkId;
|
||||
}
|
||||
|
||||
public long getFromConnectorId() {
|
||||
return fromConnectorId;
|
||||
public String getFromConnectorName() {
|
||||
return fromConnectorName;
|
||||
}
|
||||
|
||||
public long getToConnectorId() {
|
||||
return toConnectorId;
|
||||
public String getToConnectorName() {
|
||||
return toConnectorName;
|
||||
}
|
||||
|
||||
public MFromConfig getFromJobConfig() {
|
||||
@ -151,8 +151,8 @@ public MJob clone(boolean cloneWithValue) {
|
||||
return new MJob(this);
|
||||
} else {
|
||||
return new MJob(
|
||||
getFromConnectorId(),
|
||||
getToConnectorId(),
|
||||
getFromConnectorName(),
|
||||
getToConnectorName(),
|
||||
getFromLinkId(),
|
||||
getToLinkId(),
|
||||
getFromJobConfig().clone(false),
|
||||
@ -172,8 +172,8 @@ public boolean equals(Object object) {
|
||||
}
|
||||
|
||||
MJob job = (MJob)object;
|
||||
return (job.getFromConnectorId() == this.getFromConnectorId())
|
||||
&& (job.getToConnectorId() == this.getToConnectorId())
|
||||
return (job.getFromConnectorName().equals(this.getFromConnectorName()))
|
||||
&& (job.getToConnectorName().equals(this.getToConnectorName()))
|
||||
&& (job.getFromLinkId() == this.getFromLinkId())
|
||||
&& (job.getToLinkId() == this.getToLinkId())
|
||||
&& (job.getPersistenceId() == this.getPersistenceId())
|
||||
@ -184,8 +184,8 @@ public boolean equals(Object object) {
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = (int) (fromConnectorId ^ (fromConnectorId >>> 32));
|
||||
result = 31 * result + (int) (toConnectorId ^ (toConnectorId >>> 32));
|
||||
int result = fromConnectorName != null ? fromConnectorName.hashCode() : 0;
|
||||
result = 31 * result + (toConnectorName != null ? toConnectorName.hashCode() : 0);
|
||||
result = 31 * result + (int) (fromLinkId ^ (fromLinkId >>> 32));
|
||||
result = 31 * result + (int) (toLinkId ^ (toLinkId >>> 32));
|
||||
result = 31 * result + (fromConfig != null ? fromConfig.hashCode() : 0);
|
||||
|
@ -63,8 +63,8 @@ public void testJobSerialization() throws ParseException {
|
||||
|
||||
assertEquals(target.getFromLinkId(), 1);
|
||||
assertEquals(target.getToLinkId(), 2);
|
||||
assertEquals(target.getFromConnectorId(), 1);
|
||||
assertEquals(target.getToConnectorId(), 2);
|
||||
assertEquals(target.getFromConnectorName(), "from_ahoj");
|
||||
assertEquals(target.getToConnectorName(), "to_ahoj");
|
||||
assertEquals(created, target.getCreationDate());
|
||||
assertEquals(updated, target.getLastUpdateDate());
|
||||
assertEquals(false, target.getEnabled());
|
||||
|
@ -74,8 +74,8 @@ public void testJobsSerialization() throws ParseException {
|
||||
|
||||
assertEquals(retrievedJob1.getFromLinkId(), 1);
|
||||
assertEquals(retrievedJob1.getToLinkId(), 2);
|
||||
assertEquals(retrievedJob1.getFromConnectorId(), 1);
|
||||
assertEquals(retrievedJob1.getToConnectorId(), 2);
|
||||
assertEquals(retrievedJob1.getFromConnectorName(), "from_ahoj");
|
||||
assertEquals(retrievedJob1.getToConnectorName(), "to_ahoj");
|
||||
assertEquals(created, retrievedJob1.getCreationDate());
|
||||
assertEquals(updated, retrievedJob1.getLastUpdateDate());
|
||||
assertEquals(false, retrievedJob1.getEnabled());
|
||||
|
@ -77,8 +77,10 @@ public static MConnector getConnector(Long id, String name, boolean from, boolea
|
||||
}
|
||||
|
||||
public static MJob getJob(String connectorName) {
|
||||
return new MJob(1, 2, 1, 2, getConnector(1L, connectorName).getFromConfig(), getConnector(1L, connectorName)
|
||||
.getToConfig(), ConfigTestUtil.getDriverConfig());
|
||||
String fromConnectorName = "from_" + connectorName;
|
||||
String toConnectorName = "to_" + connectorName;
|
||||
return new MJob(fromConnectorName, toConnectorName, 1, 2, getConnector(1L, fromConnectorName).getFromConfig(),
|
||||
getConnector(2L, toConnectorName).getToConfig(), ConfigTestUtil.getDriverConfig());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -36,8 +36,8 @@ public class TestMJob {
|
||||
public void testInitialization() {
|
||||
// Test default constructor
|
||||
MJob job = job();
|
||||
assertEquals(123l, job.getFromConnectorId());
|
||||
assertEquals(456l, job.getToConnectorId());
|
||||
assertEquals("fromConnectorName", job.getFromConnectorName());
|
||||
assertEquals("toConnectorName", job.getToConnectorName());
|
||||
assertEquals("Buffy", job.getCreationUser());
|
||||
assertEquals("Vampire", job.getName());
|
||||
assertEquals(fromConfig(), job.getFromJobConfig());
|
||||
@ -46,8 +46,8 @@ public void testInitialization() {
|
||||
|
||||
// Test copy constructor
|
||||
MJob copy = new MJob(job);
|
||||
assertEquals(123l, copy.getFromConnectorId());
|
||||
assertEquals(456l, copy.getToConnectorId());
|
||||
assertEquals("fromConnectorName", copy.getFromConnectorName());
|
||||
assertEquals("toConnectorName", copy.getToConnectorName());
|
||||
assertEquals("Buffy", copy.getCreationUser());
|
||||
assertEquals("Vampire", copy.getName());
|
||||
assertEquals(fromConfig(), copy.getFromJobConfig());
|
||||
@ -56,8 +56,8 @@ public void testInitialization() {
|
||||
|
||||
// Test constructor for metadata upgrade (the order of configs is different)
|
||||
MJob upgradeCopy = new MJob(job, fromConfig(), toConfig(), driverConfig());
|
||||
assertEquals(123l, upgradeCopy.getFromConnectorId());
|
||||
assertEquals(456l, upgradeCopy.getToConnectorId());
|
||||
assertEquals("fromConnectorName", upgradeCopy.getFromConnectorName());
|
||||
assertEquals("toConnectorName", upgradeCopy.getToConnectorName());
|
||||
assertEquals("Buffy", upgradeCopy.getCreationUser());
|
||||
assertEquals("Vampire", upgradeCopy.getName());
|
||||
assertEquals(fromConfig(), upgradeCopy.getFromJobConfig());
|
||||
@ -98,7 +98,7 @@ public void testClone() {
|
||||
.getConfig("CONFIGFROMNAME").getInput("STRING-INPUT").getValue()); }
|
||||
|
||||
private MJob job() {
|
||||
MJob job = new MJob(123l, 456l, 1L, 2L, fromConfig(), toConfig(), driverConfig());
|
||||
MJob job = new MJob("fromConnectorName", "toConnectorName", 1L, 2L, fromConfig(), toConfig(), driverConfig());
|
||||
job.setName("Vampire");
|
||||
job.setCreationUser("Buffy");
|
||||
return job;
|
||||
|
@ -496,6 +496,7 @@ MLink getLink(long linkId) {
|
||||
return link;
|
||||
}
|
||||
|
||||
// TODO: this method should be removed when MSubmission link job with jobName
|
||||
MJob getJob(long jobId) {
|
||||
MJob job = RepositoryManager.getInstance().getRepository().findJob(jobId);
|
||||
if (job == null) {
|
||||
@ -508,6 +509,18 @@ MJob getJob(long jobId) {
|
||||
return job;
|
||||
}
|
||||
|
||||
MJob getJob(String jobName) {
|
||||
MJob job = RepositoryManager.getInstance().getRepository().findJob(jobName);
|
||||
if (job == null) {
|
||||
throw new SqoopException(DriverError.DRIVER_0004, "Unknown job name: " + jobName);
|
||||
}
|
||||
|
||||
if (!job.getEnabled()) {
|
||||
throw new SqoopException(DriverError.DRIVER_0009, "Job: " + jobName);
|
||||
}
|
||||
return job;
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
private void initializeConnector(JobRequest jobRequest, Direction direction, Initializer initializer, InitializerContext initializerContext) {
|
||||
// Initialize submission from the connector perspective
|
||||
@ -569,8 +582,8 @@ void invokeDestroyerOnJobSuccess(MSubmission submission) {
|
||||
try {
|
||||
MJob job = getJob(submission.getJobId());
|
||||
|
||||
SqoopConnector fromConnector = getSqoopConnector(job.getFromConnectorId());
|
||||
SqoopConnector toConnector = getSqoopConnector(job.getToConnectorId());
|
||||
SqoopConnector fromConnector = getSqoopConnector(job.getFromConnectorName());
|
||||
SqoopConnector toConnector = getSqoopConnector(job.getToConnectorName());
|
||||
|
||||
MLink fromConnection = getLink(job.getFromLinkId());
|
||||
MLink toConnection = getLink(job.getToLinkId());
|
||||
|
@ -447,7 +447,7 @@ private void deleteJobInputsOnly(List<MJob> jobs, RepositoryTransaction tx) {
|
||||
public final void upgradeConnector(MConnector oldConnector, MConnector newConnector) {
|
||||
LOG.info("Upgrading connector: " + oldConnector.getUniqueName());
|
||||
long connectorId = oldConnector.getPersistenceId();
|
||||
String connectorName = oldConnector.getUniqueName();
|
||||
String oldConnectorName = oldConnector.getUniqueName();
|
||||
String oldVersion = oldConnector.getVersion();
|
||||
newConnector.setPersistenceId(connectorId);
|
||||
|
||||
@ -460,7 +460,7 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
|
||||
// 1. Get an upgrader for the connector
|
||||
ConnectorConfigurableUpgrader upgrader = connector.getConfigurableUpgrader(oldVersion);
|
||||
// 2. Get all links associated with the connector.
|
||||
List<MLink> existingLinksByConnector = findLinksForConnectorUpgrade(connectorName);
|
||||
List<MLink> existingLinksByConnector = findLinksForConnectorUpgrade(oldConnectorName);
|
||||
// 3. Get all jobs associated with the connector.
|
||||
List<MJob> existingJobsByConnector = findJobsForConnectorUpgrade(connectorId);
|
||||
// -- BEGIN TXN --
|
||||
@ -476,7 +476,7 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
|
||||
// dont always rely on the repository implementation to return empty list for links
|
||||
if (existingLinksByConnector != null) {
|
||||
for (MLink link : existingLinksByConnector) {
|
||||
LOG.info(" Link upgrade for link:" + link.getName() + " for connector:" + connectorName);
|
||||
LOG.info(" Link upgrade for link:" + link.getName() + " for connector:" + oldConnectorName);
|
||||
// Make a new copy of the configs
|
||||
MConfigList linkConfig = newConnector.getLinkConfig().clone(false);
|
||||
MLinkConfig newLinkConfig = new MLinkConfig(linkConfig.getConfigs(), linkConfig.getCloneOfValidators());
|
||||
@ -496,7 +496,7 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
|
||||
// and stop the bootup of Sqoop server
|
||||
logInvalidModelObject("link", newlink, validationResult);
|
||||
upgradeSuccessful = false;
|
||||
LOG.info(" LINK config upgrade FAILED for link: " + link.getName() + " for connector:" + connectorName);
|
||||
LOG.info(" LINK config upgrade FAILED for link: " + link.getName() + " for connector:" + oldConnectorName);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -506,14 +506,15 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
|
||||
for (MJob job : existingJobsByConnector) {
|
||||
// every job has 2 parts, the FROM and the TO links and their
|
||||
// corresponding connectors.
|
||||
LOG.info(" Job upgrade for job:" + job.getName()+ " for connector:" + connectorName);
|
||||
LOG.info(" Job upgrade for job:" + job.getName()+ " for connector:" + oldConnectorName);
|
||||
|
||||
SupportedDirections supportedDirections = newConnector.getSupportedDirections();
|
||||
|
||||
// compare the old connector name with job's connector name
|
||||
if (supportedDirections.isDirectionSupported(Direction.FROM)
|
||||
&& job.getFromConnectorId() == newConnector.getPersistenceId()
|
||||
&& job.getFromConnectorName().equals(oldConnectorName)
|
||||
&& supportedDirections.isDirectionSupported(Direction.TO)
|
||||
&& job.getToConnectorId() == newConnector.getPersistenceId()) {
|
||||
&& job.getToConnectorName().equals(oldConnectorName)) {
|
||||
// Upgrade both configs
|
||||
MFromConfig newFromConfig = new MFromConfig(newConnector.getFromConfig().clone(false).getConfigs(), newConnector.getFromConfig().getCloneOfValidators());
|
||||
MFromConfig oldFromConfig = job.getFromJobConfig();
|
||||
@ -528,16 +529,15 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
|
||||
newJob.getFromJobConfig().getConfigs(),
|
||||
connector.getJobConfigurationClass(Direction.FROM)
|
||||
);
|
||||
|
||||
if (validationResult.getStatus().canProceed()) {
|
||||
updateJob(newJob, tx);
|
||||
} else {
|
||||
logInvalidModelObject("job", newJob, validationResult);
|
||||
upgradeSuccessful = false;
|
||||
LOG.error(" JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName);
|
||||
LOG.error(" JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + oldConnectorName);
|
||||
}
|
||||
} else if (supportedDirections.isDirectionSupported(Direction.FROM)
|
||||
&& job.getFromConnectorId() == newConnector.getPersistenceId()) {
|
||||
&& job.getFromConnectorName().equals(oldConnectorName)) {
|
||||
MFromConfig newFromConfig = new MFromConfig(newConnector.getFromConfig().clone(false).getConfigs(), newConnector.getFromConfig().getCloneOfValidators());
|
||||
MFromConfig oldFromConfig = job.getFromJobConfig();
|
||||
upgrader.upgradeFromJobConfig(oldFromConfig, newFromConfig);
|
||||
@ -556,10 +556,10 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
|
||||
} else {
|
||||
logInvalidModelObject("job", newJob, validationResult);
|
||||
upgradeSuccessful = false;
|
||||
LOG.error(" FROM JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName);
|
||||
LOG.error(" FROM JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + oldConnectorName);
|
||||
}
|
||||
} else if (supportedDirections.isDirectionSupported(Direction.TO)
|
||||
&& job.getToConnectorId() == newConnector.getPersistenceId()) {
|
||||
&& job.getToConnectorName().equals(oldConnectorName)) {
|
||||
MToConfig oldToConfig = job.getToJobConfig();
|
||||
MToConfig newToConfig = new MToConfig(newConnector.getToConfig().clone(false).getConfigs(), newConnector.getToConfig().getCloneOfValidators());
|
||||
upgrader.upgradeToJobConfig(oldToConfig, newToConfig);
|
||||
@ -578,7 +578,7 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
|
||||
} else {
|
||||
logInvalidModelObject("job", newJob, validationResult);
|
||||
upgradeSuccessful = false;
|
||||
LOG.error(" TO JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName);
|
||||
LOG.error(" TO JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + oldConnectorName);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -587,7 +587,7 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
|
||||
if (upgradeSuccessful) {
|
||||
tx.commit();
|
||||
} else {
|
||||
throw new SqoopException(RepositoryError.JDBCREPO_0027, " for connector:" + connectorName);
|
||||
throw new SqoopException(RepositoryError.JDBCREPO_0027, " for connector:" + oldConnectorName);
|
||||
}
|
||||
} catch (SqoopException ex) {
|
||||
if (tx != null) {
|
||||
@ -603,7 +603,7 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
|
||||
if (tx != null) {
|
||||
tx.close();
|
||||
}
|
||||
LOG.info("Connector upgrade finished for: " + connectorName);
|
||||
LOG.info("Connector upgrade finished for: " + oldConnectorName);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -141,19 +141,19 @@ public void testDisabledLink() {
|
||||
|
||||
@Test
|
||||
public void testGetJob() {
|
||||
MJob testJob = job(123l, 456l);
|
||||
MJob testJob = job("jobName", "fromConnectorName", "toConnectorName");
|
||||
testJob.setEnabled(true);
|
||||
MJob mJobSpy = org.mockito.Mockito.spy(testJob);
|
||||
when(repositoryManagerMock.getRepository()).thenReturn(jdbcRepoMock);
|
||||
when(jdbcRepoMock.findJob(123l)).thenReturn(mJobSpy);
|
||||
assertEquals(jobManager.getJob(123l), mJobSpy);
|
||||
when(jdbcRepoMock.findJob("jobName")).thenReturn(mJobSpy);
|
||||
assertEquals(jobManager.getJob("jobName"), mJobSpy);
|
||||
verify(repositoryManagerMock, times(1)).getRepository();
|
||||
verify(jdbcRepoMock, times(1)).findJob(123l);
|
||||
verify(jdbcRepoMock, times(1)).findJob("jobName");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisabledJob() {
|
||||
MJob testJob = job(123l, 456l);
|
||||
MJob testJob = job("jobName", "fromConnectorName", "toConnectorName");
|
||||
testJob.setEnabled(false);
|
||||
testJob.setPersistenceId(1111);
|
||||
SqoopException exception = new SqoopException(DriverError.DRIVER_0009, "Job: "
|
||||
@ -161,13 +161,13 @@ public void testDisabledJob() {
|
||||
|
||||
MJob mJobSpy = org.mockito.Mockito.spy(testJob);
|
||||
when(repositoryManagerMock.getRepository()).thenReturn(jdbcRepoMock);
|
||||
when(jdbcRepoMock.findJob(123l)).thenReturn(mJobSpy);
|
||||
when(jdbcRepoMock.findJob("jobName")).thenReturn(mJobSpy);
|
||||
try {
|
||||
jobManager.getJob(123l);
|
||||
jobManager.getJob("jobName");
|
||||
} catch (SqoopException ex) {
|
||||
assertEquals(ex.getMessage(), exception.getMessage());
|
||||
verify(repositoryManagerMock, times(1)).getRepository();
|
||||
verify(jdbcRepoMock, times(1)).findJob(123l);
|
||||
verify(jdbcRepoMock, times(1)).findJob("jobName");
|
||||
}
|
||||
}
|
||||
|
||||
@ -187,9 +187,9 @@ public void testUnknownJob() {
|
||||
}
|
||||
}
|
||||
|
||||
private MJob job(long fromId, long toId) {
|
||||
MJob job = new MJob(fromId, toId, 1L, 2L, null, null, null);
|
||||
job.setName("Vampire");
|
||||
private MJob job(String jobName, String fromConnectorName, String toConnectorName) {
|
||||
MJob job = new MJob(fromConnectorName, toConnectorName, 1L, 2L, null, null, null);
|
||||
job.setName(jobName);
|
||||
job.setCreationUser("Buffy");
|
||||
return job;
|
||||
}
|
||||
|
@ -229,7 +229,7 @@ public void testConnectorConfigUpgradeWithValidLinksAndJobs() {
|
||||
// prepare the links and jobs
|
||||
// the connector Id for both are the same
|
||||
List<MLink> linkList = links(link(1, "LA", oldConnector.getUniqueName()), link(2, "LB", oldConnector.getUniqueName()));
|
||||
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 2));
|
||||
List<MJob> jobList = jobs(job(1, "JA", "A1", "A1", 1, 1), job(2, "JB", "A1", "A1", 2, 2));
|
||||
|
||||
// mock necessary methods for upgradeConnector() procedure
|
||||
doReturn(linkList).when(repoSpy).findLinksForConnectorUpgrade(anyString());
|
||||
@ -277,7 +277,7 @@ public void testDriverConfigUpgradeWithValidJobs() {
|
||||
|
||||
when(driverMock.getConfigurableUpgrader(DriverBean.CURRENT_DRIVER_VERSION)).thenReturn(driverUpgraderMock);
|
||||
when(driverMock.getDriverJobConfigurationClass()).thenReturn(ValidConfiguration.class);
|
||||
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1));
|
||||
List<MJob> jobList = jobs(job(1, "JA", "fromConnectorName", "toConnectorName", 1, 1), job(2, "JB", "fromConnectorName", "toConnectorName", 2, 1));
|
||||
|
||||
doReturn(jobList).when(repoSpy).findJobs();
|
||||
doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class));
|
||||
@ -315,7 +315,7 @@ public void testDriverConfigUpgradeWithInvalidJobs() {
|
||||
|
||||
when(driverMock.getConfigurableUpgrader(DriverBean.CURRENT_DRIVER_VERSION)).thenReturn(driverUpgraderMock);
|
||||
when(driverMock.getDriverJobConfigurationClass()).thenReturn(InvalidConfiguration.class);
|
||||
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1));
|
||||
List<MJob> jobList = jobs(job(1, "JA", "fromConnectorName", "toConnectorName", 1, 1), job(2, "JB", "fromConnectorName", "toConnectorName", 2, 1));
|
||||
|
||||
doReturn(jobList).when(repoSpy).findJobs();
|
||||
doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
|
||||
@ -425,7 +425,7 @@ public void testConnectorConfigUpgradeHandlerWithDeleteJobInputsError() {
|
||||
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
||||
|
||||
List<MLink> linkList = links(link(1, "LA", oldConnector.getUniqueName()), link(2, "LB", oldConnector.getUniqueName()));
|
||||
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1));
|
||||
List<MJob> jobList = jobs(job(1, "JA", "fromConnectorName", "toConnectorName", 1, 1), job(2, "JB", "fromConnectorName", "toConnectorName", 2, 1));
|
||||
doReturn(linkList).when(repoHandlerMock).findLinksForConnectorUpgrade(anyString(), any(Connection.class));
|
||||
doReturn(jobList).when(repoHandlerMock).findJobsForConnectorUpgrade
|
||||
(anyLong(), any(Connection.class));
|
||||
@ -463,7 +463,7 @@ public void testConnectorConfigUpgradeHandlerWithDeleteLinkInputsError() {
|
||||
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
||||
|
||||
List<MLink> linkList = links(link(1, "LA", oldConnector.getUniqueName()), link(2, "LB", oldConnector.getUniqueName()));
|
||||
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1));
|
||||
List<MJob> jobList = jobs(job(1, "JA", "fromConnectorName", "toConnectorName", 1, 1), job(2, "JB", "fromConnectorName", "toConnectorName", 2, 1));
|
||||
doReturn(linkList).when(repoHandlerMock).findLinksForConnectorUpgrade(anyString(), any(Connection.class));
|
||||
doReturn(jobList).when(repoHandlerMock).findJobsForConnectorUpgrade(anyLong(), any(Connection.class));
|
||||
doNothing().when(repoHandlerMock).deleteJobInputs(anyString(), any(Connection.class));
|
||||
@ -501,7 +501,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateConnectorError() {
|
||||
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
||||
|
||||
List<MLink> linkList = links(link(1, "LA", oldConnector.getUniqueName()), link(2, "LB", oldConnector.getUniqueName()));
|
||||
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1));
|
||||
List<MJob> jobList = jobs(job(1, "JA", "fromConnectorName", "toConnectorName", 1, 1), job(2, "JB", "fromConnectorName", "toConnectorName", 2, 1));
|
||||
doReturn(linkList).when(repoHandlerMock).findLinksForConnectorUpgrade(anyString(), any(Connection.class));
|
||||
doReturn(jobList).when(repoHandlerMock).findJobsForConnectorUpgrade(anyLong(), any(Connection.class));
|
||||
doNothing().when(repoHandlerMock).deleteJobInputs(anyString(), any(Connection.class));
|
||||
@ -543,7 +543,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateLinkError() {
|
||||
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
||||
|
||||
List<MLink> linkList = links(link(1, "LA", oldConnector.getUniqueName()), link(2, "LB", oldConnector.getUniqueName()));
|
||||
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1));
|
||||
List<MJob> jobList = jobs(job(1, "JA", "fromConnectorName", "toConnectorName", 1, 1), job(2, "JB", "fromConnectorName", "toConnectorName", 2, 1));
|
||||
doReturn(linkList).when(repoHandlerMock).findLinksForConnectorUpgrade(anyString(), any(Connection.class));
|
||||
doReturn(jobList).when(repoHandlerMock).findJobsForConnectorUpgrade(anyLong(), any(Connection.class));
|
||||
doNothing().when(repoHandlerMock).deleteJobInputs(anyString(), any(Connection.class));
|
||||
@ -589,7 +589,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateJobError() {
|
||||
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
||||
|
||||
List<MLink> linkList = links(link(1, "LA", oldConnector.getUniqueName()), link(2, "LB", oldConnector.getUniqueName()));;
|
||||
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1));
|
||||
List<MJob> jobList = jobs(job(1, "JA", "A1", "A1", 1, 1), job(2, "JB", "A1", "A1", 2, 1));
|
||||
doReturn(linkList).when(repoHandlerMock).findLinksForConnectorUpgrade(anyString(), any(Connection.class));
|
||||
doReturn(jobList).when(repoHandlerMock).findJobsForConnectorUpgrade(anyLong(), any(Connection.class));
|
||||
doNothing().when(repoHandlerMock).deleteJobInputs(anyString(), any(Connection.class));
|
||||
@ -659,7 +659,7 @@ public void testDriverConfigUpgradeHandlerWithDeleteJobInputsError() {
|
||||
|
||||
when(driverMock.getConfigurableUpgrader(DriverBean.CURRENT_DRIVER_VERSION)).thenReturn(driverUpgraderMock);
|
||||
|
||||
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1));
|
||||
List<MJob> jobList = jobs(job(1, "JA", "fromConnectorName", "toConnectorName", 1, 1), job(2, "JB", "fromConnectorName", "toConnectorName", 2, 1));
|
||||
doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
|
||||
|
||||
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
|
||||
@ -689,7 +689,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateDriverConfigError() {
|
||||
|
||||
when(driverMock.getConfigurableUpgrader(DriverBean.CURRENT_DRIVER_VERSION)).thenReturn(driverUpgraderMock);
|
||||
|
||||
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1));
|
||||
List<MJob> jobList = jobs(job(1, "JA", "fromConnectorName", "toConnectorName", 1, 1), job(2, "JB", "fromConnectorName", "toConnectorName", 2, 1));
|
||||
doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
|
||||
doNothing().when(repoHandlerMock).deleteJobInputs(anyString(), any(Connection.class));
|
||||
doNothing().when(repoHandlerMock).deleteLinkInputs(anyString(), any(Connection.class));
|
||||
@ -723,7 +723,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateJobError() {
|
||||
|
||||
when(driverMock.getConfigurableUpgrader(DriverBean.CURRENT_DRIVER_VERSION)).thenReturn(driverUpgraderMock);
|
||||
when(driverMock.getDriverJobConfigurationClass()).thenReturn(ValidConfiguration.class);
|
||||
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1));
|
||||
List<MJob> jobList = jobs(job(1, "JA", "fromConnectorName", "toConnectorName", 1, 1), job(2, "JB", "fromConnectorName", "toConnectorName", 2, 1));
|
||||
doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
|
||||
doNothing().when(repoHandlerMock).deleteJobInputs(anyString(), any(Connection.class));
|
||||
doNothing().when(repoHandlerMock).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class));
|
||||
@ -782,8 +782,8 @@ private MLink link(long linkId, String linkName, String connectorName) {
|
||||
return link;
|
||||
}
|
||||
|
||||
private MJob job(long id, String jobName, long fromConnectorId, long toConnectorId, long fromLinkId, long toLinkId) {
|
||||
MJob job = new MJob(fromConnectorId, toConnectorId, fromLinkId, toLinkId,
|
||||
private MJob job(long id, String jobName, String fromConnectorName, String toConnectorName, long fromLinkId, long toLinkId) {
|
||||
MJob job = new MJob(fromConnectorName, toConnectorName, fromLinkId, toLinkId,
|
||||
new MFromConfig(new LinkedList<MConfig>(), new LinkedList<MValidator>()),
|
||||
new MToConfig(new LinkedList<MConfig>(), new LinkedList<MValidator>()),
|
||||
new MDriverConfig(new LinkedList<MConfig>(), new LinkedList<MValidator>()));
|
||||
|
@ -1539,7 +1539,6 @@ private List<MLink> loadLinksForUpgrade(PreparedStatement stmt,
|
||||
connectorConfigInputStatement, 2, conn);
|
||||
|
||||
MLink link = new MLink(connectorName, new MLinkConfig(connectorLinkConfig, Collections.EMPTY_LIST));
|
||||
|
||||
link.setPersistenceId(id);
|
||||
link.setName(name);
|
||||
link.setCreationUser(creationUser);
|
||||
@ -1581,7 +1580,6 @@ private List<MLink> loadLinks(PreparedStatement stmt,
|
||||
loadInputsForConfigs(connectorLinkConfig, configStmt, inputStmt);
|
||||
|
||||
MLink link = new MLink(connectorName, connectorLinkConfig);
|
||||
|
||||
link.setPersistenceId(id);
|
||||
link.setName(name);
|
||||
link.setCreationUser(creationUser);
|
||||
@ -1624,6 +1622,8 @@ private List<MJob> loadJobsForUpgrade(PreparedStatement stmt,
|
||||
Date creationDate = rsJob.getTimestamp(9);
|
||||
String updateBy = rsJob.getString(10);
|
||||
Date lastUpdateDate = rsJob.getTimestamp(11);
|
||||
String fromConnectorName = rsJob.getString(12);
|
||||
String toConnectorName = rsJob.getString(13);
|
||||
|
||||
fromConfigFetchStmt.setLong(1, fromConnectorId);
|
||||
toConfigFetchStmt.setLong(1,toConnectorId);
|
||||
@ -1652,7 +1652,7 @@ private List<MJob> loadJobsForUpgrade(PreparedStatement stmt,
|
||||
loadDriverConfigs(driverConfig, driverConfigfetchStmt, jobInputFetchStmt, 2, conn);
|
||||
|
||||
MJob job = new MJob(
|
||||
fromConnectorId, toConnectorId,
|
||||
fromConnectorName, toConnectorName,
|
||||
fromLinkId, toLinkId,
|
||||
new MFromConfig(fromConnectorFromJobConfig, Collections.EMPTY_LIST),
|
||||
new MToConfig(toConnectorToJobConfig, Collections.EMPTY_LIST),
|
||||
@ -1727,7 +1727,7 @@ private List<MJob> loadJobs(PreparedStatement stmt,
|
||||
loadDriverConfigs(driverConfig, driverConfigfetchStmt, jobInputFetchStmt, 2, conn);
|
||||
|
||||
MJob job = new MJob(
|
||||
fromConnectorId, toConnectorId,
|
||||
fromConnectorName, toConnectorName,
|
||||
fromLinkId, toLinkId,
|
||||
new MFromConfig(mFromConfig.getConfigs(), Collections.EMPTY_LIST),
|
||||
new MToConfig(mToConfig.getConfigs(), Collections.EMPTY_LIST),
|
||||
|
@ -343,7 +343,7 @@ public void testDeleteJob() throws Exception {
|
||||
}
|
||||
|
||||
public MJob getJob() {
|
||||
return new MJob(1, 1, 1, 1, handler.findConnector("A", derbyConnection).getFromConfig(),
|
||||
return new MJob("A", "A", 1, 1, handler.findConnector("A", derbyConnection).getFromConfig(),
|
||||
handler.findConnector("A", derbyConnection).getToConfig(), handler.findDriver(
|
||||
MDriver.DRIVER_NAME, derbyConnection).getDriverConfig());
|
||||
}
|
||||
|
@ -105,8 +105,8 @@ protected MJob getJob(String name, MConnector connectorA,
|
||||
MConnector connectorB, MLink linkA, MLink linkB) {
|
||||
MDriver driver = handler.findDriver(MDriver.DRIVER_NAME,
|
||||
provider.getConnection());
|
||||
MJob job = new MJob(connectorA.getPersistenceId(),
|
||||
connectorB.getPersistenceId(), linkA.getPersistenceId(),
|
||||
MJob job = new MJob(connectorA.getUniqueName(),
|
||||
connectorB.getUniqueName(), linkA.getPersistenceId(),
|
||||
linkB.getPersistenceId(), connectorA.getFromConfig(),
|
||||
connectorB.getToConfig(), driver.getDriverConfig());
|
||||
job.setName(name);
|
||||
|
@ -92,8 +92,8 @@ protected MLink getLink(String name, MConnector connector) {
|
||||
protected MJob getJob(String name, MConnector connectorA, MConnector connectorB, MLink linkA, MLink linkB) {
|
||||
MDriver driver = handler.findDriver(MDriver.DRIVER_NAME, provider.getConnection());
|
||||
MJob job = new MJob(
|
||||
connectorA.getPersistenceId(),
|
||||
connectorB.getPersistenceId(),
|
||||
connectorA.getUniqueName(),
|
||||
connectorB.getUniqueName(),
|
||||
linkA.getPersistenceId(),
|
||||
linkB.getPersistenceId(),
|
||||
connectorA.getFromConfig(),
|
||||
|
@ -191,9 +191,9 @@ private JsonBean createUpdateJob(RequestContext ctx, boolean create) {
|
||||
|
||||
// Verify that user is not trying to spoof us
|
||||
MFromConfig fromConfig = ConnectorManager.getInstance()
|
||||
.getConnectorConfigurable(postedJob.getFromConnectorId()).getFromConfig();
|
||||
.getConnectorConfigurable(postedJob.getFromConnectorName()).getFromConfig();
|
||||
MToConfig toConfig = ConnectorManager.getInstance()
|
||||
.getConnectorConfigurable(postedJob.getToConnectorId()).getToConfig();
|
||||
.getConnectorConfigurable(postedJob.getToConnectorName()).getToConfig();
|
||||
MDriverConfig driverConfig = Driver.getInstance().getDriver().getDriverConfig();
|
||||
|
||||
if (!fromConfig.equals(postedJob.getFromJobConfig())
|
||||
@ -213,9 +213,9 @@ private JsonBean createUpdateJob(RequestContext ctx, boolean create) {
|
||||
|
||||
// Corresponding connectors for this
|
||||
SqoopConnector fromConnector = ConnectorManager.getInstance().getSqoopConnector(
|
||||
postedJob.getFromConnectorId());
|
||||
postedJob.getFromConnectorName());
|
||||
SqoopConnector toConnector = ConnectorManager.getInstance().getSqoopConnector(
|
||||
postedJob.getToConnectorId());
|
||||
postedJob.getToConnectorName());
|
||||
|
||||
if (!fromConnector.getSupportedDirections().contains(Direction.FROM)) {
|
||||
throw new SqoopException(ServerError.SERVER_0004, "Connector "
|
||||
@ -325,16 +325,17 @@ private JobsBean createJobsBean(List<MJob> jobs, Locale locale) {
|
||||
private void addConnectorConfigBundle(JobBean bean, Locale locale) {
|
||||
// Add associated resources into the bean
|
||||
for (MJob job : bean.getJobs()) {
|
||||
long fromConnectorId = job.getFromConnectorId();
|
||||
long toConnectorId = job.getToConnectorId();
|
||||
String fromConnectorName = job.getFromConnectorName();
|
||||
String toConnectorName = job.getToConnectorName();
|
||||
|
||||
// replace it only if it does not already exist
|
||||
if (!bean.hasConnectorConfigBundle(fromConnectorId)) {
|
||||
bean.addConnectorConfigBundle(fromConnectorId, ConnectorManager.getInstance()
|
||||
.getResourceBundle(fromConnectorId, locale));
|
||||
if (!bean.hasConnectorConfigBundle(fromConnectorName)) {
|
||||
bean.addConnectorConfigBundle(fromConnectorName, ConnectorManager.getInstance()
|
||||
.getResourceBundle(fromConnectorName, locale));
|
||||
}
|
||||
if (!bean.hasConnectorConfigBundle(toConnectorId)) {
|
||||
bean.addConnectorConfigBundle(toConnectorId, ConnectorManager.getInstance()
|
||||
.getResourceBundle(toConnectorId, locale));
|
||||
if (!bean.hasConnectorConfigBundle(toConnectorName)) {
|
||||
bean.addConnectorConfigBundle(toConnectorName, ConnectorManager.getInstance()
|
||||
.getResourceBundle(toConnectorName, locale));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -67,13 +67,10 @@ private Status cloneJob(String jobArg, List<String> args, boolean isInteractive)
|
||||
MJob job = client.getJob(jobArg);
|
||||
job.setPersistenceId(MPersistableEntity.PERSISTANCE_ID_DEFAULT);
|
||||
|
||||
// TODO: the job should be related with connector by connectorName
|
||||
MConnector fromConnector = getClient().getConnector(job.getFromConnectorId());
|
||||
MConnector toConnector = getClient().getConnector(job.getToConnectorId());
|
||||
ResourceBundle fromConnectorBundle = client.getConnectorConfigBundle(
|
||||
fromConnector.getUniqueName());
|
||||
job.getFromConnectorName());
|
||||
ResourceBundle toConnectorBundle = client.getConnectorConfigBundle(
|
||||
toConnector.getUniqueName());
|
||||
job.getToConnectorName());
|
||||
ResourceBundle driverConfigBundle = client.getDriverConfigBundle();
|
||||
|
||||
Status status = Status.OK;
|
||||
|
@ -77,13 +77,13 @@ private Status createJob(String fromLinkArg, String toLinkArg, List<String> args
|
||||
ConsoleReader reader = getConsoleReader();
|
||||
MJob job = getClient().createJob(fromLinkArg, toLinkArg);
|
||||
|
||||
MConnector fromConnector = getClient().getConnector(job.getFromConnectorId());
|
||||
MConnector fromConnector = getClient().getConnector(job.getFromConnectorName());
|
||||
if (!fromConnector.getSupportedDirections().isDirectionSupported(Direction.FROM)) {
|
||||
errorMessage("Connector " + fromConnector.getUniqueName() + " does not support direction " + Direction.FROM);
|
||||
return Status.ERROR;
|
||||
}
|
||||
|
||||
MConnector toConnector = getClient().getConnector(job.getToConnectorId());
|
||||
MConnector toConnector = getClient().getConnector(job.getToConnectorName());
|
||||
if (!toConnector.getSupportedDirections().isDirectionSupported(Direction.TO)) {
|
||||
errorMessage("Connector " + toConnector.getUniqueName() + " does not support direction " + Direction.TO);
|
||||
return Status.ERROR;
|
||||
|
@ -99,12 +99,7 @@ private void showSummary() {
|
||||
if (fromLink != null) {
|
||||
fromLinkName = fromLink.getName();
|
||||
}
|
||||
String fromConnectorName = "";
|
||||
MConnector fromConnector = client.getConnector(job.getFromConnectorId());
|
||||
if (fromConnector != null) {
|
||||
fromConnectorName = fromConnector.getUniqueName();
|
||||
}
|
||||
fromConnectors.add(fromLinkName + " (" + fromConnectorName + ")");
|
||||
fromConnectors.add(fromLinkName + " (" + job.getFromConnectorName() + ")");
|
||||
|
||||
// To link and connector
|
||||
String toLinkName = "";
|
||||
@ -112,12 +107,7 @@ private void showSummary() {
|
||||
if (toLink != null) {
|
||||
toLinkName = toLink.getName();
|
||||
}
|
||||
String toConnnectorName = "";
|
||||
MConnector toConnector = client.getConnector(job.getToConnectorId());
|
||||
if (toConnector != null) {
|
||||
toConnnectorName = toConnector.getUniqueName();
|
||||
}
|
||||
toConnectors.add(toLinkName + " (" + toConnnectorName + ")");
|
||||
toConnectors.add(toLinkName + " (" + job.getToConnectorName() + ")");
|
||||
|
||||
availabilities.add(String.valueOf(job.getEnabled()));
|
||||
}
|
||||
@ -160,19 +150,15 @@ private void displayJob(MJob job) {
|
||||
formatter.format(job.getLastUpdateDate())
|
||||
);
|
||||
|
||||
// TODO: should be removed when MJob link with Connector by name.
|
||||
MConnector fromConnector = getClient().getConnector(job.getFromConnectorId());
|
||||
MConnector toConnector = getClient().getConnector(job.getToConnectorId());
|
||||
|
||||
displayConfig(job.getDriverConfig().getConfigs(),
|
||||
client.getDriverConfigBundle());
|
||||
printlnResource(Constants.RES_SHOW_PROMPT_JOB_FROM_LID_INFO,
|
||||
job.getFromLinkId());
|
||||
displayConfig(job.getFromJobConfig().getConfigs(),
|
||||
client.getConnectorConfigBundle(fromConnector.getUniqueName()));
|
||||
client.getConnectorConfigBundle(job.getFromConnectorName()));
|
||||
printlnResource(Constants.RES_SHOW_PROMPT_JOB_TO_LID_INFO,
|
||||
job.getToLinkId());
|
||||
displayConfig(job.getToJobConfig().getConfigs(),
|
||||
client.getConnectorConfigBundle(toConnector.getUniqueName()));
|
||||
client.getConnectorConfigBundle(job.getToConnectorName()));
|
||||
}
|
||||
}
|
||||
|
@ -68,14 +68,10 @@ private Status updateJob(String jobArg, List<String> args, boolean isInteractive
|
||||
// TODO(SQOOP-1634): using from/to and driver config id, this call can be avoided
|
||||
MJob job = client.getJob(jobArg);
|
||||
|
||||
// TODO: should be removed when MJob link with Connector by name.
|
||||
MConnector fromConnector = getClient().getConnector(job.getFromConnectorId());
|
||||
MConnector toConnector = getClient().getConnector(job.getToConnectorId());
|
||||
|
||||
ResourceBundle fromConnectorBundle = client.getConnectorConfigBundle(
|
||||
fromConnector.getUniqueName());
|
||||
job.getFromConnectorName());
|
||||
ResourceBundle toConnectorBundle = client.getConnectorConfigBundle(
|
||||
toConnector.getUniqueName());
|
||||
job.getToConnectorName());
|
||||
ResourceBundle driverConfigBundle = client.getDriverConfigBundle();
|
||||
|
||||
Status status = Status.OK;
|
||||
|
@ -155,7 +155,7 @@ public void testCloneLinkInteractive() {
|
||||
@Test
|
||||
public void testCloneJob() {
|
||||
ShellEnvironment.setInteractive(false);
|
||||
MJob job = new MJob(1L, 2L, 1L, 2L,
|
||||
MJob job = new MJob("fromConnectorName", "toConnectorName", 1L, 2L,
|
||||
new MFromConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()),
|
||||
new MToConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()),
|
||||
new MDriverConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()));
|
||||
@ -192,7 +192,7 @@ public void testCloneJob() {
|
||||
public void testCloneJobInteractive() {
|
||||
ShellEnvironment.setInteractive(true);
|
||||
initEnv();
|
||||
MJob job = new MJob(1, 2, 1, 2, new MFromConfig(getConfig("fromJobConfig"), new ArrayList<MValidator>()),
|
||||
MJob job = new MJob("fromConnectorName", "toConnectorName", 1, 2, new MFromConfig(getConfig("fromJobConfig"), new ArrayList<MValidator>()),
|
||||
new MToConfig(getConfig("toJobConfig"), new ArrayList<MValidator>()),
|
||||
new MDriverConfig(getConfig("driverConfig"), new ArrayList<MValidator>()));
|
||||
when(client.getJob("job_test")).thenReturn(job);
|
||||
|
@ -182,11 +182,11 @@ public void testCreateJob() {
|
||||
MConnector fromConnector = new MConnector("connector_from", "", "", null, new MFromConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()), null);
|
||||
MConnector toConnector = new MConnector("connector_to", "", "", null, null, new MToConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()));
|
||||
when(client.createJob("link_from", "link_to")).thenReturn(
|
||||
new MJob(1, 2, 1, 2, new MFromConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()),
|
||||
new MJob("fromConnectorName", "toConnectorName", 1, 2, new MFromConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()),
|
||||
new MToConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()),
|
||||
new MDriverConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>())));
|
||||
when(client.getConnector(1)).thenReturn(fromConnector);
|
||||
when(client.getConnector(2)).thenReturn(toConnector);
|
||||
when(client.getConnector("fromConnectorName")).thenReturn(fromConnector);
|
||||
when(client.getConnector("toConnectorName")).thenReturn(toConnector);
|
||||
when(client.saveJob(any(MJob.class))).thenReturn(Status.OK);
|
||||
|
||||
// create job -f link_from -to link_to
|
||||
@ -222,12 +222,12 @@ public void testCreateJobInteractive() {
|
||||
initEnv();
|
||||
MConnector fromConnector = new MConnector("connector_from", "", "", null, new MFromConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()), null);
|
||||
MConnector toConnector = new MConnector("connector_to", "", "", null, null, new MToConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()));
|
||||
MJob job = new MJob(1, 2, 1, 2, new MFromConfig(getConfig("fromJobConfig"), new ArrayList<MValidator>()),
|
||||
MJob job = new MJob("fromConnectorName", "toConnectorName", 1, 2, new MFromConfig(getConfig("fromJobConfig"), new ArrayList<MValidator>()),
|
||||
new MToConfig(getConfig("toJobConfig"), new ArrayList<MValidator>()),
|
||||
new MDriverConfig(getConfig("driverConfig"), new ArrayList<MValidator>()));
|
||||
when(client.createJob("link_from", "link_to")).thenReturn(job);
|
||||
when(client.getConnector(1)).thenReturn(fromConnector);
|
||||
when(client.getConnector(2)).thenReturn(toConnector);
|
||||
when(client.getConnector("fromConnectorName")).thenReturn(fromConnector);
|
||||
when(client.getConnector("toConnectorName")).thenReturn(toConnector);
|
||||
when(client.saveJob(any(MJob.class))).thenReturn(Status.OK);
|
||||
when(client.getConnectorConfigBundle(any(String.class))).thenReturn(resourceBundle);
|
||||
when(client.getDriverConfigBundle()).thenReturn(resourceBundle);
|
||||
|
@ -235,11 +235,11 @@ public void testShowLink() {
|
||||
public void testShowJob() {
|
||||
when(client.getJobs()).thenReturn(new ArrayList<MJob>());
|
||||
when(client.getConnector(any(Long.class))).thenReturn(new MConnector("", "", "", null, null, null));
|
||||
when(client.getJob("1")).thenReturn(new MJob(1L, 2L, 1L, 2L,
|
||||
when(client.getJob("1")).thenReturn(new MJob("fromConnectorName", "toConnectorName", 1L, 2L,
|
||||
new MFromConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()),
|
||||
new MToConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()),
|
||||
new MDriverConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>())));
|
||||
when(client.getJobsByConnector("2")).thenReturn(Arrays.asList(new MJob(1L, 2L, 1L, 2L,
|
||||
when(client.getJobsByConnector("2")).thenReturn(Arrays.asList(new MJob("fromConnectorName", "toConnectorName", 1L, 2L,
|
||||
new MFromConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()),
|
||||
new MToConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()),
|
||||
new MDriverConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()))));
|
||||
|
@ -173,7 +173,7 @@ public void testUpdateLinkInteractive() throws Exception {
|
||||
@Test
|
||||
public void testUpdateJob() throws InterruptedException {
|
||||
ShellEnvironment.setInteractive(false);
|
||||
MJob job = new MJob(1L, 2L, 1L, 2L,
|
||||
MJob job = new MJob("fromConnectorName", "toConnectorName", 1L, 2L,
|
||||
new MFromConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()),
|
||||
new MToConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()),
|
||||
new MDriverConfig(new ArrayList<MConfig>(), new ArrayList<MValidator>()));
|
||||
@ -210,7 +210,7 @@ public void testUpdateJob() throws InterruptedException {
|
||||
public void testUpdateJobInteractive() {
|
||||
ShellEnvironment.setInteractive(true);
|
||||
initEnv();
|
||||
MJob job = new MJob(1, 2, 1, 2, new MFromConfig(getConfig("fromJobConfig"), new ArrayList<MValidator>()),
|
||||
MJob job = new MJob("fromConnectorName", "toConnectorName", 1, 2, new MFromConfig(getConfig("fromJobConfig"), new ArrayList<MValidator>()),
|
||||
new MToConfig(getConfig("toJobConfig"), new ArrayList<MValidator>()),
|
||||
new MDriverConfig(getConfig("driverConfig"), new ArrayList<MValidator>()));
|
||||
when(client.getJob("job_test")).thenReturn(job);
|
||||
|
@ -324,8 +324,8 @@ private long loadLink(MLink link) {
|
||||
private long loadJob(MJob job) {
|
||||
// starting by pretending we have a brand new job
|
||||
resetPersistenceId(job);
|
||||
MConnector mFromConnector = ConnectorManager.getInstance().getConnectorConfigurable(job.getFromConnectorId());
|
||||
MConnector mToConnector = ConnectorManager.getInstance().getConnectorConfigurable(job.getToConnectorId());
|
||||
MConnector mFromConnector = ConnectorManager.getInstance().getConnectorConfigurable(job.getFromConnectorName());
|
||||
MConnector mToConnector = ConnectorManager.getInstance().getConnectorConfigurable(job.getToConnectorName());
|
||||
|
||||
MFromConfig fromConfig = job.getFromJobConfig();
|
||||
MToConfig toConfig = job.getToJobConfig();
|
||||
@ -347,10 +347,10 @@ private long loadJob(MJob job) {
|
||||
// Transform config structures to objects for validations
|
||||
SqoopConnector fromConnector =
|
||||
ConnectorManager.getInstance().getSqoopConnector(
|
||||
job.getFromConnectorId());
|
||||
job.getFromConnectorName());
|
||||
SqoopConnector toConnector =
|
||||
ConnectorManager.getInstance().getSqoopConnector(
|
||||
job.getToConnectorId());
|
||||
job.getToConnectorName());
|
||||
|
||||
Object fromConnectorConfig = ClassUtils.instantiate(
|
||||
fromConnector.getJobConfigurationClass(Direction.FROM));
|
||||
|
Loading…
Reference in New Issue
Block a user