diff --git a/common-test/pom.xml b/common-test/pom.xml index 609a8754..2d63f729 100644 --- a/common-test/pom.xml +++ b/common-test/pom.xml @@ -74,7 +74,6 @@ limitations under the License. commons-io commons-io - diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java b/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java index 0a12d7b2..82289e82 100644 --- a/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java +++ b/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java @@ -348,6 +348,25 @@ public void dropTable(String tableName) { } } + /** + * Drop schema. + * + * Any exceptions will be ignored. + * + * @param schemaName + */ + public void dropSchema(String schemaName) { + StringBuilder sb = new StringBuilder("DROP SCHEMA "); + sb.append(escapeTableName(schemaName)); + sb.append(" CASCADE"); + + try { + executeUpdate(sb.toString()); + } catch(RuntimeException e) { + LOG.info("Ignoring exception: " + e); + } + } + /** * Return number of rows from given table. * diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java index ca590d88..5642081e 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java @@ -236,6 +236,7 @@ public abstract class JdbcRepositoryHandler { * @param conn Connection to the repository */ public abstract void deleteLinkInputs(long linkId, Connection conn); + /** * Find link with given id in repository. * diff --git a/pom.xml b/pom.xml index ea157f79..fc65b9d3 100644 --- a/pom.xml +++ b/pom.xml @@ -333,6 +333,11 @@ limitations under the License. sqoop-repository-derby ${project.version} + + org.apache.sqoop.repository + sqoop-repository-postgresql + ${project.version} + org.apache.sqoop connector-sdk diff --git a/repository/pom.xml b/repository/pom.xml index 8c95c0e2..c63595c9 100644 --- a/repository/pom.xml +++ b/repository/pom.xml @@ -35,6 +35,7 @@ limitations under the License. repository-common repository-derby + repository-postgresql diff --git a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java index 9fa2f9d0..1e139327 100644 --- a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java +++ b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java @@ -2196,9 +2196,13 @@ private void createInputValues(String query, } /** - * Execute given query on database. + * Execute given query via a PreparedStatement. + * A list of args can be passed to the query. + * + * Example: runQuery("SELECT * FROM example WHERE test = ?", "test"); * * @param query Query that should be executed + * @param args Long, String, or Object arguments */ protected void runQuery(String query, Connection conn, Object... args) { PreparedStatement stmt = null; diff --git a/repository/repository-postgresql/pom.xml b/repository/repository-postgresql/pom.xml new file mode 100644 index 00000000..e5e5ec8b --- /dev/null +++ b/repository/repository-postgresql/pom.xml @@ -0,0 +1,76 @@ + + + + + 4.0.0 + + + org.apache.sqoop + repository + 2.0.0-SNAPSHOT + + + org.apache.sqoop.repository + sqoop-repository-postgresql + Sqoop PostgreSQL Repository + + + + org.apache.sqoop + sqoop-common-test + test + + + + org.apache.sqoop + sqoop-core + + + + org.apache.sqoop.repository + sqoop-repository-common + + + + postgresql + postgresql + + + + commons-lang + commons-lang + + + + junit + junit + test + + + + org.apache.sqoop + sqoop-core + test-jar + test + + + + + diff --git a/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlRepoConstants.java b/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlRepoConstants.java new file mode 100644 index 00000000..bdefd4c3 --- /dev/null +++ b/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlRepoConstants.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.repository.postgresql; + +public class PostgresqlRepoConstants { + + /** + * Expected version of the repository structures. + * + * History: + * 1 - Version 1.99.4 + */ + public static final int LATEST_POSTGRESQL_REPOSITORY_VERSION = 1; + + private PostgresqlRepoConstants() { + // Disable explicit object creation + } +} diff --git a/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlRepoError.java b/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlRepoError.java new file mode 100644 index 00000000..19ee505c --- /dev/null +++ b/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlRepoError.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.repository.postgresql; + +import org.apache.sqoop.common.ErrorCode; + +public enum PostgresqlRepoError implements ErrorCode { + + POSTGRESQLREPO_0000("An unknown error has occurred"), + + POSTGRESQLREPO_0001("Unable to run specified query"), + + POSTGRESQLREPO_0002("Update of driver config failed"), + + POSTGRESQLREPO_0003("Could not add directions"), + + POSTGRESQLREPO_0004("Could not get ID of recently added direction"), + + POSTGRESQLREPO_0005("Unsupported repository version"), + + ; + + private final String message; + + private PostgresqlRepoError(String message) { + this.message = message; + } + + public String getCode() { + return name(); + } + + public String getMessage() { + return message; + } +} diff --git a/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlRepositoryHandler.java b/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlRepositoryHandler.java new file mode 100644 index 00000000..4013d22c --- /dev/null +++ b/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlRepositoryHandler.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.repository.postgresql; + +import static org.apache.sqoop.repository.postgresql.PostgresqlSchemaQuery.*; +import static org.apache.sqoop.repository.postgresql.PostgresqlSchemaCreateQuery.*; + +import org.apache.log4j.Logger; +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.repository.JdbcRepositoryContext; +import org.apache.sqoop.repository.common.CommonRepoConstants; +import org.apache.sqoop.repository.common.CommonRepositoryHandler; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Map; +import java.util.TreeMap; + +public class PostgresqlRepositoryHandler extends CommonRepositoryHandler { + private static final Logger LOG = + Logger.getLogger(PostgresqlRepositoryHandler.class); + + private JdbcRepositoryContext repoContext; + + /** + * {@inheritDoc} + */ + @Override + public String name() { + return "PostgreSQL"; + } + + /** + * {@inheritDoc} + */ + @Override + public synchronized void initialize(JdbcRepositoryContext ctx) { + repoContext = ctx; + repoContext.getDataSource(); + LOG.info("PostgresqlRepositoryHandler initialized."); + } + + /** + * {@inheritDoc} + */ + @Override + public synchronized void shutdown() {} + + /** + * Detect version of underlying database structures + * + * @param conn JDBC Connection + * @return + */ + public int detectRepositoryVersion(Connection conn) { + ResultSet rs = null, metadataResultSet = null; + PreparedStatement stmt = null; + + // Select and return the version + try { + DatabaseMetaData md = conn.getMetaData(); + metadataResultSet = md.getTables(null, + PostgresqlSchemaConstants.SCHEMA_SQOOP.toLowerCase(), + PostgresqlSchemaConstants.TABLE_SQ_SYSTEM_NAME.toLowerCase(), null); + + if (metadataResultSet.next()) { + stmt = conn.prepareStatement(STMT_SELECT_SYSTEM); + stmt.setString(1, CommonRepoConstants.SYSKEY_VERSION); + rs = stmt.executeQuery(); + + if (!rs.next()) { + return 0; + } + + return rs.getInt(1); + } + } catch (SQLException e) { + LOG.info("Can't fetch repository structure version.", e); + return 0; + } finally { + closeResultSets(rs); + closeStatements(stmt); + } + + return 0; + } + + /** + * {@inheritDoc} + */ + @Override + public void createOrUpgradeRepository(Connection conn) { + int version = detectRepositoryVersion(conn); + + if (version == PostgresqlRepoConstants.LATEST_POSTGRESQL_REPOSITORY_VERSION) { + return; + } + + if (version == 0) { + runQuery(QUERY_CREATE_SCHEMA_SQOOP, conn); + runQuery(QUERY_CREATE_TABLE_SQ_CONFIGURABLE, conn); + runQuery(QUERY_CREATE_TABLE_SQ_CONFIG, conn); + runQuery(QUERY_CREATE_TABLE_SQ_INPUT, conn); + runQuery(QUERY_CREATE_TABLE_SQ_LINK, conn); + runQuery(QUERY_CREATE_TABLE_SQ_JOB, conn); + runQuery(QUERY_CREATE_TABLE_SQ_LINK_INPUT, conn); + runQuery(QUERY_CREATE_TABLE_SQ_JOB_INPUT, conn); + runQuery(QUERY_CREATE_TABLE_SQ_SUBMISSION, conn); + runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP, conn); + runQuery(QUERY_CREATE_TABLE_SQ_COUNTER, conn); + runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION, conn); + runQuery(QUERY_CREATE_TABLE_SQ_SYSTEM, conn); + runQuery(QUERY_CREATE_TABLE_SQ_DIRECTION, conn); + runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS, conn); + runQuery(QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS, conn); + + // Insert FROM and TO directions. + insertDirections(conn); + } else if (version < 4) { + LOG.error("Found unknown version for PostgreSQL repository: " + version); + throw new SqoopException(PostgresqlRepoError.POSTGRESQLREPO_0005, "Found version: " + version); + } + + ResultSet rs = null; + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(STMT_DELETE_SYSTEM); + stmt.setString(1, CommonRepoConstants.SYSKEY_VERSION); + stmt.executeUpdate(); + + closeStatements(stmt); + + stmt = conn.prepareStatement(STMT_INSERT_SYSTEM); + stmt.setString(1, CommonRepoConstants.SYSKEY_VERSION); + stmt.setString(2, Integer.toString(PostgresqlRepoConstants.LATEST_POSTGRESQL_REPOSITORY_VERSION)); + stmt.executeUpdate(); + } catch (SQLException e) { + LOG.error("Can't persist the repository version", e); + } finally { + closeResultSets(rs); + closeStatements(stmt); + } + } + + /** + * Insert directions: FROM and TO. + * @param conn + * @return Map direction ID => Direction + */ + protected Map insertDirections(Connection conn) { + // Add directions + Map directionMap = new TreeMap(); + PreparedStatement insertDirectionStmt = null; + try { + // Insert directions and get IDs. + for (Direction direction : Direction.values()) { + insertDirectionStmt = conn.prepareStatement(STMT_INSERT_DIRECTION, Statement.RETURN_GENERATED_KEYS); + insertDirectionStmt.setString(1, direction.toString()); + if (insertDirectionStmt.executeUpdate() != 1) { + throw new SqoopException(PostgresqlRepoError.POSTGRESQLREPO_0003, "Could not add directions FROM and TO."); + } + + ResultSet directionId = insertDirectionStmt.getGeneratedKeys(); + if (directionId.next()) { + if (LOG.isInfoEnabled()) { + LOG.info("Loaded direction: " + directionId.getLong(1)); + } + + directionMap.put(direction, directionId.getLong(1)); + } else { + throw new SqoopException(PostgresqlRepoError.POSTGRESQLREPO_0004, "Could not get ID of direction " + direction); + } + } + } catch (SQLException e) { + throw new SqoopException(PostgresqlRepoError.POSTGRESQLREPO_0000, e); + } finally { + closeStatements(insertDirectionStmt); + } + + return directionMap; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isRepositorySuitableForUse(Connection conn) { + int version = detectRepositoryVersion(conn); + + if(version != PostgresqlRepoConstants.LATEST_POSTGRESQL_REPOSITORY_VERSION) { + return false; + } + + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public String validationQuery() { + return "values(1)"; // Yes, this is valid PostgreSQL SQL + } +} diff --git a/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlSchemaConstants.java b/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlSchemaConstants.java new file mode 100644 index 00000000..eb8b681f --- /dev/null +++ b/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlSchemaConstants.java @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.repository.postgresql; + +public class PostgresqlSchemaConstants { + public static final String SCHEMA_SQOOP = "SQOOP"; + + private static final String SCHEMA_PREFIX = SCHEMA_SQOOP + "."; + + // SQ_SYSTEM + + public static final String TABLE_SQ_SYSTEM_NAME = "SQ_SYSTEM"; + + public static final String TABLE_SQ_SYSTEM = SCHEMA_PREFIX + + TABLE_SQ_SYSTEM_NAME; + + public static final String COLUMN_SQM_ID = "SQM_ID"; + + public static final String COLUMN_SQM_KEY = "SQM_KEY"; + + public static final String COLUMN_SQM_VALUE = "SQM_VALUE"; + + // SQ_DIRECTION + + public static final String TABLE_SQ_DIRECTION_NAME = "SQ_DIRECTION"; + + public static final String TABLE_SQ_DIRECTION = SCHEMA_PREFIX + + TABLE_SQ_DIRECTION_NAME; + + public static final String COLUMN_SQD_ID = "SQD_ID"; + + public static final String COLUMN_SQD_NAME = "SQD_NAME"; + + // SQ_CONNECTOR + + public static final String TABLE_SQ_CONFIGURABLE_NAME = "SQ_CONFIGURABLE"; + + public static final String TABLE_SQ_CONFIGURABLE = SCHEMA_PREFIX + + TABLE_SQ_CONFIGURABLE_NAME; + + public static final String COLUMN_SQC_ID = "SQC_ID"; + + public static final String COLUMN_SQC_NAME = "SQC_NAME"; + + public static final String COLUMN_SQC_TYPE = "SQC_TYPE"; + + public static final String COLUMN_SQC_CLASS = "SQC_CLASS"; + + public static final String COLUMN_SQC_VERSION = "SQC_VERSION"; + + // SQ_CONNECTOR_DIRECTIONS + + public static final String TABLE_SQ_CONNECTOR_DIRECTIONS_NAME = "SQ_CONNECTOR_DIRECTIONS"; + + public static final String TABLE_SQ_CONNECTOR_DIRECTIONS = SCHEMA_PREFIX + + TABLE_SQ_CONNECTOR_DIRECTIONS_NAME; + + public static final String COLUMN_SQCD_ID = "SQCD_ID"; + + public static final String COLUMN_SQCD_CONNECTOR = "SQCD_CONNECTOR"; + + public static final String COLUMN_SQCD_DIRECTION = "SQCD_DIRECTION"; + + // SQ_CONFIG + + public static final String TABLE_SQ_CONFIG_NAME = "SQ_CONFIG"; + + public static final String TABLE_SQ_CONFIG = SCHEMA_PREFIX + + TABLE_SQ_CONFIG_NAME; + + public static final String COLUMN_SQ_CFG_ID = "SQ_CFG_ID"; + + public static final String COLUMN_SQ_CFG_CONFIGURABLE = "SQ_CFG_CONFIGURABLE"; + + public static final String COLUMN_SQ_CFG_NAME = "SQ_CFG_NAME"; + + public static final String COLUMN_SQ_CFG_TYPE = "SQ_CFG_TYPE"; + + public static final String COLUMN_SQ_CFG_INDEX = "SQ_CFG_INDEX"; + + // SQ_CONFIG_DIRECTIONS + + public static final String TABLE_SQ_CONFIG_DIRECTIONS_NAME = "SQ_CONFIG_DIRECTIONS"; + + public static final String TABLE_SQ_CONFIG_DIRECTIONS = SCHEMA_PREFIX + + TABLE_SQ_CONFIG_DIRECTIONS_NAME; + + public static final String COLUMN_SQ_CFG_DIR_ID = "SQ_CFG_DIR_ID"; + + public static final String COLUMN_SQ_CFG_DIR_CONFIG = "SQ_CFG_DIR_CONFIG"; + + public static final String COLUMN_SQ_CFG_DIR_DIRECTION = "SQ_CFG_DIR_DIRECTION"; + + // SQ_INPUT + + public static final String TABLE_SQ_INPUT_NAME = "SQ_INPUT"; + + public static final String TABLE_SQ_INPUT = SCHEMA_PREFIX + + TABLE_SQ_INPUT_NAME; + + public static final String COLUMN_SQI_ID = "SQI_ID"; + + public static final String COLUMN_SQI_NAME = "SQI_NAME"; + + public static final String COLUMN_SQI_CONFIG = "SQI_CONFIG"; + + public static final String COLUMN_SQI_INDEX = "SQI_INDEX"; + + public static final String COLUMN_SQI_TYPE = "SQI_TYPE"; + + public static final String COLUMN_SQI_STRMASK = "SQI_STRMASK"; + + public static final String COLUMN_SQI_STRLENGTH = "SQI_STRLENGTH"; + + public static final String COLUMN_SQI_ENUMVALS = "SQI_ENUMVALS"; + + public static final String TABLE_SQ_LINK_NAME = "SQ_LINK"; + + // SQ_LINK + + public static final String TABLE_SQ_LINK = SCHEMA_PREFIX + + TABLE_SQ_LINK_NAME; + + public static final String COLUMN_SQ_LNK_ID = "SQ_LNK_ID"; + + public static final String COLUMN_SQ_LNK_NAME = "SQ_LNK_NAME"; + + public static final String COLUMN_SQ_LNK_CONFIGURABLE = "SQ_LNK_CONFIGURABLE"; + + public static final String COLUMN_SQ_LNK_CREATION_USER = "SQ_LNK_CREATION_USER"; + + public static final String COLUMN_SQ_LNK_CREATION_DATE = "SQ_LNK_CREATION_DATE"; + + public static final String COLUMN_SQ_LNK_UPDATE_USER = "SQ_LNK_UPDATE_USER"; + + public static final String COLUMN_SQ_LNK_UPDATE_DATE = "SQ_LNK_UPDATE_DATE"; + + public static final String COLUMN_SQ_LNK_ENABLED = "SQ_LNK_ENABLED"; + + // SQ_JOB + + public static final String TABLE_SQ_JOB_NAME = "SQ_JOB"; + + public static final String TABLE_SQ_JOB = SCHEMA_PREFIX + + TABLE_SQ_JOB_NAME; + + public static final String COLUMN_SQB_ID = "SQB_ID"; + + public static final String COLUMN_SQB_NAME = "SQB_NAME"; + + public static final String COLUMN_SQB_FROM_LINK = "SQB_FROM_LINK"; + + public static final String COLUMN_SQB_TO_LINK = "SQB_TO_LINK"; + + public static final String COLUMN_SQB_CREATION_USER = "SQB_CREATION_USER"; + + public static final String COLUMN_SQB_CREATION_DATE = "SQB_CREATION_DATE"; + + public static final String COLUMN_SQB_UPDATE_USER = "SQB_UPDATE_USER"; + + public static final String COLUMN_SQB_UPDATE_DATE = "SQB_UPDATE_DATE"; + + public static final String COLUMN_SQB_ENABLED = "SQB_ENABLED"; + + // SQ_LINK_INPUT + + public static final String TABLE_SQ_LINK_INPUT_NAME = + "SQ_LINK_INPUT"; + + public static final String TABLE_SQ_LINK_INPUT = SCHEMA_PREFIX + + TABLE_SQ_LINK_INPUT_NAME; + + public static final String COLUMN_SQ_LNKI_LINK = "SQ_LNKI_LINK"; + + public static final String COLUMN_SQ_LNKI_INPUT = "SQ_LNKI_INPUT"; + + public static final String COLUMN_SQ_LNKI_VALUE = "SQ_LNKI_VALUE"; + + // SQ_JOB_INPUT + + public static final String TABLE_SQ_JOB_INPUT_NAME = + "SQ_JOB_INPUT"; + + public static final String TABLE_SQ_JOB_INPUT = SCHEMA_PREFIX + + TABLE_SQ_JOB_INPUT_NAME; + + public static final String COLUMN_SQBI_JOB = "SQBI_JOB"; + + public static final String COLUMN_SQBI_INPUT = "SQBI_INPUT"; + + public static final String COLUMN_SQBI_VALUE = "SQBI_VALUE"; + + // SQ_SUBMISSION + + public static final String TABLE_SQ_SUBMISSION_NAME = + "SQ_SUBMISSION"; + + public static final String TABLE_SQ_SUBMISSION = SCHEMA_PREFIX + + TABLE_SQ_SUBMISSION_NAME; + + public static final String COLUMN_SQS_ID = "SQS_ID"; + + public static final String COLUMN_SQS_JOB = "SQS_JOB"; + + public static final String COLUMN_SQS_STATUS = "SQS_STATUS"; + + public static final String COLUMN_SQS_CREATION_USER = "SQS_CREATION_USER"; + + public static final String COLUMN_SQS_CREATION_DATE = "SQS_CREATION_DATE"; + + public static final String COLUMN_SQS_UPDATE_USER = "SQS_UPDATE_USER"; + + public static final String COLUMN_SQS_UPDATE_DATE = "SQS_UPDATE_DATE"; + + public static final String COLUMN_SQS_EXTERNAL_ID = "SQS_EXTERNAL_ID"; + + public static final String COLUMN_SQS_EXTERNAL_LINK = "SQS_EXTERNAL_LINK"; + + public static final String COLUMN_SQS_EXCEPTION = "SQS_EXCEPTION"; + + public static final String COLUMN_SQS_EXCEPTION_TRACE = "SQS_EXCEPTION_TRACE"; + + // SQ_COUNTER_GROUP + + public static final String TABLE_SQ_COUNTER_GROUP_NAME = + "SQ_COUNTER_GROUP"; + + public static final String TABLE_SQ_COUNTER_GROUP = SCHEMA_PREFIX + + TABLE_SQ_COUNTER_GROUP_NAME; + + public static final String COLUMN_SQG_ID = "SQG_ID"; + + public static final String COLUMN_SQG_NAME = "SQG_NAME"; + + // SQ_COUNTER_GROUP + + public static final String TABLE_SQ_COUNTER_NAME = + "SQ_COUNTER"; + + public static final String TABLE_SQ_COUNTER = SCHEMA_PREFIX + + TABLE_SQ_COUNTER_NAME; + + public static final String COLUMN_SQR_ID = "SQR_ID"; + + public static final String COLUMN_SQR_NAME = "SQR_NAME"; + + // SQ_COUNTER_SUBMISSION + + public static final String TABLE_SQ_COUNTER_SUBMISSION_NAME = + "SQ_COUNTER_SUBMISSION"; + + public static final String TABLE_SQ_COUNTER_SUBMISSION = SCHEMA_PREFIX + + TABLE_SQ_COUNTER_SUBMISSION_NAME; + + public static final String COLUMN_SQRS_GROUP = "SQRS_GROUP"; + + public static final String COLUMN_SQRS_COUNTER = "SQRS_COUNTER"; + + public static final String COLUMN_SQRS_SUBMISSION = "SQRS_SUBMISSION"; + + public static final String COLUMN_SQRS_VALUE = "SQRS_VALUE"; + + private PostgresqlSchemaConstants() { + // Disable explicit object creation + } +} diff --git a/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlSchemaCreateQuery.java b/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlSchemaCreateQuery.java new file mode 100644 index 00000000..9caaa6ac --- /dev/null +++ b/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlSchemaCreateQuery.java @@ -0,0 +1,376 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.repository.postgresql; + +import static org.apache.sqoop.repository.postgresql.PostgresqlSchemaConstants.*; + +/** + * DDL queries that create the Sqoop repository schema in PostgreSQL database. These + * queries create the following tables: + *

+ * SQ_SYSTEM: Store for various state information + *

+ *    +----------------------------+
+ *    | SQ_SYSTEM                  |
+ *    +----------------------------+
+ *    | SQM_ID: BIGSERIAL PK       |
+ *    | SQM_KEY: VARCHAR(64)       |
+ *    | SQM_VALUE: VARCHAR(64)     |
+ *    +----------------------------+
+ * 
+ *

+ *

+ * SQ_DIRECTION: Directions. + *

+ *    +--------------------------+
+ *    | SQ_DIRECTION             |
+ *    +--------------------------+
+ *    | SQD_ID: BIGSERIAL PK     |
+ *    | SQD_NAME: VARCHAR(64)    | "FROM"|"TO"
+ *    +--------------------------+
+ * 
+ *

+ *

+ * SQ_CONFIGURABLE: Configurable registration. + *

+ *    +-----------------------------+
+ *    | SQ_CONFIGURABLE             |
+ *    +-----------------------------+
+ *    | SQC_ID: BIGINT PK AUTO-GEN  |
+ *    | SQC_NAME: VARCHAR(64)       |
+ *    | SQC_CLASS: VARCHAR(255)     |
+ *    | SQC_TYPE: VARCHAR(32)       |"CONNECTOR"|"DRIVER"
+ *    | SQC_VERSION: VARCHAR(64)    |
+ *    +-----------------------------+
+ * 
+ *

+ *

+ * SQ_CONNECTOR_DIRECTIONS: Connector directions. + *

+ *    +------------------------------+
+ *    | SQ_CONNECTOR_DIRECTIONS      |
+ *    +------------------------------+
+ *    | SQCD_ID: BIGSERIAL PK        |
+ *    | SQCD_CONNECTOR: BIGINT       | FK SQCD_CONNECTOR(SQC_ID)
+ *    | SQCD_DIRECTION: BIGINT       | FK SQCD_DIRECTION(SQD_ID)
+ *    +------------------------------+
+ * 
+ *

+ *

+ * SQ_CONFIG: Config details. + *

+ *    +-------------------------------------+
+ *    | SQ_CONFIG                           |
+ *    +-------------------------------------+
+ *    | SQ_CFG_ID: BIGSERIAL PK             |
+ *    | SQ_CFG_CONNECTOR: BIGINT            | FK SQ_CFG_CONNECTOR(SQC_ID),NULL for driver
+ *    | SQ_CFG_NAME: VARCHAR(64)            |
+ *    | SQ_CFG_TYPE: VARCHAR(32)            | "LINK"|"JOB"
+ *    | SQ_CFG_INDEX: SMALLINT              |
+ *    +-------------------------------------+
+ * 
+ *

+ *

+ * SQ_CONFIG_DIRECTIONS: Connector directions. + *

+ *    +------------------------------+
+ *    | SQ_CONFIG_DIRECTIONS         |
+ *    +------------------------------+
+ *    | SQ_CFG_ID: BIGSERIAL PK      |
+ *    | SQ_CFG_DIR_CONFIG: BIGINT    | FK SQ_CFG_DIR_CONFIG(SQ_CFG_ID)
+ *    | SQ_CFG_DIR_DIRECTION: BIGINT | FK SQ_CFG_DIR_DIRECTION(SQD_ID)
+ *    +------------------------------+
+ * 
+ *

+ *

+ * SQ_INPUT: Input details + *

+ *    +----------------------------+
+ *    | SQ_INPUT                   |
+ *    +----------------------------+
+ *    | SQI_ID: BIGSERIAL PK       |
+ *    | SQI_NAME: VARCHAR(64)      |
+ *    | SQI_CONFIG: BIGINT         | FK SQI_CONFIG(SQ_CFG_ID)
+ *    | SQI_INDEX: SMALLINT        |
+ *    | SQI_TYPE: VARCHAR(32)      | "STRING"|"MAP"
+ *    | SQI_STRMASK: BOOLEAN       |
+ *    | SQI_STRLENGTH: SMALLINT    |
+ *    | SQI_ENUMVALS: VARCHAR(100) |
+ *    +----------------------------+
+ * 
+ *

+ *

+ * SQ_LINK: Stored connections + *

+ *    +-----------------------------------+
+ *    | SQ_LINK                           |
+ *    +-----------------------------------+
+ *    | SQ_LNK_ID: BIGSERIAL PK           |
+ *    | SQ_LNK_NAME: VARCHAR(64)          |
+ *    | SQ_LNK_CONNECTOR: BIGINT          | FK SQ_CONNECTOR(SQC_ID)
+ *    | SQ_LNK_CREATION_USER: VARCHAR(32) |
+ *    | SQ_LNK_CREATION_DATE: TIMESTAMP   |
+ *    | SQ_LNK_UPDATE_USER: VARCHAR(32)   |
+ *    | SQ_LNK_UPDATE_DATE: TIMESTAMP     |
+ *    | SQ_LNK_ENABLED: BOOLEAN           |
+ *    +-----------------------------------+
+ * 
+ *

+ *

+ * SQ_JOB: Stored jobs + *

+ *    +--------------------------------+
+ *    | SQ_JOB                         |
+ *    +--------------------------------+
+ *    | SQB_ID: BIGSERIAL PK           |
+ *    | SQB_NAME: VARCHAR(64)          |
+ *    | SQB_FROM_LINK: BIGINT          | FK SQ_LINK(SQ_LNK_ID)
+ *    | SQB_TO_LINK: BIGINT            | FK SQ_LINK(SQ_LNK_ID)
+ *    | SQB_CREATION_USER: VARCHAR(32) |
+ *    | SQB_CREATION_DATE: TIMESTAMP   |
+ *    | SQB_UPDATE_USER: VARCHAR(32)   |
+ *    | SQB_UPDATE_DATE: TIMESTAMP     |
+ *    | SQB_ENABLED: BOOLEAN           |
+ *    +--------------------------------+
+ * 
+ *

+ *

+ * SQ_LINK_INPUT: N:M relationship link and input + *

+ *    +----------------------------+
+ *    | SQ_LINK_INPUT              |
+ *    +----------------------------+
+ *    | SQ_LNK_LINK: BIGSERIAL     | FK SQ_LINK(SQ_LNK_ID)
+ *    | SQ_LNK_INPUT: BIGINT       | FK SQ_INPUT(SQI_ID)
+ *    | SQ_LNK_VALUE: VARCHAR      |
+ *    +----------------------------+
+ * 
+ *

+ *

+ * SQ_JOB_INPUT: N:M relationship job and input + *

+ *    +----------------------------+
+ *    | SQ_JOB_INPUT               |
+ *    +----------------------------+
+ *    | SQBI_JOB: BIGINT           | FK SQ_JOB(SQB_ID)
+ *    | SQBI_INPUT: BIGINT         | FK SQ_INPUT(SQI_ID)
+ *    | SQBI_VALUE: VARCHAR(1000)  |
+ *    +----------------------------+
+ * 
+ *

+ *

+ * SQ_SUBMISSION: List of submissions + *

+ *    +-----------------------------------+
+ *    | SQ_JOB_SUBMISSION                 |
+ *    +-----------------------------------+
+ *    | SQS_ID: BIGSERIAL PK              |
+ *    | SQS_JOB: BIGINT                   | FK SQ_JOB(SQB_ID)
+ *    | SQS_STATUS: VARCHAR(20)           |
+ *    | SQS_CREATION_USER: VARCHAR(32)    |
+ *    | SQS_CREATION_DATE: TIMESTAMP      |
+ *    | SQS_UPDATE_USER: VARCHAR(32)      |
+ *    | SQS_UPDATE_DATE: TIMESTAMP        |
+ *    | SQS_EXTERNAL_ID: VARCHAR(50)      |
+ *    | SQS_EXTERNAL_LINK: VARCHAR(150)   |
+ *    | SQS_EXCEPTION: VARCHAR(150)       |
+ *    | SQS_EXCEPTION_TRACE: VARCHAR(750) |
+ *    +-----------------------------------+
+ * 
+ *

+ *

+ * SQ_COUNTER_GROUP: List of counter groups + *

+ *    +----------------------------+
+ *    | SQ_COUNTER_GROUP           |
+ *    +----------------------------+
+ *    | SQG_ID: BIGINT PK          |
+ *    | SQG_NAME: VARCHAR(75)      |
+ *    +----------------------------+
+ * 
+ *

+ *

+ * SQ_COUNTER: List of counters + *

+ *    +----------------------------+
+ *    | SQ_COUNTER                 |
+ *    +----------------------------+
+ *    | SQR_ID: BIGINT PK          |
+ *    | SQR_NAME: VARCHAR(75)      |
+ *    +----------------------------+
+ * 
+ *

+ *

+ * SQ_COUNTER_SUBMISSION: N:M Relationship + *

+ *    +----------------------------+
+ *    | SQ_COUNTER_SUBMISSION      |
+ *    +----------------------------+
+ *    | SQRS_GROUP: BIGINT PK      | FK SQ_COUNTER_GROUP(SQR_ID)
+ *    | SQRS_COUNTER: BIGINT PK    | FK SQ_COUNTER(SQR_ID)
+ *    | SQRS_SUBMISSION: BIGINT PK | FK SQ_SUBMISSION(SQS_ID)
+ *    | SQRS_VALUE: BIGINT         |
+ *    +----------------------------+
+ * 
+ *

+ */ +public class PostgresqlSchemaCreateQuery { + + public static final String QUERY_CREATE_SCHEMA_SQOOP = + "CREATE SCHEMA " + SCHEMA_SQOOP; + + public static final String QUERY_CREATE_TABLE_SQ_SYSTEM = + "CREATE TABLE " + TABLE_SQ_SYSTEM + " (" + + COLUMN_SQM_ID + " BIGSERIAL PRIMARY KEY NOT NULL, " + + COLUMN_SQM_KEY + " VARCHAR(64), " + + COLUMN_SQM_VALUE + " VARCHAR(64) " + + ")"; + + public static final String QUERY_CREATE_TABLE_SQ_DIRECTION = + "CREATE TABLE " + TABLE_SQ_DIRECTION + " (" + + COLUMN_SQD_ID + " BIGSERIAL PRIMARY KEY NOT NULL, " + + COLUMN_SQD_NAME + " VARCHAR(64)" + + ")"; + + public static final String QUERY_CREATE_TABLE_SQ_CONFIGURABLE = + "CREATE TABLE " + TABLE_SQ_CONFIGURABLE + " (" + + COLUMN_SQC_ID + " BIGSERIAL PRIMARY KEY NOT NULL, " + + COLUMN_SQC_NAME + " VARCHAR(64) UNIQUE, " + + COLUMN_SQC_TYPE + " VARCHAR(32), " + + COLUMN_SQC_CLASS + " VARCHAR(255), " + + COLUMN_SQC_VERSION + " VARCHAR(64) " + + ")"; + + public static final String QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS = + "CREATE TABLE " + TABLE_SQ_CONNECTOR_DIRECTIONS + " (" + + COLUMN_SQCD_ID + " BIGSERIAL PRIMARY KEY NOT NULL, " + + COLUMN_SQCD_CONNECTOR + " BIGINT REFERENCES " + TABLE_SQ_CONFIGURABLE + "(" + COLUMN_SQC_ID + ")" + ", " + + COLUMN_SQCD_DIRECTION + " BIGINT REFERENCES " + TABLE_SQ_DIRECTION + "(" + COLUMN_SQD_ID + ")" + + ")"; + + public static final String QUERY_CREATE_TABLE_SQ_CONFIG = + "CREATE TABLE " + TABLE_SQ_CONFIG + " (" + + COLUMN_SQ_CFG_ID + " BIGSERIAL PRIMARY KEY NOT NULL, " + + COLUMN_SQ_CFG_CONFIGURABLE + " BIGINT REFERENCES " + TABLE_SQ_CONFIGURABLE + "(" + COLUMN_SQC_ID + ")" + ", " + + COLUMN_SQ_CFG_NAME + " VARCHAR(64), " + + COLUMN_SQ_CFG_TYPE + " VARCHAR(32), " + + COLUMN_SQ_CFG_INDEX + " SMALLINT, " + + "UNIQUE (" + COLUMN_SQ_CFG_NAME + ", " + COLUMN_SQ_CFG_TYPE + ", " + COLUMN_SQ_CFG_CONFIGURABLE + ") " + + ")"; + + public static final String QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS = + "CREATE TABLE " + TABLE_SQ_CONFIG_DIRECTIONS + " (" + + COLUMN_SQ_CFG_DIR_ID + " BIGSERIAL PRIMARY KEY NOT NULL, " + + COLUMN_SQ_CFG_DIR_CONFIG + " BIGINT REFERENCES " + TABLE_SQ_CONFIG + "(" + COLUMN_SQ_CFG_ID + ")" + ", " + + COLUMN_SQ_CFG_DIR_DIRECTION + " BIGINT REFERENCES " + TABLE_SQ_DIRECTION + "(" + COLUMN_SQD_ID + ")" + + ")"; + + public static final String QUERY_CREATE_TABLE_SQ_INPUT = + "CREATE TABLE " + TABLE_SQ_INPUT + " (" + + COLUMN_SQI_ID + " BIGSERIAL PRIMARY KEY NOT NULL, " + + COLUMN_SQI_NAME + " VARCHAR(64), " + + COLUMN_SQI_CONFIG + " BIGINT REFERENCES " + TABLE_SQ_CONFIG + "(" + COLUMN_SQ_CFG_ID + ")" + ", " + + COLUMN_SQI_INDEX + " SMALLINT, " + + COLUMN_SQI_TYPE + " VARCHAR(32), " + + COLUMN_SQI_STRMASK + " BOOLEAN, " + + COLUMN_SQI_STRLENGTH + " SMALLINT, " + + COLUMN_SQI_ENUMVALS + " VARCHAR(100), " + + " UNIQUE (" + COLUMN_SQI_NAME + ", " + COLUMN_SQI_TYPE + ", " + COLUMN_SQI_CONFIG + ") " + + ")"; + + public static final String QUERY_CREATE_TABLE_SQ_LINK = + "CREATE TABLE " + TABLE_SQ_LINK + " (" + + COLUMN_SQ_LNK_ID + " BIGSERIAL PRIMARY KEY NOT NULL, " + + COLUMN_SQ_LNK_CONFIGURABLE + " BIGINT REFERENCES " + TABLE_SQ_CONFIGURABLE + "(" + COLUMN_SQC_ID + ")" + ", " + + COLUMN_SQ_LNK_NAME + " VARCHAR(32) UNIQUE, " + + COLUMN_SQ_LNK_CREATION_DATE + " TIMESTAMP, " + + COLUMN_SQ_LNK_CREATION_USER + " VARCHAR(32) DEFAULT NULL, " + + COLUMN_SQ_LNK_UPDATE_DATE + " TIMESTAMP, " + + COLUMN_SQ_LNK_UPDATE_USER + " VARCHAR(32) DEFAULT NULL, " + + COLUMN_SQ_LNK_ENABLED + " BOOLEAN DEFAULT TRUE" + + ")"; + + public static final String QUERY_CREATE_TABLE_SQ_JOB = + "CREATE TABLE " + TABLE_SQ_JOB + " (" + + COLUMN_SQB_ID + " BIGSERIAL PRIMARY KEY NOT NULL, " + + COLUMN_SQB_FROM_LINK + " BIGINT REFERENCES " + TABLE_SQ_LINK + "(" + COLUMN_SQ_LNK_ID + ")" + ", " + + COLUMN_SQB_TO_LINK + " BIGINT REFERENCES " + TABLE_SQ_LINK + "(" + COLUMN_SQ_LNK_ID + ")" + ", " + + COLUMN_SQB_NAME + " VARCHAR(64) UNIQUE, " + + COLUMN_SQB_CREATION_DATE + " TIMESTAMP, " + + COLUMN_SQB_CREATION_USER + " VARCHAR(32) DEFAULT NULL, " + + COLUMN_SQB_UPDATE_DATE + " TIMESTAMP, " + + COLUMN_SQB_UPDATE_USER + " VARCHAR(32) DEFAULT NULL, " + + COLUMN_SQB_ENABLED + " BOOLEAN DEFAULT TRUE" + + ")"; + + public static final String QUERY_CREATE_TABLE_SQ_LINK_INPUT = + "CREATE TABLE " + TABLE_SQ_LINK_INPUT + " (" + + COLUMN_SQ_LNKI_LINK + " BIGINT REFERENCES " + TABLE_SQ_LINK + "(" + COLUMN_SQ_LNK_ID + ")" + ", " + + COLUMN_SQ_LNKI_INPUT + " BIGINT REFERENCES " + TABLE_SQ_INPUT + "(" + COLUMN_SQI_ID + ")" + ", " + + COLUMN_SQ_LNKI_VALUE + " VARCHAR, " + + "PRIMARY KEY (" + COLUMN_SQ_LNKI_LINK + ", " + COLUMN_SQ_LNKI_INPUT + ")" + + ")"; + + public static final String QUERY_CREATE_TABLE_SQ_JOB_INPUT = + "CREATE TABLE " + TABLE_SQ_JOB_INPUT + " (" + + COLUMN_SQBI_JOB + " BIGINT REFERENCES " + TABLE_SQ_JOB + "(" + COLUMN_SQB_ID + ")" + ", " + + COLUMN_SQBI_INPUT + " BIGINT REFERENCES " + TABLE_SQ_INPUT + "(" + COLUMN_SQI_ID + ")" + ", " + + COLUMN_SQBI_VALUE + " VARCHAR(1000), " + + "PRIMARY KEY (" + COLUMN_SQBI_JOB + ", " + COLUMN_SQBI_INPUT + ")" + + ")"; + + public static final String QUERY_CREATE_TABLE_SQ_SUBMISSION = + "CREATE TABLE " + TABLE_SQ_SUBMISSION + " (" + + COLUMN_SQS_ID + " BIGSERIAL PRIMARY KEY NOT NULL, " + + COLUMN_SQS_JOB + " BIGINT REFERENCES " + TABLE_SQ_JOB + "(" + COLUMN_SQB_ID + ")" + ", " + + COLUMN_SQS_STATUS + " VARCHAR(20), " + + COLUMN_SQS_CREATION_DATE + " TIMESTAMP, " + + COLUMN_SQS_CREATION_USER + " VARCHAR(32) DEFAULT NULL, " + + COLUMN_SQS_UPDATE_DATE + " TIMESTAMP, " + + COLUMN_SQS_UPDATE_USER + " VARCHAR(32) DEFAULT NULL, " + + COLUMN_SQS_EXTERNAL_ID + " VARCHAR(50), " + + COLUMN_SQS_EXTERNAL_LINK + " VARCHAR(150), " + + COLUMN_SQS_EXCEPTION + " VARCHAR(150), " + + COLUMN_SQS_EXCEPTION_TRACE + " VARCHAR(750)" + + ")"; + + public static final String QUERY_CREATE_TABLE_SQ_COUNTER_GROUP = + "CREATE TABLE " + TABLE_SQ_COUNTER_GROUP + " (" + + COLUMN_SQG_ID + " BIGSERIAL PRIMARY KEY NOT NULL, " + + COLUMN_SQG_NAME + " VARCHAR(75) UNIQUE" + + ")"; + + public static final String QUERY_CREATE_TABLE_SQ_COUNTER = + "CREATE TABLE " + TABLE_SQ_COUNTER + " (" + + COLUMN_SQR_ID + " BIGSERIAL PRIMARY KEY NOT NULL, " + + COLUMN_SQR_NAME + " VARCHAR(75) UNIQUE" + + ")"; + + public static final String QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION = + "CREATE TABLE " + TABLE_SQ_COUNTER_SUBMISSION + " (" + + COLUMN_SQRS_GROUP + " BIGINT REFERENCES " + TABLE_SQ_COUNTER_GROUP + "(" + COLUMN_SQG_ID + ")" + ", " + + COLUMN_SQRS_COUNTER + " BIGINT REFERENCES " + TABLE_SQ_COUNTER + "(" + COLUMN_SQR_ID + ")" + ", " + + COLUMN_SQRS_SUBMISSION + " BIGINT REFERENCES " + TABLE_SQ_SUBMISSION + "(" + COLUMN_SQS_ID + ") ON DELETE CASCADE" + ", " + + COLUMN_SQRS_VALUE + " BIGINT, " + + "PRIMARY KEY (" + COLUMN_SQRS_GROUP + ", " + COLUMN_SQRS_COUNTER + ", " + COLUMN_SQRS_SUBMISSION + ")" + + ")"; + + private PostgresqlSchemaCreateQuery() { + // Disable explicit object creation + } +} diff --git a/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlSchemaQuery.java b/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlSchemaQuery.java new file mode 100644 index 00000000..a6319ac9 --- /dev/null +++ b/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlSchemaQuery.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.repository.postgresql; + +import static org.apache.sqoop.repository.postgresql.PostgresqlSchemaConstants.*; + +/** + * DML for PostgreSQL repository. + */ +public class PostgresqlSchemaQuery { + + public static final String STMT_SELECT_SYSTEM = + "SELECT " + + COLUMN_SQM_VALUE + + " FROM " + TABLE_SQ_SYSTEM + + " WHERE " + COLUMN_SQM_KEY + " = ?"; + + public static final String STMT_DELETE_SYSTEM = + "DELETE FROM " + TABLE_SQ_SYSTEM + + " WHERE " + COLUMN_SQM_KEY + " = ?"; + + public static final String STMT_INSERT_SYSTEM = + "INSERT INTO " + TABLE_SQ_SYSTEM + "(" + + COLUMN_SQM_KEY + ", " + + COLUMN_SQM_VALUE + ") " + + "VALUES(?, ?)"; + + public static final String STMT_INSERT_DIRECTION = + "INSERT INTO " + TABLE_SQ_DIRECTION + + " (" + COLUMN_SQD_NAME+ ") VALUES (?)"; + + private PostgresqlSchemaQuery() { + // Disable explicit object creation + } +} diff --git a/repository/repository-postgresql/src/test/java/org/apache/sqoop/repository/postgresql/PostgresqlTestCase.java b/repository/repository-postgresql/src/test/java/org/apache/sqoop/repository/postgresql/PostgresqlTestCase.java new file mode 100644 index 00000000..5d80dce3 --- /dev/null +++ b/repository/repository-postgresql/src/test/java/org/apache/sqoop/repository/postgresql/PostgresqlTestCase.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.repository.postgresql; + +import org.apache.sqoop.common.test.db.DatabaseProvider; +import org.apache.sqoop.common.test.db.PostgreSQLProvider; +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; + +/** + * Abstract class with convenience methods for testing postgresql repository. + */ +abstract public class PostgresqlTestCase { + + public static DatabaseProvider provider; + public static PostgresqlTestUtils utils; + public PostgresqlRepositoryHandler handler; + + @BeforeClass + public static void setUpClass() { + provider = new PostgreSQLProvider(); + utils = new PostgresqlTestUtils(provider); + } + + @Before + public void setUp() throws Exception { + try { + provider.start(); + } catch (RuntimeException e) { + Assume.assumeTrue(false); + } + + handler = new PostgresqlRepositoryHandler(); + handler.createOrUpgradeRepository(provider.getConnection()); + } + + @After + public void tearDown() throws Exception { + provider.dropSchema("sqoop"); + provider.stop(); + } +} \ No newline at end of file diff --git a/repository/repository-postgresql/src/test/java/org/apache/sqoop/repository/postgresql/PostgresqlTestUtils.java b/repository/repository-postgresql/src/test/java/org/apache/sqoop/repository/postgresql/PostgresqlTestUtils.java new file mode 100644 index 00000000..f3935215 --- /dev/null +++ b/repository/repository-postgresql/src/test/java/org/apache/sqoop/repository/postgresql/PostgresqlTestUtils.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.repository.postgresql; + +import org.apache.sqoop.common.test.db.DatabaseProvider; + +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +public class PostgresqlTestUtils { + + private DatabaseProvider provider; + + public PostgresqlTestUtils(DatabaseProvider provider) { + this.provider = provider; + } + + public void assertTableExists(String schema, String table) throws Exception { + DatabaseMetaData md = provider.getConnection().getMetaData(); + ResultSet rs = md.getTables(null, schema, table, null); + while (rs.next()) { + if (rs.getString(3).equals(table)) { + return; + } + } + + throw new AssertionError("Could not find table '" + table + "' part of schema '" + schema + "'"); + } + + public void assertForeignKey(String schema, String table, String column, + String foreignKeyTable, String foreignKeyColumn) throws Exception { + DatabaseMetaData md = provider.getConnection().getMetaData(); + ResultSet rs = md.getCrossReference(null, schema, table, null, schema, foreignKeyTable); + while (rs.next()) { + if (rs.getString(4).equals(column) && rs.getString(8).equals(foreignKeyColumn)) { + return; + } + } + + throw new AssertionError("Could not find '" + table + "." + column + + "' part of schema '" + schema + "' with reference to '" + table + "." + column + "'"); + } + + public void assertUniqueConstraints(String schema, String table, String... columns) throws Exception { + Set columnSet = new TreeSet(Arrays.asList(columns)); + Map> indexColumnMap = new HashMap>(); + DatabaseMetaData md = provider.getConnection().getMetaData(); + ResultSet rs = md.getIndexInfo(null, schema, table, true, false); + + // Get map of index => columns + while (rs.next()) { + String indexName = rs.getString(6); + String columnName = rs.getString(9); + if (!indexColumnMap.containsKey(indexName)) { + indexColumnMap.put(indexName, new TreeSet()); + } + indexColumnMap.get(indexName).add(columnName); + } + + // Validate unique constraints + for (String index : indexColumnMap.keySet()) { + if (indexColumnMap.get(index).equals(columnSet)) { + return; + } + } + + throw new AssertionError("Could not find unique constraint on table '" + table + + "' part of schema '" + schema + "' with reference to columns '" + columnSet + "'"); + } +} \ No newline at end of file diff --git a/repository/repository-postgresql/src/test/java/org/apache/sqoop/repository/postgresql/TestStructure.java b/repository/repository-postgresql/src/test/java/org/apache/sqoop/repository/postgresql/TestStructure.java new file mode 100644 index 00000000..2da19bc9 --- /dev/null +++ b/repository/repository-postgresql/src/test/java/org/apache/sqoop/repository/postgresql/TestStructure.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.repository.postgresql; + +import org.junit.Test; + +/** + * Test connector methods on PostgreSQL repository. + */ +public class TestStructure extends PostgresqlTestCase { + + @Test + public void testTables() throws Exception { + utils.assertTableExists("sqoop", "sq_system"); + utils.assertTableExists("sqoop", "sq_direction"); + utils.assertTableExists("sqoop", "sq_configurable"); + utils.assertTableExists("sqoop", "sq_connector_directions"); + utils.assertTableExists("sqoop", "sq_config"); + utils.assertTableExists("sqoop", "sq_connector_directions"); + utils.assertTableExists("sqoop", "sq_input"); + utils.assertTableExists("sqoop", "sq_link"); + utils.assertTableExists("sqoop", "sq_job"); + utils.assertTableExists("sqoop", "sq_link_input"); + utils.assertTableExists("sqoop", "sq_job_input"); + utils.assertTableExists("sqoop", "sq_submission"); + utils.assertTableExists("sqoop", "sq_counter_group"); + utils.assertTableExists("sqoop", "sq_counter"); + utils.assertTableExists("sqoop", "sq_counter_submission"); + } + + @Test + public void testForeignKeys() throws Exception { + utils.assertForeignKey("sqoop", "sq_configurable", "sqc_id", "sq_connector_directions", "sqcd_connector"); + utils.assertForeignKey("sqoop", "sq_direction", "sqd_id", "sq_connector_directions", "sqcd_direction"); + utils.assertForeignKey("sqoop", "sq_configurable", "sqc_id", "sq_config", "sq_cfg_configurable"); + utils.assertForeignKey("sqoop", "sq_config", "sq_cfg_id", "sq_config_directions", "sq_cfg_dir_config"); + utils.assertForeignKey("sqoop", "sq_direction", "sqd_id", "sq_config_directions", "sq_cfg_dir_direction"); + utils.assertForeignKey("sqoop", "sq_config", "sq_cfg_id", "sq_input", "sqi_config"); + utils.assertForeignKey("sqoop", "sq_configurable", "sqc_id", "sq_link", "sq_lnk_configurable"); + utils.assertForeignKey("sqoop", "sq_link", "sq_lnk_id", "sq_job", "sqb_from_link"); + utils.assertForeignKey("sqoop", "sq_link", "sq_lnk_id", "sq_job", "sqb_to_link"); + utils.assertForeignKey("sqoop", "sq_link", "sq_lnk_id", "sq_link_input", "sq_lnki_link"); + utils.assertForeignKey("sqoop", "sq_input", "sqi_id", "sq_link_input", "sq_lnki_input"); + utils.assertForeignKey("sqoop", "sq_job", "sqb_id", "sq_job_input", "sqbi_job"); + utils.assertForeignKey("sqoop", "sq_input", "sqi_id", "sq_job_input", "sqbi_input"); + utils.assertForeignKey("sqoop", "sq_job", "sqb_id", "sq_submission", "sqs_job"); + utils.assertForeignKey("sqoop", "sq_counter", "sqr_id", "sq_counter_submission", "sqrs_counter"); + utils.assertForeignKey("sqoop", "sq_counter_group", "sqg_id", "sq_counter_submission", "sqrs_group"); + utils.assertForeignKey("sqoop", "sq_submission", "sqs_id", "sq_counter_submission", "sqrs_submission"); + } + + @Test + public void testUniqueConstraints() throws Exception { + utils.assertUniqueConstraints("sqoop", "sq_configurable", "sqc_name"); + utils.assertUniqueConstraints("sqoop", "sq_link", "sq_lnk_name"); + utils.assertUniqueConstraints("sqoop", "sq_job", "sqb_name"); + utils.assertUniqueConstraints("sqoop", "sq_config", "sq_cfg_name", "sq_cfg_configurable", "sq_cfg_type"); + utils.assertUniqueConstraints("sqoop", "sq_input", "sqi_name", "sqi_type", "sqi_config"); + utils.assertUniqueConstraints("sqoop", "sq_counter", "sqr_name"); + utils.assertUniqueConstraints("sqoop", "sq_counter_group", "sqg_name"); + } +} diff --git a/repository/repository-postgresql/src/test/resources/log4j.properties b/repository/repository-postgresql/src/test/resources/log4j.properties new file mode 100644 index 00000000..44ffced2 --- /dev/null +++ b/repository/repository-postgresql/src/test/resources/log4j.properties @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Set root logger level to DEBUG and its only appender to A1. +log4j.rootLogger=DEBUG, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/server/pom.xml b/server/pom.xml index 77477eee..5a35bf39 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -69,6 +69,11 @@ limitations under the License. sqoop-repository-derby
+ + org.apache.sqoop.repository + sqoop-repository-postgresql + + org.apache.sqoop.connector sqoop-connector-generic-jdbc