5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-04 23:12:38 +08:00

SQOOP-1590: Sqoop2: PostgreSQL repository implementation

(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2015-01-05 07:50:27 +01:00
parent c89f168d49
commit fdc40d39c6
18 changed files with 1377 additions and 2 deletions

View File

@ -74,7 +74,6 @@ limitations under the License.
<groupId>commons-io</groupId> <groupId>commons-io</groupId>
<artifactId>commons-io</artifactId> <artifactId>commons-io</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>
<profiles> <profiles>

View File

@ -348,6 +348,25 @@ public void dropTable(String tableName) {
} }
} }
/**
* Drop schema.
*
* Any exceptions will be ignored.
*
* @param schemaName
*/
public void dropSchema(String schemaName) {
StringBuilder sb = new StringBuilder("DROP SCHEMA ");
sb.append(escapeTableName(schemaName));
sb.append(" CASCADE");
try {
executeUpdate(sb.toString());
} catch(RuntimeException e) {
LOG.info("Ignoring exception: " + e);
}
}
/** /**
* Return number of rows from given table. * Return number of rows from given table.
* *

View File

@ -236,6 +236,7 @@ public abstract class JdbcRepositoryHandler {
* @param conn Connection to the repository * @param conn Connection to the repository
*/ */
public abstract void deleteLinkInputs(long linkId, Connection conn); public abstract void deleteLinkInputs(long linkId, Connection conn);
/** /**
* Find link with given id in repository. * Find link with given id in repository.
* *

View File

@ -333,6 +333,11 @@ limitations under the License.
<artifactId>sqoop-repository-derby</artifactId> <artifactId>sqoop-repository-derby</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.sqoop.repository</groupId>
<artifactId>sqoop-repository-postgresql</artifactId>
<version>${project.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.sqoop</groupId> <groupId>org.apache.sqoop</groupId>
<artifactId>connector-sdk</artifactId> <artifactId>connector-sdk</artifactId>

View File

@ -35,6 +35,7 @@ limitations under the License.
<modules> <modules>
<module>repository-common</module> <module>repository-common</module>
<module>repository-derby</module> <module>repository-derby</module>
<module>repository-postgresql</module>
</modules> </modules>
</project> </project>

View File

@ -2196,9 +2196,13 @@ private void createInputValues(String query,
} }
/** /**
* Execute given query on database. * Execute given query via a PreparedStatement.
* A list of args can be passed to the query.
*
* Example: runQuery("SELECT * FROM example WHERE test = ?", "test");
* *
* @param query Query that should be executed * @param query Query that should be executed
* @param args Long, String, or Object arguments
*/ */
protected void runQuery(String query, Connection conn, Object... args) { protected void runQuery(String query, Connection conn, Object... args) {
PreparedStatement stmt = null; PreparedStatement stmt = null;

View File

@ -0,0 +1,76 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.sqoop</groupId>
<artifactId>repository</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.sqoop.repository</groupId>
<artifactId>sqoop-repository-postgresql</artifactId>
<name>Sqoop PostgreSQL Repository</name>
<dependencies>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-common-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.sqoop.repository</groupId>
<artifactId>sqoop-repository-common</artifactId>
</dependency>
<dependency>
<groupId>postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.repository.postgresql;
public class PostgresqlRepoConstants {
/**
* Expected version of the repository structures.
*
* History:
* 1 - Version 1.99.4
*/
public static final int LATEST_POSTGRESQL_REPOSITORY_VERSION = 1;
private PostgresqlRepoConstants() {
// Disable explicit object creation
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.repository.postgresql;
import org.apache.sqoop.common.ErrorCode;
public enum PostgresqlRepoError implements ErrorCode {
POSTGRESQLREPO_0000("An unknown error has occurred"),
POSTGRESQLREPO_0001("Unable to run specified query"),
POSTGRESQLREPO_0002("Update of driver config failed"),
POSTGRESQLREPO_0003("Could not add directions"),
POSTGRESQLREPO_0004("Could not get ID of recently added direction"),
POSTGRESQLREPO_0005("Unsupported repository version"),
;
private final String message;
private PostgresqlRepoError(String message) {
this.message = message;
}
public String getCode() {
return name();
}
public String getMessage() {
return message;
}
}

View File

@ -0,0 +1,224 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.repository.postgresql;
import static org.apache.sqoop.repository.postgresql.PostgresqlSchemaQuery.*;
import static org.apache.sqoop.repository.postgresql.PostgresqlSchemaCreateQuery.*;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.repository.JdbcRepositoryContext;
import org.apache.sqoop.repository.common.CommonRepoConstants;
import org.apache.sqoop.repository.common.CommonRepositoryHandler;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.TreeMap;
public class PostgresqlRepositoryHandler extends CommonRepositoryHandler {
private static final Logger LOG =
Logger.getLogger(PostgresqlRepositoryHandler.class);
private JdbcRepositoryContext repoContext;
/**
* {@inheritDoc}
*/
@Override
public String name() {
return "PostgreSQL";
}
/**
* {@inheritDoc}
*/
@Override
public synchronized void initialize(JdbcRepositoryContext ctx) {
repoContext = ctx;
repoContext.getDataSource();
LOG.info("PostgresqlRepositoryHandler initialized.");
}
/**
* {@inheritDoc}
*/
@Override
public synchronized void shutdown() {}
/**
* Detect version of underlying database structures
*
* @param conn JDBC Connection
* @return
*/
public int detectRepositoryVersion(Connection conn) {
ResultSet rs = null, metadataResultSet = null;
PreparedStatement stmt = null;
// Select and return the version
try {
DatabaseMetaData md = conn.getMetaData();
metadataResultSet = md.getTables(null,
PostgresqlSchemaConstants.SCHEMA_SQOOP.toLowerCase(),
PostgresqlSchemaConstants.TABLE_SQ_SYSTEM_NAME.toLowerCase(), null);
if (metadataResultSet.next()) {
stmt = conn.prepareStatement(STMT_SELECT_SYSTEM);
stmt.setString(1, CommonRepoConstants.SYSKEY_VERSION);
rs = stmt.executeQuery();
if (!rs.next()) {
return 0;
}
return rs.getInt(1);
}
} catch (SQLException e) {
LOG.info("Can't fetch repository structure version.", e);
return 0;
} finally {
closeResultSets(rs);
closeStatements(stmt);
}
return 0;
}
/**
* {@inheritDoc}
*/
@Override
public void createOrUpgradeRepository(Connection conn) {
int version = detectRepositoryVersion(conn);
if (version == PostgresqlRepoConstants.LATEST_POSTGRESQL_REPOSITORY_VERSION) {
return;
}
if (version == 0) {
runQuery(QUERY_CREATE_SCHEMA_SQOOP, conn);
runQuery(QUERY_CREATE_TABLE_SQ_CONFIGURABLE, conn);
runQuery(QUERY_CREATE_TABLE_SQ_CONFIG, conn);
runQuery(QUERY_CREATE_TABLE_SQ_INPUT, conn);
runQuery(QUERY_CREATE_TABLE_SQ_LINK, conn);
runQuery(QUERY_CREATE_TABLE_SQ_JOB, conn);
runQuery(QUERY_CREATE_TABLE_SQ_LINK_INPUT, conn);
runQuery(QUERY_CREATE_TABLE_SQ_JOB_INPUT, conn);
runQuery(QUERY_CREATE_TABLE_SQ_SUBMISSION, conn);
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP, conn);
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER, conn);
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION, conn);
runQuery(QUERY_CREATE_TABLE_SQ_SYSTEM, conn);
runQuery(QUERY_CREATE_TABLE_SQ_DIRECTION, conn);
runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS, conn);
runQuery(QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS, conn);
// Insert FROM and TO directions.
insertDirections(conn);
} else if (version < 4) {
LOG.error("Found unknown version for PostgreSQL repository: " + version);
throw new SqoopException(PostgresqlRepoError.POSTGRESQLREPO_0005, "Found version: " + version);
}
ResultSet rs = null;
PreparedStatement stmt = null;
try {
stmt = conn.prepareStatement(STMT_DELETE_SYSTEM);
stmt.setString(1, CommonRepoConstants.SYSKEY_VERSION);
stmt.executeUpdate();
closeStatements(stmt);
stmt = conn.prepareStatement(STMT_INSERT_SYSTEM);
stmt.setString(1, CommonRepoConstants.SYSKEY_VERSION);
stmt.setString(2, Integer.toString(PostgresqlRepoConstants.LATEST_POSTGRESQL_REPOSITORY_VERSION));
stmt.executeUpdate();
} catch (SQLException e) {
LOG.error("Can't persist the repository version", e);
} finally {
closeResultSets(rs);
closeStatements(stmt);
}
}
/**
* Insert directions: FROM and TO.
* @param conn
* @return Map<Direction, Long> direction ID => Direction
*/
protected Map<Direction, Long> insertDirections(Connection conn) {
// Add directions
Map<Direction, Long> directionMap = new TreeMap<Direction, Long>();
PreparedStatement insertDirectionStmt = null;
try {
// Insert directions and get IDs.
for (Direction direction : Direction.values()) {
insertDirectionStmt = conn.prepareStatement(STMT_INSERT_DIRECTION, Statement.RETURN_GENERATED_KEYS);
insertDirectionStmt.setString(1, direction.toString());
if (insertDirectionStmt.executeUpdate() != 1) {
throw new SqoopException(PostgresqlRepoError.POSTGRESQLREPO_0003, "Could not add directions FROM and TO.");
}
ResultSet directionId = insertDirectionStmt.getGeneratedKeys();
if (directionId.next()) {
if (LOG.isInfoEnabled()) {
LOG.info("Loaded direction: " + directionId.getLong(1));
}
directionMap.put(direction, directionId.getLong(1));
} else {
throw new SqoopException(PostgresqlRepoError.POSTGRESQLREPO_0004, "Could not get ID of direction " + direction);
}
}
} catch (SQLException e) {
throw new SqoopException(PostgresqlRepoError.POSTGRESQLREPO_0000, e);
} finally {
closeStatements(insertDirectionStmt);
}
return directionMap;
}
/**
* {@inheritDoc}
*/
@Override
public boolean isRepositorySuitableForUse(Connection conn) {
int version = detectRepositoryVersion(conn);
if(version != PostgresqlRepoConstants.LATEST_POSTGRESQL_REPOSITORY_VERSION) {
return false;
}
return true;
}
/**
* {@inheritDoc}
*/
@Override
public String validationQuery() {
return "values(1)"; // Yes, this is valid PostgreSQL SQL
}
}

