mirror of
https://github.com/apache/sqoop.git
synced 2025-05-06 06:12:26 +08:00
SQOOP-1256: Sqoop2: Tool: Load repository dump into a different repository
(Dian Fu via Abraham Elmahrek)
This commit is contained in:
parent
3c88f553de
commit
97c65879f2
@ -44,7 +44,9 @@
|
||||
public class SubmissionBean implements JsonBean {
|
||||
|
||||
private static final String SUBMISSION = "submission";
|
||||
@Deprecated
|
||||
private static final String JOB = "job";
|
||||
private static final String JOB_ID = "job-id";
|
||||
private static final String CREATION_USER = "creation-user";
|
||||
private static final String CREATION_DATE = "creation-date";
|
||||
private static final String LAST_UPDATE_USER = "last-udpate-user";
|
||||
@ -103,6 +105,7 @@ private JSONObject extractSubmission(MSubmission submission) {
|
||||
JSONObject object = new JSONObject();
|
||||
|
||||
object.put(JOB, submission.getJobId());
|
||||
object.put(JOB_ID, submission.getJobId());
|
||||
object.put(STATUS, submission.getStatus().name());
|
||||
object.put(PROGRESS, submission.getProgress());
|
||||
|
||||
@ -173,7 +176,11 @@ protected void restoreSubmissions(JSONArray array) {
|
||||
private MSubmission restoreSubmission(Object obj) {
|
||||
JSONObject object = (JSONObject) obj;
|
||||
MSubmission submission = new MSubmission();
|
||||
submission.setJobId((Long) object.get(JOB));
|
||||
Long jobId = (Long) object.get(JOB_ID);
|
||||
if (jobId == null) {
|
||||
jobId = (Long) object.get(JOB);
|
||||
}
|
||||
submission.setJobId(jobId);
|
||||
submission.setStatus(SubmissionStatus.valueOf((String) object.get(STATUS)));
|
||||
submission.setProgress((Double) object.get(PROGRESS));
|
||||
|
||||
|
@ -26,11 +26,17 @@ private JSONConstants() {
|
||||
|
||||
public static final String CONNECTOR_ID = "connector-id";
|
||||
public static final String FROM_CONNECTOR_ID = "from-connector-id";
|
||||
public static final String TO_CONNECTOR_ID = "from-connector-id";
|
||||
public static final String TO_CONNECTOR_ID = "to-connector-id";
|
||||
public static final String CONNECTOR_NAME = "connector-name";
|
||||
public static final String FROM_CONNECTOR_NAME = "from-connector-name";
|
||||
public static final String TO_CONNECTOR_NAME = "to-connector-name";
|
||||
public static final String NAME = "name";
|
||||
public static final String LINK_ID = "link-id";
|
||||
public static final String FROM_LINK_ID = "from-link-id";
|
||||
public static final String TO_LINK_ID = "to-link-id";
|
||||
public static final String FROM_LINK_NAME = "from-link-name";
|
||||
public static final String TO_LINK_NAME = "to-link-name";
|
||||
public static final String JOB_ID = "job-id";
|
||||
public static final String JOB_NAME = "job-name";
|
||||
public static final String LINKS = "links";
|
||||
public static final String JOBS = "jobs";
|
||||
public static final String SUBMISSIONS = "submissions";
|
||||
|
@ -107,20 +107,24 @@ private JSONObject dump(boolean skipSensitive) {
|
||||
LinksBean linkBeans = new LinksBean(links);
|
||||
JSONObject linksJsonObject = linkBeans.extract(skipSensitive);
|
||||
JSONArray linksJsonArray = (JSONArray)linksJsonObject.get(JSONConstants.LINKS);
|
||||
addConnectorName(linksJsonArray, JSONConstants.CONNECTOR_ID);
|
||||
addConnectorName(linksJsonArray, JSONConstants.CONNECTOR_ID, JSONConstants.CONNECTOR_NAME);
|
||||
result.put(JSONConstants.LINKS, linksJsonObject);
|
||||
|
||||
LOG.info("Dumping Jobs with skipSensitive=" + String.valueOf(skipSensitive));
|
||||
JobsBean jobs = new JobsBean(repository.findJobs());
|
||||
JSONObject jobsJsonObject = jobs.extract(skipSensitive);
|
||||
JSONArray jobsJsonArray = (JSONArray)jobsJsonObject.get(JSONConstants.JOBS);
|
||||
addConnectorName(jobsJsonArray, JSONConstants.FROM_CONNECTOR_ID);
|
||||
addConnectorName(jobsJsonArray, JSONConstants.TO_CONNECTOR_ID);
|
||||
addConnectorName(jobsJsonArray, JSONConstants.FROM_CONNECTOR_ID, JSONConstants.FROM_CONNECTOR_NAME);
|
||||
addConnectorName(jobsJsonArray, JSONConstants.TO_CONNECTOR_ID, JSONConstants.TO_CONNECTOR_NAME);
|
||||
addLinkName(jobsJsonArray, JSONConstants.FROM_LINK_ID, JSONConstants.FROM_LINK_NAME);
|
||||
addLinkName(jobsJsonArray, JSONConstants.TO_LINK_ID, JSONConstants.TO_LINK_NAME);
|
||||
result.put(JSONConstants.JOBS, jobsJsonObject);
|
||||
|
||||
LOG.info("Dumping Submissions with skipSensitive=" + String.valueOf(skipSensitive));
|
||||
SubmissionsBean submissions = new SubmissionsBean(repository.findSubmissions());
|
||||
JSONObject submissionsJsonObject = submissions.extract(skipSensitive);
|
||||
JSONArray submissionsJsonArray = (JSONArray)submissionsJsonObject.get(JSONConstants.SUBMISSIONS);
|
||||
addJobName(submissionsJsonArray, JSONConstants.JOB_ID);
|
||||
result.put(JSONConstants.SUBMISSIONS, submissionsJsonObject);
|
||||
|
||||
result.put(JSONConstants.METADATA, repoMetadata(skipSensitive));
|
||||
@ -139,7 +143,7 @@ private JSONObject repoMetadata(boolean skipSensitive) {
|
||||
return metadata;
|
||||
}
|
||||
|
||||
private JSONArray addConnectorName(JSONArray jsonArray, String connectorKey) {
|
||||
private JSONArray addConnectorName(JSONArray jsonArray, String connectorKey, String connectorName) {
|
||||
ConnectorManager connectorManager = ConnectorManager.getInstance();
|
||||
|
||||
Iterator<JSONObject> iterator = jsonArray.iterator();
|
||||
@ -147,7 +151,33 @@ private JSONArray addConnectorName(JSONArray jsonArray, String connectorKey) {
|
||||
while (iterator.hasNext()) {
|
||||
JSONObject result = iterator.next();
|
||||
Long connectorId = (Long) result.get(connectorKey);
|
||||
result.put(JSONConstants.CONNECTOR_NAME, connectorManager.getConnectorConfigurable(connectorId).getUniqueName());
|
||||
result.put(connectorName, connectorManager.getConnectorConfigurable(connectorId).getUniqueName());
|
||||
}
|
||||
|
||||
return jsonArray;
|
||||
}
|
||||
|
||||
private JSONArray addLinkName(JSONArray jsonArray, String linkKey, String linkName) {
|
||||
Repository repository = RepositoryManager.getInstance().getRepository();
|
||||
Iterator<JSONObject> iterator = jsonArray.iterator();
|
||||
|
||||
while (iterator.hasNext()) {
|
||||
JSONObject jobObject = iterator.next();
|
||||
Long linkId = (Long) jobObject.get(linkKey);
|
||||
jobObject.put(linkName, repository.findLink(linkId).getName());
|
||||
}
|
||||
|
||||
return jsonArray;
|
||||
}
|
||||
|
||||
private JSONArray addJobName(JSONArray jsonArray, String jobKey) {
|
||||
Repository repository = RepositoryManager.getInstance().getRepository();
|
||||
Iterator<JSONObject> iterator = jsonArray.iterator();
|
||||
|
||||
while (iterator.hasNext()) {
|
||||
JSONObject submissionObject = iterator.next();
|
||||
Long jobId = (Long) submissionObject.get(jobKey);
|
||||
submissionObject.put(JSONConstants.JOB_NAME, repository.findJob(jobId).getName());
|
||||
}
|
||||
|
||||
return jsonArray;
|
||||
|
@ -23,6 +23,7 @@
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@ -134,6 +135,7 @@ private boolean load(JSONObject repo) {
|
||||
Repository repository = RepositoryManager.getInstance().getRepository();
|
||||
|
||||
ConnectorManager.getInstance().initialize();
|
||||
Driver.getInstance().initialize();
|
||||
LOG.info("Loading Connections");
|
||||
|
||||
JSONObject jsonLinks = (JSONObject) repo.get(JSONConstants.LINKS);
|
||||
@ -145,7 +147,9 @@ private boolean load(JSONObject repo) {
|
||||
|
||||
updateConnectorIDUsingName(
|
||||
(JSONArray)jsonLinks.get(JSONConstants.LINKS),
|
||||
JSONConstants.CONNECTOR_ID);
|
||||
JSONConstants.CONNECTOR_ID,
|
||||
JSONConstants.CONNECTOR_NAME,
|
||||
true);
|
||||
|
||||
LinksBean linksBean = new LinksBean();
|
||||
linksBean.restore(jsonLinks);
|
||||
@ -174,12 +178,21 @@ private boolean load(JSONObject repo) {
|
||||
|
||||
updateConnectorIDUsingName(
|
||||
(JSONArray)jsonJobs.get(JSONConstants.JOBS),
|
||||
JSONConstants.FROM_CONNECTOR_ID);
|
||||
JSONConstants.FROM_CONNECTOR_ID,
|
||||
JSONConstants.FROM_CONNECTOR_NAME,
|
||||
false);
|
||||
updateConnectorIDUsingName(
|
||||
(JSONArray)jsonJobs.get(JSONConstants.JOBS),
|
||||
JSONConstants.TO_CONNECTOR_ID);
|
||||
updateIdUsingMap((JSONArray)jsonJobs.get(JSONConstants.JOBS),
|
||||
linkIds, JSONConstants.LINK_ID);
|
||||
JSONConstants.TO_CONNECTOR_ID,
|
||||
JSONConstants.TO_CONNECTOR_NAME,
|
||||
false);
|
||||
|
||||
updateLinkIdUsingName((JSONArray)jsonJobs.get(JSONConstants.JOBS),
|
||||
JSONConstants.FROM_LINK_ID,
|
||||
JSONConstants.FROM_LINK_NAME);
|
||||
updateLinkIdUsingName((JSONArray)jsonJobs.get(JSONConstants.JOBS),
|
||||
JSONConstants.TO_LINK_ID,
|
||||
JSONConstants.TO_LINK_NAME);
|
||||
|
||||
JobsBean jobsBean = new JobsBean();
|
||||
jobsBean.restore(jsonJobs);
|
||||
@ -207,7 +220,7 @@ private boolean load(JSONObject repo) {
|
||||
return false;
|
||||
}
|
||||
|
||||
updateIdUsingMap((JSONArray)jsonSubmissions.get(JSONConstants.SUBMISSIONS), jobIds, JSONConstants.JOB_ID);
|
||||
updateJobIdUsingName((JSONArray)jsonSubmissions.get(JSONConstants.SUBMISSIONS));
|
||||
|
||||
SubmissionsBean submissionsBean = new SubmissionsBean();
|
||||
submissionsBean.restore(jsonSubmissions);
|
||||
@ -359,7 +372,7 @@ private long loadJob(MJob job) {
|
||||
|
||||
}
|
||||
|
||||
private JSONArray updateConnectorIDUsingName(JSONArray jsonArray, String connectorIdKey) {
|
||||
private JSONArray updateConnectorIDUsingName(JSONArray jsonArray, String connectorIdKey, String connectorNameKey, boolean isLink) {
|
||||
Repository repository = RepositoryManager.getInstance().getRepository();
|
||||
|
||||
List<MConnector> connectors = repository.findConnectors();
|
||||
@ -369,16 +382,22 @@ private JSONArray updateConnectorIDUsingName(JSONArray jsonArray, String connect
|
||||
connectorMap.put(connector.getUniqueName(), connector.getPersistenceId());
|
||||
}
|
||||
|
||||
for (Object obj : jsonArray) {
|
||||
JSONObject object = (JSONObject) obj;
|
||||
for (Iterator iterator = jsonArray.iterator(); iterator.hasNext(); ) {
|
||||
JSONObject object = (JSONObject) iterator.next();
|
||||
long connectorId = (Long) object.get(connectorIdKey);
|
||||
String connectorName = (String) object.get(JSONConstants.CONNECTOR_NAME);
|
||||
long currentConnectorId = connectorMap.get(connectorName);
|
||||
String linkName = (String) object.get(JSONConstants.NAME);
|
||||
String connectorName = (String) object.get(connectorNameKey);
|
||||
Long currentConnectorId = connectorMap.get(connectorName);
|
||||
String name = (String) object.get(JSONConstants.NAME);
|
||||
|
||||
if (currentConnectorId == null) {
|
||||
// If a connector doesn't exist, remove the links and jobs relating to it
|
||||
iterator.remove();
|
||||
continue;
|
||||
}
|
||||
|
||||
// If a given connector now has a different ID, we need to update the ID
|
||||
if (connectorId != currentConnectorId) {
|
||||
LOG.warn("Link " + linkName + " uses connector " + connectorName + ". "
|
||||
LOG.warn((isLink ? "Link " : "Job ") + name + " uses connector " + connectorName + ". "
|
||||
+ "Replacing previous ID " + connectorId + " with new ID " + currentConnectorId);
|
||||
|
||||
object.put(connectorIdKey, currentConnectorId);
|
||||
@ -388,6 +407,65 @@ private JSONArray updateConnectorIDUsingName(JSONArray jsonArray, String connect
|
||||
return jsonArray;
|
||||
}
|
||||
|
||||
private JSONArray updateLinkIdUsingName(JSONArray jobsJsonArray, String linkIdKey, String linkNameKey) {
|
||||
Repository repository = RepositoryManager.getInstance().getRepository();
|
||||
|
||||
List<MLink> links = repository.findLinks();
|
||||
Map<String, Long> linkMap = new HashMap<String, Long>();
|
||||
for (MLink link : links) {
|
||||
linkMap.put(link.getName(), link.getPersistenceId());
|
||||
}
|
||||
|
||||
for(Iterator iterator = jobsJsonArray.iterator(); iterator.hasNext(); ) {
|
||||
JSONObject jobObject = (JSONObject) iterator.next();
|
||||
long linkId = (Long) jobObject.get(linkIdKey);
|
||||
String linkName = (String) jobObject.get(linkNameKey);
|
||||
Long currentLinkId = linkMap.get(linkName);
|
||||
String jobName = (String) jobObject.get(JSONConstants.NAME);
|
||||
|
||||
if (currentLinkId == null) {
|
||||
// If a link doesn't exist, remove the jobs relating to it
|
||||
iterator.remove();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (linkId != currentLinkId) {
|
||||
LOG.warn("Job " + jobName + " uses link " + linkName + "."
|
||||
+ "Replacing previous ID " + linkId + " with new ID " + currentLinkId);
|
||||
|
||||
jobObject.put(linkIdKey, currentLinkId);
|
||||
}
|
||||
}
|
||||
|
||||
return jobsJsonArray;
|
||||
}
|
||||
|
||||
private JSONArray updateJobIdUsingName(JSONArray submissionsJsonArray) {
|
||||
Repository repository = RepositoryManager.getInstance().getRepository();
|
||||
|
||||
List<MJob> jobs = repository.findJobs();
|
||||
Map<String, Long> jobMap = new HashMap<String, Long>();
|
||||
for (MJob job : jobs) {
|
||||
jobMap.put(job.getName(), job.getPersistenceId());
|
||||
}
|
||||
|
||||
for(Iterator iterator = submissionsJsonArray.iterator(); iterator.hasNext(); ) {
|
||||
JSONObject submissionObject = (JSONObject) iterator.next();
|
||||
String jobName = (String) submissionObject.get(JSONConstants.JOB_NAME);
|
||||
Long currentJobId = jobMap.get(jobName);
|
||||
|
||||
if (currentJobId == null) {
|
||||
// If a job doesn't exist, remove the submissions relating to it
|
||||
iterator.remove();
|
||||
continue;
|
||||
}
|
||||
|
||||
submissionObject.put(JSONConstants.JOB_ID, currentJobId);
|
||||
}
|
||||
|
||||
return submissionsJsonArray;
|
||||
}
|
||||
|
||||
private JSONArray updateIdUsingMap(JSONArray jsonArray, HashMap<Long, Long> idMap, String fieldName) {
|
||||
for (Object obj : jsonArray) {
|
||||
JSONObject object = (JSONObject) obj;
|
||||
|
Loading…
Reference in New Issue
Block a user