5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-03 02:51:00 +08:00

SQOOP-77. Rename saved sessions to saved jobs.

From: Aaron Kimball <aaron@cloudera.com>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149967 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Bayer 2011-07-22 20:04:13 +00:00
parent e825dc560c
commit 36f93eac1d
21 changed files with 437 additions and 442 deletions

View File

@ -54,7 +54,7 @@
<name>sqoop.metastore.client.enable.autoconnect</name>
<value>false</value>
<description>If true, Sqoop will connect to a local metastore
for session management when no other metastore arguments are
for job management when no other metastore arguments are
provided.
</description>
</property>
@ -70,7 +70,7 @@
<name>sqoop.metastore.client.autoconnect.url</name>
<value>jdbc:hsqldb:file:/tmp/sqoop-meta/meta.db;shutdown=true</value>
<description>The connect string to use when connecting to a
session-management metastore. If unspecified, uses ~/.sqoop/.
job-management metastore. If unspecified, uses ~/.sqoop/.
You can specify a different path here.
</description>
</property>
@ -90,7 +90,7 @@
<!--
For security reasons, by default your database password will not be stored in
the Sqoop metastore. When executing a saved session, you will need to
the Sqoop metastore. When executing a saved job, you will need to
reenter the database password. Uncomment this setting to enable saved
password storage. (INSECURE!)
-->

View File