View File

@ -0,0 +1,281 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.repository.postgresql;
public class PostgresqlSchemaConstants {
public static final String SCHEMA_SQOOP = "SQOOP";
private static final String SCHEMA_PREFIX = SCHEMA_SQOOP + ".";
// SQ_SYSTEM
public static final String TABLE_SQ_SYSTEM_NAME = "SQ_SYSTEM";
public static final String TABLE_SQ_SYSTEM = SCHEMA_PREFIX
+ TABLE_SQ_SYSTEM_NAME;
public static final String COLUMN_SQM_ID = "SQM_ID";
public static final String COLUMN_SQM_KEY = "SQM_KEY";
public static final String COLUMN_SQM_VALUE = "SQM_VALUE";
// SQ_DIRECTION
public static final String TABLE_SQ_DIRECTION_NAME = "SQ_DIRECTION";
public static final String TABLE_SQ_DIRECTION = SCHEMA_PREFIX
+ TABLE_SQ_DIRECTION_NAME;
public static final String COLUMN_SQD_ID = "SQD_ID";
public static final String COLUMN_SQD_NAME = "SQD_NAME";
// SQ_CONNECTOR
public static final String TABLE_SQ_CONFIGURABLE_NAME = "SQ_CONFIGURABLE";
public static final String TABLE_SQ_CONFIGURABLE = SCHEMA_PREFIX
+ TABLE_SQ_CONFIGURABLE_NAME;
public static final String COLUMN_SQC_ID = "SQC_ID";
public static final String COLUMN_SQC_NAME = "SQC_NAME";
public static final String COLUMN_SQC_TYPE = "SQC_TYPE";
public static final String COLUMN_SQC_CLASS = "SQC_CLASS";
public static final String COLUMN_SQC_VERSION = "SQC_VERSION";
// SQ_CONNECTOR_DIRECTIONS
public static final String TABLE_SQ_CONNECTOR_DIRECTIONS_NAME = "SQ_CONNECTOR_DIRECTIONS";
public static final String TABLE_SQ_CONNECTOR_DIRECTIONS = SCHEMA_PREFIX
+ TABLE_SQ_CONNECTOR_DIRECTIONS_NAME;
public static final String COLUMN_SQCD_ID = "SQCD_ID";
public static final String COLUMN_SQCD_CONNECTOR = "SQCD_CONNECTOR";
public static final String COLUMN_SQCD_DIRECTION = "SQCD_DIRECTION";
// SQ_CONFIG
public static final String TABLE_SQ_CONFIG_NAME = "SQ_CONFIG";
public static final String TABLE_SQ_CONFIG = SCHEMA_PREFIX
+ TABLE_SQ_CONFIG_NAME;
public static final String COLUMN_SQ_CFG_ID = "SQ_CFG_ID";
public static final String COLUMN_SQ_CFG_CONFIGURABLE = "SQ_CFG_CONFIGURABLE";
public static final String COLUMN_SQ_CFG_NAME = "SQ_CFG_NAME";
public static final String COLUMN_SQ_CFG_TYPE = "SQ_CFG_TYPE";
public static final String COLUMN_SQ_CFG_INDEX = "SQ_CFG_INDEX";
// SQ_CONFIG_DIRECTIONS
public static final String TABLE_SQ_CONFIG_DIRECTIONS_NAME = "SQ_CONFIG_DIRECTIONS";
public static final String TABLE_SQ_CONFIG_DIRECTIONS = SCHEMA_PREFIX
+ TABLE_SQ_CONFIG_DIRECTIONS_NAME;
public static final String COLUMN_SQ_CFG_DIR_ID = "SQ_CFG_DIR_ID";
public static final String COLUMN_SQ_CFG_DIR_CONFIG = "SQ_CFG_DIR_CONFIG";
public static final String COLUMN_SQ_CFG_DIR_DIRECTION = "SQ_CFG_DIR_DIRECTION";
// SQ_INPUT
public static final String TABLE_SQ_INPUT_NAME = "SQ_INPUT";
public static final String TABLE_SQ_INPUT = SCHEMA_PREFIX
+ TABLE_SQ_INPUT_NAME;
public static final String COLUMN_SQI_ID = "SQI_ID";
public static final String COLUMN_SQI_NAME = "SQI_NAME";
public static final String COLUMN_SQI_CONFIG = "SQI_CONFIG";
public static final String COLUMN_SQI_INDEX = "SQI_INDEX";
public static final String COLUMN_SQI_TYPE = "SQI_TYPE";
public static final String COLUMN_SQI_STRMASK = "SQI_STRMASK";
public static final String COLUMN_SQI_STRLENGTH = "SQI_STRLENGTH";
public static final String COLUMN_SQI_ENUMVALS = "SQI_ENUMVALS";
public static final String TABLE_SQ_LINK_NAME = "SQ_LINK";
// SQ_LINK
public static final String TABLE_SQ_LINK = SCHEMA_PREFIX
+ TABLE_SQ_LINK_NAME;
public static final String COLUMN_SQ_LNK_ID = "SQ_LNK_ID";
public static final String COLUMN_SQ_LNK_NAME = "SQ_LNK_NAME";
public static final String COLUMN_SQ_LNK_CONFIGURABLE = "SQ_LNK_CONFIGURABLE";
public static final String COLUMN_SQ_LNK_CREATION_USER = "SQ_LNK_CREATION_USER";
public static final String COLUMN_SQ_LNK_CREATION_DATE = "SQ_LNK_CREATION_DATE";
public static final String COLUMN_SQ_LNK_UPDATE_USER = "SQ_LNK_UPDATE_USER";
public static final String COLUMN_SQ_LNK_UPDATE_DATE = "SQ_LNK_UPDATE_DATE";
public static final String COLUMN_SQ_LNK_ENABLED = "SQ_LNK_ENABLED";
// SQ_JOB
public static final String TABLE_SQ_JOB_NAME = "SQ_JOB";
public static final String TABLE_SQ_JOB = SCHEMA_PREFIX
+ TABLE_SQ_JOB_NAME;
public static final String COLUMN_SQB_ID = "SQB_ID";
public static final String COLUMN_SQB_NAME = "SQB_NAME";
public static final String COLUMN_SQB_FROM_LINK = "SQB_FROM_LINK";
public static final String COLUMN_SQB_TO_LINK = "SQB_TO_LINK";
public static final String COLUMN_SQB_CREATION_USER = "SQB_CREATION_USER";
public static final String COLUMN_SQB_CREATION_DATE = "SQB_CREATION_DATE";
public static final String COLUMN_SQB_UPDATE_USER = "SQB_UPDATE_USER";
public static final String COLUMN_SQB_UPDATE_DATE = "SQB_UPDATE_DATE";
public static final String COLUMN_SQB_ENABLED = "SQB_ENABLED";
// SQ_LINK_INPUT
public static final String TABLE_SQ_LINK_INPUT_NAME =
"SQ_LINK_INPUT";
public static final String TABLE_SQ_LINK_INPUT = SCHEMA_PREFIX
+ TABLE_SQ_LINK_INPUT_NAME;
public static final String COLUMN_SQ_LNKI_LINK = "SQ_LNKI_LINK";
public static final String COLUMN_SQ_LNKI_INPUT = "SQ_LNKI_INPUT";
public static final String COLUMN_SQ_LNKI_VALUE = "SQ_LNKI_VALUE";
// SQ_JOB_INPUT
public static final String TABLE_SQ_JOB_INPUT_NAME =
"SQ_JOB_INPUT";
public static final String TABLE_SQ_JOB_INPUT = SCHEMA_PREFIX
+ TABLE_SQ_JOB_INPUT_NAME;
public static final String COLUMN_SQBI_JOB = "SQBI_JOB";
public static final String COLUMN_SQBI_INPUT = "SQBI_INPUT";
public static final String COLUMN_SQBI_VALUE = "SQBI_VALUE";
// SQ_SUBMISSION
public static final String TABLE_SQ_SUBMISSION_NAME =
"SQ_SUBMISSION";
public static final String TABLE_SQ_SUBMISSION = SCHEMA_PREFIX
+ TABLE_SQ_SUBMISSION_NAME;
public static final String COLUMN_SQS_ID = "SQS_ID";
public static final String COLUMN_SQS_JOB = "SQS_JOB";
public static final String COLUMN_SQS_STATUS = "SQS_STATUS";
public static final String COLUMN_SQS_CREATION_USER = "SQS_CREATION_USER";
public static final String COLUMN_SQS_CREATION_DATE = "SQS_CREATION_DATE";
public static final String COLUMN_SQS_UPDATE_USER = "SQS_UPDATE_USER";
public static final String COLUMN_SQS_UPDATE_DATE = "SQS_UPDATE_DATE";
public static final String COLUMN_SQS_EXTERNAL_ID = "SQS_EXTERNAL_ID";
public static final String COLUMN_SQS_EXTERNAL_LINK = "SQS_EXTERNAL_LINK";
public static final String COLUMN_SQS_EXCEPTION = "SQS_EXCEPTION";
public static final String COLUMN_SQS_EXCEPTION_TRACE = "SQS_EXCEPTION_TRACE";
// SQ_COUNTER_GROUP
public static final String TABLE_SQ_COUNTER_GROUP_NAME =
"SQ_COUNTER_GROUP";
public static final String TABLE_SQ_COUNTER_GROUP = SCHEMA_PREFIX
+ TABLE_SQ_COUNTER_GROUP_NAME;
public static final String COLUMN_SQG_ID = "SQG_ID";
public static final String COLUMN_SQG_NAME = "SQG_NAME";
// SQ_COUNTER_GROUP
public static final String TABLE_SQ_COUNTER_NAME =
"SQ_COUNTER";
public static final String TABLE_SQ_COUNTER = SCHEMA_PREFIX
+ TABLE_SQ_COUNTER_NAME;
public static final String COLUMN_SQR_ID = "SQR_ID";
public static final String COLUMN_SQR_NAME = "SQR_NAME";
// SQ_COUNTER_SUBMISSION
public static final String TABLE_SQ_COUNTER_SUBMISSION_NAME =
"SQ_COUNTER_SUBMISSION";
public static final String TABLE_SQ_COUNTER_SUBMISSION = SCHEMA_PREFIX
+ TABLE_SQ_COUNTER_SUBMISSION_NAME;
public static final String COLUMN_SQRS_GROUP = "SQRS_GROUP";
public static final String COLUMN_SQRS_COUNTER = "SQRS_COUNTER";
public static final String COLUMN_SQRS_SUBMISSION = "SQRS_SUBMISSION";
public static final String COLUMN_SQRS_VALUE = "SQRS_VALUE";
private PostgresqlSchemaConstants() {
// Disable explicit object creation
}
}

