diff --git a/common/src/main/java/org/apache/sqoop/model/MConnector.java b/common/src/main/java/org/apache/sqoop/model/MConnector.java index fdae2c9a..6f7fc205 100644 --- a/common/src/main/java/org/apache/sqoop/model/MConnector.java +++ b/common/src/main/java/org/apache/sqoop/model/MConnector.java @@ -17,30 +17,30 @@ */ package org.apache.sqoop.model; -import java.util.ArrayList; import java.util.List; -public final class MConnector extends MPersistableEntity { +/** + * Connector metadata. + * + * Includes unique id that identifies connector in metadata store, unique human + * readable name, corresponding name and all forms for both connections and + * jobs. + */ +public final class MConnector extends MFramework { private final String uniqueName; private final String className; - private final List connectionForms; - private final List jobForms; public MConnector(String uniqueName, String className, List connectionForms, List jobForms) { + super(connectionForms, jobForms); + if (uniqueName == null || className == null) { throw new NullPointerException(); } this.uniqueName = uniqueName; this.className = className; - - this.connectionForms = new ArrayList(connectionForms.size()); - this.connectionForms.addAll(connectionForms); - - this.jobForms = new ArrayList(jobForms.size()); - this.jobForms.addAll(jobForms); } public String getUniqueName() { @@ -55,8 +55,8 @@ public String getClassName() { public String toString() { StringBuilder sb = new StringBuilder("connector-"); sb.append(uniqueName).append(":").append(getPersistenceId()).append(":"); - sb.append(className).append("; conn-forms:").append(connectionForms); - sb.append("; job-forms:").append(jobForms); + sb.append(className).append("; conn-forms:").append(getConnectionForms()); + sb.append("; job-forms:").append(getJobForms()); return sb.toString(); } @@ -74,30 +74,15 @@ public boolean equals(Object other) { MConnector mc = (MConnector) other; return (uniqueName.equals(mc.uniqueName) && className.equals(mc.className)) - && connectionForms.equals(mc.connectionForms) - && jobForms.equals(mc.jobForms); + && super.equals(other); } @Override public int hashCode() { - int result = 23; + int result = super.hashCode(); result = 31 * result + uniqueName.hashCode(); result = 31 * result + className.hashCode(); - for (MForm cmf : connectionForms) { - result = 31 * result + cmf.hashCode(); - } - for (MForm jmf : jobForms) { - result = 31 * result + jmf.hashCode(); - } return result; } - - public List getConnectionForms() { - return connectionForms; - } - - public List getJobForms() { - return jobForms; - } } diff --git a/common/src/main/java/org/apache/sqoop/model/MFramework.java b/common/src/main/java/org/apache/sqoop/model/MFramework.java new file mode 100644 index 00000000..8af074f8 --- /dev/null +++ b/common/src/main/java/org/apache/sqoop/model/MFramework.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.model; + +import java.util.ArrayList; +import java.util.List; + +/** + * Metadata describing framework options for connections and jobs. + */ +public class MFramework extends MPersistableEntity { + + private final List connectionForms; + private final List jobForms; + + public MFramework(List connectionForms, List jobForms) { + this.connectionForms = new ArrayList(connectionForms.size()); + this.connectionForms.addAll(connectionForms); + + this.jobForms = new ArrayList(jobForms.size()); + this.jobForms.addAll(jobForms); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("framework-"); + sb.append(getPersistenceId()).append(":"); + sb.append("; conn-forms:").append(connectionForms); + sb.append("; job-forms:").append(jobForms); + + return sb.toString(); + } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + + if (!(other instanceof MFramework)) { + return false; + } + + MFramework mc = (MFramework) other; + return connectionForms.equals(mc.connectionForms) + && jobForms.equals(mc.jobForms); + } + + @Override + public int hashCode() { + int result = 23; + for (MForm cmf : connectionForms) { + result = 31 * result + cmf.hashCode(); + } + for (MForm jmf : jobForms) { + result = 31 * result + jmf.hashCode(); + } + + return result; + } + + public List getConnectionForms() { + return connectionForms; + } + + public List getJobForms() { + return jobForms; + } +} diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java new file mode 100644 index 00000000..e0fa0bf2 --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.framework; + +/** + * Constants that are used in framework module. + */ +public class FrameworkConstants { + + public static final String INPUT_CONN_MAX_SIMULTANEOUS_CONNECTIONS = + "inp-conn-max-connections"; + public static final String INPUT_CONN_MAX_OUTPUT_FORMAT= + "inp-conn-output-format"; + + public static final String FORM_SECURITY = + "form-security"; + public static final String FORM_OUTPUT = + "form-output"; +} diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java new file mode 100644 index 00000000..e0d91d4c --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.framework; + +import org.apache.sqoop.common.ErrorCode; + +/** + * + */ +public enum FrameworkError implements ErrorCode { + + FRAMEWORK_0000("Metadata are not registered in repository"); + + private final String message; + + private FrameworkError(String message) { + this.message = message; + } + + public String getCode() { + return name(); + } + + public String getMessage() { + return message; + } +} diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java new file mode 100644 index 00000000..2694fc64 --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.framework; + +import org.apache.log4j.Logger; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.model.MForm; +import org.apache.sqoop.model.MFramework; +import org.apache.sqoop.model.MInput; +import org.apache.sqoop.model.MStringInput; +import org.apache.sqoop.repository.RepositoryManager; + +import static org.apache.sqoop.framework.FrameworkConstants.*; + +import java.util.ArrayList; +import java.util.List; + +/** + * Manager for Sqoop framework itself. + * + * All Sqoop internals (job execution engine, metadata) should be handled + * within this manager. + * + */ +public class FrameworkManager { + + private static final Logger LOG = Logger.getLogger(FrameworkManager.class); + + private static final List CONNECTION_FORMS = new ArrayList(); + private static final List JOB_FORMS = new ArrayList(); + + private static MFramework mFramework; + + static { + // Build the connection forms + List> connFormInputs = new ArrayList>(); + + MStringInput maxConnections = new MStringInput( + INPUT_CONN_MAX_SIMULTANEOUS_CONNECTIONS, false, (short) 10); + connFormInputs.add(maxConnections); + + MForm connForm = new MForm(FORM_SECURITY, connFormInputs); + + CONNECTION_FORMS.add(connForm); + + // Build job forms + List> jobFormInputs = new ArrayList>(); + + MStringInput outputFormat = new MStringInput(INPUT_CONN_MAX_OUTPUT_FORMAT, + false, (short) 25); + jobFormInputs.add(outputFormat); + + MForm jobForm = new MForm(FORM_OUTPUT, jobFormInputs); + JOB_FORMS.add(jobForm); + } + + public static synchronized void initialize() { + LOG.trace("Begin connector manager initialization"); + + // Register framework metadata + mFramework = new MFramework(CONNECTION_FORMS, JOB_FORMS); + RepositoryManager.getRepository().registerFramework(mFramework); + if (!mFramework.hasPersistenceId()) { + throw new SqoopException(FrameworkError.FRAMEWORK_0000); + } + } + + public static synchronized void destroy() { + LOG.trace("Begin framework manager destroy"); + } + +} diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java index f907b31c..b2f793da 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java @@ -21,8 +21,8 @@ import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.ConnectorHandler; import org.apache.sqoop.model.MConnector; +import org.apache.sqoop.model.MFramework; public class JdbcRepository implements Repository { @@ -38,11 +38,17 @@ protected JdbcRepository(JdbcRepositoryHandler handler, this.repoContext = repoContext; } + /** + * {@inheritDoc} + */ @Override public JdbcRepositoryTransaction getTransaction() { return repoContext.getTransactionFactory().get(); } + /** + * {@inheritDoc} + */ @Override public MConnector registerConnector(MConnector mConnector) { MConnector result = null; @@ -80,4 +86,43 @@ public MConnector registerConnector(MConnector mConnector) { return result; } + + /** + * {@inheritDoc} + */ + @Override + public void registerFramework(MFramework mFramework) { + MFramework result = null; + JdbcRepositoryTransaction tx = null; + + try { + tx = getTransaction(); + tx.begin(); + Connection conn = tx.getConnection(); + result = handler.findFramework(conn); + if (result == null) { + handler.registerFramework(mFramework, conn); + } else { + if (!result.equals(mFramework)) { + throw new SqoopException(RepositoryError.JDBCREPO_0014, + "given[" + mFramework + "] found[" + result + "]"); + } + mFramework.setPersistenceId(result.getPersistenceId()); + } + tx.commit(); + } catch (Exception ex) { + if (tx != null) { + tx.rollback(); + } + if (ex instanceof SqoopException) { + throw (SqoopException) ex; + } + throw new SqoopException(RepositoryError.JDBCREPO_0012, + mFramework.toString(), ex); + } finally { + if (tx != null) { + tx.close(); + } + } + } } diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java index c5d00fe5..694f3799 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java @@ -20,18 +20,79 @@ import java.sql.Connection; import org.apache.sqoop.model.MConnector; +import org.apache.sqoop.model.MFramework; +/** + * Set of methods required from each JDBC based repository. + */ public interface JdbcRepositoryHandler { + /** + * Initialize JDBC based repository. + * + * @param repoContext Context for this instance + */ public void initialize(JdbcRepositoryContext repoContext); + /** + * Search for connector with given name in repository. + * + * And return corresponding metadata structure. + * + * @param shortName Connector unique name + * @param conn JDBC connection for querying repository. + * @return null if connector is not yet registered in repository or + * loaded representation. + */ public MConnector findConnector(String shortName, Connection conn); + /** + * Register given connector in repository. + * + * Save given connector data to the repository. Given connector should not be + * already registered or present in the repository. + * + * @param mc Connector that should be registered. + * @param conn JDBC connection for querying repository. + */ public void registerConnector(MConnector mc, Connection conn); + /** + * Search for framework metadata in the repository. + * + * @param conn JDBC connection for querying repository. + * @return null if framework metadata are not yet present in repository or + * loaded representation. + */ + public MFramework findFramework(Connection conn); + + /** + * Register framework metadata in repository. + * + * Save framework metadata into repository. Metadata should not be already + * registered or present in the repository. + * + * @param mf Framework metadata that should be registered. + * @param conn JDBC connection for querying repository. + */ + public void registerFramework(MFramework mf, Connection conn); + + /** + * Check if schema is already present in the repository. + * + * @return true if schema is already present or false if it's not + */ public boolean schemaExists(); + /** + * Create required schema in repository. + */ public void createSchema(); + /** + * Termination callback for repository. + * + * Should clean up all resources and commit all uncommitted data. + */ public void shutdown(); } diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java index 9ad1bca0..0090fd0b 100644 --- a/core/src/main/java/org/apache/sqoop/repository/Repository.java +++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java @@ -18,6 +18,7 @@ package org.apache.sqoop.repository; import org.apache.sqoop.model.MConnector; +import org.apache.sqoop.model.MFramework; /** @@ -34,10 +35,24 @@ public interface Repository { * already registered, its associated metadata is returned from the * repository. * + * Method will set persistent ID of given MConnector instance in case of a + * success. + * * @param mConnector the connector metadata to be registered * @return null if the connector was successfully registered or * a instance of previously registered metadata with the same connector * unique name. */ public MConnector registerConnector(MConnector mConnector); + + /** + * Registers framework metadata in the repository. No more than one set of + * framework metadata structure is allowed. + * + * Method will set persistent ID of given MFramework instance in case of a + * success. + * + * @param mFramework Framework data that should be registered. + */ + public void registerFramework(MFramework mFramework); } diff --git a/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java b/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java index 4f4e1e37..f6b1a33d 100644 --- a/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java +++ b/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java @@ -73,12 +73,14 @@ public enum RepositoryError implements ErrorCode { */ JDBCREPO_0011("Attempt to reinitialize JDBC repository context"), - /** The system was unable to register the connector in its repository. */ - JDBCREPO_0012("Failed to register connector in repository"), + /** The system was unable to register metadata in its repository. */ + JDBCREPO_0012("Failure in repository metadata registration process."), /** The system found a change in connector metadata that requires upgrade. */ - JDBCREPO_0013("Connector metadata changed - upgrade may be required"); + JDBCREPO_0013("Connector metadata changed - upgrade may be required"), + /** The system found a change in framework metadata that requires upgrade. */ + JDBCREPO_0014("Framework metadata changed - upgrade may be required"); private final String message; diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java index 7bd5f6f2..ae400cd3 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java @@ -33,8 +33,8 @@ public enum DerbyRepoError implements ErrorCode { /** The system was unable to run the specified query. */ DERBYREPO_0003("Unable to run specified query"), - /** The system was unable to query the repository for connector metadata. */ - DERBYREPO_0004("Unable to retrieve connector metadata"), + /** The system was unable to query the repository for metadata. */ + DERBYREPO_0004("Unable to retrieve metadata"), /** The metadata repository contains more than one connector with same name */ DERBYREPO_0005("Invalid metadata state - multiple connectors with name"), @@ -58,10 +58,10 @@ public enum DerbyRepoError implements ErrorCode { DERBYREPO_0010("The form retrieved does not match expteced position"), /** - * The system was not able to register connector metadata due to a - * pre-assigned persistence identifier. + * The system was not able to register metadata due to a pre-assigned + * persistence identifier. */ - DERBYREPO_0011("Connector metadata cannot have preassigned persistence id"), + DERBYREPO_0011("Metadata cannot have preassigned persistence id"), /** * The system was unable to register connector metadata due to an unexpected diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java index c483a17d..8b0fc388 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java @@ -18,11 +18,6 @@ package org.apache.sqoop.repository.derby; import static org.apache.sqoop.repository.derby.DerbySchemaQuery.*; -import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONNECTOR; -import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_FORM; -import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_INPUT; -import static org.apache.sqoop.repository.derby.DerbySchemaQuery.STMT_FETCH_BASE_CONNECTOR; -import static org.apache.sqoop.repository.derby.DerbySchemaQuery.STMT_FETCH_FORM; import java.sql.Connection; import java.sql.DriverManager; @@ -30,6 +25,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Types; import java.util.ArrayList; import java.util.List; @@ -40,6 +36,7 @@ import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MForm; import org.apache.sqoop.model.MFormType; +import org.apache.sqoop.model.MFramework; import org.apache.sqoop.model.MInput; import org.apache.sqoop.model.MInputType; import org.apache.sqoop.model.MMapInput; @@ -48,6 +45,11 @@ import org.apache.sqoop.repository.JdbcRepositoryHandler; import org.apache.sqoop.repository.JdbcRepositoryTransactionFactory; +/** + * JDBC based repository handler for Derby database. + * + * Repository implementation for Derby database. + */ public class DerbyRepositoryHandler implements JdbcRepositoryHandler { private static final Logger LOG = @@ -62,11 +64,13 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler { private static final String EMBEDDED_DERBY_DRIVER_CLASSNAME = "org.apache.derby.jdbc.EmbeddedDriver"; - private JdbcRepositoryContext repoContext; private DataSource dataSource; private JdbcRepositoryTransactionFactory txFactory; + /** + * {@inheritDoc} + */ @Override public void registerConnector(MConnector mc, Connection conn) { if (mc.hasPersistenceId()) { @@ -139,64 +143,9 @@ public void registerConnector(MConnector mc, Connection conn) { } } - private void registerForms(long connectorId, List forms, String type, - PreparedStatement baseFormStmt, PreparedStatement baseInputStmt) - throws SQLException { - short formIndex = 0; - for (MForm form : forms) { - baseFormStmt.setLong(1, connectorId); - baseFormStmt.setString(2, form.getName()); - baseFormStmt.setString(3, type); - baseFormStmt.setShort(4, formIndex++); - - int baseFormCount = baseFormStmt.executeUpdate(); - if (baseFormCount != 1) { - throw new SqoopException(DerbyRepoError.DERBYREPO_0015, - new Integer(baseFormCount).toString()); - } - ResultSet rsetFormId = baseFormStmt.getGeneratedKeys(); - if (!rsetFormId.next()) { - throw new SqoopException(DerbyRepoError.DERBYREPO_0016); - } - - long formId = rsetFormId.getLong(1); - form.setPersistenceId(formId); - - // Insert all the inputs - List> inputs = form.getInputs(); - registerFormInputs(formId, inputs, baseInputStmt); - } - } - - private void registerFormInputs(long formId, List> inputs, - PreparedStatement baseInputStmt) throws SQLException { - short inputIndex = 0; - for (MInput input : inputs) { - baseInputStmt.setString(1, input.getName()); - baseInputStmt.setLong(2, formId); - baseInputStmt.setShort(3, inputIndex++); - baseInputStmt.setString(4, input.getType().name()); - if (input.getType().equals(MInputType.STRING)) { - MStringInput strInput = (MStringInput) input; - baseInputStmt.setBoolean(5, strInput.isMasked()); - baseInputStmt.setShort(6, strInput.getMaxLength()); - } - int baseInputCount = baseInputStmt.executeUpdate(); - if (baseInputCount != 1) { - throw new SqoopException(DerbyRepoError.DERBYREPO_0017, - new Integer(baseInputCount).toString()); - } - - ResultSet rsetInputId = baseInputStmt.getGeneratedKeys(); - if (!rsetInputId.next()) { - throw new SqoopException(DerbyRepoError.DERBYREPO_0018); - } - - long inputId = rsetInputId.getLong(1); - input.setPersistenceId(inputId); - } - } - + /** + * {@inheritDoc} + */ @Override public synchronized void initialize(JdbcRepositoryContext ctx) { repoContext = ctx; @@ -205,6 +154,9 @@ public synchronized void initialize(JdbcRepositoryContext ctx) { LOG.info("DerbyRepositoryHandler initialized."); } + /** + * {@inheritDoc} + */ @Override public synchronized void shutdown() { String driver = repoContext.getDriverClass(); @@ -244,6 +196,10 @@ public synchronized void shutdown() { } } + /** + * {@inheritDoc} + */ + @Override public void createSchema() { runQuery(QUERY_CREATE_SCHEMA_SQOOP); runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR); @@ -251,6 +207,10 @@ public void createSchema() { runQuery(QUERY_CREATE_TABLE_SQ_INPUT); } + /** + * {@inheritDoc} + */ + @Override public boolean schemaExists() { Connection connection = null; Statement stmt = null; @@ -295,7 +255,275 @@ public boolean schemaExists() { return true; } + /** + * {@inheritDoc} + */ + @Override + public MConnector findConnector(String shortName, Connection conn) { + if (LOG.isDebugEnabled()) { + LOG.debug("Looking up connector: " + shortName); + } + MConnector mc = null; + PreparedStatement baseConnectorFetchStmt = null; + PreparedStatement formFetchStmt = null; + PreparedStatement inputFetchStmt = null; + try { + baseConnectorFetchStmt = conn.prepareStatement(STMT_FETCH_BASE_CONNECTOR); + baseConnectorFetchStmt.setString(1, shortName); + ResultSet rsetBaseConnector = baseConnectorFetchStmt.executeQuery(); + if (!rsetBaseConnector.next()) { + LOG.debug("No connector found by name: " + shortName); + return null; + } + + long connectorId = rsetBaseConnector.getLong(1); + String connectorName = rsetBaseConnector.getString(2); + String connectorClassName = rsetBaseConnector.getString(3); + + List connectionForms = new ArrayList(); + List jobForms = new ArrayList(); + + formFetchStmt = conn.prepareStatement(STMT_FETCH_FORM_CONNECTOR); + formFetchStmt.setLong(1, connectorId); + inputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT); + + loadForms(shortName, formFetchStmt, inputFetchStmt, + connectionForms, jobForms); + + mc = new MConnector(connectorName, connectorClassName, + connectionForms, jobForms); + mc.setPersistenceId(connectorId); + + if (rsetBaseConnector.next()) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0005, shortName); + } + } catch (SQLException ex) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0004, shortName, ex); + } finally { + if (baseConnectorFetchStmt != null) { + try { + baseConnectorFetchStmt.close(); + } catch (SQLException ex) { + LOG.error("Unable to close base connector fetch statement", ex); + } + } + if (formFetchStmt != null) { + try { + formFetchStmt.close(); + } catch (SQLException ex) { + LOG.error("Unable to close form fetch statement", ex); + } + } + if (inputFetchStmt != null) { + try { + inputFetchStmt.close(); + } catch (SQLException ex) { + LOG.error("Unable to close input fetch statement", ex); + } + } + } + + LOG.debug("Looking up connector: " + shortName + ", found: " + mc); + return mc; + } + + /** + * {@inheritDoc} + */ + @Override + public void registerFramework(MFramework mf, Connection conn) { + if (mf.hasPersistenceId()) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0011, + "Framework metadata"); + } + + PreparedStatement baseFormStmt = null; + PreparedStatement baseInputStmt = null; + try { + baseFormStmt = conn.prepareStatement(STMT_INSERT_FORM_BASE, + Statement.RETURN_GENERATED_KEYS); + baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE, + Statement.RETURN_GENERATED_KEYS); + + // Insert connection forms + registerForms(null, mf.getConnectionForms(), + MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt); + + registerForms(null, mf.getJobForms(), + MFormType.JOB.name(), baseFormStmt, baseInputStmt); + + // We're using hardcoded value for framework metadata as they are + // represented as NULL in the database. + mf.setPersistenceId(1); + } catch (SQLException ex) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0014, + mf.toString(), ex); + } finally { + if (baseFormStmt != null) { + try { + baseFormStmt.close(); + } catch (SQLException ex) { + LOG.error("Unable to close base form statement", ex); + } + } + if (baseInputStmt != null) { + try { + baseInputStmt.close(); + } catch (SQLException ex) { + LOG.error("Unable to close base input statement", ex); + } + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public MFramework findFramework(Connection conn) { + LOG.debug("Looking up framework metadata"); + MFramework mf = null; + PreparedStatement formFetchStmt = null; + PreparedStatement inputFetchStmt = null; + try { + List connectionForms = new ArrayList(); + List jobForms = new ArrayList(); + + formFetchStmt = conn.prepareStatement(STMT_FETCH_FORM_FRAMEWORK); + inputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT); + + loadForms("Framework metadata", formFetchStmt, inputFetchStmt, + connectionForms, jobForms); + + mf = new MFramework(connectionForms, jobForms); + + // We're using hardcoded value for framework metadata as they are + // represented as NULL in the database. + mf.setPersistenceId(1); + + } catch (SQLException ex) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0004, + "Framework metadata", ex); + } finally { + if (formFetchStmt != null) { + try { + formFetchStmt.close(); + } catch (SQLException ex) { + LOG.error("Unable to close form fetch statement", ex); + } + } + if (inputFetchStmt != null) { + try { + inputFetchStmt.close(); + } catch (SQLException ex) { + LOG.error("Unable to close input fetch statement", ex); + } + } + } + + LOG.debug("Looking up framework metadta found: " + mf); + + // If there aren't any framework metadata + if(mf.getConnectionForms().size() == 0 && mf.getJobForms().size() == 0) { + return null; + } + + // Returned loaded framework metadata + return mf; + } + + /** + * Register forms in derby database. + * + * Use given prepared statements to create entire form structure in database. + * + * @param connectorId + * @param forms + * @param type + * @param baseFormStmt + * @param baseInputStmt + * @throws SQLException + */ + private void registerForms(Long connectorId, List forms, String type, + PreparedStatement baseFormStmt, PreparedStatement baseInputStmt) + throws SQLException { + short formIndex = 0; + for (MForm form : forms) { + if(connectorId == null) { + baseFormStmt.setNull(1, Types.BIGINT); + } else { + baseFormStmt.setLong(1, connectorId); + } + baseFormStmt.setString(2, form.getName()); + baseFormStmt.setString(3, type); + baseFormStmt.setShort(4, formIndex++); + + int baseFormCount = baseFormStmt.executeUpdate(); + if (baseFormCount != 1) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0015, + new Integer(baseFormCount).toString()); + } + ResultSet rsetFormId = baseFormStmt.getGeneratedKeys(); + if (!rsetFormId.next()) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0016); + } + + long formId = rsetFormId.getLong(1); + form.setPersistenceId(formId); + + // Insert all the inputs + List> inputs = form.getInputs(); + registerFormInputs(formId, inputs, baseInputStmt); + } + } + + /** + * Save given inputs to the database. + * + * Use given prepare statement to save all inputs into repository. + * + * @param formId Identifier for corresponding form + * @param inputs List of inputs that needs to be saved + * @param baseInputStmt Statement that we can utilize + * @throws SQLException In case of any failure on Derby side + */ + private void registerFormInputs(long formId, List> inputs, + PreparedStatement baseInputStmt) throws SQLException { + short inputIndex = 0; + for (MInput input : inputs) { + baseInputStmt.setString(1, input.getName()); + baseInputStmt.setLong(2, formId); + baseInputStmt.setShort(3, inputIndex++); + baseInputStmt.setString(4, input.getType().name()); + if (input.getType().equals(MInputType.STRING)) { + MStringInput strInput = (MStringInput) input; + baseInputStmt.setBoolean(5, strInput.isMasked()); + baseInputStmt.setShort(6, strInput.getMaxLength()); + } + int baseInputCount = baseInputStmt.executeUpdate(); + if (baseInputCount != 1) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0017, + new Integer(baseInputCount).toString()); + } + + ResultSet rsetInputId = baseInputStmt.getGeneratedKeys(); + if (!rsetInputId.next()) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0018); + } + + long inputId = rsetInputId.getLong(1); + input.setPersistenceId(inputId); + } + } + + /** + * Execute given query on database. + * + * Passed query will be executed in it's own transaction + * + * @param query Query that should be executed + */ private void runQuery(String query) { Connection connection = null; Statement stmt = null; @@ -341,148 +569,99 @@ private void runQuery(String query) { } } - @Override - public MConnector findConnector(String shortName, Connection conn) { - if (LOG.isDebugEnabled()) { - LOG.debug("Looking up connector: " + shortName); - } - MConnector mc = null; - PreparedStatement baseConnectorFetchStmt = null; - PreparedStatement formFetchStmt = null; - PreparedStatement inputFetchStmt = null; - try { - baseConnectorFetchStmt = conn.prepareStatement(STMT_FETCH_BASE_CONNECTOR); - baseConnectorFetchStmt.setString(1, shortName); - ResultSet rsetBaseConnector = baseConnectorFetchStmt.executeQuery(); + /** + * Load forms and corresponding inputs from Derby database. + * + * Use given prepared statements to load all forms and corresponding inputs + * from Derby. + * + * @param connectorName Connector name for purpose of printing errors + * @param formFetchStmt Prepared statement for fetching forms + * @param inputFetchStmt Prepare statement for fetching inputs + * @param connectionForms List of connection forms that will be filled up + * @param jobForms List of job forms that will be filled up + * @throws SQLException In case of any failure on Derby side + */ + public void loadForms(String connectorName, + PreparedStatement formFetchStmt, + PreparedStatement inputFetchStmt, + List connectionForms, + List jobForms) throws SQLException { - if (!rsetBaseConnector.next()) { - LOG.debug("No connector found by name: " + shortName); - return null; - } + ResultSet rsetForm = formFetchStmt.executeQuery(); + while (rsetForm.next()) { + long formId = rsetForm.getLong(1); + long formConnectorId = rsetForm.getLong(2); + String formName = rsetForm.getString(3); + String formType = rsetForm.getString(4); + int formIndex = rsetForm.getInt(5); + List> formInputs = new ArrayList>(); - long connectorId = rsetBaseConnector.getLong(1); - String connectorName = rsetBaseConnector.getString(2); - String connectorClassName = rsetBaseConnector.getString(3); + MForm mf = new MForm(formName, formInputs); + mf.setPersistenceId(formId); - List connectionForms = new ArrayList(); - List jobForms = new ArrayList(); + inputFetchStmt.setLong(1, formId); - mc = new MConnector(connectorName, connectorClassName, - connectionForms, jobForms); - mc.setPersistenceId(connectorId); + ResultSet rsetInput = inputFetchStmt.executeQuery(); + while (rsetInput.next()) { + long inputId = rsetInput.getLong(1); + String inputName = rsetInput.getString(2); + long inputForm = rsetInput.getLong(3); + short inputIndex = rsetInput.getShort(4); + String inputType = rsetInput.getString(5); + boolean inputStrMask = rsetInput.getBoolean(6); + short inputStrLength = rsetInput.getShort(7); - if (rsetBaseConnector.next()) { - throw new SqoopException(DerbyRepoError.DERBYREPO_0005, shortName); - } + MInputType mit = MInputType.valueOf(inputType); - formFetchStmt = conn.prepareStatement(STMT_FETCH_FORM); - formFetchStmt.setLong(1, connectorId); - - inputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT); - - ResultSet rsetForm = formFetchStmt.executeQuery(); - while (rsetForm.next()) { - long formId = rsetForm.getLong(1); - long formConnectorId = rsetForm.getLong(2); - String formName = rsetForm.getString(3); - String formType = rsetForm.getString(4); - int formIndex = rsetForm.getInt(5); - List> formInputs = new ArrayList>(); - - MForm mf = new MForm(formName, formInputs); - mf.setPersistenceId(formId); - - inputFetchStmt.setLong(1, formId); - - ResultSet rsetInput = inputFetchStmt.executeQuery(); - while (rsetInput.next()) { - long inputId = rsetInput.getLong(1); - String inputName = rsetInput.getString(2); - long inputForm = rsetInput.getLong(3); - short inputIndex = rsetInput.getShort(4); - String inputType = rsetInput.getString(5); - boolean inputStrMask = rsetInput.getBoolean(6); - short inputStrLength = rsetInput.getShort(7); - - MInputType mit = MInputType.valueOf(inputType); - - MInput input = null; - switch (mit) { - case STRING: - input = new MStringInput(inputName, inputStrMask, inputStrLength); - break; - case MAP: - input = new MMapInput(inputName); - break; - default: - throw new SqoopException(DerbyRepoError.DERBYREPO_0006, - "input-" + inputName + ":" + inputId + ":" - + "form-" + inputForm + ":" + mit.name()); - } - input.setPersistenceId(inputId); - - if (mf.getInputs().size() != inputIndex) { - throw new SqoopException(DerbyRepoError.DERBYREPO_0009, - "form: " + mf + "; input: " + input); - } - - mf.getInputs().add(input); - } - - if (mf.getInputs().size() == 0) { - throw new SqoopException(DerbyRepoError.DERBYREPO_0008, - "connector-" + formConnectorId + ":" + mf); - } - - MFormType mft = MFormType.valueOf(formType); - switch (mft) { - case CONNECTION: - if (mc.getConnectionForms().size() != formIndex) { - throw new SqoopException(DerbyRepoError.DERBYREPO_0010, - "connector: " + mc + "; form: " + mf); - } - mc.getConnectionForms().add(mf); + MInput input = null; + switch (mit) { + case STRING: + input = new MStringInput(inputName, inputStrMask, inputStrLength); break; - case JOB: - if (mc.getConnectionForms().size() != formIndex) { - throw new SqoopException(DerbyRepoError.DERBYREPO_0010, - "connector: " + mc + "; form: " + mf); - } - mc.getJobForms().add(mf); + case MAP: + input = new MMapInput(inputName); break; default: - throw new SqoopException(DerbyRepoError.DERBYREPO_0007, - "connector-" + formConnectorId + ":" + mf); + throw new SqoopException(DerbyRepoError.DERBYREPO_0006, + "input-" + inputName + ":" + inputId + ":" + + "form-" + inputForm + ":" + mit.name()); } + input.setPersistenceId(inputId); + + if (mf.getInputs().size() != inputIndex) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0009, + "form: " + mf + "; input: " + input); + } + + mf.getInputs().add(input); } - } catch (SQLException ex) { - throw new SqoopException(DerbyRepoError.DERBYREPO_0004, shortName, ex); - } finally { - if (baseConnectorFetchStmt != null) { - try { - baseConnectorFetchStmt.close(); - } catch (SQLException ex) { - LOG.error("Unable to close base connector fetch statement", ex); - } + + if (mf.getInputs().size() == 0) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0008, + "connector-" + formConnectorId + ":" + mf); } - if (formFetchStmt != null) { - try { - formFetchStmt.close(); - } catch (SQLException ex) { - LOG.error("Unable to close form fetch statement", ex); + + MFormType mft = MFormType.valueOf(formType); + switch (mft) { + case CONNECTION: + if (connectionForms.size() != formIndex) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0010, + "connector: " + connectorName + "; form: " + mf); } - } - if (inputFetchStmt != null) { - try { - inputFetchStmt.close(); - } catch (SQLException ex) { - LOG.error("Unable to close input fetch statement", ex); + connectionForms.add(mf); + break; + case JOB: + if (jobForms.size() != formIndex) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0010, + "connector: " + connectorName + "; form: " + mf); } + jobForms.add(mf); + break; + default: + throw new SqoopException(DerbyRepoError.DERBYREPO_0007, + "connector-" + formConnectorId + ":" + mf); } } - if (LOG.isDebugEnabled()) { - LOG.debug("Looking up connector: " + shortName + ", found: " + mc); - } - return mc; } } diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java index b393d96d..b0c64d53 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java @@ -42,7 +42,7 @@ * | SQ_FORM | * +-----------------------------+ * | SQF_ID: BIGINT PK AUTO-GEN | - * | SQF_CONNECTOR: BIGINT | FK SQ_CONNECTOR(SQC_ID) + * | SQF_CONNECTOR: BIGINT | FK SQ_CONNECTOR(SQC_ID),NULL for framework * | SQF_NAME: VARCHAR(64) | * | SQF_TYPE: VARCHAR(32) | "CONNECTION"|"JOB" * | SQF_INDEX: SMALLINT | @@ -108,12 +108,19 @@ public final class DerbySchemaQuery { // DML: Fetch all forms for a given connector - public static final String STMT_FETCH_FORM = + public static final String STMT_FETCH_FORM_CONNECTOR = "SELECT " + COLUMN_SQF_ID + ", " + COLUMN_SQF_CONNECTOR + ", " + COLUMN_SQF_NAME + ", " + COLUMN_SQF_TYPE + ", " + COLUMN_SQF_INDEX + " FROM " + TABLE_SQ_FORM + " WHERE " + COLUMN_SQF_CONNECTOR + " = ? ORDER BY " + COLUMN_SQF_INDEX; + // DML: Fetch all framework forms + public static final String STMT_FETCH_FORM_FRAMEWORK = + "SELECT " + COLUMN_SQF_ID + ", " + COLUMN_SQF_CONNECTOR + ", " + + COLUMN_SQF_NAME + ", " + COLUMN_SQF_TYPE + ", " + COLUMN_SQF_INDEX + + " FROM " + TABLE_SQ_FORM + " WHERE " + COLUMN_SQF_CONNECTOR + + " IS NULL ORDER BY " + COLUMN_SQF_INDEX; + // DML: Fetch inputs for a given form public static final String STMT_FETCH_INPUT = "SELECT " + COLUMN_SQI_ID + ", " + COLUMN_SQI_NAME + ", " diff --git a/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java b/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java index 6519f522..cd2b13eb 100644 --- a/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java +++ b/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java @@ -24,6 +24,7 @@ import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.ConnectorManager; import org.apache.sqoop.core.SqoopConfiguration; +import org.apache.sqoop.framework.FrameworkManager; import org.apache.sqoop.repository.RepositoryManager; @@ -37,6 +38,7 @@ public class ServerInitializer implements ServletContextListener { Logger.getLogger(ServerInitializer.class); public void contextDestroyed(ServletContextEvent arg0) { + FrameworkManager.destroy(); ConnectorManager.destroy(); RepositoryManager.destroy(); SqoopConfiguration.destroy(); @@ -47,6 +49,7 @@ public void contextInitialized(ServletContextEvent arg0) { SqoopConfiguration.initialize(); RepositoryManager.initialize(); ConnectorManager.initialize(); + FrameworkManager.initialize(); } catch (RuntimeException ex) { LOG.error("Server startup failure", ex); throw ex;