From fc74316fbdcee80272106064ac4e1e747c300b97 Mon Sep 17 00:00:00 2001
From: Jarek Jarcec Cecho
Date: Thu, 9 Oct 2014 08:36:35 -0700
Subject: [PATCH] SQOOP-1545: Sqoop2: From/To: Add supported directions to
Repository
(Abraham Elmahrek via Jarek Jarcec Cecho)
---
.../sqoop/common/SupportedDirections.java | 79 +++-
.../sqoop/common/TestSupportedDirections.java | 19 +
.../repository/derby/DerbyRepoError.java | 8 +
.../derby/DerbyRepositoryHandler.java | 398 +++++++++++++++---
.../derby/DerbySchemaConstants.java | 53 +++
.../repository/derby/DerbySchemaQuery.java | 164 +++++++-
.../sqoop/repository/derby/DerbyTestCase.java | 131 ++++--
.../derby/TestConnectorHandling.java | 62 ++-
.../repository/derby/TestJobHandling.java | 4 -
.../sqoop/shell/ShowConnectorFunction.java | 30 +-
.../sqoop/connector/spi/SqoopConnector.java | 2 +-
11 files changed, 808 insertions(+), 142 deletions(-)
diff --git a/common/src/main/java/org/apache/sqoop/common/SupportedDirections.java b/common/src/main/java/org/apache/sqoop/common/SupportedDirections.java
index 25ba2762..c527117a 100644
--- a/common/src/main/java/org/apache/sqoop/common/SupportedDirections.java
+++ b/common/src/main/java/org/apache/sqoop/common/SupportedDirections.java
@@ -20,7 +20,9 @@
/**
* Represents which Directions are supported.
*/
-public class SupportedDirections {
+public class SupportedDirections implements Comparable {
+ private static final char SUPPORTED_DIRECTIONS_SEPARATOR = '/';
+
private boolean from;
private boolean to;
@@ -38,4 +40,79 @@ public boolean isDirectionSupported(Direction direction) {
return direction == Direction.FROM && from
|| direction == Direction.TO && to;
}
+
+ /**
+ * @return String "FROM", "TO", "FROM/TO", "".
+ */
+ public String toString() {
+ StringBuffer buffer = new StringBuffer();
+
+ if (isDirectionSupported(Direction.FROM)) {
+ buffer.append(Direction.FROM);
+
+ if (isDirectionSupported(Direction.TO)) {
+ buffer.append(SUPPORTED_DIRECTIONS_SEPARATOR);
+ buffer.append(Direction.TO);
+ }
+ } else if (isDirectionSupported(Direction.TO)) {
+ buffer.append(Direction.TO);
+ }
+
+ return buffer.toString();
+ }
+
+ public static SupportedDirections fromString(String supportedDirections) {
+ boolean from = false, to = false;
+
+ if (supportedDirections != null && !supportedDirections.equals("")) {
+ for (String direction : supportedDirections.split("/")) {
+ switch (Direction.valueOf(direction)) {
+ case FROM:
+ from = true;
+ break;
+
+ case TO:
+ to = true;
+ break;
+ }
+ }
+ }
+
+ return new SupportedDirections(from, to);
+ }
+
+ public static SupportedDirections fromDirection(Direction direction) {
+ boolean from = false, to = false;
+ switch (direction) {
+ case FROM:
+ from = true;
+ break;
+
+ case TO:
+ to = true;
+ break;
+ }
+ return new SupportedDirections(from, to);
+ }
+
+ @Override
+ public int compareTo(SupportedDirections o) {
+ int hash = 0;
+ if (this.isDirectionSupported(Direction.FROM)) {
+ hash |= 1;
+ }
+ if (this.isDirectionSupported(Direction.TO)) {
+ hash |= 2;
+ }
+
+ int oHash = 0;
+ if (this.isDirectionSupported(Direction.FROM)) {
+ oHash |= 1;
+ }
+ if (this.isDirectionSupported(Direction.TO)) {
+ oHash |= 2;
+ }
+
+ return hash - oHash;
+ }
}
diff --git a/common/src/test/java/org/apache/sqoop/common/TestSupportedDirections.java b/common/src/test/java/org/apache/sqoop/common/TestSupportedDirections.java
index 4fbaf82d..4f0cdd67 100644
--- a/common/src/test/java/org/apache/sqoop/common/TestSupportedDirections.java
+++ b/common/src/test/java/org/apache/sqoop/common/TestSupportedDirections.java
@@ -52,4 +52,23 @@ public void testIsDirectionSupported() {
Assert.assertFalse(
supportedDirections.isDirectionSupported(Direction.TO));
}
+
+ @Test
+ public void testToString() {
+ // Both
+ SupportedDirections supportedDirections = new SupportedDirections(true, true);
+ Assert.assertEquals("FROM/TO", supportedDirections.toString());
+
+ // FROM
+ supportedDirections = new SupportedDirections(true, false);
+ Assert.assertEquals("FROM", supportedDirections.toString());
+
+ // TO
+ supportedDirections = new SupportedDirections(false, true);
+ Assert.assertEquals("TO", supportedDirections.toString());
+
+ // NONE
+ supportedDirections = new SupportedDirections(false, false);
+ Assert.assertEquals("", supportedDirections.toString());
+ }
}
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
index cc31d063..0f0f7c4c 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
@@ -181,6 +181,14 @@ public enum DerbyRepoError implements ErrorCode {
DERBYREPO_0044("Update of driver config failed"),
DERBYREPO_0045("Can't retrieve all connectors"),
+
+ DERBYREPO_0046("Could not add directions"),
+
+ DERBYREPO_0047("Could not get ID of recently added direction"),
+
+ DERBYREPO_0048("Could not register config direction"),
+
+ DERBYREPO_0049("Could not set connector direction")
;
private final String message;
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
index 73d83876..10a7b1a5 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
@@ -33,13 +33,16 @@
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.DirectionError;
import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.common.SupportedDirections;
import org.apache.sqoop.connector.ConnectorHandler;
import org.apache.sqoop.connector.ConnectorManagerUtils;
import org.apache.sqoop.model.MBooleanInput;
@@ -88,6 +91,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
*/
private static final String CONNECTOR_HDFS = "hdfs-connector";
+ private static final String LINK_HDFS = "hdfs-link";
+
private JdbcRepositoryContext repoContext;
/**
@@ -121,7 +126,7 @@ private void insertConfigsForDriver(MDriver mDriver, Connection conn) {
// Register the job config type, since driver config is per job
registerConfigs(null, null, mDriver.getDriverConfig().getConfigs(),
- MConfigType.JOB.name(), baseConfigStmt, baseInputStmt);
+ MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0014, mDriver.toString(), ex);
@@ -150,14 +155,17 @@ private void insertConfigsForConnector (MConnector mc, Connection conn) {
// Register link type config
registerConfigs(connectorId, null, mc.getLinkConfig().getConfigs(),
- MConfigType.LINK.name(), baseConfigStmt, baseInputStmt);
+ MConfigType.LINK.name(), baseConfigStmt, baseInputStmt, conn);
// Register both from/to job type config
- registerConfigs(connectorId, Direction.FROM, mc.getConfig(Direction.FROM).getConfigs(),
- MConfigType.JOB.name(), baseConfigStmt, baseInputStmt);
- registerConfigs(connectorId, Direction.TO, mc.getConfig(Direction.TO).getConfigs(),
- MConfigType.JOB.name(), baseConfigStmt, baseInputStmt);
-
+ if (mc.getSupportedDirections().isDirectionSupported(Direction.FROM)) {
+ registerConfigs(connectorId, Direction.FROM, mc.getConfig(Direction.FROM).getConfigs(),
+ MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
+ }
+ if (mc.getSupportedDirections().isDirectionSupported(Direction.TO)) {
+ registerConfigs(connectorId, Direction.TO, mc.getConfig(Direction.TO).getConfigs(),
+ MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
+ }
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0014,
mc.toString(), ex);
@@ -167,6 +175,34 @@ private void insertConfigsForConnector (MConnector mc, Connection conn) {
}
+ private void insertConnectorDirection(Long connectorId, Direction direction, Connection conn)
+ throws SQLException {
+ PreparedStatement stmt = null;
+
+ try {
+ stmt = conn.prepareStatement(STMT_INSERT_SQ_CONNECTOR_DIRECTIONS);
+ stmt.setLong(1, connectorId);
+ stmt.setLong(2, getDirection(direction, conn));
+
+ if (stmt.executeUpdate() != 1) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0049);
+ }
+ } finally {
+ closeStatements(stmt);
+ }
+ }
+
+ private void insertConnectorDirections(Long connectorId, SupportedDirections directions, Connection conn)
+ throws SQLException {
+ if (directions.isDirectionSupported(Direction.FROM)) {
+ insertConnectorDirection(connectorId, Direction.FROM, conn);
+ }
+
+ if (directions.isDirectionSupported(Direction.TO)) {
+ insertConnectorDirection(connectorId, Direction.TO, conn);
+ }
+ }
+
private long getConnectorId(MConnector mc, Connection conn) {
PreparedStatement baseConnectorStmt = null;
try {
@@ -187,6 +223,10 @@ private long getConnectorId(MConnector mc, Connection conn) {
if (!rsetConnectorId.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
}
+
+ insertConnectorDirections(rsetConnectorId.getLong(1),
+ mc.getSupportedDirections(), conn);
+
return rsetConnectorId.getLong(1);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0014,
@@ -399,6 +439,7 @@ public void createOrUpdateInternals(Connection conn) {
}
if(version <= 3) {
// Schema modifications
+ runQuery(QUERY_CREATE_TABLE_SQ_DIRECTION, conn);
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_RENAME_COLUMN_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION, conn);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_LINK_TO_SQB_FROM_LINK, conn);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_LINK, conn);
@@ -412,6 +453,9 @@ public void createOrUpdateInternals(Connection conn) {
updteJobInternals(conn, registerHdfsConnector(conn));
}
+ // Change direction from VARCHAR to BIGINT + foreign key.
+ updateDirections(conn, insertDirections(conn));
+
// Wait to remove SQB_TYPE (IMPORT/EXPORT) until we update data.
// Data updates depend on knowledge of the type of job.
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE, conn);
@@ -442,6 +486,110 @@ public void createOrUpdateInternals(Connection conn) {
}
}
+ /**
+ * Insert directions: FROM and TO.
+ * @param conn
+ * @return Map direction ID => Direction
+ */
+ protected Map insertDirections(Connection conn) {
+ // Add directions
+ Map directionMap = new TreeMap();
+ PreparedStatement insertDirectionStmt = null;
+ try {
+ // Insert directions and get IDs.
+ for (Direction direction : Direction.values()) {
+ insertDirectionStmt = conn.prepareStatement(STMT_INSERT_DIRECTION, Statement.RETURN_GENERATED_KEYS);
+ insertDirectionStmt.setString(1, direction.toString());
+ if (insertDirectionStmt.executeUpdate() != 1) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0046, "Could not add directions FROM and TO.");
+ }
+
+ ResultSet directionId = insertDirectionStmt.getGeneratedKeys();
+ if (directionId.next()) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Loaded direction: " + directionId.getLong(1));
+ }
+
+ directionMap.put(direction, directionId.getLong(1));
+ } else {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0047, "Could not get ID of direction " + direction);
+ }
+ }
+ } catch (SQLException e) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0000, e);
+ } finally {
+ closeStatements(insertDirectionStmt);
+ }
+
+ return directionMap;
+ }
+
+ /**
+ * Add normalized M2M for SQ_CONNECTOR and SQ_CONFIG for Direction.
+ * 1. Remember all ID => direction for configs.
+ * 2. Drop SQF_DIRECTION (varhchar).
+ * 3. Add new M2M tables for SQ_CONNECTOR and SQ_CONFIG.
+ * 4. Add directions via updating SQ_CONFIG with proper Direction IDs.
+ * 5. Make sure all connectors have all supported directions.
+ * @param conn
+ */
+ protected void updateDirections(Connection conn, Map directionMap) {
+ // Remember directions
+ Statement fetchFormsStmt = null,
+ fetchConnectorsStmt = null;
+ List connectorIds = new LinkedList();
+ List configIds = new LinkedList();
+ List directions = new LinkedList();
+ try {
+ fetchFormsStmt = conn.createStatement();
+ ResultSet rs = fetchFormsStmt.executeQuery(STMT_FETCH_CONFIG_DIRECTIONS);
+ while (rs.next()) {
+ configIds.add(rs.getLong(1));
+ directions.add(rs.getString(2));
+ }
+ rs.close();
+ } catch (SQLException e) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0000, e);
+ } finally {
+ closeStatements(fetchFormsStmt);
+ }
+
+ // Change Schema
+ runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR, conn);
+ runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS, conn);
+ runQuery(QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS, conn);
+
+ // Add directions back
+ while (!configIds.isEmpty() && !directions.isEmpty()) {
+ Long configId = configIds.remove(0);
+ String directionString = directions.remove(0);
+ if (directionString != null && !directionString.isEmpty()) {
+ Direction direction = Direction.valueOf(directionString);
+ runQuery(STMT_INSERT_SQ_CONFIG_DIRECTIONS, conn, configId, directionMap.get(direction));
+ }
+ }
+
+ // Add connector directions
+ try {
+ fetchConnectorsStmt = conn.createStatement();
+ ResultSet rs = fetchConnectorsStmt.executeQuery(STMT_SELECT_CONNECTOR_ALL);
+ while (rs.next()) {
+ connectorIds.add(rs.getLong(1));
+ }
+ rs.close();
+ } catch (SQLException e) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0000, e);
+ } finally {
+ closeStatements(fetchConnectorsStmt);
+ }
+
+ for (Long connectorId : connectorIds) {
+ for (Long directionId : directionMap.values()) {
+ runQuery(STMT_INSERT_SQ_CONNECTOR_DIRECTIONS, conn, connectorId, directionId);
+ }
+ }
+ }
+
/**
* Upgrade job data from IMPORT/EXPORT to FROM/TO.
* Since the framework is no longer responsible for HDFS,
@@ -509,13 +657,13 @@ private void updteJobInternals(Connection conn, long connectorId) {
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_DRIVER_INDEX, conn,
new Long(0), "throttling");
- MLink hdfsLink = createHdfsLink(conn);
+ Long linkId = createHdfsLink(conn, connectorId);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_LINK_COPY_SQB_FROM_LINK, conn,
"EXPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_LINK, conn,
- new Long(hdfsLink.getPersistenceId()), "EXPORT");
+ new Long(linkId), "EXPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_LINK, conn,
- new Long(hdfsLink.getPersistenceId()), "IMPORT");
+ new Long(linkId), "IMPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_SQ_CFG_NAME, conn,
"fromJobConfig", "table", Direction.FROM.toString());
@@ -556,7 +704,7 @@ protected long registerHdfsConnector(Connection conn) {
if (handler.getUniqueName().equals(CONNECTOR_HDFS)) {
try {
PreparedStatement baseConnectorStmt = conn.prepareStatement(
- STMT_INSERT_CONNECTOR_BASE,
+ STMT_INSERT_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS,
Statement.RETURN_GENERATED_KEYS);
baseConnectorStmt.setString(1, handler.getMetadata().getUniqueName());
baseConnectorStmt.setString(2, handler.getMetadata().getClassName());
@@ -588,22 +736,46 @@ protected long registerHdfsConnector(Connection conn) {
*
* NOTE: Upgrade path only!
*/
- private MLink createHdfsLink(Connection conn) {
+ private Long createHdfsLink(Connection conn, Long connectorId) {
if (LOG.isTraceEnabled()) {
LOG.trace("Creating HDFS link.");
}
- MConnector hdfsConnector = this.findConnector(CONNECTOR_HDFS, conn);
- MLink hdfsLink = new MLink(
- hdfsConnector.getPersistenceId(),
- hdfsConnector.getLinkConfig());
- this.createLink(hdfsLink, conn);
+ PreparedStatement stmt = null;
+ int result;
+ try {
+ stmt = conn.prepareStatement(STMT_INSERT_LINK,
+ Statement.RETURN_GENERATED_KEYS);
+ stmt.setString(1, LINK_HDFS);
+ stmt.setLong(2, connectorId);
+ stmt.setBoolean(3, true);
+ stmt.setNull(4, Types.VARCHAR);
+ stmt.setTimestamp(5, new Timestamp(System.currentTimeMillis()));
+ stmt.setNull(6, Types.VARCHAR);
+ stmt.setTimestamp(7, new Timestamp(System.currentTimeMillis()));
- if (LOG.isTraceEnabled()) {
- LOG.trace("Created HDFS link.");
+ result = stmt.executeUpdate();
+ if (result != 1) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0012,
+ Integer.toString(result));
+ }
+
+ ResultSet rsetConnectionId = stmt.getGeneratedKeys();
+
+ if (!rsetConnectionId.next()) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Created HDFS link.");
+ }
+
+ return rsetConnectionId.getLong(1);
+ } catch (SQLException ex) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0019, ex);
+ } finally {
+ closeStatements(stmt);
}
-
- return hdfsLink;
}
/**
@@ -695,7 +867,7 @@ public void registerDriver(MDriver mDriver, Connection conn) {
// Register a driver config as a job type with no owner/connector and direction
registerConfigs(null/* owner*/, null /*direction*/, mDriver.getDriverConfig().getConfigs(),
- MConfigType.JOB.name(), baseConfigStmt, baseInputStmt);
+ MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
// We're using hardcoded value for driver config as they are
// represented as NULL in the database.
@@ -1744,13 +1916,13 @@ private Counters loadCountersSubmission(long submissionId, Connection conn) thro
Counters counters = new Counters();
- while(rs.next()) {
+ while (rs.next()) {
String groupName = rs.getString(1);
String counterName = rs.getString(2);
long value = rs.getLong(3);
CounterGroup group = counters.getCounterGroup(groupName);
- if(group == null) {
+ if (group == null) {
group = new CounterGroup(groupName);
counters.addCounterGroup(group);
}
@@ -1758,7 +1930,7 @@ private Counters loadCountersSubmission(long submissionId, Connection conn) thro
group.addCounter(new Counter(counterName, value));
}
- if(counters.isEmpty()) {
+ if (counters.isEmpty()) {
return null;
} else {
return counters;
@@ -1769,7 +1941,83 @@ private Counters loadCountersSubmission(long submissionId, Connection conn) thro
}
}
- private List loadConnectors(PreparedStatement stmt,Connection conn) throws SQLException {
+ private Long getDirection(Direction direction, Connection conn) throws SQLException {
+ PreparedStatement directionStmt = null;
+ ResultSet rs = null;
+
+ try {
+ directionStmt = conn.prepareStatement(STMT_SELECT_SQD_ID_BY_SQD_NAME);
+ directionStmt.setString(1, direction.toString());
+ rs = directionStmt.executeQuery();
+
+ rs.next();
+ return rs.getLong(1);
+ } finally {
+ if (rs != null) {
+ closeResultSets(rs);
+ }
+ if (directionStmt != null) {
+ closeStatements(directionStmt);
+ }
+ }
+ }
+
+ private Direction getDirection(long directionId, Connection conn) throws SQLException {
+ PreparedStatement directionStmt = null;
+ ResultSet rs = null;
+
+ try {
+ directionStmt = conn.prepareStatement(STMT_SELECT_SQD_NAME_BY_SQD_ID);
+ directionStmt.setLong(1, directionId);
+ rs = directionStmt.executeQuery();
+
+ rs.next();
+ return Direction.valueOf(rs.getString(1));
+ } finally {
+ if (rs != null) {
+ closeResultSets(rs);
+ }
+ if (directionStmt != null) {
+ closeStatements(directionStmt);
+ }
+ }
+ }
+
+ private SupportedDirections findConnectorSupportedDirections(long connectorId, Connection conn) throws SQLException {
+ PreparedStatement connectorDirectionsStmt = null;
+ ResultSet rs = null;
+
+ boolean from = false, to = false;
+
+ try {
+ connectorDirectionsStmt = conn.prepareStatement(STMT_SELECT_SQ_CONNECTOR_DIRECTIONS);
+ connectorDirectionsStmt.setLong(1, connectorId);
+ rs = connectorDirectionsStmt.executeQuery();
+
+ while(rs.next()) {
+ switch(getDirection(rs.getLong(2), conn)) {
+ case FROM:
+ from = true;
+ break;
+
+ case TO:
+ to = true;
+ break;
+ }
+ }
+ } finally {
+ if (rs != null) {
+ closeResultSets(rs);
+ }
+ if (connectorDirectionsStmt != null) {
+ closeStatements(connectorDirectionsStmt);
+ }
+ }
+
+ return new SupportedDirections(from, to);
+ }
+
+ private List loadConnectors(PreparedStatement stmt, Connection conn) throws SQLException {
List connectors = new ArrayList();
ResultSet rsConnectors = null;
PreparedStatement connectorConfigFetchStmt = null;
@@ -1792,13 +2040,21 @@ private List loadConnectors(PreparedStatement stmt,Connection conn)
List fromConfig = new ArrayList();
List toConfig = new ArrayList();
- loadConfigTypes(linkConfig, fromConfig, toConfig,
- connectorConfigFetchStmt, connectorConfigInputFetchStmt, 1);
+ loadConfigTypes(linkConfig, fromConfig, toConfig, connectorConfigFetchStmt,
+ connectorConfigInputFetchStmt, 1, conn);
+ SupportedDirections supportedDirections
+ = findConnectorSupportedDirections(connectorId, conn);
+ MFromConfig fromJobConfig = null;
+ MToConfig toJobConfig = null;
+ if (supportedDirections.isDirectionSupported(Direction.FROM)) {
+ fromJobConfig = new MFromConfig(fromConfig);
+ }
+ if (supportedDirections.isDirectionSupported(Direction.TO)) {
+ toJobConfig = new MToConfig(toConfig);
+ }
MConnector mc = new MConnector(connectorName, connectorClassName, connectorVersion,
- new MLinkConfig(linkConfig),
- new MFromConfig(fromConfig),
- new MToConfig(toConfig));
+ new MLinkConfig(linkConfig), fromJobConfig, toJobConfig);
mc.setPersistenceId(connectorId);
connectors.add(mc);
@@ -1845,7 +2101,7 @@ private List loadLinks(PreparedStatement stmt,
List toConfig = new ArrayList();
loadConfigTypes(connectorLinkConfig, fromConfig, toConfig, connectorConfigFetchStatement,
- connectorConfigInputStatement, 2);
+ connectorConfigInputStatement, 2, conn);
MLink link = new MLink(connectorId, new MLinkConfig(connectorLinkConfig));
link.setPersistenceId(id);
@@ -1911,7 +2167,7 @@ private List loadJobs(PreparedStatement stmt,
List fromConnectorToJobConfig = new ArrayList();
loadConfigTypes(fromConnectorLinkConfig, fromConnectorFromJobConfig, fromConnectorToJobConfig,
- fromConfigFetchStmt, jobInputFetchStmt, 2);
+ fromConfigFetchStmt, jobInputFetchStmt, 2, conn);
// TO entity configs
List toConnectorLinkConfig = new ArrayList();
@@ -1922,7 +2178,7 @@ private List loadJobs(PreparedStatement stmt,
List driverConfig = new ArrayList();
loadConfigTypes(toConnectorLinkConfig, toConnectorFromJobConfig, toConnectorToJobConfig,
- toConfigFetchStmt, jobInputFetchStmt, 2);
+ toConfigFetchStmt, jobInputFetchStmt, 2, conn);
loadDriverConfigs(driverConfig, driverConfigfetchStmt, jobInputFetchStmt, 2);
@@ -1951,6 +2207,21 @@ private List loadJobs(PreparedStatement stmt,
return jobs;
}
+ private void registerConfigDirection(Long configId, Direction direction, Connection conn)
+ throws SQLException {
+ PreparedStatement stmt = null;
+ try {
+ stmt = conn.prepareStatement(STMT_INSERT_SQ_CONFIG_DIRECTIONS);
+ stmt.setLong(1, configId);
+ stmt.setLong(2, getDirection(direction, conn));
+ if (stmt.executeUpdate() != 1) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0048);
+ }
+ } finally {
+ closeStatements(stmt);
+ }
+ }
+
/**
* Register configs in derby database. This method will insert the ids
* generated by the repository into the configs passed in itself.
@@ -1962,12 +2233,13 @@ private List loadJobs(PreparedStatement stmt,
* @param type
* @param baseConfigStmt
* @param baseInputStmt
+ * @param conn
* @return short number of configs registered.
* @throws SQLException
*/
private short registerConfigs(Long connectorId, Direction direction,
List configs, String type, PreparedStatement baseConfigStmt,
- PreparedStatement baseInputStmt)
+ PreparedStatement baseInputStmt, Connection conn)
throws SQLException {
short configIndex = 0;
@@ -1977,14 +2249,10 @@ private short registerConfigs(Long connectorId, Direction direction,
} else {
baseConfigStmt.setLong(1, connectorId);
}
- if(direction == null) {
- baseConfigStmt.setNull(2, Types.VARCHAR);
- } else {
- baseConfigStmt.setString(2, direction.name());
- }
- baseConfigStmt.setString(3, config.getName());
- baseConfigStmt.setString(4, type);
- baseConfigStmt.setShort(5, configIndex++);
+
+ baseConfigStmt.setString(2, config.getName());
+ baseConfigStmt.setString(3, type);
+ baseConfigStmt.setShort(4, configIndex++);
int baseConfigCount = baseConfigStmt.executeUpdate();
if (baseConfigCount != 1) {
@@ -1999,6 +2267,10 @@ private short registerConfigs(Long connectorId, Direction direction,
long configId = rsetConfigId.getLong(1);
config.setPersistenceId(configId);
+ if (direction != null) {
+ registerConfigDirection(configId, direction, conn);
+ }
+
// Insert all the inputs
List> inputs = config.getInputs();
registerConfigInputs(configId, inputs, baseInputStmt);
@@ -2071,7 +2343,7 @@ private void runQuery(String query, Connection conn, Object... args) {
} else if (args[i] instanceof Long) {
stmt.setLong(i + 1, (Long) args[i]);
} else {
- stmt.setObject(i, args[i]);
+ stmt.setObject(i + 1, args[i]);
}
}
@@ -2115,9 +2387,9 @@ public void loadDriverConfigs(List driverConfig,
while (rsetConfig.next()) {
long configId = rsetConfig.getLong(1);
Long fromConnectorId = rsetConfig.getLong(2);
- String configName = rsetConfig.getString(4);
- String configTYpe = rsetConfig.getString(5);
- int configIndex = rsetConfig.getInt(6);
+ String configName = rsetConfig.getString(3);
+ String configTYpe = rsetConfig.getString(4);
+ int configIndex = rsetConfig.getInt(5);
List> configInputs = new ArrayList>();
MConfig mDriverConfig = new MConfig(configName, configInputs);
@@ -2211,6 +2483,26 @@ public void loadDriverConfigs(List driverConfig,
}
}
+ private Direction findConfigDirection(long configId, Connection conn) throws SQLException {
+ PreparedStatement stmt = null;
+ ResultSet rs = null;
+
+ try {
+ stmt = conn.prepareStatement(STMT_SELECT_SQ_CONFIG_DIRECTIONS);
+ stmt.setLong(1, configId);
+ rs = stmt.executeQuery();
+ rs.next();
+ return getDirection(rs.getLong(2), conn);
+ } finally {
+ if (rs != null) {
+ closeResultSets(rs);
+ }
+ if (stmt != null) {
+ closeStatements(stmt);
+ }
+ }
+ }
+
/**
* Load configs and corresponding inputs from Derby database.
*
@@ -2222,21 +2514,21 @@ public void loadDriverConfigs(List driverConfig,
* @param toConfig TO job configs that will be filled up
* @param configFetchStmt Prepared statement for fetching configs
* @param inputFetchStmt Prepare statement for fetching inputs
+ * @param conn Connection object that is used to find config direction.
* @throws SQLException In case of any failure on Derby side
*/
public void loadConfigTypes(List linkConfig, List fromConfig,
List toConfig, PreparedStatement configFetchStmt, PreparedStatement inputFetchStmt,
- int configPosition) throws SQLException {
+ int configPosition, Connection conn) throws SQLException {
// Get list of structures from database
ResultSet rsetConfig = configFetchStmt.executeQuery();
while (rsetConfig.next()) {
long configId = rsetConfig.getLong(1);
Long configConnectorId = rsetConfig.getLong(2);
- String operation = rsetConfig.getString(3);
- String configName = rsetConfig.getString(4);
- String configType = rsetConfig.getString(5);
- int configIndex = rsetConfig.getInt(6);
+ String configName = rsetConfig.getString(3);
+ String configType = rsetConfig.getString(4);
+ int configIndex = rsetConfig.getInt(5);
List> configInputs = new ArrayList>();
MConfig config = new MConfig(configName, configInputs);
@@ -2324,7 +2616,7 @@ public void loadConfigTypes(List linkConfig, List fromConfig,
linkConfig.add(config);
break;
case JOB:
- Direction type = Direction.valueOf(operation);
+ Direction type = findConfigDirection(configId, conn);
List jobConfigs;
switch(type) {
case FROM:
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
index ad749ed2..cf6e657c 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
@@ -41,6 +41,17 @@ public final class DerbySchemaConstants {
public static final String COLUMN_SQM_VALUE = "SQM_VALUE";
+ // SQ_DIRECTION
+
+ public static final String TABLE_SQ_DIRECTION_NAME = "SQ_DIRECTION";
+
+ public static final String TABLE_SQ_DIRECTION = SCHEMA_PREFIX
+ + TABLE_SQ_DIRECTION_NAME;
+
+ public static final String COLUMN_SQD_ID = "SQD_ID";
+
+ public static final String COLUMN_SQD_NAME = "SQD_NAME";
+
// SQ_CONNECTOR
public static final String TABLE_SQ_CONNECTOR_NAME = "SQ_CONNECTOR";
@@ -56,6 +67,27 @@ public final class DerbySchemaConstants {
public static final String COLUMN_SQC_VERSION = "SQC_VERSION";
+ // SQ_CONNECTOR_DIRECTIONS
+
+ public static final String TABLE_SQ_CONNECTOR_DIRECTIONS_NAME = "SQ_CONNECTOR_DIRECTIONS";
+
+ public static final String TABLE_SQ_CONNECTOR_DIRECTIONS = SCHEMA_PREFIX
+ + TABLE_SQ_CONNECTOR_DIRECTIONS_NAME;
+
+ public static final String COLUMN_SQCD_ID = "SQCD_ID";
+
+ public static final String COLUMN_SQCD_CONNECTOR = "SQCD_CONNECTOR";
+
+ public static final String COLUMN_SQCD_DIRECTION = "SQCD_DIRECTION";
+
+ public static final String CONSTRAINT_SQCD_SQC_NAME = CONSTRAINT_PREFIX + "SQCD_SQC";
+
+ public static final String CONSTRAINT_SQCD_SQC = SCHEMA_PREFIX + CONSTRAINT_SQCD_SQC_NAME;
+
+ public static final String CONSTRAINT_SQCD_SQD_NAME = CONSTRAINT_PREFIX + "SQCD_SQD";
+
+ public static final String CONSTRAINT_SQCD_SQD = SCHEMA_PREFIX + CONSTRAINT_SQCD_SQD_NAME;
+
// SQ_CONFIG
public static final String TABLE_SQ_CONFIG_NAME = "SQ_CONFIG";
@@ -81,6 +113,27 @@ public final class DerbySchemaConstants {
public static final String CONSTRAINT_SQ_CFG_SQC = SCHEMA_PREFIX + CONSTRAINT_SQ_CFG_SQC_NAME;
+ // SQ_CONFIG_DIRECTIONS
+
+ public static final String TABLE_SQ_CONFIG_DIRECTIONS_NAME = "SQ_CONFIG_DIRECTIONS";
+
+ public static final String TABLE_SQ_CONFIG_DIRECTIONS = SCHEMA_PREFIX
+ + TABLE_SQ_CONFIG_DIRECTIONS_NAME;
+
+ public static final String COLUMN_SQ_CFG_DIR_ID = "SQ_CFG_DIR_ID";
+
+ public static final String COLUMN_SQ_CFG_DIR_CONFIG = "SQ_CFG_DIR_CONFIG";
+
+ public static final String COLUMN_SQ_CFG_DIR_DIRECTION = "SQ_CFG_DIR_DIRECTION";
+
+ public static final String CONSTRAINT_SQ_CFG_DIR_CONFIG_NAME = CONSTRAINT_PREFIX + "SQ_CFG_DIR_CONFIG";
+
+ public static final String CONSTRAINT_SQ_CFG_DIR_CONFIG = SCHEMA_PREFIX + CONSTRAINT_SQ_CFG_DIR_CONFIG_NAME;
+
+ public static final String CONSTRAINT_SQ_CFG_DIR_DIRECTION_NAME = CONSTRAINT_PREFIX + "SQ_CFG_DIR_DIRECTION";
+
+ public static final String CONSTRAINT_SQ_CFG_DIR_DIRECTION = SCHEMA_PREFIX + CONSTRAINT_SQ_CFG_DIR_DIRECTION_NAME;
+
// SQ_INPUT
public static final String TABLE_SQ_INPUT_NAME = "SQ_INPUT";
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
index 478afe25..56ea147a 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
@@ -35,31 +35,65 @@
*
*
*
+ * SQ_DIRECTION: Directions.
+ *
+ * +---------------------------------------+
+ * | SQ_DIRECTION |
+ * +---------------------------------------+
+ * | SQD_ID: BIGINT PK AUTO-GEN |
+ * | SQD_NAME: VARCHAR(64) | "FROM"|"TO"
+ * +---------------------------------------+
+ *
+ *
+ *
* SQ_CONNECTOR: Connector registration.
*
- * +----------------------------+
- * | SQ_CONNECTOR |
- * +----------------------------+
- * | SQC_ID: BIGINT PK AUTO-GEN |
- * | SQC_NAME: VARCHAR(64) |
- * | SQC_CLASS: VARCHAR(255) |
- * | SQC_VERSION: VARCHAR(64) |
- * +----------------------------+
+ * +-----------------------------+
+ * | SQ_CONNECTOR |
+ * +-----------------------------+
+ * | SQC_ID: BIGINT PK AUTO-GEN |
+ * | SQC_NAME: VARCHAR(64) |
+ * | SQC_CLASS: VARCHAR(255) |
+ * | SQC_VERSION: VARCHAR(64) |
+ * +-----------------------------+
+ *
+ *
+ *
+ * SQ_CONNECTOR_DIRECTIONS: Connector directions.
+ *
+ * +------------------------------+
+ * | SQ_CONNECTOR_DIRECTIONS |
+ * +------------------------------+
+ * | SQCD_ID: BIGINT PK AUTO-GEN |
+ * | SQCD_CONNECTOR: BIGINT | FK SQCD_CONNECTOR(SQC_ID)
+ * | SQCD_DIRECTION: BIGINT | FK SQCD_DIRECTION(SQD_ID)
+ * +------------------------------+
*
*
*
* SQ_CONFIG: Config details.
*
- * +----------------------------------+
- * | SQ_CONFIG |
- * +----------------------------------+
+ * +-------------------------------------+
+ * | SQ_CONFIG |
+ * +-------------------------------------+
* | SQ_CFG_ID: BIGINT PK AUTO-GEN |
* | SQ_CFG_OWNER: BIGINT | FK SQ_CFG_OWNER(SQC_ID),NULL for driver
- * | SQ_CFG_DIRECTION: VARCHAR(32) | "FROM"|"TO"|NULL
* | SQ_CFG_NAME: VARCHAR(64) |
* | SQ_CFG_TYPE: VARCHAR(32) | "LINK"|"JOB"
* | SQ_CFG_INDEX: SMALLINT |
- * +----------------------------------+
+ * +-------------------------------------+
+ *
+ *
+ *
+ * SQ_CONFIG_DIRECTIONS: Connector directions.
+ *
+ * +------------------------------+
+ * | SQ_CONNECTOR_DIRECTIONS |
+ * +------------------------------+
+ * | SQCD_ID: BIGINT PK AUTO-GEN |
+ * | SQCD_CONFIG: BIGINT | FK SQCD_CONFIG(SQ_CFG_ID)
+ * | SQCD_DIRECTION: BIGINT | FK SQCD_DIRECTION(SQD_ID)
+ * +------------------------------+
*
*
*
@@ -118,11 +152,11 @@
* SQ_LINK_INPUT: N:M relationship link and input
*
* +----------------------------+
- * | SQ_LINK_INPUT |
+ * | SQ_LINK_INPUT |
* +----------------------------+
- * | SQ_LNKI_LINK: BIGINT PK | FK SQ_LINK(SQ_LNK_ID)
- * | SQ_LNKI_INPUT: BIGINT PK | FK SQ_INPUT(SQI_ID)
- * | SQ_LNKI_VALUE: LONG VARCHAR |
+ * | SQ_LNKI_LINK: BIGINT PK | FK SQ_LINK(SQ_LNK_ID)
+ * | SQ_LNKI_INPUT: BIGINT PK | FK SQ_INPUT(SQI_ID)
+ * | SQ_LNKI_VALUE: LONG VARCHAR|
* +----------------------------+
*
*
@@ -212,6 +246,13 @@ public final class DerbySchemaQuery {
+ COLUMN_SQM_VALUE + " VARCHAR(64) "
+ ")";
+ // DDL: Create table SQ_DIRECTION
+ public static final String QUERY_CREATE_TABLE_SQ_DIRECTION =
+ "CREATE TABLE " + TABLE_SQ_DIRECTION + " ("
+ + COLUMN_SQD_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+ + COLUMN_SQD_NAME + " VARCHAR(64)"
+ + ")";
+
// DDL: Create table SQ_CONNECTOR
public static final String QUERY_CREATE_TABLE_SQ_CONNECTOR =
"CREATE TABLE " + TABLE_SQ_CONNECTOR + " ("
@@ -221,6 +262,20 @@ public final class DerbySchemaQuery {
+ COLUMN_SQC_VERSION + " VARCHAR(64) "
+ ")";
+ // DDL: Create table SQ_CONNECTOR_DIRECTIONS
+ public static final String QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS =
+ "CREATE TABLE " + TABLE_SQ_CONNECTOR_DIRECTIONS + " ("
+ + COLUMN_SQCD_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+ + COLUMN_SQCD_CONNECTOR + " BIGINT, "
+ + COLUMN_SQCD_DIRECTION + " BIGINT, "
+ + "CONSTRAINT " + CONSTRAINT_SQCD_SQC + " "
+ + "FOREIGN KEY (" + COLUMN_SQCD_CONNECTOR + ") "
+ + "REFERENCES " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID + "), "
+ + "CONSTRAINT " + CONSTRAINT_SQCD_SQD + " "
+ + "FOREIGN KEY (" + COLUMN_SQCD_DIRECTION + ") "
+ + "REFERENCES " + TABLE_SQ_DIRECTION + " (" + COLUMN_SQD_ID + ")"
+ + ")";
+
// DDL: Create table SQ_CONFIG ( It stores the configs defined by every connector), if connector is null then it is driver config
public static final String QUERY_CREATE_TABLE_SQ_CONFIG =
"CREATE TABLE " + TABLE_SQ_CONFIG + " ("
@@ -235,6 +290,20 @@ public final class DerbySchemaQuery {
+ "REFERENCES " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID + ")"
+ ")";
+ // DDL: Create table SQ_CONFIG_DIRECTIONS
+ public static final String QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS =
+ "CREATE TABLE " + TABLE_SQ_CONFIG_DIRECTIONS + " ("
+ + COLUMN_SQ_CFG_DIR_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+ + COLUMN_SQ_CFG_DIR_CONFIG + " BIGINT, "
+ + COLUMN_SQ_CFG_DIR_DIRECTION + " BIGINT, "
+ + "CONSTRAINT " + CONSTRAINT_SQ_CFG_DIR_CONFIG + " "
+ + "FOREIGN KEY (" + COLUMN_SQ_CFG_DIR_CONFIG + ") "
+ + "REFERENCES " + TABLE_SQ_CONFIG + " (" + COLUMN_SQ_CFG_ID + "), "
+ + "CONSTRAINT " + CONSTRAINT_SQ_CFG_DIR_DIRECTION + " "
+ + "FOREIGN KEY (" + COLUMN_SQ_CFG_DIR_DIRECTION + ") "
+ + "REFERENCES " + TABLE_SQ_DIRECTION + " (" + COLUMN_SQD_ID + ")"
+ + ")";
+
// DDL: Create table SQ_INPUT
public static final String QUERY_CREATE_TABLE_SQ_INPUT =
"CREATE TABLE " + TABLE_SQ_INPUT + " ("
@@ -435,6 +504,14 @@ public final class DerbySchemaQuery {
+ COLUMN_SQM_VALUE + ") "
+ "VALUES(?, ?)";
+ public static final String STMT_SELECT_SQD_ID_BY_SQD_NAME =
+ "SELECT " + COLUMN_SQD_ID + " FROM " + TABLE_SQ_DIRECTION
+ + " WHERE " + COLUMN_SQD_NAME + "=?";
+
+ public static final String STMT_SELECT_SQD_NAME_BY_SQD_ID =
+ "SELECT " + COLUMN_SQD_NAME + " FROM " + TABLE_SQ_DIRECTION
+ + " WHERE " + COLUMN_SQD_ID + "=?";
+
// DML: Fetch connector Given Name
public static final String STMT_FETCH_BASE_CONNECTOR =
"SELECT "
@@ -459,7 +536,6 @@ public final class DerbySchemaQuery {
"SELECT "
+ COLUMN_SQ_CFG_ID + ", "
+ COLUMN_SQ_CFG_OWNER + ", "
- + COLUMN_SQ_CFG_DIRECTION + ", "
+ COLUMN_SQ_CFG_NAME + ", "
+ COLUMN_SQ_CFG_TYPE + ", "
+ COLUMN_SQ_CFG_INDEX
@@ -472,13 +548,12 @@ public final class DerbySchemaQuery {
"SELECT "
+ COLUMN_SQ_CFG_ID + ", "
+ COLUMN_SQ_CFG_OWNER + ", "
- + COLUMN_SQ_CFG_DIRECTION + ", "
+ COLUMN_SQ_CFG_NAME + ", "
+ COLUMN_SQ_CFG_TYPE + ", "
+ COLUMN_SQ_CFG_INDEX
+ " FROM " + TABLE_SQ_CONFIG
+ " WHERE " + COLUMN_SQ_CFG_OWNER + " IS NULL "
- + " ORDER BY " + COLUMN_SQ_CFG_TYPE + ", " + COLUMN_SQ_CFG_DIRECTION + ", " + COLUMN_SQ_CFG_INDEX;
+ + " ORDER BY " + COLUMN_SQ_CFG_TYPE + ", " + COLUMN_SQ_CFG_INDEX;
// DML: Fetch inputs for a given config
public static final String STMT_FETCH_INPUT =
@@ -544,15 +619,21 @@ public final class DerbySchemaQuery {
+ COLUMN_SQC_VERSION
+ ") VALUES (?, ?, ?)";
+ public static final String STMT_INSERT_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS =
+ "INSERT INTO " + TABLE_SQ_CONNECTOR + " ("
+ + COLUMN_SQC_NAME + ", "
+ + COLUMN_SQC_CLASS + ", "
+ + COLUMN_SQC_VERSION
+ + ") VALUES (?, ?, ?)";
+
// DML: Insert config base
public static final String STMT_INSERT_CONFIG_BASE =
"INSERT INTO " + TABLE_SQ_CONFIG + " ("
+ COLUMN_SQ_CFG_OWNER + ", "
- + COLUMN_SQ_CFG_DIRECTION + ", "
+ COLUMN_SQ_CFG_NAME + ", "
+ COLUMN_SQ_CFG_TYPE + ", "
+ COLUMN_SQ_CFG_INDEX
- + ") VALUES ( ?, ?, ?, ?, ?)";
+ + ") VALUES ( ?, ?, ?, ?)";
// DML: Insert config input
public static final String STMT_INSERT_INPUT_BASE =
@@ -1058,6 +1139,45 @@ public final class DerbySchemaQuery {
"ALTER TABLE " + TABLE_SQ_LINK + " ADD CONSTRAINT "
+ CONSTRAINT_SQ_LNK_NAME_UNIQUE + " UNIQUE (" + COLUMN_SQ_LNK_NAME + ")";
+ public static final String STMT_INSERT_DIRECTION = "INSERT INTO " + TABLE_SQ_DIRECTION + " "
+ + "(" + COLUMN_SQD_NAME + ") VALUES (?)";
+
+ // DML: Fetch all configs
+ public static final String STMT_FETCH_CONFIG_DIRECTIONS =
+ "SELECT "
+ + COLUMN_SQ_CFG_ID + ", "
+ + COLUMN_SQ_CFG_DIRECTION
+ + " FROM " + TABLE_SQ_CONFIG;
+
+ public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR =
+ "ALTER TABLE " + TABLE_SQ_CONFIG + " DROP COLUMN " + COLUMN_SQ_CFG_DIRECTION;
+
+ public static final String STMT_INSERT_SQ_CONNECTOR_DIRECTIONS =
+ "INSERT INTO " + TABLE_SQ_CONNECTOR_DIRECTIONS + " "
+ + "(" + COLUMN_SQCD_CONNECTOR + ", " + COLUMN_SQCD_DIRECTION + ")"
+ + " VALUES (?, ?)";
+
+ public static final String STMT_INSERT_SQ_CONFIG_DIRECTIONS =
+ "INSERT INTO " + TABLE_SQ_CONFIG_DIRECTIONS + " "
+ + "(" + COLUMN_SQ_CFG_DIR_CONFIG + ", " + COLUMN_SQ_CFG_DIR_DIRECTION + ")"
+ + " VALUES (?, ?)";
+
+ public static final String STMT_SELECT_SQ_CONNECTOR_DIRECTIONS_ALL =
+ "SELECT " + COLUMN_SQCD_CONNECTOR + ", " + COLUMN_SQCD_DIRECTION
+ + " FROM " + TABLE_SQ_CONNECTOR_DIRECTIONS;
+
+ public static final String STMT_SELECT_SQ_CONNECTOR_DIRECTIONS =
+ STMT_SELECT_SQ_CONNECTOR_DIRECTIONS_ALL + " WHERE "
+ + COLUMN_SQCD_CONNECTOR + " = ?";
+
+ public static final String STMT_SELECT_SQ_CONFIG_DIRECTIONS_ALL =
+ "SELECT " + COLUMN_SQ_CFG_DIR_CONFIG + ", " + COLUMN_SQ_CFG_DIR_DIRECTION
+ + " FROM " + TABLE_SQ_CONFIG_DIRECTIONS;
+
+ public static final String STMT_SELECT_SQ_CONFIG_DIRECTIONS =
+ STMT_SELECT_SQ_CONFIG_DIRECTIONS_ALL + " WHERE "
+ + COLUMN_SQ_CFG_DIR_CONFIG + " = ?";
+
private DerbySchemaQuery() {
// Disable explicit object creation
}
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
index 2da084f9..93166872 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
@@ -19,10 +19,13 @@
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_SCHEMA_SQOOP;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONFIG;
+import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONNECTOR;
+import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_COUNTER;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_COUNTER_GROUP;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION;
+import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_DIRECTION;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_INPUT;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_JOB;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_JOB_INPUT;
@@ -31,6 +34,7 @@
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_SUBMISSION;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_SYSTEM;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_CONFIG_RENAME_COLUMN_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION;
+import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_CREATION_USER;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_ENABLED;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_LINK;
@@ -47,6 +51,8 @@
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_CREATION_USER;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_UPDATE_USER;
+import static org.apache.sqoop.repository.derby.DerbySchemaQuery.STMT_INSERT_DIRECTION;
+
import static org.junit.Assert.assertEquals;
import java.sql.Connection;
@@ -147,6 +153,7 @@ protected void createSchema(int version) throws Exception {
}
if (version > 3) {
+ runQuery(QUERY_CREATE_TABLE_SQ_DIRECTION);
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_RENAME_COLUMN_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_LINK_TO_SQB_FROM_LINK);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_LINK);
@@ -156,6 +163,13 @@ protected void createSchema(int version) throws Exception {
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME);
runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME);
+ runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR);
+ runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS);
+ runQuery(QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS);
+
+ for (Direction direction : Direction.values()) {
+ runQuery(STMT_INSERT_DIRECTION, direction.toString());
+ }
}
runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) VALUES('version', '" + version + "')");
@@ -196,6 +210,42 @@ protected void runQuery(String query, Object... args) throws Exception {
}
}
+ /**
+ * Run single, arbitrary insert query on derby memory repository.
+ *
+ * @param query Query to execute
+ * @return Long id of newly inserted row (-1 if none).
+ * @throws Exception
+ */
+ protected Long runInsertQuery(String query, Object... args) throws Exception {
+ PreparedStatement stmt = null;
+ try {
+ stmt = getDerbyDatabaseConnection().prepareStatement(query, PreparedStatement.RETURN_GENERATED_KEYS);
+
+ for (int i = 0; i < args.length; ++i) {
+ if (args[i] instanceof String) {
+ stmt.setString(i + 1, (String)args[i]);
+ } else if (args[i] instanceof Long) {
+ stmt.setLong(i + 1, (Long)args[i]);
+ } else {
+ stmt.setString(i + 1, args[i].toString());
+ }
+ }
+
+ if (!stmt.execute()) {
+ ResultSet rs = stmt.getGeneratedKeys();
+ rs.next();
+ return rs.getLong(1);
+ }
+ } finally {
+ if (stmt != null) {
+ stmt.close();
+ }
+ }
+
+ return -1L;
+ }
+
protected Connection getDerbyDatabaseConnection() {
return connection;
}
@@ -291,54 +341,59 @@ protected void loadConnectorAndDriverConfigVersion2() throws Exception {
}
protected void loadConnectorAndDriverConfigVersion4() throws Exception {
+ Long configId;
+
// Connector entry
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
+ "VALUES('A', 'org.apache.sqoop.test.A', '1.0-test')");
for (String connector : new String[]{"1"}) {
+ // Directions
+ runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS(SQCD_CONNECTOR, SQCD_DIRECTION)"
+ + "VALUES(" + connector + ", 1)");
+ runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS(SQCD_CONNECTOR, SQCD_DIRECTION)"
+ + "VALUES(" + connector + ", 2)");
+
// connector configs
- for (String direction : new String[]{"null", "'FROM'", "'TO'"}) {
+ for (String direction : new String[]{null, "1", "2"}) {
String type;
- if (direction.equals("null")) {
+ if (direction == null) {
type = "LINK";
} else {
type = "JOB";
}
- runQuery("INSERT INTO SQOOP.SQ_CONFIG"
- + "(SQ_CFG_OWNER, SQ_CFG_DIRECTION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
- + "VALUES("
- + connector + ", "
- + direction
- + ", 'C1', '"
- + type
- + "', 0)");
- runQuery("INSERT INTO SQOOP.SQ_CONFIG"
- + "(SQ_CFG_OWNER, SQ_CFG_DIRECTION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
- + "VALUES("
- + connector + ", "
- + direction
- + ", 'C2', '"
- + type
- + "', 1)");
+ configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ + "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ + "VALUES(" + connector + ", 'C1', '" + type + "', 0)");
+
+ if (direction != null) {
+ runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG_DIRECTIONS"
+ + "(SQ_CFG_DIR_CONFIG, SQ_CFG_DIR_DIRECTION) "
+ + "VALUES(" + configId + ", " + direction + ")");
+ }
+
+ configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ + "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ + "VALUES(" + connector + ", 'C2', '" + type + "', 1)");
+
+ if (direction != null) {
+ runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG_DIRECTIONS"
+ + "(SQ_CFG_DIR_CONFIG, SQ_CFG_DIR_DIRECTION) "
+ + "VALUES(" + configId + ", " + direction + ")");
+ }
}
}
// driver config
for (String type : new String[]{"JOB"}) {
runQuery("INSERT INTO SQOOP.SQ_CONFIG"
- + "(SQ_CFG_OWNER, SQ_CFG_DIRECTION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
- + "VALUES(NULL, NULL"
- + ", 'C1', '"
- + type
- + "', 0)");
+ + "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ + "VALUES(NULL" + ", 'C1', '" + type + "', 0)");
runQuery("INSERT INTO SQOOP.SQ_CONFIG"
- + "(SQ_CFG_OWNER, SQ_CFG_DIRECTION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
- + "VALUES(NULL, NULL"
- + ", 'C2', '"
- + type
- + "', 1)");
+ + "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ + "VALUES(NULL" + ", 'C2', '" + type + "', 1)");
}
// Input entries
@@ -512,8 +567,10 @@ public void loadJobs() throws Exception {
*/
public void addConnector() throws Exception {
// Connector entry
- runQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
- + "VALUES('B', 'org.apache.sqoop.test.B', '1.0-test')");
+ Long connectorId = runInsertQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
+ + "VALUES('B', 'org.apache.sqoop.test.B', '1.0-test')");
+ runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS (SQCD_CONNECTOR, SQCD_DIRECTION) VALUES (" + connectorId + ", 1)");
+ runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS (SQCD_CONNECTOR, SQCD_DIRECTION) VALUES (" + connectorId + ", 2)");
}
/**
@@ -560,8 +617,20 @@ public void loadSubmissions() throws Exception {
}
protected MConnector getConnector() {
+ return getConnector(true, true);
+ }
+
+ protected MConnector getConnector(boolean from, boolean to) {
+ MFromConfig fromJobForms = null;
+ MToConfig toJobForms = null;
+ if (from) {
+ fromJobForms = getFromConfig();
+ }
+ if (to) {
+ toJobForms = getToConfig();
+ }
return new MConnector("A", "org.apache.sqoop.test.A", "1.0-test",
- getLinkConfig(), getFromConfig(), getToConfig());
+ getLinkConfig(), fromJobForms, toJobForms);
}
protected MDriver getDriver() {
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java
index a0e8b912..fc952227 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java
@@ -78,8 +78,6 @@ public void testFindAllConnectors() throws Exception {
assertEquals(connectors.size(),2);
assertEquals(connectors.get(0).getUniqueName(),"A");
assertEquals(connectors.get(1).getUniqueName(),"B");
-
-
}
@Test
@@ -101,4 +99,64 @@ public void testRegisterConnector() throws Exception {
assertNotNull(retrieved);
assertEquals(connector, retrieved);
}
+
+ @Test
+ public void testFromDirection() throws Exception {
+ MConnector connector = getConnector(true, false);
+
+ handler.registerConnector(connector, getDerbyDatabaseConnection());
+
+ // Connector should get persistence ID
+ assertEquals(1, connector.getPersistenceId());
+
+ // Now check content in corresponding tables
+ assertCountForTable("SQOOP.SQ_CONNECTOR", 1);
+ assertCountForTable("SQOOP.SQ_CONFIG", 4);
+ assertCountForTable("SQOOP.SQ_INPUT", 8);
+
+ // Registered connector should be easily recovered back
+ MConnector retrieved = handler.findConnector("A", getDerbyDatabaseConnection());
+ assertNotNull(retrieved);
+ assertEquals(connector, retrieved);
+ }
+
+ @Test
+ public void testToDirection() throws Exception {
+ MConnector connector = getConnector(false, true);
+
+ handler.registerConnector(connector, getDerbyDatabaseConnection());
+
+ // Connector should get persistence ID
+ assertEquals(1, connector.getPersistenceId());
+
+ // Now check content in corresponding tables
+ assertCountForTable("SQOOP.SQ_CONNECTOR", 1);
+ assertCountForTable("SQOOP.SQ_CONFIG", 4);
+ assertCountForTable("SQOOP.SQ_INPUT", 8);
+
+ // Registered connector should be easily recovered back
+ MConnector retrieved = handler.findConnector("A", getDerbyDatabaseConnection());
+ assertNotNull(retrieved);
+ assertEquals(connector, retrieved);
+ }
+
+ @Test
+ public void testNeitherDirection() throws Exception {
+ MConnector connector = getConnector(false, false);
+
+ handler.registerConnector(connector, getDerbyDatabaseConnection());
+
+ // Connector should get persistence ID
+ assertEquals(1, connector.getPersistenceId());
+
+ // Now check content in corresponding tables
+ assertCountForTable("SQOOP.SQ_CONNECTOR", 1);
+ assertCountForTable("SQOOP.SQ_CONFIG", 2);
+ assertCountForTable("SQOOP.SQ_INPUT", 4);
+
+ // Registered connector should be easily recovered back
+ MConnector retrieved = handler.findConnector("A", getDerbyDatabaseConnection());
+ assertNotNull(retrieved);
+ assertEquals(connector, retrieved);
+ }
}
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java
index 0752923c..01a05b2a 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java
@@ -31,10 +31,6 @@
import org.junit.Before;
import org.junit.Test;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import static org.junit.Assert.*;
/**
diff --git a/shell/src/main/java/org/apache/sqoop/shell/ShowConnectorFunction.java b/shell/src/main/java/org/apache/sqoop/shell/ShowConnectorFunction.java
index 09fb195c..d605457a 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/ShowConnectorFunction.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/ShowConnectorFunction.java
@@ -35,7 +35,6 @@
@SuppressWarnings("serial")
public class ShowConnectorFunction extends SqoopFunction {
- private static final char SUPPORTED_DIRECTIONS_SEPARATOR = '/';
@SuppressWarnings("static-access")
public ShowConnectorFunction() {
@@ -83,7 +82,7 @@ private void showSummary() {
uniqueNames.add(connector.getUniqueName());
versions.add(connector.getVersion());
classes.add(connector.getClassName());
- supportedDirections.add(getSupportedDirections(connector));
+ supportedDirections.add(connector.getSupportedDirections().toString());
}
TableDisplayer.display(header, ids, uniqueNames, versions, classes, supportedDirections);
@@ -113,33 +112,8 @@ private void displayConnector(MConnector connector) {
connector.getUniqueName(),
connector.getClassName(),
connector.getVersion(),
- getSupportedDirections(connector)
+ connector.getSupportedDirections().toString()
);
displayConnectorConfigDetails(connector, client.getConnectorConfigBundle(connector.getPersistenceId()));
}
-
- /**
- * Creates a nicely formatted string for which directions are supported.
- * Example: FROM/TO.
- * @param connector
- * @return String
- */
- private String getSupportedDirections(MConnector connector) {
- StringBuffer supportedDirectionsBuffer = new StringBuffer();
- SupportedDirections supportedDirections
- = connector.getSupportedDirections();
-
- if (supportedDirections.isDirectionSupported(Direction.FROM)) {
- supportedDirectionsBuffer.append(Direction.FROM);
-
- if (supportedDirections.isDirectionSupported(Direction.TO)) {
- supportedDirectionsBuffer.append(SUPPORTED_DIRECTIONS_SEPARATOR);
- supportedDirectionsBuffer.append(Direction.TO);
- }
- } else if (supportedDirections.isDirectionSupported(Direction.TO)) {
- supportedDirectionsBuffer.append(Direction.TO);
- }
-
- return supportedDirectionsBuffer.toString();
- }
}
diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
index 42724709..5315e1fa 100644
--- a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
+++ b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
@@ -73,7 +73,7 @@ public List getSupportedDirections() {
public abstract From getFrom();
/**
- * @return an To that provides classes for performing export.
+ * @return an To that provides classes for performing export.n
*/
public abstract To getTo();