5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-08 14:11:36 +08:00

SQOOP-1416: Sqoop2: From/To: Investigate multiple variables for "from" and "to"

(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2014-08-13 18:51:20 -07:00 committed by Abraham Elmahrek
parent 153b6badf7
commit 2e79f9094c
7 changed files with 288 additions and 103 deletions

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.common;
public enum ConnectorTypeError implements ErrorCode {
/** An unknown error has occurred. */
CONNECTOR_TYPE_0000("Unknown connector type")
;
private final String message;
private ConnectorTypeError(String message) {
this.message = message;
}
public String getCode() {
return name();
}
public String getMessage() {
return message;
}
}

View File

@ -18,6 +18,8 @@
package org.apache.sqoop.json;
import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.common.ConnectorTypeError;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.Validation;
import org.json.simple.JSONObject;
@ -43,27 +45,35 @@ public class JobValidationBean implements JsonBean {
private static final String MESSAGES = "messages";
private Long id;
private Map<ConnectorType, Validation> connectorValidation;
private Validation fromConnectorValidation;
private Validation toConnectorValidation;
private Validation frameworkValidation;
// For "extract"
public JobValidationBean(Validation fromConnector, Validation framework, Validation toConnector) {
this();
this.connectorValidation = new HashMap<ConnectorType, Validation>();
this.connectorValidation.put(ConnectorType.FROM, fromConnector);
this.connectorValidation.put(ConnectorType.TO, toConnector);
this.fromConnectorValidation = fromConnector;
this.toConnectorValidation = toConnector;
this.frameworkValidation = framework;
}
// For "restore"
public JobValidationBean() {
id = null;
connectorValidation = new HashMap<ConnectorType, Validation>();
}
public Validation getConnectorValidation(ConnectorType type) {
return connectorValidation.get(type);
switch(type) {
case FROM:
return fromConnectorValidation;
case TO:
return toConnectorValidation;
default:
throw new SqoopException(ConnectorTypeError.CONNECTOR_TYPE_0000, "Connector type: " + type);
}
}
public Validation getFrameworkValidation() {
@ -125,10 +135,10 @@ public void restore(JSONObject jsonObject) {
JSONObject jsonConnectorObject = (JSONObject)jsonObject.get(CONNECTOR);
connectorValidation.put(ConnectorType.FROM, restoreValidation(
(JSONObject)jsonConnectorObject.get(FROM)));
connectorValidation.put(ConnectorType.TO, restoreValidation(
(JSONObject)jsonConnectorObject.get(TO)));
fromConnectorValidation = restoreValidation(
(JSONObject)jsonConnectorObject.get(FROM));
toConnectorValidation = restoreValidation(
(JSONObject)jsonConnectorObject.get(TO));
frameworkValidation = restoreValidation(
(JSONObject)jsonObject.get(FRAMEWORK));
}

View File

@ -17,10 +17,9 @@
*/
package org.apache.sqoop.model;
import java.util.HashMap;
import java.util.Map;
import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.common.ConnectorTypeError;
import org.apache.sqoop.common.SqoopException;
/**
* Connector metadata.
@ -33,18 +32,17 @@ public final class MConnector extends MPersistableEntity implements MClonable {
private final String uniqueName;
private final String className;
private final MConnectionForms connectionForms;
private final Map<ConnectorType, MJobForms> jobForms;
private final MJobForms fromJobForms;
private final MJobForms toJobForms;
String version;
public MConnector(String uniqueName, String className,
String version, MConnectionForms connectionForms,
MJobForms fromJobForms, MJobForms toJobForms) {
this.jobForms = new HashMap<ConnectorType, MJobForms>();
this.version = version;
this.connectionForms = connectionForms;
this.jobForms.put(ConnectorType.FROM, fromJobForms);
this.jobForms.put(ConnectorType.TO, toJobForms);
this.fromJobForms = fromJobForms;
this.toJobForms = toJobForms;
if (uniqueName == null || className == null) {
throw new NullPointerException();
@ -88,8 +86,8 @@ public boolean equals(Object other) {
&& className.equals(mc.className)
&& version.equals(mc.version)
&& connectionForms.equals(mc.getConnectionForms())
&& jobForms.get(ConnectorType.FROM).equals(mc.getJobForms(ConnectorType.FROM))
&& jobForms.get(ConnectorType.TO).equals(mc.getJobForms(ConnectorType.TO));
&& fromJobForms.equals(mc.getJobForms(ConnectorType.FROM))
&& toJobForms.equals(mc.getJobForms(ConnectorType.TO));
}
@Override
@ -122,7 +120,16 @@ public MConnectionForms getConnectionForms() {
}
public MJobForms getJobForms(ConnectorType type) {
return jobForms.get(type);
switch(type) {
case FROM:
return fromJobForms;
case TO:
return toJobForms;
default:
throw new SqoopException(ConnectorTypeError.CONNECTOR_TYPE_0000, "Connector type: " + type);
}
}
public String getVersion() {

View File

@ -18,9 +18,8 @@
package org.apache.sqoop.model;
import org.apache.sqoop.common.ConnectorType;
import java.util.HashMap;
import java.util.Map;
import org.apache.sqoop.common.ConnectorTypeError;
import org.apache.sqoop.common.SqoopException;
/**
* Model describing entire job object including both connector and
@ -34,28 +33,33 @@ public class MJob extends MAccountableEntity implements MClonable {
* dependency through connection object, but having this dependency explicitly
* carried along helps a lot.
*/
private final Map<ConnectorType, Long> connectorIds;
private final long fromConnectorId;
private final long toConnectorId;
/**
* Corresponding connection objects for connector.
*/
private final Map<ConnectorType, Long> connectionIds;
private final long fromConnectionId;
private final long toConnectionId;
/**
* User name for this object
*/
private String name;
private final Map<ConnectorType, MJobForms> connectorParts;
private final MJobForms fromConnectorPart;
private final MJobForms toConnectorPart;
private final MJobForms frameworkPart;
/**
* Default constructor to build new MJob model.
*
* @param fromConnectorId Connector id
* @param fromConnectionId Connection id
* @param fromPart From Connector forms
* @param toPart To Connector forms
* @param fromConnectorId FROM Connector id
* @param toConnectorId TO Connector id
* @param fromConnectionId FROM Connection id
* @param toConnectionId TO Connection id
* @param fromPart FROM Connector forms
* @param toPart TO Connector forms
* @param frameworkPart Framework forms
*/
public MJob(long fromConnectorId,
@ -65,15 +69,12 @@ public MJob(long fromConnectorId,
MJobForms fromPart,
MJobForms toPart,
MJobForms frameworkPart) {
connectorIds = new HashMap<ConnectorType, Long>();
connectorIds.put(ConnectorType.FROM, fromConnectorId);
connectorIds.put(ConnectorType.TO, toConnectorId);
connectionIds = new HashMap<ConnectorType, Long>();
connectionIds.put(ConnectorType.FROM, fromConnectionId);
connectionIds.put(ConnectorType.TO, toConnectionId);
connectorParts = new HashMap<ConnectorType, MJobForms>();
connectorParts.put(ConnectorType.FROM, fromPart);
connectorParts.put(ConnectorType.TO, toPart);
this.fromConnectorId = fromConnectorId;
this.toConnectorId = toConnectorId;
this.fromConnectionId = fromConnectionId;
this.toConnectionId = toConnectionId;
this.fromConnectorPart = fromPart;
this.toConnectorPart = toPart;
this.frameworkPart = frameworkPart;
}
@ -96,22 +97,21 @@ public MJob(MJob other) {
* used otherwise.
*
* @param other MJob model to copy
* @param fromPart From Connector forms
* @param fromPart FROM Connector forms
* @param toPart TO Connector forms
* @param frameworkPart Framework forms
* @param toPart To Connector forms
*/
public MJob(MJob other, MJobForms fromPart, MJobForms frameworkPart, MJobForms toPart) {
public MJob(MJob other, MJobForms fromPart, MJobForms toPart, MJobForms frameworkPart) {
super(other);
connectorIds = new HashMap<ConnectorType, Long>();
connectorIds.put(ConnectorType.FROM, other.getConnectorId(ConnectorType.FROM));
connectorIds.put(ConnectorType.TO, other.getConnectorId(ConnectorType.TO));
connectionIds = new HashMap<ConnectorType, Long>();
connectorIds.put(ConnectorType.FROM, other.getConnectionId(ConnectorType.FROM));
connectorIds.put(ConnectorType.TO, other.getConnectionId(ConnectorType.TO));
connectorParts = new HashMap<ConnectorType, MJobForms>();
connectorParts.put(ConnectorType.FROM, fromPart);
connectorParts.put(ConnectorType.TO, toPart);
this.name = other.name;
this.fromConnectorId = other.getConnectorId(ConnectorType.FROM);
this.toConnectorId = other.getConnectorId(ConnectorType.TO);
this.fromConnectionId = other.getConnectionId(ConnectorType.FROM);
this.toConnectionId = other.getConnectionId(ConnectorType.TO);
this.fromConnectorPart = fromPart;
this.toConnectorPart = toPart;
this.frameworkPart = frameworkPart;
}
@ -134,15 +134,42 @@ public void setName(String name) {
}
public long getConnectionId(ConnectorType type) {
return connectionIds.get(type);
switch(type) {
case FROM:
return fromConnectionId;
case TO:
return toConnectionId;
default:
throw new SqoopException(ConnectorTypeError.CONNECTOR_TYPE_0000, "Connector type: " + type);
}
}
public long getConnectorId(ConnectorType type) {
return connectorIds.get(type);
switch(type) {
case FROM:
return fromConnectorId;
case TO:
return toConnectorId;
default:
throw new SqoopException(ConnectorTypeError.CONNECTOR_TYPE_0000, "Connector type: " + type);
}
}
public MJobForms getConnectorPart(ConnectorType type) {
return connectorParts.get(type);
switch(type) {
case FROM:
return fromConnectorPart;
case TO:
return toConnectorPart;
default:
throw new SqoopException(ConnectorTypeError.CONNECTOR_TYPE_0000, "Connector type: " + type);
}
}
public MJobForms getFrameworkPart() {

View File

@ -18,17 +18,17 @@
package org.apache.sqoop.framework;
import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.common.ConnectorTypeError;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.job.etl.CallbackBase;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.utils.ClassUtils;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* Submission details class is used when creating new submission and contains
@ -53,9 +53,10 @@ public class SubmissionRequest {
long jobId;
/**
* Connector instance associated with this submission request
* Connector instances associated with this submission request
*/
Map<ConnectorType, SqoopConnector > connectors;
SqoopConnector fromConnector;
SqoopConnector toConnector;
/**
* List of required local jars for the job
@ -75,15 +76,19 @@ public class SubmissionRequest {
/**
* All configuration objects
*/
Map<ConnectorType, Object> connectorConnectionConfigs;
Map<ConnectorType, Object> connectorJobConfigs;
Map<ConnectorType, Object> frameworkConnectionConfigs;
Object fromConnectorConnectionConfig;
Object toConnectorConnectionConfig;
Object fromConnectorJobConfig;
Object toConnectorJobConfig;
Object fromFrameworkConnectionConfig;
Object toFrameworkConnectionConfig;
Object configFrameworkJob;
/**
* Connector context (submission specific configuration)
*/
Map<ConnectorType, MutableMapContext> connectorContexts;
MutableMapContext fromConnectorContext;
MutableMapContext toConnectorContext;
/**
* Framework context (submission specific configuration)
@ -117,17 +122,17 @@ public class SubmissionRequest {
public SubmissionRequest() {
this.jars = new LinkedList<String>();
this.connectorContexts = new HashMap<ConnectorType, MutableMapContext>();
this.connectorContexts.put(ConnectorType.FROM, new MutableMapContext());
this.connectorContexts.put(ConnectorType.TO, new MutableMapContext());
this.fromConnectorContext = new MutableMapContext();
this.toConnectorContext = new MutableMapContext();
this.frameworkContext = new MutableMapContext();
this.connectorConnectionConfigs = new HashMap<ConnectorType, Object>();
this.connectorJobConfigs = new HashMap<ConnectorType, Object>();
this.frameworkConnectionConfigs = new HashMap<ConnectorType, Object>();
this.connectors = new HashMap<ConnectorType, SqoopConnector>();
this.fromConnector = null;
this.toConnector = null;
this.fromConnectorConnectionConfig = null;
this.toConnectorConnectionConfig = null;
this.fromConnectorJobConfig = null;
this.toConnectorJobConfig = null;
this.fromFrameworkConnectionConfig = null;
this.toFrameworkConnectionConfig = null;
}
public MSubmission getSummary() {
@ -155,11 +160,29 @@ public void setJobId(long jobId) {
}
public SqoopConnector getConnector(ConnectorType type) {
return connectors.get(type);
switch(type) {
case FROM:
return fromConnector;
case TO:
return toConnector;
default:
throw new SqoopException(ConnectorTypeError.CONNECTOR_TYPE_0000, "Connector type: " + type);
}
}
public void setConnector(ConnectorType type, SqoopConnector connector) {
this.connectors.put(type, connector);
switch(type) {
case FROM:
fromConnector = connector;
case TO:
toConnector = connector;
default:
throw new SqoopException(ConnectorTypeError.CONNECTOR_TYPE_0000, "Connector type: " + type);
}
}
public List<String> getJars() {
@ -199,27 +222,81 @@ public void setToCallback(CallbackBase toCallback) {
}
public Object getConnectorConnectionConfig(ConnectorType type) {
return connectorConnectionConfigs.get(type);
switch(type) {
case FROM:
return fromConnectorConnectionConfig;
case TO:
return toConnectorConnectionConfig;
default:
throw new SqoopException(ConnectorTypeError.CONNECTOR_TYPE_0000, "Connector type: " + type);
}
}
public void setConnectorConnectionConfig(ConnectorType type, Object config) {
connectorConnectionConfigs.put(type, config);
switch(type) {
case FROM:
fromConnectorConnectionConfig = config;
case TO:
toConnectorConnectionConfig = config;
default:
throw new SqoopException(ConnectorTypeError.CONNECTOR_TYPE_0000, "Connector type: " + type);
}
}
public Object getConnectorJobConfig(ConnectorType type) {
return connectorJobConfigs.get(type);
switch(type) {
case FROM:
return fromConnectorJobConfig;
case TO:
return toConnectorJobConfig;
default:
throw new SqoopException(ConnectorTypeError.CONNECTOR_TYPE_0000, "Connector type: " + type);
}
}
public void setConnectorJobConfig(ConnectorType type, Object config) {
connectorJobConfigs.put(type, config);
switch(type) {
case FROM:
fromConnectorJobConfig = config;
case TO:
toConnectorJobConfig = config;
default:
throw new SqoopException(ConnectorTypeError.CONNECTOR_TYPE_0000, "Connector type: " + type);
}
}
public Object getFrameworkConnectionConfig(ConnectorType type) {
return frameworkConnectionConfigs.get(type);
switch(type) {
case FROM:
return fromFrameworkConnectionConfig;
case TO:
return toFrameworkConnectionConfig;
default:
throw new SqoopException(ConnectorTypeError.CONNECTOR_TYPE_0000, "Connector type: " + type);
}
}
public void setFrameworkConnectionConfig(ConnectorType type, Object config) {
frameworkConnectionConfigs.put(type, config);
switch(type) {
case FROM:
fromFrameworkConnectionConfig = config;
case TO:
toFrameworkConnectionConfig = config;
default:
throw new SqoopException(ConnectorTypeError.CONNECTOR_TYPE_0000, "Connector type: " + type);
}
}
public Object getConfigFrameworkJob() {
@ -231,7 +308,16 @@ public void setConfigFrameworkJob(Object config) {
}
public MutableMapContext getConnectorContext(ConnectorType type) {
return connectorContexts.get(type);
switch(type) {
case FROM:
return fromConnectorContext;
case TO:
return toConnectorContext;
default:
throw new SqoopException(ConnectorTypeError.CONNECTOR_TYPE_0000, "Connector type: " + type);
}
}
public MutableMapContext getFrameworkContext() {

View File

@ -450,7 +450,7 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
MJobForms newJobForms = new MJobForms(forms);
upgrader.upgrade(job.getConnectorPart(ConnectorType.FROM), newJobForms);
// @TODO(Abe): Check From and To
MJob newJob = new MJob(job, newJobForms, job.getFrameworkPart(), newJobForms);
MJob newJob = new MJob(job, newJobForms, newJobForms, job.getFrameworkPart());
// Transform form structures to objects for validations
// @TODO(Abe): Check From and To
@ -536,7 +536,7 @@ public final void upgradeFramework(MFramework framework) {
List<MForm> forms = framework.getJobForms().clone(false).getForms();
MJobForms newJobForms = new MJobForms(forms);
upgrader.upgrade(job.getFrameworkPart(), newJobForms);
MJob newJob = new MJob(job, job.getConnectorPart(ConnectorType.FROM), newJobForms, job.getConnectorPart(ConnectorType.TO));
MJob newJob = new MJob(job, job.getConnectorPart(ConnectorType.FROM), job.getConnectorPart(ConnectorType.TO), newJobForms);
// Transform form structures to objects for validations
Object newConfigurationObject = ClassUtils.instantiate(FrameworkManager.getInstance().getJobConfigurationClass());

View File

@ -29,11 +29,9 @@
import java.sql.Types;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.sql.DataSource;
@ -41,6 +39,7 @@
import org.apache.log4j.Logger;
import org.apache.commons.lang.StringUtils;
import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.common.ConnectorTypeError;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.model.MBooleanInput;
import org.apache.sqoop.model.MConnection;
@ -1619,14 +1618,16 @@ private List<MConnector> loadConnectors(PreparedStatement stmt,Connection conn)
formFetchStmt.setLong(1, connectorId);
List<MForm> connectionForms = new ArrayList<MForm>();
Map<ConnectorType, List<MForm>> jobForms = new HashMap<ConnectorType, List<MForm>>();
List<MForm> fromJobForms = new ArrayList<MForm>();
List<MForm> toJobForms = new ArrayList<MForm>();
loadConnectorForms(connectionForms, jobForms, formFetchStmt, inputFetchStmt, 1);
loadConnectorForms(connectionForms, fromJobForms, toJobForms,
formFetchStmt, inputFetchStmt, 1);
MConnector mc = new MConnector(connectorName, connectorClassName, connectorVersion,
new MConnectionForms(connectionForms),
new MJobForms(jobForms.get(ConnectorType.FROM)),
new MJobForms(jobForms.get(ConnectorType.TO)));
new MJobForms(fromJobForms),
new MJobForms(toJobForms));
mc.setPersistenceId(connectorId);
connectors.add(mc);
@ -1674,9 +1675,10 @@ private List<MConnection> loadConnections(PreparedStatement stmt,
List<MForm> connectorConnForms = new ArrayList<MForm>();
List<MForm> frameworkConnForms = new ArrayList<MForm>();
List<MForm> frameworkJobForms = new ArrayList<MForm>();
Map<ConnectorType, List<MForm>> connectorJobForms = new HashMap<ConnectorType, List<MForm>>();
List<MForm> fromJobForms = new ArrayList<MForm>();
List<MForm> toJobForms = new ArrayList<MForm>();
loadConnectorForms(connectorConnForms, connectorJobForms,
loadConnectorForms(connectorConnForms, fromJobForms, toJobForms,
formConnectorFetchStmt, inputFetchStmt, 2);
loadForms(frameworkConnForms, frameworkJobForms,
formFrameworkFetchStmt, inputFetchStmt, 2);
@ -1742,9 +1744,10 @@ private List<MJob> loadJobs(PreparedStatement stmt,
List<MForm> connectorConnForms = new ArrayList<MForm>();
List<MForm> frameworkConnForms = new ArrayList<MForm>();
List<MForm> frameworkJobForms = new ArrayList<MForm>();
Map<ConnectorType, List<MForm>> connectorJobForms = new HashMap<ConnectorType, List<MForm>>();
List<MForm> fromJobForms = new ArrayList<MForm>();
List<MForm> toJobForms = new ArrayList<MForm>();
loadConnectorForms(connectorConnForms, connectorJobForms,
loadConnectorForms(connectorConnForms, fromJobForms, toJobForms,
formConnectorFetchStmt, inputFetchStmt, 2);
loadForms(frameworkConnForms, frameworkJobForms,
formFrameworkFetchStmt, inputFetchStmt, 2);
@ -1752,8 +1755,8 @@ private List<MJob> loadJobs(PreparedStatement stmt,
MJob job = new MJob(
fromConnectorId, toConnectorId,
fromConnectionId, toConnectionId,
new MJobForms(connectorJobForms.get(ConnectorType.FROM)),
new MJobForms(connectorJobForms.get(ConnectorType.TO)),
new MJobForms(fromJobForms),
new MJobForms(toJobForms),
new MJobForms(frameworkJobForms));
job.setPersistenceId(id);
@ -2043,13 +2046,15 @@ public void loadForms(List<MForm> connectionForms,
* from Derby.
*
* @param connectionForms List of connection forms that will be filled up
* @param jobForms Map with job forms that will be filled up
* @param fromJobForms FROM job forms that will be filled up
* @param toJobForms TO job forms that will be filled up
* @param formFetchStmt Prepared statement for fetching forms
* @param inputFetchStmt Prepare statement for fetching inputs
* @throws SQLException In case of any failure on Derby side
*/
public void loadConnectorForms(List<MForm> connectionForms,
Map<ConnectorType, List<MForm>> jobForms,
List<MForm> fromJobForms,
List<MForm> toJobForms,
PreparedStatement formFetchStmt,
PreparedStatement inputFetchStmt,
int formPosition) throws SQLException {
@ -2151,20 +2156,30 @@ public void loadConnectorForms(List<MForm> connectionForms,
break;
case JOB:
ConnectorType type = ConnectorType.valueOf(operation);
if (!jobForms.containsKey(type)) {
jobForms.put(type, new ArrayList<MForm>());
List<MForm> jobForms;
switch(type) {
case FROM:
jobForms = fromJobForms;
break;
case TO:
jobForms = toJobForms;
break;
default:
throw new SqoopException(ConnectorTypeError.CONNECTOR_TYPE_0000, "Connector type: " + type);
}
if (jobForms.get(type).size() != formIndex) {
if (jobForms.size() != formIndex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0010,
"connector-" + formConnectorId
+ "; form: " + mf
+ "; index: " + formIndex
+ "; expected: " + jobForms.get(type).size()
+ "; expected: " + jobForms.size()
);
}
jobForms.get(type).add(mf);
jobForms.add(mf);
break;
default:
throw new SqoopException(DerbyRepoError.DERBYREPO_0007,