5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-11 22:41:50 +08:00

SQOOP-535: Support splitting metadata to connector and framework specific

git-svn-id: https://svn.apache.org/repos/asf/sqoop/branches/sqoop2@1370814 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bilung Lee 2012-08-08 15:41:15 +00:00
parent b5d0e56aa0
commit 517c2c1bb7
13 changed files with 775 additions and 231 deletions

View File

@ -17,30 +17,30 @@
*/
package org.apache.sqoop.model;
import java.util.ArrayList;
import java.util.List;
public final class MConnector extends MPersistableEntity {
/**
* Connector metadata.
*
* Includes unique id that identifies connector in metadata store, unique human
* readable name, corresponding name and all forms for both connections and
* jobs.
*/
public final class MConnector extends MFramework {
private final String uniqueName;
private final String className;
private final List<MForm> connectionForms;
private final List<MForm> jobForms;
public MConnector(String uniqueName, String className,
List<MForm> connectionForms, List<MForm> jobForms) {
super(connectionForms, jobForms);
if (uniqueName == null || className == null) {
throw new NullPointerException();
}
this.uniqueName = uniqueName;
this.className = className;
this.connectionForms = new ArrayList<MForm>(connectionForms.size());
this.connectionForms.addAll(connectionForms);
this.jobForms = new ArrayList<MForm>(jobForms.size());
this.jobForms.addAll(jobForms);
}
public String getUniqueName() {
@ -55,8 +55,8 @@ public String getClassName() {
public String toString() {
StringBuilder sb = new StringBuilder("connector-");
sb.append(uniqueName).append(":").append(getPersistenceId()).append(":");
sb.append(className).append("; conn-forms:").append(connectionForms);
sb.append("; job-forms:").append(jobForms);
sb.append(className).append("; conn-forms:").append(getConnectionForms());
sb.append("; job-forms:").append(getJobForms());
return sb.toString();
}
@ -74,30 +74,15 @@ public boolean equals(Object other) {
MConnector mc = (MConnector) other;
return (uniqueName.equals(mc.uniqueName)
&& className.equals(mc.className))
&& connectionForms.equals(mc.connectionForms)
&& jobForms.equals(mc.jobForms);
&& super.equals(other);
}
@Override
public int hashCode() {
int result = 23;
int result = super.hashCode();
result = 31 * result + uniqueName.hashCode();
result = 31 * result + className.hashCode();
for (MForm cmf : connectionForms) {
result = 31 * result + cmf.hashCode();
}
for (MForm jmf : jobForms) {
result = 31 * result + jmf.hashCode();
}
return result;
}
public List<MForm> getConnectionForms() {
return connectionForms;
}
public List<MForm> getJobForms() {
return jobForms;
}
}

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.model;
import java.util.ArrayList;
import java.util.List;
/**
* Metadata describing framework options for connections and jobs.
*/
public class MFramework extends MPersistableEntity {
private final List<MForm> connectionForms;
private final List<MForm> jobForms;
public MFramework(List<MForm> connectionForms, List<MForm> jobForms) {
this.connectionForms = new ArrayList<MForm>(connectionForms.size());
this.connectionForms.addAll(connectionForms);
this.jobForms = new ArrayList<MForm>(jobForms.size());
this.jobForms.addAll(jobForms);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("framework-");
sb.append(getPersistenceId()).append(":");
sb.append("; conn-forms:").append(connectionForms);
sb.append("; job-forms:").append(jobForms);
return sb.toString();
}
@Override
public boolean equals(Object other) {
if (other == this) {
return true;
}
if (!(other instanceof MFramework)) {
return false;
}
MFramework mc = (MFramework) other;
return connectionForms.equals(mc.connectionForms)
&& jobForms.equals(mc.jobForms);
}
@Override
public int hashCode() {
int result = 23;
for (MForm cmf : connectionForms) {
result = 31 * result + cmf.hashCode();
}
for (MForm jmf : jobForms) {
result = 31 * result + jmf.hashCode();
}
return result;
}
public List<MForm> getConnectionForms() {
return connectionForms;
}
public List<MForm> getJobForms() {
return jobForms;
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.framework;
/**
* Constants that are used in framework module.
*/
public class FrameworkConstants {
public static final String INPUT_CONN_MAX_SIMULTANEOUS_CONNECTIONS =
"inp-conn-max-connections";
public static final String INPUT_CONN_MAX_OUTPUT_FORMAT=
"inp-conn-output-format";
public static final String FORM_SECURITY =
"form-security";
public static final String FORM_OUTPUT =
"form-output";
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.framework;
import org.apache.sqoop.common.ErrorCode;
/**
*
*/
public enum FrameworkError implements ErrorCode {
FRAMEWORK_0000("Metadata are not registered in repository");
private final String message;
private FrameworkError(String message) {
this.message = message;
}
public String getCode() {
return name();
}
public String getMessage() {
return message;
}
}

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.framework;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.model.MForm;
import org.apache.sqoop.model.MFramework;
import org.apache.sqoop.model.MInput;
import org.apache.sqoop.model.MStringInput;
import org.apache.sqoop.repository.RepositoryManager;
import static org.apache.sqoop.framework.FrameworkConstants.*;
import java.util.ArrayList;
import java.util.List;
/**
* Manager for Sqoop framework itself.
*
* All Sqoop internals (job execution engine, metadata) should be handled
* within this manager.
*
*/
public class FrameworkManager {
private static final Logger LOG = Logger.getLogger(FrameworkManager.class);
private static final List<MForm> CONNECTION_FORMS = new ArrayList<MForm>();
private static final List<MForm> JOB_FORMS = new ArrayList<MForm>();
private static MFramework mFramework;
static {
// Build the connection forms
List<MInput<?>> connFormInputs = new ArrayList<MInput<?>>();
MStringInput maxConnections = new MStringInput(
INPUT_CONN_MAX_SIMULTANEOUS_CONNECTIONS, false, (short) 10);
connFormInputs.add(maxConnections);
MForm connForm = new MForm(FORM_SECURITY, connFormInputs);
CONNECTION_FORMS.add(connForm);
// Build job forms
List<MInput<?>> jobFormInputs = new ArrayList<MInput<?>>();
MStringInput outputFormat = new MStringInput(INPUT_CONN_MAX_OUTPUT_FORMAT,
false, (short) 25);
jobFormInputs.add(outputFormat);
MForm jobForm = new MForm(FORM_OUTPUT, jobFormInputs);
JOB_FORMS.add(jobForm);
}
public static synchronized void initialize() {
LOG.trace("Begin connector manager initialization");
// Register framework metadata
mFramework = new MFramework(CONNECTION_FORMS, JOB_FORMS);
RepositoryManager.getRepository().registerFramework(mFramework);
if (!mFramework.hasPersistenceId()) {
throw new SqoopException(FrameworkError.FRAMEWORK_0000);
}
}
public static synchronized void destroy() {
LOG.trace("Begin framework manager destroy");
}
}

View File

@ -21,8 +21,8 @@
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorHandler;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MFramework;
public class JdbcRepository implements Repository {
@ -38,11 +38,17 @@ protected JdbcRepository(JdbcRepositoryHandler handler,
this.repoContext = repoContext;
}
/**
* {@inheritDoc}
*/
@Override
public JdbcRepositoryTransaction getTransaction() {
return repoContext.getTransactionFactory().get();
}
/**
* {@inheritDoc}
*/
@Override
public MConnector registerConnector(MConnector mConnector) {
MConnector result = null;
@ -80,4 +86,43 @@ public MConnector registerConnector(MConnector mConnector) {
return result;
}
/**
* {@inheritDoc}
*/
@Override
public void registerFramework(MFramework mFramework) {
MFramework result = null;
JdbcRepositoryTransaction tx = null;
try {
tx = getTransaction();
tx.begin();
Connection conn = tx.getConnection();
result = handler.findFramework(conn);
if (result == null) {
handler.registerFramework(mFramework, conn);
} else {
if (!result.equals(mFramework)) {
throw new SqoopException(RepositoryError.JDBCREPO_0014,
"given[" + mFramework + "] found[" + result + "]");
}
mFramework.setPersistenceId(result.getPersistenceId());
}
tx.commit();
} catch (Exception ex) {
if (tx != null) {
tx.rollback();
}
if (ex instanceof SqoopException) {
throw (SqoopException) ex;
}
throw new SqoopException(RepositoryError.JDBCREPO_0012,
mFramework.toString(), ex);
} finally {
if (tx != null) {
tx.close();
}
}
}
}

View File

@ -20,18 +20,79 @@
import java.sql.Connection;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MFramework;
/**
* Set of methods required from each JDBC based repository.
*/
public interface JdbcRepositoryHandler {
/**
* Initialize JDBC based repository.
*
* @param repoContext Context for this instance
*/
public void initialize(JdbcRepositoryContext repoContext);
/**
* Search for connector with given name in repository.
*
* And return corresponding metadata structure.
*
* @param shortName Connector unique name
* @param conn JDBC connection for querying repository.
* @return null if connector is not yet registered in repository or
* loaded representation.
*/
public MConnector findConnector(String shortName, Connection conn);
/**
* Register given connector in repository.
*
* Save given connector data to the repository. Given connector should not be
* already registered or present in the repository.
*
* @param mc Connector that should be registered.
* @param conn JDBC connection for querying repository.
*/
public void registerConnector(MConnector mc, Connection conn);
/**
* Search for framework metadata in the repository.
*
* @param conn JDBC connection for querying repository.
* @return null if framework metadata are not yet present in repository or
* loaded representation.
*/
public MFramework findFramework(Connection conn);
/**
* Register framework metadata in repository.
*
* Save framework metadata into repository. Metadata should not be already
* registered or present in the repository.
*
* @param mf Framework metadata that should be registered.
* @param conn JDBC connection for querying repository.
*/
public void registerFramework(MFramework mf, Connection conn);
/**
* Check if schema is already present in the repository.
*
* @return true if schema is already present or false if it's not
*/
public boolean schemaExists();
/**
* Create required schema in repository.
*/
public void createSchema();
/**
* Termination callback for repository.
*
* Should clean up all resources and commit all uncommitted data.
*/
public void shutdown();
}

View File

@ -18,6 +18,7 @@
package org.apache.sqoop.repository;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MFramework;
/**
@ -34,10 +35,24 @@ public interface Repository {
* already registered, its associated metadata is returned from the
* repository.
*
* Method will set persistent ID of given MConnector instance in case of a
* success.
*
* @param mConnector the connector metadata to be registered
* @return <tt>null</tt> if the connector was successfully registered or
* a instance of previously registered metadata with the same connector
* unique name.
*/
public MConnector registerConnector(MConnector mConnector);
/**
* Registers framework metadata in the repository. No more than one set of
* framework metadata structure is allowed.
*
* Method will set persistent ID of given MFramework instance in case of a
* success.
*
* @param mFramework Framework data that should be registered.
*/
public void registerFramework(MFramework mFramework);
}

View File

@ -73,12 +73,14 @@ public enum RepositoryError implements ErrorCode {
*/
JDBCREPO_0011("Attempt to reinitialize JDBC repository context"),
/** The system was unable to register the connector in its repository. */
JDBCREPO_0012("Failed to register connector in repository"),
/** The system was unable to register metadata in its repository. */
JDBCREPO_0012("Failure in repository metadata registration process."),
/** The system found a change in connector metadata that requires upgrade. */
JDBCREPO_0013("Connector metadata changed - upgrade may be required");
JDBCREPO_0013("Connector metadata changed - upgrade may be required"),
/** The system found a change in framework metadata that requires upgrade. */
JDBCREPO_0014("Framework metadata changed - upgrade may be required");
private final String message;

View File

@ -33,8 +33,8 @@ public enum DerbyRepoError implements ErrorCode {
/** The system was unable to run the specified query. */
DERBYREPO_0003("Unable to run specified query"),
/** The system was unable to query the repository for connector metadata. */
DERBYREPO_0004("Unable to retrieve connector metadata"),
/** The system was unable to query the repository for metadata. */
DERBYREPO_0004("Unable to retrieve metadata"),
/** The metadata repository contains more than one connector with same name */
DERBYREPO_0005("Invalid metadata state - multiple connectors with name"),
@ -58,10 +58,10 @@ public enum DerbyRepoError implements ErrorCode {
DERBYREPO_0010("The form retrieved does not match expteced position"),
/**
* The system was not able to register connector metadata due to a
* pre-assigned persistence identifier.
* The system was not able to register metadata due to a pre-assigned
* persistence identifier.
*/
DERBYREPO_0011("Connector metadata cannot have preassigned persistence id"),
DERBYREPO_0011("Metadata cannot have preassigned persistence id"),
/**
* The system was unable to register connector metadata due to an unexpected

View File

@ -18,11 +18,6 @@
package org.apache.sqoop.repository.derby;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.*;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONNECTOR;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_FORM;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_INPUT;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.STMT_FETCH_BASE_CONNECTOR;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.STMT_FETCH_FORM;
import java.sql.Connection;
import java.sql.DriverManager;
@ -30,6 +25,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
@ -40,6 +36,7 @@
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MForm;
import org.apache.sqoop.model.MFormType;
import org.apache.sqoop.model.MFramework;
import org.apache.sqoop.model.MInput;
import org.apache.sqoop.model.MInputType;
import org.apache.sqoop.model.MMapInput;
@ -48,6 +45,11 @@
import org.apache.sqoop.repository.JdbcRepositoryHandler;
import org.apache.sqoop.repository.JdbcRepositoryTransactionFactory;
/**
* JDBC based repository handler for Derby database.
*
* Repository implementation for Derby database.
*/
public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
private static final Logger LOG =
@ -62,11 +64,13 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
private static final String EMBEDDED_DERBY_DRIVER_CLASSNAME =
"org.apache.derby.jdbc.EmbeddedDriver";
private JdbcRepositoryContext repoContext;
private DataSource dataSource;
private JdbcRepositoryTransactionFactory txFactory;
/**
* {@inheritDoc}
*/
@Override
public void registerConnector(MConnector mc, Connection conn) {
if (mc.hasPersistenceId()) {
@ -139,64 +143,9 @@ public void registerConnector(MConnector mc, Connection conn) {
}
}
private void registerForms(long connectorId, List<MForm> forms, String type,
PreparedStatement baseFormStmt, PreparedStatement baseInputStmt)
throws SQLException {
short formIndex = 0;
for (MForm form : forms) {
baseFormStmt.setLong(1, connectorId);
baseFormStmt.setString(2, form.getName());
baseFormStmt.setString(3, type);
baseFormStmt.setShort(4, formIndex++);
int baseFormCount = baseFormStmt.executeUpdate();
if (baseFormCount != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0015,
new Integer(baseFormCount).toString());
}
ResultSet rsetFormId = baseFormStmt.getGeneratedKeys();
if (!rsetFormId.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0016);
}
long formId = rsetFormId.getLong(1);
form.setPersistenceId(formId);
// Insert all the inputs
List<MInput<?>> inputs = form.getInputs();
registerFormInputs(formId, inputs, baseInputStmt);
}
}
private void registerFormInputs(long formId, List<MInput<?>> inputs,
PreparedStatement baseInputStmt) throws SQLException {
short inputIndex = 0;
for (MInput<?> input : inputs) {
baseInputStmt.setString(1, input.getName());
baseInputStmt.setLong(2, formId);
baseInputStmt.setShort(3, inputIndex++);
baseInputStmt.setString(4, input.getType().name());
if (input.getType().equals(MInputType.STRING)) {
MStringInput strInput = (MStringInput) input;
baseInputStmt.setBoolean(5, strInput.isMasked());
baseInputStmt.setShort(6, strInput.getMaxLength());
}
int baseInputCount = baseInputStmt.executeUpdate();
if (baseInputCount != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0017,
new Integer(baseInputCount).toString());
}
ResultSet rsetInputId = baseInputStmt.getGeneratedKeys();
if (!rsetInputId.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0018);
}
long inputId = rsetInputId.getLong(1);
input.setPersistenceId(inputId);
}
}
/**
* {@inheritDoc}
*/
@Override
public synchronized void initialize(JdbcRepositoryContext ctx) {
repoContext = ctx;
@ -205,6 +154,9 @@ public synchronized void initialize(JdbcRepositoryContext ctx) {
LOG.info("DerbyRepositoryHandler initialized.");
}
/**
* {@inheritDoc}
*/
@Override
public synchronized void shutdown() {
String driver = repoContext.getDriverClass();
@ -244,6 +196,10 @@ public synchronized void shutdown() {
}
}
/**
* {@inheritDoc}
*/
@Override
public void createSchema() {
runQuery(QUERY_CREATE_SCHEMA_SQOOP);
runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR);
@ -251,6 +207,10 @@ public void createSchema() {
runQuery(QUERY_CREATE_TABLE_SQ_INPUT);
}
/**
* {@inheritDoc}
*/
@Override
public boolean schemaExists() {
Connection connection = null;
Statement stmt = null;
@ -295,7 +255,275 @@ public boolean schemaExists() {
return true;
}
/**
* {@inheritDoc}
*/
@Override
public MConnector findConnector(String shortName, Connection conn) {
if (LOG.isDebugEnabled()) {
LOG.debug("Looking up connector: " + shortName);
}
MConnector mc = null;
PreparedStatement baseConnectorFetchStmt = null;
PreparedStatement formFetchStmt = null;
PreparedStatement inputFetchStmt = null;
try {
baseConnectorFetchStmt = conn.prepareStatement(STMT_FETCH_BASE_CONNECTOR);
baseConnectorFetchStmt.setString(1, shortName);
ResultSet rsetBaseConnector = baseConnectorFetchStmt.executeQuery();
if (!rsetBaseConnector.next()) {
LOG.debug("No connector found by name: " + shortName);
return null;
}
long connectorId = rsetBaseConnector.getLong(1);
String connectorName = rsetBaseConnector.getString(2);
String connectorClassName = rsetBaseConnector.getString(3);
List<MForm> connectionForms = new ArrayList<MForm>();
List<MForm> jobForms = new ArrayList<MForm>();
formFetchStmt = conn.prepareStatement(STMT_FETCH_FORM_CONNECTOR);
formFetchStmt.setLong(1, connectorId);
inputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT);
loadForms(shortName, formFetchStmt, inputFetchStmt,
connectionForms, jobForms);
mc = new MConnector(connectorName, connectorClassName,
connectionForms, jobForms);
mc.setPersistenceId(connectorId);
if (rsetBaseConnector.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0005, shortName);
}
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0004, shortName, ex);
} finally {
if (baseConnectorFetchStmt != null) {
try {
baseConnectorFetchStmt.close();
} catch (SQLException ex) {
LOG.error("Unable to close base connector fetch statement", ex);
}
}
if (formFetchStmt != null) {
try {
formFetchStmt.close();
} catch (SQLException ex) {
LOG.error("Unable to close form fetch statement", ex);
}
}
if (inputFetchStmt != null) {
try {
inputFetchStmt.close();
} catch (SQLException ex) {
LOG.error("Unable to close input fetch statement", ex);
}
}
}
LOG.debug("Looking up connector: " + shortName + ", found: " + mc);
return mc;
}
/**
* {@inheritDoc}
*/
@Override
public void registerFramework(MFramework mf, Connection conn) {
if (mf.hasPersistenceId()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0011,
"Framework metadata");
}
PreparedStatement baseFormStmt = null;
PreparedStatement baseInputStmt = null;
try {
baseFormStmt = conn.prepareStatement(STMT_INSERT_FORM_BASE,
Statement.RETURN_GENERATED_KEYS);
baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE,
Statement.RETURN_GENERATED_KEYS);
// Insert connection forms
registerForms(null, mf.getConnectionForms(),
MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt);
registerForms(null, mf.getJobForms(),
MFormType.JOB.name(), baseFormStmt, baseInputStmt);
// We're using hardcoded value for framework metadata as they are
// represented as NULL in the database.
mf.setPersistenceId(1);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0014,
mf.toString(), ex);
} finally {
if (baseFormStmt != null) {
try {
baseFormStmt.close();
} catch (SQLException ex) {
LOG.error("Unable to close base form statement", ex);
}
}
if (baseInputStmt != null) {
try {
baseInputStmt.close();
} catch (SQLException ex) {
LOG.error("Unable to close base input statement", ex);
}
}
}
}
/**
* {@inheritDoc}
*/
@Override
public MFramework findFramework(Connection conn) {
LOG.debug("Looking up framework metadata");
MFramework mf = null;
PreparedStatement formFetchStmt = null;
PreparedStatement inputFetchStmt = null;
try {
List<MForm> connectionForms = new ArrayList<MForm>();
List<MForm> jobForms = new ArrayList<MForm>();
formFetchStmt = conn.prepareStatement(STMT_FETCH_FORM_FRAMEWORK);
inputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT);
loadForms("Framework metadata", formFetchStmt, inputFetchStmt,
connectionForms, jobForms);
mf = new MFramework(connectionForms, jobForms);
// We're using hardcoded value for framework metadata as they are
// represented as NULL in the database.
mf.setPersistenceId(1);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0004,
"Framework metadata", ex);
} finally {
if (formFetchStmt != null) {
try {
formFetchStmt.close();
} catch (SQLException ex) {
LOG.error("Unable to close form fetch statement", ex);
}
}
if (inputFetchStmt != null) {
try {
inputFetchStmt.close();
} catch (SQLException ex) {
LOG.error("Unable to close input fetch statement", ex);
}
}
}
LOG.debug("Looking up framework metadta found: " + mf);
// If there aren't any framework metadata
if(mf.getConnectionForms().size() == 0 && mf.getJobForms().size() == 0) {
return null;
}
// Returned loaded framework metadata
return mf;
}
/**
* Register forms in derby database.
*
* Use given prepared statements to create entire form structure in database.
*
* @param connectorId
* @param forms
* @param type
* @param baseFormStmt
* @param baseInputStmt
* @throws SQLException
*/
private void registerForms(Long connectorId, List<MForm> forms, String type,
PreparedStatement baseFormStmt, PreparedStatement baseInputStmt)
throws SQLException {
short formIndex = 0;
for (MForm form : forms) {
if(connectorId == null) {
baseFormStmt.setNull(1, Types.BIGINT);
} else {
baseFormStmt.setLong(1, connectorId);
}
baseFormStmt.setString(2, form.getName());
baseFormStmt.setString(3, type);
baseFormStmt.setShort(4, formIndex++);
int baseFormCount = baseFormStmt.executeUpdate();
if (baseFormCount != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0015,
new Integer(baseFormCount).toString());
}
ResultSet rsetFormId = baseFormStmt.getGeneratedKeys();
if (!rsetFormId.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0016);
}
long formId = rsetFormId.getLong(1);
form.setPersistenceId(formId);
// Insert all the inputs
List<MInput<?>> inputs = form.getInputs();
registerFormInputs(formId, inputs, baseInputStmt);
}
}
/**
* Save given inputs to the database.
*
* Use given prepare statement to save all inputs into repository.
*
* @param formId Identifier for corresponding form
* @param inputs List of inputs that needs to be saved
* @param baseInputStmt Statement that we can utilize
* @throws SQLException In case of any failure on Derby side
*/
private void registerFormInputs(long formId, List<MInput<?>> inputs,
PreparedStatement baseInputStmt) throws SQLException {
short inputIndex = 0;
for (MInput<?> input : inputs) {
baseInputStmt.setString(1, input.getName());
baseInputStmt.setLong(2, formId);
baseInputStmt.setShort(3, inputIndex++);
baseInputStmt.setString(4, input.getType().name());
if (input.getType().equals(MInputType.STRING)) {
MStringInput strInput = (MStringInput) input;
baseInputStmt.setBoolean(5, strInput.isMasked());
baseInputStmt.setShort(6, strInput.getMaxLength());
}
int baseInputCount = baseInputStmt.executeUpdate();
if (baseInputCount != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0017,
new Integer(baseInputCount).toString());
}
ResultSet rsetInputId = baseInputStmt.getGeneratedKeys();
if (!rsetInputId.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0018);
}
long inputId = rsetInputId.getLong(1);
input.setPersistenceId(inputId);
}
}
/**
* Execute given query on database.
*
* Passed query will be executed in it's own transaction
*
* @param query Query that should be executed
*/
private void runQuery(String query) {
Connection connection = null;
Statement stmt = null;
@ -341,148 +569,99 @@ private void runQuery(String query) {
}
}
@Override
public MConnector findConnector(String shortName, Connection conn) {
if (LOG.isDebugEnabled()) {
LOG.debug("Looking up connector: " + shortName);
}
MConnector mc = null;
PreparedStatement baseConnectorFetchStmt = null;
PreparedStatement formFetchStmt = null;
PreparedStatement inputFetchStmt = null;
try {
baseConnectorFetchStmt = conn.prepareStatement(STMT_FETCH_BASE_CONNECTOR);
baseConnectorFetchStmt.setString(1, shortName);
ResultSet rsetBaseConnector = baseConnectorFetchStmt.executeQuery();
/**
* Load forms and corresponding inputs from Derby database.
*
* Use given prepared statements to load all forms and corresponding inputs
* from Derby.
*
* @param connectorName Connector name for purpose of printing errors
* @param formFetchStmt Prepared statement for fetching forms
* @param inputFetchStmt Prepare statement for fetching inputs
* @param connectionForms List of connection forms that will be filled up
* @param jobForms List of job forms that will be filled up
* @throws SQLException In case of any failure on Derby side
*/
public void loadForms(String connectorName,
PreparedStatement formFetchStmt,
PreparedStatement inputFetchStmt,
List<MForm> connectionForms,
List<MForm> jobForms) throws SQLException {
if (!rsetBaseConnector.next()) {
LOG.debug("No connector found by name: " + shortName);
return null;
}
ResultSet rsetForm = formFetchStmt.executeQuery();
while (rsetForm.next()) {
long formId = rsetForm.getLong(1);
long formConnectorId = rsetForm.getLong(2);
String formName = rsetForm.getString(3);
String formType = rsetForm.getString(4);
int formIndex = rsetForm.getInt(5);
List<MInput<?>> formInputs = new ArrayList<MInput<?>>();
long connectorId = rsetBaseConnector.getLong(1);
String connectorName = rsetBaseConnector.getString(2);
String connectorClassName = rsetBaseConnector.getString(3);
MForm mf = new MForm(formName, formInputs);
mf.setPersistenceId(formId);
List<MForm> connectionForms = new ArrayList<MForm>();
List<MForm> jobForms = new ArrayList<MForm>();
inputFetchStmt.setLong(1, formId);
mc = new MConnector(connectorName, connectorClassName,
connectionForms, jobForms);
mc.setPersistenceId(connectorId);
ResultSet rsetInput = inputFetchStmt.executeQuery();
while (rsetInput.next()) {
long inputId = rsetInput.getLong(1);
String inputName = rsetInput.getString(2);
long inputForm = rsetInput.getLong(3);
short inputIndex = rsetInput.getShort(4);
String inputType = rsetInput.getString(5);
boolean inputStrMask = rsetInput.getBoolean(6);
short inputStrLength = rsetInput.getShort(7);
if (rsetBaseConnector.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0005, shortName);
}
MInputType mit = MInputType.valueOf(inputType);
formFetchStmt = conn.prepareStatement(STMT_FETCH_FORM);
formFetchStmt.setLong(1, connectorId);
inputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT);
ResultSet rsetForm = formFetchStmt.executeQuery();
while (rsetForm.next()) {
long formId = rsetForm.getLong(1);
long formConnectorId = rsetForm.getLong(2);
String formName = rsetForm.getString(3);
String formType = rsetForm.getString(4);
int formIndex = rsetForm.getInt(5);
List<MInput<?>> formInputs = new ArrayList<MInput<?>>();
MForm mf = new MForm(formName, formInputs);
mf.setPersistenceId(formId);
inputFetchStmt.setLong(1, formId);
ResultSet rsetInput = inputFetchStmt.executeQuery();
while (rsetInput.next()) {
long inputId = rsetInput.getLong(1);
String inputName = rsetInput.getString(2);
long inputForm = rsetInput.getLong(3);
short inputIndex = rsetInput.getShort(4);
String inputType = rsetInput.getString(5);
boolean inputStrMask = rsetInput.getBoolean(6);
short inputStrLength = rsetInput.getShort(7);
MInputType mit = MInputType.valueOf(inputType);
MInput input = null;
switch (mit) {
case STRING:
input = new MStringInput(inputName, inputStrMask, inputStrLength);
break;
case MAP:
input = new MMapInput(inputName);
break;
default:
throw new SqoopException(DerbyRepoError.DERBYREPO_0006,
"input-" + inputName + ":" + inputId + ":"
+ "form-" + inputForm + ":" + mit.name());
}
input.setPersistenceId(inputId);
if (mf.getInputs().size() != inputIndex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0009,
"form: " + mf + "; input: " + input);
}
mf.getInputs().add(input);
}
if (mf.getInputs().size() == 0) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0008,
"connector-" + formConnectorId + ":" + mf);
}
MFormType mft = MFormType.valueOf(formType);
switch (mft) {
case CONNECTION:
if (mc.getConnectionForms().size() != formIndex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0010,
"connector: " + mc + "; form: " + mf);
}
mc.getConnectionForms().add(mf);
MInput input = null;
switch (mit) {
case STRING:
input = new MStringInput(inputName, inputStrMask, inputStrLength);
break;
case JOB:
if (mc.getConnectionForms().size() != formIndex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0010,
"connector: " + mc + "; form: " + mf);
}
mc.getJobForms().add(mf);
case MAP:
input = new MMapInput(inputName);
break;
default:
throw new SqoopException(DerbyRepoError.DERBYREPO_0007,
"connector-" + formConnectorId + ":" + mf);
throw new SqoopException(DerbyRepoError.DERBYREPO_0006,
"input-" + inputName + ":" + inputId + ":"
+ "form-" + inputForm + ":" + mit.name());
}
input.setPersistenceId(inputId);
if (mf.getInputs().size() != inputIndex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0009,
"form: " + mf + "; input: " + input);
}
mf.getInputs().add(input);
}
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0004, shortName, ex);
} finally {
if (baseConnectorFetchStmt != null) {
try {
baseConnectorFetchStmt.close();
} catch (SQLException ex) {
LOG.error("Unable to close base connector fetch statement", ex);
}
if (mf.getInputs().size() == 0) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0008,
"connector-" + formConnectorId + ":" + mf);
}
if (formFetchStmt != null) {
try {
formFetchStmt.close();
} catch (SQLException ex) {
LOG.error("Unable to close form fetch statement", ex);
MFormType mft = MFormType.valueOf(formType);
switch (mft) {
case CONNECTION:
if (connectionForms.size() != formIndex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0010,
"connector: " + connectorName + "; form: " + mf);
}
}
if (inputFetchStmt != null) {
try {
inputFetchStmt.close();
} catch (SQLException ex) {
LOG.error("Unable to close input fetch statement", ex);
connectionForms.add(mf);
break;
case JOB:
if (jobForms.size() != formIndex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0010,
"connector: " + connectorName + "; form: " + mf);
}
jobForms.add(mf);
break;
default:
throw new SqoopException(DerbyRepoError.DERBYREPO_0007,
"connector-" + formConnectorId + ":" + mf);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Looking up connector: " + shortName + ", found: " + mc);
}
return mc;
}
}

