SQOOP-1790: Sqoop2: Upgrade configs with name conflicts
(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
parent 37839d9ec5
commit cb037c16a0
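For orientation, the renames in this change map the Generic JDBC Connector's 1.99.3 config and input names ("connection.*", "table.*") to their 1.99.4 equivalents ("linkConfig.*", "fromJobConfig.*", "toJobConfig.*"). A minimal sketch of that mapping, with a few entries copied from the diff below (not the full list; the class name is hypothetical and not part of the commit):

import java.util.HashMap;
import java.util.Map;

public class ConfigRenameSketch {
  public static void main(String[] args) {
    // Old (1.99.3) input name -> new (1.99.4) input name.
    Map<String, String> renames = new HashMap<String, String>();
    renames.put("connection.jdbcDriver", "linkConfig.jdbcDriver");
    renames.put("table.partitionColumnNull", "fromJobConfig.allowNullValueInPartitionColumn");
    renames.put("table.clearStageTable", "toJobConfig.shouldClearStageTable");
    for (Map.Entry<String, String> entry : renames.entrySet()) {
      System.out.println(entry.getKey() + " -> " + entry.getValue());
    }
  }
}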
@@ -32,83 +32,18 @@
// NOTE: All config types have the similar upgrade path at this point
public class GenericJdbcConnectorUpgrader extends ConnectorConfigurableUpgrader {

  private static final String JOB_CONFIGURATION_FORM_NAME = "table";
  private static final String CONNECTION_CONFIGURATION_FORM_NAME = "connection";
  private static final Map<String, String> CONNECTION_TO_LINK_CONFIG_INPUT_MAP;
  private static final Map<String, String> IMPORT_JOB_TABLE_TO_FROM_CONFIG_INPUT_MAP;
  private static final Map<String, String> EXPORT_JOB_TABLE_TO_TO_CONFIG_INPUT_MAP;

  static {
    CONNECTION_TO_LINK_CONFIG_INPUT_MAP = new HashMap<String, String>();
    CONNECTION_TO_LINK_CONFIG_INPUT_MAP.put(CONNECTION_CONFIGURATION_FORM_NAME + ".jdbcDriver", "linkConfig.jdbcDriver");
    CONNECTION_TO_LINK_CONFIG_INPUT_MAP.put(CONNECTION_CONFIGURATION_FORM_NAME + ".connectionString", "linkConfig.connectionString");
    CONNECTION_TO_LINK_CONFIG_INPUT_MAP.put(CONNECTION_CONFIGURATION_FORM_NAME + ".username", "linkConfig.username");
    CONNECTION_TO_LINK_CONFIG_INPUT_MAP.put(CONNECTION_CONFIGURATION_FORM_NAME + ".password", "linkConfig.password");
    CONNECTION_TO_LINK_CONFIG_INPUT_MAP.put(CONNECTION_CONFIGURATION_FORM_NAME + ".jdbcProperties", "linkConfig.jdbcProperties");

    IMPORT_JOB_TABLE_TO_FROM_CONFIG_INPUT_MAP = new HashMap<String, String>();
    IMPORT_JOB_TABLE_TO_FROM_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".schemaName", "fromJobConfig.schemaName");
    IMPORT_JOB_TABLE_TO_FROM_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".tableName", "fromJobConfig.tableName");
    IMPORT_JOB_TABLE_TO_FROM_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".sql", "fromJobConfig.sql");
    IMPORT_JOB_TABLE_TO_FROM_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".columns", "fromJobConfig.columns");
    IMPORT_JOB_TABLE_TO_FROM_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".partitionColumn", "fromJobConfig.partitionColumn");
    IMPORT_JOB_TABLE_TO_FROM_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".partitionColumnNull", "fromJobConfig.allowNullValueInPartitionColumn");
    IMPORT_JOB_TABLE_TO_FROM_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".boundaryQuery", "fromJobConfig.boundaryQuery");

    EXPORT_JOB_TABLE_TO_TO_CONFIG_INPUT_MAP = new HashMap<String, String>();
    EXPORT_JOB_TABLE_TO_TO_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".schemaName", "toJobConfig.schemaName");
    EXPORT_JOB_TABLE_TO_TO_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".tableName", "toJobConfig.tableName");
    EXPORT_JOB_TABLE_TO_TO_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".sql", "toJobConfig.sql");
    EXPORT_JOB_TABLE_TO_TO_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".columns", "toJobConfig.columns");
    EXPORT_JOB_TABLE_TO_TO_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".stageTableName", "toJobConfig.stageTableName");
    EXPORT_JOB_TABLE_TO_TO_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".clearStageTable", "toJobConfig.shouldClearStageTable");
  }

  @Override
  public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) {
    // Upgrade from 1.99.3 to 1.99.4
    for (MConfig config : original.getConfigs()) {
      if (config.getName().equals(CONNECTION_CONFIGURATION_FORM_NAME)) {
        for (MInput originalInput : config.getInputs()) {
          String inputName = CONNECTION_TO_LINK_CONFIG_INPUT_MAP.get(originalInput.getName());
          MInput input = upgradeTarget.getInput(inputName);
          input.setValue(originalInput.getValue());
        }
      }
    }

    ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
  }

  @Override
  public void upgradeFromJobConfig(MFromConfig original, MFromConfig upgradeTarget) {
    // Upgrade from 1.99.3 to 1.99.4
    for (MConfig config : original.getConfigs()) {
      if (config.getName().equals(JOB_CONFIGURATION_FORM_NAME)) {
        for (MInput originalInput : config.getInputs()) {
          String inputName = IMPORT_JOB_TABLE_TO_FROM_CONFIG_INPUT_MAP.get(originalInput.getName());
          MInput input = upgradeTarget.getInput(inputName);
          input.setValue(originalInput.getValue());
        }
      }
    }

    ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
  }

  @Override
  public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) {
    // Upgrade from 1.99.3 to 1.99.4
    for (MConfig config : original.getConfigs()) {
      if (config.getName().equals(JOB_CONFIGURATION_FORM_NAME)) {
        for (MInput originalInput : config.getInputs()) {
          String inputName = EXPORT_JOB_TABLE_TO_TO_CONFIG_INPUT_MAP.get(originalInput.getName());
          MInput input = upgradeTarget.getInput(inputName);
          input.setValue(originalInput.getValue());
        }
      }
    }

    ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
  }
}
@@ -68,31 +68,6 @@ public void testFromConfig() {
    assertEquals("test-columns", newConfigs.getInput("fromJobConfig.columns").getValue());
    assertEquals("test-partitionColumn", newConfigs.getInput("fromJobConfig.partitionColumn").getValue());
    assertEquals("test-allowNullValueInPartitionColumn", newConfigs.getInput("fromJobConfig.allowNullValueInPartitionColumn").getValue());

    // 1.99.3 upgrade to 1.99.4
    originalConfigs = new MFromConfig(new LinkedList<MConfig>());
    newConfigs = new MFromConfig(ConfigUtils.toConfigs(FromJobConfiguration.class));
    originalConfigs.getConfigs().add(new MConfig("table", new LinkedList<MInput<?>>()));
    originalConfigs.getConfigs().get(0).getInputs().add(new MStringInput("table.schemaName", false, InputEditable.ANY, StringUtils.EMPTY, (short)50));
    originalConfigs.getConfigs().get(0).getInputs().add(new MStringInput("table.tableName", false, InputEditable.ANY, StringUtils.EMPTY, (short)50));
    originalConfigs.getConfigs().get(0).getInputs().add(new MStringInput("table.sql", false, InputEditable.ANY, StringUtils.EMPTY, (short)2000));
    originalConfigs.getConfigs().get(0).getInputs().add(new MStringInput("table.columns", false, InputEditable.ANY, StringUtils.EMPTY, (short)50));
    originalConfigs.getConfigs().get(0).getInputs().add(new MStringInput("table.partitionColumn", false, InputEditable.ANY, StringUtils.EMPTY, (short)50));
    originalConfigs.getConfigs().get(0).getInputs().add(new MBooleanInput("table.partitionColumnNull", false, InputEditable.ANY, StringUtils.EMPTY));
    originalConfigs.getConfigs().get(0).getInputs().add(new MStringInput("table.boundaryQuery", false, InputEditable.ANY, StringUtils.EMPTY, (short)50));
    originalConfigs.getInput("table.schemaName").setValue("test-schema");
    originalConfigs.getInput("table.tableName").setValue("test-tableName");
    originalConfigs.getInput("table.sql").setValue("test-sql");
    originalConfigs.getInput("table.columns").setValue("test-columns");
    originalConfigs.getInput("table.partitionColumn").setValue("test-partitionColumn");
    originalConfigs.getInput("table.partitionColumnNull").setValue("test-allowNullValueInPartitionColumn");
    upgrader.upgradeFromJobConfig(originalConfigs, newConfigs);
    assertEquals("test-schema", newConfigs.getInput("fromJobConfig.schemaName").getValue());
    assertEquals("test-tableName", newConfigs.getInput("fromJobConfig.tableName").getValue());
    assertEquals("test-sql", newConfigs.getInput("fromJobConfig.sql").getValue());
    assertEquals("test-columns", newConfigs.getInput("fromJobConfig.columns").getValue());
    assertEquals("test-partitionColumn", newConfigs.getInput("fromJobConfig.partitionColumn").getValue());
    assertEquals("test-allowNullValueInPartitionColumn", newConfigs.getInput("fromJobConfig.allowNullValueInPartitionColumn").getValue());
  }

  @Test
@@ -114,31 +89,6 @@ public void testToConfig() {
    assertEquals("test-columns", newConfigs.getInput("toJobConfig.columns").getValue());
    assertEquals("test-stageTableName", newConfigs.getInput("toJobConfig.stageTableName").getValue());
    assertEquals("test-shouldClearStageTable", newConfigs.getInput("toJobConfig.shouldClearStageTable").getValue());

    // 1.99.3 upgrade to 1.99.4
    originalConfigs = new MToConfig(new LinkedList<MConfig>());
    newConfigs = new MToConfig(ConfigUtils.toConfigs(ToJobConfiguration.class));
    originalConfigs.getConfigs().add(new MConfig("table", new LinkedList<MInput<?>>()));

    originalConfigs.getConfigs().get(0).getInputs().add(new MStringInput("table.schemaName", false, InputEditable.ANY, StringUtils.EMPTY, (short)50));
    originalConfigs.getConfigs().get(0).getInputs().add(new MStringInput("table.tableName", false, InputEditable.ANY, StringUtils.EMPTY, (short)50));
    originalConfigs.getConfigs().get(0).getInputs().add(new MStringInput("table.sql", false, InputEditable.ANY, StringUtils.EMPTY, (short)2000));
    originalConfigs.getConfigs().get(0).getInputs().add(new MStringInput("table.columns", false, InputEditable.ANY, StringUtils.EMPTY, (short)50));
    originalConfigs.getConfigs().get(0).getInputs().add(new MStringInput("table.stageTableName", false, InputEditable.ANY, StringUtils.EMPTY, (short)50));
    originalConfigs.getConfigs().get(0).getInputs().add(new MBooleanInput("table.clearStageTable", false, InputEditable.ANY, StringUtils.EMPTY));
    originalConfigs.getInput("table.schemaName").setValue("test-schema");
    originalConfigs.getInput("table.tableName").setValue("test-tableName");
    originalConfigs.getInput("table.sql").setValue("test-sql");
    originalConfigs.getInput("table.columns").setValue("test-columns");
    originalConfigs.getInput("table.stageTableName").setValue("test-stageTableName");
    originalConfigs.getInput("table.clearStageTable").setValue("test-shouldClearStageTable");
    upgrader.upgradeToJobConfig(originalConfigs, newConfigs);
    assertEquals("test-schema", newConfigs.getInput("toJobConfig.schemaName").getValue());
    assertEquals("test-tableName", newConfigs.getInput("toJobConfig.tableName").getValue());
    assertEquals("test-sql", newConfigs.getInput("toJobConfig.sql").getValue());
    assertEquals("test-columns", newConfigs.getInput("toJobConfig.columns").getValue());
    assertEquals("test-stageTableName", newConfigs.getInput("toJobConfig.stageTableName").getValue());
    assertEquals("test-shouldClearStageTable", newConfigs.getInput("toJobConfig.shouldClearStageTable").getValue());
  }

  @Test
@@ -158,26 +108,5 @@ public void testLinkConfig() {
    assertEquals("test-username", newConfigs.getInput("linkConfig.username").getValue());
    assertEquals("test-password", newConfigs.getInput("linkConfig.password").getValue());
    assertEquals("test-jdbcProperties", newConfigs.getInput("linkConfig.jdbcProperties").getValue());

    // 1.99.3 upgrade to 1.99.4
    originalConfigs = new MLinkConfig(new LinkedList<MConfig>());
    newConfigs = new MLinkConfig(ConfigUtils.toConfigs(LinkConfiguration.class));
    originalConfigs.getConfigs().add(new MConfig("connection", new LinkedList<MInput<?>>()));
    originalConfigs.getConfigs().get(0).getInputs().add(new MStringInput("connection.jdbcDriver", false, InputEditable.ANY, StringUtils.EMPTY, (short)50));
    originalConfigs.getConfigs().get(0).getInputs().add(new MStringInput("connection.connectionString", false, InputEditable.ANY, StringUtils.EMPTY, (short)50));
    originalConfigs.getConfigs().get(0).getInputs().add(new MStringInput("connection.username", false, InputEditable.ANY, StringUtils.EMPTY, (short)2000));
    originalConfigs.getConfigs().get(0).getInputs().add(new MStringInput("connection.password", false, InputEditable.ANY, StringUtils.EMPTY, (short)50));
    originalConfigs.getConfigs().get(0).getInputs().add(new MStringInput("connection.jdbcProperties", false, InputEditable.ANY, StringUtils.EMPTY, (short)50));
    originalConfigs.getInput("connection.jdbcDriver").setValue("test-jdbcDriver");
    originalConfigs.getInput("connection.connectionString").setValue("test-connectionString");
    originalConfigs.getInput("connection.username").setValue("test-username");
    originalConfigs.getInput("connection.password").setValue("test-password");
    originalConfigs.getInput("connection.jdbcProperties").setValue("test-jdbcProperties");
    upgrader.upgradeLinkConfig(originalConfigs, newConfigs);
    assertEquals("test-jdbcDriver", newConfigs.getInput("linkConfig.jdbcDriver").getValue());
    assertEquals("test-connectionString", newConfigs.getInput("linkConfig.connectionString").getValue());
    assertEquals("test-username", newConfigs.getInput("linkConfig.username").getValue());
    assertEquals("test-password", newConfigs.getInput("linkConfig.password").getValue());
    assertEquals("test-jdbcProperties", newConfigs.getInput("linkConfig.jdbcProperties").getValue());
  }
}
@@ -2523,7 +2523,7 @@ protected void closeResultSets(ResultSet ... resultSets) {
   *
   * @param stmts Statements to close
   */
-  protected void closeStatements(Statement... stmts) {
+  public void closeStatements(Statement... stmts) {
    if(stmts == null) {
      return;
    }
@@ -77,6 +77,7 @@ public final class CommonRepositorySchemaConstants {

  public static final String COLUMN_SQ_CFG_CONFIGURABLE = "SQ_CFG_CONFIGURABLE";

  @Deprecated
  public static final String COLUMN_SQ_CFG_DIRECTION = "SQ_CFG_DIRECTION";

  public static final String COLUMN_SQ_CFG_NAME = "SQ_CFG_NAME";
@@ -291,6 +291,13 @@ public void createOrUpgradeRepository(Connection conn) {
      updateDriverConfigInput(conn, driverId);
      LOG.info("Finished Updating config and inputs for the driver.");
    }

    // Update generic jdbc connector
    if (repositoryVersion > 0) {
      DerbyUpgradeGenericJdbcConnectorConfigAndInputNames derbyUpgradeGenericJdbcConnectorConfigAndInputNames
          = new DerbyUpgradeGenericJdbcConnectorConfigAndInputNames(this, conn);
      derbyUpgradeGenericJdbcConnectorConfigAndInputNames.execute();
    }
  }

  if (repositoryVersion < 5) {
@@ -598,6 +598,37 @@ public static final String getDropConstraintQuery(String schemaName, String tabl
    return queryBuilder.toString();
  }

  // Update Generic Jdbc Connector configs

  public static final String QUERY_UPDATE_TABLE_SQ_CONFIG_NAME =
      "UPDATE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONFIG_NAME)
          + " SET " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_CFG_NAME) + " = ?"
          + " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_CFG_ID) + " = ?";

  public static final String QUERY_UPDATE_TABLE_SQ_INPUT_SQI_NAME =
      "UPDATE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_INPUT_NAME)
          + " SET " + CommonRepoUtils.escapeColumnName(COLUMN_SQI_NAME) + " = ?"
          + " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQI_NAME) + " = ?"
          + " AND " + CommonRepoUtils.escapeColumnName(COLUMN_SQI_CONFIG) + " = ?";

  public static final String QUERY_SELECT_CONFIG_ID_BY_NAME =
      "SELECT " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_CFG_ID)
          + " FROM " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONFIG_NAME)
          + " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_CFG_NAME) + " = ?";

  public static final String QUERY_SELECT_DIRECTION_CONFIG_BY_DIRECTION_NAME =
      "SELECT " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_CFG_DIR_CONFIG)
          + " FROM " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONFIG_DIRECTIONS_NAME)
          + " LEFT JOIN " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_DIRECTION_NAME)
          + " ON " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_CFG_DIR_DIRECTION)
          + " = " + CommonRepoUtils.escapeColumnName(COLUMN_SQD_ID)
          + " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQD_NAME) + " = ?";

  public static final String QUERY_SELECT_CONFIG_ID_BY_NAME_AND_DIRECTION = QUERY_SELECT_CONFIG_ID_BY_NAME
      + " AND " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_CFG_ID) + " IN ("
      + QUERY_SELECT_DIRECTION_CONFIG_BY_DIRECTION_NAME
      + ")";

  private DerbySchemaUpgradeQuery() {
    // Disable explicit object creation
  }
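For readers tracing the upgrade path, the two UPDATE constants above are plain parameterized statements once CommonRepoUtils substitutes the Derby table and column names. A rough, self-contained sketch of what they expand to at runtime (the SQOOP schema qualifier, the SQ_CONFIG/SQ_INPUT table names, the unquoted identifiers, and the class name are assumptions for illustration, not strings copied from the repository code):

public class UpgradeQuerySketch {
  public static void main(String[] args) {
    // Approximate runtime form of QUERY_UPDATE_TABLE_SQ_CONFIG_NAME.
    String renameConfig =
        "UPDATE SQOOP.SQ_CONFIG SET SQ_CFG_NAME = ? WHERE SQ_CFG_ID = ?";
    // Approximate runtime form of QUERY_UPDATE_TABLE_SQ_INPUT_SQI_NAME.
    String renameInput =
        "UPDATE SQOOP.SQ_INPUT SET SQI_NAME = ? WHERE SQI_NAME = ? AND SQI_CONFIG = ?";
    System.out.println(renameConfig);
    System.out.println(renameInput);
  }
}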
@@ -0,0 +1,196 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sqoop.repository.derby;

import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.error.code.DerbyRepoError;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

/**
 * Sqoop 1.99.4 release included the following changes:
 * 1. FROM/TO refactoring
 * 2. Nomenclature improvements
 *
 * With the above improvements, the Generic JDBC Connector
 * and its configurations need to be updated.
 *
 * This class is intended to handle the updates to the Generic JDBC Connector.
 */
public class DerbyUpgradeGenericJdbcConnectorConfigAndInputNames {
  private static final Logger LOG =
      Logger.getLogger(DerbyUpgradeGenericJdbcConnectorConfigAndInputNames.class);

  private static final String JOB_CONFIGURATION_FORM_NAME = "table";
  private static final String CONNECTION_CONFIGURATION_FORM_NAME = "connection";
  private static final String LINK_CONFIG_NAME = "linkConfig";
  private static final String FROM_JOB_CONFIG_NAME = "fromJobConfig";
  private static final String TO_JOB_CONFIG_NAME = "toJobConfig";
  private static final Map<String, String> CONNECTION_TO_LINK_CONFIG_INPUT_MAP;
  private static final Map<String, String> IMPORT_JOB_TABLE_TO_FROM_CONFIG_INPUT_MAP;
  private static final Map<String, String> EXPORT_JOB_TABLE_TO_TO_CONFIG_INPUT_MAP;

  static {
    CONNECTION_TO_LINK_CONFIG_INPUT_MAP = new HashMap<String, String>();
    CONNECTION_TO_LINK_CONFIG_INPUT_MAP.put(CONNECTION_CONFIGURATION_FORM_NAME + ".jdbcDriver", LINK_CONFIG_NAME + ".jdbcDriver");
    CONNECTION_TO_LINK_CONFIG_INPUT_MAP.put(CONNECTION_CONFIGURATION_FORM_NAME + ".connectionString", LINK_CONFIG_NAME + ".connectionString");
    CONNECTION_TO_LINK_CONFIG_INPUT_MAP.put(CONNECTION_CONFIGURATION_FORM_NAME + ".username", LINK_CONFIG_NAME + ".username");
    CONNECTION_TO_LINK_CONFIG_INPUT_MAP.put(CONNECTION_CONFIGURATION_FORM_NAME + ".password", LINK_CONFIG_NAME + ".password");
    CONNECTION_TO_LINK_CONFIG_INPUT_MAP.put(CONNECTION_CONFIGURATION_FORM_NAME + ".jdbcProperties", LINK_CONFIG_NAME + ".jdbcProperties");

    IMPORT_JOB_TABLE_TO_FROM_CONFIG_INPUT_MAP = new HashMap<String, String>();
    IMPORT_JOB_TABLE_TO_FROM_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".schemaName", FROM_JOB_CONFIG_NAME + ".schemaName");
    IMPORT_JOB_TABLE_TO_FROM_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".tableName", FROM_JOB_CONFIG_NAME + ".tableName");
    IMPORT_JOB_TABLE_TO_FROM_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".sql", FROM_JOB_CONFIG_NAME + ".sql");
    IMPORT_JOB_TABLE_TO_FROM_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".columns", FROM_JOB_CONFIG_NAME + ".columns");
    IMPORT_JOB_TABLE_TO_FROM_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".partitionColumn", FROM_JOB_CONFIG_NAME + ".partitionColumn");
    IMPORT_JOB_TABLE_TO_FROM_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".partitionColumnNull", FROM_JOB_CONFIG_NAME + ".allowNullValueInPartitionColumn");
    IMPORT_JOB_TABLE_TO_FROM_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".boundaryQuery", FROM_JOB_CONFIG_NAME + ".boundaryQuery");

    EXPORT_JOB_TABLE_TO_TO_CONFIG_INPUT_MAP = new HashMap<String, String>();
    EXPORT_JOB_TABLE_TO_TO_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".schemaName", TO_JOB_CONFIG_NAME + ".schemaName");
    EXPORT_JOB_TABLE_TO_TO_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".tableName", TO_JOB_CONFIG_NAME + ".tableName");
    EXPORT_JOB_TABLE_TO_TO_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".sql", TO_JOB_CONFIG_NAME + ".sql");
    EXPORT_JOB_TABLE_TO_TO_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".columns", TO_JOB_CONFIG_NAME + ".columns");
    EXPORT_JOB_TABLE_TO_TO_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".stageTableName", TO_JOB_CONFIG_NAME + ".stageTableName");
    EXPORT_JOB_TABLE_TO_TO_CONFIG_INPUT_MAP.put(JOB_CONFIGURATION_FORM_NAME + ".clearStageTable", TO_JOB_CONFIG_NAME + ".shouldClearStageTable");
  }

  private Connection connection;
  private DerbyRepositoryHandler handler;

  public DerbyUpgradeGenericJdbcConnectorConfigAndInputNames(DerbyRepositoryHandler handler, Connection connection) {
    this.handler = handler;
    this.connection = connection;
  }

  public void execute() {
    LOG.info("Renaming Generic JDBC Connector configs and inputs.");

    Long linkConfigId = getConfigId(false, CONNECTION_CONFIGURATION_FORM_NAME);
    Long fromJobConfigId = getConfigId(true, JOB_CONFIGURATION_FORM_NAME, Direction.FROM.toString());
    Long toJobConfigId = getConfigId(true, JOB_CONFIGURATION_FORM_NAME, Direction.TO.toString());

    if (linkConfigId != null) {
      LOG.info("Renaming LINK config (" + linkConfigId + ") and inputs.");
      renameConfig(linkConfigId, LINK_CONFIG_NAME);
      renameConfigInputs(linkConfigId, CONNECTION_TO_LINK_CONFIG_INPUT_MAP);
    } else {
      LOG.info("Renaming LINK config and inputs skipped.");
    }

    if (fromJobConfigId != null) {
      LOG.info("Renaming FROM config (" + fromJobConfigId + ") and inputs.");
      renameConfig(fromJobConfigId, FROM_JOB_CONFIG_NAME);
      renameConfigInputs(fromJobConfigId, IMPORT_JOB_TABLE_TO_FROM_CONFIG_INPUT_MAP);
    } else {
      LOG.info("Renaming FROM config and inputs skipped.");
    }

    if (toJobConfigId != null) {
LOG.info("Renaming TO config (" + fromJobConfigId + ") and inputs.");
      renameConfig(toJobConfigId, TO_JOB_CONFIG_NAME);
      renameConfigInputs(toJobConfigId, EXPORT_JOB_TABLE_TO_TO_CONFIG_INPUT_MAP);
    } else {
      LOG.info("Renaming TO config and inputs skipped.");
    }

    LOG.info("Done Generic JDBC Connector configs and inputs.");
  }

  private Long getConfigId(boolean direction, String ... args) {
    PreparedStatement statement = null;
    String configIdQuery = (direction) ?
        DerbySchemaUpgradeQuery.QUERY_SELECT_CONFIG_ID_BY_NAME_AND_DIRECTION : DerbySchemaUpgradeQuery.QUERY_SELECT_CONFIG_ID_BY_NAME;

    try {
      statement = connection.prepareStatement(configIdQuery);

      for (int i = 0; i < args.length; ++i) {
        statement.setString(i + 1, args[i]);
      }

      ResultSet configIdResultSet = statement.executeQuery();

      LOG.debug("QUERY(" + configIdQuery + ") with args [" + StringUtils.join(args, ",") + "] fetch size: " + configIdResultSet.getFetchSize());

      if (!configIdResultSet.next() || configIdResultSet.getFetchSize() != 1) {
        return null;
      }

      return configIdResultSet.getLong(1);
    } catch (SQLException e) {
      throw new SqoopException(DerbyRepoError.DERBYREPO_0002, e);
    } finally {
      handler.closeStatements(statement);
    }
  }

  private void renameConfig(long configId, String configName) {
    PreparedStatement statement = null;
    String query = DerbySchemaUpgradeQuery.QUERY_UPDATE_TABLE_SQ_CONFIG_NAME;

    try {
      statement = connection.prepareStatement(query);
      statement.setString(1, configName);
      statement.setLong(2, configId);

      int updateCount = statement.executeUpdate();
      LOG.debug("QUERY(" + query + ") with args [" + configName + ", " + configId + "] update count: " + updateCount);
    } catch (SQLException e) {
      throw new SqoopException(DerbyRepoError.DERBYREPO_0002, e);
    } finally {
      handler.closeStatements(statement);
    }
  }

  private void renameConfigInputs(long configId, Map<String, String> inputNameMap) {
    PreparedStatement statement = null;

    try {
      statement = connection.prepareStatement(DerbySchemaUpgradeQuery.QUERY_UPDATE_TABLE_SQ_INPUT_SQI_NAME);

      for (String inputName : inputNameMap.keySet()) {
        statement.setString(1, inputNameMap.get(inputName));
        statement.setString(2, inputName);
        statement.setLong(3, configId);
        statement.addBatch();

        LOG.debug("QUERY(" + DerbySchemaUpgradeQuery.QUERY_UPDATE_TABLE_SQ_INPUT_SQI_NAME + ") args ["
            + inputNameMap.get(inputName) + "," + inputName + "," + configId + "]");
      }

      int[] updateCounts = statement.executeBatch();
      LOG.debug("QUERY(" + DerbySchemaUpgradeQuery.QUERY_UPDATE_TABLE_SQ_INPUT_SQI_NAME + ") update count: "
          + StringUtils.join(ArrayUtils.toObject(updateCounts), ","));
    } catch (SQLException e) {
      throw new SqoopException(DerbyRepoError.DERBYREPO_0002, e);
    } finally {
      handler.closeStatements(statement);
    }
  }
}
@@ -0,0 +1,83 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sqoop.integration.repository.derby.upgrade;

import java.util.HashMap;
import java.util.Map;

/**
 * This version contains the following structures:
 * Generic JDBC Connector link with name "Link1" and id 1
 * Generic JDBC Connector link with name "Link2" and id 2
 * Generic JDBC Connector link with name "Link3" and id 3
 * Generic JDBC Connector link with name "Link4" and id 4
 * Job IMPORT with name "Job1" and id 1
 * Job IMPORT with name "Job2" and id 2
 * Job IMPORT with name "Job3" and id 3
 * Job EXPORT with name "Job4" and id 4
 * Link with id 4 has been disabled
 * Job with id 3 has been disabled
 * Job with id 1 has been run 5 times
 */
public class Derby1_99_3UpgradeTest extends DerbyRepositoryUpgradeTest {

  @Override
  public String getPathToRepositoryTarball() {
    return "/repository/derby/derby-repository-1.99.3.tar.gz";
  }

  @Override
  public int getNumberOfLinks() {
    return 5;
  }

  @Override
  public int getNumberOfJobs() {
    return 4;
  }

  @Override
  public Map<Integer, Integer> getNumberOfSubmissions() {
    HashMap<Integer, Integer> ret = new HashMap<Integer, Integer>();
    ret.put(1, 5);
    ret.put(2, 0);
    ret.put(3, 0);
    ret.put(4, 0);
    return ret;
  }

  @Override
  public Integer[] getDisabledLinkIds() {
    return new Integer[] {4};
  }

  @Override
  public Integer[] getDisabledJobIds() {
    return new Integer[] {3};
  }

  @Override
  public Integer[] getDeleteLinkIds() {
    return new Integer[] {1, 2, 3, 4, 5};
  }

  @Override
  public Integer[] getDeleteJobIds() {
    return new Integer[] {1, 2, 3, 4};
  }
}
Binary file not shown.