mirror of
https://github.com/apache/sqoop.git
synced 2025-05-03 04:11:44 +08:00
SQOOP-375: Migrate metastore and metastore.hsqldb packages to new name space
git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1195857 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d91085714f
commit
3fcd6c8768
@ -1,6 +1,4 @@
|
||||
/**
|
||||
* Copyright 2011 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
@ -24,47 +22,17 @@
|
||||
import com.cloudera.sqoop.tool.SqoopTool;
|
||||
|
||||
/**
|
||||
* Container for all job data that should be stored to a
|
||||
* permanent resource.
|
||||
* @deprecated Moving to use org.apache.sqoop namespace.
|
||||
*/
|
||||
public class JobData {
|
||||
private SqoopOptions opts;
|
||||
private SqoopTool tool;
|
||||
public class JobData
|
||||
extends org.apache.sqoop.metastore.JobData {
|
||||
|
||||
public JobData() {
|
||||
super();
|
||||
}
|
||||
|
||||
public JobData(SqoopOptions options, SqoopTool sqoopTool) {
|
||||
this.opts = options;
|
||||
this.tool = sqoopTool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the SqoopOptions.
|
||||
*/
|
||||
public SqoopOptions getSqoopOptions() {
|
||||
return this.opts;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the SqoopTool.
|
||||
*/
|
||||
public SqoopTool getSqoopTool() {
|
||||
return this.tool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the SqoopOptions.
|
||||
*/
|
||||
public void setSqoopOptions(SqoopOptions options) {
|
||||
this.opts = options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the SqoopTool.
|
||||
*/
|
||||
public void setSqoopTool(SqoopTool sqoopTool) {
|
||||
this.tool = sqoopTool;
|
||||
super(options, sqoopTool);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,6 +1,4 @@
|
||||
/**
|
||||
* Copyright 2011 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
@ -20,76 +18,10 @@
|
||||
|
||||
package com.cloudera.sqoop.metastore;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
|
||||
/**
|
||||
* API that defines how jobs are saved, restored, and manipulated.
|
||||
*
|
||||
* <p>
|
||||
* JobStorage instances may be created and then not used; the
|
||||
* JobStorage factory may create additional JobStorage instances
|
||||
* that return false from accept() and then discard them. The close()
|
||||
* method will only be triggered for a JobStorage if the connect()
|
||||
* method is called. Connection should not be triggered by a call to
|
||||
* accept().</p>
|
||||
* @deprecated Moving to use org.apache.sqoop namespace.
|
||||
*/
|
||||
public abstract class JobStorage extends Configured implements Closeable {
|
||||
|
||||
/**
|
||||
* Returns true if the JobStorage system can use the metadata in
|
||||
* the descriptor to connect to an underlying storage resource.
|
||||
*/
|
||||
public abstract boolean canAccept(Map<String, String> descriptor);
|
||||
|
||||
|
||||
/**
|
||||
* Opens / connects to the underlying storage resource specified by the
|
||||
* descriptor.
|
||||
*/
|
||||
public abstract void open(Map<String, String> descriptor)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Given a job name, reconstitute a JobData that contains all
|
||||
* configuration information required for the job. Returns null if the
|
||||
* job name does not match an available job.
|
||||
*/
|
||||
public abstract JobData read(String jobName)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Forget about a saved job.
|
||||
*/
|
||||
public abstract void delete(String jobName) throws IOException;
|
||||
|
||||
/**
|
||||
* Given a job name and the data describing a configured job, record the job
|
||||
* information to the storage medium.
|
||||
*/
|
||||
public abstract void create(String jobName, JobData data)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Given a job name and configured job data, update the underlying resource
|
||||
* to match the current job configuration.
|
||||
*/
|
||||
public abstract void update(String jobName, JobData data)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Close any resources opened by the JobStorage system.
|
||||
*/
|
||||
public void close() throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* Enumerate all jobs held in the connected resource.
|
||||
*/
|
||||
public abstract List<String> list() throws IOException;
|
||||
public abstract class JobStorage
|
||||
extends org.apache.sqoop.metastore.JobStorage {
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,4 @@
|
||||
/**
|
||||
* Copyright 2011 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
@ -20,55 +18,20 @@
|
||||
|
||||
package com.cloudera.sqoop.metastore;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
/**
|
||||
* Factory that produces the correct JobStorage system to work with
|
||||
* a particular job descriptor.
|
||||
* @deprecated Moving to use org.apache.sqoop namespace.
|
||||
*/
|
||||
public class JobStorageFactory {
|
||||
public class JobStorageFactory
|
||||
extends org.apache.sqoop.metastore.JobStorageFactory {
|
||||
|
||||
private Configuration conf;
|
||||
|
||||
/**
|
||||
* Configuration key describing the list of JobStorage implementations
|
||||
* to use to handle jobs.
|
||||
*/
|
||||
public static final String AVAILABLE_STORAGES_KEY =
|
||||
"sqoop.job.storage.implementations";
|
||||
|
||||
/** The default list of available JobStorage implementations. */
|
||||
private static final String DEFAULT_AVAILABLE_STORAGES =
|
||||
"com.cloudera.sqoop.metastore.hsqldb.HsqldbJobStorage,"
|
||||
+ "com.cloudera.sqoop.metastore.hsqldb.AutoHsqldbStorage";
|
||||
org.apache.sqoop.metastore.JobStorageFactory.AVAILABLE_STORAGES_KEY;
|
||||
|
||||
public JobStorageFactory(Configuration config) {
|
||||
this.conf = config;
|
||||
|
||||
// Ensure that we always have an available storages list.
|
||||
if (this.conf.get(AVAILABLE_STORAGES_KEY) == null) {
|
||||
this.conf.set(AVAILABLE_STORAGES_KEY, DEFAULT_AVAILABLE_STORAGES);
|
||||
}
|
||||
super(config);
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a storage descriptor, determine the correct JobStorage
|
||||
* implementation to use to connect to the storage resource and return an
|
||||
* instance of it -- or null if no JobStorage instance is appropriate.
|
||||
*/
|
||||
public JobStorage getJobStorage(Map<String, String> descriptor) {
|
||||
List<JobStorage> storages = this.conf.getInstances(
|
||||
AVAILABLE_STORAGES_KEY, JobStorage.class);
|
||||
for (JobStorage stor : storages) {
|
||||
if (stor.canAccept(descriptor)) {
|
||||
return stor;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,4 @@
|
||||
/**
|
||||
* Copyright 2011 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
@ -20,97 +18,24 @@
|
||||
|
||||
package com.cloudera.sqoop.metastore.hsqldb;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
/**
|
||||
* JobStorage implementation that auto-configures an HSQLDB
|
||||
* local-file-based instance to hold jobs.
|
||||
* @deprecated Moving to use org.apache.sqoop namespace.
|
||||
*/
|
||||
public class AutoHsqldbStorage extends HsqldbJobStorage {
|
||||
public class AutoHsqldbStorage
|
||||
extends org.apache.sqoop.metastore.hsqldb.AutoHsqldbStorage {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(
|
||||
AutoHsqldbStorage.class.getName());
|
||||
|
||||
/**
|
||||
* Configuration key specifying whether this storage agent is active.
|
||||
* Defaults to "on" to allow zero-conf local users.
|
||||
*/
|
||||
public static final String AUTO_STORAGE_IS_ACTIVE_KEY =
|
||||
"sqoop.metastore.client.enable.autoconnect";
|
||||
|
||||
/**
|
||||
* Configuration key specifying the connect string used by this
|
||||
* storage agent.
|
||||
*/
|
||||
org.apache.sqoop.metastore.hsqldb.
|
||||
AutoHsqldbStorage.AUTO_STORAGE_IS_ACTIVE_KEY;
|
||||
public static final String AUTO_STORAGE_CONNECT_STRING_KEY =
|
||||
"sqoop.metastore.client.autoconnect.url";
|
||||
|
||||
/**
|
||||
* Configuration key specifying the username to bind with.
|
||||
*/
|
||||
org.apache.sqoop.metastore.hsqldb.
|
||||
AutoHsqldbStorage.AUTO_STORAGE_CONNECT_STRING_KEY;
|
||||
public static final String AUTO_STORAGE_USER_KEY =
|
||||
"sqoop.metastore.client.autoconnect.username";
|
||||
|
||||
|
||||
/** HSQLDB default user is named 'SA'. */
|
||||
private static final String DEFAULT_AUTO_USER = "SA";
|
||||
|
||||
/**
|
||||
* Configuration key specifying the password to bind with.
|
||||
*/
|
||||
org.apache.sqoop.metastore.hsqldb.AutoHsqldbStorage.AUTO_STORAGE_USER_KEY;
|
||||
public static final String AUTO_STORAGE_PASS_KEY =
|
||||
"sqoop.metastore.client.autoconnect.password";
|
||||
org.apache.sqoop.metastore.hsqldb.AutoHsqldbStorage.AUTO_STORAGE_PASS_KEY;
|
||||
public static final String DEFAULT_AUTO_PASSWORD =
|
||||
org.apache.sqoop.metastore.hsqldb.AutoHsqldbStorage.DEFAULT_AUTO_PASSWORD;
|
||||
|
||||
/** HSQLDB default user has an empty password. */
|
||||
public static final String DEFAULT_AUTO_PASSWORD = "";
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public boolean canAccept(Map<String, String> descriptor) {
|
||||
Configuration conf = this.getConf();
|
||||
return conf.getBoolean(AUTO_STORAGE_IS_ACTIVE_KEY, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine the user's home directory and return a connect
|
||||
* string to HSQLDB that uses ~/.sqoop/ as the storage location
|
||||
* for the metastore database.
|
||||
*/
|
||||
private String getHomeDirFileConnectStr() {
|
||||
String homeDir = System.getProperty("user.home");
|
||||
|
||||
File homeDirObj = new File(homeDir);
|
||||
File sqoopDataDirObj = new File(homeDirObj, ".sqoop");
|
||||
File databaseFileObj = new File(sqoopDataDirObj, "metastore.db");
|
||||
|
||||
String dbFileStr = databaseFileObj.toString();
|
||||
return "jdbc:hsqldb:file:" + dbFileStr
|
||||
+ ";hsqldb.write_delay=false;shutdown=true";
|
||||
}
|
||||
|
||||
@Override
|
||||
/**
|
||||
* Set the connection information to use the auto-inferred connection
|
||||
* string.
|
||||
*/
|
||||
public void open(Map<String, String> descriptor) throws IOException {
|
||||
Configuration conf = getConf();
|
||||
setMetastoreConnectStr(conf.get(AUTO_STORAGE_CONNECT_STRING_KEY,
|
||||
getHomeDirFileConnectStr()));
|
||||
setMetastoreUser(conf.get(AUTO_STORAGE_USER_KEY, DEFAULT_AUTO_USER));
|
||||
setMetastorePassword(conf.get(AUTO_STORAGE_PASS_KEY,
|
||||
DEFAULT_AUTO_PASSWORD));
|
||||
setConnectedDescriptor(descriptor);
|
||||
|
||||
init();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,4 @@
|
||||
/**
|
||||
* Copyright 2011 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
@ -17,793 +15,22 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package com.cloudera.sqoop.metastore.hsqldb;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.DatabaseMetaData;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import com.cloudera.sqoop.SqoopOptions;
|
||||
import com.cloudera.sqoop.metastore.JobData;
|
||||
import com.cloudera.sqoop.metastore.JobStorage;
|
||||
import com.cloudera.sqoop.tool.SqoopTool;
|
||||
|
||||
/**
|
||||
* JobStorage implementation that uses an HSQLDB-backed database to
|
||||
* hold job information.
|
||||
* @deprecated Moving to use org.apache.sqoop namespace.
|
||||
*/
|
||||
public class HsqldbJobStorage extends JobStorage {
|
||||
public class HsqldbJobStorage
|
||||
extends org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(
|
||||
HsqldbJobStorage.class.getName());
|
||||
|
||||
/** descriptor key identifying the connect string for the metastore. */
|
||||
public static final String META_CONNECT_KEY = "metastore.connect.string";
|
||||
|
||||
/** descriptor key identifying the username to use when connecting
|
||||
* to the metastore.
|
||||
*/
|
||||
public static final String META_USERNAME_KEY = "metastore.username";
|
||||
|
||||
/** descriptor key identifying the password to use when connecting
|
||||
* to the metastore.
|
||||
*/
|
||||
public static final String META_PASSWORD_KEY = "metastore.password";
|
||||
|
||||
|
||||
/** Default name for the root metadata table in HSQLDB. */
|
||||
private static final String DEFAULT_ROOT_TABLE_NAME = "SQOOP_ROOT";
|
||||
|
||||
/** Configuration key used to override root table name. */
|
||||
public static final String META_CONNECT_KEY =
|
||||
org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.META_CONNECT_KEY;
|
||||
public static final String META_USERNAME_KEY =
|
||||
org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.META_USERNAME_KEY;
|
||||
public static final String META_PASSWORD_KEY =
|
||||
org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.META_PASSWORD_KEY;
|
||||
public static final String ROOT_TABLE_NAME_KEY =
|
||||
"sqoop.hsqldb.root.table.name";
|
||||
org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.ROOT_TABLE_NAME_KEY;
|
||||
|
||||
/** root metadata table key used to define the current schema version. */
|
||||
private static final String STORAGE_VERSION_KEY =
|
||||
"sqoop.hsqldb.job.storage.version";
|
||||
|
||||
/** The current version number for the schema edition. */
|
||||
private static final int CUR_STORAGE_VERSION = 0;
|
||||
|
||||
/** root metadata table key used to define the job table name. */
|
||||
private static final String SESSION_TABLE_KEY =
|
||||
"sqoop.hsqldb.job.info.table";
|
||||
|
||||
/** Default value for SESSION_TABLE_KEY. */
|
||||
private static final String DEFAULT_SESSION_TABLE_NAME =
|
||||
"SQOOP_SESSIONS";
|
||||
|
||||
/** Per-job key with propClass 'schema' that defines the set of
|
||||
* properties valid to be defined for propClass 'SqoopOptions'. */
|
||||
private static final String PROPERTY_SET_KEY =
|
||||
"sqoop.property.set.id";
|
||||
|
||||
/** Current value for PROPERTY_SET_KEY. */
|
||||
private static final String CUR_PROPERTY_SET_ID = "0";
|
||||
|
||||
// The following are values for propClass in the v0 schema which
|
||||
// describe different aspects of the stored metadata.
|
||||
|
||||
/** Property class for properties about the stored data itself. */
|
||||
private static final String PROPERTY_CLASS_SCHEMA = "schema";
|
||||
|
||||
/** Property class for properties that are loaded into SqoopOptions. */
|
||||
private static final String PROPERTY_CLASS_SQOOP_OPTIONS = "SqoopOptions";
|
||||
|
||||
/** Property class for properties that are loaded into a Configuration. */
|
||||
private static final String PROPERTY_CLASS_CONFIG = "config";
|
||||
|
||||
/**
|
||||
* Per-job key with propClass 'schema' that specifies the SqoopTool
|
||||
* to load.
|
||||
*/
|
||||
private static final String SQOOP_TOOL_KEY = "sqoop.tool";
|
||||
|
||||
|
||||
private Map<String, String> connectedDescriptor;
|
||||
private String metastoreConnectStr;
|
||||
private String metastoreUser;
|
||||
private String metastorePassword;
|
||||
private Connection connection;
|
||||
|
||||
protected Connection getConnection() {
|
||||
return this.connection;
|
||||
}
|
||||
|
||||
// After connection to the database and initialization of the
|
||||
// schema, this holds the name of the job table.
|
||||
private String jobTableName;
|
||||
|
||||
protected void setMetastoreConnectStr(String connectStr) {
|
||||
this.metastoreConnectStr = connectStr;
|
||||
}
|
||||
|
||||
protected void setMetastoreUser(String user) {
|
||||
this.metastoreUser = user;
|
||||
}
|
||||
|
||||
protected void setMetastorePassword(String pass) {
|
||||
this.metastorePassword = pass;
|
||||
}
|
||||
|
||||
private static final String DB_DRIVER_CLASS = "org.hsqldb.jdbcDriver";
|
||||
|
||||
/**
|
||||
* Set the descriptor used to open() this storage.
|
||||
*/
|
||||
protected void setConnectedDescriptor(Map<String, String> descriptor) {
|
||||
this.connectedDescriptor = descriptor;
|
||||
}
|
||||
|
||||
@Override
|
||||
/**
|
||||
* Initialize the connection to the database.
|
||||
*/
|
||||
public void open(Map<String, String> descriptor) throws IOException {
|
||||
setMetastoreConnectStr(descriptor.get(META_CONNECT_KEY));
|
||||
setMetastoreUser(descriptor.get(META_USERNAME_KEY));
|
||||
setMetastorePassword(descriptor.get(META_PASSWORD_KEY));
|
||||
setConnectedDescriptor(descriptor);
|
||||
|
||||
init();
|
||||
}
|
||||
|
||||
protected void init() throws IOException {
|
||||
try {
|
||||
// Load/initialize the JDBC driver.
|
||||
Class.forName(DB_DRIVER_CLASS);
|
||||
} catch (ClassNotFoundException cnfe) {
|
||||
throw new IOException("Could not load HSQLDB JDBC driver", cnfe);
|
||||
}
|
||||
|
||||
try {
|
||||
if (null == metastoreUser) {
|
||||
this.connection = DriverManager.getConnection(metastoreConnectStr);
|
||||
} else {
|
||||
this.connection = DriverManager.getConnection(metastoreConnectStr,
|
||||
metastoreUser, metastorePassword);
|
||||
}
|
||||
|
||||
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
|
||||
connection.setAutoCommit(false);
|
||||
|
||||
// Initialize the root schema.
|
||||
if (!rootTableExists()) {
|
||||
createRootTable();
|
||||
}
|
||||
|
||||
// Check the schema version.
|
||||
String curStorageVerStr = getRootProperty(STORAGE_VERSION_KEY, null);
|
||||
int actualStorageVer = -1;
|
||||
try {
|
||||
actualStorageVer = Integer.valueOf(curStorageVerStr);
|
||||
} catch (NumberFormatException nfe) {
|
||||
LOG.warn("Could not interpret as a number: " + curStorageVerStr);
|
||||
}
|
||||
if (actualStorageVer != CUR_STORAGE_VERSION) {
|
||||
LOG.error("Can not interpret metadata schema");
|
||||
LOG.error("The metadata schema version is " + curStorageVerStr);
|
||||
LOG.error("The highest version supported is " + CUR_STORAGE_VERSION);
|
||||
LOG.error("To use this version of Sqoop, "
|
||||
+ "you must downgrade your metadata schema.");
|
||||
throw new IOException("Invalid metadata version.");
|
||||
}
|
||||
|
||||
// Initialize the versioned schema.
|
||||
initV0Schema();
|
||||
} catch (SQLException sqle) {
|
||||
if (null != connection) {
|
||||
try {
|
||||
connection.rollback();
|
||||
} catch (SQLException e2) {
|
||||
LOG.warn("Error rolling back transaction in error handler: " + e2);
|
||||
}
|
||||
}
|
||||
|
||||
throw new IOException("Exception creating SQL connection", sqle);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (null != this.connection) {
|
||||
try {
|
||||
LOG.debug("Flushing current transaction");
|
||||
this.connection.commit();
|
||||
} catch (SQLException sqlE) {
|
||||
throw new IOException("Exception committing connection", sqlE);
|
||||
}
|
||||
|
||||
try {
|
||||
LOG.debug("Closing connection");
|
||||
this.connection.close();
|
||||
} catch (SQLException sqlE) {
|
||||
throw new IOException("Exception closing connection", sqlE);
|
||||
} finally {
|
||||
this.connection = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public boolean canAccept(Map<String, String> descriptor) {
|
||||
// We return true if the desciptor contains a connect string to find
|
||||
// the database.
|
||||
return descriptor.get(META_CONNECT_KEY) != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public JobData read(String jobName) throws IOException {
|
||||
try {
|
||||
if (!jobExists(jobName)) {
|
||||
LOG.error("Cannot restore job: " + jobName);
|
||||
LOG.error("(No such job)");
|
||||
throw new IOException("Cannot restore missing job " + jobName);
|
||||
}
|
||||
|
||||
LOG.debug("Restoring job: " + jobName);
|
||||
Properties schemaProps = getV0Properties(jobName,
|
||||
PROPERTY_CLASS_SCHEMA);
|
||||
Properties sqoopOptProps = getV0Properties(jobName,
|
||||
PROPERTY_CLASS_SQOOP_OPTIONS);
|
||||
Properties configProps = getV0Properties(jobName,
|
||||
PROPERTY_CLASS_CONFIG);
|
||||
|
||||
// Check that we're not using a saved job from a previous
|
||||
// version whose functionality has been deprecated.
|
||||
String thisPropSetId = schemaProps.getProperty(PROPERTY_SET_KEY);
|
||||
LOG.debug("System property set: " + CUR_PROPERTY_SET_ID);
|
||||
LOG.debug("Stored property set: " + thisPropSetId);
|
||||
if (!CUR_PROPERTY_SET_ID.equals(thisPropSetId)) {
|
||||
LOG.warn("The property set present in this database was written by");
|
||||
LOG.warn("an incompatible version of Sqoop. This may result in an");
|
||||
LOG.warn("incomplete operation.");
|
||||
// TODO(aaron): Should this fail out-right?
|
||||
}
|
||||
|
||||
String toolName = schemaProps.getProperty(SQOOP_TOOL_KEY);
|
||||
if (null == toolName) {
|
||||
// Don't know what tool to create.
|
||||
throw new IOException("Incomplete metadata; missing "
|
||||
+ SQOOP_TOOL_KEY);
|
||||
}
|
||||
|
||||
SqoopTool tool = SqoopTool.getTool(toolName);
|
||||
if (null == tool) {
|
||||
throw new IOException("Error in job metadata: invalid tool "
|
||||
+ toolName);
|
||||
}
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
for (Map.Entry<Object, Object> entry : configProps.entrySet()) {
|
||||
conf.set(entry.getKey().toString(), entry.getValue().toString());
|
||||
}
|
||||
|
||||
SqoopOptions opts = new SqoopOptions();
|
||||
opts.setConf(conf);
|
||||
opts.loadProperties(sqoopOptProps);
|
||||
|
||||
// Set the job connection information for this job.
|
||||
opts.setJobName(jobName);
|
||||
opts.setStorageDescriptor(connectedDescriptor);
|
||||
|
||||
return new JobData(opts, tool);
|
||||
} catch (SQLException sqlE) {
|
||||
throw new IOException("Error communicating with database", sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean jobExists(String jobName) throws SQLException {
|
||||
PreparedStatement s = connection.prepareStatement(
|
||||
"SELECT COUNT(job_name) FROM " + this.jobTableName
|
||||
+ " WHERE job_name = ? GROUP BY job_name");
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
s.setString(1, jobName);
|
||||
rs = s.executeQuery();
|
||||
if (rs.next()) {
|
||||
return true; // We got a result, meaning the job exists.
|
||||
}
|
||||
} finally {
|
||||
if (null != rs) {
|
||||
try {
|
||||
rs.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing result set: " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
s.close();
|
||||
}
|
||||
|
||||
return false; // No result.
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public void delete(String jobName) throws IOException {
|
||||
try {
|
||||
if (!jobExists(jobName)) {
|
||||
LOG.error("No such job: " + jobName);
|
||||
} else {
|
||||
LOG.debug("Deleting job: " + jobName);
|
||||
PreparedStatement s = connection.prepareStatement("DELETE FROM "
|
||||
+ this.jobTableName + " WHERE job_name = ?");
|
||||
try {
|
||||
s.setString(1, jobName);
|
||||
s.executeUpdate();
|
||||
} finally {
|
||||
s.close();
|
||||
}
|
||||
connection.commit();
|
||||
}
|
||||
} catch (SQLException sqlEx) {
|
||||
try {
|
||||
connection.rollback();
|
||||
} catch (SQLException e2) {
|
||||
LOG.warn("Error rolling back transaction in error handler: " + e2);
|
||||
}
|
||||
throw new IOException("Error communicating with database", sqlEx);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public void create(String jobName, JobData data)
|
||||
throws IOException {
|
||||
try {
|
||||
if (jobExists(jobName)) {
|
||||
LOG.error("Cannot create job " + jobName
|
||||
+ ": it already exists");
|
||||
throw new IOException("Job " + jobName + " already exists");
|
||||
}
|
||||
} catch (SQLException sqlE) {
|
||||
throw new IOException("Error communicating with database", sqlE);
|
||||
}
|
||||
|
||||
createInternal(jobName, data);
|
||||
}
|
||||
|
||||
/**
|
||||
* Actually insert/update the resources for this job.
|
||||
*/
|
||||
private void createInternal(String jobName, JobData data)
|
||||
throws IOException {
|
||||
try {
|
||||
LOG.debug("Creating job: " + jobName);
|
||||
|
||||
// Save the name of the Sqoop tool.
|
||||
setV0Property(jobName, PROPERTY_CLASS_SCHEMA, SQOOP_TOOL_KEY,
|
||||
data.getSqoopTool().getToolName());
|
||||
|
||||
// Save the property set id.
|
||||
setV0Property(jobName, PROPERTY_CLASS_SCHEMA, PROPERTY_SET_KEY,
|
||||
CUR_PROPERTY_SET_ID);
|
||||
|
||||
// Save all properties of the SqoopOptions.
|
||||
Properties props = data.getSqoopOptions().writeProperties();
|
||||
setV0Properties(jobName, PROPERTY_CLASS_SQOOP_OPTIONS, props);
|
||||
|
||||
// And save all unique properties of the configuration.
|
||||
Configuration saveConf = data.getSqoopOptions().getConf();
|
||||
Configuration baseConf = new Configuration();
|
||||
|
||||
for (Map.Entry<String, String> entry : saveConf) {
|
||||
String key = entry.getKey();
|
||||
String rawVal = saveConf.getRaw(key);
|
||||
String baseVal = baseConf.getRaw(key);
|
||||
if (baseVal != null && rawVal.equals(baseVal)) {
|
||||
continue; // Don't save this; it's set in the base configuration.
|
||||
}
|
||||
|
||||
LOG.debug("Saving " + key + " => " + rawVal + " / " + baseVal);
|
||||
setV0Property(jobName, PROPERTY_CLASS_CONFIG, key, rawVal);
|
||||
}
|
||||
|
||||
connection.commit();
|
||||
} catch (SQLException sqlE) {
|
||||
try {
|
||||
connection.rollback();
|
||||
} catch (SQLException sqlE2) {
|
||||
LOG.warn("Exception rolling back transaction during error handling: "
|
||||
+ sqlE2);
|
||||
}
|
||||
throw new IOException("Error communicating with database", sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public void update(String jobName, JobData data)
|
||||
throws IOException {
|
||||
try {
|
||||
if (!jobExists(jobName)) {
|
||||
LOG.error("Cannot update job " + jobName + ": not found");
|
||||
throw new IOException("Job " + jobName + " does not exist");
|
||||
}
|
||||
} catch (SQLException sqlE) {
|
||||
throw new IOException("Error communicating with database", sqlE);
|
||||
}
|
||||
|
||||
// Since we set properties with update-or-insert, this is the same
|
||||
// as create on this system.
|
||||
createInternal(jobName, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public List<String> list() throws IOException {
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
PreparedStatement s = connection.prepareStatement(
|
||||
"SELECT DISTINCT job_name FROM " + this.jobTableName);
|
||||
try {
|
||||
rs = s.executeQuery();
|
||||
ArrayList<String> jobs = new ArrayList<String>();
|
||||
while (rs.next()) {
|
||||
jobs.add(rs.getString(1));
|
||||
}
|
||||
|
||||
return jobs;
|
||||
} finally {
|
||||
if (null != rs) {
|
||||
try {
|
||||
rs.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing resultset: " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
if (null != s) {
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
} catch (SQLException sqlE) {
|
||||
throw new IOException("Error communicating with database", sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
// Determine the name to use for the root metadata table.
|
||||
private String getRootTableName() {
|
||||
Configuration conf = getConf();
|
||||
return conf.get(ROOT_TABLE_NAME_KEY, DEFAULT_ROOT_TABLE_NAME);
|
||||
}
|
||||
|
||||
private boolean tableExists(String table) throws SQLException {
|
||||
LOG.debug("Checking for table: " + table);
|
||||
DatabaseMetaData dbmd = connection.getMetaData();
|
||||
String [] tableTypes = { "TABLE" };
|
||||
ResultSet rs = dbmd.getTables(null, null, null, tableTypes);
|
||||
if (null != rs) {
|
||||
try {
|
||||
while (rs.next()) {
|
||||
if (table.equalsIgnoreCase(rs.getString("TABLE_NAME"))) {
|
||||
LOG.debug("Found table: " + table);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
rs.close();
|
||||
}
|
||||
}
|
||||
|
||||
LOG.debug("Could not find table.");
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean rootTableExists() throws SQLException {
|
||||
String rootTableName = getRootTableName();
|
||||
return tableExists(rootTableName);
|
||||
}
|
||||
|
||||
private void createRootTable() throws SQLException {
|
||||
String rootTableName = getRootTableName();
|
||||
LOG.debug("Creating root table: " + rootTableName);
|
||||
|
||||
// TODO: Sanity-check the value of rootTableName to ensure it is
|
||||
// not a SQL-injection attack vector.
|
||||
Statement s = connection.createStatement();
|
||||
try {
|
||||
s.executeUpdate("CREATE TABLE " + rootTableName + " ("
|
||||
+ "version INT, "
|
||||
+ "propname VARCHAR(128) NOT NULL, "
|
||||
+ "propval VARCHAR(256), "
|
||||
+ "CONSTRAINT " + rootTableName + "_unq UNIQUE (version, propname))");
|
||||
} finally {
|
||||
s.close();
|
||||
}
|
||||
|
||||
setRootProperty(STORAGE_VERSION_KEY, null,
|
||||
Integer.toString(CUR_STORAGE_VERSION));
|
||||
|
||||
LOG.debug("Saving root table.");
|
||||
connection.commit();
|
||||
}
|
||||
|
||||
/**
|
||||
* Look up a value for the specified version (may be null) in the
|
||||
* root metadata table.
|
||||
*/
|
||||
private String getRootProperty(String propertyName, Integer version)
|
||||
throws SQLException {
|
||||
LOG.debug("Looking up property " + propertyName + " for version "
|
||||
+ version);
|
||||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
|
||||
try {
|
||||
if (null == version) {
|
||||
s = connection.prepareStatement(
|
||||
"SELECT propval FROM " + getRootTableName()
|
||||
+ " WHERE version IS NULL AND propname = ?");
|
||||
s.setString(1, propertyName);
|
||||
} else {
|
||||
s = connection.prepareStatement(
|
||||
"SELECT propval FROM " + getRootTableName() + " WHERE version = ? "
|
||||
+ " AND propname = ?");
|
||||
s.setInt(1, version);
|
||||
s.setString(2, propertyName);
|
||||
}
|
||||
|
||||
rs = s.executeQuery();
|
||||
if (!rs.next()) {
|
||||
LOG.debug(" => (no result)");
|
||||
return null; // No such result.
|
||||
} else {
|
||||
String result = rs.getString(1); // Return the only result col.
|
||||
LOG.debug(" => " + result);
|
||||
return result;
|
||||
}
|
||||
} finally {
|
||||
if (null != rs) {
|
||||
try {
|
||||
rs.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing resultset: " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
if (null != s) {
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a value for the specified version (may be null) in the root
|
||||
* metadata table.
|
||||
*/
|
||||
private void setRootProperty(String propertyName, Integer version,
|
||||
String val) throws SQLException {
|
||||
LOG.debug("Setting property " + propertyName + " for version "
|
||||
+ version + " => " + val);
|
||||
|
||||
PreparedStatement s;
|
||||
String curVal = getRootProperty(propertyName, version);
|
||||
if (null == curVal) {
|
||||
// INSERT the row.
|
||||
s = connection.prepareStatement("INSERT INTO " + getRootTableName()
|
||||
+ " (propval, propname, version) VALUES ( ? , ? , ? )");
|
||||
} else if (version == null) {
|
||||
// UPDATE an existing row with a null version
|
||||
s = connection.prepareStatement("UPDATE " + getRootTableName()
|
||||
+ " SET propval = ? WHERE propname = ? AND version IS NULL");
|
||||
} else {
|
||||
// UPDATE an existing row with non-null version.
|
||||
s = connection.prepareStatement("UPDATE " + getRootTableName()
|
||||
+ " SET propval = ? WHERE propname = ? AND version = ?");
|
||||
}
|
||||
|
||||
try {
|
||||
s.setString(1, val);
|
||||
s.setString(2, propertyName);
|
||||
if (null != version) {
|
||||
s.setInt(3, version);
|
||||
}
|
||||
s.executeUpdate();
|
||||
} finally {
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the jobs table in the V0 schema.
|
||||
*/
|
||||
private void createJobTable() throws SQLException {
|
||||
String curTableName = DEFAULT_SESSION_TABLE_NAME;
|
||||
int tableNum = -1;
|
||||
while (true) {
|
||||
if (tableExists(curTableName)) {
|
||||
tableNum++;
|
||||
curTableName = DEFAULT_SESSION_TABLE_NAME + "_" + tableNum;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// curTableName contains a table name that does not exist.
|
||||
// Create this table.
|
||||
LOG.debug("Creating job storage table: " + curTableName);
|
||||
Statement s = connection.createStatement();
|
||||
try {
|
||||
s.executeUpdate("CREATE TABLE " + curTableName + " ("
|
||||
+ "job_name VARCHAR(64) NOT NULL, "
|
||||
+ "propname VARCHAR(128) NOT NULL, "
|
||||
+ "propval VARCHAR(1024), "
|
||||
+ "propclass VARCHAR(32) NOT NULL, "
|
||||
+ "CONSTRAINT " + curTableName + "_unq UNIQUE "
|
||||
+ "(job_name, propname, propclass))");
|
||||
|
||||
// Then set a property in the root table pointing to it.
|
||||
setRootProperty(SESSION_TABLE_KEY, 0, curTableName);
|
||||
connection.commit();
|
||||
} finally {
|
||||
s.close();
|
||||
}
|
||||
|
||||
this.jobTableName = curTableName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a root schema that exists,
|
||||
* initialize a version-0 key/value storage schema on top of it,
|
||||
* if it does not already exist.
|
||||
*/
|
||||
private void initV0Schema() throws SQLException {
|
||||
this.jobTableName = getRootProperty(SESSION_TABLE_KEY, 0);
|
||||
if (null == this.jobTableName) {
|
||||
createJobTable();
|
||||
}
|
||||
if (!tableExists(this.jobTableName)) {
|
||||
LOG.debug("Could not find job table: " + jobTableName);
|
||||
createJobTable();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INSERT or UPDATE a single (job, propname, class) to point
|
||||
* to the specified property value.
|
||||
*/
|
||||
private void setV0Property(String jobName, String propClass,
|
||||
String propName, String propVal) throws SQLException {
|
||||
LOG.debug("Job: " + jobName + "; Setting property "
|
||||
+ propName + " with class " + propClass + " => " + propVal);
|
||||
|
||||
PreparedStatement s = null;
|
||||
try {
|
||||
String curValue = getV0Property(jobName, propClass, propName);
|
||||
if (null == curValue) {
|
||||
// Property is not yet set.
|
||||
s = connection.prepareStatement("INSERT INTO " + this.jobTableName
|
||||
+ " (propval, job_name, propclass, propname) "
|
||||
+ "VALUES (?, ?, ?, ?)");
|
||||
} else {
|
||||
// Overwrite existing property.
|
||||
s = connection.prepareStatement("UPDATE " + this.jobTableName
|
||||
+ " SET propval = ? WHERE job_name = ? AND propclass = ? "
|
||||
+ "AND propname = ?");
|
||||
}
|
||||
|
||||
s.setString(1, propVal);
|
||||
s.setString(2, jobName);
|
||||
s.setString(3, propClass);
|
||||
s.setString(4, propName);
|
||||
|
||||
s.executeUpdate();
|
||||
} finally {
|
||||
if (null != s) {
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a string containing the value of a specified property,
|
||||
* or null if it is not set.
|
||||
*/
|
||||
private String getV0Property(String jobName, String propClass,
|
||||
String propertyName) throws SQLException {
|
||||
LOG.debug("Job: " + jobName + "; Getting property "
|
||||
+ propertyName + " with class " + propClass);
|
||||
|
||||
ResultSet rs = null;
|
||||
PreparedStatement s = connection.prepareStatement(
|
||||
"SELECT propval FROM " + this.jobTableName
|
||||
+ " WHERE job_name = ? AND propclass = ? AND propname = ?");
|
||||
|
||||
try {
|
||||
s.setString(1, jobName);
|
||||
s.setString(2, propClass);
|
||||
s.setString(3, propertyName);
|
||||
rs = s.executeQuery();
|
||||
|
||||
if (!rs.next()) {
|
||||
LOG.debug(" => (no result)");
|
||||
return null;
|
||||
}
|
||||
|
||||
String result = rs.getString(1);
|
||||
LOG.debug(" => " + result);
|
||||
return result;
|
||||
} finally {
|
||||
if (null != rs) {
|
||||
try {
|
||||
rs.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing resultset: " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a java.util.Properties containing all propName -> propVal
|
||||
* bindings for a given (jobName, propClass).
|
||||
*/
|
||||
private Properties getV0Properties(String jobName, String propClass)
|
||||
throws SQLException {
|
||||
LOG.debug("Job: " + jobName
|
||||
+ "; Getting properties with class " + propClass);
|
||||
|
||||
ResultSet rs = null;
|
||||
PreparedStatement s = connection.prepareStatement(
|
||||
"SELECT propname, propval FROM " + this.jobTableName
|
||||
+ " WHERE job_name = ? AND propclass = ?");
|
||||
try {
|
||||
s.setString(1, jobName);
|
||||
s.setString(2, propClass);
|
||||
rs = s.executeQuery();
|
||||
|
||||
Properties p = new Properties();
|
||||
while (rs.next()) {
|
||||
p.setProperty(rs.getString(1), rs.getString(2));
|
||||
}
|
||||
|
||||
return p;
|
||||
} finally {
|
||||
if (null != rs) {
|
||||
try {
|
||||
rs.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing result set: " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void setV0Properties(String jobName, String propClass,
|
||||
Properties properties) throws SQLException {
|
||||
LOG.debug("Job: " + jobName
|
||||
+ "; Setting bulk properties for class " + propClass);
|
||||
|
||||
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
|
||||
String key = entry.getKey().toString();
|
||||
String val = entry.getValue().toString();
|
||||
setV0Property(jobName, propClass, key, val);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,4 @@
|
||||
/**
|
||||
* Copyright 2011 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
@ -21,164 +19,24 @@
|
||||
|
||||
package com.cloudera.sqoop.metastore.hsqldb;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import org.hsqldb.Server;
|
||||
import org.hsqldb.ServerConstants;
|
||||
|
||||
import com.cloudera.sqoop.SqoopOptions;
|
||||
|
||||
import com.cloudera.sqoop.manager.HsqldbManager;
|
||||
|
||||
/**
|
||||
* Container for an HSQLDB-backed metastore.
|
||||
* @deprecated Moving to use org.apache.sqoop namespace.
|
||||
*/
|
||||
public class HsqldbMetaStore {
|
||||
public class HsqldbMetaStore
|
||||
extends org.apache.sqoop.metastore.hsqldb.HsqldbMetaStore {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(
|
||||
HsqldbMetaStore.class.getName());
|
||||
|
||||
/** Where on the local fs does the metastore put files? */
|
||||
public static final String META_STORAGE_LOCATION_KEY =
|
||||
"sqoop.metastore.server.location";
|
||||
|
||||
/**
|
||||
* What port does the metastore listen on?
|
||||
*/
|
||||
org.apache.sqoop.metastore.hsqldb.HsqldbMetaStore.META_STORAGE_LOCATION_KEY;
|
||||
public static final String META_SERVER_PORT_KEY =
|
||||
"sqoop.metastore.server.port";
|
||||
|
||||
/** Default to this port if unset. */
|
||||
public static final int DEFAULT_PORT = 16000;
|
||||
|
||||
private int port;
|
||||
private String fileLocation;
|
||||
private Server server;
|
||||
private Configuration conf;
|
||||
org.apache.sqoop.metastore.hsqldb.HsqldbMetaStore.META_SERVER_PORT_KEY;
|
||||
public static final int DEFAULT_PORT =
|
||||
org.apache.sqoop.metastore.hsqldb.HsqldbMetaStore.DEFAULT_PORT;
|
||||
|
||||
public HsqldbMetaStore(Configuration config) {
|
||||
this.conf = config;
|
||||
init();
|
||||
super(config);
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine the user's home directory and return a file path
|
||||
* under this root where the shared metastore can be placed.
|
||||
*/
|
||||
private String getHomeDirFilePath() {
|
||||
String homeDir = System.getProperty("user.home");
|
||||
|
||||
File homeDirObj = new File(homeDir);
|
||||
File sqoopDataDirObj = new File(homeDirObj, ".sqoop");
|
||||
File databaseFileObj = new File(sqoopDataDirObj, "shared-metastore.db");
|
||||
|
||||
return databaseFileObj.toString();
|
||||
}
|
||||
|
||||
private void init() {
|
||||
if (null != server) {
|
||||
LOG.debug("init(): server already exists.");
|
||||
return;
|
||||
}
|
||||
|
||||
fileLocation = conf.get(META_STORAGE_LOCATION_KEY, null);
|
||||
if (null == fileLocation) {
|
||||
fileLocation = getHomeDirFilePath();
|
||||
LOG.warn("The location for metastore data has not been explicitly set. "
|
||||
+ "Placing shared metastore files in " + fileLocation);
|
||||
}
|
||||
|
||||
this.port = conf.getInt(META_SERVER_PORT_KEY, DEFAULT_PORT);
|
||||
}
|
||||
|
||||
|
||||
public void start() {
|
||||
try {
|
||||
if (server != null) {
|
||||
server.checkRunning(false);
|
||||
}
|
||||
} catch (RuntimeException re) {
|
||||
LOG.info("Server is already started.");
|
||||
return;
|
||||
}
|
||||
|
||||
server = new Server();
|
||||
server.setDatabasePath(0, "file:" + fileLocation);
|
||||
server.setDatabaseName(0, "sqoop");
|
||||
server.putPropertiesFromString("hsqldb.write_delay=false");
|
||||
server.setPort(port);
|
||||
server.setSilent(true);
|
||||
server.setNoSystemExit(true);
|
||||
|
||||
server.start();
|
||||
LOG.info("Server started on port " + port + " with protocol "
|
||||
+ server.getProtocol());
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocks the current thread until the server is shut down.
|
||||
*/
|
||||
public void waitForServer() {
|
||||
while (true) {
|
||||
int curState = server.getState();
|
||||
if (curState == ServerConstants.SERVER_STATE_SHUTDOWN) {
|
||||
LOG.info("Got shutdown notification");
|
||||
break;
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.info("Interrupted while blocking for server:"
|
||||
+ StringUtils.stringifyException(ie));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connects to the server and instructs it to shutdown.
|
||||
*/
|
||||
public void shutdown() {
|
||||
// Send the SHUTDOWN command to the server via SQL.
|
||||
SqoopOptions options = new SqoopOptions(conf);
|
||||
options.setConnectString("jdbc:hsqldb:hsql://localhost:"
|
||||
+ port + "/sqoop");
|
||||
options.setUsername("SA");
|
||||
options.setPassword("");
|
||||
HsqldbManager manager = new HsqldbManager(options);
|
||||
Statement s = null;
|
||||
try {
|
||||
Connection c = manager.getConnection();
|
||||
s = c.createStatement();
|
||||
s.execute("SHUTDOWN");
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Exception shutting down database: "
|
||||
+ StringUtils.stringifyException(sqlE));
|
||||
} finally {
|
||||
if (null != s) {
|
||||
try {
|
||||
s.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing statement: " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
manager.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing manager: " + sqlE);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
69
src/java/org/apache/sqoop/metastore/JobData.java
Normal file
69
src/java/org/apache/sqoop/metastore/JobData.java
Normal file
@ -0,0 +1,69 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.sqoop.metastore;
|
||||
|
||||
import com.cloudera.sqoop.SqoopOptions;
|
||||
import com.cloudera.sqoop.tool.SqoopTool;
|
||||
|
||||
/**
|
||||
* Container for all job data that should be stored to a
|
||||
* permanent resource.
|
||||
*/
|
||||
public class JobData {
|
||||
private SqoopOptions opts;
|
||||
private SqoopTool tool;
|
||||
|
||||
public JobData() {
|
||||
}
|
||||
|
||||
public JobData(SqoopOptions options, SqoopTool sqoopTool) {
|
||||
this.opts = options;
|
||||
this.tool = sqoopTool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the SqoopOptions.
|
||||
*/
|
||||
public SqoopOptions getSqoopOptions() {
|
||||
return this.opts;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the SqoopTool.
|
||||
*/
|
||||
public SqoopTool getSqoopTool() {
|
||||
return this.tool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the SqoopOptions.
|
||||
*/
|
||||
public void setSqoopOptions(SqoopOptions options) {
|
||||
this.opts = options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the SqoopTool.
|
||||
*/
|
||||
public void setSqoopTool(SqoopTool sqoopTool) {
|
||||
this.tool = sqoopTool;
|
||||
}
|
||||
|
||||
}
|
||||
|
94
src/java/org/apache/sqoop/metastore/JobStorage.java
Normal file
94
src/java/org/apache/sqoop/metastore/JobStorage.java
Normal file
@ -0,0 +1,94 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.sqoop.metastore;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import com.cloudera.sqoop.metastore.JobData;
|
||||
|
||||
/**
|
||||
* API that defines how jobs are saved, restored, and manipulated.
|
||||
*
|
||||
* <p>
|
||||
* JobStorage instances may be created and then not used; the
|
||||
* JobStorage factory may create additional JobStorage instances
|
||||
* that return false from accept() and then discard them. The close()
|
||||
* method will only be triggered for a JobStorage if the connect()
|
||||
* method is called. Connection should not be triggered by a call to
|
||||
* accept().</p>
|
||||
*/
|
||||
public abstract class JobStorage extends Configured implements Closeable {
|
||||
|
||||
/**
|
||||
* Returns true if the JobStorage system can use the metadata in
|
||||
* the descriptor to connect to an underlying storage resource.
|
||||
*/
|
||||
public abstract boolean canAccept(Map<String, String> descriptor);
|
||||
|
||||
|
||||
/**
|
||||
* Opens / connects to the underlying storage resource specified by the
|
||||
* descriptor.
|
||||
*/
|
||||
public abstract void open(Map<String, String> descriptor)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Given a job name, reconstitute a JobData that contains all
|
||||
* configuration information required for the job. Returns null if the
|
||||
* job name does not match an available job.
|
||||
*/
|
||||
public abstract JobData read(String jobName)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Forget about a saved job.
|
||||
*/
|
||||
public abstract void delete(String jobName) throws IOException;
|
||||
|
||||
/**
|
||||
* Given a job name and the data describing a configured job, record the job
|
||||
* information to the storage medium.
|
||||
*/
|
||||
public abstract void create(String jobName, JobData data)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Given a job name and configured job data, update the underlying resource
|
||||
* to match the current job configuration.
|
||||
*/
|
||||
public abstract void update(String jobName, JobData data)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Close any resources opened by the JobStorage system.
|
||||
*/
|
||||
public void close() throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* Enumerate all jobs held in the connected resource.
|
||||
*/
|
||||
public abstract List<String> list() throws IOException;
|
||||
}
|
||||
|
73
src/java/org/apache/sqoop/metastore/JobStorageFactory.java
Normal file
73
src/java/org/apache/sqoop/metastore/JobStorageFactory.java
Normal file
@ -0,0 +1,73 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.sqoop.metastore;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import com.cloudera.sqoop.metastore.JobStorage;
|
||||
|
||||
/**
|
||||
* Factory that produces the correct JobStorage system to work with
|
||||
* a particular job descriptor.
|
||||
*/
|
||||
public class JobStorageFactory {
|
||||
|
||||
private Configuration conf;
|
||||
|
||||
/**
|
||||
* Configuration key describing the list of JobStorage implementations
|
||||
* to use to handle jobs.
|
||||
*/
|
||||
public static final String AVAILABLE_STORAGES_KEY =
|
||||
"sqoop.job.storage.implementations";
|
||||
|
||||
/** The default list of available JobStorage implementations. */
|
||||
private static final String DEFAULT_AVAILABLE_STORAGES =
|
||||
"com.cloudera.sqoop.metastore.hsqldb.HsqldbJobStorage,"
|
||||
+ "com.cloudera.sqoop.metastore.hsqldb.AutoHsqldbStorage";
|
||||
|
||||
public JobStorageFactory(Configuration config) {
|
||||
this.conf = config;
|
||||
|
||||
// Ensure that we always have an available storages list.
|
||||
if (this.conf.get(AVAILABLE_STORAGES_KEY) == null) {
|
||||
this.conf.set(AVAILABLE_STORAGES_KEY, DEFAULT_AVAILABLE_STORAGES);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a storage descriptor, determine the correct JobStorage
|
||||
* implementation to use to connect to the storage resource and return an
|
||||
* instance of it -- or null if no JobStorage instance is appropriate.
|
||||
*/
|
||||
public JobStorage getJobStorage(Map<String, String> descriptor) {
|
||||
List<JobStorage> storages = this.conf.getInstances(
|
||||
AVAILABLE_STORAGES_KEY, JobStorage.class);
|
||||
for (JobStorage stor : storages) {
|
||||
if (stor.canAccept(descriptor)) {
|
||||
return stor;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,115 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.sqoop.metastore.hsqldb;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
/**
|
||||
* JobStorage implementation that auto-configures an HSQLDB
|
||||
* local-file-based instance to hold jobs.
|
||||
*/
|
||||
public class AutoHsqldbStorage
|
||||
extends com.cloudera.sqoop.metastore.hsqldb.HsqldbJobStorage {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(
|
||||
AutoHsqldbStorage.class.getName());
|
||||
|
||||
/**
|
||||
* Configuration key specifying whether this storage agent is active.
|
||||
* Defaults to "on" to allow zero-conf local users.
|
||||
*/
|
||||
public static final String AUTO_STORAGE_IS_ACTIVE_KEY =
|
||||
"sqoop.metastore.client.enable.autoconnect";
|
||||
|
||||
/**
|
||||
* Configuration key specifying the connect string used by this
|
||||
* storage agent.
|
||||
*/
|
||||
public static final String AUTO_STORAGE_CONNECT_STRING_KEY =
|
||||
"sqoop.metastore.client.autoconnect.url";
|
||||
|
||||
/**
|
||||
* Configuration key specifying the username to bind with.
|
||||
*/
|
||||
public static final String AUTO_STORAGE_USER_KEY =
|
||||
"sqoop.metastore.client.autoconnect.username";
|
||||
|
||||
|
||||
/** HSQLDB default user is named 'SA'. */
|
||||
private static final String DEFAULT_AUTO_USER = "SA";
|
||||
|
||||
/**
|
||||
* Configuration key specifying the password to bind with.
|
||||
*/
|
||||
public static final String AUTO_STORAGE_PASS_KEY =
|
||||
"sqoop.metastore.client.autoconnect.password";
|
||||
|
||||
/** HSQLDB default user has an empty password. */
|
||||
public static final String DEFAULT_AUTO_PASSWORD = "";
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public boolean canAccept(Map<String, String> descriptor) {
|
||||
Configuration conf = this.getConf();
|
||||
return conf.getBoolean(AUTO_STORAGE_IS_ACTIVE_KEY, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine the user's home directory and return a connect
|
||||
* string to HSQLDB that uses ~/.sqoop/ as the storage location
|
||||
* for the metastore database.
|
||||
*/
|
||||
private String getHomeDirFileConnectStr() {
|
||||
String homeDir = System.getProperty("user.home");
|
||||
|
||||
File homeDirObj = new File(homeDir);
|
||||
File sqoopDataDirObj = new File(homeDirObj, ".sqoop");
|
||||
File databaseFileObj = new File(sqoopDataDirObj, "metastore.db");
|
||||
|
||||
String dbFileStr = databaseFileObj.toString();
|
||||
return "jdbc:hsqldb:file:" + dbFileStr
|
||||
+ ";hsqldb.write_delay=false;shutdown=true";
|
||||
}
|
||||
|
||||
@Override
|
||||
/**
|
||||
* Set the connection information to use the auto-inferred connection
|
||||
* string.
|
||||
*/
|
||||
public void open(Map<String, String> descriptor) throws IOException {
|
||||
Configuration conf = getConf();
|
||||
setMetastoreConnectStr(conf.get(AUTO_STORAGE_CONNECT_STRING_KEY,
|
||||
getHomeDirFileConnectStr()));
|
||||
setMetastoreUser(conf.get(AUTO_STORAGE_USER_KEY, DEFAULT_AUTO_USER));
|
||||
setMetastorePassword(conf.get(AUTO_STORAGE_PASS_KEY,
|
||||
DEFAULT_AUTO_PASSWORD));
|
||||
setConnectedDescriptor(descriptor);
|
||||
|
||||
init();
|
||||
}
|
||||
}
|
||||
|
805
src/java/org/apache/sqoop/metastore/hsqldb/HsqldbJobStorage.java
Normal file
805
src/java/org/apache/sqoop/metastore/hsqldb/HsqldbJobStorage.java
Normal file
@ -0,0 +1,805 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.sqoop.metastore.hsqldb;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.DatabaseMetaData;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import com.cloudera.sqoop.SqoopOptions;
|
||||
import com.cloudera.sqoop.metastore.JobData;
|
||||
import com.cloudera.sqoop.metastore.JobStorage;
|
||||
import com.cloudera.sqoop.tool.SqoopTool;
|
||||
|
||||
/**
|
||||
* JobStorage implementation that uses an HSQLDB-backed database to
|
||||
* hold job information.
|
||||
*/
|
||||
public class HsqldbJobStorage extends JobStorage {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(
|
||||
HsqldbJobStorage.class.getName());
|
||||
|
||||
/** descriptor key identifying the connect string for the metastore. */
|
||||
public static final String META_CONNECT_KEY = "metastore.connect.string";
|
||||
|
||||
/** descriptor key identifying the username to use when connecting
|
||||
* to the metastore.
|
||||
*/
|
||||
public static final String META_USERNAME_KEY = "metastore.username";
|
||||
|
||||
/** descriptor key identifying the password to use when connecting
|
||||
* to the metastore.
|
||||
*/
|
||||
public static final String META_PASSWORD_KEY = "metastore.password";
|
||||
|
||||
|
||||
/** Default name for the root metadata table in HSQLDB. */
|
||||
private static final String DEFAULT_ROOT_TABLE_NAME = "SQOOP_ROOT";
|
||||
|
||||
/** Configuration key used to override root table name. */
|
||||
public static final String ROOT_TABLE_NAME_KEY =
|
||||
"sqoop.hsqldb.root.table.name";
|
||||
|
||||
/** root metadata table key used to define the current schema version. */
|
||||
private static final String STORAGE_VERSION_KEY =
|
||||
"sqoop.hsqldb.job.storage.version";
|
||||
|
||||
/** The current version number for the schema edition. */
|
||||
private static final int CUR_STORAGE_VERSION = 0;
|
||||
|
||||
/** root metadata table key used to define the job table name. */
|
||||
private static final String SESSION_TABLE_KEY =
|
||||
"sqoop.hsqldb.job.info.table";
|
||||
|
||||
/** Default value for SESSION_TABLE_KEY. */
|
||||
private static final String DEFAULT_SESSION_TABLE_NAME =
|
||||
"SQOOP_SESSIONS";
|
||||
|
||||
/** Per-job key with propClass 'schema' that defines the set of
|
||||
* properties valid to be defined for propClass 'SqoopOptions'. */
|
||||
private static final String PROPERTY_SET_KEY =
|
||||
"sqoop.property.set.id";
|
||||
|
||||
/** Current value for PROPERTY_SET_KEY. */
|
||||
private static final String CUR_PROPERTY_SET_ID = "0";
|
||||
|
||||
// The following are values for propClass in the v0 schema which
|
||||
// describe different aspects of the stored metadata.
|
||||
|
||||
/** Property class for properties about the stored data itself. */
|
||||
private static final String PROPERTY_CLASS_SCHEMA = "schema";
|
||||
|
||||
/** Property class for properties that are loaded into SqoopOptions. */
|
||||
private static final String PROPERTY_CLASS_SQOOP_OPTIONS = "SqoopOptions";
|
||||
|
||||
/** Property class for properties that are loaded into a Configuration. */
|
||||
private static final String PROPERTY_CLASS_CONFIG = "config";
|
||||
|
||||
/**
|
||||
* Per-job key with propClass 'schema' that specifies the SqoopTool
|
||||
* to load.
|
||||
*/
|
||||
private static final String SQOOP_TOOL_KEY = "sqoop.tool";
|
||||
|
||||
|
||||
private Map<String, String> connectedDescriptor;
|
||||
private String metastoreConnectStr;
|
||||
private String metastoreUser;
|
||||
private String metastorePassword;
|
||||
private Connection connection;
|
||||
|
||||
protected Connection getConnection() {
|
||||
return this.connection;
|
||||
}
|
||||
|
||||
// After connection to the database and initialization of the
|
||||
// schema, this holds the name of the job table.
|
||||
private String jobTableName;
|
||||
|
||||
protected void setMetastoreConnectStr(String connectStr) {
|
||||
this.metastoreConnectStr = connectStr;
|
||||
}
|
||||
|
||||
protected void setMetastoreUser(String user) {
|
||||
this.metastoreUser = user;
|
||||
}
|
||||
|
||||
protected void setMetastorePassword(String pass) {
|
||||
this.metastorePassword = pass;
|
||||
}
|
||||
|
||||
private static final String DB_DRIVER_CLASS = "org.hsqldb.jdbcDriver";
|
||||
|
||||
/**
|
||||
* Set the descriptor used to open() this storage.
|
||||
*/
|
||||
protected void setConnectedDescriptor(Map<String, String> descriptor) {
|
||||
this.connectedDescriptor = descriptor;
|
||||
}
|
||||
|
||||
@Override
|
||||
/**
|
||||
* Initialize the connection to the database.
|
||||
*/
|
||||
public void open(Map<String, String> descriptor) throws IOException {
|
||||
setMetastoreConnectStr(descriptor.get(META_CONNECT_KEY));
|
||||
setMetastoreUser(descriptor.get(META_USERNAME_KEY));
|
||||
setMetastorePassword(descriptor.get(META_PASSWORD_KEY));
|
||||
setConnectedDescriptor(descriptor);
|
||||
|
||||
init();
|
||||
}
|
||||
|
||||
protected void init() throws IOException {
|
||||
try {
|
||||
// Load/initialize the JDBC driver.
|
||||
Class.forName(DB_DRIVER_CLASS);
|
||||
} catch (ClassNotFoundException cnfe) {
|
||||
throw new IOException("Could not load HSQLDB JDBC driver", cnfe);
|
||||
}
|
||||
|
||||
try {
|
||||
if (null == metastoreUser) {
|
||||
this.connection = DriverManager.getConnection(metastoreConnectStr);
|
||||
} else {
|
||||
this.connection = DriverManager.getConnection(metastoreConnectStr,
|
||||
metastoreUser, metastorePassword);
|
||||
}
|
||||
|
||||
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
|
||||
connection.setAutoCommit(false);
|
||||
|
||||
// Initialize the root schema.
|
||||
if (!rootTableExists()) {
|
||||
createRootTable();
|
||||
}
|
||||
|
||||
// Check the schema version.
|
||||
String curStorageVerStr = getRootProperty(STORAGE_VERSION_KEY, null);
|
||||
int actualStorageVer = -1;
|
||||
try {
|
||||
actualStorageVer = Integer.valueOf(curStorageVerStr);
|
||||
} catch (NumberFormatException nfe) {
|
||||
LOG.warn("Could not interpret as a number: " + curStorageVerStr);
|
||||
}
|
||||
if (actualStorageVer != CUR_STORAGE_VERSION) {
|
||||
LOG.error("Can not interpret metadata schema");
|
||||
LOG.error("The metadata schema version is " + curStorageVerStr);
|
||||
LOG.error("The highest version supported is " + CUR_STORAGE_VERSION);
|
||||
LOG.error("To use this version of Sqoop, "
|
||||
+ "you must downgrade your metadata schema.");
|
||||
throw new IOException("Invalid metadata version.");
|
||||
}
|
||||
|
||||
// Initialize the versioned schema.
|
||||
initV0Schema();
|
||||
} catch (SQLException sqle) {
|
||||
if (null != connection) {
|
||||
try {
|
||||
connection.rollback();
|
||||
} catch (SQLException e2) {
|
||||
LOG.warn("Error rolling back transaction in error handler: " + e2);
|
||||
}
|
||||
}
|
||||
|
||||
throw new IOException("Exception creating SQL connection", sqle);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (null != this.connection) {
|
||||
try {
|
||||
LOG.debug("Flushing current transaction");
|
||||
this.connection.commit();
|
||||
} catch (SQLException sqlE) {
|
||||
throw new IOException("Exception committing connection", sqlE);
|
||||
}
|
||||
|
||||
try {
|
||||
LOG.debug("Closing connection");
|
||||
this.connection.close();
|
||||
} catch (SQLException sqlE) {
|
||||
throw new IOException("Exception closing connection", sqlE);
|
||||
} finally {
|
||||
this.connection = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public boolean canAccept(Map<String, String> descriptor) {
|
||||
// We return true if the desciptor contains a connect string to find
|
||||
// the database.
|
||||
return descriptor.get(META_CONNECT_KEY) != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public JobData read(String jobName) throws IOException {
|
||||
try {
|
||||
if (!jobExists(jobName)) {
|
||||
LOG.error("Cannot restore job: " + jobName);
|
||||
LOG.error("(No such job)");
|
||||
throw new IOException("Cannot restore missing job " + jobName);
|
||||
}
|
||||
|
||||
LOG.debug("Restoring job: " + jobName);
|
||||
Properties schemaProps = getV0Properties(jobName,
|
||||
PROPERTY_CLASS_SCHEMA);
|
||||
Properties sqoopOptProps = getV0Properties(jobName,
|
||||
PROPERTY_CLASS_SQOOP_OPTIONS);
|
||||
Properties configProps = getV0Properties(jobName,
|
||||
PROPERTY_CLASS_CONFIG);
|
||||
|
||||
// Check that we're not using a saved job from a previous
|
||||
// version whose functionality has been deprecated.
|
||||
String thisPropSetId = schemaProps.getProperty(PROPERTY_SET_KEY);
|
||||
LOG.debug("System property set: " + CUR_PROPERTY_SET_ID);
|
||||
LOG.debug("Stored property set: " + thisPropSetId);
|
||||
if (!CUR_PROPERTY_SET_ID.equals(thisPropSetId)) {
|
||||
LOG.warn("The property set present in this database was written by");
|
||||
LOG.warn("an incompatible version of Sqoop. This may result in an");
|
||||
LOG.warn("incomplete operation.");
|
||||
// TODO(aaron): Should this fail out-right?
|
||||
}
|
||||
|
||||
String toolName = schemaProps.getProperty(SQOOP_TOOL_KEY);
|
||||
if (null == toolName) {
|
||||
// Don't know what tool to create.
|
||||
throw new IOException("Incomplete metadata; missing "
|
||||
+ SQOOP_TOOL_KEY);
|
||||
}
|
||||
|
||||
SqoopTool tool = SqoopTool.getTool(toolName);
|
||||
if (null == tool) {
|
||||
throw new IOException("Error in job metadata: invalid tool "
|
||||
+ toolName);
|
||||
}
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
for (Map.Entry<Object, Object> entry : configProps.entrySet()) {
|
||||
conf.set(entry.getKey().toString(), entry.getValue().toString());
|
||||
}
|
||||
|
||||
SqoopOptions opts = new SqoopOptions();
|
||||
opts.setConf(conf);
|
||||
opts.loadProperties(sqoopOptProps);
|
||||
|
||||
// Set the job connection information for this job.
|
||||
opts.setJobName(jobName);
|
||||
opts.setStorageDescriptor(connectedDescriptor);
|
||||
|
||||
return new JobData(opts, tool);
|
||||
} catch (SQLException sqlE) {
|
||||
throw new IOException("Error communicating with database", sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean jobExists(String jobName) throws SQLException {
|
||||
PreparedStatement s = connection.prepareStatement(
|
||||
"SELECT COUNT(job_name) FROM " + this.jobTableName
|
||||
+ " WHERE job_name = ? GROUP BY job_name");
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
s.setString(1, jobName);
|
||||
rs = s.executeQuery();
|
||||
if (rs.next()) {
|
||||
return true; // We got a result, meaning the job exists.
|
||||
}
|
||||
} finally {
|
||||
if (null != rs) {
|
||||
try {
|
||||
rs.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing result set: " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
s.close();
|
||||
}
|
||||
|
||||
return false; // No result.
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public void delete(String jobName) throws IOException {
|
||||
try {
|
||||
if (!jobExists(jobName)) {
|
||||
LOG.error("No such job: " + jobName);
|
||||
} else {
|
||||
LOG.debug("Deleting job: " + jobName);
|
||||
PreparedStatement s = connection.prepareStatement("DELETE FROM "
|
||||
+ this.jobTableName + " WHERE job_name = ?");
|
||||
try {
|
||||
s.setString(1, jobName);
|
||||
s.executeUpdate();
|
||||
} finally {
|
||||
s.close();
|
||||
}
|
||||
connection.commit();
|
||||
}
|
||||
} catch (SQLException sqlEx) {
|
||||
try {
|
||||
connection.rollback();
|
||||
} catch (SQLException e2) {
|
||||
LOG.warn("Error rolling back transaction in error handler: " + e2);
|
||||
}
|
||||
throw new IOException("Error communicating with database", sqlEx);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public void create(String jobName, JobData data)
|
||||
throws IOException {
|
||||
try {
|
||||
if (jobExists(jobName)) {
|
||||
LOG.error("Cannot create job " + jobName
|
||||
+ ": it already exists");
|
||||
throw new IOException("Job " + jobName + " already exists");
|
||||
}
|
||||
} catch (SQLException sqlE) {
|
||||
throw new IOException("Error communicating with database", sqlE);
|
||||
}
|
||||
|
||||
createInternal(jobName, data);
|
||||
}
|
||||
|
||||
/**
|
||||
* Actually insert/update the resources for this job.
|
||||
*/
|
||||
private void createInternal(String jobName, JobData data)
|
||||
throws IOException {
|
||||
try {
|
||||
LOG.debug("Creating job: " + jobName);
|
||||
|
||||
// Save the name of the Sqoop tool.
|
||||
setV0Property(jobName, PROPERTY_CLASS_SCHEMA, SQOOP_TOOL_KEY,
|
||||
data.getSqoopTool().getToolName());
|
||||
|
||||
// Save the property set id.
|
||||
setV0Property(jobName, PROPERTY_CLASS_SCHEMA, PROPERTY_SET_KEY,
|
||||
CUR_PROPERTY_SET_ID);
|
||||
|
||||
// Save all properties of the SqoopOptions.
|
||||
Properties props = data.getSqoopOptions().writeProperties();
|
||||
setV0Properties(jobName, PROPERTY_CLASS_SQOOP_OPTIONS, props);
|
||||
|
||||
// And save all unique properties of the configuration.
|
||||
Configuration saveConf = data.getSqoopOptions().getConf();
|
||||
Configuration baseConf = new Configuration();
|
||||
|
||||
for (Map.Entry<String, String> entry : saveConf) {
|
||||
String key = entry.getKey();
|
||||
String rawVal = saveConf.getRaw(key);
|
||||
String baseVal = baseConf.getRaw(key);
|
||||
if (baseVal != null && rawVal.equals(baseVal)) {
|
||||
continue; // Don't save this; it's set in the base configuration.
|
||||
}
|
||||
|
||||
LOG.debug("Saving " + key + " => " + rawVal + " / " + baseVal);
|
||||
setV0Property(jobName, PROPERTY_CLASS_CONFIG, key, rawVal);
|
||||
}
|
||||
|
||||
connection.commit();
|
||||
} catch (SQLException sqlE) {
|
||||
try {
|
||||
connection.rollback();
|
||||
} catch (SQLException sqlE2) {
|
||||
LOG.warn("Exception rolling back transaction during error handling: "
|
||||
+ sqlE2);
|
||||
}
|
||||
throw new IOException("Error communicating with database", sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public void update(String jobName, JobData data)
|
||||
throws IOException {
|
||||
try {
|
||||
if (!jobExists(jobName)) {
|
||||
LOG.error("Cannot update job " + jobName + ": not found");
|
||||
throw new IOException("Job " + jobName + " does not exist");
|
||||
}
|
||||
} catch (SQLException sqlE) {
|
||||
throw new IOException("Error communicating with database", sqlE);
|
||||
}
|
||||
|
||||
// Since we set properties with update-or-insert, this is the same
|
||||
// as create on this system.
|
||||
createInternal(jobName, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public List<String> list() throws IOException {
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
PreparedStatement s = connection.prepareStatement(
|
||||
"SELECT DISTINCT job_name FROM " + this.jobTableName);
|
||||
try {
|
||||
rs = s.executeQuery();
|
||||
ArrayList<String> jobs = new ArrayList<String>();
|
||||
while (rs.next()) {
|
||||
jobs.add(rs.getString(1));
|
||||
}
|
||||
|
||||
return jobs;
|
||||
} finally {
|
||||
if (null != rs) {
|
||||
try {
|
||||
rs.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing resultset: " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
if (null != s) {
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
} catch (SQLException sqlE) {
|
||||
throw new IOException("Error communicating with database", sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
// Determine the name to use for the root metadata table.
|
||||
private String getRootTableName() {
|
||||
Configuration conf = getConf();
|
||||
return conf.get(ROOT_TABLE_NAME_KEY, DEFAULT_ROOT_TABLE_NAME);
|
||||
}
|
||||
|
||||
private boolean tableExists(String table) throws SQLException {
|
||||
LOG.debug("Checking for table: " + table);
|
||||
DatabaseMetaData dbmd = connection.getMetaData();
|
||||
String [] tableTypes = { "TABLE" };
|
||||
ResultSet rs = dbmd.getTables(null, null, null, tableTypes);
|
||||
if (null != rs) {
|
||||
try {
|
||||
while (rs.next()) {
|
||||
if (table.equalsIgnoreCase(rs.getString("TABLE_NAME"))) {
|
||||
LOG.debug("Found table: " + table);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
rs.close();
|
||||
}
|
||||
}
|
||||
|
||||
LOG.debug("Could not find table.");
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean rootTableExists() throws SQLException {
|
||||
String rootTableName = getRootTableName();
|
||||
return tableExists(rootTableName);
|
||||
}
|
||||
|
||||
private void createRootTable() throws SQLException {
|
||||
String rootTableName = getRootTableName();
|
||||
LOG.debug("Creating root table: " + rootTableName);
|
||||
|
||||
// TODO: Sanity-check the value of rootTableName to ensure it is
|
||||
// not a SQL-injection attack vector.
|
||||
Statement s = connection.createStatement();
|
||||
try {
|
||||
s.executeUpdate("CREATE TABLE " + rootTableName + " ("
|
||||
+ "version INT, "
|
||||
+ "propname VARCHAR(128) NOT NULL, "
|
||||
+ "propval VARCHAR(256), "
|
||||
+ "CONSTRAINT " + rootTableName + "_unq UNIQUE (version, propname))");
|
||||
} finally {
|
||||
s.close();
|
||||
}
|
||||
|
||||
setRootProperty(STORAGE_VERSION_KEY, null,
|
||||
Integer.toString(CUR_STORAGE_VERSION));
|
||||
|
||||
LOG.debug("Saving root table.");
|
||||
connection.commit();
|
||||
}
|
||||
|
||||
/**
|
||||
* Look up a value for the specified version (may be null) in the
|
||||
* root metadata table.
|
||||
*/
|
||||
private String getRootProperty(String propertyName, Integer version)
|
||||
throws SQLException {
|
||||
LOG.debug("Looking up property " + propertyName + " for version "
|
||||
+ version);
|
||||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
|
||||
try {
|
||||
if (null == version) {
|
||||
s = connection.prepareStatement(
|
||||
"SELECT propval FROM " + getRootTableName()
|
||||
+ " WHERE version IS NULL AND propname = ?");
|
||||
s.setString(1, propertyName);
|
||||
} else {
|
||||
s = connection.prepareStatement(
|
||||
"SELECT propval FROM " + getRootTableName() + " WHERE version = ? "
|
||||
+ " AND propname = ?");
|
||||
s.setInt(1, version);
|
||||
s.setString(2, propertyName);
|
||||
}
|
||||
|
||||
rs = s.executeQuery();
|
||||
if (!rs.next()) {
|
||||
LOG.debug(" => (no result)");
|
||||
return null; // No such result.
|
||||
} else {
|
||||
String result = rs.getString(1); // Return the only result col.
|
||||
LOG.debug(" => " + result);
|
||||
return result;
|
||||
}
|
||||
} finally {
|
||||
if (null != rs) {
|
||||
try {
|
||||
rs.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing resultset: " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
if (null != s) {
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a value for the specified version (may be null) in the root
|
||||
* metadata table.
|
||||
*/
|
||||
private void setRootProperty(String propertyName, Integer version,
|
||||
String val) throws SQLException {
|
||||
LOG.debug("Setting property " + propertyName + " for version "
|
||||
+ version + " => " + val);
|
||||
|
||||
PreparedStatement s;
|
||||
String curVal = getRootProperty(propertyName, version);
|
||||
if (null == curVal) {
|
||||
// INSERT the row.
|
||||
s = connection.prepareStatement("INSERT INTO " + getRootTableName()
|
||||
+ " (propval, propname, version) VALUES ( ? , ? , ? )");
|
||||
} else if (version == null) {
|
||||
// UPDATE an existing row with a null version
|
||||
s = connection.prepareStatement("UPDATE " + getRootTableName()
|
||||
+ " SET propval = ? WHERE propname = ? AND version IS NULL");
|
||||
} else {
|
||||
// UPDATE an existing row with non-null version.
|
||||
s = connection.prepareStatement("UPDATE " + getRootTableName()
|
||||
+ " SET propval = ? WHERE propname = ? AND version = ?");
|
||||
}
|
||||
|
||||
try {
|
||||
s.setString(1, val);
|
||||
s.setString(2, propertyName);
|
||||
if (null != version) {
|
||||
s.setInt(3, version);
|
||||
}
|
||||
s.executeUpdate();
|
||||
} finally {
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the jobs table in the V0 schema.
|
||||
*/
|
||||
private void createJobTable() throws SQLException {
|
||||
String curTableName = DEFAULT_SESSION_TABLE_NAME;
|
||||
int tableNum = -1;
|
||||
while (true) {
|
||||
if (tableExists(curTableName)) {
|
||||
tableNum++;
|
||||
curTableName = DEFAULT_SESSION_TABLE_NAME + "_" + tableNum;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// curTableName contains a table name that does not exist.
|
||||
// Create this table.
|
||||
LOG.debug("Creating job storage table: " + curTableName);
|
||||
Statement s = connection.createStatement();
|
||||
try {
|
||||
s.executeUpdate("CREATE TABLE " + curTableName + " ("
|
||||
+ "job_name VARCHAR(64) NOT NULL, "
|
||||
+ "propname VARCHAR(128) NOT NULL, "
|
||||
+ "propval VARCHAR(1024), "
|
||||
+ "propclass VARCHAR(32) NOT NULL, "
|
||||
+ "CONSTRAINT " + curTableName + "_unq UNIQUE "
|
||||
+ "(job_name, propname, propclass))");
|
||||
|
||||
// Then set a property in the root table pointing to it.
|
||||
setRootProperty(SESSION_TABLE_KEY, 0, curTableName);
|
||||
connection.commit();
|
||||
} finally {
|
||||
s.close();
|
||||
}
|
||||
|
||||
this.jobTableName = curTableName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a root schema that exists,
|
||||
* initialize a version-0 key/value storage schema on top of it,
|
||||
* if it does not already exist.
|
||||
*/
|
||||
private void initV0Schema() throws SQLException {
|
||||
this.jobTableName = getRootProperty(SESSION_TABLE_KEY, 0);
|
||||
if (null == this.jobTableName) {
|
||||
createJobTable();
|
||||
}
|
||||
if (!tableExists(this.jobTableName)) {
|
||||
LOG.debug("Could not find job table: " + jobTableName);
|
||||
createJobTable();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INSERT or UPDATE a single (job, propname, class) to point
|
||||
* to the specified property value.
|
||||
*/
|
||||
private void setV0Property(String jobName, String propClass,
|
||||
String propName, String propVal) throws SQLException {
|
||||
LOG.debug("Job: " + jobName + "; Setting property "
|
||||
+ propName + " with class " + propClass + " => " + propVal);
|
||||
|
||||
PreparedStatement s = null;
|
||||
try {
|
||||
String curValue = getV0Property(jobName, propClass, propName);
|
||||
if (null == curValue) {
|
||||
// Property is not yet set.
|
||||
s = connection.prepareStatement("INSERT INTO " + this.jobTableName
|
||||
+ " (propval, job_name, propclass, propname) "
|
||||
+ "VALUES (?, ?, ?, ?)");
|
||||
} else {
|
||||
// Overwrite existing property.
|
||||
s = connection.prepareStatement("UPDATE " + this.jobTableName
|
||||
+ " SET propval = ? WHERE job_name = ? AND propclass = ? "
|
||||
+ "AND propname = ?");
|
||||
}
|
||||
|
||||
s.setString(1, propVal);
|
||||
s.setString(2, jobName);
|
||||
s.setString(3, propClass);
|
||||
s.setString(4, propName);
|
||||
|
||||
s.executeUpdate();
|
||||
} finally {
|
||||
if (null != s) {
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a string containing the value of a specified property,
|
||||
* or null if it is not set.
|
||||
*/
|
||||
private String getV0Property(String jobName, String propClass,
|
||||
String propertyName) throws SQLException {
|
||||
LOG.debug("Job: " + jobName + "; Getting property "
|
||||
+ propertyName + " with class " + propClass);
|
||||
|
||||
ResultSet rs = null;
|
||||
PreparedStatement s = connection.prepareStatement(
|
||||
"SELECT propval FROM " + this.jobTableName
|
||||
+ " WHERE job_name = ? AND propclass = ? AND propname = ?");
|
||||
|
||||
try {
|
||||
s.setString(1, jobName);
|
||||
s.setString(2, propClass);
|
||||
s.setString(3, propertyName);
|
||||
rs = s.executeQuery();
|
||||
|
||||
if (!rs.next()) {
|
||||
LOG.debug(" => (no result)");
|
||||
return null;
|
||||
}
|
||||
|
||||
String result = rs.getString(1);
|
||||
LOG.debug(" => " + result);
|
||||
return result;
|
||||
} finally {
|
||||
if (null != rs) {
|
||||
try {
|
||||
rs.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing resultset: " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a java.util.Properties containing all propName -> propVal
|
||||
* bindings for a given (jobName, propClass).
|
||||
*/
|
||||
private Properties getV0Properties(String jobName, String propClass)
|
||||
throws SQLException {
|
||||
LOG.debug("Job: " + jobName
|
||||
+ "; Getting properties with class " + propClass);
|
||||
|
||||
ResultSet rs = null;
|
||||
PreparedStatement s = connection.prepareStatement(
|
||||
"SELECT propname, propval FROM " + this.jobTableName
|
||||
+ " WHERE job_name = ? AND propclass = ?");
|
||||
try {
|
||||
s.setString(1, jobName);
|
||||
s.setString(2, propClass);
|
||||
rs = s.executeQuery();
|
||||
|
||||
Properties p = new Properties();
|
||||
while (rs.next()) {
|
||||
p.setProperty(rs.getString(1), rs.getString(2));
|
||||
}
|
||||
|
||||
return p;
|
||||
} finally {
|
||||
if (null != rs) {
|
||||
try {
|
||||
rs.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing result set: " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void setV0Properties(String jobName, String propClass,
|
||||
Properties properties) throws SQLException {
|
||||
LOG.debug("Job: " + jobName
|
||||
+ "; Setting bulk properties for class " + propClass);
|
||||
|
||||
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
|
||||
String key = entry.getKey().toString();
|
||||
String val = entry.getValue().toString();
|
||||
setV0Property(jobName, propClass, key, val);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
180
src/java/org/apache/sqoop/metastore/hsqldb/HsqldbMetaStore.java
Normal file
180
src/java/org/apache/sqoop/metastore/hsqldb/HsqldbMetaStore.java
Normal file
@ -0,0 +1,180 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.sqoop.metastore.hsqldb;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import org.hsqldb.Server;
|
||||
import org.hsqldb.ServerConstants;
|
||||
|
||||
import com.cloudera.sqoop.SqoopOptions;
|
||||
|
||||
import com.cloudera.sqoop.manager.HsqldbManager;
|
||||
|
||||
/**
|
||||
* Container for an HSQLDB-backed metastore.
|
||||
*/
|
||||
public class HsqldbMetaStore {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(
|
||||
HsqldbMetaStore.class.getName());
|
||||
|
||||
/** Where on the local fs does the metastore put files? */
|
||||
public static final String META_STORAGE_LOCATION_KEY =
|
||||
"sqoop.metastore.server.location";
|
||||
|
||||
/**
|
||||
* What port does the metastore listen on?
|
||||
*/
|
||||
public static final String META_SERVER_PORT_KEY =
|
||||
"sqoop.metastore.server.port";
|
||||
|
||||
/** Default to this port if unset. */
|
||||
public static final int DEFAULT_PORT = 16000;
|
||||
|
||||
private int port;
|
||||
private String fileLocation;
|
||||
private Server server;
|
||||
private Configuration conf;
|
||||
|
||||
public HsqldbMetaStore(Configuration config) {
|
||||
this.conf = config;
|
||||
init();
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine the user's home directory and return a file path
|
||||
* under this root where the shared metastore can be placed.
|
||||
*/
|
||||
private String getHomeDirFilePath() {
|
||||
String homeDir = System.getProperty("user.home");
|
||||
|
||||
File homeDirObj = new File(homeDir);
|
||||
File sqoopDataDirObj = new File(homeDirObj, ".sqoop");
|
||||
File databaseFileObj = new File(sqoopDataDirObj, "shared-metastore.db");
|
||||
|
||||
return databaseFileObj.toString();
|
||||
}
|
||||
|
||||
private void init() {
|
||||
if (null != server) {
|
||||
LOG.debug("init(): server already exists.");
|
||||
return;
|
||||
}
|
||||
|
||||
fileLocation = conf.get(META_STORAGE_LOCATION_KEY, null);
|
||||
if (null == fileLocation) {
|
||||
fileLocation = getHomeDirFilePath();
|
||||
LOG.warn("The location for metastore data has not been explicitly set. "
|
||||
+ "Placing shared metastore files in " + fileLocation);
|
||||
}
|
||||
|
||||
this.port = conf.getInt(META_SERVER_PORT_KEY, DEFAULT_PORT);
|
||||
}
|
||||
|
||||
|
||||
public void start() {
|
||||
try {
|
||||
if (server != null) {
|
||||
server.checkRunning(false);
|
||||
}
|
||||
} catch (RuntimeException re) {
|
||||
LOG.info("Server is already started.");
|
||||
return;
|
||||
}
|
||||
|
||||
server = new Server();
|
||||
server.setDatabasePath(0, "file:" + fileLocation);
|
||||
server.setDatabaseName(0, "sqoop");
|
||||
server.putPropertiesFromString("hsqldb.write_delay=false");
|
||||
server.setPort(port);
|
||||
server.setSilent(true);
|
||||
server.setNoSystemExit(true);
|
||||
|
||||
server.start();
|
||||
LOG.info("Server started on port " + port + " with protocol "
|
||||
+ server.getProtocol());
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocks the current thread until the server is shut down.
|
||||
*/
|
||||
public void waitForServer() {
|
||||
while (true) {
|
||||
int curState = server.getState();
|
||||
if (curState == ServerConstants.SERVER_STATE_SHUTDOWN) {
|
||||
LOG.info("Got shutdown notification");
|
||||
break;
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.info("Interrupted while blocking for server:"
|
||||
+ StringUtils.stringifyException(ie));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connects to the server and instructs it to shutdown.
|
||||
*/
|
||||
public void shutdown() {
|
||||
// Send the SHUTDOWN command to the server via SQL.
|
||||
SqoopOptions options = new SqoopOptions(conf);
|
||||
options.setConnectString("jdbc:hsqldb:hsql://localhost:"
|
||||
+ port + "/sqoop");
|
||||
options.setUsername("SA");
|
||||
options.setPassword("");
|
||||
HsqldbManager manager = new HsqldbManager(options);
|
||||
Statement s = null;
|
||||
try {
|
||||
Connection c = manager.getConnection();
|
||||
s = c.createStatement();
|
||||
s.execute("SHUTDOWN");
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Exception shutting down database: "
|
||||
+ StringUtils.stringifyException(sqlE));
|
||||
} finally {
|
||||
if (null != s) {
|
||||
try {
|
||||
s.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing statement: " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
manager.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing manager: " + sqlE);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user