From 3fcd6c8768e9fdffc6be7354d71fbcd99c63127c Mon Sep 17 00:00:00 2001 From: Bilung Lee Date: Tue, 1 Nov 2011 07:39:43 +0000 Subject: [PATCH] SQOOP-375: Migrate metastore and metastore.hsqldb packages to new name space git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1195857 13f79535-47bb-0310-9956-ffa450edef68 --- .../com/cloudera/sqoop/metastore/JobData.java | 42 +- .../cloudera/sqoop/metastore/JobStorage.java | 74 +- .../sqoop/metastore/JobStorageFactory.java | 47 +- .../metastore/hsqldb/AutoHsqldbStorage.java | 97 +-- .../metastore/hsqldb/HsqldbJobStorage.java | 793 +---------------- .../metastore/hsqldb/HsqldbMetaStore.java | 158 +--- .../org/apache/sqoop/metastore/JobData.java | 69 ++ .../apache/sqoop/metastore/JobStorage.java | 94 ++ .../sqoop/metastore/JobStorageFactory.java | 73 ++ .../metastore/hsqldb/AutoHsqldbStorage.java | 115 +++ .../metastore/hsqldb/HsqldbJobStorage.java | 805 ++++++++++++++++++ .../metastore/hsqldb/HsqldbMetaStore.java | 180 ++++ 12 files changed, 1378 insertions(+), 1169 deletions(-) create mode 100644 src/java/org/apache/sqoop/metastore/JobData.java create mode 100644 src/java/org/apache/sqoop/metastore/JobStorage.java create mode 100644 src/java/org/apache/sqoop/metastore/JobStorageFactory.java create mode 100644 src/java/org/apache/sqoop/metastore/hsqldb/AutoHsqldbStorage.java create mode 100644 src/java/org/apache/sqoop/metastore/hsqldb/HsqldbJobStorage.java create mode 100644 src/java/org/apache/sqoop/metastore/hsqldb/HsqldbMetaStore.java diff --git a/src/java/com/cloudera/sqoop/metastore/JobData.java b/src/java/com/cloudera/sqoop/metastore/JobData.java index 12187a5a..94194d77 100644 --- a/src/java/com/cloudera/sqoop/metastore/JobData.java +++ b/src/java/com/cloudera/sqoop/metastore/JobData.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -24,47 +22,17 @@ import com.cloudera.sqoop.tool.SqoopTool; /** - * Container for all job data that should be stored to a - * permanent resource. + * @deprecated Moving to use org.apache.sqoop namespace. */ -public class JobData { - private SqoopOptions opts; - private SqoopTool tool; +public class JobData + extends org.apache.sqoop.metastore.JobData { public JobData() { + super(); } public JobData(SqoopOptions options, SqoopTool sqoopTool) { - this.opts = options; - this.tool = sqoopTool; - } - - /** - * Gets the SqoopOptions. - */ - public SqoopOptions getSqoopOptions() { - return this.opts; - } - - /** - * Gets the SqoopTool. - */ - public SqoopTool getSqoopTool() { - return this.tool; - } - - /** - * Sets the SqoopOptions. - */ - public void setSqoopOptions(SqoopOptions options) { - this.opts = options; - } - - /** - * Sets the SqoopTool. - */ - public void setSqoopTool(SqoopTool sqoopTool) { - this.tool = sqoopTool; + super(options, sqoopTool); } } diff --git a/src/java/com/cloudera/sqoop/metastore/JobStorage.java b/src/java/com/cloudera/sqoop/metastore/JobStorage.java index 599ff14f..bbc6aeac 100644 --- a/src/java/com/cloudera/sqoop/metastore/JobStorage.java +++ b/src/java/com/cloudera/sqoop/metastore/JobStorage.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,76 +18,10 @@ package com.cloudera.sqoop.metastore; -import java.io.Closeable; -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.conf.Configured; - /** - * API that defines how jobs are saved, restored, and manipulated. - * - *

- * JobStorage instances may be created and then not used; the - * JobStorage factory may create additional JobStorage instances - * that return false from accept() and then discard them. The close() - * method will only be triggered for a JobStorage if the connect() - * method is called. Connection should not be triggered by a call to - * accept().

+ * @deprecated Moving to use org.apache.sqoop namespace. */ -public abstract class JobStorage extends Configured implements Closeable { - - /** - * Returns true if the JobStorage system can use the metadata in - * the descriptor to connect to an underlying storage resource. - */ - public abstract boolean canAccept(Map descriptor); - - - /** - * Opens / connects to the underlying storage resource specified by the - * descriptor. - */ - public abstract void open(Map descriptor) - throws IOException; - - /** - * Given a job name, reconstitute a JobData that contains all - * configuration information required for the job. Returns null if the - * job name does not match an available job. - */ - public abstract JobData read(String jobName) - throws IOException; - - /** - * Forget about a saved job. - */ - public abstract void delete(String jobName) throws IOException; - - /** - * Given a job name and the data describing a configured job, record the job - * information to the storage medium. - */ - public abstract void create(String jobName, JobData data) - throws IOException; - - /** - * Given a job name and configured job data, update the underlying resource - * to match the current job configuration. - */ - public abstract void update(String jobName, JobData data) - throws IOException; - - /** - * Close any resources opened by the JobStorage system. - */ - public void close() throws IOException { - } - - /** - * Enumerate all jobs held in the connected resource. - */ - public abstract List list() throws IOException; +public abstract class JobStorage + extends org.apache.sqoop.metastore.JobStorage { } diff --git a/src/java/com/cloudera/sqoop/metastore/JobStorageFactory.java b/src/java/com/cloudera/sqoop/metastore/JobStorageFactory.java index aea31a29..3809e6b5 100644 --- a/src/java/com/cloudera/sqoop/metastore/JobStorageFactory.java +++ b/src/java/com/cloudera/sqoop/metastore/JobStorageFactory.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,55 +18,20 @@ package com.cloudera.sqoop.metastore; -import java.util.List; -import java.util.Map; - import org.apache.hadoop.conf.Configuration; /** - * Factory that produces the correct JobStorage system to work with - * a particular job descriptor. + * @deprecated Moving to use org.apache.sqoop namespace. */ -public class JobStorageFactory { +public class JobStorageFactory + extends org.apache.sqoop.metastore.JobStorageFactory { - private Configuration conf; - - /** - * Configuration key describing the list of JobStorage implementations - * to use to handle jobs. - */ public static final String AVAILABLE_STORAGES_KEY = - "sqoop.job.storage.implementations"; - - /** The default list of available JobStorage implementations. */ - private static final String DEFAULT_AVAILABLE_STORAGES = - "com.cloudera.sqoop.metastore.hsqldb.HsqldbJobStorage," - + "com.cloudera.sqoop.metastore.hsqldb.AutoHsqldbStorage"; + org.apache.sqoop.metastore.JobStorageFactory.AVAILABLE_STORAGES_KEY; public JobStorageFactory(Configuration config) { - this.conf = config; - - // Ensure that we always have an available storages list. - if (this.conf.get(AVAILABLE_STORAGES_KEY) == null) { - this.conf.set(AVAILABLE_STORAGES_KEY, DEFAULT_AVAILABLE_STORAGES); - } + super(config); } - /** - * Given a storage descriptor, determine the correct JobStorage - * implementation to use to connect to the storage resource and return an - * instance of it -- or null if no JobStorage instance is appropriate. - */ - public JobStorage getJobStorage(Map descriptor) { - List storages = this.conf.getInstances( - AVAILABLE_STORAGES_KEY, JobStorage.class); - for (JobStorage stor : storages) { - if (stor.canAccept(descriptor)) { - return stor; - } - } - - return null; - } } diff --git a/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java b/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java index bfecc91b..259d9f63 100644 --- a/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java +++ b/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,97 +18,24 @@ package com.cloudera.sqoop.metastore.hsqldb; -import java.io.File; -import java.io.IOException; - -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.apache.hadoop.conf.Configuration; - /** - * JobStorage implementation that auto-configures an HSQLDB - * local-file-based instance to hold jobs. + * @deprecated Moving to use org.apache.sqoop namespace. */ -public class AutoHsqldbStorage extends HsqldbJobStorage { +public class AutoHsqldbStorage + extends org.apache.sqoop.metastore.hsqldb.AutoHsqldbStorage { - public static final Log LOG = LogFactory.getLog( - AutoHsqldbStorage.class.getName()); - - /** - * Configuration key specifying whether this storage agent is active. - * Defaults to "on" to allow zero-conf local users. - */ public static final String AUTO_STORAGE_IS_ACTIVE_KEY = - "sqoop.metastore.client.enable.autoconnect"; - - /** - * Configuration key specifying the connect string used by this - * storage agent. - */ + org.apache.sqoop.metastore.hsqldb. + AutoHsqldbStorage.AUTO_STORAGE_IS_ACTIVE_KEY; public static final String AUTO_STORAGE_CONNECT_STRING_KEY = - "sqoop.metastore.client.autoconnect.url"; - - /** - * Configuration key specifying the username to bind with. - */ + org.apache.sqoop.metastore.hsqldb. + AutoHsqldbStorage.AUTO_STORAGE_CONNECT_STRING_KEY; public static final String AUTO_STORAGE_USER_KEY = - "sqoop.metastore.client.autoconnect.username"; - - - /** HSQLDB default user is named 'SA'. */ - private static final String DEFAULT_AUTO_USER = "SA"; - - /** - * Configuration key specifying the password to bind with. - */ + org.apache.sqoop.metastore.hsqldb.AutoHsqldbStorage.AUTO_STORAGE_USER_KEY; public static final String AUTO_STORAGE_PASS_KEY = - "sqoop.metastore.client.autoconnect.password"; + org.apache.sqoop.metastore.hsqldb.AutoHsqldbStorage.AUTO_STORAGE_PASS_KEY; + public static final String DEFAULT_AUTO_PASSWORD = + org.apache.sqoop.metastore.hsqldb.AutoHsqldbStorage.DEFAULT_AUTO_PASSWORD; - /** HSQLDB default user has an empty password. */ - public static final String DEFAULT_AUTO_PASSWORD = ""; - - @Override - /** {@inheritDoc} */ - public boolean canAccept(Map descriptor) { - Configuration conf = this.getConf(); - return conf.getBoolean(AUTO_STORAGE_IS_ACTIVE_KEY, true); - } - - /** - * Determine the user's home directory and return a connect - * string to HSQLDB that uses ~/.sqoop/ as the storage location - * for the metastore database. - */ - private String getHomeDirFileConnectStr() { - String homeDir = System.getProperty("user.home"); - - File homeDirObj = new File(homeDir); - File sqoopDataDirObj = new File(homeDirObj, ".sqoop"); - File databaseFileObj = new File(sqoopDataDirObj, "metastore.db"); - - String dbFileStr = databaseFileObj.toString(); - return "jdbc:hsqldb:file:" + dbFileStr - + ";hsqldb.write_delay=false;shutdown=true"; - } - - @Override - /** - * Set the connection information to use the auto-inferred connection - * string. - */ - public void open(Map descriptor) throws IOException { - Configuration conf = getConf(); - setMetastoreConnectStr(conf.get(AUTO_STORAGE_CONNECT_STRING_KEY, - getHomeDirFileConnectStr())); - setMetastoreUser(conf.get(AUTO_STORAGE_USER_KEY, DEFAULT_AUTO_USER)); - setMetastorePassword(conf.get(AUTO_STORAGE_PASS_KEY, - DEFAULT_AUTO_PASSWORD)); - setConnectedDescriptor(descriptor); - - init(); - } } diff --git a/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbJobStorage.java b/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbJobStorage.java index b1728ed9..083e2a37 100644 --- a/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbJobStorage.java +++ b/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbJobStorage.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,793 +15,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - - package com.cloudera.sqoop.metastore.hsqldb; -import java.io.IOException; - -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.apache.hadoop.conf.Configuration; - -import com.cloudera.sqoop.SqoopOptions; -import com.cloudera.sqoop.metastore.JobData; -import com.cloudera.sqoop.metastore.JobStorage; -import com.cloudera.sqoop.tool.SqoopTool; - /** - * JobStorage implementation that uses an HSQLDB-backed database to - * hold job information. + * @deprecated Moving to use org.apache.sqoop namespace. */ -public class HsqldbJobStorage extends JobStorage { +public class HsqldbJobStorage + extends org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage { - public static final Log LOG = LogFactory.getLog( - HsqldbJobStorage.class.getName()); - - /** descriptor key identifying the connect string for the metastore. */ - public static final String META_CONNECT_KEY = "metastore.connect.string"; - - /** descriptor key identifying the username to use when connecting - * to the metastore. - */ - public static final String META_USERNAME_KEY = "metastore.username"; - - /** descriptor key identifying the password to use when connecting - * to the metastore. - */ - public static final String META_PASSWORD_KEY = "metastore.password"; - - - /** Default name for the root metadata table in HSQLDB. */ - private static final String DEFAULT_ROOT_TABLE_NAME = "SQOOP_ROOT"; - - /** Configuration key used to override root table name. */ + public static final String META_CONNECT_KEY = + org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.META_CONNECT_KEY; + public static final String META_USERNAME_KEY = + org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.META_USERNAME_KEY; + public static final String META_PASSWORD_KEY = + org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.META_PASSWORD_KEY; public static final String ROOT_TABLE_NAME_KEY = - "sqoop.hsqldb.root.table.name"; + org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.ROOT_TABLE_NAME_KEY; - /** root metadata table key used to define the current schema version. */ - private static final String STORAGE_VERSION_KEY = - "sqoop.hsqldb.job.storage.version"; - - /** The current version number for the schema edition. */ - private static final int CUR_STORAGE_VERSION = 0; - - /** root metadata table key used to define the job table name. */ - private static final String SESSION_TABLE_KEY = - "sqoop.hsqldb.job.info.table"; - - /** Default value for SESSION_TABLE_KEY. */ - private static final String DEFAULT_SESSION_TABLE_NAME = - "SQOOP_SESSIONS"; - - /** Per-job key with propClass 'schema' that defines the set of - * properties valid to be defined for propClass 'SqoopOptions'. */ - private static final String PROPERTY_SET_KEY = - "sqoop.property.set.id"; - - /** Current value for PROPERTY_SET_KEY. */ - private static final String CUR_PROPERTY_SET_ID = "0"; - - // The following are values for propClass in the v0 schema which - // describe different aspects of the stored metadata. - - /** Property class for properties about the stored data itself. */ - private static final String PROPERTY_CLASS_SCHEMA = "schema"; - - /** Property class for properties that are loaded into SqoopOptions. */ - private static final String PROPERTY_CLASS_SQOOP_OPTIONS = "SqoopOptions"; - - /** Property class for properties that are loaded into a Configuration. */ - private static final String PROPERTY_CLASS_CONFIG = "config"; - - /** - * Per-job key with propClass 'schema' that specifies the SqoopTool - * to load. - */ - private static final String SQOOP_TOOL_KEY = "sqoop.tool"; - - - private Map connectedDescriptor; - private String metastoreConnectStr; - private String metastoreUser; - private String metastorePassword; - private Connection connection; - - protected Connection getConnection() { - return this.connection; - } - - // After connection to the database and initialization of the - // schema, this holds the name of the job table. - private String jobTableName; - - protected void setMetastoreConnectStr(String connectStr) { - this.metastoreConnectStr = connectStr; - } - - protected void setMetastoreUser(String user) { - this.metastoreUser = user; - } - - protected void setMetastorePassword(String pass) { - this.metastorePassword = pass; - } - - private static final String DB_DRIVER_CLASS = "org.hsqldb.jdbcDriver"; - - /** - * Set the descriptor used to open() this storage. - */ - protected void setConnectedDescriptor(Map descriptor) { - this.connectedDescriptor = descriptor; - } - - @Override - /** - * Initialize the connection to the database. - */ - public void open(Map descriptor) throws IOException { - setMetastoreConnectStr(descriptor.get(META_CONNECT_KEY)); - setMetastoreUser(descriptor.get(META_USERNAME_KEY)); - setMetastorePassword(descriptor.get(META_PASSWORD_KEY)); - setConnectedDescriptor(descriptor); - - init(); - } - - protected void init() throws IOException { - try { - // Load/initialize the JDBC driver. - Class.forName(DB_DRIVER_CLASS); - } catch (ClassNotFoundException cnfe) { - throw new IOException("Could not load HSQLDB JDBC driver", cnfe); - } - - try { - if (null == metastoreUser) { - this.connection = DriverManager.getConnection(metastoreConnectStr); - } else { - this.connection = DriverManager.getConnection(metastoreConnectStr, - metastoreUser, metastorePassword); - } - - connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); - connection.setAutoCommit(false); - - // Initialize the root schema. - if (!rootTableExists()) { - createRootTable(); - } - - // Check the schema version. - String curStorageVerStr = getRootProperty(STORAGE_VERSION_KEY, null); - int actualStorageVer = -1; - try { - actualStorageVer = Integer.valueOf(curStorageVerStr); - } catch (NumberFormatException nfe) { - LOG.warn("Could not interpret as a number: " + curStorageVerStr); - } - if (actualStorageVer != CUR_STORAGE_VERSION) { - LOG.error("Can not interpret metadata schema"); - LOG.error("The metadata schema version is " + curStorageVerStr); - LOG.error("The highest version supported is " + CUR_STORAGE_VERSION); - LOG.error("To use this version of Sqoop, " - + "you must downgrade your metadata schema."); - throw new IOException("Invalid metadata version."); - } - - // Initialize the versioned schema. - initV0Schema(); - } catch (SQLException sqle) { - if (null != connection) { - try { - connection.rollback(); - } catch (SQLException e2) { - LOG.warn("Error rolling back transaction in error handler: " + e2); - } - } - - throw new IOException("Exception creating SQL connection", sqle); - } - } - - @Override - public void close() throws IOException { - if (null != this.connection) { - try { - LOG.debug("Flushing current transaction"); - this.connection.commit(); - } catch (SQLException sqlE) { - throw new IOException("Exception committing connection", sqlE); - } - - try { - LOG.debug("Closing connection"); - this.connection.close(); - } catch (SQLException sqlE) { - throw new IOException("Exception closing connection", sqlE); - } finally { - this.connection = null; - } - } - } - - @Override - /** {@inheritDoc} */ - public boolean canAccept(Map descriptor) { - // We return true if the desciptor contains a connect string to find - // the database. - return descriptor.get(META_CONNECT_KEY) != null; - } - - @Override - /** {@inheritDoc} */ - public JobData read(String jobName) throws IOException { - try { - if (!jobExists(jobName)) { - LOG.error("Cannot restore job: " + jobName); - LOG.error("(No such job)"); - throw new IOException("Cannot restore missing job " + jobName); - } - - LOG.debug("Restoring job: " + jobName); - Properties schemaProps = getV0Properties(jobName, - PROPERTY_CLASS_SCHEMA); - Properties sqoopOptProps = getV0Properties(jobName, - PROPERTY_CLASS_SQOOP_OPTIONS); - Properties configProps = getV0Properties(jobName, - PROPERTY_CLASS_CONFIG); - - // Check that we're not using a saved job from a previous - // version whose functionality has been deprecated. - String thisPropSetId = schemaProps.getProperty(PROPERTY_SET_KEY); - LOG.debug("System property set: " + CUR_PROPERTY_SET_ID); - LOG.debug("Stored property set: " + thisPropSetId); - if (!CUR_PROPERTY_SET_ID.equals(thisPropSetId)) { - LOG.warn("The property set present in this database was written by"); - LOG.warn("an incompatible version of Sqoop. This may result in an"); - LOG.warn("incomplete operation."); - // TODO(aaron): Should this fail out-right? - } - - String toolName = schemaProps.getProperty(SQOOP_TOOL_KEY); - if (null == toolName) { - // Don't know what tool to create. - throw new IOException("Incomplete metadata; missing " - + SQOOP_TOOL_KEY); - } - - SqoopTool tool = SqoopTool.getTool(toolName); - if (null == tool) { - throw new IOException("Error in job metadata: invalid tool " - + toolName); - } - - Configuration conf = new Configuration(); - for (Map.Entry entry : configProps.entrySet()) { - conf.set(entry.getKey().toString(), entry.getValue().toString()); - } - - SqoopOptions opts = new SqoopOptions(); - opts.setConf(conf); - opts.loadProperties(sqoopOptProps); - - // Set the job connection information for this job. - opts.setJobName(jobName); - opts.setStorageDescriptor(connectedDescriptor); - - return new JobData(opts, tool); - } catch (SQLException sqlE) { - throw new IOException("Error communicating with database", sqlE); - } - } - - private boolean jobExists(String jobName) throws SQLException { - PreparedStatement s = connection.prepareStatement( - "SELECT COUNT(job_name) FROM " + this.jobTableName - + " WHERE job_name = ? GROUP BY job_name"); - ResultSet rs = null; - try { - s.setString(1, jobName); - rs = s.executeQuery(); - if (rs.next()) { - return true; // We got a result, meaning the job exists. - } - } finally { - if (null != rs) { - try { - rs.close(); - } catch (SQLException sqlE) { - LOG.warn("Error closing result set: " + sqlE); - } - } - - s.close(); - } - - return false; // No result. - } - - @Override - /** {@inheritDoc} */ - public void delete(String jobName) throws IOException { - try { - if (!jobExists(jobName)) { - LOG.error("No such job: " + jobName); - } else { - LOG.debug("Deleting job: " + jobName); - PreparedStatement s = connection.prepareStatement("DELETE FROM " - + this.jobTableName + " WHERE job_name = ?"); - try { - s.setString(1, jobName); - s.executeUpdate(); - } finally { - s.close(); - } - connection.commit(); - } - } catch (SQLException sqlEx) { - try { - connection.rollback(); - } catch (SQLException e2) { - LOG.warn("Error rolling back transaction in error handler: " + e2); - } - throw new IOException("Error communicating with database", sqlEx); - } - } - - @Override - /** {@inheritDoc} */ - public void create(String jobName, JobData data) - throws IOException { - try { - if (jobExists(jobName)) { - LOG.error("Cannot create job " + jobName - + ": it already exists"); - throw new IOException("Job " + jobName + " already exists"); - } - } catch (SQLException sqlE) { - throw new IOException("Error communicating with database", sqlE); - } - - createInternal(jobName, data); - } - - /** - * Actually insert/update the resources for this job. - */ - private void createInternal(String jobName, JobData data) - throws IOException { - try { - LOG.debug("Creating job: " + jobName); - - // Save the name of the Sqoop tool. - setV0Property(jobName, PROPERTY_CLASS_SCHEMA, SQOOP_TOOL_KEY, - data.getSqoopTool().getToolName()); - - // Save the property set id. - setV0Property(jobName, PROPERTY_CLASS_SCHEMA, PROPERTY_SET_KEY, - CUR_PROPERTY_SET_ID); - - // Save all properties of the SqoopOptions. - Properties props = data.getSqoopOptions().writeProperties(); - setV0Properties(jobName, PROPERTY_CLASS_SQOOP_OPTIONS, props); - - // And save all unique properties of the configuration. - Configuration saveConf = data.getSqoopOptions().getConf(); - Configuration baseConf = new Configuration(); - - for (Map.Entry entry : saveConf) { - String key = entry.getKey(); - String rawVal = saveConf.getRaw(key); - String baseVal = baseConf.getRaw(key); - if (baseVal != null && rawVal.equals(baseVal)) { - continue; // Don't save this; it's set in the base configuration. - } - - LOG.debug("Saving " + key + " => " + rawVal + " / " + baseVal); - setV0Property(jobName, PROPERTY_CLASS_CONFIG, key, rawVal); - } - - connection.commit(); - } catch (SQLException sqlE) { - try { - connection.rollback(); - } catch (SQLException sqlE2) { - LOG.warn("Exception rolling back transaction during error handling: " - + sqlE2); - } - throw new IOException("Error communicating with database", sqlE); - } - } - - @Override - /** {@inheritDoc} */ - public void update(String jobName, JobData data) - throws IOException { - try { - if (!jobExists(jobName)) { - LOG.error("Cannot update job " + jobName + ": not found"); - throw new IOException("Job " + jobName + " does not exist"); - } - } catch (SQLException sqlE) { - throw new IOException("Error communicating with database", sqlE); - } - - // Since we set properties with update-or-insert, this is the same - // as create on this system. - createInternal(jobName, data); - } - - @Override - /** {@inheritDoc} */ - public List list() throws IOException { - ResultSet rs = null; - try { - PreparedStatement s = connection.prepareStatement( - "SELECT DISTINCT job_name FROM " + this.jobTableName); - try { - rs = s.executeQuery(); - ArrayList jobs = new ArrayList(); - while (rs.next()) { - jobs.add(rs.getString(1)); - } - - return jobs; - } finally { - if (null != rs) { - try { - rs.close(); - } catch (SQLException sqlE) { - LOG.warn("Error closing resultset: " + sqlE); - } - } - - if (null != s) { - s.close(); - } - } - } catch (SQLException sqlE) { - throw new IOException("Error communicating with database", sqlE); - } - } - - // Determine the name to use for the root metadata table. - private String getRootTableName() { - Configuration conf = getConf(); - return conf.get(ROOT_TABLE_NAME_KEY, DEFAULT_ROOT_TABLE_NAME); - } - - private boolean tableExists(String table) throws SQLException { - LOG.debug("Checking for table: " + table); - DatabaseMetaData dbmd = connection.getMetaData(); - String [] tableTypes = { "TABLE" }; - ResultSet rs = dbmd.getTables(null, null, null, tableTypes); - if (null != rs) { - try { - while (rs.next()) { - if (table.equalsIgnoreCase(rs.getString("TABLE_NAME"))) { - LOG.debug("Found table: " + table); - return true; - } - } - } finally { - rs.close(); - } - } - - LOG.debug("Could not find table."); - return false; - } - - private boolean rootTableExists() throws SQLException { - String rootTableName = getRootTableName(); - return tableExists(rootTableName); - } - - private void createRootTable() throws SQLException { - String rootTableName = getRootTableName(); - LOG.debug("Creating root table: " + rootTableName); - - // TODO: Sanity-check the value of rootTableName to ensure it is - // not a SQL-injection attack vector. - Statement s = connection.createStatement(); - try { - s.executeUpdate("CREATE TABLE " + rootTableName + " (" - + "version INT, " - + "propname VARCHAR(128) NOT NULL, " - + "propval VARCHAR(256), " - + "CONSTRAINT " + rootTableName + "_unq UNIQUE (version, propname))"); - } finally { - s.close(); - } - - setRootProperty(STORAGE_VERSION_KEY, null, - Integer.toString(CUR_STORAGE_VERSION)); - - LOG.debug("Saving root table."); - connection.commit(); - } - - /** - * Look up a value for the specified version (may be null) in the - * root metadata table. - */ - private String getRootProperty(String propertyName, Integer version) - throws SQLException { - LOG.debug("Looking up property " + propertyName + " for version " - + version); - PreparedStatement s = null; - ResultSet rs = null; - - try { - if (null == version) { - s = connection.prepareStatement( - "SELECT propval FROM " + getRootTableName() - + " WHERE version IS NULL AND propname = ?"); - s.setString(1, propertyName); - } else { - s = connection.prepareStatement( - "SELECT propval FROM " + getRootTableName() + " WHERE version = ? " - + " AND propname = ?"); - s.setInt(1, version); - s.setString(2, propertyName); - } - - rs = s.executeQuery(); - if (!rs.next()) { - LOG.debug(" => (no result)"); - return null; // No such result. - } else { - String result = rs.getString(1); // Return the only result col. - LOG.debug(" => " + result); - return result; - } - } finally { - if (null != rs) { - try { - rs.close(); - } catch (SQLException sqlE) { - LOG.warn("Error closing resultset: " + sqlE); - } - } - - if (null != s) { - s.close(); - } - } - } - - /** - * Set a value for the specified version (may be null) in the root - * metadata table. - */ - private void setRootProperty(String propertyName, Integer version, - String val) throws SQLException { - LOG.debug("Setting property " + propertyName + " for version " - + version + " => " + val); - - PreparedStatement s; - String curVal = getRootProperty(propertyName, version); - if (null == curVal) { - // INSERT the row. - s = connection.prepareStatement("INSERT INTO " + getRootTableName() - + " (propval, propname, version) VALUES ( ? , ? , ? )"); - } else if (version == null) { - // UPDATE an existing row with a null version - s = connection.prepareStatement("UPDATE " + getRootTableName() - + " SET propval = ? WHERE propname = ? AND version IS NULL"); - } else { - // UPDATE an existing row with non-null version. - s = connection.prepareStatement("UPDATE " + getRootTableName() - + " SET propval = ? WHERE propname = ? AND version = ?"); - } - - try { - s.setString(1, val); - s.setString(2, propertyName); - if (null != version) { - s.setInt(3, version); - } - s.executeUpdate(); - } finally { - s.close(); - } - } - - /** - * Create the jobs table in the V0 schema. - */ - private void createJobTable() throws SQLException { - String curTableName = DEFAULT_SESSION_TABLE_NAME; - int tableNum = -1; - while (true) { - if (tableExists(curTableName)) { - tableNum++; - curTableName = DEFAULT_SESSION_TABLE_NAME + "_" + tableNum; - } else { - break; - } - } - - // curTableName contains a table name that does not exist. - // Create this table. - LOG.debug("Creating job storage table: " + curTableName); - Statement s = connection.createStatement(); - try { - s.executeUpdate("CREATE TABLE " + curTableName + " (" - + "job_name VARCHAR(64) NOT NULL, " - + "propname VARCHAR(128) NOT NULL, " - + "propval VARCHAR(1024), " - + "propclass VARCHAR(32) NOT NULL, " - + "CONSTRAINT " + curTableName + "_unq UNIQUE " - + "(job_name, propname, propclass))"); - - // Then set a property in the root table pointing to it. - setRootProperty(SESSION_TABLE_KEY, 0, curTableName); - connection.commit(); - } finally { - s.close(); - } - - this.jobTableName = curTableName; - } - - /** - * Given a root schema that exists, - * initialize a version-0 key/value storage schema on top of it, - * if it does not already exist. - */ - private void initV0Schema() throws SQLException { - this.jobTableName = getRootProperty(SESSION_TABLE_KEY, 0); - if (null == this.jobTableName) { - createJobTable(); - } - if (!tableExists(this.jobTableName)) { - LOG.debug("Could not find job table: " + jobTableName); - createJobTable(); - } - } - - /** - * INSERT or UPDATE a single (job, propname, class) to point - * to the specified property value. - */ - private void setV0Property(String jobName, String propClass, - String propName, String propVal) throws SQLException { - LOG.debug("Job: " + jobName + "; Setting property " - + propName + " with class " + propClass + " => " + propVal); - - PreparedStatement s = null; - try { - String curValue = getV0Property(jobName, propClass, propName); - if (null == curValue) { - // Property is not yet set. - s = connection.prepareStatement("INSERT INTO " + this.jobTableName - + " (propval, job_name, propclass, propname) " - + "VALUES (?, ?, ?, ?)"); - } else { - // Overwrite existing property. - s = connection.prepareStatement("UPDATE " + this.jobTableName - + " SET propval = ? WHERE job_name = ? AND propclass = ? " - + "AND propname = ?"); - } - - s.setString(1, propVal); - s.setString(2, jobName); - s.setString(3, propClass); - s.setString(4, propName); - - s.executeUpdate(); - } finally { - if (null != s) { - s.close(); - } - } - } - - /** - * Return a string containing the value of a specified property, - * or null if it is not set. - */ - private String getV0Property(String jobName, String propClass, - String propertyName) throws SQLException { - LOG.debug("Job: " + jobName + "; Getting property " - + propertyName + " with class " + propClass); - - ResultSet rs = null; - PreparedStatement s = connection.prepareStatement( - "SELECT propval FROM " + this.jobTableName - + " WHERE job_name = ? AND propclass = ? AND propname = ?"); - - try { - s.setString(1, jobName); - s.setString(2, propClass); - s.setString(3, propertyName); - rs = s.executeQuery(); - - if (!rs.next()) { - LOG.debug(" => (no result)"); - return null; - } - - String result = rs.getString(1); - LOG.debug(" => " + result); - return result; - } finally { - if (null != rs) { - try { - rs.close(); - } catch (SQLException sqlE) { - LOG.warn("Error closing resultset: " + sqlE); - } - } - - s.close(); - } - } - - /** - * Get a java.util.Properties containing all propName -> propVal - * bindings for a given (jobName, propClass). - */ - private Properties getV0Properties(String jobName, String propClass) - throws SQLException { - LOG.debug("Job: " + jobName - + "; Getting properties with class " + propClass); - - ResultSet rs = null; - PreparedStatement s = connection.prepareStatement( - "SELECT propname, propval FROM " + this.jobTableName - + " WHERE job_name = ? AND propclass = ?"); - try { - s.setString(1, jobName); - s.setString(2, propClass); - rs = s.executeQuery(); - - Properties p = new Properties(); - while (rs.next()) { - p.setProperty(rs.getString(1), rs.getString(2)); - } - - return p; - } finally { - if (null != rs) { - try { - rs.close(); - } catch (SQLException sqlE) { - LOG.warn("Error closing result set: " + sqlE); - } - } - - s.close(); - } - } - - private void setV0Properties(String jobName, String propClass, - Properties properties) throws SQLException { - LOG.debug("Job: " + jobName - + "; Setting bulk properties for class " + propClass); - - for (Map.Entry entry : properties.entrySet()) { - String key = entry.getKey().toString(); - String val = entry.getValue().toString(); - setV0Property(jobName, propClass, key, val); - } - } } diff --git a/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbMetaStore.java b/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbMetaStore.java index cb26bb49..945c11a5 100644 --- a/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbMetaStore.java +++ b/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbMetaStore.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,164 +19,24 @@ package com.cloudera.sqoop.metastore.hsqldb; -import java.io.File; - -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.StringUtils; - -import org.hsqldb.Server; -import org.hsqldb.ServerConstants; - -import com.cloudera.sqoop.SqoopOptions; - -import com.cloudera.sqoop.manager.HsqldbManager; - /** - * Container for an HSQLDB-backed metastore. + * @deprecated Moving to use org.apache.sqoop namespace. */ -public class HsqldbMetaStore { +public class HsqldbMetaStore + extends org.apache.sqoop.metastore.hsqldb.HsqldbMetaStore { - public static final Log LOG = LogFactory.getLog( - HsqldbMetaStore.class.getName()); - - /** Where on the local fs does the metastore put files? */ public static final String META_STORAGE_LOCATION_KEY = - "sqoop.metastore.server.location"; - - /** - * What port does the metastore listen on? - */ + org.apache.sqoop.metastore.hsqldb.HsqldbMetaStore.META_STORAGE_LOCATION_KEY; public static final String META_SERVER_PORT_KEY = - "sqoop.metastore.server.port"; - - /** Default to this port if unset. */ - public static final int DEFAULT_PORT = 16000; - - private int port; - private String fileLocation; - private Server server; - private Configuration conf; + org.apache.sqoop.metastore.hsqldb.HsqldbMetaStore.META_SERVER_PORT_KEY; + public static final int DEFAULT_PORT = + org.apache.sqoop.metastore.hsqldb.HsqldbMetaStore.DEFAULT_PORT; public HsqldbMetaStore(Configuration config) { - this.conf = config; - init(); + super(config); } - /** - * Determine the user's home directory and return a file path - * under this root where the shared metastore can be placed. - */ - private String getHomeDirFilePath() { - String homeDir = System.getProperty("user.home"); - - File homeDirObj = new File(homeDir); - File sqoopDataDirObj = new File(homeDirObj, ".sqoop"); - File databaseFileObj = new File(sqoopDataDirObj, "shared-metastore.db"); - - return databaseFileObj.toString(); - } - - private void init() { - if (null != server) { - LOG.debug("init(): server already exists."); - return; - } - - fileLocation = conf.get(META_STORAGE_LOCATION_KEY, null); - if (null == fileLocation) { - fileLocation = getHomeDirFilePath(); - LOG.warn("The location for metastore data has not been explicitly set. " - + "Placing shared metastore files in " + fileLocation); - } - - this.port = conf.getInt(META_SERVER_PORT_KEY, DEFAULT_PORT); - } - - - public void start() { - try { - if (server != null) { - server.checkRunning(false); - } - } catch (RuntimeException re) { - LOG.info("Server is already started."); - return; - } - - server = new Server(); - server.setDatabasePath(0, "file:" + fileLocation); - server.setDatabaseName(0, "sqoop"); - server.putPropertiesFromString("hsqldb.write_delay=false"); - server.setPort(port); - server.setSilent(true); - server.setNoSystemExit(true); - - server.start(); - LOG.info("Server started on port " + port + " with protocol " - + server.getProtocol()); - } - - /** - * Blocks the current thread until the server is shut down. - */ - public void waitForServer() { - while (true) { - int curState = server.getState(); - if (curState == ServerConstants.SERVER_STATE_SHUTDOWN) { - LOG.info("Got shutdown notification"); - break; - } - - try { - Thread.sleep(100); - } catch (InterruptedException ie) { - LOG.info("Interrupted while blocking for server:" - + StringUtils.stringifyException(ie)); - } - } - } - - /** - * Connects to the server and instructs it to shutdown. - */ - public void shutdown() { - // Send the SHUTDOWN command to the server via SQL. - SqoopOptions options = new SqoopOptions(conf); - options.setConnectString("jdbc:hsqldb:hsql://localhost:" - + port + "/sqoop"); - options.setUsername("SA"); - options.setPassword(""); - HsqldbManager manager = new HsqldbManager(options); - Statement s = null; - try { - Connection c = manager.getConnection(); - s = c.createStatement(); - s.execute("SHUTDOWN"); - } catch (SQLException sqlE) { - LOG.warn("Exception shutting down database: " - + StringUtils.stringifyException(sqlE)); - } finally { - if (null != s) { - try { - s.close(); - } catch (SQLException sqlE) { - LOG.warn("Error closing statement: " + sqlE); - } - } - - try { - manager.close(); - } catch (SQLException sqlE) { - LOG.warn("Error closing manager: " + sqlE); - } - } - } } diff --git a/src/java/org/apache/sqoop/metastore/JobData.java b/src/java/org/apache/sqoop/metastore/JobData.java new file mode 100644 index 00000000..f3d56772 --- /dev/null +++ b/src/java/org/apache/sqoop/metastore/JobData.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.metastore; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.tool.SqoopTool; + +/** + * Container for all job data that should be stored to a + * permanent resource. + */ +public class JobData { + private SqoopOptions opts; + private SqoopTool tool; + + public JobData() { + } + + public JobData(SqoopOptions options, SqoopTool sqoopTool) { + this.opts = options; + this.tool = sqoopTool; + } + + /** + * Gets the SqoopOptions. + */ + public SqoopOptions getSqoopOptions() { + return this.opts; + } + + /** + * Gets the SqoopTool. + */ + public SqoopTool getSqoopTool() { + return this.tool; + } + + /** + * Sets the SqoopOptions. + */ + public void setSqoopOptions(SqoopOptions options) { + this.opts = options; + } + + /** + * Sets the SqoopTool. + */ + public void setSqoopTool(SqoopTool sqoopTool) { + this.tool = sqoopTool; + } + +} + diff --git a/src/java/org/apache/sqoop/metastore/JobStorage.java b/src/java/org/apache/sqoop/metastore/JobStorage.java new file mode 100644 index 00000000..e66ba5c4 --- /dev/null +++ b/src/java/org/apache/sqoop/metastore/JobStorage.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.metastore; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configured; +import com.cloudera.sqoop.metastore.JobData; + +/** + * API that defines how jobs are saved, restored, and manipulated. + * + *

+ * JobStorage instances may be created and then not used; the + * JobStorage factory may create additional JobStorage instances + * that return false from accept() and then discard them. The close() + * method will only be triggered for a JobStorage if the connect() + * method is called. Connection should not be triggered by a call to + * accept().

+ */ +public abstract class JobStorage extends Configured implements Closeable { + + /** + * Returns true if the JobStorage system can use the metadata in + * the descriptor to connect to an underlying storage resource. + */ + public abstract boolean canAccept(Map descriptor); + + + /** + * Opens / connects to the underlying storage resource specified by the + * descriptor. + */ + public abstract void open(Map descriptor) + throws IOException; + + /** + * Given a job name, reconstitute a JobData that contains all + * configuration information required for the job. Returns null if the + * job name does not match an available job. + */ + public abstract JobData read(String jobName) + throws IOException; + + /** + * Forget about a saved job. + */ + public abstract void delete(String jobName) throws IOException; + + /** + * Given a job name and the data describing a configured job, record the job + * information to the storage medium. + */ + public abstract void create(String jobName, JobData data) + throws IOException; + + /** + * Given a job name and configured job data, update the underlying resource + * to match the current job configuration. + */ + public abstract void update(String jobName, JobData data) + throws IOException; + + /** + * Close any resources opened by the JobStorage system. + */ + public void close() throws IOException { + } + + /** + * Enumerate all jobs held in the connected resource. + */ + public abstract List list() throws IOException; +} + diff --git a/src/java/org/apache/sqoop/metastore/JobStorageFactory.java b/src/java/org/apache/sqoop/metastore/JobStorageFactory.java new file mode 100644 index 00000000..7fac39d2 --- /dev/null +++ b/src/java/org/apache/sqoop/metastore/JobStorageFactory.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.metastore; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import com.cloudera.sqoop.metastore.JobStorage; + +/** + * Factory that produces the correct JobStorage system to work with + * a particular job descriptor. + */ +public class JobStorageFactory { + + private Configuration conf; + + /** + * Configuration key describing the list of JobStorage implementations + * to use to handle jobs. + */ + public static final String AVAILABLE_STORAGES_KEY = + "sqoop.job.storage.implementations"; + + /** The default list of available JobStorage implementations. */ + private static final String DEFAULT_AVAILABLE_STORAGES = + "com.cloudera.sqoop.metastore.hsqldb.HsqldbJobStorage," + + "com.cloudera.sqoop.metastore.hsqldb.AutoHsqldbStorage"; + + public JobStorageFactory(Configuration config) { + this.conf = config; + + // Ensure that we always have an available storages list. + if (this.conf.get(AVAILABLE_STORAGES_KEY) == null) { + this.conf.set(AVAILABLE_STORAGES_KEY, DEFAULT_AVAILABLE_STORAGES); + } + } + + /** + * Given a storage descriptor, determine the correct JobStorage + * implementation to use to connect to the storage resource and return an + * instance of it -- or null if no JobStorage instance is appropriate. + */ + public JobStorage getJobStorage(Map descriptor) { + List storages = this.conf.getInstances( + AVAILABLE_STORAGES_KEY, JobStorage.class); + for (JobStorage stor : storages) { + if (stor.canAccept(descriptor)) { + return stor; + } + } + + return null; + } +} + diff --git a/src/java/org/apache/sqoop/metastore/hsqldb/AutoHsqldbStorage.java b/src/java/org/apache/sqoop/metastore/hsqldb/AutoHsqldbStorage.java new file mode 100644 index 00000000..19fd9f75 --- /dev/null +++ b/src/java/org/apache/sqoop/metastore/hsqldb/AutoHsqldbStorage.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.metastore.hsqldb; + +import java.io.File; +import java.io.IOException; + +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; + +/** + * JobStorage implementation that auto-configures an HSQLDB + * local-file-based instance to hold jobs. + */ +public class AutoHsqldbStorage + extends com.cloudera.sqoop.metastore.hsqldb.HsqldbJobStorage { + + public static final Log LOG = LogFactory.getLog( + AutoHsqldbStorage.class.getName()); + + /** + * Configuration key specifying whether this storage agent is active. + * Defaults to "on" to allow zero-conf local users. + */ + public static final String AUTO_STORAGE_IS_ACTIVE_KEY = + "sqoop.metastore.client.enable.autoconnect"; + + /** + * Configuration key specifying the connect string used by this + * storage agent. + */ + public static final String AUTO_STORAGE_CONNECT_STRING_KEY = + "sqoop.metastore.client.autoconnect.url"; + + /** + * Configuration key specifying the username to bind with. + */ + public static final String AUTO_STORAGE_USER_KEY = + "sqoop.metastore.client.autoconnect.username"; + + + /** HSQLDB default user is named 'SA'. */ + private static final String DEFAULT_AUTO_USER = "SA"; + + /** + * Configuration key specifying the password to bind with. + */ + public static final String AUTO_STORAGE_PASS_KEY = + "sqoop.metastore.client.autoconnect.password"; + + /** HSQLDB default user has an empty password. */ + public static final String DEFAULT_AUTO_PASSWORD = ""; + + @Override + /** {@inheritDoc} */ + public boolean canAccept(Map descriptor) { + Configuration conf = this.getConf(); + return conf.getBoolean(AUTO_STORAGE_IS_ACTIVE_KEY, true); + } + + /** + * Determine the user's home directory and return a connect + * string to HSQLDB that uses ~/.sqoop/ as the storage location + * for the metastore database. + */ + private String getHomeDirFileConnectStr() { + String homeDir = System.getProperty("user.home"); + + File homeDirObj = new File(homeDir); + File sqoopDataDirObj = new File(homeDirObj, ".sqoop"); + File databaseFileObj = new File(sqoopDataDirObj, "metastore.db"); + + String dbFileStr = databaseFileObj.toString(); + return "jdbc:hsqldb:file:" + dbFileStr + + ";hsqldb.write_delay=false;shutdown=true"; + } + + @Override + /** + * Set the connection information to use the auto-inferred connection + * string. + */ + public void open(Map descriptor) throws IOException { + Configuration conf = getConf(); + setMetastoreConnectStr(conf.get(AUTO_STORAGE_CONNECT_STRING_KEY, + getHomeDirFileConnectStr())); + setMetastoreUser(conf.get(AUTO_STORAGE_USER_KEY, DEFAULT_AUTO_USER)); + setMetastorePassword(conf.get(AUTO_STORAGE_PASS_KEY, + DEFAULT_AUTO_PASSWORD)); + setConnectedDescriptor(descriptor); + + init(); + } +} + diff --git a/src/java/org/apache/sqoop/metastore/hsqldb/HsqldbJobStorage.java b/src/java/org/apache/sqoop/metastore/hsqldb/HsqldbJobStorage.java new file mode 100644 index 00000000..a0f29fd0 --- /dev/null +++ b/src/java/org/apache/sqoop/metastore/hsqldb/HsqldbJobStorage.java @@ -0,0 +1,805 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.metastore.hsqldb; + +import java.io.IOException; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.metastore.JobData; +import com.cloudera.sqoop.metastore.JobStorage; +import com.cloudera.sqoop.tool.SqoopTool; + +/** + * JobStorage implementation that uses an HSQLDB-backed database to + * hold job information. + */ +public class HsqldbJobStorage extends JobStorage { + + public static final Log LOG = LogFactory.getLog( + HsqldbJobStorage.class.getName()); + + /** descriptor key identifying the connect string for the metastore. */ + public static final String META_CONNECT_KEY = "metastore.connect.string"; + + /** descriptor key identifying the username to use when connecting + * to the metastore. + */ + public static final String META_USERNAME_KEY = "metastore.username"; + + /** descriptor key identifying the password to use when connecting + * to the metastore. + */ + public static final String META_PASSWORD_KEY = "metastore.password"; + + + /** Default name for the root metadata table in HSQLDB. */ + private static final String DEFAULT_ROOT_TABLE_NAME = "SQOOP_ROOT"; + + /** Configuration key used to override root table name. */ + public static final String ROOT_TABLE_NAME_KEY = + "sqoop.hsqldb.root.table.name"; + + /** root metadata table key used to define the current schema version. */ + private static final String STORAGE_VERSION_KEY = + "sqoop.hsqldb.job.storage.version"; + + /** The current version number for the schema edition. */ + private static final int CUR_STORAGE_VERSION = 0; + + /** root metadata table key used to define the job table name. */ + private static final String SESSION_TABLE_KEY = + "sqoop.hsqldb.job.info.table"; + + /** Default value for SESSION_TABLE_KEY. */ + private static final String DEFAULT_SESSION_TABLE_NAME = + "SQOOP_SESSIONS"; + + /** Per-job key with propClass 'schema' that defines the set of + * properties valid to be defined for propClass 'SqoopOptions'. */ + private static final String PROPERTY_SET_KEY = + "sqoop.property.set.id"; + + /** Current value for PROPERTY_SET_KEY. */ + private static final String CUR_PROPERTY_SET_ID = "0"; + + // The following are values for propClass in the v0 schema which + // describe different aspects of the stored metadata. + + /** Property class for properties about the stored data itself. */ + private static final String PROPERTY_CLASS_SCHEMA = "schema"; + + /** Property class for properties that are loaded into SqoopOptions. */ + private static final String PROPERTY_CLASS_SQOOP_OPTIONS = "SqoopOptions"; + + /** Property class for properties that are loaded into a Configuration. */ + private static final String PROPERTY_CLASS_CONFIG = "config"; + + /** + * Per-job key with propClass 'schema' that specifies the SqoopTool + * to load. + */ + private static final String SQOOP_TOOL_KEY = "sqoop.tool"; + + + private Map connectedDescriptor; + private String metastoreConnectStr; + private String metastoreUser; + private String metastorePassword; + private Connection connection; + + protected Connection getConnection() { + return this.connection; + } + + // After connection to the database and initialization of the + // schema, this holds the name of the job table. + private String jobTableName; + + protected void setMetastoreConnectStr(String connectStr) { + this.metastoreConnectStr = connectStr; + } + + protected void setMetastoreUser(String user) { + this.metastoreUser = user; + } + + protected void setMetastorePassword(String pass) { + this.metastorePassword = pass; + } + + private static final String DB_DRIVER_CLASS = "org.hsqldb.jdbcDriver"; + + /** + * Set the descriptor used to open() this storage. + */ + protected void setConnectedDescriptor(Map descriptor) { + this.connectedDescriptor = descriptor; + } + + @Override + /** + * Initialize the connection to the database. + */ + public void open(Map descriptor) throws IOException { + setMetastoreConnectStr(descriptor.get(META_CONNECT_KEY)); + setMetastoreUser(descriptor.get(META_USERNAME_KEY)); + setMetastorePassword(descriptor.get(META_PASSWORD_KEY)); + setConnectedDescriptor(descriptor); + + init(); + } + + protected void init() throws IOException { + try { + // Load/initialize the JDBC driver. + Class.forName(DB_DRIVER_CLASS); + } catch (ClassNotFoundException cnfe) { + throw new IOException("Could not load HSQLDB JDBC driver", cnfe); + } + + try { + if (null == metastoreUser) { + this.connection = DriverManager.getConnection(metastoreConnectStr); + } else { + this.connection = DriverManager.getConnection(metastoreConnectStr, + metastoreUser, metastorePassword); + } + + connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); + connection.setAutoCommit(false); + + // Initialize the root schema. + if (!rootTableExists()) { + createRootTable(); + } + + // Check the schema version. + String curStorageVerStr = getRootProperty(STORAGE_VERSION_KEY, null); + int actualStorageVer = -1; + try { + actualStorageVer = Integer.valueOf(curStorageVerStr); + } catch (NumberFormatException nfe) { + LOG.warn("Could not interpret as a number: " + curStorageVerStr); + } + if (actualStorageVer != CUR_STORAGE_VERSION) { + LOG.error("Can not interpret metadata schema"); + LOG.error("The metadata schema version is " + curStorageVerStr); + LOG.error("The highest version supported is " + CUR_STORAGE_VERSION); + LOG.error("To use this version of Sqoop, " + + "you must downgrade your metadata schema."); + throw new IOException("Invalid metadata version."); + } + + // Initialize the versioned schema. + initV0Schema(); + } catch (SQLException sqle) { + if (null != connection) { + try { + connection.rollback(); + } catch (SQLException e2) { + LOG.warn("Error rolling back transaction in error handler: " + e2); + } + } + + throw new IOException("Exception creating SQL connection", sqle); + } + } + + @Override + public void close() throws IOException { + if (null != this.connection) { + try { + LOG.debug("Flushing current transaction"); + this.connection.commit(); + } catch (SQLException sqlE) { + throw new IOException("Exception committing connection", sqlE); + } + + try { + LOG.debug("Closing connection"); + this.connection.close(); + } catch (SQLException sqlE) { + throw new IOException("Exception closing connection", sqlE); + } finally { + this.connection = null; + } + } + } + + @Override + /** {@inheritDoc} */ + public boolean canAccept(Map descriptor) { + // We return true if the desciptor contains a connect string to find + // the database. + return descriptor.get(META_CONNECT_KEY) != null; + } + + @Override + /** {@inheritDoc} */ + public JobData read(String jobName) throws IOException { + try { + if (!jobExists(jobName)) { + LOG.error("Cannot restore job: " + jobName); + LOG.error("(No such job)"); + throw new IOException("Cannot restore missing job " + jobName); + } + + LOG.debug("Restoring job: " + jobName); + Properties schemaProps = getV0Properties(jobName, + PROPERTY_CLASS_SCHEMA); + Properties sqoopOptProps = getV0Properties(jobName, + PROPERTY_CLASS_SQOOP_OPTIONS); + Properties configProps = getV0Properties(jobName, + PROPERTY_CLASS_CONFIG); + + // Check that we're not using a saved job from a previous + // version whose functionality has been deprecated. + String thisPropSetId = schemaProps.getProperty(PROPERTY_SET_KEY); + LOG.debug("System property set: " + CUR_PROPERTY_SET_ID); + LOG.debug("Stored property set: " + thisPropSetId); + if (!CUR_PROPERTY_SET_ID.equals(thisPropSetId)) { + LOG.warn("The property set present in this database was written by"); + LOG.warn("an incompatible version of Sqoop. This may result in an"); + LOG.warn("incomplete operation."); + // TODO(aaron): Should this fail out-right? + } + + String toolName = schemaProps.getProperty(SQOOP_TOOL_KEY); + if (null == toolName) { + // Don't know what tool to create. + throw new IOException("Incomplete metadata; missing " + + SQOOP_TOOL_KEY); + } + + SqoopTool tool = SqoopTool.getTool(toolName); + if (null == tool) { + throw new IOException("Error in job metadata: invalid tool " + + toolName); + } + + Configuration conf = new Configuration(); + for (Map.Entry entry : configProps.entrySet()) { + conf.set(entry.getKey().toString(), entry.getValue().toString()); + } + + SqoopOptions opts = new SqoopOptions(); + opts.setConf(conf); + opts.loadProperties(sqoopOptProps); + + // Set the job connection information for this job. + opts.setJobName(jobName); + opts.setStorageDescriptor(connectedDescriptor); + + return new JobData(opts, tool); + } catch (SQLException sqlE) { + throw new IOException("Error communicating with database", sqlE); + } + } + + private boolean jobExists(String jobName) throws SQLException { + PreparedStatement s = connection.prepareStatement( + "SELECT COUNT(job_name) FROM " + this.jobTableName + + " WHERE job_name = ? GROUP BY job_name"); + ResultSet rs = null; + try { + s.setString(1, jobName); + rs = s.executeQuery(); + if (rs.next()) { + return true; // We got a result, meaning the job exists. + } + } finally { + if (null != rs) { + try { + rs.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing result set: " + sqlE); + } + } + + s.close(); + } + + return false; // No result. + } + + @Override + /** {@inheritDoc} */ + public void delete(String jobName) throws IOException { + try { + if (!jobExists(jobName)) { + LOG.error("No such job: " + jobName); + } else { + LOG.debug("Deleting job: " + jobName); + PreparedStatement s = connection.prepareStatement("DELETE FROM " + + this.jobTableName + " WHERE job_name = ?"); + try { + s.setString(1, jobName); + s.executeUpdate(); + } finally { + s.close(); + } + connection.commit(); + } + } catch (SQLException sqlEx) { + try { + connection.rollback(); + } catch (SQLException e2) { + LOG.warn("Error rolling back transaction in error handler: " + e2); + } + throw new IOException("Error communicating with database", sqlEx); + } + } + + @Override + /** {@inheritDoc} */ + public void create(String jobName, JobData data) + throws IOException { + try { + if (jobExists(jobName)) { + LOG.error("Cannot create job " + jobName + + ": it already exists"); + throw new IOException("Job " + jobName + " already exists"); + } + } catch (SQLException sqlE) { + throw new IOException("Error communicating with database", sqlE); + } + + createInternal(jobName, data); + } + + /** + * Actually insert/update the resources for this job. + */ + private void createInternal(String jobName, JobData data) + throws IOException { + try { + LOG.debug("Creating job: " + jobName); + + // Save the name of the Sqoop tool. + setV0Property(jobName, PROPERTY_CLASS_SCHEMA, SQOOP_TOOL_KEY, + data.getSqoopTool().getToolName()); + + // Save the property set id. + setV0Property(jobName, PROPERTY_CLASS_SCHEMA, PROPERTY_SET_KEY, + CUR_PROPERTY_SET_ID); + + // Save all properties of the SqoopOptions. + Properties props = data.getSqoopOptions().writeProperties(); + setV0Properties(jobName, PROPERTY_CLASS_SQOOP_OPTIONS, props); + + // And save all unique properties of the configuration. + Configuration saveConf = data.getSqoopOptions().getConf(); + Configuration baseConf = new Configuration(); + + for (Map.Entry entry : saveConf) { + String key = entry.getKey(); + String rawVal = saveConf.getRaw(key); + String baseVal = baseConf.getRaw(key); + if (baseVal != null && rawVal.equals(baseVal)) { + continue; // Don't save this; it's set in the base configuration. + } + + LOG.debug("Saving " + key + " => " + rawVal + " / " + baseVal); + setV0Property(jobName, PROPERTY_CLASS_CONFIG, key, rawVal); + } + + connection.commit(); + } catch (SQLException sqlE) { + try { + connection.rollback(); + } catch (SQLException sqlE2) { + LOG.warn("Exception rolling back transaction during error handling: " + + sqlE2); + } + throw new IOException("Error communicating with database", sqlE); + } + } + + @Override + /** {@inheritDoc} */ + public void update(String jobName, JobData data) + throws IOException { + try { + if (!jobExists(jobName)) { + LOG.error("Cannot update job " + jobName + ": not found"); + throw new IOException("Job " + jobName + " does not exist"); + } + } catch (SQLException sqlE) { + throw new IOException("Error communicating with database", sqlE); + } + + // Since we set properties with update-or-insert, this is the same + // as create on this system. + createInternal(jobName, data); + } + + @Override + /** {@inheritDoc} */ + public List list() throws IOException { + ResultSet rs = null; + try { + PreparedStatement s = connection.prepareStatement( + "SELECT DISTINCT job_name FROM " + this.jobTableName); + try { + rs = s.executeQuery(); + ArrayList jobs = new ArrayList(); + while (rs.next()) { + jobs.add(rs.getString(1)); + } + + return jobs; + } finally { + if (null != rs) { + try { + rs.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing resultset: " + sqlE); + } + } + + if (null != s) { + s.close(); + } + } + } catch (SQLException sqlE) { + throw new IOException("Error communicating with database", sqlE); + } + } + + // Determine the name to use for the root metadata table. + private String getRootTableName() { + Configuration conf = getConf(); + return conf.get(ROOT_TABLE_NAME_KEY, DEFAULT_ROOT_TABLE_NAME); + } + + private boolean tableExists(String table) throws SQLException { + LOG.debug("Checking for table: " + table); + DatabaseMetaData dbmd = connection.getMetaData(); + String [] tableTypes = { "TABLE" }; + ResultSet rs = dbmd.getTables(null, null, null, tableTypes); + if (null != rs) { + try { + while (rs.next()) { + if (table.equalsIgnoreCase(rs.getString("TABLE_NAME"))) { + LOG.debug("Found table: " + table); + return true; + } + } + } finally { + rs.close(); + } + } + + LOG.debug("Could not find table."); + return false; + } + + private boolean rootTableExists() throws SQLException { + String rootTableName = getRootTableName(); + return tableExists(rootTableName); + } + + private void createRootTable() throws SQLException { + String rootTableName = getRootTableName(); + LOG.debug("Creating root table: " + rootTableName); + + // TODO: Sanity-check the value of rootTableName to ensure it is + // not a SQL-injection attack vector. + Statement s = connection.createStatement(); + try { + s.executeUpdate("CREATE TABLE " + rootTableName + " (" + + "version INT, " + + "propname VARCHAR(128) NOT NULL, " + + "propval VARCHAR(256), " + + "CONSTRAINT " + rootTableName + "_unq UNIQUE (version, propname))"); + } finally { + s.close(); + } + + setRootProperty(STORAGE_VERSION_KEY, null, + Integer.toString(CUR_STORAGE_VERSION)); + + LOG.debug("Saving root table."); + connection.commit(); + } + + /** + * Look up a value for the specified version (may be null) in the + * root metadata table. + */ + private String getRootProperty(String propertyName, Integer version) + throws SQLException { + LOG.debug("Looking up property " + propertyName + " for version " + + version); + PreparedStatement s = null; + ResultSet rs = null; + + try { + if (null == version) { + s = connection.prepareStatement( + "SELECT propval FROM " + getRootTableName() + + " WHERE version IS NULL AND propname = ?"); + s.setString(1, propertyName); + } else { + s = connection.prepareStatement( + "SELECT propval FROM " + getRootTableName() + " WHERE version = ? " + + " AND propname = ?"); + s.setInt(1, version); + s.setString(2, propertyName); + } + + rs = s.executeQuery(); + if (!rs.next()) { + LOG.debug(" => (no result)"); + return null; // No such result. + } else { + String result = rs.getString(1); // Return the only result col. + LOG.debug(" => " + result); + return result; + } + } finally { + if (null != rs) { + try { + rs.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing resultset: " + sqlE); + } + } + + if (null != s) { + s.close(); + } + } + } + + /** + * Set a value for the specified version (may be null) in the root + * metadata table. + */ + private void setRootProperty(String propertyName, Integer version, + String val) throws SQLException { + LOG.debug("Setting property " + propertyName + " for version " + + version + " => " + val); + + PreparedStatement s; + String curVal = getRootProperty(propertyName, version); + if (null == curVal) { + // INSERT the row. + s = connection.prepareStatement("INSERT INTO " + getRootTableName() + + " (propval, propname, version) VALUES ( ? , ? , ? )"); + } else if (version == null) { + // UPDATE an existing row with a null version + s = connection.prepareStatement("UPDATE " + getRootTableName() + + " SET propval = ? WHERE propname = ? AND version IS NULL"); + } else { + // UPDATE an existing row with non-null version. + s = connection.prepareStatement("UPDATE " + getRootTableName() + + " SET propval = ? WHERE propname = ? AND version = ?"); + } + + try { + s.setString(1, val); + s.setString(2, propertyName); + if (null != version) { + s.setInt(3, version); + } + s.executeUpdate(); + } finally { + s.close(); + } + } + + /** + * Create the jobs table in the V0 schema. + */ + private void createJobTable() throws SQLException { + String curTableName = DEFAULT_SESSION_TABLE_NAME; + int tableNum = -1; + while (true) { + if (tableExists(curTableName)) { + tableNum++; + curTableName = DEFAULT_SESSION_TABLE_NAME + "_" + tableNum; + } else { + break; + } + } + + // curTableName contains a table name that does not exist. + // Create this table. + LOG.debug("Creating job storage table: " + curTableName); + Statement s = connection.createStatement(); + try { + s.executeUpdate("CREATE TABLE " + curTableName + " (" + + "job_name VARCHAR(64) NOT NULL, " + + "propname VARCHAR(128) NOT NULL, " + + "propval VARCHAR(1024), " + + "propclass VARCHAR(32) NOT NULL, " + + "CONSTRAINT " + curTableName + "_unq UNIQUE " + + "(job_name, propname, propclass))"); + + // Then set a property in the root table pointing to it. + setRootProperty(SESSION_TABLE_KEY, 0, curTableName); + connection.commit(); + } finally { + s.close(); + } + + this.jobTableName = curTableName; + } + + /** + * Given a root schema that exists, + * initialize a version-0 key/value storage schema on top of it, + * if it does not already exist. + */ + private void initV0Schema() throws SQLException { + this.jobTableName = getRootProperty(SESSION_TABLE_KEY, 0); + if (null == this.jobTableName) { + createJobTable(); + } + if (!tableExists(this.jobTableName)) { + LOG.debug("Could not find job table: " + jobTableName); + createJobTable(); + } + } + + /** + * INSERT or UPDATE a single (job, propname, class) to point + * to the specified property value. + */ + private void setV0Property(String jobName, String propClass, + String propName, String propVal) throws SQLException { + LOG.debug("Job: " + jobName + "; Setting property " + + propName + " with class " + propClass + " => " + propVal); + + PreparedStatement s = null; + try { + String curValue = getV0Property(jobName, propClass, propName); + if (null == curValue) { + // Property is not yet set. + s = connection.prepareStatement("INSERT INTO " + this.jobTableName + + " (propval, job_name, propclass, propname) " + + "VALUES (?, ?, ?, ?)"); + } else { + // Overwrite existing property. + s = connection.prepareStatement("UPDATE " + this.jobTableName + + " SET propval = ? WHERE job_name = ? AND propclass = ? " + + "AND propname = ?"); + } + + s.setString(1, propVal); + s.setString(2, jobName); + s.setString(3, propClass); + s.setString(4, propName); + + s.executeUpdate(); + } finally { + if (null != s) { + s.close(); + } + } + } + + /** + * Return a string containing the value of a specified property, + * or null if it is not set. + */ + private String getV0Property(String jobName, String propClass, + String propertyName) throws SQLException { + LOG.debug("Job: " + jobName + "; Getting property " + + propertyName + " with class " + propClass); + + ResultSet rs = null; + PreparedStatement s = connection.prepareStatement( + "SELECT propval FROM " + this.jobTableName + + " WHERE job_name = ? AND propclass = ? AND propname = ?"); + + try { + s.setString(1, jobName); + s.setString(2, propClass); + s.setString(3, propertyName); + rs = s.executeQuery(); + + if (!rs.next()) { + LOG.debug(" => (no result)"); + return null; + } + + String result = rs.getString(1); + LOG.debug(" => " + result); + return result; + } finally { + if (null != rs) { + try { + rs.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing resultset: " + sqlE); + } + } + + s.close(); + } + } + + /** + * Get a java.util.Properties containing all propName -> propVal + * bindings for a given (jobName, propClass). + */ + private Properties getV0Properties(String jobName, String propClass) + throws SQLException { + LOG.debug("Job: " + jobName + + "; Getting properties with class " + propClass); + + ResultSet rs = null; + PreparedStatement s = connection.prepareStatement( + "SELECT propname, propval FROM " + this.jobTableName + + " WHERE job_name = ? AND propclass = ?"); + try { + s.setString(1, jobName); + s.setString(2, propClass); + rs = s.executeQuery(); + + Properties p = new Properties(); + while (rs.next()) { + p.setProperty(rs.getString(1), rs.getString(2)); + } + + return p; + } finally { + if (null != rs) { + try { + rs.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing result set: " + sqlE); + } + } + + s.close(); + } + } + + private void setV0Properties(String jobName, String propClass, + Properties properties) throws SQLException { + LOG.debug("Job: " + jobName + + "; Setting bulk properties for class " + propClass); + + for (Map.Entry entry : properties.entrySet()) { + String key = entry.getKey().toString(); + String val = entry.getValue().toString(); + setV0Property(jobName, propClass, key, val); + } + } +} + diff --git a/src/java/org/apache/sqoop/metastore/hsqldb/HsqldbMetaStore.java b/src/java/org/apache/sqoop/metastore/hsqldb/HsqldbMetaStore.java new file mode 100644 index 00000000..273cc1d9 --- /dev/null +++ b/src/java/org/apache/sqoop/metastore/hsqldb/HsqldbMetaStore.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.metastore.hsqldb; + +import java.io.File; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.util.StringUtils; + +import org.hsqldb.Server; +import org.hsqldb.ServerConstants; + +import com.cloudera.sqoop.SqoopOptions; + +import com.cloudera.sqoop.manager.HsqldbManager; + +/** + * Container for an HSQLDB-backed metastore. + */ +public class HsqldbMetaStore { + + public static final Log LOG = LogFactory.getLog( + HsqldbMetaStore.class.getName()); + + /** Where on the local fs does the metastore put files? */ + public static final String META_STORAGE_LOCATION_KEY = + "sqoop.metastore.server.location"; + + /** + * What port does the metastore listen on? + */ + public static final String META_SERVER_PORT_KEY = + "sqoop.metastore.server.port"; + + /** Default to this port if unset. */ + public static final int DEFAULT_PORT = 16000; + + private int port; + private String fileLocation; + private Server server; + private Configuration conf; + + public HsqldbMetaStore(Configuration config) { + this.conf = config; + init(); + } + + /** + * Determine the user's home directory and return a file path + * under this root where the shared metastore can be placed. + */ + private String getHomeDirFilePath() { + String homeDir = System.getProperty("user.home"); + + File homeDirObj = new File(homeDir); + File sqoopDataDirObj = new File(homeDirObj, ".sqoop"); + File databaseFileObj = new File(sqoopDataDirObj, "shared-metastore.db"); + + return databaseFileObj.toString(); + } + + private void init() { + if (null != server) { + LOG.debug("init(): server already exists."); + return; + } + + fileLocation = conf.get(META_STORAGE_LOCATION_KEY, null); + if (null == fileLocation) { + fileLocation = getHomeDirFilePath(); + LOG.warn("The location for metastore data has not been explicitly set. " + + "Placing shared metastore files in " + fileLocation); + } + + this.port = conf.getInt(META_SERVER_PORT_KEY, DEFAULT_PORT); + } + + + public void start() { + try { + if (server != null) { + server.checkRunning(false); + } + } catch (RuntimeException re) { + LOG.info("Server is already started."); + return; + } + + server = new Server(); + server.setDatabasePath(0, "file:" + fileLocation); + server.setDatabaseName(0, "sqoop"); + server.putPropertiesFromString("hsqldb.write_delay=false"); + server.setPort(port); + server.setSilent(true); + server.setNoSystemExit(true); + + server.start(); + LOG.info("Server started on port " + port + " with protocol " + + server.getProtocol()); + } + + /** + * Blocks the current thread until the server is shut down. + */ + public void waitForServer() { + while (true) { + int curState = server.getState(); + if (curState == ServerConstants.SERVER_STATE_SHUTDOWN) { + LOG.info("Got shutdown notification"); + break; + } + + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("Interrupted while blocking for server:" + + StringUtils.stringifyException(ie)); + } + } + } + + /** + * Connects to the server and instructs it to shutdown. + */ + public void shutdown() { + // Send the SHUTDOWN command to the server via SQL. + SqoopOptions options = new SqoopOptions(conf); + options.setConnectString("jdbc:hsqldb:hsql://localhost:" + + port + "/sqoop"); + options.setUsername("SA"); + options.setPassword(""); + HsqldbManager manager = new HsqldbManager(options); + Statement s = null; + try { + Connection c = manager.getConnection(); + s = c.createStatement(); + s.execute("SHUTDOWN"); + } catch (SQLException sqlE) { + LOG.warn("Exception shutting down database: " + + StringUtils.stringifyException(sqlE)); + } finally { + if (null != s) { + try { + s.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing statement: " + sqlE); + } + } + + try { + manager.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing manager: " + sqlE); + } + } + } +} +