
SQOOP-2743: Sqoop2: Update RepositoryLoadTool and RepositoryDumpTool

(Colin Ma via Jarek Jarcec Cecho)
Committed by Jarek Jarcec Cecho on 2015-12-23 11:38:06 +01:00
parent c60d44f940
commit 889fde9a1a
7 changed files with 271 additions and 137 deletions
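
In short, the patch makes both tools serialize and resolve connectors, links, and jobs by name instead of by repository id, and adds a setInTest flag so tests can skip re-initializing the repository, connector, and driver managers. Both tools are driven through runToolWithConfiguration(String[]), exactly as the new integration test below does. A minimal round-trip sketch against that API, assuming a configured Sqoop 2 server environment; the path /tmp/repo.json is a hypothetical example, not from the patch:

import org.apache.sqoop.tools.tool.RepositoryDumpTool;
import org.apache.sqoop.tools.tool.RepositoryLoadTool;

public class RepositoryRoundTrip {
  public static void main(String[] args) {
    // Dump all links, jobs, and submissions into one JSON file.
    boolean dumped = new RepositoryDumpTool()
        .runToolWithConfiguration(new String[]{"-o", "/tmp/repo.json"});
    // Load the dump back. After this patch, entries are matched to
    // connectors and links by name, and entries whose connector or link
    // no longer exists are skipped rather than remapped by id.
    boolean loaded = new RepositoryLoadTool()
        .runToolWithConfiguration(new String[]{"-i", "/tmp/repo.json"});
    System.out.println("dump=" + dumped + ", load=" + loaded);
  }
}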

@@ -257,6 +257,21 @@ limitations under the License.
</properties>
</configuration>
</execution>
<execution>
<id>tools-test</id>
<goals>
<goal>test</goal>
</goals>
<phase>integration-test</phase>
<configuration>
<suiteXmlFiles>
<suiteXmlFile>src/test/resources/tools-tests-suite.xml</suiteXmlFile>
</suiteXmlFiles>
<properties>
<suitename>tools-tests</suitename>
</properties>
</configuration>
</execution>
</executions>
</plugin>
<plugin>

@@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.integration.tools;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.VersionInfo;
import org.apache.sqoop.json.JSONUtils;
import org.apache.sqoop.json.JobsBean;
import org.apache.sqoop.json.LinksBean;
import org.apache.sqoop.json.SubmissionsBean;
import org.apache.sqoop.model.*;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.utils.HdfsUtils;
import org.apache.sqoop.tools.tool.JSONConstants;
import org.apache.sqoop.tools.tool.RepositoryDumpTool;
import org.apache.sqoop.tools.tool.RepositoryLoadTool;
import org.apache.sqoop.utils.UrlSafeUtils;
import org.json.simple.JSONObject;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.*;
import java.util.List;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
public class RepositoryDumpLoadToolTest extends ConnectorTestCase {
private static final Logger LOG = Logger.getLogger(RepositoryDumpLoadToolTest.class);
private String jsonFilePath;
// Run testLoad first to populate the repository, then testDump verifies the dumped output.
@Test(dependsOnMethods = { "testLoad" })
public void testDump() throws Exception {
// dump the repository
RepositoryDumpTool rdt = new RepositoryDumpTool();
rdt.setInTest(true);
String fileName = HdfsUtils.joinPathFragments(getTemporaryPath(), "repoDumpTest.json");
rdt.runToolWithConfiguration(new String[]{"-o", fileName});
// load the output json file and do the verification
try (InputStream input = new FileInputStream(fileName)) {
String jsonTxt = IOUtils.toString(input, Charsets.UTF_8);
JSONObject json = JSONUtils.parse(jsonTxt);
JSONObject metadata = (JSONObject) json.get(JSONConstants.METADATA);
assertEquals((String) metadata.get(JSONConstants.VERSION), VersionInfo.getBuildVersion());
// verify the links
JSONObject jsonLinks = (JSONObject) json.get(JSONConstants.LINKS);
LinksBean linksBean = new LinksBean();
linksBean.restore(jsonLinks);
verifyLinks(linksBean.getLinks());
// verify the job
JSONObject jsonJobs = (JSONObject) json.get(JSONConstants.JOBS);
JobsBean jobsBean = new JobsBean();
jobsBean.restore(jsonJobs);
verifyJobs(jobsBean.getJobs());
// verify the submission
JSONObject jsonSubmissions = (JSONObject) json.get(JSONConstants.SUBMISSIONS);
SubmissionsBean submissionsBean = new SubmissionsBean();
submissionsBean.restore(jsonSubmissions);
verifySubmissions(submissionsBean.getSubmissions());
}
}
@Test
public void testLoad() throws Exception {
RepositoryLoadTool rlt = new RepositoryLoadTool();
rlt.setInTest(true);
rlt.runToolWithConfiguration(new String[]{"-i", jsonFilePath});
verifyLinks(getClient().getLinks());
verifyJobs(getClient().getJobs());
verifySubmissions(getClient().getSubmissions());
}
private void verifyLinks(List<MLink> links) {
for (MLink link : links) {
String linkName = link.getName();
assertTrue("hdfsLink1".equals(linkName) || "hdfsLink2".equals(linkName));
if ("hdfsLink1".equals(linkName)) {
assertEquals(link.getConnectorName(), "hdfs-connector");
} else {
assertEquals(link.getConnectorName(), "hdfs-connector");
}
}
}
private void verifyJobs(List<MJob> jobs) {
assertEquals(jobs.size(), 1);
MJob job = jobs.get(0);
assertEquals(job.getFromConnectorName(), "hdfs-connector");
assertEquals(job.getToConnectorName(), "hdfs-connector");
assertEquals(job.getFromLinkName(), "hdfsLink1");
assertEquals(job.getToLinkName(), "hdfsLink2");
assertEquals(job.getName(), "jobName");
}
private void verifySubmissions(List<MSubmission> submissions) {
assertEquals(submissions.size(), 1);
MSubmission submission = submissions.get(0);
assertEquals(submission.getJobName(), "jobName");
assertEquals(submission.getStatus(), SubmissionStatus.SUCCEEDED);
}
// Generate the JSON input file with the license header stripped.
@BeforeMethod
public void prepareJsonFile() throws Exception {
String testFilePath = getClass().getResource("/repoLoadToolTest.json").getPath();
jsonFilePath = HdfsUtils.joinPathFragments(getTemporaryPath(), "repoLoadTest.json");
try (BufferedReader reader = new BufferedReader(new FileReader(testFilePath));
FileWriter writer = new FileWriter(jsonFilePath)) {
String line;
while ((line = reader.readLine()) != null) {
// Skip the license header lines.
if (!line.startsWith("#")) {
// The HDFS connector validates linkConfig.confDir with DirectoryExistsValidator,
// so replace the placeholder with an existing local directory.
if (line.contains("linkConfReplacement")) {
line = line.replace("linkConfReplacement", UrlSafeUtils.urlEncode(getSqoopMiniClusterTemporaryPath() + "/config/"));
}
writer.write(line);
}
}
writer.flush();
}
}
}
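
Since RepositoryDumpLoadToolTest is an ordinary TestNG class, it can also be run outside the Maven suite wiring above. A minimal programmatic runner, assuming the test-support classes that ConnectorTestCase relies on are available on the classpath:

import org.testng.TestNG;

public class RunToolsTests {
  public static void main(String[] args) {
    // testLoad executes first; testDump follows via its dependsOnMethods declaration.
    TestNG testng = new TestNG();
    testng.setTestClasses(new Class[]{
        org.apache.sqoop.integration.tools.RepositoryDumpLoadToolTest.class});
    testng.run();
  }
}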

@@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
{"jobs":{"jobs":[{"driver-config-values":{"configs":[{"id":22,"validators":[],"inputs":[{"id":87,"validators":[],"overrides":"","name":"throttlingConfig.numExtractors","value":"3","type":"INTEGER","editable":"ANY","sensitive":false},{"id":88,"validators":[],"overrides":"","name":"throttlingConfig.numLoaders","type":"INTEGER","editable":"ANY","sensitive":false}],"name":"throttlingConfig","type":"JOB"},{"id":23,"validators":[],"inputs":[{"id":89,"validators":[],"overrides":"","name":"jarConfig.extraJars","type":"LIST","editable":"ANY","sensitive":false}],"name":"jarConfig","type":"JOB"}],"validators":[]},"enabled":true,"update-user":"user","from-link-name":"hdfsLink1","to-config-values":{"configs":[{"id":16,"validators":[{"validator-str-arg":"","validator-class":"org.apache.sqoop.connector.hdfs.configuration.ToJobConfig$ToJobConfigValidator"}],"inputs":[{"id":59,"validators":[],"overrides":"","name":"toJobConfig.overrideNullValue","type":"BOOLEAN","editable":"ANY","sensitive":false},{"id":60,"validators":[],"overrides":"","name":"toJobConfig.nullValue","type":"STRING","editable":"ANY","size":255,"sensitive":false},{"id":61,"values":"TEXT_FILE,SEQUENCE_FILE","validators":[],"overrides":"","name":"toJobConfig.outputFormat","value":"TEXT_FILE","type":"ENUM","editable":"ANY","sensitive":false},{"id":62,"values":"NONE,DEFAULT,DEFLATE,GZIP,BZIP2,LZO,LZ4,SNAPPY,CUSTOM","validators":[],"overrides":"","name":"toJobConfig.compression","type":"ENUM","editable":"ANY","sensitive":false},{"id":63,"validators":[],"overrides":"","name":"toJobConfig.customCompression","type":"STRING","editable":"ANY","size":255,"sensitive":false},{"id":64,"validators":[{"validator-str-arg":"","validator-class":"org.apache.sqoop.validation.validators.NotEmpty"}],"overrides":"","name":"toJobConfig.outputDirectory","value":"%2Fmapreduce-job-io%2Forg.apache.sqoop.integration.tools.RepositoryDumpLoadToolTest%2Ftest%2FTO%2F","type":"STRING","editable":"ANY","size":255,"sensitive":false},{"id":65,"validators":[],"overrides":"","name":"toJobConfig.appendMode","type":"BOOLEAN","editable":"ANY","sensitive":false}],"name":"toJobConfig","type":"JOB"}],"validators":[]},"creation-date":1450770986591,"update-date":1450770998270,"creation-user":"user","id":1,"from-config-values":{"configs":[{"id":14,"validators":[],"inputs":[{"id":54,"validators":[{"validator-str-arg":"","validator-class":"org.apache.sqoop.validation.validators.NotEmpty"}],"overrides":"","name":"fromJobConfig.inputDirectory","value":"%2Fmapreduce-job-io%2Forg.apache.sqoop.integration.tools.RepositoryDumpLoadToolTest%2Ftest%2F","type":"STRING","editable":"ANY","size":255,"sensitive":false},{"id":55,"validators":[],"overrides":"","name":"fromJobConfig.overrideNullValue","type":"BOOLEAN","editable":"ANY","sensitive":false},{"id":56,"validators":[],"overrides":"","name":"fromJobConfig.nullValue","type":"STRING","editable":"ANY","size":255,"sensitive":false}],"name":"fromJobConfig","type":"JOB"},{"id":15,"validators":[],"inputs":[{"id":57,"values":"NONE,NEW_FILES","validators":[],"overrides":"","name":"incremental.incrementalType","type":"ENUM","editable":"ANY","sensitive":false},{"id":58,"validators":[],"overrides":"","name":"incremental.lastImportedDate","type":"DATETIME","editable":"ANY","sensitive":false}],"name":"incremental","type":"JOB"}],"validators":[]},"to-link-name":"hdfsLink2","name":"jobName","from-connector-name":"hdfs-connector","to-connector-name":"hdfs-connector"}]},
"submissions":{"submissions":[{"progress":-1.0,"last-update-date":1450770998258,"external-id":"job_1450770981669_0001","last-udpate-user":"user","job-name":"jobName","status":"SUCCEEDED","creation-date":1450770986659,"external-link":"http:\/\/address:40606\/proxy\/application_1450770981669_0001\/","creation-user":"user","counters":{"org.apache.hadoop.mapreduce.JobCounter":{"SLOTS_MILLIS_MAPS":4897,"MB_MILLIS_MAPS":5014528,"TOTAL_LAUNCHED_MAPS":2,"MILLIS_MAPS":4897,"VCORES_MILLIS_MAPS":4897,"SLOTS_MILLIS_REDUCES":0,"OTHER_LOCAL_MAPS":2},"org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter":{"BYTES_WRITTEN":0},"org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter":{"BYTES_READ":0},"org.apache.hadoop.mapreduce.TaskCounter":{"MAP_INPUT_RECORDS":0,"MERGED_MAP_OUTPUTS":0,"PHYSICAL_MEMORY_BYTES":383397888,"SPILLED_RECORDS":0,"COMMITTED_HEAP_BYTES":272629760,"CPU_MILLISECONDS":1890,"FAILED_SHUFFLE":0,"VIRTUAL_MEMORY_BYTES":5550505984,"SPLIT_RAW_BYTES":607,"MAP_OUTPUT_RECORDS":4,"GC_TIME_MILLIS":45},"org.apache.hadoop.mapreduce.FileSystemCounter":{"FILE_WRITE_OPS":0,"FILE_READ_OPS":0,"FILE_LARGE_READ_OPS":0,"FILE_BYTES_READ":0,"HDFS_BYTES_READ":1007,"FILE_BYTES_WRITTEN":1130486,"HDFS_LARGE_READ_OPS":0,"HDFS_WRITE_OPS":2,"HDFS_READ_OPS":11,"HDFS_BYTES_WRITTEN":194},"org.apache.sqoop.submission.counter.SqoopCounters":{"ROWS_WRITTEN":4,"ROWS_READ":4}}}]},
"links":{"links":[{"id":1,"enabled":true,"update-user":"user","link-config-values":{"configs":[{"id":13,"validators":[{"validator-str-arg":"","validator-class":"org.apache.sqoop.connector.hdfs.configuration.LinkConfig$ConfigValidator"}],"inputs":[{"id":51,"validators":[],"overrides":"","name":"linkConfig.uri","type":"STRING","editable":"ANY","size":255,"sensitive":false},{"id":52,"validators":[{"validator-str-arg":"","validator-class":"org.apache.sqoop.validation.validators.DirectoryExistsValidator"}],"overrides":"","name":"linkConfig.confDir","value":"linkConfReplacement","type":"STRING","editable":"ANY","size":255,"sensitive":false},{"id":53,"validators":[],"overrides":"","name":"linkConfig.configOverrides","type":"MAP","editable":"ANY","sensitive":false}],"name":"linkConfig","type":"LINK"}],"validators":[]},"name":"hdfsLink1","connector-name":"hdfs-connector","creation-date":1450770986500,"update-date":1450770986500,"creation-user":"user"},{"id":2,"enabled":true,"update-user":"user","link-config-values":{"configs":[{"id":13,"validators":[{"validator-str-arg":"","validator-class":"org.apache.sqoop.connector.hdfs.configuration.LinkConfig$ConfigValidator"}],"inputs":[{"id":51,"validators":[],"overrides":"","name":"linkConfig.uri","type":"STRING","editable":"ANY","size":255,"sensitive":false},{"id":52,"validators":[{"validator-str-arg":"","validator-class":"org.apache.sqoop.validation.validators.DirectoryExistsValidator"}],"overrides":"","name":"linkConfig.confDir","value":"linkConfReplacement","type":"STRING","editable":"ANY","size":255,"sensitive":false},{"id":53,"validators":[],"overrides":"","name":"linkConfig.configOverrides","type":"MAP","editable":"ANY","sensitive":false}],"name":"linkConfig","type":"LINK"}],"validators":[]},"name":"hdfsLink2","connector-name":"hdfs-connector","creation-date":1450770986547,"update-date":1450770986547,"creation-user":"user"}]},
"metadata":{"revision":"f4909a5634b085c9291efb95e1dd3b8579f4b1b5","compile-user":"user","compile-date":"Mon Dec 21 15:01:01 CST 2015","include-sensitive":false,"version":"2.0.0-SNAPSHOT"}}

@@ -0,0 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
<suite name="ToolsTests" verbose="2" parallel="false">
<listeners>
<listener class-name="org.apache.sqoop.test.testng.SqoopTestListener" />
</listeners>
<test name="ToolsTests">
<packages>
<package name="org.apache.sqoop.integration.tools"/>
</packages>
</test>
</suite>

@@ -24,18 +24,12 @@ private JSONConstants() {
// restrict instantiation
}
public static final String CONNECTOR_ID = "connector-id";
public static final String FROM_CONNECTOR_ID = "from-connector-id";
public static final String TO_CONNECTOR_ID = "to-connector-id";
public static final String CONNECTOR_NAME = "connector-name";
public static final String FROM_CONNECTOR_NAME = "from-connector-name";
public static final String TO_CONNECTOR_NAME = "to-connector-name";
public static final String NAME = "name";
public static final String FROM_LINK_ID = "from-link-id";
public static final String TO_LINK_ID = "to-link-id";
public static final String FROM_LINK_NAME = "from-link-name";
public static final String TO_LINK_NAME = "to-link-name";
public static final String JOB_ID = "job-id";
public static final String JOB_NAME = "job-name";
public static final String LINKS = "links";
public static final String JOBS = "jobs";

@@ -49,6 +49,7 @@
*/
public class RepositoryDumpTool extends ConfiguredTool {
public static final Logger LOG = Logger.getLogger(RepositoryDumpTool.class);
private boolean isInTest = false;
@Override
public boolean runToolWithConfiguration(String[] arguments) {
@@ -96,8 +97,10 @@ public boolean runToolWithConfiguration(String[] arguments) {
private JSONObject dump(boolean skipSensitive) {
RepositoryManager.getInstance().initialize(true);
ConnectorManager.getInstance().initialize();
if (!isInTest) {
RepositoryManager.getInstance().initialize(true);
ConnectorManager.getInstance().initialize();
}
Repository repository = RepositoryManager.getInstance().getRepository();
@@ -108,25 +111,16 @@ private JSONObject dump(boolean skipSensitive) {
List<MLink> links = repository.findLinks();
LinksBean linkBeans = new LinksBean(links);
JSONObject linksJsonObject = linkBeans.extract(skipSensitive);
JSONArray linksJsonArray = (JSONArray)linksJsonObject.get(JSONConstants.LINKS);
addConnectorName(linksJsonArray, JSONConstants.CONNECTOR_ID, JSONConstants.CONNECTOR_NAME);
result.put(JSONConstants.LINKS, linksJsonObject);
LOG.info("Dumping Jobs with skipSensitive=" + String.valueOf(skipSensitive));
JobsBean jobs = new JobsBean(repository.findJobs());
JSONObject jobsJsonObject = jobs.extract(skipSensitive);
JSONArray jobsJsonArray = (JSONArray)jobsJsonObject.get(JSONConstants.JOBS);
addConnectorName(jobsJsonArray, JSONConstants.FROM_CONNECTOR_ID, JSONConstants.FROM_CONNECTOR_NAME);
addConnectorName(jobsJsonArray, JSONConstants.TO_CONNECTOR_ID, JSONConstants.TO_CONNECTOR_NAME);
addLinkName(jobsJsonArray, JSONConstants.FROM_LINK_ID, JSONConstants.FROM_LINK_NAME);
addLinkName(jobsJsonArray, JSONConstants.TO_LINK_ID, JSONConstants.TO_LINK_NAME);
result.put(JSONConstants.JOBS, jobsJsonObject);
LOG.info("Dumping Submissions with skipSensitive=" + String.valueOf(skipSensitive));
SubmissionsBean submissions = new SubmissionsBean(repository.findSubmissions());
JSONObject submissionsJsonObject = submissions.extract(skipSensitive);
JSONArray submissionsJsonArray = (JSONArray)submissionsJsonObject.get(JSONConstants.SUBMISSIONS);
addJobName(submissionsJsonArray, JSONConstants.JOB_ID);
result.put(JSONConstants.SUBMISSIONS, submissionsJsonObject);
result.put(JSONConstants.METADATA, repoMetadata(skipSensitive));
@@ -145,43 +139,7 @@ private JSONObject repoMetadata(boolean skipSensitive) {
return metadata;
}
private JSONArray addConnectorName(JSONArray jsonArray, String connectorKey, String connectorName) {
ConnectorManager connectorManager = ConnectorManager.getInstance();
Iterator<JSONObject> iterator = jsonArray.iterator();
while (iterator.hasNext()) {
JSONObject result = iterator.next();
Long connectorId = (Long) result.get(connectorKey);
result.put(connectorName, connectorManager.getConnectorConfigurable(connectorId).getUniqueName());
}
return jsonArray;
}
private JSONArray addLinkName(JSONArray jsonArray, String linkKey, String linkName) {
Repository repository = RepositoryManager.getInstance().getRepository();
Iterator<JSONObject> iterator = jsonArray.iterator();
while (iterator.hasNext()) {
JSONObject jobObject = iterator.next();
Long linkId = (Long) jobObject.get(linkKey);
jobObject.put(linkName, repository.findLink(linkId).getName());
}
return jsonArray;
}
private JSONArray addJobName(JSONArray jsonArray, String jobKey) {
Repository repository = RepositoryManager.getInstance().getRepository();
Iterator<JSONObject> iterator = jsonArray.iterator();
while (iterator.hasNext()) {
JSONObject submissionObject = iterator.next();
Long jobId = (Long) submissionObject.get(jobKey);
submissionObject.put(JSONConstants.JOB_NAME, repository.findJob(jobId).getName());
}
return jsonArray;
public void setInTest(boolean isInTest) {
this.isInTest = isInTest;
}
}

@@ -22,10 +22,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.*;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -77,6 +74,7 @@
public class RepositoryLoadTool extends ConfiguredTool {
public static final Logger LOG = Logger.getLogger(RepositoryLoadTool.class);
private boolean isInTest = false;
@SuppressWarnings("static-access")
@Override
@@ -130,14 +128,16 @@ private boolean load(JSONObject repo) {
return false;
}
// initialize repository as mutable
RepositoryManager.getInstance().initialize(false);
if (!isInTest) {
// initialize repository as mutable
RepositoryManager.getInstance().initialize(false);
ConnectorManager.getInstance().initialize();
Driver.getInstance().initialize();
}
Repository repository = RepositoryManager.getInstance().getRepository();
ConnectorManager.getInstance().initialize();
Driver.getInstance().initialize();
LOG.info("Loading Connections");
JSONObject jsonLinks = (JSONObject) repo.get(JSONConstants.LINKS);
if (jsonLinks == null) {
@@ -145,28 +145,21 @@ private boolean load(JSONObject repo) {
return false;
}
updateConnectorIDUsingName(
(JSONArray)jsonLinks.get(JSONConstants.LINKS),
JSONConstants.CONNECTOR_ID,
JSONConstants.CONNECTOR_NAME,
true);
removeObjectIfConnectorNotExist(
(JSONArray) jsonLinks.get(JSONConstants.LINKS),
JSONConstants.CONNECTOR_NAME, true);
LinksBean linksBean = new LinksBean();
linksBean.restore(jsonLinks);
HashMap<Long, Long> linkIds = new HashMap<Long, Long>();
for (MLink link : linksBean.getLinks()) {
long oldId = link.getPersistenceId();
long newId = loadLink(link);
if (newId == link.PERSISTANCE_ID_DEFAULT) {
LOG.error("loading connection " + link.getName() + " with previous ID " + oldId
+ " failed. Aborting repository load. Check log for details.");
LOG.error("loading connection " + link.getName() + " failed. Aborting repository load. Check log for details.");
return false;
}
linkIds.put(oldId, newId);
}
LOG.info("Loaded " + linkIds.size() + " links");
LOG.info("Loaded " + linksBean.getLinks().size() + " links");
LOG.info("Loading Jobs");
JSONObject jsonJobs = (JSONObject) repo.get(JSONConstants.JOBS);
@@ -176,30 +169,22 @@ private boolean load(JSONObject repo) {
return false;
}
updateConnectorIDUsingName(
(JSONArray)jsonJobs.get(JSONConstants.JOBS),
JSONConstants.FROM_CONNECTOR_ID,
JSONConstants.FROM_CONNECTOR_NAME,
false);
updateConnectorIDUsingName(
(JSONArray)jsonJobs.get(JSONConstants.JOBS),
JSONConstants.TO_CONNECTOR_ID,
JSONConstants.TO_CONNECTOR_NAME,
false);
removeObjectIfConnectorNotExist(
(JSONArray) jsonJobs.get(JSONConstants.JOBS),
JSONConstants.FROM_CONNECTOR_NAME, false);
removeObjectIfConnectorNotExist(
(JSONArray) jsonJobs.get(JSONConstants.JOBS),
JSONConstants.TO_CONNECTOR_NAME, false);
updateLinkIdUsingName((JSONArray)jsonJobs.get(JSONConstants.JOBS),
JSONConstants.FROM_LINK_ID,
JSONConstants.FROM_LINK_NAME);
updateLinkIdUsingName((JSONArray)jsonJobs.get(JSONConstants.JOBS),
JSONConstants.TO_LINK_ID,
JSONConstants.TO_LINK_NAME);
removeJobIfLinkNotExist((JSONArray) jsonJobs.get(JSONConstants.JOBS),
JSONConstants.FROM_LINK_NAME);
removeJobIfLinkNotExist((JSONArray) jsonJobs.get(JSONConstants.JOBS),
JSONConstants.TO_LINK_NAME);
JobsBean jobsBean = new JobsBean();
jobsBean.restore(jsonJobs);
HashMap<Long, Long> jobIds = new HashMap<Long, Long>();
for (MJob job : jobsBean.getJobs()) {
long oldId = job.getPersistenceId();
long newId = loadJob(job);
if (newId == job.PERSISTANCE_ID_DEFAULT) {
@@ -207,10 +192,8 @@ private boolean load(JSONObject repo) {
+ " failed. Aborting repository load. Check log for details.");
return false;
}
jobIds.put(oldId, newId);
}
LOG.info("Loaded " + jobIds.size() + " jobs");
LOG.info("Loaded " + jobsBean.getJobs().size() + " jobs");
LOG.info("Loading Submissions");
JSONObject jsonSubmissions = (JSONObject) repo.get(JSONConstants.SUBMISSIONS);
@@ -220,17 +203,15 @@ private boolean load(JSONObject repo) {
return false;
}
updateJobIdUsingName((JSONArray)jsonSubmissions.get(JSONConstants.SUBMISSIONS));
removeSubmissionIfJobNotExist((JSONArray)jsonSubmissions.get(JSONConstants.SUBMISSIONS));
SubmissionsBean submissionsBean = new SubmissionsBean();
submissionsBean.restore(jsonSubmissions);
int submissionCount = 0;
for (MSubmission submission : submissionsBean.getSubmissions()) {
resetPersistenceId(submission);
repository.createSubmission(submission);
submissionCount++;
}
LOG.info("Loaded " + submissionCount + " submissions.");
LOG.info("Loaded " + submissionsBean.getSubmissions().size() + " submissions.");
LOG.info("Repository load completed successfully.");
return true;
}
@@ -390,97 +371,83 @@ private long loadJob(MJob job) {
}
private JSONArray updateConnectorIDUsingName(JSONArray jsonArray, String connectorIdKey, String connectorNameKey, boolean isLink) {
private JSONArray removeObjectIfConnectorNotExist(JSONArray jsonArray, String connectorNameKey, boolean isLink) {
Repository repository = RepositoryManager.getInstance().getRepository();
List<MConnector> connectors = repository.findConnectors();
Map<String, Long> connectorMap = new HashMap<String, Long>();
List<String> connectorNames = new ArrayList<String>();
for (MConnector connector : connectors) {
connectorMap.put(connector.getUniqueName(), connector.getPersistenceId());
connectorNames.add(connector.getUniqueName());
}
for (Iterator iterator = jsonArray.iterator(); iterator.hasNext(); ) {
JSONObject object = (JSONObject) iterator.next();
long connectorId = (Long) object.get(connectorIdKey);
String connectorName = (String) object.get(connectorNameKey);
Long currentConnectorId = connectorMap.get(connectorName);
String name = (String) object.get(JSONConstants.NAME);
String objectName = (String)object.get(JSONConstants.NAME);
if (currentConnectorId == null) {
if (!connectorNames.contains(connectorName)) {
// If a connector doesn't exist, remove the links and jobs relating to it
iterator.remove();
LOG.warn((isLink ? "Link " : "Job ") + objectName + " won't be loaded because connector "
+ connectorName + " is missing.");
continue;
}
// If a given connector now has a different ID, we need to update the ID
if (connectorId != currentConnectorId) {
LOG.warn((isLink ? "Link " : "Job ") + name + " uses connector " + connectorName + ". "
+ "Replacing previous ID " + connectorId + " with new ID " + currentConnectorId);
object.put(connectorIdKey, currentConnectorId);
}
}
return jsonArray;
}
private JSONArray updateLinkIdUsingName(JSONArray jobsJsonArray, String linkIdKey, String linkNameKey) {
private JSONArray removeJobIfLinkNotExist(JSONArray jobsJsonArray, String linkNameKey) {
Repository repository = RepositoryManager.getInstance().getRepository();
List<MLink> links = repository.findLinks();
Map<String, Long> linkMap = new HashMap<String, Long>();
List<String> linkNames = new ArrayList<String>();
for (MLink link : links) {
linkMap.put(link.getName(), link.getPersistenceId());
linkNames.add(link.getName());
}
for(Iterator iterator = jobsJsonArray.iterator(); iterator.hasNext(); ) {
JSONObject jobObject = (JSONObject) iterator.next();
long linkId = (Long) jobObject.get(linkIdKey);
String linkName = (String) jobObject.get(linkNameKey);
Long currentLinkId = linkMap.get(linkName);
String jobName = (String) jobObject.get(JSONConstants.NAME);
String jobName = (String)jobObject.get(JSONConstants.NAME);
if (currentLinkId == null) {
if (!linkNames.contains(linkName)) {
// If a link doesn't exist, remove the jobs relating to it
iterator.remove();
LOG.warn("Job " + jobName + " won't be loaded because link " + linkName + " is missing.");
continue;
}
if (linkId != currentLinkId) {
LOG.warn("Job " + jobName + " uses link " + linkName + "."
+ "Replacing previous ID " + linkId + " with new ID " + currentLinkId);
jobObject.put(linkIdKey, currentLinkId);
}
}
return jobsJsonArray;
}
private JSONArray updateJobIdUsingName(JSONArray submissionsJsonArray) {
private JSONArray removeSubmissionIfJobNotExist(JSONArray submissionsJsonArray) {
Repository repository = RepositoryManager.getInstance().getRepository();
List<MJob> jobs = repository.findJobs();
Map<String, Long> jobMap = new HashMap<String, Long>();
List<String> jobNames = new ArrayList<String>();
for (MJob job : jobs) {
jobMap.put(job.getName(), job.getPersistenceId());
jobNames.add(job.getName());
}
for(Iterator iterator = submissionsJsonArray.iterator(); iterator.hasNext(); ) {
JSONObject submissionObject = (JSONObject) iterator.next();
String jobName = (String) submissionObject.get(JSONConstants.JOB_NAME);
Long currentJobId = jobMap.get(jobName);
if (currentJobId == null) {
if (!jobNames.contains(jobName)) {
// If a job doesn't exist, remove the submissions relating to it
iterator.remove();
LOG.warn("Submission for " + jobName + " won't be loaded because job " + jobName + " is missing.");
continue;
}
submissionObject.put(JSONConstants.JOB_ID, currentJobId);
}
return submissionsJsonArray;
}
public void setInTest(boolean isInTest) {
this.isInTest = isInTest;
}
}
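
The three update*UsingName helpers that rewrote ids are gone; their replacements only filter. The pattern is identical in all three: collect the names that exist in the repository, then walk the JSON array with an Iterator and drop any object whose referenced name is absent, leaving the beans to resolve names on restore. A self-contained toy version of that filter (the names here are made up for illustration, not from the patch):

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;

public class NameFilterSketch {
  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    List<String> knownConnectors = Arrays.asList("hdfs-connector");

    JSONObject kept = new JSONObject();
    kept.put("name", "hdfsLink1");
    kept.put("connector-name", "hdfs-connector");
    JSONObject stale = new JSONObject();
    stale.put("name", "staleLink");
    stale.put("connector-name", "gone-connector");

    JSONArray links = new JSONArray();
    links.add(kept);
    links.add(stale);

    // Same shape as removeObjectIfConnectorNotExist: drop entries whose
    // connector is not registered instead of remapping their ids.
    for (Iterator<?> it = links.iterator(); it.hasNext(); ) {
      JSONObject link = (JSONObject) it.next();
      if (!knownConnectors.contains(link.get("connector-name"))) {
        it.remove();
      }
    }
    System.out.println(links); // only the hdfsLink1 entry survives
  }
}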