@ -38,7 +38,7 @@
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.DefaultManagerFactory;
import com.cloudera.sqoop.manager.ManagerFactory;
import com.cloudera.sqoop.metastore.SessionData;
import com.cloudera.sqoop.metastore.JobData;
import com.cloudera.sqoop.util.ClassLoaderStack;
@ -105,7 +105,7 @@ private void instantiateFactories(Configuration conf) {
* @return a ConnManager instance for the appropriate database.
* @throws IOException if it cannot find a ConnManager for this schema.
*/
public ConnManager getManager(SessionData data) throws IOException {
public ConnManager getManager(JobData data) throws IOException {
// Try all the available manager factories.
for (ManagerFactory factory : factories) {
LOG.debug("Trying ManagerFactory: " + factory.getClass().getName());

View File

@ -196,13 +196,13 @@ public enum IncrementalMode {
private String incrementalLastValue;
// These next two fields are not serialized to the metastore.
// If this SqoopOptions is created by reading a saved session, these will
// be populated by the SessionStorage to facilitate updating the same
// session.
private String sessionName;
private Map<String, String> sessionStorageDescriptor;
// If this SqoopOptions is created by reading a saved job, these will
// be populated by the JobStorage to facilitate updating the same
// job.
private String jobName;
private Map<String, String> jobStorageDescriptor;
// If we restore a session and then allow the user to apply arguments on
// If we restore a job and then allow the user to apply arguments on
// top, we retain the version without the arguments in a reference to the
// 'parent' SqoopOptions instance, here.
private SqoopOptions parent;
@ -449,7 +449,7 @@ public void loadProperties(Properties props) {
/**
* Return a Properties instance that encapsulates all the "sticky"
* state of this SqoopOptions that should be written to a metastore
* to restore the session later.
* to restore the job later.
*/
public Properties writeProperties() {
Properties props = new Properties();
@ -1468,33 +1468,33 @@ public String getIncrementalLastValue() {
}
/**
* Set the name of the saved session this SqoopOptions belongs to.
* Set the name of the saved job this SqoopOptions belongs to.
*/
public void setSessionName(String session) {
this.sessionName = session;
public void setJobName(String job) {
this.jobName = job;
}
/**
* Get the name of the saved session this SqoopOptions belongs to.
* Get the name of the saved job this SqoopOptions belongs to.
*/
public String getSessionName() {
return this.sessionName;
public String getJobName() {
return this.jobName;
}
/**
* Set the SessionStorage descriptor used to open the saved session
* Set the JobStorage descriptor used to open the saved job
* this SqoopOptions belongs to.
*/
public void setStorageDescriptor(Map<String, String> descriptor) {
this.sessionStorageDescriptor = descriptor;
this.jobStorageDescriptor = descriptor;
}
/**
* Get the SessionStorage descriptor used to open the saved session
* Get the JobStorage descriptor used to open the saved job
* this SqoopOptions belongs to.
*/
public Map<String, String> getStorageDescriptor() {
return this.sessionStorageDescriptor;
return this.jobStorageDescriptor;
}
/**

View File

@ -22,7 +22,7 @@
import org.apache.commons.logging.LogFactory;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.metastore.SessionData;
import com.cloudera.sqoop.metastore.JobData;
/**
* Contains instantiation code for all ConnManager implementations
@ -33,7 +33,7 @@ public final class DefaultManagerFactory extends ManagerFactory {
public static final Log LOG = LogFactory.getLog(
DefaultManagerFactory.class.getName());
public ConnManager accept(SessionData data) {
public ConnManager accept(JobData data) {
SqoopOptions options = data.getSqoopOptions();
String manualDriver = options.getDriverClassName();
if (manualDriver != null) {

View File

@ -19,7 +19,7 @@
package com.cloudera.sqoop.manager;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.metastore.SessionData;
import com.cloudera.sqoop.metastore.JobData;
/**
* Interface for factory classes for ConnManager implementations.
@ -30,23 +30,23 @@
*/
public abstract class ManagerFactory {
@Deprecated
/** Do not use accept(SqoopOptions). Use accept(SessionData) instead. */
/** Do not use accept(SqoopOptions). Use accept(JobData) instead. */
public ConnManager accept(SqoopOptions options) {
throw new RuntimeException(
"Deprecated method; override ManagerFactory.accept(SessionData)");
"Deprecated method; override ManagerFactory.accept(JobData)");
}
/**
* Instantiate a ConnManager that can fulfill the database connection
* requirements of the task specified in the SessionData.
* @param sessionData the user-provided arguments that configure this
* requirements of the task specified in the JobData.
* @param jobData the user-provided arguments that configure this
* Sqoop job.
* @return a ConnManager that can connect to the specified database
* and perform the operations required, or null if this factory cannot
* find a suitable ConnManager implementation.
*/
public ConnManager accept(SessionData sessionData) {
return accept(sessionData.getSqoopOptions());
public ConnManager accept(JobData jobData) {
return accept(jobData.getSqoopOptions());
}
}

View File

@ -22,17 +22,17 @@
import com.cloudera.sqoop.tool.SqoopTool;
/**
* Container for all session data that should be stored to a
* Container for all job data that should be stored to a
* permanent resource.
*/
public class SessionData {
public class JobData {
private SqoopOptions opts;
private SqoopTool tool;
public SessionData() {
public JobData() {
}
public SessionData(SqoopOptions options, SqoopTool sqoopTool) {
public JobData(SqoopOptions options, SqoopTool sqoopTool) {
this.opts = options;
this.tool = sqoopTool;
}

View File

@ -26,21 +26,21 @@
import org.apache.hadoop.conf.Configured;
/**
* API that defines how sessions are saved, restored, and manipulated.
* API that defines how jobs are saved, restored, and manipulated.
*
* <p>
* SessionStorage instances may be created and then not used; the
* SessionStorage factory may create additional SessionStorage instances
* JobStorage instances may be created and then not used; the
* JobStorage factory may create additional JobStorage instances
* that return false from accept() and then discard them. The close()
* method will only be triggered for a SessionStorage if the connect()
* method will only be triggered for a JobStorage if the connect()
* method is called. Connection should not be triggered by a call to
* accept().</p>
*/
public abstract class SessionStorage extends Configured implements Closeable {
public abstract class JobStorage extends Configured implements Closeable {
/**
* Returns true if the SessionStorage system can use the metadata in
* the descriptor to connect to an underlying session resource.
* Returns true if the JobStorage system can use the metadata in
* the descriptor to connect to an underlying storage resource.
*/
public abstract boolean canAccept(Map<String, String> descriptor);
@ -53,41 +53,40 @@ public abstract void open(Map<String, String> descriptor)
throws IOException;
/**
* Given a session name, reconstitute a SessionData that contains all
* configuration information required for the session. Returns null if the
* session name does not match an available session.
* Given a job name, reconstitute a JobData that contains all
* configuration information required for the job. Returns null if the
* job name does not match an available job.
*/
public abstract SessionData read(String sessionName)
public abstract JobData read(String jobName)
throws IOException;
/**
* Forget about a saved session.
* Forget about a saved job.
*/
public abstract void delete(String sessionName) throws IOException;
public abstract void delete(String jobName) throws IOException;
/**
* Given a session name and the data describing a configured
* session, record the session information to the storage medium.
* Given a job name and the data describing a configured job, record the job
* information to the storage medium.
*/
public abstract void create(String sessionName, SessionData data)
public abstract void create(String jobName, JobData data)
throws IOException;
/**
* Given a session descriptor and a configured session
* update the underlying resource to match the current session
* configuration.
* Given a job name and configured job data, update the underlying resource
* to match the current job configuration.
*/
public abstract void update(String sessionName, SessionData data)
public abstract void update(String jobName, JobData data)
throws IOException;
/**
* Close any resources opened by the SessionStorage system.
* Close any resources opened by the JobStorage system.
*/
public void close() throws IOException {
}
/**
* Enumerate all sessions held in the connected resource.
* Enumerate all jobs held in the connected resource.
*/
public abstract List<String> list() throws IOException;
}

View File

@ -24,26 +24,26 @@
import org.apache.hadoop.conf.Configuration;
/**
* Factory that produces the correct SessionStorage system to work with
* a particular session descriptor.
* Factory that produces the correct JobStorage system to work with
* a particular job descriptor.
*/
public class SessionStorageFactory {
public class JobStorageFactory {
private Configuration conf;
/**
* Configuration key describing the list of SessionStorage implementations
* to use to handle sessions.
* Configuration key describing the list of JobStorage implementations
* to use to handle jobs.
*/
public static final String AVAILABLE_STORAGES_KEY =
"sqoop.session.storage.implementations";
"sqoop.job.storage.implementations";
/** The default list of available SessionStorage implementations. */
/** The default list of available JobStorage implementations. */
private static final String DEFAULT_AVAILABLE_STORAGES =
"com.cloudera.sqoop.metastore.hsqldb.HsqldbSessionStorage,"
"com.cloudera.sqoop.metastore.hsqldb.HsqldbJobStorage,"
+ "com.cloudera.sqoop.metastore.hsqldb.AutoHsqldbStorage";
public SessionStorageFactory(Configuration config) {
public JobStorageFactory(Configuration config) {
this.conf = config;
// Ensure that we always have an available storages list.
@ -53,14 +53,14 @@ public SessionStorageFactory(Configuration config) {
}
/**
* Given a session descriptor, determine the correct SessionStorage
* implementation to use to handle the session and return an instance
* of it -- or null if no SessionStorage instance is appropriate.
* Given a storage descriptor, determine the correct JobStorage
* implementation to use to connect to the storage resource and return an
* instance of it -- or null if no JobStorage instance is appropriate.
*/
public SessionStorage getSessionStorage(Map<String, String> descriptor) {
List<SessionStorage> storages = this.conf.getInstances(
AVAILABLE_STORAGES_KEY, SessionStorage.class);
for (SessionStorage stor : storages) {
public JobStorage getJobStorage(Map<String, String> descriptor) {
List<JobStorage> storages = this.conf.getInstances(
AVAILABLE_STORAGES_KEY, JobStorage.class);
for (JobStorage stor : storages) {
if (stor.canAccept(descriptor)) {
return stor;
}

View File

@ -29,10 +29,10 @@
import org.apache.hadoop.conf.Configuration;
/**
* SessionStorage implementation that auto-configures an HSQLDB
* local-file-based instance to hold sessions.
* JobStorage implementation that auto-configures an HSQLDB
* local-file-based instance to hold jobs.
*/
public class AutoHsqldbStorage extends HsqldbSessionStorage {
public class AutoHsqldbStorage extends HsqldbJobStorage {
public static final Log LOG = LogFactory.getLog(
AutoHsqldbStorage.class.getName());

View File

@ -39,20 +39,18 @@
import org.apache.hadoop.conf.Configuration;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.metastore.SessionData;
import com.cloudera.sqoop.metastore.SessionStorage;
import com.cloudera.sqoop.metastore.JobData;
import com.cloudera.sqoop.metastore.JobStorage;
import com.cloudera.sqoop.tool.SqoopTool;
/**
* SessionStorage implementation that uses an HSQLDB-backed database to
* hold session information.
* JobStorage implementation that uses an HSQLDB-backed database to
* hold job information.
*/
public class HsqldbSessionStorage extends SessionStorage {
public class HsqldbJobStorage extends JobStorage {
public static final Log LOG = LogFactory.getLog(
HsqldbSessionStorage.class.getName());
HsqldbJobStorage.class.getName());
/** descriptor key identifying the connect string for the metastore. */
public static final String META_CONNECT_KEY = "metastore.connect.string";
@ -77,20 +75,20 @@ public class HsqldbSessionStorage extends SessionStorage {
/** root metadata table key used to define the current schema version. */
private static final String STORAGE_VERSION_KEY =
"sqoop.hsqldb.session.storage.version";
"sqoop.hsqldb.job.storage.version";
/** The current version number for the schema edition. */
private static final int CUR_STORAGE_VERSION = 0;
/** root metadata table key used to define the session table name. */
/** root metadata table key used to define the job table name. */
private static final String SESSION_TABLE_KEY =
"sqoop.hsqldb.session.info.table";
"sqoop.hsqldb.job.info.table";
/** Default value for SESSION_TABLE_KEY. */
private static final String DEFAULT_SESSION_TABLE_NAME =
"SQOOP_SESSIONS";
/** Per-session key with propClass 'schema' that defines the set of
/** Per-job key with propClass 'schema' that defines the set of
* properties valid to be defined for propClass 'SqoopOptions'. */
private static final String PROPERTY_SET_KEY =
"sqoop.property.set.id";
@ -111,7 +109,7 @@ public class HsqldbSessionStorage extends SessionStorage {
private static final String PROPERTY_CLASS_CONFIG = "config";
/**
* Per-session key with propClass 'schema' that specifies the SqoopTool
* Per-job key with propClass 'schema' that specifies the SqoopTool
* to load.
*/
private static final String SQOOP_TOOL_KEY = "sqoop.tool";
@ -128,8 +126,8 @@ protected Connection getConnection() {
}
// After connection to the database and initialization of the
// schema, this holds the name of the session table.
private String sessionTableName;
// schema, this holds the name of the job table.
private String jobTableName;
protected void setMetastoreConnectStr(String connectStr) {
this.metastoreConnectStr = connectStr;
@ -252,23 +250,23 @@ public boolean canAccept(Map<String, String> descriptor) {
@Override
/** {@inheritDoc} */
public SessionData read(String sessionName) throws IOException {
public JobData read(String jobName) throws IOException {
try {
if (!sessionExists(sessionName)) {
LOG.error("Cannot restore session: " + sessionName);
LOG.error("(No such session)");
throw new IOException("Cannot restore missing session " + sessionName);
if (!jobExists(jobName)) {
LOG.error("Cannot restore job: " + jobName);
LOG.error("(No such job)");
throw new IOException("Cannot restore missing job " + jobName);
}
LOG.debug("Restoring session: " + sessionName);
Properties schemaProps = getV0Properties(sessionName,
LOG.debug("Restoring job: " + jobName);
Properties schemaProps = getV0Properties(jobName,
PROPERTY_CLASS_SCHEMA);
Properties sqoopOptProps = getV0Properties(sessionName,
Properties sqoopOptProps = getV0Properties(jobName,
PROPERTY_CLASS_SQOOP_OPTIONS);
Properties configProps = getV0Properties(sessionName,
Properties configProps = getV0Properties(jobName,
PROPERTY_CLASS_CONFIG);
// Check that we're not using a saved session from a previous
// Check that we're not using a saved job from a previous
// version whose functionality has been deprecated.
String thisPropSetId = schemaProps.getProperty(PROPERTY_SET_KEY);
LOG.debug("System property set: " + CUR_PROPERTY_SET_ID);
@ -289,7 +287,7 @@ public SessionData read(String sessionName) throws IOException {
SqoopTool tool = SqoopTool.getTool(toolName);
if (null == tool) {
throw new IOException("Error in session metadata: invalid tool "
throw new IOException("Error in job metadata: invalid tool "
+ toolName);
}
@ -302,26 +300,26 @@ public SessionData read(String sessionName) throws IOException {
opts.setConf(conf);
opts.loadProperties(sqoopOptProps);
// Set the session connection information for this session.
opts.setSessionName(sessionName);
// Set the job connection information for this job.
opts.setJobName(jobName);
opts.setStorageDescriptor(connectedDescriptor);
return new SessionData(opts, tool);
return new JobData(opts, tool);
} catch (SQLException sqlE) {
throw new IOException("Error communicating with database", sqlE);
}
}
private boolean sessionExists(String sessionName) throws SQLException {
private boolean jobExists(String jobName) throws SQLException {
PreparedStatement s = connection.prepareStatement(
"SELECT COUNT(session_name) FROM " + this.sessionTableName
+ " WHERE session_name = ? GROUP BY session_name");
"SELECT COUNT(job_name) FROM " + this.jobTableName
+ " WHERE job_name = ? GROUP BY job_name");
ResultSet rs = null;
try {
s.setString(1, sessionName);
s.setString(1, jobName);
rs = s.executeQuery();
if (rs.next()) {
return true; // We got a result, meaning the session exists.
return true; // We got a result, meaning the job exists.
}
} finally {
if (null != rs) {
@ -340,16 +338,16 @@ private boolean sessionExists(String sessionName) throws SQLException {
@Override
/** {@inheritDoc} */
public void delete(String sessionName) throws IOException {
public void delete(String jobName) throws IOException {
try {
if (!sessionExists(sessionName)) {
LOG.error("No such session: " + sessionName);
if (!jobExists(jobName)) {
LOG.error("No such job: " + jobName);
} else {
LOG.debug("Deleting session: " + sessionName);
LOG.debug("Deleting job: " + jobName);
PreparedStatement s = connection.prepareStatement("DELETE FROM "
+ this.sessionTableName + " WHERE session_name = ?");
+ this.jobTableName + " WHERE job_name = ?");
try {
s.setString(1, sessionName);
s.setString(1, jobName);
s.executeUpdate();
} finally {
s.close();
@ -368,40 +366,40 @@ public void delete(String sessionName) throws IOException {
@Override
/** {@inheritDoc} */
public void create(String sessionName, SessionData data)
public void create(String jobName, JobData data)
throws IOException {
try {
if (sessionExists(sessionName)) {
LOG.error("Cannot create session " + sessionName
if (jobExists(jobName)) {
LOG.error("Cannot create job " + jobName
+ ": it already exists");
throw new IOException("Session " + sessionName + " already exists");
throw new IOException("Job " + jobName + " already exists");
}
} catch (SQLException sqlE) {
throw new IOException("Error communicating with database", sqlE);
}
createInternal(sessionName, data);
createInternal(jobName, data);
}
/**
* Actually insert/update the resources for this session.
* Actually insert/update the resources for this job.
*/
private void createInternal(String sessionName, SessionData data)
private void createInternal(String jobName, JobData data)
throws IOException {
try {
LOG.debug("Creating session: " + sessionName);
LOG.debug("Creating job: " + jobName);
// Save the name of the Sqoop tool.
setV0Property(sessionName, PROPERTY_CLASS_SCHEMA, SQOOP_TOOL_KEY,
setV0Property(jobName, PROPERTY_CLASS_SCHEMA, SQOOP_TOOL_KEY,
data.getSqoopTool().getToolName());
// Save the property set id.
setV0Property(sessionName, PROPERTY_CLASS_SCHEMA, PROPERTY_SET_KEY,
setV0Property(jobName, PROPERTY_CLASS_SCHEMA, PROPERTY_SET_KEY,
CUR_PROPERTY_SET_ID);
// Save all properties of the SqoopOptions.
Properties props = data.getSqoopOptions().writeProperties();
setV0Properties(sessionName, PROPERTY_CLASS_SQOOP_OPTIONS, props);
setV0Properties(jobName, PROPERTY_CLASS_SQOOP_OPTIONS, props);
// And save all unique properties of the configuration.
Configuration saveConf = data.getSqoopOptions().getConf();
@ -416,7 +414,7 @@ private void createInternal(String sessionName, SessionData data)
}
LOG.debug("Saving " + key + " => " + rawVal + " / " + baseVal);
setV0Property(sessionName, PROPERTY_CLASS_CONFIG, key, rawVal);
setV0Property(jobName, PROPERTY_CLASS_CONFIG, key, rawVal);
}
connection.commit();
@ -433,12 +431,12 @@ private void createInternal(String sessionName, SessionData data)
@Override
/** {@inheritDoc} */
public void update(String sessionName, SessionData data)
public void update(String jobName, JobData data)
throws IOException {
try {
if (!sessionExists(sessionName)) {
LOG.error("Cannot update session " + sessionName + ": not found");
throw new IOException("Session " + sessionName + " does not exist");
if (!jobExists(jobName)) {
LOG.error("Cannot update job " + jobName + ": not found");
throw new IOException("Job " + jobName + " does not exist");
}
} catch (SQLException sqlE) {
throw new IOException("Error communicating with database", sqlE);
@ -446,7 +444,7 @@ public void update(String sessionName, SessionData data)
// Since we set properties with update-or-insert, this is the same
// as create on this system.
createInternal(sessionName, data);
createInternal(jobName, data);
}
@Override
@ -455,15 +453,15 @@ public List<String> list() throws IOException {
ResultSet rs = null;
try {
PreparedStatement s = connection.prepareStatement(
"SELECT DISTINCT session_name FROM " + this.sessionTableName);
"SELECT DISTINCT job_name FROM " + this.jobTableName);
try {
rs = s.executeQuery();
ArrayList<String> sessions = new ArrayList<String>();
ArrayList<String> jobs = new ArrayList<String>();
while (rs.next()) {
sessions.add(rs.getString(1));
jobs.add(rs.getString(1));
}
return sessions;
return jobs;
} finally {
if (null != rs) {
try {
@ -626,9 +624,9 @@ private void setRootProperty(String propertyName, Integer version,
}
/**
* Create the sessions table in the V0 schema.
* Create the jobs table in the V0 schema.
*/
private void createSessionTable() throws SQLException {
private void createJobTable() throws SQLException {
String curTableName = DEFAULT_SESSION_TABLE_NAME;
int tableNum = -1;
while (true) {
@ -642,16 +640,16 @@ private void createSessionTable() throws SQLException {
// curTableName contains a table name that does not exist.
// Create this table.
LOG.debug("Creating session storage table: " + curTableName);
LOG.debug("Creating job storage table: " + curTableName);
Statement s = connection.createStatement();
try {
s.executeUpdate("CREATE TABLE " + curTableName + " ("
+ "session_name VARCHAR(64) NOT NULL, "
+ "job_name VARCHAR(64) NOT NULL, "
+ "propname VARCHAR(128) NOT NULL, "
+ "propval VARCHAR(1024), "
+ "propclass VARCHAR(32) NOT NULL, "
+ "CONSTRAINT " + curTableName + "_unq UNIQUE "
+ "(session_name, propname, propclass))");
+ "(job_name, propname, propclass))");
// Then set a property in the root table pointing to it.
setRootProperty(SESSION_TABLE_KEY, 0, curTableName);
@ -660,7 +658,7 @@ private void createSessionTable() throws SQLException {
s.close();
}
this.sessionTableName = curTableName;
this.jobTableName = curTableName;
}
/**
@ -669,42 +667,42 @@ private void createSessionTable() throws SQLException {
* if it does not already exist.
*/
private void initV0Schema() throws SQLException {
this.sessionTableName = getRootProperty(SESSION_TABLE_KEY, 0);
if (null == this.sessionTableName) {
createSessionTable();
this.jobTableName = getRootProperty(SESSION_TABLE_KEY, 0);
if (null == this.jobTableName) {
createJobTable();
}
if (!tableExists(this.sessionTableName)) {
LOG.debug("Could not find session table: " + sessionTableName);
createSessionTable();
if (!tableExists(this.jobTableName)) {
LOG.debug("Could not find job table: " + jobTableName);
createJobTable();
}
}
/**
* INSERT or UPDATE a single (session, propname, class) to point
* INSERT or UPDATE a single (job, propname, class) to point
* to the specified property value.
*/
private void setV0Property(String sessionName, String propClass,
private void setV0Property(String jobName, String propClass,
String propName, String propVal) throws SQLException {
LOG.debug("Session: " + sessionName + "; Setting property "
LOG.debug("Job: " + jobName + "; Setting property "
+ propName + " with class " + propClass + " => " + propVal);
PreparedStatement s = null;
try {
String curValue = getV0Property(sessionName, propClass, propName);
String curValue = getV0Property(jobName, propClass, propName);
if (null == curValue) {
// Property is not yet set.
s = connection.prepareStatement("INSERT INTO " + this.sessionTableName
+ " (propval, session_name, propclass, propname) "
s = connection.prepareStatement("INSERT INTO " + this.jobTableName
+ " (propval, job_name, propclass, propname) "
+ "VALUES (?, ?, ?, ?)");
} else {
// Overwrite existing property.
s = connection.prepareStatement("UPDATE " + this.sessionTableName
+ " SET propval = ? WHERE session_name = ? AND propclass = ? "
s = connection.prepareStatement("UPDATE " + this.jobTableName
+ " SET propval = ? WHERE job_name = ? AND propclass = ? "
+ "AND propname = ?");
}
s.setString(1, propVal);
s.setString(2, sessionName);
s.setString(2, jobName);
s.setString(3, propClass);
s.setString(4, propName);
@ -720,18 +718,18 @@ private void setV0Property(String sessionName, String propClass,
* Return a string containing the value of a specified property,
* or null if it is not set.
*/
private String getV0Property(String sessionName, String propClass,
private String getV0Property(String jobName, String propClass,
String propertyName) throws SQLException {
LOG.debug("Session: " + sessionName + "; Getting property "
LOG.debug("Job: " + jobName + "; Getting property "
+ propertyName + " with class " + propClass);
ResultSet rs = null;
PreparedStatement s = connection.prepareStatement(
"SELECT propval FROM " + this.sessionTableName
+ " WHERE session_name = ? AND propclass = ? AND propname = ?");
"SELECT propval FROM " + this.jobTableName
+ " WHERE job_name = ? AND propclass = ? AND propname = ?");
try {
s.setString(1, sessionName);
s.setString(1, jobName);
s.setString(2, propClass);
s.setString(3, propertyName);
rs = s.executeQuery();
@ -759,19 +757,19 @@ private String getV0Property(String sessionName, String propClass,
/**
* Get a java.util.Properties containing all propName -&gt; propVal
* bindings for a given (sessionName, propClass).
* bindings for a given (jobName, propClass).
*/
private Properties getV0Properties(String sessionName, String propClass)
private Properties getV0Properties(String jobName, String propClass)
throws SQLException {
LOG.debug("Session: " + sessionName
LOG.debug("Job: " + jobName
+ "; Getting properties with class " + propClass);
ResultSet rs = null;
PreparedStatement s = connection.prepareStatement(
"SELECT propname, propval FROM " + this.sessionTableName
+ " WHERE session_name = ? AND propclass = ?");
"SELECT propname, propval FROM " + this.jobTableName
+ " WHERE job_name = ? AND propclass = ?");
try {
s.setString(1, sessionName);
s.setString(1, jobName);
s.setString(2, propClass);
rs = s.executeQuery();
@ -794,15 +792,15 @@ private Properties getV0Properties(String sessionName, String propClass)
}
}
private void setV0Properties(String sessionName, String propClass,
private void setV0Properties(String jobName, String propClass,
Properties properties) throws SQLException {
LOG.debug("Session: " + sessionName
LOG.debug("Job: " + jobName
+ "; Setting bulk properties for class " + propClass);
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
String key = entry.getKey().toString();
String val = entry.getValue().toString();
setV0Property(sessionName, propClass, key, val);
setV0Property(jobName, propClass, key, val);
}
}
}

View File

@ -40,7 +40,7 @@
import com.cloudera.sqoop.cli.ToolOptions;
import com.cloudera.sqoop.lib.DelimiterSet;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.metastore.SessionData;
import com.cloudera.sqoop.metastore.JobData;
import com.cloudera.sqoop.shims.ShimLoader;
/**
@ -127,13 +127,13 @@ public abstract class BaseSqoopTool extends SqoopTool {
public static final String HBASE_CREATE_TABLE_ARG = "hbase-create-table";
// Arguments for the session management system.
public static final String SESSION_METASTORE_ARG = "meta-connect";
public static final String SESSION_CMD_CREATE_ARG = "create";
public static final String SESSION_CMD_DELETE_ARG = "delete";
public static final String SESSION_CMD_EXEC_ARG = "exec";
public static final String SESSION_CMD_LIST_ARG = "list";
public static final String SESSION_CMD_SHOW_ARG = "show";
// Arguments for the saved job management system.
public static final String STORAGE_METASTORE_ARG = "meta-connect";
public static final String JOB_CMD_CREATE_ARG = "create";
public static final String JOB_CMD_DELETE_ARG = "delete";
public static final String JOB_CMD_EXEC_ARG = "exec";
public static final String JOB_CMD_LIST_ARG = "list";
public static final String JOB_CMD_SHOW_ARG = "show";
// Arguments for the metastore.
public static final String METASTORE_SHUTDOWN_ARG = "shutdown";
@ -168,7 +168,7 @@ protected boolean init(SqoopOptions sqoopOpts) {
// Get the connection to the database.
try {
SessionData data = new SessionData(sqoopOpts, this);
JobData data = new JobData(sqoopOpts, this);
this.manager = new ConnFactory(sqoopOpts.getConf()).getManager(data);
return true;
} catch (Exception e) {
@ -255,51 +255,51 @@ protected boolean hasUnrecognizedArgs(String [] argv) {
}
/**
* @return RelatedOptions used by session management tools.
* @return RelatedOptions used by job management tools.
*/
protected RelatedOptions getSessionOptions() {
protected RelatedOptions getJobOptions() {
RelatedOptions relatedOpts = new RelatedOptions(
"Session management arguments");
"Job management arguments");
relatedOpts.addOption(OptionBuilder.withArgName("jdbc-uri")
.hasArg()
.withDescription("Specify JDBC connect string for the metastore")
.withLongOpt(SESSION_METASTORE_ARG)
.withLongOpt(STORAGE_METASTORE_ARG)
.create());
// Create an option-group surrounding the operations a user
// can perform on sessions.
// can perform on jobs.
OptionGroup group = new OptionGroup();
group.addOption(OptionBuilder.withArgName("session-id")
group.addOption(OptionBuilder.withArgName("job-id")
.hasArg()
.withDescription("Create a new session")
.withLongOpt(SESSION_CMD_CREATE_ARG)
.withDescription("Create a new saved job")
.withLongOpt(JOB_CMD_CREATE_ARG)
.create());
group.addOption(OptionBuilder.withArgName("session-id")
group.addOption(OptionBuilder.withArgName("job-id")
.hasArg()
.withDescription("Delete a saved session")
.withLongOpt(SESSION_CMD_DELETE_ARG)
.withDescription("Delete a saved job")
.withLongOpt(JOB_CMD_DELETE_ARG)
.create());
group.addOption(OptionBuilder.withArgName("session-id")
group.addOption(OptionBuilder.withArgName("job-id")
.hasArg()
.withDescription("Show the parameters for a saved session")
.withLongOpt(SESSION_CMD_SHOW_ARG)
.withDescription("Show the parameters for a saved job")
.withLongOpt(JOB_CMD_SHOW_ARG)
.create());
Option execOption = OptionBuilder.withArgName("session-id")
Option execOption = OptionBuilder.withArgName("job-id")
.hasArg()
.withDescription("Run a saved session")
.withLongOpt(SESSION_CMD_EXEC_ARG)
.withDescription("Run a saved job")
.withLongOpt(JOB_CMD_EXEC_ARG)
.create();
group.addOption(execOption);
group.addOption(OptionBuilder
.withDescription("List saved sessions")
.withLongOpt(SESSION_CMD_LIST_ARG)
.withDescription("List saved jobs")
.withLongOpt(JOB_CMD_LIST_ARG)
.create());
relatedOpts.addOptionGroup(group);
// Since the "common" options aren't used in the session tool,
// Since the "common" options aren't used in the job tool,
// add these settings here.
relatedOpts.addOption(OptionBuilder
.withDescription("Print more information while working")

View File

@ -45,9 +45,9 @@
import com.cloudera.sqoop.hive.HiveImport;
import com.cloudera.sqoop.manager.ImportJobContext;
import com.cloudera.sqoop.metastore.SessionData;
import com.cloudera.sqoop.metastore.SessionStorage;
import com.cloudera.sqoop.metastore.SessionStorageFactory;
import com.cloudera.sqoop.metastore.JobData;
import com.cloudera.sqoop.metastore.JobStorage;
import com.cloudera.sqoop.metastore.JobStorageFactory;
import com.cloudera.sqoop.util.AppendUtils;
import com.cloudera.sqoop.util.ImportException;
import org.apache.hadoop.fs.Path;
@ -100,7 +100,7 @@ private boolean isIncremental(SqoopOptions options) {
/**
* If this is an incremental import, then we should save the
* user's state back to the metastore (if this session was run
* user's state back to the metastore (if this job was run
* from the metastore). Otherwise, log to the user what data
* they need to supply next time.
*/
@ -111,22 +111,22 @@ private void saveIncrementalState(SqoopOptions options)
}
Map<String, String> descriptor = options.getStorageDescriptor();
String sessionName = options.getSessionName();
String jobName = options.getJobName();
if (null != sessionName && null != descriptor) {
if (null != jobName && null != descriptor) {
// Actually save it back to the metastore.
LOG.info("Saving incremental import state to the metastore");
SessionStorageFactory ssf = new SessionStorageFactory(options.getConf());
SessionStorage storage = ssf.getSessionStorage(descriptor);
JobStorageFactory ssf = new JobStorageFactory(options.getConf());
JobStorage storage = ssf.getJobStorage(descriptor);
storage.open(descriptor);
try {
// Save the 'parent' SqoopOptions; this does not contain the mutations
// to the SqoopOptions state that occurred over the course of this
// execution, except for the one we specifically want to memorize:
// the latest value of the check column.
SessionData data = new SessionData(options.getParent(), this);
storage.update(sessionName, data);
LOG.info("Updated data for session: " + sessionName);
JobData data = new JobData(options.getParent(), this);
storage.update(jobName, data);
LOG.info("Updated data for job: " + jobName);
} finally {
storage.close();
}
@ -151,7 +151,7 @@ private void saveIncrementalState(SqoopOptions options)
}
LOG.info(" --check-column " + options.getIncrementalTestColumn());
LOG.info(" --last-value " + options.getIncrementalLastValue());
LOG.info("(Consider saving this with 'sqoop session --create')");
LOG.info("(Consider saving this with 'sqoop job --create')");
}
}

View File

@ -32,7 +32,6 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
@ -44,35 +43,34 @@
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
import com.cloudera.sqoop.cli.ToolOptions;
import com.cloudera.sqoop.metastore.hsqldb.HsqldbSessionStorage;
import com.cloudera.sqoop.metastore.SessionData;
import com.cloudera.sqoop.metastore.SessionStorage;
import com.cloudera.sqoop.metastore.SessionStorageFactory;
import com.cloudera.sqoop.metastore.hsqldb.HsqldbJobStorage;
import com.cloudera.sqoop.metastore.JobData;
import com.cloudera.sqoop.metastore.JobStorage;
import com.cloudera.sqoop.metastore.JobStorageFactory;
/**
* Tool that creates and executes saved sessions.
* Tool that creates and executes saved jobs.
*/
public class SessionTool extends BaseSqoopTool {
public class JobTool extends BaseSqoopTool {
public static final Log LOG = LogFactory.getLog(
SessionTool.class.getName());
JobTool.class.getName());
private enum SessionOp {
SessionCreate,
SessionDelete,
SessionExecute,
SessionList,
SessionShow,
private enum JobOp {
JobCreate,
JobDelete,
JobExecute,
JobList,
JobShow,
};
private Map<String, String> sessionDescriptor;
private String sessionName;
private SessionOp operation;
private SessionStorage storage;
private Map<String, String> storageDescriptor;
private String jobName;
private JobOp operation;
private JobStorage storage;
public SessionTool() {
super("session");
public JobTool() {
super("job");
}
/**
@ -131,7 +129,7 @@ private int configureChildTool(SqoopOptions childOptions,
childTool.appendArgs(extraChildArgv);
childTool.validateOptions(childOptions);
} catch (ParseException pe) {
LOG.error("Error parsing arguments to the session-specific tool.");
LOG.error("Error parsing arguments to the job-specific tool.");
LOG.error("See 'sqoop help <tool>' for usage.");
return 1;
} catch (SqoopOptions.InvalidOptionsException e) {
@ -142,67 +140,67 @@ private int configureChildTool(SqoopOptions childOptions,
return 0; // Success.
}
private int createSession(SqoopOptions options) throws IOException {
private int createJob(SqoopOptions options) throws IOException {
// In our extraArguments array, we should have a '--' followed by
// a tool name, and any tool-specific arguments.
// Create an instance of the named tool and then configure it to
// get a SqoopOptions out which we will serialize into a session.
// get a SqoopOptions out which we will serialize into a job.
int dashPos = getDashPosition(extraArguments);
int toolArgPos = dashPos + 1;
if (null == extraArguments || toolArgPos < 0
|| toolArgPos >= extraArguments.length) {
LOG.error("No tool specified; cannot create a session.");
LOG.error("Use: sqoop create-session [session-args] "
LOG.error("No tool specified; cannot create a job.");
LOG.error("Use: sqoop job --create <job-name> "
+ "-- <tool-name> [tool-args]");
return 1;
}
String sessionToolName = extraArguments[toolArgPos];
SqoopTool sessionTool = SqoopTool.getTool(sessionToolName);
if (null == sessionTool) {
LOG.error("No such tool available: " + sessionToolName);
String jobToolName = extraArguments[toolArgPos];
SqoopTool jobTool = SqoopTool.getTool(jobToolName);
if (null == jobTool) {
LOG.error("No such tool available: " + jobToolName);
return 1;
}
// Create a SqoopOptions and Configuration based on the current one,
// but deep-copied. This will be populated within the session.
SqoopOptions sessionOptions = new SqoopOptions();
sessionOptions.setConf(new Configuration(options.getConf()));
// but deep-copied. This will be populated within the job.
SqoopOptions jobOptions = new SqoopOptions();
jobOptions.setConf(new Configuration(options.getConf()));
// Get the arguments to feed to the child tool.
String [] childArgs = Arrays.copyOfRange(extraArguments, toolArgPos + 1,
extraArguments.length);
int confRet = configureChildTool(sessionOptions, sessionTool, childArgs);
int confRet = configureChildTool(jobOptions, jobTool, childArgs);
if (0 != confRet) {
// Error.
return confRet;
}
// Now that the tool is fully configured, materialize the session.
SessionData sessionData = new SessionData(sessionOptions, sessionTool);
this.storage.create(sessionName, sessionData);
// Now that the tool is fully configured, materialize the job.
JobData jobData = new JobData(jobOptions, jobTool);
this.storage.create(jobName, jobData);
return 0; // Success.
}
private int listSessions(SqoopOptions opts) throws IOException {
List<String> sessionNames = storage.list();
System.out.println("Available sessions:");
for (String name : sessionNames) {
private int listJobs(SqoopOptions opts) throws IOException {
List<String> jobNames = storage.list();
System.out.println("Available jobs:");
for (String name : jobNames) {
System.out.println(" " + name);
}
return 0;
}
private int deleteSession(SqoopOptions opts) throws IOException {
this.storage.delete(sessionName);
private int deleteJob(SqoopOptions opts) throws IOException {
this.storage.delete(jobName);
return 0;
}
private int execSession(SqoopOptions opts) throws IOException {
SessionData data = this.storage.read(sessionName);
private int execJob(SqoopOptions opts) throws IOException {
JobData data = this.storage.read(jobName);
if (null == data) {
LOG.error("No such session: " + sessionName);
LOG.error("No such job: " + jobName);
return 1;
}
@ -233,17 +231,17 @@ private int execSession(SqoopOptions opts) throws IOException {
return childTool.run(clonedOpts);
}
private int showSession(SqoopOptions opts) throws IOException {
SessionData data = this.storage.read(sessionName);
private int showJob(SqoopOptions opts) throws IOException {
JobData data = this.storage.read(jobName);
if (null == data) {
LOG.error("No such session: " + sessionName);
LOG.error("No such job: " + jobName);
return 1;
}
SqoopOptions childOpts = data.getSqoopOptions();
SqoopTool childTool = data.getSqoopTool();
System.out.println("Session: " + sessionName);
System.out.println("Job: " + jobName);
System.out.println("Tool: " + childTool.getToolName());
System.out.println("Options:");
@ -263,39 +261,39 @@ private int showSession(SqoopOptions opts) throws IOException {
@Override
/** {@inheritDoc} */
public int run(SqoopOptions options) {
// Get a SessionStorage instance to use to materialize this session.
SessionStorageFactory ssf = new SessionStorageFactory(options.getConf());
this.storage = ssf.getSessionStorage(sessionDescriptor);
// Get a JobStorage instance to use to materialize this job.
JobStorageFactory ssf = new JobStorageFactory(options.getConf());
this.storage = ssf.getJobStorage(storageDescriptor);
if (null == this.storage) {
LOG.error("There is no SessionStorage implementation available");
LOG.error("that can read your specified session descriptor.");
LOG.error("Don't know where to save this session info! You may");
LOG.error("There is no JobStorage implementation available");
LOG.error("that can read your specified storage descriptor.");
LOG.error("Don't know where to save this job info! You may");
LOG.error("need to specify the connect string with --meta-connect.");
return 1;
}
try {
// Open the storage layer.
this.storage.open(this.sessionDescriptor);
this.storage.open(this.storageDescriptor);
// And now determine what operation to perform with it.
switch (operation) {
case SessionCreate:
return createSession(options);
case SessionDelete:
return deleteSession(options);
case SessionExecute:
return execSession(options);
case SessionList:
return listSessions(options);
case SessionShow:
return showSession(options);
case JobCreate:
return createJob(options);
case JobDelete:
return deleteJob(options);
case JobExecute:
return execJob(options);
case JobList:
return listJobs(options);
case JobShow:
return showJob(options);
default:
LOG.error("Undefined session operation: " + operation);
LOG.error("Undefined job operation: " + operation);
return 1;
}
} catch (IOException ioe) {
LOG.error("I/O error performing session operation: "
LOG.error("I/O error performing job operation: "
+ StringUtils.stringifyException(ioe));
return 1;
} finally {
@ -303,7 +301,7 @@ public int run(SqoopOptions options) {
try {
storage.close();
} catch (IOException ioe) {
LOG.warn("IOException closing SessionStorage: "
LOG.warn("IOException closing JobStorage: "
+ StringUtils.stringifyException(ioe));
}
}
@ -313,7 +311,7 @@ public int run(SqoopOptions options) {
@Override
/** Configure the command-line arguments we expect to receive */
public void configureOptions(ToolOptions toolOptions) {
toolOptions.addUniqueOptions(getSessionOptions());
toolOptions.addUniqueOptions(getJobOptions());
}
@Override
@ -336,29 +334,29 @@ public void applyOptions(CommandLine in, SqoopOptions out)
throw new InvalidOptionsException("");
}
this.sessionDescriptor = new TreeMap<String, String>();
this.storageDescriptor = new TreeMap<String, String>();
if (in.hasOption(SESSION_METASTORE_ARG)) {
this.sessionDescriptor.put(HsqldbSessionStorage.META_CONNECT_KEY,
in.getOptionValue(SESSION_METASTORE_ARG));
if (in.hasOption(STORAGE_METASTORE_ARG)) {
this.storageDescriptor.put(HsqldbJobStorage.META_CONNECT_KEY,
in.getOptionValue(STORAGE_METASTORE_ARG));
}
// These are generated via an option group; exactly one
// of this exhaustive list will always be selected.
if (in.hasOption(SESSION_CMD_CREATE_ARG)) {
this.operation = SessionOp.SessionCreate;
this.sessionName = in.getOptionValue(SESSION_CMD_CREATE_ARG);
} else if (in.hasOption(SESSION_CMD_DELETE_ARG)) {
this.operation = SessionOp.SessionDelete;
this.sessionName = in.getOptionValue(SESSION_CMD_DELETE_ARG);
} else if (in.hasOption(SESSION_CMD_EXEC_ARG)) {
this.operation = SessionOp.SessionExecute;
this.sessionName = in.getOptionValue(SESSION_CMD_EXEC_ARG);
} else if (in.hasOption(SESSION_CMD_LIST_ARG)) {
this.operation = SessionOp.SessionList;
} else if (in.hasOption(SESSION_CMD_SHOW_ARG)) {
this.operation = SessionOp.SessionShow;
this.sessionName = in.getOptionValue(SESSION_CMD_SHOW_ARG);
if (in.hasOption(JOB_CMD_CREATE_ARG)) {
this.operation = JobOp.JobCreate;
this.jobName = in.getOptionValue(JOB_CMD_CREATE_ARG);
} else if (in.hasOption(JOB_CMD_DELETE_ARG)) {
this.operation = JobOp.JobDelete;
this.jobName = in.getOptionValue(JOB_CMD_DELETE_ARG);
} else if (in.hasOption(JOB_CMD_EXEC_ARG)) {
this.operation = JobOp.JobExecute;
this.jobName = in.getOptionValue(JOB_CMD_EXEC_ARG);
} else if (in.hasOption(JOB_CMD_LIST_ARG)) {
this.operation = JobOp.JobList;
} else if (in.hasOption(JOB_CMD_SHOW_ARG)) {
this.operation = JobOp.JobShow;
this.jobName = in.getOptionValue(JOB_CMD_SHOW_ARG);
}
}
@ -368,12 +366,12 @@ public void validateOptions(SqoopOptions options)
throws InvalidOptionsException {
if (null == operation
|| (null == this.sessionName && operation != SessionOp.SessionList)) {
throw new InvalidOptionsException("No session operation specified"
|| (null == this.jobName && operation != JobOp.JobList)) {
throw new InvalidOptionsException("No job operation specified"
+ HELP_STR);
}
if (operation == SessionOp.SessionCreate) {
if (operation == JobOp.JobCreate) {
// Check that we have a '--' followed by at least a tool name.
if (extraArguments == null || extraArguments.length == 0) {
throw new InvalidOptionsException(
@ -392,7 +390,7 @@ public void validateOptions(SqoopOptions options)
/** {@inheritDoc} */
public void printHelp(ToolOptions opts) {
System.out.println("usage: sqoop " + getToolName()
+ " [GENERIC-ARGS] [SESSION-ARGS] [-- [<tool-name>] [TOOL-ARGS]]");
+ " [GENERIC-ARGS] [JOB-ARGS] [-- [<tool-name>] [TOOL-ARGS]]");
System.out.println("");
opts.printHelp();

View File

@ -75,8 +75,8 @@ public abstract class SqoopTool {
"List available tables in a database");
registerTool("metastore", MetastoreTool.class,
"Run a standalone Sqoop metastore");
registerTool("session", SessionTool.class,
"Work with saved sessions");
registerTool("job", JobTool.class,
"Work with saved jobs");
registerTool("version", VersionTool.class,
"Display version information");
}

View File

@ -32,7 +32,7 @@
import com.cloudera.sqoop.manager.TestHsqldbManager;
import com.cloudera.sqoop.manager.TestSqlManager;
import com.cloudera.sqoop.mapreduce.MapreduceTests;
import com.cloudera.sqoop.metastore.TestSessions;
import com.cloudera.sqoop.metastore.TestSavedJobs;
import com.cloudera.sqoop.orm.TestClassWriter;
import com.cloudera.sqoop.orm.TestParseMethods;
import com.cloudera.sqoop.util.TestDirectImportUtils;
@ -77,7 +77,7 @@ public static Test suite() {
suite.addTestSuite(TestDirectImportUtils.class);
suite.addTestSuite(TestLobFile.class);
suite.addTestSuite(TestExportUpdate.class);
suite.addTestSuite(TestSessions.class);
suite.addTestSuite(TestSavedJobs.class);
suite.addTestSuite(TestNamedFifo.class);
suite.addTestSuite(TestBooleanParser.class);
suite.addTest(MapreduceTests.suite());

View File

@ -22,7 +22,7 @@
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.ImportJobContext;
import com.cloudera.sqoop.manager.ManagerFactory;
import com.cloudera.sqoop.metastore.SessionData;
import com.cloudera.sqoop.metastore.JobData;
import com.cloudera.sqoop.tool.ImportTool;
import junit.framework.TestCase;
@ -45,7 +45,7 @@ public void testCustomFactory() throws IOException {
ConnFactory factory = new ConnFactory(conf);
ConnManager manager = factory.getManager(
new SessionData(new SqoopOptions(), new ImportTool()));
new JobData(new SqoopOptions(), new ImportTool()));
assertNotNull("No manager returned", manager);
assertTrue("Expected a DummyManager", manager instanceof DummyManager);
}
@ -57,7 +57,7 @@ public void testExceptionForNoManager() {
ConnFactory factory = new ConnFactory(conf);
try {
factory.getManager(
new SessionData(new SqoopOptions(), new ImportTool()));
new JobData(new SqoopOptions(), new ImportTool()));
fail("factory.getManager() expected to throw IOException");
} catch (IOException ioe) {
// Expected this. Test passes.
@ -75,7 +75,7 @@ public void testMultipleManagers() throws IOException {
ConnFactory factory = new ConnFactory(conf);
ConnManager manager = factory.getManager(
new SessionData(new SqoopOptions(), new ImportTool()));
new JobData(new SqoopOptions(), new ImportTool()));
assertNotNull("No manager returned", manager);
assertTrue("Expected a DummyManager", manager instanceof DummyManager);
}
@ -87,7 +87,7 @@ public void testMultipleManagers() throws IOException {
* configuration.
*/
public static class AlwaysDummyFactory extends ManagerFactory {
public ConnManager accept(SessionData data) {
public ConnManager accept(JobData data) {
// Always return a new DummyManager
return new DummyManager();
}
@ -97,7 +97,7 @@ public ConnManager accept(SessionData data) {
* ManagerFactory that accepts no configurations.
*/
public static class EmptyFactory extends ManagerFactory {
public ConnManager accept(SessionData data) {
public ConnManager accept(JobData data) {
// Never instantiate a proper ConnManager;
return null;
}

View File

@ -42,12 +42,12 @@
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.HsqldbManager;
import com.cloudera.sqoop.manager.ManagerFactory;
import com.cloudera.sqoop.metastore.SessionData;
import com.cloudera.sqoop.metastore.TestSessions;
import com.cloudera.sqoop.metastore.JobData;
import com.cloudera.sqoop.metastore.TestSavedJobs;
import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
import com.cloudera.sqoop.testutil.CommonArgs;
import com.cloudera.sqoop.tool.ImportTool;
import com.cloudera.sqoop.tool.SessionTool;
import com.cloudera.sqoop.tool.JobTool;
import junit.framework.TestCase;
@ -71,18 +71,18 @@ public class TestIncrementalImport extends TestCase {
@Override
public void setUp() throws Exception {
// Delete db state between tests.
TestSessions.resetSessionSchema();
TestSavedJobs.resetJobSchema();
resetSourceDataSchema();
}
public static void resetSourceDataSchema() throws SQLException {
SqoopOptions options = new SqoopOptions();
options.setConnectString(SOURCE_DB_URL);
TestSessions.resetSchema(options);
TestSavedJobs.resetSchema(options);
}
public static Configuration newConf() {
return TestSessions.newConf();
return TestSavedJobs.newConf();
}
/**
@ -379,65 +379,65 @@ private List<String> getArgListForTable(String tableName, boolean commonArgs,
}
/**
* Create a session with the specified name, where the session performs
* an import configured with 'sessionArgs'.
* Create a job with the specified name, where the job performs
* an import configured with 'jobArgs'.
*/
private void createSession(String sessionName, List<String> sessionArgs) {
createSession(sessionName, sessionArgs, newConf());
private void createJob(String jobName, List<String> jobArgs) {
createJob(jobName, jobArgs, newConf());
}
/**
* Create a session with the specified name, where the session performs
* an import configured with 'sessionArgs', using the provided configuration
* Create a job with the specified name, where the job performs
* an import configured with 'jobArgs', using the provided configuration
* as defaults.
*/
private void createSession(String sessionName, List<String> sessionArgs,
private void createJob(String jobName, List<String> jobArgs,
Configuration conf) {
try {
SqoopOptions options = new SqoopOptions();
options.setConf(conf);
Sqoop makeSession = new Sqoop(new SessionTool(), conf, options);
Sqoop makeJob = new Sqoop(new JobTool(), conf, options);
List<String> args = new ArrayList<String>();
args.add("--create");
args.add(sessionName);
args.add(jobName);
args.add("--");
args.add("import");
args.addAll(sessionArgs);
args.addAll(jobArgs);
int ret = Sqoop.runSqoop(makeSession, args.toArray(new String[0]));
assertEquals("Failure during job to create session", 0, ret);
int ret = Sqoop.runSqoop(makeJob, args.toArray(new String[0]));
assertEquals("Failure to create job", 0, ret);
} catch (Exception e) {
LOG.error("Got exception running Sqoop to create session: "
LOG.error("Got exception running Sqoop to create job: "
+ StringUtils.stringifyException(e));
throw new RuntimeException(e);
}
}
/**
* Run the specified session.
* Run the specified job.
*/
private void runSession(String sessionName) {
runSession(sessionName, newConf());
private void runJob(String jobName) {
runJob(jobName, newConf());
}
/**
* Run the specified session.
* Run the specified job.
*/
private void runSession(String sessionName, Configuration conf) {
private void runJob(String jobName, Configuration conf) {
try {
SqoopOptions options = new SqoopOptions();
options.setConf(conf);
Sqoop runSession = new Sqoop(new SessionTool(), conf, options);
Sqoop runJob = new Sqoop(new JobTool(), conf, options);
List<String> args = new ArrayList<String>();
args.add("--exec");
args.add(sessionName);
args.add(jobName);
int ret = Sqoop.runSqoop(runSession, args.toArray(new String[0]));
assertEquals("Failure during job to run session", 0, ret);
int ret = Sqoop.runSqoop(runJob, args.toArray(new String[0]));
assertEquals("Failure to run job", 0, ret);
} catch (Exception e) {
LOG.error("Got exception running Sqoop to run session: "
LOG.error("Got exception running Sqoop to run job: "
+ StringUtils.stringifyException(e));
throw new RuntimeException(e);
}
@ -471,25 +471,25 @@ public void testFullAppendImport() throws Exception {
assertDirOfNumbers(TABLE_NAME, 10);
}
public void testEmptySessionAppend() throws Exception {
// Create a session and run an import on an empty table.
public void testEmptyJobAppend() throws Exception {
// Create a job and run an import on an empty table.
// Nothing should happen.
final String TABLE_NAME = "emptySession";
final String TABLE_NAME = "emptyJob";
createIdTable(TABLE_NAME, 0);
List<String> args = getArgListForTable(TABLE_NAME, false, true);
createSession("emptySession", args);
runSession("emptySession");
createJob("emptyJob", args);
runJob("emptyJob");
assertDirOfNumbers(TABLE_NAME, 0);
// Running the session a second time should result in
// Running the job a second time should result in
// nothing happening, it's still empty.
runSession("emptySession");
runJob("emptyJob");
assertDirOfNumbers(TABLE_NAME, 0);
}
public void testEmptyThenFullSessionAppend() throws Exception {
public void testEmptyThenFullJobAppend() throws Exception {
// Create an empty table. Import it; nothing happens.
// Add some rows. Verify they are appended.
@ -497,22 +497,22 @@ public void testEmptyThenFullSessionAppend() throws Exception {
createIdTable(TABLE_NAME, 0);
List<String> args = getArgListForTable(TABLE_NAME, false, true);
createSession(TABLE_NAME, args);
runSession(TABLE_NAME);
createJob(TABLE_NAME, args);
runJob(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 0);
// Now add some rows.
insertIdRows(TABLE_NAME, 0, 10);
// Running the session a second time should import 10 rows.
runSession(TABLE_NAME);
// Running the job a second time should import 10 rows.
runJob(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 10);
// Add some more rows.
insertIdRows(TABLE_NAME, 10, 20);
// Import only those rows.
runSession(TABLE_NAME);
runJob(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 20);
}
@ -524,15 +524,15 @@ public void testAppend() throws Exception {
createIdTable(TABLE_NAME, 10);
List<String> args = getArgListForTable(TABLE_NAME, false, true);
createSession(TABLE_NAME, args);
runSession(TABLE_NAME);
createJob(TABLE_NAME, args);
runJob(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 10);
// Add some more rows.
insertIdRows(TABLE_NAME, 10, 20);
// Import only those rows.
runSession(TABLE_NAME);
runJob(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 20);
}
@ -584,26 +584,26 @@ public void testNoImportFromTheFuture() throws Exception {
assertDirOfNumbers(TABLE_NAME, 0);
}
public void testEmptySessionLastMod() throws Exception {
// Create a session and run an import on an empty table.
public void testEmptyJobLastMod() throws Exception {
// Create a job and run an import on an empty table.
// Nothing should happen.
final String TABLE_NAME = "emptySessionLastMod";
final String TABLE_NAME = "emptyJobLastMod";
createTimestampTable(TABLE_NAME, 0, null);
List<String> args = getArgListForTable(TABLE_NAME, false, false);
args.add("--append");
createSession("emptySessionLastMod", args);
runSession("emptySessionLastMod");
createJob("emptyJobLastMod", args);
runJob("emptyJobLastMod");
assertDirOfNumbers(TABLE_NAME, 0);
// Running the session a second time should result in
// Running the job a second time should result in
// nothing happening, it's still empty.
runSession("emptySessionLastMod");
runJob("emptyJobLastMod");
assertDirOfNumbers(TABLE_NAME, 0);
}
public void testEmptyThenFullSessionLastMod() throws Exception {
public void testEmptyThenFullJobLastMod() throws Exception {
// Create an empty table. Import it; nothing happens.
// Add some rows. Verify they are appended.
@ -612,8 +612,8 @@ public void testEmptyThenFullSessionLastMod() throws Exception {
List<String> args = getArgListForTable(TABLE_NAME, false, false);
args.add("--append");
createSession(TABLE_NAME, args);
runSession(TABLE_NAME);
createJob(TABLE_NAME, args);
runJob(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 0);
long importWasBefore = System.currentTimeMillis();
@ -630,8 +630,8 @@ public void testEmptyThenFullSessionLastMod() throws Exception {
insertIdTimestampRows(TABLE_NAME, 0, 10, new Timestamp(rowsAddedTime));
// Running the session a second time should import 10 rows.
runSession(TABLE_NAME);
// Running the job a second time should import 10 rows.
runJob(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 10);
// Add some more rows.
@ -643,7 +643,7 @@ public void testEmptyThenFullSessionLastMod() throws Exception {
insertIdTimestampRows(TABLE_NAME, 10, 20, new Timestamp(rowsAddedTime));
// Import only those rows.
runSession(TABLE_NAME);
runJob(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 20);
}
@ -657,8 +657,8 @@ public void testAppendWithTimestamp() throws Exception {
List<String> args = getArgListForTable(TABLE_NAME, false, false);
args.add("--append");
createSession(TABLE_NAME, args);
runSession(TABLE_NAME);
createJob(TABLE_NAME, args);
runJob(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 10);
// Add some more rows.
@ -670,7 +670,7 @@ public void testAppendWithTimestamp() throws Exception {
insertIdTimestampRows(TABLE_NAME, 10, 20, new Timestamp(rowsAddedTime));
// Import only those rows.
runSession(TABLE_NAME);
runJob(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 20);
}
@ -684,8 +684,8 @@ public void testModifyWithTimestamp() throws Exception {
createTimestampTable(TABLE_NAME, 10, thePast);
List<String> args = getArgListForTable(TABLE_NAME, false, false);
createSession(TABLE_NAME, args);
runSession(TABLE_NAME);
createJob(TABLE_NAME, args);
runJob(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 10);
// Modify a row.
@ -713,7 +713,7 @@ public void testModifyWithTimestamp() throws Exception {
// Import only the new row.
clearDir(TABLE_NAME);
runSession(TABLE_NAME);
runJob(TABLE_NAME);
assertSpecificNumber(TABLE_NAME, 4000);
}
@ -723,7 +723,7 @@ public void testModifyWithTimestamp() throws Exception {
*/
public static class InstrumentHsqldbManagerFactory extends ManagerFactory {
@Override
public ConnManager accept(SessionData data) {
public ConnManager accept(JobData data) {
LOG.info("Using instrumented manager");
return new InstrumentHsqldbManager(data.getSqoopOptions());
}
@ -771,8 +771,8 @@ public void testTimestampBoundary() throws Exception {
List<String> args = getArgListForTable(TABLE_NAME, false, false);
args.add("--append");
createSession(TABLE_NAME, args, conf);
runSession(TABLE_NAME);
createJob(TABLE_NAME, args, conf);
runJob(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 10);
// Add some more rows with the timestamp equal to the job run timestamp.
@ -784,7 +784,7 @@ public void testTimestampBoundary() throws Exception {
InstrumentHsqldbManager.setCurrentDbTimestamp(secondJobTime);
// Import only those rows.
runSession(TABLE_NAME);
runJob(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 20);
}
}

View File

@ -38,13 +38,13 @@
import java.sql.Connection;
/**
* Test the metastore and session-handling features.
* Test the metastore and job-handling features.
*
* These all make use of the auto-connect hsqldb-based metastore.
* The metastore URL is configured to be in-memory, and drop all
* state between individual tests.
*/
public class TestSessions extends TestCase {
public class TestSavedJobs extends TestCase {
public static final String TEST_AUTOCONNECT_URL =
"jdbc:hsqldb:mem:sqoopmetastore";
@ -54,10 +54,10 @@ public class TestSessions extends TestCase {
@Override
public void setUp() throws Exception {
// Delete db state between tests.
resetSessionSchema();
resetJobSchema();
}
public static void resetSessionSchema() throws SQLException {
public static void resetJobSchema() throws SQLException {
SqoopOptions options = new SqoopOptions();
options.setConnectString(TEST_AUTOCONNECT_URL);
options.setUsername(TEST_AUTOCONNECT_USER);
@ -98,58 +98,58 @@ public static Configuration newConf() {
public void testAutoConnect() throws IOException {
// By default, we should be able to auto-connect with an
// empty connection descriptor. We should see an empty
// session set.
// job set.
Configuration conf = newConf();
SessionStorageFactory ssf = new SessionStorageFactory(conf);
JobStorageFactory ssf = new JobStorageFactory(conf);
Map<String, String> descriptor = new TreeMap<String, String>();
SessionStorage storage = ssf.getSessionStorage(descriptor);
JobStorage storage = ssf.getJobStorage(descriptor);
storage.open(descriptor);
List<String> sessions = storage.list();
assertEquals(0, sessions.size());
List<String> jobs = storage.list();
assertEquals(0, jobs.size());
storage.close();
}
public void testCreateDeleteSession() throws IOException {
public void testCreateDeleteJob() throws IOException {
Configuration conf = newConf();
SessionStorageFactory ssf = new SessionStorageFactory(conf);
JobStorageFactory ssf = new JobStorageFactory(conf);
Map<String, String> descriptor = new TreeMap<String, String>();
SessionStorage storage = ssf.getSessionStorage(descriptor);
JobStorage storage = ssf.getJobStorage(descriptor);
storage.open(descriptor);
// Session list should start out empty.
List<String> sessions = storage.list();
assertEquals(0, sessions.size());
// Job list should start out empty.
List<String> jobs = storage.list();
assertEquals(0, jobs.size());
// Create a session that displays the version.
SessionData data = new SessionData(new SqoopOptions(), new VersionTool());
storage.create("versionSession", data);
// Create a job that displays the version.
JobData data = new JobData(new SqoopOptions(), new VersionTool());
storage.create("versionJob", data);
sessions = storage.list();
assertEquals(1, sessions.size());
assertEquals("versionSession", sessions.get(0));
jobs = storage.list();
assertEquals(1, jobs.size());
assertEquals("versionJob", jobs.get(0));
// Try to create that same session name again. This should fail.
// Try to create that same job name again. This should fail.
try {
storage.create("versionSession", data);
fail("Expected IOException; this session already exists.");
storage.create("versionJob", data);
fail("Expected IOException; this job already exists.");
} catch (IOException ioe) {
// This is expected; continue operation.
}
sessions = storage.list();
assertEquals(1, sessions.size());
jobs = storage.list();
assertEquals(1, jobs.size());
// Restore our session, check that it exists.
SessionData outData = storage.read("versionSession");
// Restore our job, check that it exists.
JobData outData = storage.read("versionJob");
assertEquals(new VersionTool().getToolName(),
outData.getSqoopTool().getToolName());
// Try to restore a session that doesn't exist. Watch it fail.
// Try to restore a job that doesn't exist. Watch it fail.
try {
storage.read("DoesNotExist");
fail("Expected IOException");
@ -157,53 +157,53 @@ public void testCreateDeleteSession() throws IOException {
// This is expected. Continue.
}
// Now delete the session.
storage.delete("versionSession");
// Now delete the job.
storage.delete("versionJob");
// After delete, we should have no sessions.
sessions = storage.list();
assertEquals(0, sessions.size());
// After delete, we should have no jobs.
jobs = storage.list();
assertEquals(0, jobs.size());
storage.close();
}
public void testMultiConnections() throws IOException {
// Ensure that a session can be retrieved when the storage is
// Ensure that a job can be retrieved when the storage is
// closed and reopened.
Configuration conf = newConf();
SessionStorageFactory ssf = new SessionStorageFactory(conf);
JobStorageFactory ssf = new JobStorageFactory(conf);
Map<String, String> descriptor = new TreeMap<String, String>();
SessionStorage storage = ssf.getSessionStorage(descriptor);
JobStorage storage = ssf.getJobStorage(descriptor);
storage.open(descriptor);
// Session list should start out empty.
List<String> sessions = storage.list();
assertEquals(0, sessions.size());
// Job list should start out empty.
List<String> jobs = storage.list();
assertEquals(0, jobs.size());
// Create a session that displays the version.
SessionData data = new SessionData(new SqoopOptions(), new VersionTool());
storage.create("versionSession", data);
// Create a job that displays the version.
JobData data = new JobData(new SqoopOptions(), new VersionTool());
storage.create("versionJob", data);
sessions = storage.list();
assertEquals(1, sessions.size());
assertEquals("versionSession", sessions.get(0));
jobs = storage.list();
assertEquals(1, jobs.size());
assertEquals("versionJob", jobs.get(0));
storage.close(); // Close the existing connection
// Now re-open the storage.
ssf = new SessionStorageFactory(newConf());
storage = ssf.getSessionStorage(descriptor);
ssf = new JobStorageFactory(newConf());
storage = ssf.getJobStorage(descriptor);
storage.open(descriptor);
sessions = storage.list();
assertEquals(1, sessions.size());
assertEquals("versionSession", sessions.get(0));
jobs = storage.list();
assertEquals(1, jobs.size());
assertEquals("versionJob", jobs.get(0));
// Restore our session, check that it exists.
SessionData outData = storage.read("versionSession");
// Restore our job, check that it exists.
JobData outData = storage.read("versionJob");
assertEquals(new VersionTool().getToolName(),
outData.getSqoopTool().getToolName());

View File

@ -38,7 +38,7 @@
import com.cloudera.sqoop.ConnFactory;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.metastore.SessionData;
import com.cloudera.sqoop.metastore.JobData;
import com.cloudera.sqoop.shims.ShimLoader;
import com.cloudera.sqoop.tool.ImportTool;
@ -199,7 +199,7 @@ public void setUp() {
opts.setTableName(getTableName());
ConnFactory f = new ConnFactory(conf);
try {
this.manager = f.getManager(new SessionData(opts, new ImportTool()));
this.manager = f.getManager(new JobData(opts, new ImportTool()));
} catch (IOException ioe) {
fail("IOException instantiating manager: "
+ StringUtils.stringifyException(ioe));

View File

@ -20,7 +20,7 @@
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.ManagerFactory;
import com.cloudera.sqoop.metastore.SessionData;
import com.cloudera.sqoop.metastore.JobData;
/**
* ManagerFactory that is used for testing; this accepts any
@ -28,7 +28,7 @@
*/
public class InjectableManagerFactory extends ManagerFactory {
public ConnManager accept(SessionData data) {
public ConnManager accept(JobData data) {
// Always accept and use the injectable manager.
return new InjectableConnManager(data.getSqoopOptions());
}

View File

@ -44,7 +44,7 @@
<!-- createRootTable() allows a user-specified table name retrieved
from properties. This since instance is allowed for now.
-->
<Class name="com.cloudera.sqoop.metastore.hsqldb.HsqldbSessionStorage" />
<Class name="com.cloudera.sqoop.metastore.hsqldb.HsqldbJobStorage" />
<Method name="createRootTable" />
<Bug pattern="SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE" />
</Match>