mirror of
https://github.com/apache/sqoop.git
synced 2025-05-17 01:11:07 +08:00
SQOOP-2265: Sqoop2: Standardize on methods
(Richard Zhou via Abraham Elmahrek)
This commit is contained in:
parent
391f9cfa76
commit
3a655ad37a
@ -685,10 +685,10 @@ private Status applyJobValidations(ValidationResultBean bean, MJob job) {
|
||||
ConfigValidationResult driver = bean.getValidationResults()[2];
|
||||
|
||||
ConfigUtils.applyValidation(
|
||||
job.getJobConfig(Direction.FROM).getConfigs(),
|
||||
job.getFromJobConfig().getConfigs(),
|
||||
fromConfig);
|
||||
ConfigUtils.applyValidation(
|
||||
job.getJobConfig(Direction.TO).getConfigs(),
|
||||
job.getToJobConfig().getConfigs(),
|
||||
toConfig);
|
||||
ConfigUtils.applyValidation(
|
||||
job.getDriverConfig().getConfigs(),
|
||||
|
@ -131,11 +131,11 @@ private JSONObject extractJob(boolean skipSensitive, MJob job) {
|
||||
object.put(UPDATE_DATE, job.getLastUpdateDate().getTime());
|
||||
// job link associated connectors
|
||||
// TODO(SQOOP-1634): fix not to require the connectorIds in the post data
|
||||
object.put(FROM_CONNECTOR_ID, job.getConnectorId(Direction.FROM));
|
||||
object.put(TO_CONNECTOR_ID, job.getConnectorId(Direction.TO));
|
||||
object.put(FROM_CONNECTOR_ID, job.getFromConnectorId());
|
||||
object.put(TO_CONNECTOR_ID, job.getToConnectorId());
|
||||
// job associated links
|
||||
object.put(FROM_LINK_ID, job.getLinkId(Direction.FROM));
|
||||
object.put(TO_LINK_ID, job.getLinkId(Direction.TO));
|
||||
object.put(FROM_LINK_ID, job.getFromLinkId());
|
||||
object.put(TO_LINK_ID, job.getToLinkId());
|
||||
// job configs
|
||||
MFromConfig fromConfigList = job.getFromJobConfig();
|
||||
object.put(FROM_CONFIG_VALUES,
|
||||
|
@ -70,11 +70,11 @@ public String toString() {
|
||||
sb.append(uniqueName).append(":").append(getPersistenceId()).append(":");
|
||||
sb.append(className);
|
||||
sb.append(", ").append(getLinkConfig().toString());
|
||||
if (getConfig(Direction.FROM) != null) {
|
||||
sb.append(", ").append(getConfig(Direction.FROM).toString());
|
||||
if (getFromConfig() != null) {
|
||||
sb.append(", ").append(getFromConfig().toString());
|
||||
}
|
||||
if (getConfig(Direction.TO) != null) {
|
||||
sb.append(", ").append(getConfig(Direction.TO).toString());
|
||||
if (getToConfig() != null) {
|
||||
sb.append(", ").append(getToConfig().toString());
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
@ -160,19 +160,6 @@ public MLinkConfig getLinkConfig() {
|
||||
return linkConfig;
|
||||
}
|
||||
|
||||
public MConfigList getConfig(Direction type) {
|
||||
switch (type) {
|
||||
case FROM:
|
||||
return fromConfig;
|
||||
|
||||
case TO:
|
||||
return toConfig;
|
||||
|
||||
default:
|
||||
throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
|
||||
}
|
||||
}
|
||||
|
||||
public MFromConfig getFromConfig() {
|
||||
return fromConfig;
|
||||
}
|
||||
@ -190,7 +177,7 @@ public MConfigurableType getType() {
|
||||
}
|
||||
|
||||
public SupportedDirections getSupportedDirections() {
|
||||
return new SupportedDirections(this.getConfig(Direction.FROM) != null,
|
||||
this.getConfig(Direction.TO) != null);
|
||||
return new SupportedDirections(this.getFromConfig() != null,
|
||||
this.getToConfig() != null);
|
||||
}
|
||||
}
|
||||
|
@ -97,10 +97,10 @@ public MJob(MJob other) {
|
||||
public MJob(MJob other, MFromConfig fromConfig, MToConfig toConfig, MDriverConfig driverConfig) {
|
||||
super(other);
|
||||
|
||||
this.fromConnectorId = other.getConnectorId(Direction.FROM);
|
||||
this.toConnectorId = other.getConnectorId(Direction.TO);
|
||||
this.fromLinkId = other.getLinkId(Direction.FROM);
|
||||
this.toLinkId = other.getLinkId(Direction.TO);
|
||||
this.fromConnectorId = other.getFromConnectorId();
|
||||
this.toConnectorId = other.getToConnectorId();
|
||||
this.fromLinkId = other.getFromLinkId();
|
||||
this.toLinkId = other.getToLinkId();
|
||||
this.fromConfig = fromConfig;
|
||||
this.toConfig = toConfig;
|
||||
this.driverConfig = driverConfig;
|
||||
@ -110,26 +110,13 @@ public MJob(MJob other, MFromConfig fromConfig, MToConfig toConfig, MDriverConfi
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder("job");
|
||||
sb.append("From job config: ").append(getJobConfig(Direction.FROM));
|
||||
sb.append(", To job config: ").append(getJobConfig(Direction.TO));
|
||||
sb.append("From job config: ").append(getFromJobConfig());
|
||||
sb.append(", To job config: ").append(getToJobConfig());
|
||||
sb.append(", Driver config: ").append(driverConfig);
|
||||
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public long getLinkId(Direction type) {
|
||||
switch(type) {
|
||||
case FROM:
|
||||
return fromLinkId;
|
||||
|
||||
case TO:
|
||||
return toLinkId;
|
||||
|
||||
default:
|
||||
throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
|
||||
}
|
||||
}
|
||||
|
||||
public long getFromLinkId() {
|
||||
return fromLinkId;
|
||||
}
|
||||
@ -138,19 +125,6 @@ public long getToLinkId() {
|
||||
return toLinkId;
|
||||
}
|
||||
|
||||
public long getConnectorId(Direction type) {
|
||||
switch(type) {
|
||||
case FROM:
|
||||
return fromConnectorId;
|
||||
|
||||
case TO:
|
||||
return toConnectorId;
|
||||
|
||||
default:
|
||||
throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
|
||||
}
|
||||
}
|
||||
|
||||
public long getFromConnectorId() {
|
||||
return fromConnectorId;
|
||||
}
|
||||
@ -159,19 +133,6 @@ public long getToConnectorId() {
|
||||
return toConnectorId;
|
||||
}
|
||||
|
||||
public MConfigList getJobConfig(Direction type) {
|
||||
switch(type) {
|
||||
case FROM:
|
||||
return fromConfig;
|
||||
|
||||
case TO:
|
||||
return toConfig;
|
||||
|
||||
default:
|
||||
throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
|
||||
}
|
||||
}
|
||||
|
||||
public MFromConfig getFromJobConfig() {
|
||||
return fromConfig;
|
||||
}
|
||||
@ -190,10 +151,10 @@ public MJob clone(boolean cloneWithValue) {
|
||||
return new MJob(this);
|
||||
} else {
|
||||
return new MJob(
|
||||
getConnectorId(Direction.FROM),
|
||||
getConnectorId(Direction.TO),
|
||||
getLinkId(Direction.FROM),
|
||||
getLinkId(Direction.TO),
|
||||
getFromConnectorId(),
|
||||
getToConnectorId(),
|
||||
getFromLinkId(),
|
||||
getToLinkId(),
|
||||
getFromJobConfig().clone(false),
|
||||
getToJobConfig().clone(false),
|
||||
getDriverConfig().clone(false));
|
||||
@ -211,13 +172,13 @@ public boolean equals(Object object) {
|
||||
}
|
||||
|
||||
MJob job = (MJob)object;
|
||||
return (job.getConnectorId(Direction.FROM) == this.getConnectorId(Direction.FROM))
|
||||
&& (job.getConnectorId(Direction.TO) == this.getConnectorId(Direction.TO))
|
||||
&& (job.getLinkId(Direction.FROM) == this.getLinkId(Direction.FROM))
|
||||
&& (job.getLinkId(Direction.TO) == this.getLinkId(Direction.TO))
|
||||
return (job.getFromConnectorId() == this.getFromConnectorId())
|
||||
&& (job.getToConnectorId() == this.getToConnectorId())
|
||||
&& (job.getFromLinkId() == this.getFromLinkId())
|
||||
&& (job.getToLinkId() == this.getToLinkId())
|
||||
&& (job.getPersistenceId() == this.getPersistenceId())
|
||||
&& (job.getFromJobConfig().equals(this.getJobConfig(Direction.FROM)))
|
||||
&& (job.getToJobConfig().equals(this.getJobConfig(Direction.TO)))
|
||||
&& (job.getFromJobConfig().equals(this.getFromJobConfig()))
|
||||
&& (job.getToJobConfig().equals(this.getToJobConfig()))
|
||||
&& (job.getDriverConfig().equals(this.driverConfig));
|
||||
}
|
||||
}
|
||||
|
@ -38,10 +38,10 @@ public void testJobSerialization() throws ParseException {
|
||||
MJob job = BeanTestUtil.createJob("ahoj", "The big Job", 22L, created, updated);
|
||||
|
||||
// Fill some data at the beginning
|
||||
MStringInput input = (MStringInput) job.getJobConfig(Direction.FROM).getConfigs().get(0)
|
||||
MStringInput input = (MStringInput) job.getFromJobConfig().getConfigs().get(0)
|
||||
.getInputs().get(0);
|
||||
input.setValue("Hi there!");
|
||||
input = (MStringInput) job.getJobConfig(Direction.TO).getConfigs().get(0).getInputs().get(0);
|
||||
input = (MStringInput) job.getToJobConfig().getConfigs().get(0).getInputs().get(0);
|
||||
input.setValue("Hi there again!");
|
||||
|
||||
// Serialize it to JSON object
|
||||
@ -61,19 +61,19 @@ public void testJobSerialization() throws ParseException {
|
||||
assertEquals(22L, target.getPersistenceId());
|
||||
assertEquals("The big Job", target.getName());
|
||||
|
||||
assertEquals(target.getLinkId(Direction.FROM), 1);
|
||||
assertEquals(target.getLinkId(Direction.TO), 2);
|
||||
assertEquals(target.getConnectorId(Direction.FROM), 1);
|
||||
assertEquals(target.getConnectorId(Direction.TO), 2);
|
||||
assertEquals(target.getFromLinkId(), 1);
|
||||
assertEquals(target.getToLinkId(), 2);
|
||||
assertEquals(target.getFromConnectorId(), 1);
|
||||
assertEquals(target.getToConnectorId(), 2);
|
||||
assertEquals(created, target.getCreationDate());
|
||||
assertEquals(updated, target.getLastUpdateDate());
|
||||
assertEquals(false, target.getEnabled());
|
||||
|
||||
// Test that value was correctly moved
|
||||
MStringInput targetInput = (MStringInput) target.getJobConfig(Direction.FROM).getConfigs()
|
||||
MStringInput targetInput = (MStringInput) target.getFromJobConfig().getConfigs()
|
||||
.get(0).getInputs().get(0);
|
||||
assertEquals("Hi there!", targetInput.getValue());
|
||||
targetInput = (MStringInput) target.getJobConfig(Direction.TO).getConfigs().get(0).getInputs()
|
||||
targetInput = (MStringInput) target.getToJobConfig().getConfigs().get(0).getInputs()
|
||||
.get(0);
|
||||
assertEquals("Hi there again!", targetInput.getValue());
|
||||
}
|
||||
|
@ -45,10 +45,10 @@ public void testJobsSerialization() throws ParseException {
|
||||
jobs.add(job2);
|
||||
|
||||
// Fill some data at the beginning
|
||||
MStringInput input = (MStringInput) job1.getJobConfig(Direction.FROM).getConfigs().get(0)
|
||||
MStringInput input = (MStringInput) job1.getFromJobConfig().getConfigs().get(0)
|
||||
.getInputs().get(0);
|
||||
input.setValue("Hi there!");
|
||||
input = (MStringInput) job1.getJobConfig(Direction.TO).getConfigs().get(0).getInputs().get(0);
|
||||
input = (MStringInput) job1.getToJobConfig().getConfigs().get(0).getInputs().get(0);
|
||||
input.setValue("Hi there again!");
|
||||
|
||||
// Serialize it to JSON object
|
||||
@ -72,19 +72,19 @@ public void testJobsSerialization() throws ParseException {
|
||||
assertEquals(44L, retrievedJob2.getPersistenceId());
|
||||
assertEquals("The small Job", retrievedJob2.getName());
|
||||
|
||||
assertEquals(retrievedJob1.getLinkId(Direction.FROM), 1);
|
||||
assertEquals(retrievedJob1.getLinkId(Direction.TO), 2);
|
||||
assertEquals(retrievedJob1.getConnectorId(Direction.FROM), 1);
|
||||
assertEquals(retrievedJob1.getConnectorId(Direction.TO), 2);
|
||||
assertEquals(retrievedJob1.getFromLinkId(), 1);
|
||||
assertEquals(retrievedJob1.getToLinkId(), 2);
|
||||
assertEquals(retrievedJob1.getFromConnectorId(), 1);
|
||||
assertEquals(retrievedJob1.getToConnectorId(), 2);
|
||||
assertEquals(created, retrievedJob1.getCreationDate());
|
||||
assertEquals(updated, retrievedJob1.getLastUpdateDate());
|
||||
assertEquals(false, retrievedJob1.getEnabled());
|
||||
|
||||
// Test that value was correctly moved
|
||||
MStringInput targetInput = (MStringInput) retrievedJob1.getJobConfig(Direction.FROM)
|
||||
MStringInput targetInput = (MStringInput) retrievedJob1.getFromJobConfig()
|
||||
.getConfigs().get(0).getInputs().get(0);
|
||||
assertEquals("Hi there!", targetInput.getValue());
|
||||
targetInput = (MStringInput) retrievedJob1.getJobConfig(Direction.TO).getConfigs().get(0)
|
||||
targetInput = (MStringInput) retrievedJob1.getToJobConfig().getConfigs().get(0)
|
||||
.getInputs().get(0);
|
||||
assertEquals("Hi there again!", targetInput.getValue());
|
||||
}
|
||||
|
@ -113,11 +113,11 @@ public void testClone() {
|
||||
assertNull(clonedLinkConfig.getInputs().get(0).getValue());
|
||||
assertNull(clonedLinkConfig.getInputs().get(1).getValue());
|
||||
|
||||
MConfig clonedFromConfig = cloneConnector1.getConfig(Direction.FROM).getConfigs().get(0);
|
||||
MConfig clonedFromConfig = cloneConnector1.getFromConfig().getConfigs().get(0);
|
||||
assertNull(clonedFromConfig.getInputs().get(0).getValue());
|
||||
assertNull(clonedFromConfig.getInputs().get(1).getValue());
|
||||
|
||||
MConfig clonedToConfig = cloneConnector1.getConfig(Direction.TO).getConfigs().get(0);
|
||||
MConfig clonedToConfig = cloneConnector1.getToConfig().getConfigs().get(0);
|
||||
assertNull(clonedToConfig.getInputs().get(0).getValue());
|
||||
assertNull(clonedToConfig.getInputs().get(1).getValue());
|
||||
|
||||
@ -126,10 +126,10 @@ public void testClone() {
|
||||
clonedLinkConfig = clonedConnector2.getLinkConfig().getConfigs().get(0);
|
||||
assertNull(clonedLinkConfig.getInputs().get(0).getValue());
|
||||
assertNull(clonedLinkConfig.getInputs().get(1).getValue());
|
||||
clonedFromConfig = clonedConnector2.getConfig(Direction.FROM).getConfigs().get(0);
|
||||
clonedFromConfig = clonedConnector2.getFromConfig().getConfigs().get(0);
|
||||
assertNull(clonedFromConfig.getInputs().get(0).getValue());
|
||||
assertNull(clonedFromConfig.getInputs().get(1).getValue());
|
||||
clonedToConfig = clonedConnector2.getConfig(Direction.TO).getConfigs().get(0);
|
||||
clonedToConfig = clonedConnector2.getToConfig().getConfigs().get(0);
|
||||
assertNull(clonedToConfig.getInputs().get(0).getValue());
|
||||
assertNull(clonedToConfig.getInputs().get(1).getValue());
|
||||
}
|
||||
|
@ -35,32 +35,32 @@ public class TestMJob {
|
||||
public void testInitialization() {
|
||||
// Test default constructor
|
||||
MJob job = job();
|
||||
assertEquals(123l, job.getConnectorId(Direction.FROM));
|
||||
assertEquals(456l, job.getConnectorId(Direction.TO));
|
||||
assertEquals(123l, job.getFromConnectorId());
|
||||
assertEquals(456l, job.getToConnectorId());
|
||||
assertEquals("Buffy", job.getCreationUser());
|
||||
assertEquals("Vampire", job.getName());
|
||||
assertEquals(fromConfig(), job.getJobConfig(Direction.FROM));
|
||||
assertEquals(toConfig(), job.getJobConfig(Direction.TO));
|
||||
assertEquals(fromConfig(), job.getFromJobConfig());
|
||||
assertEquals(toConfig(), job.getToJobConfig());
|
||||
assertEquals(driverConfig(), job.getDriverConfig());
|
||||
|
||||
// Test copy constructor
|
||||
MJob copy = new MJob(job);
|
||||
assertEquals(123l, copy.getConnectorId(Direction.FROM));
|
||||
assertEquals(456l, copy.getConnectorId(Direction.TO));
|
||||
assertEquals(123l, copy.getFromConnectorId());
|
||||
assertEquals(456l, copy.getToConnectorId());
|
||||
assertEquals("Buffy", copy.getCreationUser());
|
||||
assertEquals("Vampire", copy.getName());
|
||||
assertEquals(fromConfig(), copy.getJobConfig(Direction.FROM));
|
||||
assertEquals(toConfig(), copy.getJobConfig(Direction.TO));
|
||||
assertEquals(fromConfig(), copy.getFromJobConfig());
|
||||
assertEquals(toConfig(), copy.getToJobConfig());
|
||||
assertEquals(driverConfig(), copy.getDriverConfig());
|
||||
|
||||
// Test constructor for metadata upgrade (the order of configs is different)
|
||||
MJob upgradeCopy = new MJob(job, fromConfig(), toConfig(), driverConfig());
|
||||
assertEquals(123l, upgradeCopy.getConnectorId(Direction.FROM));
|
||||
assertEquals(456l, upgradeCopy.getConnectorId(Direction.TO));
|
||||
assertEquals(123l, upgradeCopy.getFromConnectorId());
|
||||
assertEquals(456l, upgradeCopy.getToConnectorId());
|
||||
assertEquals("Buffy", upgradeCopy.getCreationUser());
|
||||
assertEquals("Vampire", upgradeCopy.getName());
|
||||
assertEquals(fromConfig(), upgradeCopy.getJobConfig(Direction.FROM));
|
||||
assertEquals(toConfig(), upgradeCopy.getJobConfig(Direction.TO));
|
||||
assertEquals(fromConfig(), upgradeCopy.getFromJobConfig());
|
||||
assertEquals(toConfig(), upgradeCopy.getToJobConfig());
|
||||
assertEquals(driverConfig(), upgradeCopy.getDriverConfig());
|
||||
}
|
||||
|
||||
@ -74,12 +74,12 @@ public void testClone() {
|
||||
assertEquals(MPersistableEntity.PERSISTANCE_ID_DEFAULT, withoutJobValue.getPersistenceId());
|
||||
assertNull(withoutJobValue.getName());
|
||||
assertNull(withoutJobValue.getCreationUser());
|
||||
assertEquals(fromConfig(), withoutJobValue.getJobConfig(Direction.FROM));
|
||||
assertEquals(toConfig(), withoutJobValue.getJobConfig(Direction.TO));
|
||||
assertEquals(fromConfig(), withoutJobValue.getFromJobConfig());
|
||||
assertEquals(toConfig(), withoutJobValue.getToJobConfig());
|
||||
assertEquals(driverConfig(), withoutJobValue.getDriverConfig());
|
||||
assertNull(withoutJobValue.getJobConfig(Direction.FROM)
|
||||
assertNull(withoutJobValue.getFromJobConfig()
|
||||
.getConfig("CONFIGFROMNAME").getInput("INTEGER-INPUT").getValue());
|
||||
assertNull(withoutJobValue.getJobConfig(Direction.FROM)
|
||||
assertNull(withoutJobValue.getFromJobConfig()
|
||||
.getConfig("CONFIGFROMNAME").getInput("STRING-INPUT").getValue());
|
||||
|
||||
// Clone with value
|
||||
@ -88,12 +88,12 @@ public void testClone() {
|
||||
assertEquals(job.getPersistenceId(), withJobValue.getPersistenceId());
|
||||
assertEquals(job.getName(), withJobValue.getName());
|
||||
assertEquals(job.getCreationUser(), withJobValue.getCreationUser());
|
||||
assertEquals(fromConfig(), withJobValue.getJobConfig(Direction.FROM));
|
||||
assertEquals(toConfig(), withJobValue.getJobConfig(Direction.TO));
|
||||
assertEquals(fromConfig(), withJobValue.getFromJobConfig());
|
||||
assertEquals(toConfig(), withJobValue.getToJobConfig());
|
||||
assertEquals(driverConfig(), withJobValue.getDriverConfig());
|
||||
assertEquals(100, withJobValue.getJobConfig(Direction.FROM)
|
||||
assertEquals(100, withJobValue.getFromJobConfig()
|
||||
.getConfig("CONFIGFROMNAME").getInput("INTEGER-INPUT").getValue());
|
||||
assertEquals("TEST-VALUE", withJobValue.getJobConfig(Direction.FROM)
|
||||
assertEquals("TEST-VALUE", withJobValue.getFromJobConfig()
|
||||
.getConfig("CONFIGFROMNAME").getInput("STRING-INPUT").getValue()); }
|
||||
|
||||
private MJob job() {
|
||||
|
@ -303,8 +303,8 @@ private JobRequest createJobRequest(long jobId, MSubmission submission) {
|
||||
MJob job = getJob(jobId);
|
||||
|
||||
// get from/to connections for the job
|
||||
MLink fromConnection = getLink(job.getLinkId(Direction.FROM));
|
||||
MLink toConnection = getLink(job.getLinkId(Direction.TO));
|
||||
MLink fromConnection = getLink(job.getFromLinkId());
|
||||
MLink toConnection = getLink(job.getToLinkId());
|
||||
|
||||
// get from/to connectors for the connection
|
||||
SqoopConnector fromConnector = getSqoopConnector(fromConnection.getConnectorId());
|
||||
@ -322,11 +322,11 @@ private JobRequest createJobRequest(long jobId, MSubmission submission) {
|
||||
|
||||
// from config for the job
|
||||
Object fromJob = ClassUtils.instantiate(fromConnector.getJobConfigurationClass(Direction.FROM));
|
||||
ConfigUtils.fromConfigs(job.getJobConfig(Direction.FROM).getConfigs(), fromJob);
|
||||
ConfigUtils.fromConfigs(job.getFromJobConfig().getConfigs(), fromJob);
|
||||
|
||||
// to config for the job
|
||||
Object toJob = ClassUtils.instantiate(toConnector.getJobConfigurationClass(Direction.TO));
|
||||
ConfigUtils.fromConfigs(job.getJobConfig(Direction.TO).getConfigs(), toJob);
|
||||
ConfigUtils.fromConfigs(job.getToJobConfig().getConfigs(), toJob);
|
||||
|
||||
// the only driver config for the job
|
||||
Object driverConfig = ClassUtils
|
||||
|
@ -664,8 +664,8 @@ public void createJob(MJob job, Connection conn) {
|
||||
try {
|
||||
stmt = conn.prepareStatement(crudQueries.getStmtInsertJob(), Statement.RETURN_GENERATED_KEYS);
|
||||
stmt.setString(1, job.getName());
|
||||
stmt.setLong(2, job.getLinkId(Direction.FROM));
|
||||
stmt.setLong(3, job.getLinkId(Direction.TO));
|
||||
stmt.setLong(2, job.getFromLinkId());
|
||||
stmt.setLong(3, job.getToLinkId());
|
||||
stmt.setBoolean(4, job.getEnabled());
|
||||
stmt.setString(5, job.getCreationUser());
|
||||
stmt.setTimestamp(6, new Timestamp(job.getCreationDate().getTime()));
|
||||
@ -689,12 +689,12 @@ public void createJob(MJob job, Connection conn) {
|
||||
// from config for the job
|
||||
createInputValues(crudQueries.getStmtInsertJobInput(),
|
||||
jobId,
|
||||
job.getJobConfig(Direction.FROM).getConfigs(),
|
||||
job.getFromJobConfig().getConfigs(),
|
||||
conn);
|
||||
// to config for the job
|
||||
createInputValues(crudQueries.getStmtInsertJobInput(),
|
||||
jobId,
|
||||
job.getJobConfig(Direction.TO).getConfigs(),
|
||||
job.getToJobConfig().getConfigs(),
|
||||
conn);
|
||||
// driver config per job
|
||||
createInputValues(crudQueries.getStmtInsertJobInput(),
|
||||
@ -737,11 +737,11 @@ public void updateJob(MJob job, Connection conn) {
|
||||
// And reinsert new values
|
||||
createInputValues(crudQueries.getStmtInsertJobInput(),
|
||||
job.getPersistenceId(),
|
||||
job.getJobConfig(Direction.FROM).getConfigs(),
|
||||
job.getFromJobConfig().getConfigs(),
|
||||
conn);
|
||||
createInputValues(crudQueries.getStmtInsertJobInput(),
|
||||
job.getPersistenceId(),
|
||||
job.getJobConfig(Direction.TO).getConfigs(),
|
||||
job.getToJobConfig().getConfigs(),
|
||||
conn);
|
||||
createInputValues(crudQueries.getStmtInsertJobInput(),
|
||||
job.getPersistenceId(),
|
||||
|
@ -932,11 +932,11 @@ protected void fillLink(MLink link) {
|
||||
}
|
||||
|
||||
protected void fillJob(MJob job) {
|
||||
List<MConfig> configs = job.getJobConfig(Direction.FROM).getConfigs();
|
||||
List<MConfig> configs = job.getFromJobConfig().getConfigs();
|
||||
((MStringInput) configs.get(0).getInputs().get(0)).setValue("Value1");
|
||||
((MStringInput) configs.get(1).getInputs().get(0)).setValue("Value2");
|
||||
|
||||
configs = job.getJobConfig(Direction.TO).getConfigs();
|
||||
configs = job.getToJobConfig().getConfigs();
|
||||
((MStringInput) configs.get(0).getInputs().get(0)).setValue("Value1");
|
||||
((MStringInput) configs.get(1).getInputs().get(0)).setValue("Value2");
|
||||
|
||||
|
@ -82,14 +82,14 @@ public void testFindJob() throws Exception {
|
||||
|
||||
List<MConfig> configs;
|
||||
|
||||
configs = firstJob.getJobConfig(Direction.FROM).getConfigs();
|
||||
configs = firstJob.getFromJobConfig().getConfigs();
|
||||
assertEquals(2, configs.size());
|
||||
assertEquals("Value5", configs.get(0).getInputs().get(0).getValue());
|
||||
assertNull(configs.get(0).getInputs().get(1).getValue());
|
||||
assertEquals("Value5", configs.get(0).getInputs().get(0).getValue());
|
||||
assertNull(configs.get(1).getInputs().get(1).getValue());
|
||||
|
||||
configs = firstJob.getJobConfig(Direction.TO).getConfigs();
|
||||
configs = firstJob.getToJobConfig().getConfigs();
|
||||
assertEquals(2, configs.size());
|
||||
assertEquals("Value9", configs.get(0).getInputs().get(0).getValue());
|
||||
assertNull(configs.get(0).getInputs().get(1).getValue());
|
||||
@ -198,10 +198,10 @@ public void testCreateJob() throws Exception {
|
||||
assertEquals(1, retrieved.getPersistenceId());
|
||||
|
||||
List<MConfig> configs;
|
||||
configs = job.getJobConfig(Direction.FROM).getConfigs();
|
||||
configs = job.getFromJobConfig().getConfigs();
|
||||
assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
|
||||
assertNull(configs.get(0).getInputs().get(1).getValue());
|
||||
configs = job.getJobConfig(Direction.TO).getConfigs();
|
||||
configs = job.getToJobConfig().getConfigs();
|
||||
assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
|
||||
assertNull(configs.get(0).getInputs().get(1).getValue());
|
||||
|
||||
@ -246,7 +246,7 @@ public void testUpdateJob() throws Exception {
|
||||
|
||||
List<MConfig> configs;
|
||||
|
||||
configs = job.getJobConfig(Direction.FROM).getConfigs();
|
||||
configs = job.getFromJobConfig().getConfigs();
|
||||
((MStringInput)configs.get(0).getInputs().get(0)).setValue("Updated");
|
||||
Map<String, String> newFromMap = new HashMap<String, String>();
|
||||
newFromMap.put("1F", "foo");
|
||||
@ -254,7 +254,7 @@ public void testUpdateJob() throws Exception {
|
||||
|
||||
((MMapInput)configs.get(0).getInputs().get(1)).setValue(newFromMap);
|
||||
|
||||
configs = job.getJobConfig(Direction.TO).getConfigs();
|
||||
configs = job.getToJobConfig().getConfigs();
|
||||
((MStringInput)configs.get(0).getInputs().get(0)).setValue("Updated");
|
||||
Map<String, String> newToMap = new HashMap<String, String>();
|
||||
newToMap.put("1T", "foo");
|
||||
@ -279,11 +279,11 @@ public void testUpdateJob() throws Exception {
|
||||
MJob retrieved = handler.findJob(1, derbyConnection);
|
||||
assertEquals("name", retrieved.getName());
|
||||
|
||||
configs = job.getJobConfig(Direction.FROM).getConfigs();
|
||||
configs = job.getFromJobConfig().getConfigs();
|
||||
assertEquals(2, configs.size());
|
||||
assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
|
||||
assertEquals(newFromMap, configs.get(0).getInputs().get(1).getValue());
|
||||
configs = job.getJobConfig(Direction.TO).getConfigs();
|
||||
configs = job.getToJobConfig().getConfigs();
|
||||
assertEquals(2, configs.size());
|
||||
assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
|
||||
assertEquals(newToMap, configs.get(0).getInputs().get(1).getValue());
|
||||
@ -343,7 +343,7 @@ public void testUpdateJobConfig() throws Exception {
|
||||
assertCountForTable("SQOOP.SQ_JOB_INPUT", 24);
|
||||
MJob job = handler.findJob(1, derbyConnection);
|
||||
|
||||
List<MConfig> fromConfigs = job.getJobConfig(Direction.FROM).getConfigs();
|
||||
List<MConfig> fromConfigs = job.getFromJobConfig().getConfigs();
|
||||
MConfig fromConfig = fromConfigs.get(0).clone(false);
|
||||
MConfig newFromConfig = new MConfig(fromConfig.getName(), fromConfig.getInputs());
|
||||
|
||||
@ -358,7 +358,7 @@ public void testUpdateJobConfig() throws Exception {
|
||||
MConfig updatedFromConfig = newFromConfigs.getConfigs().get(0);
|
||||
assertEquals("FromJobConfigUpdated", updatedFromConfig.getInputs().get(0).getValue());
|
||||
|
||||
List<MConfig> toConfigs = job.getJobConfig(Direction.TO).getConfigs();
|
||||
List<MConfig> toConfigs = job.getToJobConfig().getConfigs();
|
||||
MConfig toConfig = toConfigs.get(0).clone(false);
|
||||
MConfig newToConfig = new MConfig(toConfig.getName(), toConfig.getInputs());
|
||||
|
||||
@ -382,7 +382,7 @@ public void testIncorrectEntityCausingConfigUpdate() throws Exception {
|
||||
assertCountForTable("SQOOP.SQ_JOB_INPUT", 24);
|
||||
MJob job = handler.findJob(1, derbyConnection);
|
||||
|
||||
List<MConfig> fromConfigs = job.getJobConfig(Direction.FROM).getConfigs();
|
||||
List<MConfig> fromConfigs = job.getFromJobConfig().getConfigs();
|
||||
MConfig fromConfig = fromConfigs.get(0).clone(false);
|
||||
MConfig newFromConfig = new MConfig(fromConfig.getName(), fromConfig.getInputs());
|
||||
HashMap<String, String> newMap = new HashMap<String, String>();
|
||||
|
@ -109,11 +109,11 @@ protected void fillLink(MLink link) {
|
||||
}
|
||||
|
||||
protected void fillJob(MJob job) {
|
||||
List<MConfig> configs = job.getJobConfig(Direction.FROM).getConfigs();
|
||||
List<MConfig> configs = job.getFromJobConfig().getConfigs();
|
||||
((MStringInput) configs.get(0).getInputs().get(0)).setValue("Value1");
|
||||
((MStringInput) configs.get(1).getInputs().get(0)).setValue("Value2");
|
||||
|
||||
configs = job.getJobConfig(Direction.TO).getConfigs();
|
||||
configs = job.getToJobConfig().getConfigs();
|
||||
((MStringInput) configs.get(0).getInputs().get(0)).setValue("Value1");
|
||||
((MStringInput) configs.get(1).getInputs().get(0)).setValue("Value2");
|
||||
|
||||
|
@ -95,14 +95,14 @@ public void testFindJobSuccess() throws Exception {
|
||||
|
||||
List<MConfig> configs;
|
||||
|
||||
configs = firstJob.getJobConfig(Direction.FROM).getConfigs();
|
||||
configs = firstJob.getFromJobConfig().getConfigs();
|
||||
assertEquals(2, configs.size());
|
||||
assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
|
||||
assertNull(configs.get(0).getInputs().get(1).getValue());
|
||||
assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
|
||||
assertNull(configs.get(1).getInputs().get(1).getValue());
|
||||
|
||||
configs = firstJob.getJobConfig(Direction.TO).getConfigs();
|
||||
configs = firstJob.getToJobConfig().getConfigs();
|
||||
assertEquals(2, configs.size());
|
||||
assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
|
||||
assertNull(configs.get(0).getInputs().get(1).getValue());
|
||||
@ -188,10 +188,10 @@ public void testCreateJob() throws Exception {
|
||||
assertEquals(1, retrieved.getPersistenceId());
|
||||
|
||||
List<MConfig> configs;
|
||||
configs = retrieved.getJobConfig(Direction.FROM).getConfigs();
|
||||
configs = retrieved.getFromJobConfig().getConfigs();
|
||||
assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
|
||||
assertNull(configs.get(0).getInputs().get(1).getValue());
|
||||
configs = retrieved.getJobConfig(Direction.TO).getConfigs();
|
||||
configs = retrieved.getToJobConfig().getConfigs();
|
||||
assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
|
||||
assertNull(configs.get(0).getInputs().get(1).getValue());
|
||||
|
||||
@ -219,11 +219,11 @@ public void testUpdateJob() throws Exception {
|
||||
|
||||
List<MConfig> configs;
|
||||
|
||||
configs = job.getJobConfig(Direction.FROM).getConfigs();
|
||||
configs = job.getFromJobConfig().getConfigs();
|
||||
((MStringInput)configs.get(0).getInputs().get(0)).setValue("Updated");
|
||||
((MMapInput)configs.get(0).getInputs().get(1)).setValue(null);
|
||||
|
||||
configs = job.getJobConfig(Direction.TO).getConfigs();
|
||||
configs = job.getToJobConfig().getConfigs();
|
||||
((MStringInput)configs.get(0).getInputs().get(0)).setValue("Updated");
|
||||
((MMapInput)configs.get(0).getInputs().get(1)).setValue(null);
|
||||
|
||||
@ -244,11 +244,11 @@ public void testUpdateJob() throws Exception {
|
||||
MJob retrieved = handler.findJob(1, provider.getConnection());
|
||||
assertEquals("name", retrieved.getName());
|
||||
|
||||
configs = job.getJobConfig(Direction.FROM).getConfigs();
|
||||
configs = job.getFromJobConfig().getConfigs();
|
||||
assertEquals(2, configs.size());
|
||||
assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
|
||||
assertNull(configs.get(0).getInputs().get(1).getValue());
|
||||
configs = job.getJobConfig(Direction.TO).getConfigs();
|
||||
configs = job.getToJobConfig().getConfigs();
|
||||
assertEquals(2, configs.size());
|
||||
assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
|
||||
assertNull(configs.get(0).getInputs().get(1).getValue());
|
||||
|
@ -195,14 +195,14 @@ private JsonBean createUpdateJob(RequestContext ctx, boolean create) {
|
||||
|
||||
// Verify that user is not trying to spoof us
|
||||
MFromConfig fromConfig = ConnectorManager.getInstance()
|
||||
.getConnectorConfigurable(postedJob.getConnectorId(Direction.FROM)).getFromConfig();
|
||||
.getConnectorConfigurable(postedJob.getFromConnectorId()).getFromConfig();
|
||||
MToConfig toConfig = ConnectorManager.getInstance()
|
||||
.getConnectorConfigurable(postedJob.getConnectorId(Direction.TO)).getToConfig();
|
||||
.getConnectorConfigurable(postedJob.getToConnectorId()).getToConfig();
|
||||
MDriverConfig driverConfig = Driver.getInstance().getDriver().getDriverConfig();
|
||||
|
||||
if (!fromConfig.equals(postedJob.getJobConfig(Direction.FROM))
|
||||
if (!fromConfig.equals(postedJob.getFromJobConfig())
|
||||
|| !driverConfig.equals(postedJob.getDriverConfig())
|
||||
|| !toConfig.equals(postedJob.getJobConfig(Direction.TO))) {
|
||||
|| !toConfig.equals(postedJob.getToJobConfig())) {
|
||||
throw new SqoopException(ServerError.SERVER_0003, "Detected incorrect config structure");
|
||||
}
|
||||
|
||||
@ -219,9 +219,9 @@ private JsonBean createUpdateJob(RequestContext ctx, boolean create) {
|
||||
|
||||
// Corresponding connectors for this
|
||||
SqoopConnector fromConnector = ConnectorManager.getInstance().getSqoopConnector(
|
||||
postedJob.getConnectorId(Direction.FROM));
|
||||
postedJob.getFromConnectorId());
|
||||
SqoopConnector toConnector = ConnectorManager.getInstance().getSqoopConnector(
|
||||
postedJob.getConnectorId(Direction.TO));
|
||||
postedJob.getToConnectorId());
|
||||
|
||||
if (!fromConnector.getSupportedDirections().contains(Direction.FROM)) {
|
||||
throw new SqoopException(ServerError.SERVER_0004, "Connector "
|
||||
@ -235,10 +235,10 @@ private JsonBean createUpdateJob(RequestContext ctx, boolean create) {
|
||||
|
||||
// Validate user supplied data
|
||||
ConfigValidationResult fromConfigValidator = ConfigUtils.validateConfigs(
|
||||
postedJob.getJobConfig(Direction.FROM).getConfigs(),
|
||||
postedJob.getFromJobConfig().getConfigs(),
|
||||
fromConnector.getJobConfigurationClass(Direction.FROM));
|
||||
ConfigValidationResult toConfigValidator = ConfigUtils.validateConfigs(
|
||||
postedJob.getJobConfig(Direction.TO).getConfigs(),
|
||||
postedJob.getToJobConfig().getConfigs(),
|
||||
toConnector.getJobConfigurationClass(Direction.TO));
|
||||
ConfigValidationResult driverConfigValidator = ConfigUtils.validateConfigs(postedJob
|
||||
.getDriverConfig().getConfigs(), Driver.getInstance().getDriverJobConfigurationClass());
|
||||
@ -331,8 +331,8 @@ private JobsBean createJobsBean(List<MJob> jobs, Locale locale) {
|
||||
private void addJob(List<MJob> jobs, Locale locale, JobBean bean) {
|
||||
// Add associated resources into the bean
|
||||
for (MJob job : jobs) {
|
||||
long fromConnectorId = job.getConnectorId(Direction.FROM);
|
||||
long toConnectorId = job.getConnectorId(Direction.TO);
|
||||
long fromConnectorId = job.getFromConnectorId();
|
||||
long toConnectorId = job.getToConnectorId();
|
||||
// replace it only if it does not already exist
|
||||
if (!bean.hasConnectorConfigBundle(fromConnectorId)) {
|
||||
bean.addConnectorConfigBundle(fromConnectorId, ConnectorManager.getInstance()
|
||||
|
@ -64,9 +64,9 @@ private Status cloneJob(Long jobId, List<String> args, boolean isInteractive) th
|
||||
job.setPersistenceId(MPersistableEntity.PERSISTANCE_ID_DEFAULT);
|
||||
|
||||
ResourceBundle fromConnectorBundle = client.getConnectorConfigBundle(
|
||||
job.getConnectorId(Direction.FROM));
|
||||
job.getFromConnectorId());
|
||||
ResourceBundle toConnectorBundle = client.getConnectorConfigBundle(
|
||||
job.getConnectorId(Direction.TO));
|
||||
job.getToConnectorId());
|
||||
ResourceBundle driverConfigBundle = client.getDriverConfigBundle();
|
||||
|
||||
Status status = Status.OK;
|
||||
|
@ -74,9 +74,9 @@ private Status createJob(Long fromLinkId, Long toLinkId, List<String> args, bool
|
||||
MJob job = client.createJob(fromLinkId, toLinkId);
|
||||
|
||||
ResourceBundle fromConfigBundle = client.getConnectorConfigBundle(
|
||||
job.getConnectorId(Direction.FROM));
|
||||
job.getFromConnectorId());
|
||||
ResourceBundle toConfigBundle = client.getConnectorConfigBundle(
|
||||
job.getConnectorId(Direction.TO));
|
||||
job.getToConnectorId());
|
||||
ResourceBundle driverConfigBundle = client.getDriverConfigBundle();
|
||||
|
||||
Status status = Status.OK;
|
||||
|
@ -87,9 +87,9 @@ private void showSummary() {
|
||||
ids.add(String.valueOf(job.getPersistenceId()));
|
||||
names.add(job.getName());
|
||||
fromConnectors.add(String.valueOf(
|
||||
job.getConnectorId(Direction.FROM)));
|
||||
job.getFromConnectorId()));
|
||||
toConnectors.add(String.valueOf(
|
||||
job.getConnectorId(Direction.TO)));
|
||||
job.getToConnectorId()));
|
||||
availabilities.add(String.valueOf(job.getEnabled()));
|
||||
}
|
||||
|
||||
@ -131,14 +131,14 @@ private void displayJob(MJob job) {
|
||||
formatter.format(job.getLastUpdateDate())
|
||||
);
|
||||
printlnResource(Constants.RES_SHOW_PROMPT_JOB_LID_CID_INFO,
|
||||
job.getLinkId(Direction.FROM),
|
||||
job.getConnectorId(Direction.FROM));
|
||||
job.getFromLinkId(),
|
||||
job.getFromConnectorId());
|
||||
|
||||
displayConfig(job.getJobConfig(Direction.FROM).getConfigs(),
|
||||
client.getConnectorConfigBundle(job.getConnectorId(Direction.FROM)));
|
||||
displayConfig(job.getFromJobConfig().getConfigs(),
|
||||
client.getConnectorConfigBundle(job.getFromConnectorId()));
|
||||
displayConfig(job.getDriverConfig().getConfigs(),
|
||||
client.getDriverConfigBundle());
|
||||
displayConfig(job.getJobConfig(Direction.TO).getConfigs(),
|
||||
client.getConnectorConfigBundle(job.getConnectorId(Direction.TO)));
|
||||
displayConfig(job.getToJobConfig().getConfigs(),
|
||||
client.getConnectorConfigBundle(job.getToConnectorId()));
|
||||
}
|
||||
}
|
||||
|
@ -65,9 +65,9 @@ private Status updateJob(Long jobId, List<String> args, boolean isInteractive) t
|
||||
MJob job = client.getJob(jobId);
|
||||
|
||||
ResourceBundle fromConnectorBundle = client.getConnectorConfigBundle(
|
||||
job.getConnectorId(Direction.FROM));
|
||||
job.getFromConnectorId());
|
||||
ResourceBundle toConnectorBundle = client.getConnectorConfigBundle(
|
||||
job.getConnectorId(Direction.TO));
|
||||
job.getToConnectorId());
|
||||
ResourceBundle driverConfigBundle = client.getDriverConfigBundle();
|
||||
|
||||
Status status = Status.OK;
|
||||
|
@ -63,12 +63,15 @@ public static void displayConnectorConfigDetails(MConnector connector, ResourceB
|
||||
resourceString(Constants.RES_CONFIG_DISPLAYER_LINK),
|
||||
bundle);
|
||||
|
||||
for (Direction direction : new Direction[] {Direction.FROM, Direction.TO}) {
|
||||
for (Direction direction : new Direction[]{Direction.FROM, Direction.TO}) {
|
||||
if (connector.getSupportedDirections().isDirectionSupported(direction)) {
|
||||
List<MConfig> configs = direction.equals(Direction.FROM)
|
||||
? connector.getFromConfig().getConfigs()
|
||||
: connector.getToConfig().getConfigs();
|
||||
displayConfig(
|
||||
connector.getConfig(direction).getConfigs(),
|
||||
direction.toString() + " " + resourceString(Constants.RES_CONFIG_DISPLAYER_JOB),
|
||||
bundle);
|
||||
configs,
|
||||
direction.toString() + " " + resourceString(Constants.RES_CONFIG_DISPLAYER_JOB),
|
||||
bundle);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -150,9 +153,9 @@ public static void displayConfigWarning(MAccountableEntity entity) {
|
||||
configList.addAll(link.getConnectorLinkConfig().getConfigs());
|
||||
} else if(entity instanceof MJob) {
|
||||
MJob job = (MJob) entity;
|
||||
configList.addAll(job.getJobConfig(Direction.FROM).getConfigs());
|
||||
configList.addAll(job.getFromJobConfig().getConfigs());
|
||||
configList.addAll(job.getDriverConfig().getConfigs());
|
||||
configList.addAll(job.getJobConfig(Direction.TO).getConfigs());
|
||||
configList.addAll(job.getToJobConfig().getConfigs());
|
||||
}
|
||||
for(MConfig config : configList) {
|
||||
if(config.getValidationStatus() == Status.WARNING) {
|
||||
|
@ -71,8 +71,8 @@ public static boolean fillJob(CommandLine line,
|
||||
|
||||
job.setName(line.getOptionValue("name"));
|
||||
return fillJobConfig(line,
|
||||
job.getJobConfig(Direction.FROM).getConfigs(),
|
||||
job.getJobConfig(Direction.TO).getConfigs(),
|
||||
job.getFromJobConfig().getConfigs(),
|
||||
job.getToJobConfig().getConfigs(),
|
||||
job.getDriverConfig().getConfigs());
|
||||
}
|
||||
|
||||
@ -404,9 +404,9 @@ public static boolean fillJobWithBundle(ConsoleReader reader,
|
||||
job.setName(getName(reader, job.getName()));
|
||||
|
||||
return fillJobConfigWithBundle(reader,
|
||||
job.getJobConfig(Direction.FROM).getConfigs(),
|
||||
job.getFromJobConfig().getConfigs(),
|
||||
fromConfigBundle,
|
||||
job.getJobConfig(Direction.TO).getConfigs(),
|
||||
job.getToJobConfig().getConfigs(),
|
||||
toConfigBundle,
|
||||
job.getDriverConfig().getConfigs(),
|
||||
driverConfigBundle);
|
||||
@ -954,13 +954,13 @@ public static void printLinkValidationMessages(MLink link) {
|
||||
|
||||
// job has the from/to and the driver config
|
||||
public static void printJobValidationMessages(MJob job) {
|
||||
for (MConfig config : job.getJobConfig(Direction.FROM).getConfigs()) {
|
||||
for (MConfig config : job.getFromJobConfig().getConfigs()) {
|
||||
for (MInput<?> input : config.getInputs()) {
|
||||
printValidationMessage(input, true);
|
||||
}
|
||||
}
|
||||
|
||||
for (MConfig config : job.getJobConfig(Direction.TO).getConfigs()) {
|
||||
for (MConfig config : job.getToJobConfig().getConfigs()) {
|
||||
for (MInput<?> input : config.getInputs()) {
|
||||
printValidationMessage(input, true);
|
||||
}
|
||||
|
@ -35,13 +35,13 @@ public void prepareOptions(MJob job) {
|
||||
.withLongOpt("name")
|
||||
.hasArg()
|
||||
.create());
|
||||
for (Option option : ConfigOptions.getConfigsOptions("from", job.getJobConfig(Direction.FROM).getConfigs())) {
|
||||
for (Option option : ConfigOptions.getConfigsOptions("from", job.getFromJobConfig().getConfigs())) {
|
||||
this.addOption(option);
|
||||
}
|
||||
for (Option option : ConfigOptions.getConfigsOptions("driver", job.getDriverConfig().getConfigs())) {
|
||||
this.addOption(option);
|
||||
}
|
||||
for (Option option : ConfigOptions.getConfigsOptions("to", job.getJobConfig(Direction.TO).getConfigs())) {
|
||||
for (Option option : ConfigOptions.getConfigsOptions("to", job.getToJobConfig().getConfigs())) {
|
||||
this.addOption(option);
|
||||
}
|
||||
}
|
||||
|
@ -146,13 +146,13 @@ protected void fillRdbmsLinkConfig(MLink link) {
|
||||
}
|
||||
|
||||
protected void fillRdbmsFromConfig(MJob job, String partitionColumn) {
|
||||
MConfigList fromConfig = job.getJobConfig(Direction.FROM);
|
||||
MConfigList fromConfig = job.getFromJobConfig();
|
||||
fromConfig.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName().getTableName()));
|
||||
fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName(partitionColumn));
|
||||
}
|
||||
|
||||
protected void fillRdbmsToConfig(MJob job) {
|
||||
MConfigList toConfig = job.getJobConfig(Direction.TO);
|
||||
MConfigList toConfig = job.getToJobConfig();
|
||||
toConfig.getStringInput("toJobConfig.tableName").setValue(provider.escapeTableName(getTableName().getTableName()));
|
||||
}
|
||||
|
||||
@ -168,7 +168,7 @@ protected void fillHdfsLink(MLink link) {
|
||||
* @param output Output type that should be set
|
||||
*/
|
||||
protected void fillHdfsToConfig(MJob job, ToFormat output) {
|
||||
MConfigList toConfig = job.getJobConfig(Direction.TO);
|
||||
MConfigList toConfig = job.getToJobConfig();
|
||||
toConfig.getEnumInput("toJobConfig.outputFormat").setValue(output);
|
||||
toConfig.getStringInput("toJobConfig.outputDirectory").setValue(getMapreduceDirectory());
|
||||
}
|
||||
@ -179,7 +179,7 @@ protected void fillHdfsToConfig(MJob job, ToFormat output) {
|
||||
* @param job MJob object to fill
|
||||
*/
|
||||
protected void fillHdfsFromConfig(MJob job) {
|
||||
MConfigList fromConfig = job.getJobConfig(Direction.FROM);
|
||||
MConfigList fromConfig = job.getFromJobConfig();
|
||||
fromConfig.getStringInput("fromJobConfig.inputDirectory").setValue(getMapreduceDirectory());
|
||||
}
|
||||
|
||||
|
@ -60,7 +60,7 @@ protected void fillKafkaLinkConfig(MLink link) {
|
||||
}
|
||||
|
||||
protected void fillKafkaToConfig(MJob job){
|
||||
MConfigList toConfig = job.getJobConfig(Direction.TO);
|
||||
MConfigList toConfig = job.getToJobConfig();
|
||||
toConfig.getStringInput("toJobConfig.topic").setValue(TOPIC);
|
||||
List<String> topics = new ArrayList<String>(1);
|
||||
topics.add(TOPIC);
|
||||
|
@ -51,7 +51,7 @@ public void test() throws Exception {
|
||||
|
||||
// Fill the hdfs "TO" config
|
||||
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
|
||||
MConfigList toConfig = job.getJobConfig(Direction.TO);
|
||||
MConfigList toConfig = job.getToJobConfig();
|
||||
toConfig.getBooleanInput("toJobConfig.appendMode").setValue(true);
|
||||
|
||||
|
||||
|
@ -67,13 +67,13 @@ public void testCities() throws Exception {
|
||||
MJob job = getClient().createJob(rdbmsLink.getPersistenceId(), kiteLink.getPersistenceId());
|
||||
|
||||
// Set rdbms "FROM" config
|
||||
MConfigList fromConfig = job.getJobConfig(Direction.FROM);
|
||||
MConfigList fromConfig = job.getFromJobConfig();
|
||||
fillRdbmsFromConfig(job, "id");
|
||||
// TODO: Kite have troubles with some data types, so we're limiting the columns to int only
|
||||
fromConfig.getStringInput("fromJobConfig.columns").setValue(provider.escapeColumnName("id"));
|
||||
|
||||
// Fill the Kite "TO" config
|
||||
MConfigList toConfig = job.getJobConfig(Direction.TO);
|
||||
MConfigList toConfig = job.getToJobConfig();
|
||||
toConfig.getStringInput("toJobConfig.uri").setValue("dataset:hive:testtable");
|
||||
toConfig.getEnumInput("toJobConfig.fileFormat").setValue(FileFormat.AVRO);
|
||||
|
||||
|
@ -85,7 +85,7 @@ public void testFrom() throws Exception {
|
||||
|
||||
// Fill rdbms "FROM" config
|
||||
fillRdbmsFromConfig(job, "id");
|
||||
MConfigList fromConfig = job.getJobConfig(Direction.FROM);
|
||||
MConfigList fromConfig = job.getFromJobConfig();
|
||||
fromConfig.getStringInput("fromJobConfig.columns").setValue(provider.escapeColumnName("value"));
|
||||
|
||||
// Fill the hdfs "TO" config
|
||||
|
@ -95,7 +95,7 @@ public void testStories() throws Exception {
|
||||
|
||||
// Connector values
|
||||
fillRdbmsFromConfig(job, "id");
|
||||
MConfigList configs = job.getJobConfig(Direction.FROM);
|
||||
MConfigList configs = job.getFromJobConfig();
|
||||
configs.getStringInput("fromJobConfig.columns").setValue(provider.escapeColumnName("id") + "," + provider.escapeColumnName("name") + "," + provider.escapeColumnName("story"));
|
||||
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
|
||||
saveJob(job);
|
||||
@ -140,7 +140,7 @@ public void testColumns() throws Exception {
|
||||
|
||||
// Connector values
|
||||
fillRdbmsFromConfig(job, "id");
|
||||
MConfigList configs = job.getJobConfig(Direction.FROM);
|
||||
MConfigList configs = job.getFromJobConfig();
|
||||
configs.getStringInput("fromJobConfig.columns").setValue(provider.escapeColumnName("id") + "," + provider.escapeColumnName("country"));
|
||||
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
|
||||
saveJob(job);
|
||||
@ -185,7 +185,7 @@ public void testSql() throws Exception {
|
||||
MJob job = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId());
|
||||
|
||||
// Connector values
|
||||
MConfigList configs = job.getJobConfig(Direction.FROM);
|
||||
MConfigList configs = job.getFromJobConfig();
|
||||
configs.getStringInput("fromJobConfig.sql").setValue("SELECT " + provider.escapeColumnName("id")
|
||||
+ " FROM " + provider.escapeTableName(getTableName().getTableName()) + " WHERE ${CONDITIONS}");
|
||||
configs.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
|
||||
@ -233,7 +233,7 @@ public void testDuplicateColumns() throws Exception {
|
||||
|
||||
// Connector values
|
||||
String partitionColumn = provider.escapeTableName(getTableName().getTableName()) + "." + provider.escapeColumnName("id");
|
||||
MConfigList configs = job.getJobConfig(Direction.FROM);
|
||||
MConfigList configs = job.getFromJobConfig();
|
||||
configs.getStringInput("fromJobConfig.sql").setValue(
|
||||
"SELECT " + provider.escapeColumnName("id") + " as " + provider.escapeColumnName("i") + ", "
|
||||
+ provider.escapeColumnName("id") + " as " + provider.escapeColumnName("j")
|
||||
|
@ -79,7 +79,7 @@ public void testTable() throws Exception {
|
||||
|
||||
// Set the rdbms "FROM" config
|
||||
fillRdbmsFromConfig(job, "id");
|
||||
MConfigList fromConfig = job.getJobConfig(Direction.FROM);
|
||||
MConfigList fromConfig = job.getFromJobConfig();
|
||||
fromConfig.getStringInput("incrementalRead.checkColumn").setValue(provider.escapeColumnName(checkColumn));
|
||||
fromConfig.getStringInput("incrementalRead.lastValue").setValue(lastValue);
|
||||
|
||||
@ -130,7 +130,7 @@ public void testQuery() throws Exception {
|
||||
String query = "SELECT * FROM " + provider.escapeTableName(getTableName().getTableName()) + " WHERE ${CONDITIONS}";
|
||||
|
||||
// Set the rdbms "FROM" config
|
||||
MConfigList fromConfig = job.getJobConfig(Direction.FROM);
|
||||
MConfigList fromConfig = job.getFromJobConfig();
|
||||
fromConfig.getStringInput("fromJobConfig.sql").setValue(query);
|
||||
fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
|
||||
fromConfig.getStringInput("incrementalRead.checkColumn").setValue(provider.escapeColumnName(checkColumn));
|
||||
|
@ -64,7 +64,7 @@ public void testStagedTransfer() throws Exception {
|
||||
|
||||
// fill rdbms "TO" config here
|
||||
fillRdbmsToConfig(job);
|
||||
MConfigList configs = job.getJobConfig(Direction.TO);
|
||||
MConfigList configs = job.getToJobConfig();
|
||||
configs.getStringInput("toJobConfig.stageTableName").setValue(provider.escapeTableName(stageTableName.getTableName()));
|
||||
|
||||
// driver config
|
||||
|
@ -71,11 +71,11 @@ public void testCities() throws Exception {
|
||||
// Set rdbms "FROM" config
|
||||
fillRdbmsFromConfig(job, "id");
|
||||
// TODO: Kite have troubles with some data types, so we're limiting the columns to int only
|
||||
MConfigList fromConfig = job.getJobConfig(Direction.FROM);
|
||||
MConfigList fromConfig = job.getFromJobConfig();
|
||||
fromConfig.getStringInput("fromJobConfig.columns").setValue(provider.escapeColumnName("id"));
|
||||
|
||||
// Fill the Kite "TO" config
|
||||
MConfigList toConfig = job.getJobConfig(Direction.TO);
|
||||
MConfigList toConfig = job.getToJobConfig();
|
||||
toConfig.getStringInput("toJobConfig.uri").setValue("dataset:hdfs:" + getMapreduceDirectory());
|
||||
toConfig.getEnumInput("toJobConfig.fileFormat").setValue(FileFormat.CSV);
|
||||
|
||||
|
@ -316,10 +316,10 @@ private long loadJob(MJob job) {
|
||||
// Transform config structures to objects for validations
|
||||
SqoopConnector fromConnector =
|
||||
ConnectorManager.getInstance().getSqoopConnector(
|
||||
job.getConnectorId(Direction.FROM));
|
||||
job.getFromConnectorId());
|
||||
SqoopConnector toConnector =
|
||||
ConnectorManager.getInstance().getSqoopConnector(
|
||||
job.getConnectorId(Direction.TO));
|
||||
job.getToConnectorId());
|
||||
|
||||
Object fromConnectorConfig = ClassUtils.instantiate(
|
||||
fromConnector.getJobConfigurationClass(Direction.FROM));
|
||||
|
Loading…
Reference in New Issue
Block a user