View File

@ -42,7 +42,7 @@
* | SQ_FORM |
* +-----------------------------+
* | SQF_ID: BIGINT PK AUTO-GEN |
* | SQF_CONNECTOR: BIGINT | FK SQ_CONNECTOR(SQC_ID)
* | SQF_CONNECTOR: BIGINT | FK SQ_CONNECTOR(SQC_ID),NULL for framework
* | SQF_NAME: VARCHAR(64) |
* | SQF_TYPE: VARCHAR(32) | "CONNECTION"|"JOB"
* | SQF_INDEX: SMALLINT |
@ -108,12 +108,19 @@ public final class DerbySchemaQuery {
// DML: Fetch all forms for a given connector
public static final String STMT_FETCH_FORM =
public static final String STMT_FETCH_FORM_CONNECTOR =
"SELECT " + COLUMN_SQF_ID + ", " + COLUMN_SQF_CONNECTOR + ", "
+ COLUMN_SQF_NAME + ", " + COLUMN_SQF_TYPE + ", " + COLUMN_SQF_INDEX
+ " FROM " + TABLE_SQ_FORM + " WHERE " + COLUMN_SQF_CONNECTOR
+ " = ? ORDER BY " + COLUMN_SQF_INDEX;
// DML: Fetch all framework forms
public static final String STMT_FETCH_FORM_FRAMEWORK =
"SELECT " + COLUMN_SQF_ID + ", " + COLUMN_SQF_CONNECTOR + ", "
+ COLUMN_SQF_NAME + ", " + COLUMN_SQF_TYPE + ", " + COLUMN_SQF_INDEX
+ " FROM " + TABLE_SQ_FORM + " WHERE " + COLUMN_SQF_CONNECTOR
+ " IS NULL ORDER BY " + COLUMN_SQF_INDEX;
// DML: Fetch inputs for a given form
public static final String STMT_FETCH_INPUT =
"SELECT " + COLUMN_SQI_ID + ", " + COLUMN_SQI_NAME + ", "

View File

@ -24,6 +24,7 @@
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.framework.FrameworkManager;
import org.apache.sqoop.repository.RepositoryManager;
@ -37,6 +38,7 @@ public class ServerInitializer implements ServletContextListener {
Logger.getLogger(ServerInitializer.class);
public void contextDestroyed(ServletContextEvent arg0) {
FrameworkManager.destroy();
ConnectorManager.destroy();
RepositoryManager.destroy();
SqoopConfiguration.destroy();
@ -47,6 +49,7 @@ public void contextInitialized(ServletContextEvent arg0) {
SqoopConfiguration.initialize();
RepositoryManager.initialize();
ConnectorManager.initialize();
FrameworkManager.initialize();
} catch (RuntimeException ex) {
LOG.error("Server startup failure", ex);
throw ex;