View File

@ -0,0 +1,376 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.repository.postgresql;
import static org.apache.sqoop.repository.postgresql.PostgresqlSchemaConstants.*;
/**
* DDL queries that create the Sqoop repository schema in PostgreSQL database. These
* queries create the following tables:
* <p>
* <strong>SQ_SYSTEM</strong>: Store for various state information
* <pre>
* +----------------------------+
* | SQ_SYSTEM |
* +----------------------------+
* | SQM_ID: BIGSERIAL PK |
* | SQM_KEY: VARCHAR(64) |
* | SQM_VALUE: VARCHAR(64) |
* +----------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_DIRECTION</strong>: Directions.
* <pre>
* +--------------------------+
* | SQ_DIRECTION |
* +--------------------------+
* | SQD_ID: BIGSERIAL PK |
* | SQD_NAME: VARCHAR(64) | "FROM"|"TO"
* +--------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_CONFIGURABLE</strong>: Configurable registration.
* <pre>
* +-----------------------------+
* | SQ_CONFIGURABLE |
* +-----------------------------+
* | SQC_ID: BIGINT PK AUTO-GEN |
* | SQC_NAME: VARCHAR(64) |
* | SQC_CLASS: VARCHAR(255) |
* | SQC_TYPE: VARCHAR(32) |"CONNECTOR"|"DRIVER"
* | SQC_VERSION: VARCHAR(64) |
* +-----------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_CONNECTOR_DIRECTIONS</strong>: Connector directions.
* <pre>
* +------------------------------+
* | SQ_CONNECTOR_DIRECTIONS |
* +------------------------------+
* | SQCD_ID: BIGSERIAL PK |
* | SQCD_CONNECTOR: BIGINT | FK SQCD_CONNECTOR(SQC_ID)
* | SQCD_DIRECTION: BIGINT | FK SQCD_DIRECTION(SQD_ID)
* +------------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_CONFIG</strong>: Config details.
* <pre>
* +-------------------------------------+
* | SQ_CONFIG |
* +-------------------------------------+
* | SQ_CFG_ID: BIGSERIAL PK |
* | SQ_CFG_CONNECTOR: BIGINT | FK SQ_CFG_CONNECTOR(SQC_ID),NULL for driver
* | SQ_CFG_NAME: VARCHAR(64) |
* | SQ_CFG_TYPE: VARCHAR(32) | "LINK"|"JOB"
* | SQ_CFG_INDEX: SMALLINT |
* +-------------------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_CONFIG_DIRECTIONS</strong>: Connector directions.
* <pre>
* +------------------------------+
* | SQ_CONFIG_DIRECTIONS |
* +------------------------------+
* | SQ_CFG_ID: BIGSERIAL PK |
* | SQ_CFG_DIR_CONFIG: BIGINT | FK SQ_CFG_DIR_CONFIG(SQ_CFG_ID)
* | SQ_CFG_DIR_DIRECTION: BIGINT | FK SQ_CFG_DIR_DIRECTION(SQD_ID)
* +------------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_INPUT</strong>: Input details
* <pre>
* +----------------------------+
* | SQ_INPUT |
* +----------------------------+
* | SQI_ID: BIGSERIAL PK |
* | SQI_NAME: VARCHAR(64) |
* | SQI_CONFIG: BIGINT | FK SQI_CONFIG(SQ_CFG_ID)
* | SQI_INDEX: SMALLINT |
* | SQI_TYPE: VARCHAR(32) | "STRING"|"MAP"
* | SQI_STRMASK: BOOLEAN |
* | SQI_STRLENGTH: SMALLINT |
* | SQI_ENUMVALS: VARCHAR(100) |
* +----------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_LINK</strong>: Stored connections
* <pre>
* +-----------------------------------+
* | SQ_LINK |
* +-----------------------------------+
* | SQ_LNK_ID: BIGSERIAL PK |
* | SQ_LNK_NAME: VARCHAR(64) |
* | SQ_LNK_CONNECTOR: BIGINT | FK SQ_CONNECTOR(SQC_ID)
* | SQ_LNK_CREATION_USER: VARCHAR(32) |
* | SQ_LNK_CREATION_DATE: TIMESTAMP |
* | SQ_LNK_UPDATE_USER: VARCHAR(32) |
* | SQ_LNK_UPDATE_DATE: TIMESTAMP |
* | SQ_LNK_ENABLED: BOOLEAN |
* +-----------------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_JOB</strong>: Stored jobs
* <pre>
* +--------------------------------+
* | SQ_JOB |
* +--------------------------------+
* | SQB_ID: BIGSERIAL PK |
* | SQB_NAME: VARCHAR(64) |
* | SQB_FROM_LINK: BIGINT | FK SQ_LINK(SQ_LNK_ID)
* | SQB_TO_LINK: BIGINT | FK SQ_LINK(SQ_LNK_ID)
* | SQB_CREATION_USER: VARCHAR(32) |
* | SQB_CREATION_DATE: TIMESTAMP |
* | SQB_UPDATE_USER: VARCHAR(32) |
* | SQB_UPDATE_DATE: TIMESTAMP |
* | SQB_ENABLED: BOOLEAN |
* +--------------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_LINK_INPUT</strong>: N:M relationship link and input
* <pre>
* +----------------------------+
* | SQ_LINK_INPUT |
* +----------------------------+
* | SQ_LNK_LINK: BIGSERIAL | FK SQ_LINK(SQ_LNK_ID)
* | SQ_LNK_INPUT: BIGINT | FK SQ_INPUT(SQI_ID)
* | SQ_LNK_VALUE: VARCHAR |
* +----------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_JOB_INPUT</strong>: N:M relationship job and input
* <pre>
* +----------------------------+
* | SQ_JOB_INPUT |
* +----------------------------+
* | SQBI_JOB: BIGINT | FK SQ_JOB(SQB_ID)
* | SQBI_INPUT: BIGINT | FK SQ_INPUT(SQI_ID)
* | SQBI_VALUE: VARCHAR(1000) |
* +----------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_SUBMISSION</strong>: List of submissions
* <pre>
* +-----------------------------------+
* | SQ_JOB_SUBMISSION |
* +-----------------------------------+
* | SQS_ID: BIGSERIAL PK |
* | SQS_JOB: BIGINT | FK SQ_JOB(SQB_ID)
* | SQS_STATUS: VARCHAR(20) |
* | SQS_CREATION_USER: VARCHAR(32) |
* | SQS_CREATION_DATE: TIMESTAMP |
* | SQS_UPDATE_USER: VARCHAR(32) |
* | SQS_UPDATE_DATE: TIMESTAMP |
* | SQS_EXTERNAL_ID: VARCHAR(50) |
* | SQS_EXTERNAL_LINK: VARCHAR(150) |
* | SQS_EXCEPTION: VARCHAR(150) |
* | SQS_EXCEPTION_TRACE: VARCHAR(750) |
* +-----------------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_COUNTER_GROUP</strong>: List of counter groups
* <pre>
* +----------------------------+
* | SQ_COUNTER_GROUP |
* +----------------------------+
* | SQG_ID: BIGINT PK |
* | SQG_NAME: VARCHAR(75) |
* +----------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_COUNTER</strong>: List of counters
* <pre>
* +----------------------------+
* | SQ_COUNTER |
* +----------------------------+
* | SQR_ID: BIGINT PK |
* | SQR_NAME: VARCHAR(75) |
* +----------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_COUNTER_SUBMISSION</strong>: N:M Relationship
* <pre>
* +----------------------------+
* | SQ_COUNTER_SUBMISSION |
* +----------------------------+
* | SQRS_GROUP: BIGINT PK | FK SQ_COUNTER_GROUP(SQR_ID)
* | SQRS_COUNTER: BIGINT PK | FK SQ_COUNTER(SQR_ID)
* | SQRS_SUBMISSION: BIGINT PK | FK SQ_SUBMISSION(SQS_ID)
* | SQRS_VALUE: BIGINT |
* +----------------------------+
* </pre>
* </p>
*/
public class PostgresqlSchemaCreateQuery {
public static final String QUERY_CREATE_SCHEMA_SQOOP =
"CREATE SCHEMA " + SCHEMA_SQOOP;
public static final String QUERY_CREATE_TABLE_SQ_SYSTEM =
"CREATE TABLE " + TABLE_SQ_SYSTEM + " ("
+ COLUMN_SQM_ID + " BIGSERIAL PRIMARY KEY NOT NULL, "
+ COLUMN_SQM_KEY + " VARCHAR(64), "
+ COLUMN_SQM_VALUE + " VARCHAR(64) "
+ ")";
public static final String QUERY_CREATE_TABLE_SQ_DIRECTION =
"CREATE TABLE " + TABLE_SQ_DIRECTION + " ("
+ COLUMN_SQD_ID + " BIGSERIAL PRIMARY KEY NOT NULL, "
+ COLUMN_SQD_NAME + " VARCHAR(64)"
+ ")";
public static final String QUERY_CREATE_TABLE_SQ_CONFIGURABLE =
"CREATE TABLE " + TABLE_SQ_CONFIGURABLE + " ("
+ COLUMN_SQC_ID + " BIGSERIAL PRIMARY KEY NOT NULL, "
+ COLUMN_SQC_NAME + " VARCHAR(64) UNIQUE, "
+ COLUMN_SQC_TYPE + " VARCHAR(32), "
+ COLUMN_SQC_CLASS + " VARCHAR(255), "
+ COLUMN_SQC_VERSION + " VARCHAR(64) "
+ ")";
public static final String QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS =
"CREATE TABLE " + TABLE_SQ_CONNECTOR_DIRECTIONS + " ("
+ COLUMN_SQCD_ID + " BIGSERIAL PRIMARY KEY NOT NULL, "
+ COLUMN_SQCD_CONNECTOR + " BIGINT REFERENCES " + TABLE_SQ_CONFIGURABLE + "(" + COLUMN_SQC_ID + ")" + ", "
+ COLUMN_SQCD_DIRECTION + " BIGINT REFERENCES " + TABLE_SQ_DIRECTION + "(" + COLUMN_SQD_ID + ")"
+ ")";
public static final String QUERY_CREATE_TABLE_SQ_CONFIG =
"CREATE TABLE " + TABLE_SQ_CONFIG + " ("
+ COLUMN_SQ_CFG_ID + " BIGSERIAL PRIMARY KEY NOT NULL, "
+ COLUMN_SQ_CFG_CONFIGURABLE + " BIGINT REFERENCES " + TABLE_SQ_CONFIGURABLE + "(" + COLUMN_SQC_ID + ")" + ", "
+ COLUMN_SQ_CFG_NAME + " VARCHAR(64), "
+ COLUMN_SQ_CFG_TYPE + " VARCHAR(32), "
+ COLUMN_SQ_CFG_INDEX + " SMALLINT, "
+ "UNIQUE (" + COLUMN_SQ_CFG_NAME + ", " + COLUMN_SQ_CFG_TYPE + ", " + COLUMN_SQ_CFG_CONFIGURABLE + ") "
+ ")";
public static final String QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS =
"CREATE TABLE " + TABLE_SQ_CONFIG_DIRECTIONS + " ("
+ COLUMN_SQ_CFG_DIR_ID + " BIGSERIAL PRIMARY KEY NOT NULL, "
+ COLUMN_SQ_CFG_DIR_CONFIG + " BIGINT REFERENCES " + TABLE_SQ_CONFIG + "(" + COLUMN_SQ_CFG_ID + ")" + ", "
+ COLUMN_SQ_CFG_DIR_DIRECTION + " BIGINT REFERENCES " + TABLE_SQ_DIRECTION + "(" + COLUMN_SQD_ID + ")"
+ ")";
public static final String QUERY_CREATE_TABLE_SQ_INPUT =
"CREATE TABLE " + TABLE_SQ_INPUT + " ("
+ COLUMN_SQI_ID + " BIGSERIAL PRIMARY KEY NOT NULL, "
+ COLUMN_SQI_NAME + " VARCHAR(64), "
+ COLUMN_SQI_CONFIG + " BIGINT REFERENCES " + TABLE_SQ_CONFIG + "(" + COLUMN_SQ_CFG_ID + ")" + ", "
+ COLUMN_SQI_INDEX + " SMALLINT, "
+ COLUMN_SQI_TYPE + " VARCHAR(32), "
+ COLUMN_SQI_STRMASK + " BOOLEAN, "
+ COLUMN_SQI_STRLENGTH + " SMALLINT, "
+ COLUMN_SQI_ENUMVALS + " VARCHAR(100), "
+ " UNIQUE (" + COLUMN_SQI_NAME + ", " + COLUMN_SQI_TYPE + ", " + COLUMN_SQI_CONFIG + ") "
+ ")";
public static final String QUERY_CREATE_TABLE_SQ_LINK =
"CREATE TABLE " + TABLE_SQ_LINK + " ("
+ COLUMN_SQ_LNK_ID + " BIGSERIAL PRIMARY KEY NOT NULL, "
+ COLUMN_SQ_LNK_CONFIGURABLE + " BIGINT REFERENCES " + TABLE_SQ_CONFIGURABLE + "(" + COLUMN_SQC_ID + ")" + ", "
+ COLUMN_SQ_LNK_NAME + " VARCHAR(32) UNIQUE, "
+ COLUMN_SQ_LNK_CREATION_DATE + " TIMESTAMP, "
+ COLUMN_SQ_LNK_CREATION_USER + " VARCHAR(32) DEFAULT NULL, "
+ COLUMN_SQ_LNK_UPDATE_DATE + " TIMESTAMP, "
+ COLUMN_SQ_LNK_UPDATE_USER + " VARCHAR(32) DEFAULT NULL, "
+ COLUMN_SQ_LNK_ENABLED + " BOOLEAN DEFAULT TRUE"
+ ")";
public static final String QUERY_CREATE_TABLE_SQ_JOB =
"CREATE TABLE " + TABLE_SQ_JOB + " ("
+ COLUMN_SQB_ID + " BIGSERIAL PRIMARY KEY NOT NULL, "
+ COLUMN_SQB_FROM_LINK + " BIGINT REFERENCES " + TABLE_SQ_LINK + "(" + COLUMN_SQ_LNK_ID + ")" + ", "
+ COLUMN_SQB_TO_LINK + " BIGINT REFERENCES " + TABLE_SQ_LINK + "(" + COLUMN_SQ_LNK_ID + ")" + ", "
+ COLUMN_SQB_NAME + " VARCHAR(64) UNIQUE, "
+ COLUMN_SQB_CREATION_DATE + " TIMESTAMP, "
+ COLUMN_SQB_CREATION_USER + " VARCHAR(32) DEFAULT NULL, "
+ COLUMN_SQB_UPDATE_DATE + " TIMESTAMP, "
+ COLUMN_SQB_UPDATE_USER + " VARCHAR(32) DEFAULT NULL, "
+ COLUMN_SQB_ENABLED + " BOOLEAN DEFAULT TRUE"
+ ")";
public static final String QUERY_CREATE_TABLE_SQ_LINK_INPUT =
"CREATE TABLE " + TABLE_SQ_LINK_INPUT + " ("
+ COLUMN_SQ_LNKI_LINK + " BIGINT REFERENCES " + TABLE_SQ_LINK + "(" + COLUMN_SQ_LNK_ID + ")" + ", "
+ COLUMN_SQ_LNKI_INPUT + " BIGINT REFERENCES " + TABLE_SQ_INPUT + "(" + COLUMN_SQI_ID + ")" + ", "
+ COLUMN_SQ_LNKI_VALUE + " VARCHAR, "
+ "PRIMARY KEY (" + COLUMN_SQ_LNKI_LINK + ", " + COLUMN_SQ_LNKI_INPUT + ")"
+ ")";
public static final String QUERY_CREATE_TABLE_SQ_JOB_INPUT =
"CREATE TABLE " + TABLE_SQ_JOB_INPUT + " ("
+ COLUMN_SQBI_JOB + " BIGINT REFERENCES " + TABLE_SQ_JOB + "(" + COLUMN_SQB_ID + ")" + ", "
+ COLUMN_SQBI_INPUT + " BIGINT REFERENCES " + TABLE_SQ_INPUT + "(" + COLUMN_SQI_ID + ")" + ", "
+ COLUMN_SQBI_VALUE + " VARCHAR(1000), "
+ "PRIMARY KEY (" + COLUMN_SQBI_JOB + ", " + COLUMN_SQBI_INPUT + ")"
+ ")";
public static final String QUERY_CREATE_TABLE_SQ_SUBMISSION =
"CREATE TABLE " + TABLE_SQ_SUBMISSION + " ("
+ COLUMN_SQS_ID + " BIGSERIAL PRIMARY KEY NOT NULL, "
+ COLUMN_SQS_JOB + " BIGINT REFERENCES " + TABLE_SQ_JOB + "(" + COLUMN_SQB_ID + ")" + ", "
+ COLUMN_SQS_STATUS + " VARCHAR(20), "
+ COLUMN_SQS_CREATION_DATE + " TIMESTAMP, "
+ COLUMN_SQS_CREATION_USER + " VARCHAR(32) DEFAULT NULL, "
+ COLUMN_SQS_UPDATE_DATE + " TIMESTAMP, "
+ COLUMN_SQS_UPDATE_USER + " VARCHAR(32) DEFAULT NULL, "
+ COLUMN_SQS_EXTERNAL_ID + " VARCHAR(50), "
+ COLUMN_SQS_EXTERNAL_LINK + " VARCHAR(150), "
+ COLUMN_SQS_EXCEPTION + " VARCHAR(150), "
+ COLUMN_SQS_EXCEPTION_TRACE + " VARCHAR(750)"
+ ")";
public static final String QUERY_CREATE_TABLE_SQ_COUNTER_GROUP =
"CREATE TABLE " + TABLE_SQ_COUNTER_GROUP + " ("
+ COLUMN_SQG_ID + " BIGSERIAL PRIMARY KEY NOT NULL, "
+ COLUMN_SQG_NAME + " VARCHAR(75) UNIQUE"
+ ")";
public static final String QUERY_CREATE_TABLE_SQ_COUNTER =
"CREATE TABLE " + TABLE_SQ_COUNTER + " ("
+ COLUMN_SQR_ID + " BIGSERIAL PRIMARY KEY NOT NULL, "
+ COLUMN_SQR_NAME + " VARCHAR(75) UNIQUE"
+ ")";
public static final String QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION =
"CREATE TABLE " + TABLE_SQ_COUNTER_SUBMISSION + " ("
+ COLUMN_SQRS_GROUP + " BIGINT REFERENCES " + TABLE_SQ_COUNTER_GROUP + "(" + COLUMN_SQG_ID + ")" + ", "
+ COLUMN_SQRS_COUNTER + " BIGINT REFERENCES " + TABLE_SQ_COUNTER + "(" + COLUMN_SQR_ID + ")" + ", "
+ COLUMN_SQRS_SUBMISSION + " BIGINT REFERENCES " + TABLE_SQ_SUBMISSION + "(" + COLUMN_SQS_ID + ") ON DELETE CASCADE" + ", "
+ COLUMN_SQRS_VALUE + " BIGINT, "
+ "PRIMARY KEY (" + COLUMN_SQRS_GROUP + ", " + COLUMN_SQRS_COUNTER + ", " + COLUMN_SQRS_SUBMISSION + ")"
+ ")";
private PostgresqlSchemaCreateQuery() {
// Disable explicit object creation
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.repository.postgresql;
import static org.apache.sqoop.repository.postgresql.PostgresqlSchemaConstants.*;
/**
* DML for PostgreSQL repository.
*/
public class PostgresqlSchemaQuery {
public static final String STMT_SELECT_SYSTEM =
"SELECT "
+ COLUMN_SQM_VALUE
+ " FROM " + TABLE_SQ_SYSTEM
+ " WHERE " + COLUMN_SQM_KEY + " = ?";
public static final String STMT_DELETE_SYSTEM =
"DELETE FROM " + TABLE_SQ_SYSTEM
+ " WHERE " + COLUMN_SQM_KEY + " = ?";
public static final String STMT_INSERT_SYSTEM =
"INSERT INTO " + TABLE_SQ_SYSTEM + "("
+ COLUMN_SQM_KEY + ", "
+ COLUMN_SQM_VALUE + ") "
+ "VALUES(?, ?)";
public static final String STMT_INSERT_DIRECTION =
"INSERT INTO " + TABLE_SQ_DIRECTION
+ " (" + COLUMN_SQD_NAME+ ") VALUES (?)";
private PostgresqlSchemaQuery() {
// Disable explicit object creation
}
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.repository.postgresql;
import org.apache.sqoop.common.test.db.DatabaseProvider;
import org.apache.sqoop.common.test.db.PostgreSQLProvider;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
/**
* Abstract class with convenience methods for testing postgresql repository.
*/
abstract public class PostgresqlTestCase {
public static DatabaseProvider provider;
public static PostgresqlTestUtils utils;
public PostgresqlRepositoryHandler handler;
@BeforeClass
public static void setUpClass() {
provider = new PostgreSQLProvider();
utils = new PostgresqlTestUtils(provider);
}
@Before
public void setUp() throws Exception {
try {
provider.start();
} catch (RuntimeException e) {
Assume.assumeTrue(false);
}
handler = new PostgresqlRepositoryHandler();
handler.createOrUpgradeRepository(provider.getConnection());
}
@After
public void tearDown() throws Exception {
provider.dropSchema("sqoop");
provider.stop();
}
}

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.repository.postgresql;
import org.apache.sqoop.common.test.db.DatabaseProvider;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
public class PostgresqlTestUtils {
private DatabaseProvider provider;
public PostgresqlTestUtils(DatabaseProvider provider) {
this.provider = provider;
}
public void assertTableExists(String schema, String table) throws Exception {
DatabaseMetaData md = provider.getConnection().getMetaData();
ResultSet rs = md.getTables(null, schema, table, null);
while (rs.next()) {
if (rs.getString(3).equals(table)) {
return;
}
}
throw new AssertionError("Could not find table '" + table + "' part of schema '" + schema + "'");
}
public void assertForeignKey(String schema, String table, String column,
String foreignKeyTable, String foreignKeyColumn) throws Exception {
DatabaseMetaData md = provider.getConnection().getMetaData();
ResultSet rs = md.getCrossReference(null, schema, table, null, schema, foreignKeyTable);
while (rs.next()) {
if (rs.getString(4).equals(column) && rs.getString(8).equals(foreignKeyColumn)) {
return;
}
}
throw new AssertionError("Could not find '" + table + "." + column
+ "' part of schema '" + schema + "' with reference to '" + table + "." + column + "'");
}
public void assertUniqueConstraints(String schema, String table, String... columns) throws Exception {
Set<String> columnSet = new TreeSet<String>(Arrays.asList(columns));
Map<String, Set<String>> indexColumnMap = new HashMap<String, Set<String>>();
DatabaseMetaData md = provider.getConnection().getMetaData();
ResultSet rs = md.getIndexInfo(null, schema, table, true, false);
// Get map of index => columns
while (rs.next()) {
String indexName = rs.getString(6);
String columnName = rs.getString(9);
if (!indexColumnMap.containsKey(indexName)) {
indexColumnMap.put(indexName, new TreeSet<String>());
}
indexColumnMap.get(indexName).add(columnName);
}
// Validate unique constraints
for (String index : indexColumnMap.keySet()) {
if (indexColumnMap.get(index).equals(columnSet)) {
return;
}
}
throw new AssertionError("Could not find unique constraint on table '" + table
+ "' part of schema '" + schema + "' with reference to columns '" + columnSet + "'");
}
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.repository.postgresql;
import org.junit.Test;
/**
* Test connector methods on PostgreSQL repository.
*/
public class TestStructure extends PostgresqlTestCase {
@Test
public void testTables() throws Exception {
utils.assertTableExists("sqoop", "sq_system");
utils.assertTableExists("sqoop", "sq_direction");
utils.assertTableExists("sqoop", "sq_configurable");
utils.assertTableExists("sqoop", "sq_connector_directions");
utils.assertTableExists("sqoop", "sq_config");
utils.assertTableExists("sqoop", "sq_connector_directions");
utils.assertTableExists("sqoop", "sq_input");
utils.assertTableExists("sqoop", "sq_link");
utils.assertTableExists("sqoop", "sq_job");
utils.assertTableExists("sqoop", "sq_link_input");
utils.assertTableExists("sqoop", "sq_job_input");
utils.assertTableExists("sqoop", "sq_submission");
utils.assertTableExists("sqoop", "sq_counter_group");
utils.assertTableExists("sqoop", "sq_counter");
utils.assertTableExists("sqoop", "sq_counter_submission");
}
@Test
public void testForeignKeys() throws Exception {
utils.assertForeignKey("sqoop", "sq_configurable", "sqc_id", "sq_connector_directions", "sqcd_connector");
utils.assertForeignKey("sqoop", "sq_direction", "sqd_id", "sq_connector_directions", "sqcd_direction");
utils.assertForeignKey("sqoop", "sq_configurable", "sqc_id", "sq_config", "sq_cfg_configurable");
utils.assertForeignKey("sqoop", "sq_config", "sq_cfg_id", "sq_config_directions", "sq_cfg_dir_config");
utils.assertForeignKey("sqoop", "sq_direction", "sqd_id", "sq_config_directions", "sq_cfg_dir_direction");
utils.assertForeignKey("sqoop", "sq_config", "sq_cfg_id", "sq_input", "sqi_config");
utils.assertForeignKey("sqoop", "sq_configurable", "sqc_id", "sq_link", "sq_lnk_configurable");
utils.assertForeignKey("sqoop", "sq_link", "sq_lnk_id", "sq_job", "sqb_from_link");
utils.assertForeignKey("sqoop", "sq_link", "sq_lnk_id", "sq_job", "sqb_to_link");
utils.assertForeignKey("sqoop", "sq_link", "sq_lnk_id", "sq_link_input", "sq_lnki_link");
utils.assertForeignKey("sqoop", "sq_input", "sqi_id", "sq_link_input", "sq_lnki_input");
utils.assertForeignKey("sqoop", "sq_job", "sqb_id", "sq_job_input", "sqbi_job");
utils.assertForeignKey("sqoop", "sq_input", "sqi_id", "sq_job_input", "sqbi_input");
utils.assertForeignKey("sqoop", "sq_job", "sqb_id", "sq_submission", "sqs_job");
utils.assertForeignKey("sqoop", "sq_counter", "sqr_id", "sq_counter_submission", "sqrs_counter");
utils.assertForeignKey("sqoop", "sq_counter_group", "sqg_id", "sq_counter_submission", "sqrs_group");
utils.assertForeignKey("sqoop", "sq_submission", "sqs_id", "sq_counter_submission", "sqrs_submission");
}
@Test
public void testUniqueConstraints() throws Exception {
utils.assertUniqueConstraints("sqoop", "sq_configurable", "sqc_name");
utils.assertUniqueConstraints("sqoop", "sq_link", "sq_lnk_name");
utils.assertUniqueConstraints("sqoop", "sq_job", "sqb_name");
utils.assertUniqueConstraints("sqoop", "sq_config", "sq_cfg_name", "sq_cfg_configurable", "sq_cfg_type");
utils.assertUniqueConstraints("sqoop", "sq_input", "sqi_name", "sqi_type", "sqi_config");
utils.assertUniqueConstraints("sqoop", "sq_counter", "sqr_name");
utils.assertUniqueConstraints("sqoop", "sq_counter_group", "sqg_name");
}
}

View File

@ -0,0 +1,24 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=DEBUG, A1
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

View File

@ -69,6 +69,11 @@ limitations under the License.
<artifactId>sqoop-repository-derby</artifactId> <artifactId>sqoop-repository-derby</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.sqoop.repository</groupId>
<artifactId>sqoop-repository-postgresql</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.sqoop.connector</groupId> <groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-generic-jdbc</artifactId> <artifactId>sqoop-connector-generic-jdbc</artifactId>