mirror of
https://github.com/apache/sqoop.git
synced 2025-05-21 19:31:13 +08:00
SQOOP-2356: Sqoop2: Derby upgrade should automatically rename jobs that have conflicting names
(Abraham Elmahrek via Gwen Shapira)
This commit is contained in:
parent
d2cf56680a
commit
9ee44d4f0a
@ -17,7 +17,14 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.sqoop.repository.common;
|
package org.apache.sqoop.repository.common;
|
||||||
|
|
||||||
|
import java.sql.ResultSet;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.sql.Statement;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
public class CommonRepoUtils {
|
public class CommonRepoUtils {
|
||||||
|
public static final Logger LOG = Logger.getLogger(CommonRepoUtils.class);
|
||||||
|
|
||||||
public static final String QUOTE_CHARACTER = "\"";
|
public static final String QUOTE_CHARACTER = "\"";
|
||||||
|
|
||||||
public static final String escapeTableName(String tableName) {
|
public static final String escapeTableName(String tableName) {
|
||||||
@ -59,4 +66,49 @@ public static final String getConstraintName(String schemaName, String constrain
|
|||||||
return escapeConstraintName(constraintName);
|
return escapeConstraintName(constraintName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close all given statements.
|
||||||
|
*
|
||||||
|
* Any occurring exception is silently ignored and logged.
|
||||||
|
*
|
||||||
|
* @param stmts Statements to close
|
||||||
|
*/
|
||||||
|
public static final void closeStatements(Statement ... stmts) {
|
||||||
|
if(stmts == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Statement stmt : stmts) {
|
||||||
|
if(stmt != null) {
|
||||||
|
try {
|
||||||
|
stmt.close();
|
||||||
|
} catch (SQLException ex) {
|
||||||
|
LOG.error("Exception during closing statement", ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close all given Results set.
|
||||||
|
*
|
||||||
|
* Any occurring exception is silently ignored and logged.
|
||||||
|
*
|
||||||
|
* @param resultSets Result sets to close
|
||||||
|
*/
|
||||||
|
public static final void closeResultSets(ResultSet... resultSets) {
|
||||||
|
if(resultSets == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (ResultSet rs : resultSets) {
|
||||||
|
if(rs != null) {
|
||||||
|
try {
|
||||||
|
rs.close();
|
||||||
|
} catch(SQLException ex) {
|
||||||
|
LOG.error("Exception during closing result set", ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -2653,18 +2653,7 @@ protected void closeResultSets(ResultSet ... resultSets) {
|
|||||||
* @param stmts Statements to close
|
* @param stmts Statements to close
|
||||||
*/
|
*/
|
||||||
public void closeStatements(Statement... stmts) {
|
public void closeStatements(Statement... stmts) {
|
||||||
if(stmts == null) {
|
CommonRepoUtils.closeStatements(stmts);
|
||||||
return;
|
|
||||||
}
|
|
||||||
for (Statement stmt : stmts) {
|
|
||||||
if(stmt != null) {
|
|
||||||
try {
|
|
||||||
stmt.close();
|
|
||||||
} catch (SQLException ex) {
|
|
||||||
LOG.error("Exception during closing statement", ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -54,6 +54,7 @@
|
|||||||
import org.apache.sqoop.repository.common.CommonRepoConstants;
|
import org.apache.sqoop.repository.common.CommonRepoConstants;
|
||||||
import org.apache.sqoop.repository.common.CommonRepositoryHandler;
|
import org.apache.sqoop.repository.common.CommonRepositoryHandler;
|
||||||
import org.apache.sqoop.repository.common.CommonRepositorySchemaConstants;
|
import org.apache.sqoop.repository.common.CommonRepositorySchemaConstants;
|
||||||
|
import org.apache.sqoop.repository.derby.upgrade.UniqueJobRename;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* JDBC based repository handler for Derby database.
|
* JDBC based repository handler for Derby database.
|
||||||
@ -276,6 +277,7 @@ public void createOrUpgradeRepository(Connection conn) {
|
|||||||
renameConnectorToConfigurable(conn);
|
renameConnectorToConfigurable(conn);
|
||||||
|
|
||||||
// Add unique constraints on job and links for version 4 onwards
|
// Add unique constraints on job and links for version 4 onwards
|
||||||
|
new UniqueJobRename(conn).execute();
|
||||||
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME, conn);
|
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME, conn);
|
||||||
runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME, conn);
|
runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME, conn);
|
||||||
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_UNIQUE_CONSTRAINT_NAME, conn);
|
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_UNIQUE_CONSTRAINT_NAME, conn);
|
||||||
|
@ -636,4 +636,4 @@ public static final String getDropConstraintQuery(String schemaName, String tabl
|
|||||||
private DerbySchemaUpgradeQuery() {
|
private DerbySchemaUpgradeQuery() {
|
||||||
// Disable explicit object creation
|
// Disable explicit object creation
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,130 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.sqoop.repository.derby.upgrade;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
|
import org.apache.sqoop.common.SqoopException;
|
||||||
|
import org.apache.sqoop.error.code.DerbyRepoError;
|
||||||
|
import org.apache.sqoop.repository.common.CommonRepoUtils;
|
||||||
|
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.PreparedStatement;
|
||||||
|
import java.sql.ResultSet;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.sql.Statement;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQB_ID;
|
||||||
|
import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQB_NAME;
|
||||||
|
import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.SCHEMA_SQOOP;
|
||||||
|
import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.TABLE_SQ_JOB_NAME;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Rename all jobs that have the same name to a unique name.
|
||||||
|
* Just provide a suffix that auto-increments.
|
||||||
|
*/
|
||||||
|
public class UniqueJobRename {
|
||||||
|
public static final String QUERY_SELECT_JOBS_WITH_NON_UNIQUE_NAMES =
|
||||||
|
"SELECT j1." + CommonRepoUtils.escapeColumnName(COLUMN_SQB_ID)
|
||||||
|
+ ", j1." + CommonRepoUtils.escapeColumnName(COLUMN_SQB_NAME)
|
||||||
|
+ " FROM " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_JOB_NAME)
|
||||||
|
+ " AS j1 INNER JOIN ( SELECT " + CommonRepoUtils.escapeColumnName(COLUMN_SQB_NAME)
|
||||||
|
+ " FROM " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_JOB_NAME)
|
||||||
|
+ " GROUP BY (" + CommonRepoUtils.escapeColumnName(COLUMN_SQB_NAME) + ")"
|
||||||
|
+ " HAVING COUNT(*) > 1 ) AS j2 ON j1." + CommonRepoUtils.escapeColumnName(COLUMN_SQB_NAME)
|
||||||
|
+ " = j2." + CommonRepoUtils.escapeColumnName(COLUMN_SQB_NAME);
|
||||||
|
|
||||||
|
public static final String QUERY_UPDATE_JOB_NAME_BY_ID =
|
||||||
|
"UPDATE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_JOB_NAME)
|
||||||
|
+ " SET " + CommonRepoUtils.escapeColumnName(COLUMN_SQB_NAME) + " = ?"
|
||||||
|
+ " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQB_ID) + " = ?";
|
||||||
|
|
||||||
|
private Connection conn;
|
||||||
|
|
||||||
|
public UniqueJobRename(Connection conn) {
|
||||||
|
this.conn = conn;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void execute() {
|
||||||
|
Map<Long, String> idToNewNameMap = new TreeMap<Long, String>();
|
||||||
|
|
||||||
|
Statement fetchJobStmt = null;
|
||||||
|
PreparedStatement updateJobStmt = null;
|
||||||
|
ResultSet fetchJobResultSet = null;
|
||||||
|
|
||||||
|
// Fetch all non-unique job IDs and Names.
|
||||||
|
// Transform names.
|
||||||
|
try {
|
||||||
|
fetchJobStmt = conn.createStatement();
|
||||||
|
fetchJobResultSet = fetchJobStmt.executeQuery(QUERY_SELECT_JOBS_WITH_NON_UNIQUE_NAMES);
|
||||||
|
while (fetchJobResultSet.next()) {
|
||||||
|
idToNewNameMap.put(fetchJobResultSet.getLong(1), getNewName(fetchJobResultSet.getString(2)));
|
||||||
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new SqoopException(DerbyRepoError.DERBYREPO_0000, e);
|
||||||
|
} finally {
|
||||||
|
CommonRepoUtils.closeResultSets(fetchJobResultSet);
|
||||||
|
CommonRepoUtils.closeStatements(fetchJobStmt);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
try {
|
||||||
|
updateJobStmt = conn.prepareStatement(QUERY_UPDATE_JOB_NAME_BY_ID);
|
||||||
|
for (Long jobId : idToNewNameMap.keySet()) {
|
||||||
|
updateJobStmt.setString(1, idToNewNameMap.get(jobId));
|
||||||
|
updateJobStmt.setLong(2, jobId);
|
||||||
|
updateJobStmt.addBatch();
|
||||||
|
}
|
||||||
|
|
||||||
|
int[] counts = updateJobStmt.executeBatch();
|
||||||
|
for (int count : counts) {
|
||||||
|
if (count != 1) {
|
||||||
|
throw new SqoopException(DerbyRepoError.DERBYREPO_0000,
|
||||||
|
"Update count wrong when changing names for non-unique jobs. Update coutns are: "
|
||||||
|
+ StringUtils.join(Arrays.asList(counts), ","));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new SqoopException(DerbyRepoError.DERBYREPO_0000, e);
|
||||||
|
} finally {
|
||||||
|
CommonRepoUtils.closeResultSets(fetchJobResultSet);
|
||||||
|
CommonRepoUtils.closeStatements(fetchJobStmt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make new name from old name.
|
||||||
|
* New name takes on form: old name + "_" + uuid.
|
||||||
|
* New name will substring old name if old name is longer than 47 characters.
|
||||||
|
* @param oldName
|
||||||
|
* @return newName
|
||||||
|
*/
|
||||||
|
private String getNewName(String oldName) {
|
||||||
|
String suffix = "_" + UUID.randomUUID().toString();
|
||||||
|
// Make sure new name is max 64 characters.
|
||||||
|
int maxLength = 64 - suffix.length();
|
||||||
|
if (oldName.length() > maxLength) {
|
||||||
|
return oldName.substring(0, maxLength) + suffix;
|
||||||
|
} else {
|
||||||
|
return oldName + suffix;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -17,8 +17,16 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.sqoop.integration.repository.derby.upgrade;
|
package org.apache.sqoop.integration.repository.derby.upgrade;
|
||||||
|
|
||||||
|
import org.apache.sqoop.model.MJob;
|
||||||
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
|
||||||
|
import static org.testng.Assert.assertFalse;
|
||||||
|
import static org.testng.AssertJUnit.assertEquals;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This version contains the following structures:
|
* This version contains the following structures:
|
||||||
@ -30,6 +38,8 @@
|
|||||||
* Job IMPORT with name "Job2" and id 2
|
* Job IMPORT with name "Job2" and id 2
|
||||||
* Job IMPORT with name "Job3" and id 3
|
* Job IMPORT with name "Job3" and id 3
|
||||||
* Job EXPORT with name "Job4" and id 4
|
* Job EXPORT with name "Job4" and id 4
|
||||||
|
* Job EXPORT with name "nonunique" and id 5
|
||||||
|
* Job EXPORT with name "nonunique" and id 6
|
||||||
* Link with id 4 has been disabled
|
* Link with id 4 has been disabled
|
||||||
* Job with id 3 has been disabled
|
* Job with id 3 has been disabled
|
||||||
* Job with id 1 has been run 5 times
|
* Job with id 1 has been run 5 times
|
||||||
@ -48,7 +58,7 @@ public int getNumberOfLinks() {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getNumberOfJobs() {
|
public int getNumberOfJobs() {
|
||||||
return 4;
|
return 6;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -78,6 +88,15 @@ public Integer[] getDeleteLinkIds() {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Integer[] getDeleteJobIds() {
|
public Integer[] getDeleteJobIds() {
|
||||||
return new Integer[] {1, 2, 3, 4};
|
return new Integer[] {1, 2, 3, 4, 5, 6};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNonuniqueNames() throws Exception {
|
||||||
|
Set<String> jobNames = new TreeSet<String>();
|
||||||
|
for(MJob job : getClient().getJobs()) {
|
||||||
|
assertFalse(jobNames.contains(job.getName()));
|
||||||
|
jobNames.add(job.getName());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Binary file not shown.
Loading…
Reference in New Issue
Block a user