5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-16 17:00:53 +08:00

SQOOP-1075: Sqoop2: Persist Framework metadata version in repository

(Raghav Kumar Gautam via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2013-07-29 20:27:16 -07:00
parent 69ecc93d4a
commit a7d9b1b338
14 changed files with 165 additions and 22 deletions

View File

@ -184,7 +184,8 @@ private MConnector connector(long id) {
}
private MFramework framework() {
MFramework framework = new MFramework(new MConnectionForms(null), new LinkedList<MJobForms>());
MFramework framework = new MFramework(new MConnectionForms(null),
new LinkedList<MJobForms>(), "1");
framework.setPersistenceId(1);
return framework;
}

View File

@ -75,6 +75,7 @@ public JSONObject extract(boolean skipSensitive) {
JSONObject result = new JSONObject();
result.put(ID, framework.getPersistenceId());
result.put(FRAMEWORK_VERSION, framework.getVersion());
result.put(CON_FORMS, conForms);
result.put(JOB_FORMS, jobForms);
result.put(RESOURCES, extractResourceBundle(bundle));
@ -85,6 +86,7 @@ public JSONObject extract(boolean skipSensitive) {
@SuppressWarnings("unchecked")
public void restore(JSONObject jsonObject) {
long id = (Long) jsonObject.get(ID);
String frameworkVersion = (String) jsonObject.get(FRAMEWORK_VERSION);
List<MForm> connForms = restoreForms((JSONArray) jsonObject.get(CON_FORMS));
@ -101,7 +103,8 @@ public void restore(JSONObject jsonObject) {
jobs.add(new MJobForms(type, job));
}
framework = new MFramework(new MConnectionForms(connForms), jobs);
framework = new MFramework(new MConnectionForms(connForms), jobs,
frameworkVersion);
framework.setPersistenceId(id);
bundle = restoreResourceBundle((JSONObject) jsonObject.get(RESOURCES));

View File

@ -44,6 +44,7 @@ public final class FormSerialization {
public static final String ID = "id";
public static final String NAME = "name";
public static final String VERSION = "version";
public static final String FRAMEWORK_VERSION = "framework-version";
public static final String CLASS = "class";
public static final String CREATED = "created";
public static final String UPDATED = "updated";

View File

@ -30,11 +30,10 @@ public final class MConnector extends MFramework {
private final String uniqueName;
private final String className;
private final String version;
public MConnector(String uniqueName, String className, String version,
MConnectionForms connectionForms, List<MJobForms> jobForms) {
super(connectionForms, jobForms);
super(connectionForms, jobForms, version);
if (uniqueName == null || className == null) {
throw new NullPointerException();
@ -42,7 +41,6 @@ public MConnector(String uniqueName, String className, String version,
this.uniqueName = uniqueName;
this.className = className;
this.version = version;
}
public String getUniqueName() {
@ -53,10 +51,6 @@ public String getClassName() {
return className;
}
public String getVersion() {
return version;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("connector-");

View File

@ -32,8 +32,11 @@ public class MFramework extends MPersistableEntity implements MClonable {
private final MConnectionForms connectionForms;
private final Map<MJob.Type, MJobForms> jobs;
String version;
public MFramework(MConnectionForms connectionForms, List<MJobForms> jobForms) {
public MFramework(MConnectionForms connectionForms, List<MJobForms> jobForms,
String version) {
this.version = version;
this.connectionForms = connectionForms;
this.jobs = new HashMap<MJob.Type, MJobForms>();
@ -52,6 +55,7 @@ public MFramework(MConnectionForms connectionForms, List<MJobForms> jobForms) {
public String toString() {
StringBuilder sb = new StringBuilder("framework-");
sb.append(getPersistenceId()).append(":");
sb.append("version = " + version);
sb.append(", ").append(connectionForms.toString());
for(MJobForms entry: jobs.values()) {
sb.append(entry.toString());
@ -71,7 +75,9 @@ public boolean equals(Object other) {
}
MFramework mo = (MFramework) other;
return connectionForms.equals(mo.connectionForms) && jobs.equals(mo.jobs);
return version.equals(mo.getVersion()) &&
connectionForms.equals(mo.connectionForms) &&
jobs.equals(mo.jobs);
}
@Override
@ -81,7 +87,7 @@ public int hashCode() {
for(MJobForms entry: jobs.values()) {
result = 31 * result + entry.hashCode();
}
result = 31 * result + version.hashCode();
return result;
}
@ -108,9 +114,19 @@ public MFramework clone(boolean cloneWithValue) {
copyJobForms.add(entry.clone(cloneWithValue));
}
}
MFramework copy = new MFramework(this.getConnectionForms().clone(cloneWithValue), copyJobForms);
MFramework copy = new MFramework(this.getConnectionForms().clone(cloneWithValue),
copyJobForms, this.version);
copy.setPersistenceId(this.getPersistenceId());
return copy;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
}

View File

@ -44,7 +44,7 @@ public static MConnector getConnector(String name) {
}
public static MFramework getFramework() {
return new MFramework(getConnectionForms(), getAllJobForms());
return new MFramework(getConnectionForms(), getAllJobForms(), "1");
}
public static MConnection getConnection(String name) {

View File

@ -37,7 +37,7 @@ public void testFailureOnDuplicateJobTypes() {
jobForms.add(new MJobForms(MJob.Type.IMPORT, new ArrayList<MForm>()));
try {
new MFramework(connectionForms, jobForms);
new MFramework(connectionForms, jobForms, "1");
fail("We we're expecting exception for invalid usage");
} catch(Exception ex) {
// Expected case

View File

@ -111,6 +111,8 @@ public static void setInstance(FrameworkManager newInstance) {
*/
private static final boolean DEFAULT_AUTO_UPGRADE = false;
public static final String CURRENT_FRAMEWORK_VERSION = "1";
public Class getJobConfigurationClass(MJob.Type jobType) {
switch (jobType) {
case IMPORT:
@ -134,7 +136,8 @@ public FrameworkManager() {
FormUtils.toForms(getJobConfigurationClass(MJob.Type.IMPORT))));
jobForms.add(new MJobForms(MJob.Type.EXPORT,
FormUtils.toForms(getJobConfigurationClass(MJob.Type.EXPORT))));
mFramework = new MFramework(connectionForms, jobForms);
mFramework = new MFramework(connectionForms, jobForms,
CURRENT_FRAMEWORK_VERSION);
// Build validator
validator = new FrameworkValidator();

View File

@ -924,7 +924,7 @@ public void testFrameworkUpgradeHandlerUpdateJobError() {
doNothing().when(repoHandler).updateFramework(any(MFramework.class), any(Connection.class));
doReturn(true).when(repoHandler).existsConnection(anyLong(), any(Connection.class));
doReturn(true).when(repoHandler).existsJob(anyLong(), any(Connection.class));
doNothing().when(repoHandler).updateConnection(any(MConnection.class), any(Connection.class));;
doNothing().when(repoHandler).updateConnection(any(MConnection.class), any(Connection.class));
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
"update job error.");
@ -969,13 +969,14 @@ private MFramework framework() {
jobForms.add(new MJobForms(MJob.Type.IMPORT, FormUtils.toForms(ImportJobConfiguration.class)));
MFramework framework = new MFramework(new MConnectionForms(new LinkedList<MForm>()),
jobForms);
jobForms, FrameworkManager.CURRENT_FRAMEWORK_VERSION);
framework.setPersistenceId(1);
return framework;
}
private MFramework anotherFramework() {
MFramework framework = new MFramework(null, new LinkedList<MJobForms>());
MFramework framework = new MFramework(null, new LinkedList<MJobForms>(),
FrameworkManager.CURRENT_FRAMEWORK_VERSION);
framework.setPersistenceId(1);
return framework;
}

View File

@ -23,6 +23,8 @@ public final class DerbyRepoConstants {
public static final String SYSKEY_VERSION = "version";
public static final String SYSKEY_FRAMEWORK_VERSION = "framework.version";
/**
* Expected version of the repository structures.
*

View File

@ -178,6 +178,7 @@ public enum DerbyRepoError implements ErrorCode {
/** Can't enable/disable job **/
DERBYREPO_0043("Can't enable/disable job"),
DERBYREPO_0044("Update of framework failed"),
;
private final String message;

View File

@ -41,6 +41,7 @@
import org.apache.log4j.Logger;
import org.apache.commons.lang.StringUtils;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.framework.FrameworkManager;
import org.apache.sqoop.model.MBooleanInput;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MConnectionForms;
@ -308,6 +309,59 @@ public int detectVersion(Connection conn) {
}
}
/**
* Detect version of the framework
*
* @param conn Connection to metadata repository
* @return Version of the MFramework
*/
private String detectFrameworkVersion (Connection conn) {
ResultSet rs = null;
PreparedStatement stmt = null;
try {
stmt = conn.prepareStatement(DerbySchemaQuery.STMT_SELECT_SYSTEM);
stmt.setString(1, DerbyRepoConstants.SYSKEY_FRAMEWORK_VERSION);
rs = stmt.executeQuery();
if(!rs.next()) {
return null;
}
return rs.getString(1);
} catch (SQLException e) {
LOG.info("Can't fetch framework version.", e);
return null;
} finally {
closeResultSets(rs);
closeStatements(stmt);
}
}
/**
* Create or update framework version
* @param conn Connection to the metadata repository
* @param mFramework
*/
private void createOrUpdateFrameworkVersion(Connection conn,
MFramework mFramework) {
ResultSet rs = null;
PreparedStatement stmt = null;
try {
stmt = conn.prepareStatement(STMT_DELETE_SYSTEM);
stmt.setString(1, DerbyRepoConstants.SYSKEY_FRAMEWORK_VERSION);
stmt.executeUpdate();
closeStatements(stmt);
stmt = conn.prepareStatement(STMT_INSERT_SYSTEM);
stmt.setString(1, DerbyRepoConstants.SYSKEY_FRAMEWORK_VERSION);
stmt.setString(2, mFramework.getVersion());
stmt.executeUpdate();
} catch (SQLException e) {
logException(e);
throw new SqoopException(DerbyRepoError.DERBYREPO_0044, e);
} finally {
closeResultSets(rs);
closeStatements(stmt);
}
}
/**
* {@inheritDoc}
@ -466,6 +520,7 @@ public void registerFramework(MFramework mf, Connection conn) {
} finally {
closeStatements(baseFormStmt, baseInputStmt);
}
createOrUpdateFrameworkVersion(conn, mf);
}
/**
@ -493,7 +548,7 @@ public MFramework findFramework(Connection conn) {
}
mf = new MFramework(new MConnectionForms(connectionForms),
convertToJobList(jobForms));
convertToJobList(jobForms), detectFrameworkVersion(conn));
// We're using hardcoded value for framework metadata as they are
// represented as NULL in the database.
@ -846,10 +901,11 @@ public void updateFramework(MFramework mFramework, Connection conn) {
} catch (SQLException e) {
logException(e, mFramework);
throw new SqoopException(DerbyRepoError.DERBYREPO_0038, e);
throw new SqoopException(DerbyRepoError.DERBYREPO_0044, e);
} finally {
closeStatements(deleteForm, deleteInput);
}
createOrUpdateFrameworkVersion(conn, mFramework);
insertFormsForFramework(mFramework, conn);
}
@ -1975,6 +2031,9 @@ private void createInputValues(String query,
* @param resultSets Result sets to close
*/
private void closeResultSets(ResultSet ... resultSets) {
if(resultSets == null) {
return;
}
for (ResultSet rs : resultSets) {
if(rs != null) {
try {
@ -1994,6 +2053,9 @@ private void closeResultSets(ResultSet ... resultSets) {
* @param stmts Statements to close
*/
private void closeStatements(Statement... stmts) {
if(stmts == null) {
return;
}
for (Statement stmt : stmts) {
if(stmt != null) {
try {

View File

@ -18,6 +18,7 @@
package org.apache.sqoop.repository.derby;
import junit.framework.TestCase;
import org.apache.sqoop.framework.FrameworkManager;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MConnectionForms;
import org.apache.sqoop.model.MConnector;
@ -102,6 +103,8 @@ protected void createSchema() throws Exception {
runQuery(QUERY_UPGRADE_TABLE_SQ_CONNECTION_ADD_COLUMN_ENABLED);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_ENABLED);
runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) VALUES('version', '2')");
runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) " +
"VALUES('framework.version', '1')");
}
/**
@ -303,7 +306,8 @@ protected MConnector getConnector() {
}
protected MFramework getFramework() {
return new MFramework(getConnectionForms(), getJobForms());
return new MFramework(getConnectionForms(), getJobForms(),
FrameworkManager.CURRENT_FRAMEWORK_VERSION);
}
protected void fillConnection(MConnection connection) {

View File

@ -17,8 +17,13 @@
*/
package org.apache.sqoop.repository.derby;
import org.apache.sqoop.framework.FrameworkManager;
import org.apache.sqoop.model.MFramework;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
* Test framework methods on Derby repository.
*/
@ -71,5 +76,55 @@ public void testRegisterConnector() throws Exception {
MFramework retrieved = handler.findFramework(getDerbyConnection());
assertNotNull(retrieved);
assertEquals(framework, retrieved);
assertEquals(framework.getVersion(), retrieved.getVersion());
}
private String getFrameworkVersion() throws Exception {
final String frameworkVersionQuery =
"SELECT SQM_VALUE FROM SQOOP.SQ_SYSTEM WHERE SQM_KEY=?";
String retVal = null;
PreparedStatement preparedStmt = null;
ResultSet resultSet = null;
try {
preparedStmt =
getDerbyConnection().prepareStatement(frameworkVersionQuery);
preparedStmt.setString(1, DerbyRepoConstants.SYSKEY_FRAMEWORK_VERSION);
resultSet = preparedStmt.executeQuery();
if(resultSet.next())
retVal = resultSet.getString(1);
return retVal;
} finally {
if(preparedStmt !=null) {
try {
preparedStmt.close();
} catch(SQLException e) {
}
}
if(resultSet != null) {
try {
resultSet.close();
} catch(SQLException e) {
}
}
}
}
public void testFrameworkVersion() throws Exception {
handler.registerFramework(getFramework(), getDerbyConnection());
final String lowerVersion = Integer.toString(
Integer.parseInt(FrameworkManager.CURRENT_FRAMEWORK_VERSION) - 1);
assertEquals(FrameworkManager.CURRENT_FRAMEWORK_VERSION, getFrameworkVersion());
runQuery("UPDATE SQOOP.SQ_SYSTEM SET SQM_VALUE='" + lowerVersion +
"' WHERE SQM_KEY = '" + DerbyRepoConstants.SYSKEY_FRAMEWORK_VERSION + "'");
assertEquals(lowerVersion, getFrameworkVersion());
MFramework framework = getFramework();
handler.updateFramework(framework, getDerbyConnection());
assertEquals(FrameworkManager.CURRENT_FRAMEWORK_VERSION, framework.getVersion());
assertEquals(FrameworkManager.CURRENT_FRAMEWORK_VERSION, getFrameworkVersion());
}
}