mirror of
https://github.com/apache/sqoop.git
synced 2025-05-03 04:42:22 +08:00
SQOOP-39. (SIP-6) Session Management API.
Added SessionStorage API. Added SessonData API. Added SessionStorageFactory. Added SessionTool for create/delete/execute/show/list operations on sessions. SqoopOptions can read and write all "sticky" state to a Properties instance. Added HsqldbSessionStorage to implement SessionStorage API. Added AutoHsqldbStorage to auto-instantiate a local metastore for the user. Added client metastore connection parameters to sqoop-site.xml. Added metastore tool (MetastoreTool). Added HsqldbMetaStore for standalone metastore instance. Added metastore properties to sqoop-default.xml. Added TestSessions unit tests of session API. Renamed conf/sqoop-default.xml to conf/sqoop-site-template.xml. Added conf/.gitignore for sqoop-site.xml. Tests run: Tested all metastore operations on an import session. Tested that ~/.sqoop/-based storage will be auto-created by the metastore. Tested that 'sqoop metastore'-based metastores can be connected to by external clients. Tested that 'sqoop metastore --shutdown' will gracefully shut down a running metastore instance. Tested that passwords are not stored in the metastore by default, and the user is prompted for the password when executing that saved session. From: Aaron Kimball <aaron@cloudera.com> git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149940 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0f35d54f21
commit
06b94587af
@ -484,6 +484,14 @@
|
||||
<include name="**/*.sh" />
|
||||
</fileset>
|
||||
</chmod>
|
||||
|
||||
<!-- In the configuration directory, take the sqoop-site-template
|
||||
and copy it to sqoop-site.xml, overwriting any user-specified
|
||||
sqoop-site.xml in there.
|
||||
-->
|
||||
<copy file="${dist.dir}/conf/sqoop-site-template.xml"
|
||||
tofile="${dist.dir}/conf/sqoop-site.xml"
|
||||
overwrite="true" />
|
||||
</target>
|
||||
|
||||
<target name="tar" depends="package" description="Create release tarball">
|
||||
|
15
conf/.gitignore
vendored
Normal file
15
conf/.gitignore
vendored
Normal file
@ -0,0 +1,15 @@
|
||||
# Licensed to Cloudera, Inc. under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# Cloudera, Inc. licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
/sqoop-site.xml
|
@ -1,33 +0,0 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
|
||||
<!--
|
||||
Licensed to Cloudera, Inc. under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
Cloudera, Inc. licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<!-- Put Sqoop-specific properties in this file. -->
|
||||
|
||||
<configuration>
|
||||
|
||||
<property>
|
||||
<name>sqoop.connection.factories</name>
|
||||
<value>com.cloudera.sqoop.manager.DefaultManagerFactory</value>
|
||||
<description>A comma-delimited list of ManagerFactory implementations
|
||||
which are consulted, in order, to instantiate ConnManager instances
|
||||
used to drive connections to databases.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
</configuration>
|
124
conf/sqoop-site-template.xml
Normal file
124
conf/sqoop-site-template.xml
Normal file
@ -0,0 +1,124 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
|
||||
<!--
|
||||
Licensed to Cloudera, Inc. under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
Cloudera, Inc. licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<!-- Put Sqoop-specific properties in this file. -->
|
||||
|
||||
<configuration>
|
||||
|
||||
<!--
|
||||
Override the value of this property to enable third-party ManagerFactory
|
||||
plugins.
|
||||
-->
|
||||
<!--
|
||||
<property>
|
||||
<name>sqoop.connection.factories</name>
|
||||
<value>com.cloudera.sqoop.manager.DefaultManagerFactory</value>
|
||||
<description>A comma-delimited list of ManagerFactory implementations
|
||||
which are consulted, in order, to instantiate ConnManager instances
|
||||
used to drive connections to databases.
|
||||
</description>
|
||||
</property>
|
||||
-->
|
||||
|
||||
<!--
|
||||
By default, the Sqoop metastore will auto-connect to a local embedded
|
||||
database stored in ~/.sqoop/. To disable metastore auto-connect, uncomment
|
||||
this next property.
|
||||
-->
|
||||
<!--
|
||||
<property>
|
||||
<name>sqoop.metastore.client.enable.autoconnect</name>
|
||||
<value>false</value>
|
||||
<description>If true, Sqoop will connect to a local metastore
|
||||
for session management when no other metastore arguments are
|
||||
provided.
|
||||
</description>
|
||||
</property>
|
||||
-->
|
||||
|
||||
<!--
|
||||
The auto-connect metastore is stored in ~/.sqoop/. Uncomment
|
||||
these next arguments to control the auto-connect process with
|
||||
greater precision.
|
||||
-->
|
||||
<!--
|
||||
<property>
|
||||
<name>sqoop.metastore.client.autoconnect.url</name>
|
||||
<value>jdbc:hsqldb:file:/tmp/sqoop-meta/meta.db;shutdown=true</value>
|
||||
<description>The connect string to use when connecting to a
|
||||
session-management metastore. If unspecified, uses ~/.sqoop/.
|
||||
You can specify a different path here.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sqoop.metastore.client.autoconnect.username</name>
|
||||
<value>SA</value>
|
||||
<description>The username to bind to the metastore.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sqoop.metastore.client.autoconnect.password</name>
|
||||
<value></value>
|
||||
<description>The password to bind to the metastore.
|
||||
</description>
|
||||
</property>
|
||||
-->
|
||||
|
||||
<!--
|
||||
For security reasons, by default your database password will not be stored in
|
||||
the Sqoop metastore. When executing a saved session, you will need to
|
||||
reenter the database password. Uncomment this setting to enable saved
|
||||
password storage. (INSECURE!)
|
||||
-->
|
||||
<!--
|
||||
<property>
|
||||
<name>sqoop.metastore.client.record.password</name>
|
||||
<value>true</value>
|
||||
<description>If true, allow saved passwords in the metastore.
|
||||
</description>
|
||||
</property>
|
||||
-->
|
||||
|
||||
|
||||
<!--
|
||||
SERVER CONFIGURATION: If you plan to run a Sqoop metastore on this machine,
|
||||
you should uncomment and set these parameters appropriately.
|
||||
|
||||
You should then configure clients with:
|
||||
sqoop.metastore.client.autoconnect.url =
|
||||
jdbc:hsqldb:hsql://<server-name>:<port>/sqoop
|
||||
-->
|
||||
<!--
|
||||
<property>
|
||||
<name>sqoop.metastore.server.location</name>
|
||||
<value>/tmp/sqoop-metastore/shared.db</value>
|
||||
<description>Path to the shared metastore database files.
|
||||
If this is not set, it will be placed in ~/.sqoop/.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>sqoop.metastore.server.port</name>
|
||||
<value>16000</value>
|
||||
<description>Port that this metastore should listen on.
|
||||
</description>
|
||||
</property>
|
||||
-->
|
||||
|
||||
</configuration>
|
@ -46,7 +46,6 @@ public class Sqoop extends Configured implements Tool {
|
||||
public static final String SQOOP_RETHROW_PROPERTY = "sqoop.throwOnError";
|
||||
|
||||
static {
|
||||
Configuration.addDefaultResource("sqoop-default.xml");
|
||||
Configuration.addDefaultResource("sqoop-site.xml");
|
||||
}
|
||||
|
||||
|
@ -20,9 +20,7 @@
|
||||
package com.cloudera.sqoop;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Properties;
|
||||
|
||||
@ -39,6 +37,15 @@ public class SqoopOptions {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(SqoopOptions.class.getName());
|
||||
|
||||
/**
|
||||
* Set to true in configuration if you want to put db passwords
|
||||
* in the metastore.
|
||||
*/
|
||||
public static final String METASTORE_PASSWORD_KEY =
|
||||
"sqoop.metastore.client.record.password";
|
||||
|
||||
public static final boolean METASTORE_PASSWORD_DEFAULT = false;
|
||||
|
||||
/**
|
||||
* Thrown when invalid cmdline options are given.
|
||||
*/
|
||||
@ -68,19 +75,20 @@ public enum FileLayout {
|
||||
|
||||
|
||||
// TODO(aaron): Adding something here? Add a setter and a getter.
|
||||
// Add a default value in initDefaults() if you need one.
|
||||
// If you want to load from a properties file, add an entry in the
|
||||
// loadFromProperties() method.
|
||||
// Then add command-line arguments in the appropriate tools. The
|
||||
// names of all command-line args are stored as constants in BaseSqoopTool.
|
||||
// Add a default value in initDefaults() if you need one. If this value
|
||||
// needs to be serialized in the metastore for this session, you need to add
|
||||
// an appropriate line to loadProperties() and writeProperties(). Then add
|
||||
// command-line arguments in the appropriate tools. The names of all
|
||||
// command-line args are stored as constants in BaseSqoopTool.
|
||||
|
||||
private String connectString;
|
||||
private String tableName;
|
||||
private String [] columns;
|
||||
private String username;
|
||||
private String password;
|
||||
private String password; // May not be serialized, based on configuration.
|
||||
private String codeOutputDir;
|
||||
private String jarOutputDir;
|
||||
private String hadoopHome;
|
||||
private String hadoopHome; // not serialized to metastore.
|
||||
private String splitByCol;
|
||||
private String whereClause;
|
||||
private String sqlQuery;
|
||||
@ -90,8 +98,8 @@ public enum FileLayout {
|
||||
private boolean append;
|
||||
private FileLayout layout;
|
||||
private boolean direct; // if true and conn is mysql, use mysqldump.
|
||||
private String tmpDir; // where temp data goes; usually /tmp
|
||||
private String hiveHome;
|
||||
private String tmpDir; // where temp data goes; usually /tmp; not serialized.
|
||||
private String hiveHome; // not serialized to metastore.
|
||||
private boolean hiveImport;
|
||||
private boolean overwriteHiveTable;
|
||||
private String hiveTableName;
|
||||
@ -99,6 +107,7 @@ public enum FileLayout {
|
||||
|
||||
// An ordered list of column names denoting what order columns are
|
||||
// serialized to a PreparedStatement from a generated record type.
|
||||
// Not serialized to metastore.
|
||||
private String [] dbOutColumns;
|
||||
|
||||
// package+class to apply to individual table import.
|
||||
@ -133,8 +142,6 @@ public enum FileLayout {
|
||||
|
||||
public static final int DEFAULT_NUM_MAPPERS = 4;
|
||||
|
||||
private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";
|
||||
|
||||
private String [] extraArgs;
|
||||
|
||||
private String hbaseTable; // HBase table to import into.
|
||||
@ -182,62 +189,300 @@ private long getLongProperty(Properties props, String propName,
|
||||
}
|
||||
}
|
||||
|
||||
private void loadFromProperties() {
|
||||
File configFile = new File(DEFAULT_CONFIG_FILE);
|
||||
if (!configFile.canRead()) {
|
||||
return; //can't do this.
|
||||
private int getIntProperty(Properties props, String propName,
|
||||
int defaultVal) {
|
||||
long longVal = getLongProperty(props, propName, defaultVal);
|
||||
return (int) longVal;
|
||||
}
|
||||
|
||||
private char getCharProperty(Properties props, String propName,
|
||||
char defaultVal) {
|
||||
int intVal = getIntProperty(props, propName, (int) defaultVal);
|
||||
return (char) intVal;
|
||||
}
|
||||
|
||||
private DelimiterSet getDelimiterProperties(Properties props,
|
||||
String prefix, DelimiterSet defaults) {
|
||||
|
||||
if (null == defaults) {
|
||||
defaults = new DelimiterSet();
|
||||
}
|
||||
|
||||
Properties props = new Properties();
|
||||
InputStream istream = null;
|
||||
try {
|
||||
LOG.info("Loading properties from " + configFile.getAbsolutePath());
|
||||
istream = new FileInputStream(configFile);
|
||||
props.load(istream);
|
||||
char field = getCharProperty(props, prefix + ".field",
|
||||
defaults.getFieldsTerminatedBy());
|
||||
char record = getCharProperty(props, prefix + ".record",
|
||||
defaults.getLinesTerminatedBy());
|
||||
char enclose = getCharProperty(props, prefix + ".enclose",
|
||||
defaults.getEnclosedBy());
|
||||
char escape = getCharProperty(props, prefix + ".escape",
|
||||
defaults.getEscapedBy());
|
||||
boolean required = getBooleanProperty(props, prefix +".enclose.required",
|
||||
defaults.isEncloseRequired());
|
||||
|
||||
this.hadoopHome = props.getProperty("hadoop.home", this.hadoopHome);
|
||||
this.codeOutputDir = props.getProperty("out.dir", this.codeOutputDir);
|
||||
this.jarOutputDir = props.getProperty("bin.dir", this.jarOutputDir);
|
||||
this.username = props.getProperty("db.username", this.username);
|
||||
this.password = props.getProperty("db.password", this.password);
|
||||
this.tableName = props.getProperty("db.table", this.tableName);
|
||||
this.connectString = props.getProperty("db.connect.url",
|
||||
this.connectString);
|
||||
this.splitByCol = props.getProperty("db.split.column", this.splitByCol);
|
||||
this.whereClause = props.getProperty("db.where.clause", this.whereClause);
|
||||
this.driverClassName = props.getProperty("jdbc.driver",
|
||||
this.driverClassName);
|
||||
this.warehouseDir = props.getProperty("hdfs.warehouse.dir",
|
||||
this.warehouseDir);
|
||||
this.hiveHome = props.getProperty("hive.home", this.hiveHome);
|
||||
this.className = props.getProperty("java.classname", this.className);
|
||||
this.packageName = props.getProperty("java.packagename",
|
||||
this.packageName);
|
||||
this.existingJarFile = props.getProperty("java.jar.file",
|
||||
this.existingJarFile);
|
||||
this.exportDir = props.getProperty("export.dir", this.exportDir);
|
||||
return new DelimiterSet(field, record, enclose, escape, required);
|
||||
}
|
||||
|
||||
this.direct = getBooleanProperty(props, "direct.import", this.direct);
|
||||
this.hiveImport = getBooleanProperty(props, "hive.import",
|
||||
this.hiveImport);
|
||||
this.overwriteHiveTable = getBooleanProperty(props,
|
||||
"hive.overwrite.table", this.overwriteHiveTable);
|
||||
this.useCompression = getBooleanProperty(props, "compression",
|
||||
this.useCompression);
|
||||
this.directSplitSize = getLongProperty(props, "direct.split.size",
|
||||
this.directSplitSize);
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Could not read properties file " + DEFAULT_CONFIG_FILE + ": "
|
||||
+ ioe.toString());
|
||||
} finally {
|
||||
if (null != istream) {
|
||||
try {
|
||||
istream.close();
|
||||
} catch (IOException ioe) {
|
||||
// Ignore this; we're closing.
|
||||
}
|
||||
private void setDelimiterProperties(Properties props,
|
||||
String prefix, DelimiterSet values) {
|
||||
putProperty(props, prefix + ".field",
|
||||
Integer.toString((int) values.getFieldsTerminatedBy()));
|
||||
putProperty(props, prefix + ".record",
|
||||
Integer.toString((int) values.getLinesTerminatedBy()));
|
||||
putProperty(props, prefix + ".enclose",
|
||||
Integer.toString((int) values.getEnclosedBy()));
|
||||
putProperty(props, prefix + ".escape",
|
||||
Integer.toString((int) values.getEscapedBy()));
|
||||
putProperty(props, prefix + ".enclose.required",
|
||||
Boolean.toString(values.isEncloseRequired()));
|
||||
}
|
||||
|
||||
/** Take a comma-delimited list of input and split the elements
|
||||
* into an output array. */
|
||||
private String [] listToArray(String strList) {
|
||||
return strList.split(",");
|
||||
}
|
||||
|
||||
private String arrayToList(String [] array) {
|
||||
if (null == array) {
|
||||
return null;
|
||||
}
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
boolean first = true;
|
||||
for (String elem : array) {
|
||||
if (!first) {
|
||||
sb.append(",");
|
||||
}
|
||||
sb.append(elem);
|
||||
first = false;
|
||||
}
|
||||
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* A put() method for Properties that is tolerent of 'null' values.
|
||||
* If a null value is specified, the property is unset.
|
||||
*/
|
||||
private void putProperty(Properties props, String k, String v) {
|
||||
if (null == v) {
|
||||
props.remove(k);
|
||||
} else {
|
||||
props.setProperty(k, v);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a property prefix that denotes a set of numbered properties,
|
||||
* return an array containing all the properties.
|
||||
*
|
||||
* For instance, if prefix is "foo", then return properties "foo.0",
|
||||
* "foo.1", "foo.2", and so on as an array. If no such properties
|
||||
* exist, return 'defaults'.
|
||||
*/
|
||||
private String [] getArgArrayProperty(Properties props, String prefix,
|
||||
String [] defaults) {
|
||||
int cur = 0;
|
||||
ArrayList<String> al = new ArrayList<String>();
|
||||
while (true) {
|
||||
String curProp = prefix + "." + cur;
|
||||
String curStr = props.getProperty(curProp, null);
|
||||
if (null == curStr) {
|
||||
break;
|
||||
}
|
||||
|
||||
al.add(curStr);
|
||||
cur++;
|
||||
}
|
||||
|
||||
if (cur == 0) {
|
||||
// Couldn't find an array here; return the defaults.
|
||||
return defaults;
|
||||
}
|
||||
|
||||
return al.toArray(new String[0]);
|
||||
}
|
||||
|
||||
private void setArgArrayProperties(Properties props, String prefix,
|
||||
String [] values) {
|
||||
if (null == values) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (int i = 0; i < values.length; i++) {
|
||||
putProperty(props, prefix + "." + i, values[i]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a set of properties, load this into the current SqoopOptions
|
||||
* instance.
|
||||
*/
|
||||
public void loadProperties(Properties props) {
|
||||
|
||||
this.connectString = props.getProperty("db.connect.string",
|
||||
this.connectString);
|
||||
this.username = props.getProperty("db.username", this.username);
|
||||
|
||||
if (getBooleanProperty(props, "db.require.password", false)) {
|
||||
// The user's password was stripped out from the metastore.
|
||||
// Require that the user enter it now.
|
||||
setPasswordFromConsole();
|
||||
} else {
|
||||
this.password = props.getProperty("db.password", this.password);
|
||||
}
|
||||
|
||||
this.tableName = props.getProperty("db.table", this.tableName);
|
||||
String colListStr = props.getProperty("db.column.list", null);
|
||||
if (null != colListStr) {
|
||||
this.columns = listToArray(colListStr);
|
||||
}
|
||||
|
||||
this.codeOutputDir = props.getProperty("codegen.output.dir",
|
||||
this.codeOutputDir);
|
||||
this.jarOutputDir = props.getProperty("codegen.compile.dir",
|
||||
this.jarOutputDir);
|
||||
|
||||
this.splitByCol = props.getProperty("db.split.column", this.splitByCol);
|
||||
this.whereClause = props.getProperty("db.where.clause", this.whereClause);
|
||||
this.sqlQuery = props.getProperty("db.query", this.sqlQuery);
|
||||
|
||||
this.driverClassName = props.getProperty("jdbc.driver.class",
|
||||
this.driverClassName);
|
||||
|
||||
this.warehouseDir = props.getProperty("hdfs.warehouse.dir",
|
||||
this.warehouseDir);
|
||||
this.targetDir = props.getProperty("hdfs.target.dir",
|
||||
this.targetDir);
|
||||
this.append = getBooleanProperty(props, "hdfs.append.dir", this.append);
|
||||
|
||||
String fileFmtStr = props.getProperty("hdfs.file.format", "text");
|
||||
if (fileFmtStr.equals("seq")) {
|
||||
this.layout = FileLayout.SequenceFile;
|
||||
} else {
|
||||
this.layout = FileLayout.TextFile;
|
||||
}
|
||||
|
||||
this.direct = getBooleanProperty(props, "direct.import", this.direct);
|
||||
|
||||
this.hiveImport = getBooleanProperty(props, "hive.import",
|
||||
this.hiveImport);
|
||||
this.overwriteHiveTable = getBooleanProperty(props,
|
||||
"hive.overwrite.table", this.overwriteHiveTable);
|
||||
this.hiveTableName = props.getProperty("hive.table.name",
|
||||
this.hiveTableName);
|
||||
|
||||
this.className = props.getProperty("codegen.java.classname",
|
||||
this.className);
|
||||
this.packageName = props.getProperty("codegen.java.packagename",
|
||||
this.packageName);
|
||||
this.existingJarFile = props.getProperty("codegen.jar.file",
|
||||
this.existingJarFile);
|
||||
|
||||
this.numMappers = getIntProperty(props, "mapreduce.num.mappers",
|
||||
this.numMappers);
|
||||
|
||||
this.useCompression = getBooleanProperty(props, "enable.compression",
|
||||
this.useCompression);
|
||||
|
||||
this.directSplitSize = getLongProperty(props, "import.direct.split.size",
|
||||
this.directSplitSize);
|
||||
|
||||
this.maxInlineLobSize = getLongProperty(props,
|
||||
"import.max.inline.lob.size", this.maxInlineLobSize);
|
||||
|
||||
this.exportDir = props.getProperty("export.source.dir", this.exportDir);
|
||||
this.updateKeyCol = props.getProperty("export.update.col",
|
||||
this.updateKeyCol);
|
||||
|
||||
this.inputDelimiters = getDelimiterProperties(props,
|
||||
"codegen.input.delimiters", this.inputDelimiters);
|
||||
this.outputDelimiters = getDelimiterProperties(props,
|
||||
"codegen.output.delimiters", this.outputDelimiters);
|
||||
|
||||
this.extraArgs = getArgArrayProperty(props, "tool.arguments",
|
||||
this.extraArgs);
|
||||
|
||||
this.hbaseTable = props.getProperty("hbase.table", this.hbaseTable);
|
||||
this.hbaseColFamily = props.getProperty("hbase.col.family",
|
||||
this.hbaseColFamily);
|
||||
this.hbaseRowKeyCol = props.getProperty("hbase.row.key.col",
|
||||
this.hbaseRowKeyCol);
|
||||
this.hbaseCreateTable = getBooleanProperty(props, "hbase.create.table",
|
||||
this.hbaseCreateTable);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a Properties instance that encapsulates all the "sticky"
|
||||
* state of this SqoopOptions that should be written to a metastore
|
||||
* to restore the session later.
|
||||
*/
|
||||
public Properties writeProperties() {
|
||||
Properties props = new Properties();
|
||||
|
||||
putProperty(props, "db.connect.string", this.connectString);
|
||||
putProperty(props, "db.username", this.username);
|
||||
|
||||
if (this.getConf().getBoolean(
|
||||
METASTORE_PASSWORD_KEY, METASTORE_PASSWORD_DEFAULT)) {
|
||||
// If the user specifies, we may store the password in the metastore.
|
||||
putProperty(props, "db.password", this.password);
|
||||
putProperty(props, "db.require.password", "false");
|
||||
} else if (this.password != null) {
|
||||
// Otherwise, if the user has set a password, we just record
|
||||
// a flag stating that the password will need to be reentered.
|
||||
putProperty(props, "db.require.password", "true");
|
||||
} else {
|
||||
// No password saved or required.
|
||||
putProperty(props, "db.require.password", "false");
|
||||
}
|
||||
|
||||
putProperty(props, "db.table", this.tableName);
|
||||
putProperty(props, "db.column.list", arrayToList(this.columns));
|
||||
putProperty(props, "codegen.output.dir", this.codeOutputDir);
|
||||
putProperty(props, "codegen.compile.dir", this.jarOutputDir);
|
||||
putProperty(props, "db.split.column", this.splitByCol);
|
||||
putProperty(props, "db.where.clause", this.whereClause);
|
||||
putProperty(props, "db.query", this.sqlQuery);
|
||||
putProperty(props, "jdbc.driver.class", this.driverClassName);
|
||||
putProperty(props, "hdfs.warehouse.dir", this.warehouseDir);
|
||||
putProperty(props, "hdfs.target.dir", this.targetDir);
|
||||
putProperty(props, "hdfs.append.dir", Boolean.toString(this.append));
|
||||
if (this.layout == FileLayout.SequenceFile) {
|
||||
putProperty(props, "hdfs.file.format", "seq");
|
||||
} else {
|
||||
putProperty(props, "hdfs.file.format", "text");
|
||||
}
|
||||
putProperty(props, "direct.import", Boolean.toString(this.direct));
|
||||
putProperty(props, "hive.import", Boolean.toString(this.hiveImport));
|
||||
putProperty(props, "hive.overwrite.table",
|
||||
Boolean.toString(this.overwriteHiveTable));
|
||||
putProperty(props, "hive.table.name", this.hiveTableName);
|
||||
putProperty(props, "codegen.java.classname", this.className);
|
||||
putProperty(props, "codegen.java.packagename", this.packageName);
|
||||
putProperty(props, "codegen.jar.file", this.existingJarFile);
|
||||
putProperty(props, "mapreduce.num.mappers",
|
||||
Integer.toString(this.numMappers));
|
||||
putProperty(props, "enable.compression",
|
||||
Boolean.toString(this.useCompression));
|
||||
putProperty(props, "import.direct.split.size",
|
||||
Long.toString(this.directSplitSize));
|
||||
putProperty(props, "import.max.inline.lob.size",
|
||||
Long.toString(this.maxInlineLobSize));
|
||||
putProperty(props, "export.source.dir", this.exportDir);
|
||||
putProperty(props, "export.update.col", this.updateKeyCol);
|
||||
setDelimiterProperties(props, "codegen.input.delimiters",
|
||||
this.inputDelimiters);
|
||||
setDelimiterProperties(props, "codegen.output.delimiters",
|
||||
this.outputDelimiters);
|
||||
setArgArrayProperties(props, "tool.arguments", this.extraArgs);
|
||||
putProperty(props, "hbase.table", this.hbaseTable);
|
||||
putProperty(props, "hbase.col.family", this.hbaseColFamily);
|
||||
putProperty(props, "hbase.row.key.col", this.hbaseRowKeyCol);
|
||||
putProperty(props, "hbase.create.table",
|
||||
Boolean.toString(this.hbaseCreateTable));
|
||||
|
||||
return props;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -291,8 +536,6 @@ private void initDefaults(Configuration baseConfiguration) {
|
||||
this.extraArgs = null;
|
||||
|
||||
this.dbOutColumns = null;
|
||||
|
||||
loadFromProperties();
|
||||
}
|
||||
|
||||
/**
|
||||
|
69
src/java/com/cloudera/sqoop/metastore/SessionData.java
Normal file
69
src/java/com/cloudera/sqoop/metastore/SessionData.java
Normal file
@ -0,0 +1,69 @@
|
||||
/**
|
||||
* Licensed to Cloudera, Inc. under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Cloudera, Inc. licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.cloudera.sqoop.metastore;
|
||||
|
||||
import com.cloudera.sqoop.SqoopOptions;
|
||||
import com.cloudera.sqoop.tool.SqoopTool;
|
||||
|
||||
/**
|
||||
* Container for all session data that should be stored to a
|
||||
* permanent resource.
|
||||
*/
|
||||
public class SessionData {
|
||||
private SqoopOptions opts;
|
||||
private SqoopTool tool;
|
||||
|
||||
public SessionData() {
|
||||
}
|
||||
|
||||
public SessionData(SqoopOptions options, SqoopTool sqoopTool) {
|
||||
this.opts = options;
|
||||
this.tool = sqoopTool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the SqoopOptions.
|
||||
*/
|
||||
public SqoopOptions getSqoopOptions() {
|
||||
return this.opts;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the SqoopTool.
|
||||
*/
|
||||
public SqoopTool getSqoopTool() {
|
||||
return this.tool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the SqoopOptions.
|
||||
*/
|
||||
public void setSqoopOptions(SqoopOptions options) {
|
||||
this.opts = options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the SqoopTool.
|
||||
*/
|
||||
public void setSqoopTool(SqoopTool sqoopTool) {
|
||||
this.tool = sqoopTool;
|
||||
}
|
||||
|
||||
}
|
||||
|
94
src/java/com/cloudera/sqoop/metastore/SessionStorage.java
Normal file
94
src/java/com/cloudera/sqoop/metastore/SessionStorage.java
Normal file
@ -0,0 +1,94 @@
|
||||
/**
|
||||
* Licensed to Cloudera, Inc. under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Cloudera, Inc. licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.cloudera.sqoop.metastore;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
|
||||
/**
|
||||
* API that defines how sessions are saved, restored, and manipulated.
|
||||
*
|
||||
* <p>
|
||||
* SessionStorage instances may be created and then not used; the
|
||||
* SessionStorage factory may create additional SessionStorage instances
|
||||
* that return false from accept() and then discard them. The close()
|
||||
* method will only be triggered for a SessionStorage if the connect()
|
||||
* method is called. Connection should not be triggered by a call to
|
||||
* accept().</p>
|
||||
*/
|
||||
public abstract class SessionStorage extends Configured implements Closeable {
|
||||
|
||||
/**
|
||||
* Returns true if the SessionStorage system can use the metadata in
|
||||
* the descriptor to connect to an underlying session resource.
|
||||
*/
|
||||
public abstract boolean canAccept(Map<String, String> descriptor);
|
||||
|
||||
|
||||
/**
|
||||
* Opens / connects to the underlying storage resource specified by the
|
||||
* descriptor.
|
||||
*/
|
||||
public abstract void open(Map<String, String> descriptor)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Given a session name, reconstitute a SessionData that contains all
|
||||
* configuration information required for the session. Returns null if the
|
||||
* session name does not match an available session.
|
||||
*/
|
||||
public abstract SessionData read(String sessionName)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Forget about a saved session.
|
||||
*/
|
||||
public abstract void delete(String sessionName) throws IOException;
|
||||
|
||||
/**
|
||||
* Given a session name and the data describing a configured
|
||||
* session, record the session information to the storage medium.
|
||||
*/
|
||||
public abstract void create(String sessionName, SessionData data)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Given a session descriptor and a configured session
|
||||
* update the underlying resource to match the current session
|
||||
* configuration.
|
||||
*/
|
||||
public abstract void update(String sessionName, SessionData data)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Close any resources opened by the SessionStorage system.
|
||||
*/
|
||||
public void close() throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* Enumerate all sessions held in the connected resource.
|
||||
*/
|
||||
public abstract List<String> list() throws IOException;
|
||||
}
|
||||
|
@ -0,0 +1,72 @@
|
||||
/**
|
||||
* Licensed to Cloudera, Inc. under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Cloudera, Inc. licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.cloudera.sqoop.metastore;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
/**
|
||||
* Factory that produces the correct SessionStorage system to work with
|
||||
* a particular session descriptor.
|
||||
*/
|
||||
public class SessionStorageFactory {
|
||||
|
||||
private Configuration conf;
|
||||
|
||||
/**
|
||||
* Configuration key describing the list of SessionStorage implementations
|
||||
* to use to handle sessions.
|
||||
*/
|
||||
public static final String AVAILABLE_STORAGES_KEY =
|
||||
"sqoop.session.storage.implementations";
|
||||
|
||||
/** The default list of available SessionStorage implementations. */
|
||||
private static final String DEFAULT_AVAILABLE_STORAGES =
|
||||
"com.cloudera.sqoop.metastore.hsqldb.HsqldbSessionStorage,"
|
||||
+ "com.cloudera.sqoop.metastore.hsqldb.AutoHsqldbStorage";
|
||||
|
||||
public SessionStorageFactory(Configuration config) {
|
||||
this.conf = config;
|
||||
|
||||
// Ensure that we always have an available storages list.
|
||||
if (this.conf.get(AVAILABLE_STORAGES_KEY) == null) {
|
||||
this.conf.set(AVAILABLE_STORAGES_KEY, DEFAULT_AVAILABLE_STORAGES);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a session descriptor, determine the correct SessionStorage
|
||||
* implementation to use to handle the session and return an instance
|
||||
* of it -- or null if no SessionStorage instance is appropriate.
|
||||
*/
|
||||
public SessionStorage getSessionStorage(Map<String, String> descriptor) {
|
||||
List<SessionStorage> storages = this.conf.getInstances(
|
||||
AVAILABLE_STORAGES_KEY, SessionStorage.class);
|
||||
for (SessionStorage stor : storages) {
|
||||
if (stor.canAccept(descriptor)) {
|
||||
return stor;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,113 @@
|
||||
/**
|
||||
* Licensed to Cloudera, Inc. under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Cloudera, Inc. licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.cloudera.sqoop.metastore.hsqldb;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
/**
|
||||
* SessionStorage implementation that auto-configures an HSQLDB
|
||||
* local-file-based instance to hold sessions.
|
||||
*/
|
||||
public class AutoHsqldbStorage extends HsqldbSessionStorage {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(
|
||||
AutoHsqldbStorage.class.getName());
|
||||
|
||||
/**
|
||||
* Configuration key specifying whether this storage agent is active.
|
||||
* Defaults to "on" to allow zero-conf local users.
|
||||
*/
|
||||
public static final String AUTO_STORAGE_IS_ACTIVE_KEY =
|
||||
"sqoop.metastore.client.enable.autoconnect";
|
||||
|
||||
/**
|
||||
* Configuration key specifying the connect string used by this
|
||||
* storage agent.
|
||||
*/
|
||||
public static final String AUTO_STORAGE_CONNECT_STRING_KEY =
|
||||
"sqoop.metastore.client.autoconnect.url";
|
||||
|
||||
/**
|
||||
* Configuration key specifying the username to bind with.
|
||||
*/
|
||||
public static final String AUTO_STORAGE_USER_KEY =
|
||||
"sqoop.metastore.client.autoconnect.username";
|
||||
|
||||
|
||||
/** HSQLDB default user is named 'SA'. */
|
||||
private static final String DEFAULT_AUTO_USER = "SA";
|
||||
|
||||
/**
|
||||
* Configuration key specifying the password to bind with.
|
||||
*/
|
||||
public static final String AUTO_STORAGE_PASS_KEY =
|
||||
"sqoop.metastore.client.autoconnect.password";
|
||||
|
||||
/** HSQLDB default user has an empty password. */
|
||||
public static final String DEFAULT_AUTO_PASSWORD = "";
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public boolean canAccept(Map<String, String> descriptor) {
|
||||
Configuration conf = this.getConf();
|
||||
return conf.getBoolean(AUTO_STORAGE_IS_ACTIVE_KEY, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine the user's home directory and return a connect
|
||||
* string to HSQLDB that uses ~/.sqoop/ as the storage location
|
||||
* for the metastore database.
|
||||
*/
|
||||
private String getHomeDirFileConnectStr() {
|
||||
String homeDir = System.getProperty("user.home");
|
||||
|
||||
File homeDirObj = new File(homeDir);
|
||||
File sqoopDataDirObj = new File(homeDirObj, ".sqoop");
|
||||
File databaseFileObj = new File(sqoopDataDirObj, "metastore.db");
|
||||
|
||||
String dbFileStr = databaseFileObj.toString();
|
||||
return "jdbc:hsqldb:file:" + dbFileStr
|
||||
+ ";hsqldb.write_delay=false;shutdown=true";
|
||||
}
|
||||
|
||||
@Override
|
||||
/**
|
||||
* Set the connection information to use the auto-inferred connection
|
||||
* string.
|
||||
*/
|
||||
public void open(Map<String, String> descriptor) throws IOException {
|
||||
Configuration conf = getConf();
|
||||
setMetastoreConnectStr(conf.get(AUTO_STORAGE_CONNECT_STRING_KEY,
|
||||
getHomeDirFileConnectStr()));
|
||||
setMetastoreUser(conf.get(AUTO_STORAGE_USER_KEY, DEFAULT_AUTO_USER));
|
||||
setMetastorePassword(conf.get(AUTO_STORAGE_PASS_KEY,
|
||||
DEFAULT_AUTO_PASSWORD));
|
||||
|
||||
init();
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,182 @@
|
||||
/**
|
||||
* Licensed to Cloudera, Inc. under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Cloudera, Inc. licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package com.cloudera.sqoop.metastore.hsqldb;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import org.hsqldb.Server;
|
||||
import org.hsqldb.ServerConstants;
|
||||
|
||||
import com.cloudera.sqoop.SqoopOptions;
|
||||
|
||||
import com.cloudera.sqoop.manager.HsqldbManager;
|
||||
|
||||
/**
|
||||
* Container for an HSQLDB-backed metastore.
|
||||
*/
|
||||
public class HsqldbMetaStore {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(
|
||||
HsqldbMetaStore.class.getName());
|
||||
|
||||
/** Where on the local fs does the metastore put files? */
|
||||
public static final String META_STORAGE_LOCATION_KEY =
|
||||
"sqoop.metastore.server.location";
|
||||
|
||||
/**
|
||||
* What port does the metastore listen on?
|
||||
*/
|
||||
public static final String META_SERVER_PORT_KEY =
|
||||
"sqoop.metastore.server.port";
|
||||
|
||||
/** Default to this port if unset. */
|
||||
public static final int DEFAULT_PORT = 16000;
|
||||
|
||||
private int port;
|
||||
private String fileLocation;
|
||||
private Server server;
|
||||
private Configuration conf;
|
||||
|
||||
public HsqldbMetaStore(Configuration config) {
|
||||
this.conf = config;
|
||||
init();
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine the user's home directory and return a file path
|
||||
* under this root where the shared metastore can be placed.
|
||||
*/
|
||||
private String getHomeDirFilePath() {
|
||||
String homeDir = System.getProperty("user.home");
|
||||
|
||||
File homeDirObj = new File(homeDir);
|
||||
File sqoopDataDirObj = new File(homeDirObj, ".sqoop");
|
||||
File databaseFileObj = new File(sqoopDataDirObj, "shared-metastore.db");
|
||||
|
||||
return databaseFileObj.toString();
|
||||
}
|
||||
|
||||
private void init() {
|
||||
if (null != server) {
|
||||
LOG.debug("init(): server already exists.");
|
||||
return;
|
||||
}
|
||||
|
||||
fileLocation = conf.get(META_STORAGE_LOCATION_KEY, null);
|
||||
if (null == fileLocation) {
|
||||
fileLocation = getHomeDirFilePath();
|
||||
LOG.warn("The location for metastore data has not been explicitly set. "
|
||||
+ "Placing shared metastore files in " + fileLocation);
|
||||
}
|
||||
|
||||
this.port = conf.getInt(META_SERVER_PORT_KEY, DEFAULT_PORT);
|
||||
}
|
||||
|
||||
|
||||
public void start() {
|
||||
try {
|
||||
if (server != null) {
|
||||
server.checkRunning(false);
|
||||
}
|
||||
} catch (RuntimeException re) {
|
||||
LOG.info("Server is already started.");
|
||||
return;
|
||||
}
|
||||
|
||||
server = new Server();
|
||||
server.setDatabasePath(0, "file:" + fileLocation);
|
||||
server.setDatabaseName(0, "sqoop");
|
||||
server.putPropertiesFromString("hsqldb.write_delay=false");
|
||||
server.setPort(port);
|
||||
server.setSilent(true);
|
||||
server.setNoSystemExit(true);
|
||||
|
||||
server.start();
|
||||
LOG.info("Server started on port " + port + " with protocol "
|
||||
+ server.getProtocol());
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocks the current thread until the server is shut down.
|
||||
*/
|
||||
public void waitForServer() {
|
||||
while (true) {
|
||||
int curState = server.getState();
|
||||
if (curState == ServerConstants.SERVER_STATE_SHUTDOWN) {
|
||||
LOG.info("Got shutdown notification");
|
||||
break;
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.info("Interrupted while blocking for server:"
|
||||
+ StringUtils.stringifyException(ie));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connects to the server and instructs it to shutdown.
|
||||
*/
|
||||
public void shutdown() {
|
||||
// Send the SHUTDOWN command to the server via SQL.
|
||||
SqoopOptions options = new SqoopOptions(conf);
|
||||
options.setConnectString("jdbc:hsqldb:hsql://localhost:"
|
||||
+ port + "/sqoop");
|
||||
options.setUsername("SA");
|
||||
options.setPassword("");
|
||||
HsqldbManager manager = new HsqldbManager(options);
|
||||
Statement s = null;
|
||||
try {
|
||||
Connection c = manager.getConnection();
|
||||
s = c.createStatement();
|
||||
s.execute("SHUTDOWN");
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Exception shutting down database: "
|
||||
+ StringUtils.stringifyException(sqlE));
|
||||
} finally {
|
||||
if (null != s) {
|
||||
try {
|
||||
s.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing statement: " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
manager.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing manager: " + sqlE);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,796 @@
|
||||
/**
|
||||
* Licensed to Cloudera, Inc. under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Cloudera, Inc. licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package com.cloudera.sqoop.metastore.hsqldb;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.DatabaseMetaData;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import com.cloudera.sqoop.SqoopOptions;
|
||||
|
||||
import com.cloudera.sqoop.metastore.SessionData;
|
||||
import com.cloudera.sqoop.metastore.SessionStorage;
|
||||
|
||||
import com.cloudera.sqoop.tool.SqoopTool;
|
||||
|
||||
/**
|
||||
* SessionStorage implementation that uses an HSQLDB-backed database to
|
||||
* hold session information.
|
||||
*/
|
||||
public class HsqldbSessionStorage extends SessionStorage {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(
|
||||
HsqldbSessionStorage.class.getName());
|
||||
|
||||
/** descriptor key identifying the connect string for the metastore. */
|
||||
public static final String META_CONNECT_KEY = "metastore.connect.string";
|
||||
|
||||
/** descriptor key identifying the username to use when connecting
|
||||
* to the metastore.
|
||||
*/
|
||||
public static final String META_USERNAME_KEY = "metastore.username";
|
||||
|
||||
/** descriptor key identifying the password to use when connecting
|
||||
* to the metastore.
|
||||
*/
|
||||
public static final String META_PASSWORD_KEY = "metastore.password";
|
||||
|
||||
|
||||
/** Default name for the root metadata table in HSQLDB. */
|
||||
private static final String DEFAULT_ROOT_TABLE_NAME = "SQOOP_ROOT";
|
||||
|
||||
/** Configuration key used to override root table name. */
|
||||
public static final String ROOT_TABLE_NAME_KEY =
|
||||
"sqoop.hsqldb.root.table.name";
|
||||
|
||||
/** root metadata table key used to define the current schema version. */
|
||||
private static final String STORAGE_VERSION_KEY =
|
||||
"sqoop.hsqldb.session.storage.version";
|
||||
|
||||
/** The current version number for the schema edition. */
|
||||
private static final int CUR_STORAGE_VERSION = 0;
|
||||
|
||||
/** root metadata table key used to define the session table name. */
|
||||
private static final String SESSION_TABLE_KEY =
|
||||
"sqoop.hsqldb.session.info.table";
|
||||
|
||||
/** Default value for SESSION_TABLE_KEY. */
|
||||
private static final String DEFAULT_SESSION_TABLE_NAME =
|
||||
"SQOOP_SESSIONS";
|
||||
|
||||
/** Per-session key with propClass 'schema' that defines the set of
|
||||
* properties valid to be defined for propClass 'SqoopOptions'. */
|
||||
private static final String PROPERTY_SET_KEY =
|
||||
"sqoop.property.set.id";
|
||||
|
||||
/** Current value for PROPERTY_SET_KEY. */
|
||||
private static final String CUR_PROPERTY_SET_ID = "0";
|
||||
|
||||
// The following are values for propClass in the v0 schema which
|
||||
// describe different aspects of the stored metadata.
|
||||
|
||||
/** Property class for properties about the stored data itself. */
|
||||
private static final String PROPERTY_CLASS_SCHEMA = "schema";
|
||||
|
||||
/** Property class for properties that are loaded into SqoopOptions. */
|
||||
private static final String PROPERTY_CLASS_SQOOP_OPTIONS = "SqoopOptions";
|
||||
|
||||
/** Property class for properties that are loaded into a Configuration. */
|
||||
private static final String PROPERTY_CLASS_CONFIG = "config";
|
||||
|
||||
/**
|
||||
* Per-session key with propClass 'schema' that specifies the SqoopTool
|
||||
* to load.
|
||||
*/
|
||||
private static final String SQOOP_TOOL_KEY = "sqoop.tool";
|
||||
|
||||
|
||||
private String metastoreConnectStr;
|
||||
private String metastoreUser;
|
||||
private String metastorePassword;
|
||||
private Connection connection;
|
||||
|
||||
protected Connection getConnection() {
|
||||
return this.connection;
|
||||
}
|
||||
|
||||
// After connection to the database and initialization of the
|
||||
// schema, this holds the name of the session table.
|
||||
private String sessionTableName;
|
||||
|
||||
protected void setMetastoreConnectStr(String connectStr) {
|
||||
this.metastoreConnectStr = connectStr;
|
||||
}
|
||||
|
||||
protected void setMetastoreUser(String user) {
|
||||
this.metastoreUser = user;
|
||||
}
|
||||
|
||||
protected void setMetastorePassword(String pass) {
|
||||
this.metastorePassword = pass;
|
||||
}
|
||||
|
||||
private static final String DB_DRIVER_CLASS = "org.hsqldb.jdbcDriver";
|
||||
|
||||
@Override
|
||||
/**
|
||||
* Initialize the connection to the database.
|
||||
*/
|
||||
public void open(Map<String, String> descriptor) throws IOException {
|
||||
setMetastoreConnectStr(descriptor.get(META_CONNECT_KEY));
|
||||
setMetastoreUser(descriptor.get(META_USERNAME_KEY));
|
||||
setMetastorePassword(descriptor.get(META_PASSWORD_KEY));
|
||||
|
||||
init();
|
||||
}
|
||||
|
||||
protected void init() throws IOException {
|
||||
try {
|
||||
// Load/initialize the JDBC driver.
|
||||
Class.forName(DB_DRIVER_CLASS);
|
||||
} catch (ClassNotFoundException cnfe) {
|
||||
throw new IOException("Could not load HSQLDB JDBC driver", cnfe);
|
||||
}
|
||||
|
||||
try {
|
||||
if (null == metastoreUser) {
|
||||
this.connection = DriverManager.getConnection(metastoreConnectStr);
|
||||
} else {
|
||||
this.connection = DriverManager.getConnection(metastoreConnectStr,
|
||||
metastoreUser, metastorePassword);
|
||||
}
|
||||
|
||||
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
|
||||
connection.setAutoCommit(false);
|
||||
|
||||
// Initialize the root schema.
|
||||
if (!rootTableExists()) {
|
||||
createRootTable();
|
||||
}
|
||||
|
||||
// Check the schema version.
|
||||
String curStorageVerStr = getRootProperty(STORAGE_VERSION_KEY, null);
|
||||
int actualStorageVer = -1;
|
||||
try {
|
||||
actualStorageVer = Integer.valueOf(curStorageVerStr);
|
||||
} catch (NumberFormatException nfe) {
|
||||
LOG.warn("Could not interpret as a number: " + curStorageVerStr);
|
||||
}
|
||||
if (actualStorageVer != CUR_STORAGE_VERSION) {
|
||||
LOG.error("Can not interpret metadata schema");
|
||||
LOG.error("The metadata schema version is " + curStorageVerStr);
|
||||
LOG.error("The highest version supported is " + CUR_STORAGE_VERSION);
|
||||
LOG.error("To use this version of Sqoop, "
|
||||
+ "you must downgrade your metadata schema.");
|
||||
throw new IOException("Invalid metadata version.");
|
||||
}
|
||||
|
||||
// Initialize the versioned schema.
|
||||
initV0Schema();
|
||||
} catch (SQLException sqle) {
|
||||
if (null != connection) {
|
||||
try {
|
||||
connection.rollback();
|
||||
} catch (SQLException e2) {
|
||||
LOG.warn("Error rolling back transaction in error handler: " + e2);
|
||||
}
|
||||
}
|
||||
|
||||
throw new IOException("Exception creating SQL connection", sqle);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (null != this.connection) {
|
||||
try {
|
||||
LOG.debug("Flushing current transaction");
|
||||
this.connection.commit();
|
||||
} catch (SQLException sqlE) {
|
||||
throw new IOException("Exception committing connection", sqlE);
|
||||
}
|
||||
|
||||
try {
|
||||
LOG.debug("Closing connection");
|
||||
this.connection.close();
|
||||
} catch (SQLException sqlE) {
|
||||
throw new IOException("Exception closing connection", sqlE);
|
||||
} finally {
|
||||
this.connection = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public boolean canAccept(Map<String, String> descriptor) {
|
||||
// We return true if the desciptor contains a connect string to find
|
||||
// the database.
|
||||
return descriptor.get(META_CONNECT_KEY) != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public SessionData read(String sessionName) throws IOException {
|
||||
try {
|
||||
if (!sessionExists(sessionName)) {
|
||||
LOG.error("Cannot restore session: " + sessionName);
|
||||
LOG.error("(No such session)");
|
||||
throw new IOException("Cannot restore missing session " + sessionName);
|
||||
}
|
||||
|
||||
LOG.debug("Restoring session: " + sessionName);
|
||||
Properties schemaProps = getV0Properties(sessionName,
|
||||
PROPERTY_CLASS_SCHEMA);
|
||||
Properties sqoopOptProps = getV0Properties(sessionName,
|
||||
PROPERTY_CLASS_SQOOP_OPTIONS);
|
||||
Properties configProps = getV0Properties(sessionName,
|
||||
PROPERTY_CLASS_CONFIG);
|
||||
|
||||
// Check that we're not using a saved session from a previous
|
||||
// version whose functionality has been deprecated.
|
||||
String thisPropSetId = schemaProps.getProperty(PROPERTY_SET_KEY);
|
||||
LOG.debug("System property set: " + CUR_PROPERTY_SET_ID);
|
||||
LOG.debug("Stored property set: " + thisPropSetId);
|
||||
if (!CUR_PROPERTY_SET_ID.equals(thisPropSetId)) {
|
||||
LOG.warn("The property set present in this database was written by");
|
||||
LOG.warn("an incompatible version of Sqoop. This may result in an");
|
||||
LOG.warn("incomplete operation.");
|
||||
// TODO(aaron): Should this fail out-right?
|
||||
}
|
||||
|
||||
String toolName = schemaProps.getProperty(SQOOP_TOOL_KEY);
|
||||
if (null == toolName) {
|
||||
// Don't know what tool to create.
|
||||
throw new IOException("Incomplete metadata; missing "
|
||||
+ SQOOP_TOOL_KEY);
|
||||
}
|
||||
|
||||
SqoopTool tool = SqoopTool.getTool(toolName);
|
||||
if (null == tool) {
|
||||
throw new IOException("Error in session metadata: invalid tool "
|
||||
+ toolName);
|
||||
}
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
for (Map.Entry<Object, Object> entry : configProps.entrySet()) {
|
||||
conf.set(entry.getKey().toString(), entry.getValue().toString());
|
||||
}
|
||||
|
||||
SqoopOptions opts = new SqoopOptions();
|
||||
opts.setConf(conf);
|
||||
opts.loadProperties(sqoopOptProps);
|
||||
|
||||
return new SessionData(opts, tool);
|
||||
} catch (SQLException sqlE) {
|
||||
throw new IOException("Error communicating with database", sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean sessionExists(String sessionName) throws SQLException {
|
||||
PreparedStatement s = connection.prepareStatement(
|
||||
"SELECT COUNT(session_name) FROM " + this.sessionTableName
|
||||
+ " WHERE session_name = ? GROUP BY session_name");
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
s.setString(1, sessionName);
|
||||
rs = s.executeQuery();
|
||||
if (rs.next()) {
|
||||
return true; // We got a result, meaning the session exists.
|
||||
}
|
||||
} finally {
|
||||
if (null != rs) {
|
||||
try {
|
||||
rs.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing result set: " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
s.close();
|
||||
}
|
||||
|
||||
return false; // No result.
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public void delete(String sessionName) throws IOException {
|
||||
try {
|
||||
if (!sessionExists(sessionName)) {
|
||||
LOG.error("No such session: " + sessionName);
|
||||
} else {
|
||||
LOG.debug("Deleting session: " + sessionName);
|
||||
PreparedStatement s = connection.prepareStatement("DELETE FROM "
|
||||
+ this.sessionTableName + " WHERE session_name = ?");
|
||||
try {
|
||||
s.setString(1, sessionName);
|
||||
s.executeUpdate();
|
||||
} finally {
|
||||
s.close();
|
||||
}
|
||||
connection.commit();
|
||||
}
|
||||
} catch (SQLException sqlEx) {
|
||||
try {
|
||||
connection.rollback();
|
||||
} catch (SQLException e2) {
|
||||
LOG.warn("Error rolling back transaction in error handler: " + e2);
|
||||
}
|
||||
throw new IOException("Error communicating with database", sqlEx);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public void create(String sessionName, SessionData data)
|
||||
throws IOException {
|
||||
try {
|
||||
if (sessionExists(sessionName)) {
|
||||
LOG.error("Cannot create session " + sessionName
|
||||
+ ": it already exists");
|
||||
throw new IOException("Session " + sessionName + " already exists");
|
||||
}
|
||||
} catch (SQLException sqlE) {
|
||||
throw new IOException("Error communicating with database", sqlE);
|
||||
}
|
||||
|
||||
createInternal(sessionName, data);
|
||||
}
|
||||
|
||||
/**
|
||||
* Actually insert/update the resources for this session.
|
||||
*/
|
||||
private void createInternal(String sessionName, SessionData data)
|
||||
throws IOException {
|
||||
try {
|
||||
LOG.debug("Creating session: " + sessionName);
|
||||
|
||||
// Save the name of the Sqoop tool.
|
||||
setV0Property(sessionName, PROPERTY_CLASS_SCHEMA, SQOOP_TOOL_KEY,
|
||||
data.getSqoopTool().getToolName());
|
||||
|
||||
// Save the property set id.
|
||||
setV0Property(sessionName, PROPERTY_CLASS_SCHEMA, PROPERTY_SET_KEY,
|
||||
CUR_PROPERTY_SET_ID);
|
||||
|
||||
// Save all properties of the SqoopOptions.
|
||||
Properties props = data.getSqoopOptions().writeProperties();
|
||||
setV0Properties(sessionName, PROPERTY_CLASS_SQOOP_OPTIONS, props);
|
||||
|
||||
// And save all unique properties of the configuration.
|
||||
Configuration saveConf = data.getSqoopOptions().getConf();
|
||||
Configuration baseConf = new Configuration();
|
||||
|
||||
for (Map.Entry<String, String> entry : saveConf) {
|
||||
String key = entry.getKey();
|
||||
String rawVal = saveConf.getRaw(key);
|
||||
String baseVal = baseConf.getRaw(key);
|
||||
if (baseVal != null && rawVal.equals(baseVal)) {
|
||||
continue; // Don't save this; it's set in the base configuration.
|
||||
}
|
||||
|
||||
LOG.debug("Saving " + key + " => " + rawVal + " / " + baseVal);
|
||||
setV0Property(sessionName, PROPERTY_CLASS_CONFIG, key, rawVal);
|
||||
}
|
||||
|
||||
connection.commit();
|
||||
} catch (SQLException sqlE) {
|
||||
try {
|
||||
connection.rollback();
|
||||
} catch (SQLException sqlE2) {
|
||||
LOG.warn("Exception rolling back transaction during error handling: "
|
||||
+ sqlE2);
|
||||
}
|
||||
throw new IOException("Error communicating with database", sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public void update(String sessionName, SessionData data)
|
||||
throws IOException {
|
||||
try {
|
||||
if (!sessionExists(sessionName)) {
|
||||
LOG.error("Cannot update session " + sessionName + ": not found");
|
||||
throw new IOException("Session " + sessionName + " does not exist");
|
||||
}
|
||||
} catch (SQLException sqlE) {
|
||||
throw new IOException("Error communicating with database", sqlE);
|
||||
}
|
||||
|
||||
// Since we set properties with update-or-insert, this is the same
|
||||
// as create on this system.
|
||||
createInternal(sessionName, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public List<String> list() throws IOException {
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
PreparedStatement s = connection.prepareStatement(
|
||||
"SELECT DISTINCT session_name FROM " + this.sessionTableName);
|
||||
try {
|
||||
rs = s.executeQuery();
|
||||
ArrayList<String> sessions = new ArrayList<String>();
|
||||
while (rs.next()) {
|
||||
sessions.add(rs.getString(1));
|
||||
}
|
||||
|
||||
return sessions;
|
||||
} finally {
|
||||
if (null != rs) {
|
||||
try {
|
||||
rs.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing resultset: " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
if (null != s) {
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
} catch (SQLException sqlE) {
|
||||
throw new IOException("Error communicating with database", sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
// Determine the name to use for the root metadata table.
|
||||
private String getRootTableName() {
|
||||
Configuration conf = getConf();
|
||||
return conf.get(ROOT_TABLE_NAME_KEY, DEFAULT_ROOT_TABLE_NAME);
|
||||
}
|
||||
|
||||
private boolean tableExists(String table) throws SQLException {
|
||||
LOG.debug("Checking for table: " + table);
|
||||
DatabaseMetaData dbmd = connection.getMetaData();
|
||||
String [] tableTypes = { "TABLE" };
|
||||
ResultSet rs = dbmd.getTables(null, null, null, tableTypes);
|
||||
if (null != rs) {
|
||||
try {
|
||||
while (rs.next()) {
|
||||
if (table.equalsIgnoreCase(rs.getString("TABLE_NAME"))) {
|
||||
LOG.debug("Found table: " + table);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
rs.close();
|
||||
}
|
||||
}
|
||||
|
||||
LOG.debug("Could not find table.");
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean rootTableExists() throws SQLException {
|
||||
String rootTableName = getRootTableName();
|
||||
return tableExists(rootTableName);
|
||||
}
|
||||
|
||||
private void createRootTable() throws SQLException {
|
||||
String rootTableName = getRootTableName();
|
||||
LOG.debug("Creating root table: " + rootTableName);
|
||||
|
||||
// TODO: Sanity-check the value of rootTableName to ensure it is
|
||||
// not a SQL-injection attack vector.
|
||||
Statement s = connection.createStatement();
|
||||
try {
|
||||
s.executeUpdate("CREATE TABLE " + rootTableName + " ("
|
||||
+ "version INT, "
|
||||
+ "propname VARCHAR(128) NOT NULL, "
|
||||
+ "propval VARCHAR(256), "
|
||||
+ "CONSTRAINT " + rootTableName + "_unq UNIQUE (version, propname))");
|
||||
} finally {
|
||||
s.close();
|
||||
}
|
||||
|
||||
setRootProperty(STORAGE_VERSION_KEY, null,
|
||||
Integer.toString(CUR_STORAGE_VERSION));
|
||||
|
||||
LOG.debug("Saving root table.");
|
||||
connection.commit();
|
||||
}
|
||||
|
||||
/**
|
||||
* Look up a value for the specified version (may be null) in the
|
||||
* root metadata table.
|
||||
*/
|
||||
private String getRootProperty(String propertyName, Integer version)
|
||||
throws SQLException {
|
||||
LOG.debug("Looking up property " + propertyName + " for version "
|
||||
+ version);
|
||||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
|
||||
try {
|
||||
if (null == version) {
|
||||
s = connection.prepareStatement(
|
||||
"SELECT propval FROM " + getRootTableName()
|
||||
+ " WHERE version IS NULL AND propname = ?");
|
||||
s.setString(1, propertyName);
|
||||
} else {
|
||||
s = connection.prepareStatement(
|
||||
"SELECT propval FROM " + getRootTableName() + " WHERE version = ? "
|
||||
+ " AND propname = ?");
|
||||
s.setInt(1, version);
|
||||
s.setString(2, propertyName);
|
||||
}
|
||||
|
||||
rs = s.executeQuery();
|
||||
if (!rs.next()) {
|
||||
LOG.debug(" => (no result)");
|
||||
return null; // No such result.
|
||||
} else {
|
||||
String result = rs.getString(1); // Return the only result col.
|
||||
LOG.debug(" => " + result);
|
||||
return result;
|
||||
}
|
||||
} finally {
|
||||
if (null != rs) {
|
||||
try {
|
||||
rs.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing resultset: " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
if (null != s) {
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a value for the specified version (may be null) in the root
|
||||
* metadata table.
|
||||
*/
|
||||
private void setRootProperty(String propertyName, Integer version,
|
||||
String val) throws SQLException {
|
||||
LOG.debug("Setting property " + propertyName + " for version "
|
||||
+ version + " => " + val);
|
||||
|
||||
PreparedStatement s;
|
||||
String curVal = getRootProperty(propertyName, version);
|
||||
if (null == curVal) {
|
||||
// INSERT the row.
|
||||
s = connection.prepareStatement("INSERT INTO " + getRootTableName()
|
||||
+ " (propval, propname, version) VALUES ( ? , ? , ? )");
|
||||
} else if (version == null) {
|
||||
// UPDATE an existing row with a null version
|
||||
s = connection.prepareStatement("UPDATE " + getRootTableName()
|
||||
+ " SET propval = ? WHERE propname = ? AND version IS NULL");
|
||||
} else {
|
||||
// UPDATE an existing row with non-null version.
|
||||
s = connection.prepareStatement("UPDATE " + getRootTableName()
|
||||
+ " SET propval = ? WHERE propname = ? AND version = ?");
|
||||
}
|
||||
|
||||
try {
|
||||
s.setString(1, val);
|
||||
s.setString(2, propertyName);
|
||||
if (null != version) {
|
||||
s.setInt(3, version);
|
||||
}
|
||||
s.executeUpdate();
|
||||
} finally {
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the sessions table in the V0 schema.
|
||||
*/
|
||||
private void createSessionTable() throws SQLException {
|
||||
String curTableName = DEFAULT_SESSION_TABLE_NAME;
|
||||
int tableNum = -1;
|
||||
while (true) {
|
||||
if (tableExists(curTableName)) {
|
||||
tableNum++;
|
||||
curTableName = DEFAULT_SESSION_TABLE_NAME + "_" + tableNum;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// curTableName contains a table name that does not exist.
|
||||
// Create this table.
|
||||
LOG.debug("Creating session storage table: " + curTableName);
|
||||
Statement s = connection.createStatement();
|
||||
try {
|
||||
s.executeUpdate("CREATE TABLE " + curTableName + " ("
|
||||
+ "session_name VARCHAR(64) NOT NULL, "
|
||||
+ "propname VARCHAR(128) NOT NULL, "
|
||||
+ "propval VARCHAR(1024), "
|
||||
+ "propclass VARCHAR(32) NOT NULL, "
|
||||
+ "CONSTRAINT " + curTableName + "_unq UNIQUE "
|
||||
+ "(session_name, propname, propclass))");
|
||||
|
||||
// Then set a property in the root table pointing to it.
|
||||
setRootProperty(SESSION_TABLE_KEY, 0, curTableName);
|
||||
connection.commit();
|
||||
} finally {
|
||||
s.close();
|
||||
}
|
||||
|
||||
this.sessionTableName = curTableName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a root schema that exists,
|
||||
* initialize a version-0 key/value storage schema on top of it,
|
||||
* if it does not already exist.
|
||||
*/
|
||||
private void initV0Schema() throws SQLException {
|
||||
this.sessionTableName = getRootProperty(SESSION_TABLE_KEY, 0);
|
||||
if (null == this.sessionTableName) {
|
||||
createSessionTable();
|
||||
}
|
||||
if (!tableExists(this.sessionTableName)) {
|
||||
LOG.debug("Could not find session table: " + sessionTableName);
|
||||
createSessionTable();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INSERT or UPDATE a single (session, propname, class) to point
|
||||
* to the specified property value.
|
||||
*/
|
||||
private void setV0Property(String sessionName, String propClass,
|
||||
String propName, String propVal) throws SQLException {
|
||||
LOG.debug("Session: " + sessionName + "; Setting property "
|
||||
+ propName + " with class " + propClass + " => " + propVal);
|
||||
|
||||
PreparedStatement s = null;
|
||||
try {
|
||||
String curValue = getV0Property(sessionName, propClass, propName);
|
||||
if (null == curValue) {
|
||||
// Property is not yet set.
|
||||
s = connection.prepareStatement("INSERT INTO " + this.sessionTableName
|
||||
+ " (propval, session_name, propclass, propname) "
|
||||
+ "VALUES (?, ?, ?, ?)");
|
||||
} else {
|
||||
// Overwrite existing property.
|
||||
s = connection.prepareStatement("UPDATE " + this.sessionTableName
|
||||
+ " SET propval = ? WHERE session_name = ? AND propclass = ? "
|
||||
+ "AND propname = ?");
|
||||
}
|
||||
|
||||
s.setString(1, propVal);
|
||||
s.setString(2, sessionName);
|
||||
s.setString(3, propClass);
|
||||
s.setString(4, propName);
|
||||
|
||||
s.executeUpdate();
|
||||
} finally {
|
||||
if (null != s) {
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a string containing the value of a specified property,
|
||||
* or null if it is not set.
|
||||
*/
|
||||
private String getV0Property(String sessionName, String propClass,
|
||||
String propertyName) throws SQLException {
|
||||
LOG.debug("Session: " + sessionName + "; Getting property "
|
||||
+ propertyName + " with class " + propClass);
|
||||
|
||||
ResultSet rs = null;
|
||||
PreparedStatement s = connection.prepareStatement(
|
||||
"SELECT propval FROM " + this.sessionTableName
|
||||
+ " WHERE session_name = ? AND propclass = ? AND propname = ?");
|
||||
|
||||
try {
|
||||
s.setString(1, sessionName);
|
||||
s.setString(2, propClass);
|
||||
s.setString(3, propertyName);
|
||||
rs = s.executeQuery();
|
||||
|
||||
if (!rs.next()) {
|
||||
LOG.debug(" => (no result)");
|
||||
return null;
|
||||
}
|
||||
|
||||
String result = rs.getString(1);
|
||||
LOG.debug(" => " + result);
|
||||
return result;
|
||||
} finally {
|
||||
if (null != rs) {
|
||||
try {
|
||||
rs.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing resultset: " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a java.util.Properties containing all propName -> propVal
|
||||
* bindings for a given (sessionName, propClass).
|
||||
*/
|
||||
private Properties getV0Properties(String sessionName, String propClass)
|
||||
throws SQLException {
|
||||
LOG.debug("Session: " + sessionName
|
||||
+ "; Getting properties with class " + propClass);
|
||||
|
||||
ResultSet rs = null;
|
||||
PreparedStatement s = connection.prepareStatement(
|
||||
"SELECT propname, propval FROM " + this.sessionTableName
|
||||
+ " WHERE session_name = ? AND propclass = ?");
|
||||
try {
|
||||
s.setString(1, sessionName);
|
||||
s.setString(2, propClass);
|
||||
rs = s.executeQuery();
|
||||
|
||||
Properties p = new Properties();
|
||||
while (rs.next()) {
|
||||
p.setProperty(rs.getString(1), rs.getString(2));
|
||||
}
|
||||
|
||||
return p;
|
||||
} finally {
|
||||
if (null != rs) {
|
||||
try {
|
||||
rs.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing result set: " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void setV0Properties(String sessionName, String propClass,
|
||||
Properties properties) throws SQLException {
|
||||
LOG.debug("Session: " + sessionName
|
||||
+ "; Setting bulk properties for class " + propClass);
|
||||
|
||||
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
|
||||
String key = entry.getKey().toString();
|
||||
String val = entry.getValue().toString();
|
||||
setV0Property(sessionName, propClass, key, val);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -22,7 +22,9 @@
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.Option;
|
||||
import org.apache.commons.cli.OptionBuilder;
|
||||
import org.apache.commons.cli.OptionGroup;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
@ -119,6 +121,17 @@ public abstract class BaseSqoopTool extends SqoopTool {
|
||||
public static final String HBASE_CREATE_TABLE_ARG = "hbase-create-table";
|
||||
|
||||
|
||||
// Arguments for the session management system.
|
||||
public static final String SESSION_METASTORE_ARG = "meta-connect";
|
||||
public static final String SESSION_CMD_CREATE_ARG = "create";
|
||||
public static final String SESSION_CMD_DELETE_ARG = "delete";
|
||||
public static final String SESSION_CMD_EXEC_ARG = "exec";
|
||||
public static final String SESSION_CMD_LIST_ARG = "list";
|
||||
public static final String SESSION_CMD_SHOW_ARG = "show";
|
||||
|
||||
// Arguments for the metastore.
|
||||
public static final String METASTORE_SHUTDOWN_ARG = "shutdown";
|
||||
|
||||
public BaseSqoopTool() {
|
||||
}
|
||||
|
||||
@ -234,6 +247,65 @@ protected boolean hasUnrecognizedArgs(String [] argv) {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return RelatedOptions used by session management tools.
|
||||
*/
|
||||
protected RelatedOptions getSessionOptions() {
|
||||
RelatedOptions relatedOpts = new RelatedOptions(
|
||||
"Session management arguments");
|
||||
relatedOpts.addOption(OptionBuilder.withArgName("jdbc-uri")
|
||||
.hasArg()
|
||||
.withDescription("Specify JDBC connect string for the metastore")
|
||||
.withLongOpt(SESSION_METASTORE_ARG)
|
||||
.create());
|
||||
|
||||
// Create an option-group surrounding the operations a user
|
||||
// can perform on sessions.
|
||||
OptionGroup group = new OptionGroup();
|
||||
group.addOption(OptionBuilder.withArgName("session-id")
|
||||
.hasArg()
|
||||
.withDescription("Create a new session")
|
||||
.withLongOpt(SESSION_CMD_CREATE_ARG)
|
||||
.create());
|
||||
group.addOption(OptionBuilder.withArgName("session-id")
|
||||
.hasArg()
|
||||
.withDescription("Delete a saved session")
|
||||
.withLongOpt(SESSION_CMD_DELETE_ARG)
|
||||
.create());
|
||||
group.addOption(OptionBuilder.withArgName("session-id")
|
||||
.hasArg()
|
||||
.withDescription("Show the parameters for a saved session")
|
||||
.withLongOpt(SESSION_CMD_SHOW_ARG)
|
||||
.create());
|
||||
|
||||
Option execOption = OptionBuilder.withArgName("session-id")
|
||||
.hasArg()
|
||||
.withDescription("Run a saved session")
|
||||
.withLongOpt(SESSION_CMD_EXEC_ARG)
|
||||
.create();
|
||||
group.addOption(execOption);
|
||||
|
||||
group.addOption(OptionBuilder
|
||||
.withDescription("List saved sessions")
|
||||
.withLongOpt(SESSION_CMD_LIST_ARG)
|
||||
.create());
|
||||
|
||||
relatedOpts.addOptionGroup(group);
|
||||
|
||||
// Since the "common" options aren't used in the session tool,
|
||||
// add these settings here.
|
||||
relatedOpts.addOption(OptionBuilder
|
||||
.withDescription("Print more information while working")
|
||||
.withLongOpt(VERBOSE_ARG)
|
||||
.create());
|
||||
relatedOpts.addOption(OptionBuilder
|
||||
.withDescription("Print usage instructions")
|
||||
.withLongOpt(HELP_ARG)
|
||||
.create());
|
||||
|
||||
return relatedOpts;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return RelatedOptions used by most/all Sqoop tools.
|
||||
*/
|
||||
@ -690,5 +762,22 @@ protected void validateHBaseOptions(SqoopOptions options)
|
||||
+ HELP_STR);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Given an array of extra arguments (usually populated via
|
||||
* this.extraArguments), determine the offset of the first '--'
|
||||
* argument in the list. Return 'extra.length' if there is none.
|
||||
*/
|
||||
protected int getDashPosition(String [] extra) {
|
||||
int dashPos = extra.length;
|
||||
for (int i = 0; i < extra.length; i++) {
|
||||
if (extra[i].equals("--")) {
|
||||
dashPos = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return dashPos;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -445,13 +445,7 @@ public void validateOptions(SqoopOptions options)
|
||||
// If extraArguments is full, check for '--' followed by args for
|
||||
// mysqldump or other commands we rely on.
|
||||
options.setExtraArgs(getSubcommandArgs(extraArguments));
|
||||
int dashPos = extraArguments.length;
|
||||
for (int i = 0; i < extraArguments.length; i++) {
|
||||
if (extraArguments[i].equals("--")) {
|
||||
dashPos = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
int dashPos = getDashPosition(extraArguments);
|
||||
|
||||
if (hasUnrecognizedArgs(extraArguments, 0, dashPos)) {
|
||||
throw new InvalidOptionsException(HELP_STR);
|
||||
|
92
src/java/com/cloudera/sqoop/tool/MetastoreTool.java
Normal file
92
src/java/com/cloudera/sqoop/tool/MetastoreTool.java
Normal file
@ -0,0 +1,92 @@
|
||||
/**
|
||||
* Licensed to Cloudera, Inc. under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Cloudera, Inc. licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.cloudera.sqoop.tool;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.OptionBuilder;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.cloudera.sqoop.SqoopOptions;
|
||||
import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
|
||||
import com.cloudera.sqoop.cli.RelatedOptions;
|
||||
import com.cloudera.sqoop.cli.ToolOptions;
|
||||
|
||||
import com.cloudera.sqoop.metastore.hsqldb.HsqldbMetaStore;
|
||||
|
||||
/**
|
||||
* Tool that runs a standalone Sqoop metastore.
|
||||
*/
|
||||
public class MetastoreTool extends BaseSqoopTool {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(
|
||||
MetastoreTool.class.getName());
|
||||
|
||||
private HsqldbMetaStore metastore;
|
||||
|
||||
// If set to true, shut an existing metastore down.
|
||||
private boolean shutdown = false;
|
||||
|
||||
public MetastoreTool() {
|
||||
super("metastore");
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public int run(SqoopOptions options) {
|
||||
metastore = new HsqldbMetaStore(options.getConf());
|
||||
if (shutdown) {
|
||||
LOG.info("Shutting down metastore...");
|
||||
metastore.shutdown();
|
||||
} else {
|
||||
metastore.start();
|
||||
metastore.waitForServer();
|
||||
LOG.info("Server thread has quit.");
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
/** Configure the command-line arguments we expect to receive */
|
||||
public void configureOptions(ToolOptions toolOptions) {
|
||||
RelatedOptions opts = new RelatedOptions("metastore arguments");
|
||||
opts.addOption(OptionBuilder
|
||||
.withDescription("Cleanly shut down a running metastore")
|
||||
.withLongOpt(METASTORE_SHUTDOWN_ARG)
|
||||
.create());
|
||||
|
||||
toolOptions.addUniqueOptions(opts);
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public void applyOptions(CommandLine in, SqoopOptions out)
|
||||
throws InvalidOptionsException {
|
||||
if (in.hasOption(METASTORE_SHUTDOWN_ARG)) {
|
||||
this.shutdown = true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public void validateOptions(SqoopOptions options)
|
||||
throws InvalidOptionsException {
|
||||
}
|
||||
}
|
||||
|
400
src/java/com/cloudera/sqoop/tool/SessionTool.java
Normal file
400
src/java/com/cloudera/sqoop/tool/SessionTool.java
Normal file
@ -0,0 +1,400 @@
|
||||
/**
|
||||
* Licensed to Cloudera, Inc. under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Cloudera, Inc. licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.cloudera.sqoop.tool;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
import org.apache.log4j.Category;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.cloudera.sqoop.Sqoop;
|
||||
import com.cloudera.sqoop.SqoopOptions;
|
||||
import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
|
||||
import com.cloudera.sqoop.cli.ToolOptions;
|
||||
|
||||
import com.cloudera.sqoop.metastore.hsqldb.HsqldbSessionStorage;
|
||||
import com.cloudera.sqoop.metastore.SessionData;
|
||||
import com.cloudera.sqoop.metastore.SessionStorage;
|
||||
import com.cloudera.sqoop.metastore.SessionStorageFactory;
|
||||
|
||||
/**
|
||||
* Tool that creates and executes saved sessions.
|
||||
*/
|
||||
public class SessionTool extends BaseSqoopTool {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(
|
||||
SessionTool.class.getName());
|
||||
|
||||
private enum SessionOp {
|
||||
SessionCreate,
|
||||
SessionDelete,
|
||||
SessionExecute,
|
||||
SessionList,
|
||||
SessionShow,
|
||||
};
|
||||
|
||||
private Map<String, String> sessionDescriptor;
|
||||
private String sessionName;
|
||||
private SessionOp operation;
|
||||
private SessionStorage storage;
|
||||
|
||||
public SessionTool() {
|
||||
super("session");
|
||||
}
|
||||
|
||||
/**
|
||||
* Given an array of strings, return all elements of this
|
||||
* array up to (but not including) the first instance of "--".
|
||||
*/
|
||||
private String [] getElementsUpToDoubleDash(String [] array) {
|
||||
String [] parseableChildArgv = null;
|
||||
for (int i = 0; i < array.length; i++) {
|
||||
if ("--".equals(array[i])) {
|
||||
parseableChildArgv = Arrays.copyOfRange(array, 0, i);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (parseableChildArgv == null) {
|
||||
// Didn't find any nested '--'.
|
||||
parseableChildArgv = array;
|
||||
}
|
||||
|
||||
return parseableChildArgv;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given an array of strings, return the first instance
|
||||
* of "--" and all following elements.
|
||||
* If no "--" exists, return null.
|
||||
*/
|
||||
private String [] getElementsAfterDoubleDash(String [] array) {
|
||||
String [] extraChildArgv = null;
|
||||
for (int i = 0; i < array.length; i++) {
|
||||
if ("--".equals(array[i])) {
|
||||
extraChildArgv = Arrays.copyOfRange(array, i, array.length);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return extraChildArgv;
|
||||
}
|
||||
|
||||
private int configureChildTool(SqoopOptions childOptions,
|
||||
SqoopTool childTool, String [] childArgv) {
|
||||
// Within the child arguments there may be a '--' followed by
|
||||
// dependent args. Stash them off to the side.
|
||||
|
||||
// Everything up to the '--'.
|
||||
String [] parseableChildArgv = getElementsUpToDoubleDash(childArgv);
|
||||
|
||||
// The '--' and any subsequent args.
|
||||
String [] extraChildArgv = getElementsAfterDoubleDash(childArgv);
|
||||
|
||||
// Now feed the arguments into the tool itself.
|
||||
try {
|
||||
childOptions = childTool.parseArguments(parseableChildArgv,
|
||||
null, childOptions, false);
|
||||
childTool.appendArgs(extraChildArgv);
|
||||
childTool.validateOptions(childOptions);
|
||||
} catch (ParseException pe) {
|
||||
LOG.error("Error parsing arguments to the session-specific tool.");
|
||||
LOG.error("See 'sqoop help <tool>' for usage.");
|
||||
return 1;
|
||||
} catch (SqoopOptions.InvalidOptionsException e) {
|
||||
System.err.println(e.getMessage());
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0; // Success.
|
||||
}
|
||||
|
||||
private int createSession(SqoopOptions options) throws IOException {
|
||||
// In our extraArguments array, we should have a '--' followed by
|
||||
// a tool name, and any tool-specific arguments.
|
||||
// Create an instance of the named tool and then configure it to
|
||||
// get a SqoopOptions out which we will serialize into a session.
|
||||
int dashPos = getDashPosition(extraArguments);
|
||||
int toolArgPos = dashPos + 1;
|
||||
if (null == extraArguments || toolArgPos < 0
|
||||
|| toolArgPos >= extraArguments.length) {
|
||||
LOG.error("No tool specified; cannot create a session.");
|
||||
LOG.error("Use: sqoop create-session [session-args] "
|
||||
+ "-- <tool-name> [tool-args]");
|
||||
return 1;
|
||||
}
|
||||
|
||||
String sessionToolName = extraArguments[toolArgPos];
|
||||
SqoopTool sessionTool = SqoopTool.getTool(sessionToolName);
|
||||
if (null == sessionTool) {
|
||||
LOG.error("No such tool available: " + sessionToolName);
|
||||
return 1;
|
||||
}
|
||||
|
||||
// Create a SqoopOptions and Configuration based on the current one,
|
||||
// but deep-copied. This will be populated within the session.
|
||||
SqoopOptions sessionOptions = new SqoopOptions();
|
||||
sessionOptions.setConf(new Configuration(options.getConf()));
|
||||
|
||||
// Get the arguments to feed to the child tool.
|
||||
String [] childArgs = Arrays.copyOfRange(extraArguments, toolArgPos + 1,
|
||||
extraArguments.length);
|
||||
|
||||
int confRet = configureChildTool(sessionOptions, sessionTool, childArgs);
|
||||
if (0 != confRet) {
|
||||
// Error.
|
||||
return confRet;
|
||||
}
|
||||
|
||||
// Now that the tool is fully configured, materialize the session.
|
||||
SessionData sessionData = new SessionData(sessionOptions, sessionTool);
|
||||
this.storage.create(sessionName, sessionData);
|
||||
return 0; // Success.
|
||||
}
|
||||
|
||||
private int listSessions(SqoopOptions opts) throws IOException {
|
||||
List<String> sessionNames = storage.list();
|
||||
System.out.println("Available sessions:");
|
||||
for (String name : sessionNames) {
|
||||
System.out.println(" " + name);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
private int deleteSession(SqoopOptions opts) throws IOException {
|
||||
this.storage.delete(sessionName);
|
||||
return 0;
|
||||
}
|
||||
|
||||
private int execSession(SqoopOptions opts) throws IOException {
|
||||
SessionData data = this.storage.read(sessionName);
|
||||
if (null == data) {
|
||||
LOG.error("No such session: " + sessionName);
|
||||
return 1;
|
||||
}
|
||||
|
||||
SqoopOptions childOpts = data.getSqoopOptions();
|
||||
SqoopTool childTool = data.getSqoopTool();
|
||||
|
||||
int dashPos = getDashPosition(extraArguments);
|
||||
String [] childArgv;
|
||||
if (dashPos >= extraArguments.length) {
|
||||
childArgv = new String[0];
|
||||
} else {
|
||||
childArgv = Arrays.copyOfRange(extraArguments, dashPos + 1,
|
||||
extraArguments.length);
|
||||
}
|
||||
|
||||
int confRet = configureChildTool(childOpts, childTool, childArgv);
|
||||
if (0 != confRet) {
|
||||
// Error.
|
||||
return confRet;
|
||||
}
|
||||
|
||||
return childTool.run(childOpts);
|
||||
}
|
||||
|
||||
private int showSession(SqoopOptions opts) throws IOException {
|
||||
SessionData data = this.storage.read(sessionName);
|
||||
if (null == data) {
|
||||
LOG.error("No such session: " + sessionName);
|
||||
return 1;
|
||||
}
|
||||
|
||||
SqoopOptions childOpts = data.getSqoopOptions();
|
||||
SqoopTool childTool = data.getSqoopTool();
|
||||
|
||||
System.out.println("Session: " + sessionName);
|
||||
System.out.println("Tool: " + childTool.getToolName());
|
||||
|
||||
System.out.println("Options:");
|
||||
System.out.println("----------------------------");
|
||||
Properties props = childOpts.writeProperties();
|
||||
for (Map.Entry<Object, Object> entry : props.entrySet()) {
|
||||
System.out.println(entry.getKey().toString() + " = " + entry.getValue());
|
||||
}
|
||||
|
||||
// TODO: This does not show entries in the Configuration
|
||||
// (SqoopOptions.getConf()) which were stored as different from the
|
||||
// default.
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public int run(SqoopOptions options) {
|
||||
// Get a SessionStorage instance to use to materialize this session.
|
||||
SessionStorageFactory ssf = new SessionStorageFactory(options.getConf());
|
||||
this.storage = ssf.getSessionStorage(sessionDescriptor);
|
||||
if (null == this.storage) {
|
||||
LOG.error("There is no SessionStorage implementation available");
|
||||
LOG.error("that can read your specified session descriptor.");
|
||||
LOG.error("Don't know where to save this session info! You may");
|
||||
LOG.error("need to specify the connect string with --meta-connect.");
|
||||
return 1;
|
||||
}
|
||||
|
||||
try {
|
||||
// Open the storage layer.
|
||||
this.storage.open(this.sessionDescriptor);
|
||||
|
||||
// And now determine what operation to perform with it.
|
||||
switch (operation) {
|
||||
case SessionCreate:
|
||||
return createSession(options);
|
||||
case SessionDelete:
|
||||
return deleteSession(options);
|
||||
case SessionExecute:
|
||||
return execSession(options);
|
||||
case SessionList:
|
||||
return listSessions(options);
|
||||
case SessionShow:
|
||||
return showSession(options);
|
||||
default:
|
||||
LOG.error("Undefined session operation: " + operation);
|
||||
return 1;
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("I/O error performing session operation: "
|
||||
+ StringUtils.stringifyException(ioe));
|
||||
return 1;
|
||||
} finally {
|
||||
if (null != this.storage) {
|
||||
try {
|
||||
storage.close();
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("IOException closing SessionStorage: "
|
||||
+ StringUtils.stringifyException(ioe));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
/** Configure the command-line arguments we expect to receive */
|
||||
public void configureOptions(ToolOptions toolOptions) {
|
||||
toolOptions.addUniqueOptions(getSessionOptions());
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public void applyOptions(CommandLine in, SqoopOptions out)
|
||||
throws InvalidOptionsException {
|
||||
|
||||
if (in.hasOption(VERBOSE_ARG)) {
|
||||
// Immediately switch into DEBUG logging.
|
||||
Category sqoopLogger = Logger.getLogger(
|
||||
Sqoop.class.getName()).getParent();
|
||||
sqoopLogger.setLevel(Level.DEBUG);
|
||||
LOG.debug("Enabled debug logging.");
|
||||
}
|
||||
|
||||
if (in.hasOption(HELP_ARG)) {
|
||||
ToolOptions toolOpts = new ToolOptions();
|
||||
configureOptions(toolOpts);
|
||||
printHelp(toolOpts);
|
||||
throw new InvalidOptionsException("");
|
||||
}
|
||||
|
||||
this.sessionDescriptor = new TreeMap<String, String>();
|
||||
|
||||
if (in.hasOption(SESSION_METASTORE_ARG)) {
|
||||
this.sessionDescriptor.put(HsqldbSessionStorage.META_CONNECT_KEY,
|
||||
in.getOptionValue(SESSION_METASTORE_ARG));
|
||||
}
|
||||
|
||||
// These are generated via an option group; exactly one
|
||||
// of this exhaustive list will always be selected.
|
||||
if (in.hasOption(SESSION_CMD_CREATE_ARG)) {
|
||||
this.operation = SessionOp.SessionCreate;
|
||||
this.sessionName = in.getOptionValue(SESSION_CMD_CREATE_ARG);
|
||||
} else if (in.hasOption(SESSION_CMD_DELETE_ARG)) {
|
||||
this.operation = SessionOp.SessionDelete;
|
||||
this.sessionName = in.getOptionValue(SESSION_CMD_DELETE_ARG);
|
||||
} else if (in.hasOption(SESSION_CMD_EXEC_ARG)) {
|
||||
this.operation = SessionOp.SessionExecute;
|
||||
this.sessionName = in.getOptionValue(SESSION_CMD_EXEC_ARG);
|
||||
} else if (in.hasOption(SESSION_CMD_LIST_ARG)) {
|
||||
this.operation = SessionOp.SessionList;
|
||||
} else if (in.hasOption(SESSION_CMD_SHOW_ARG)) {
|
||||
this.operation = SessionOp.SessionShow;
|
||||
this.sessionName = in.getOptionValue(SESSION_CMD_SHOW_ARG);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public void validateOptions(SqoopOptions options)
|
||||
throws InvalidOptionsException {
|
||||
|
||||
if (null == operation
|
||||
|| (null == this.sessionName && operation != SessionOp.SessionList)) {
|
||||
throw new InvalidOptionsException("No session operation specified"
|
||||
+ HELP_STR);
|
||||
}
|
||||
|
||||
if (operation == SessionOp.SessionCreate) {
|
||||
// Check that we have a '--' followed by at least a tool name.
|
||||
if (extraArguments == null || extraArguments.length == 0) {
|
||||
throw new InvalidOptionsException(
|
||||
"Expected: -- <tool-name> [tool-args] "
|
||||
+ HELP_STR);
|
||||
}
|
||||
}
|
||||
|
||||
int dashPos = getDashPosition(extraArguments);
|
||||
if (hasUnrecognizedArgs(extraArguments, 0, dashPos)) {
|
||||
throw new InvalidOptionsException(HELP_STR);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public void printHelp(ToolOptions opts) {
|
||||
System.out.println("usage: sqoop " + getToolName()
|
||||
+ " [GENERIC-ARGS] [SESSION-ARGS] [-- [<tool-name>] [TOOL-ARGS]]");
|
||||
System.out.println("");
|
||||
|
||||
opts.printHelp();
|
||||
|
||||
System.out.println("");
|
||||
System.out.println("Generic Hadoop command-line arguments:");
|
||||
System.out.println("(must preceed any tool-specific arguments)");
|
||||
ToolRunner.printGenericCommandUsage(System.out);
|
||||
}
|
||||
}
|
||||
|
@ -73,6 +73,10 @@ public abstract class SqoopTool {
|
||||
"List available databases on a server");
|
||||
registerTool("list-tables", ListTablesTool.class,
|
||||
"List available tables in a database");
|
||||
registerTool("metastore", MetastoreTool.class,
|
||||
"Run a standalone Sqoop metastore");
|
||||
registerTool("session", SessionTool.class,
|
||||
"Work with saved sessions");
|
||||
registerTool("version", VersionTool.class,
|
||||
"Display version information");
|
||||
}
|
||||
|
@ -30,6 +30,8 @@
|
||||
import com.cloudera.sqoop.manager.TestHsqldbManager;
|
||||
import com.cloudera.sqoop.manager.TestSqlManager;
|
||||
import com.cloudera.sqoop.mapreduce.MapreduceTests;
|
||||
|
||||
import com.cloudera.sqoop.metastore.TestSessions;
|
||||
import com.cloudera.sqoop.orm.TestClassWriter;
|
||||
import com.cloudera.sqoop.orm.TestParseMethods;
|
||||
import com.cloudera.sqoop.util.TestDirectImportUtils;
|
||||
@ -74,6 +76,7 @@ public static Test suite() {
|
||||
suite.addTestSuite(TestDirectImportUtils.class);
|
||||
suite.addTestSuite(TestLobFile.class);
|
||||
suite.addTestSuite(TestExportUpdate.class);
|
||||
suite.addTestSuite(TestSessions.class);
|
||||
suite.addTest(MapreduceTests.suite());
|
||||
|
||||
return suite;
|
||||
|
@ -18,12 +18,13 @@
|
||||
|
||||
package com.cloudera.sqoop;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import com.cloudera.sqoop.lib.DelimiterSet;
|
||||
import com.cloudera.sqoop.tool.ImportTool;
|
||||
|
||||
|
||||
/**
|
||||
* Test aspects of the SqoopOptions class.
|
||||
*/
|
||||
@ -234,4 +235,30 @@ public void testGoodNumMappers() throws Exception {
|
||||
SqoopOptions opts = parse(args);
|
||||
assertEquals(4, opts.getNumMappers());
|
||||
}
|
||||
|
||||
public void testPropertySerialization() {
|
||||
// Test that if we write a SqoopOptions out to a Properties,
|
||||
// and then read it back in, we get all the same results.
|
||||
SqoopOptions out = new SqoopOptions();
|
||||
out.setUsername("user");
|
||||
out.setConnectString("bla");
|
||||
out.setNumMappers(4);
|
||||
out.setAppendMode(true);
|
||||
out.setHBaseTable("hbasetable");
|
||||
out.setWarehouseDir("Warehouse");
|
||||
out.setClassName("someclass");
|
||||
out.setSplitByCol("somecol");
|
||||
out.setSqlQuery("the query");
|
||||
out.setPackageName("a.package");
|
||||
out.setHiveImport(true);
|
||||
|
||||
Properties outProps = out.writeProperties();
|
||||
|
||||
SqoopOptions in = new SqoopOptions();
|
||||
in.loadProperties(outProps);
|
||||
|
||||
Properties inProps = in.writeProperties();
|
||||
|
||||
assertEquals("properties don't match", outProps, inProps);
|
||||
}
|
||||
}
|
||||
|
205
src/test/com/cloudera/sqoop/metastore/TestSessions.java
Normal file
205
src/test/com/cloudera/sqoop/metastore/TestSessions.java
Normal file
@ -0,0 +1,205 @@
|
||||
/**
|
||||
* Licensed to Cloudera, Inc. under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Cloudera, Inc. licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.cloudera.sqoop.metastore;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import com.cloudera.sqoop.SqoopOptions;
|
||||
import com.cloudera.sqoop.manager.HsqldbManager;
|
||||
import com.cloudera.sqoop.metastore.hsqldb.AutoHsqldbStorage;
|
||||
import com.cloudera.sqoop.tool.VersionTool;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.sql.Connection;
|
||||
|
||||
/**
|
||||
* Test the metastore and session-handling features.
|
||||
*
|
||||
* These all make use of the auto-connect hsqldb-based metastore.
|
||||
* The metastore URL is configured to be in-memory, and drop all
|
||||
* state between individual tests.
|
||||
*/
|
||||
public class TestSessions extends TestCase {
|
||||
|
||||
public static final String TEST_AUTOCONNECT_URL = "jdbc:hsqldb:mem:testdb";
|
||||
public static final String TEST_AUTOCONNECT_USER = "SA";
|
||||
public static final String TEST_AUTOCONNECT_PASS = "";
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
// Delete db state between tests.
|
||||
resetSessionSchema();
|
||||
}
|
||||
|
||||
public static void resetSessionSchema() throws SQLException {
|
||||
SqoopOptions options = new SqoopOptions();
|
||||
options.setConnectString(TEST_AUTOCONNECT_URL);
|
||||
options.setUsername(TEST_AUTOCONNECT_USER);
|
||||
options.setPassword(TEST_AUTOCONNECT_PASS);
|
||||
|
||||
HsqldbManager manager = new HsqldbManager(options);
|
||||
Connection c = manager.getConnection();
|
||||
Statement s = c.createStatement();
|
||||
try {
|
||||
String [] tables = manager.listTables();
|
||||
for (String table : tables) {
|
||||
s.executeUpdate("DROP TABLE " + table);
|
||||
}
|
||||
|
||||
c.commit();
|
||||
} finally {
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
|
||||
public static Configuration newConf() {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(AutoHsqldbStorage.AUTO_STORAGE_USER_KEY, TEST_AUTOCONNECT_USER);
|
||||
conf.set(AutoHsqldbStorage.AUTO_STORAGE_PASS_KEY, TEST_AUTOCONNECT_PASS);
|
||||
conf.set(AutoHsqldbStorage.AUTO_STORAGE_CONNECT_STRING_KEY,
|
||||
TEST_AUTOCONNECT_URL);
|
||||
|
||||
return conf;
|
||||
}
|
||||
|
||||
public void testAutoConnect() throws IOException {
|
||||
// By default, we should be able to auto-connect with an
|
||||
// empty connection descriptor. We should see an empty
|
||||
// session set.
|
||||
|
||||
Configuration conf = newConf();
|
||||
SessionStorageFactory ssf = new SessionStorageFactory(conf);
|
||||
|
||||
Map<String, String> descriptor = new TreeMap<String, String>();
|
||||
SessionStorage storage = ssf.getSessionStorage(descriptor);
|
||||
|
||||
storage.open(descriptor);
|
||||
List<String> sessions = storage.list();
|
||||
assertEquals(0, sessions.size());
|
||||
storage.close();
|
||||
}
|
||||
|
||||
public void testCreateDeleteSession() throws IOException {
|
||||
Configuration conf = newConf();
|
||||
SessionStorageFactory ssf = new SessionStorageFactory(conf);
|
||||
|
||||
Map<String, String> descriptor = new TreeMap<String, String>();
|
||||
SessionStorage storage = ssf.getSessionStorage(descriptor);
|
||||
|
||||
storage.open(descriptor);
|
||||
|
||||
// Session list should start out empty.
|
||||
List<String> sessions = storage.list();
|
||||
assertEquals(0, sessions.size());
|
||||
|
||||
// Create a session that displays the version.
|
||||
SessionData data = new SessionData(new SqoopOptions(), new VersionTool());
|
||||
storage.create("versionSession", data);
|
||||
|
||||
sessions = storage.list();
|
||||
assertEquals(1, sessions.size());
|
||||
assertEquals("versionSession", sessions.get(0));
|
||||
|
||||
// Try to create that same session name again. This should fail.
|
||||
try {
|
||||
storage.create("versionSession", data);
|
||||
fail("Expected IOException; this session already exists.");
|
||||
} catch (IOException ioe) {
|
||||
// This is expected; continue operation.
|
||||
}
|
||||
|
||||
sessions = storage.list();
|
||||
assertEquals(1, sessions.size());
|
||||
|
||||
// Restore our session, check that it exists.
|
||||
SessionData outData = storage.read("versionSession");
|
||||
assertEquals(new VersionTool().getToolName(),
|
||||
outData.getSqoopTool().getToolName());
|
||||
|
||||
// Try to restore a session that doesn't exist. Watch it fail.
|
||||
try {
|
||||
storage.read("DoesNotExist");
|
||||
fail("Expected IOException");
|
||||
} catch (IOException ioe) {
|
||||
// This is expected. Continue.
|
||||
}
|
||||
|
||||
// Now delete the session.
|
||||
storage.delete("versionSession");
|
||||
|
||||
// After delete, we should have no sessions.
|
||||
sessions = storage.list();
|
||||
assertEquals(0, sessions.size());
|
||||
|
||||
storage.close();
|
||||
}
|
||||
|
||||
public void testMultiConnections() throws IOException {
|
||||
// Ensure that a session can be retrieved when the storage is
|
||||
// closed and reopened.
|
||||
|
||||
Configuration conf = newConf();
|
||||
SessionStorageFactory ssf = new SessionStorageFactory(conf);
|
||||
|
||||
Map<String, String> descriptor = new TreeMap<String, String>();
|
||||
SessionStorage storage = ssf.getSessionStorage(descriptor);
|
||||
|
||||
storage.open(descriptor);
|
||||
|
||||
// Session list should start out empty.
|
||||
List<String> sessions = storage.list();
|
||||
assertEquals(0, sessions.size());
|
||||
|
||||
// Create a session that displays the version.
|
||||
SessionData data = new SessionData(new SqoopOptions(), new VersionTool());
|
||||
storage.create("versionSession", data);
|
||||
|
||||
sessions = storage.list();
|
||||
assertEquals(1, sessions.size());
|
||||
assertEquals("versionSession", sessions.get(0));
|
||||
|
||||
storage.close(); // Close the existing connection
|
||||
|
||||
// Now re-open the storage.
|
||||
ssf = new SessionStorageFactory(newConf());
|
||||
storage = ssf.getSessionStorage(descriptor);
|
||||
storage.open(descriptor);
|
||||
|
||||
sessions = storage.list();
|
||||
assertEquals(1, sessions.size());
|
||||
assertEquals("versionSession", sessions.get(0));
|
||||
|
||||
// Restore our session, check that it exists.
|
||||
SessionData outData = storage.read("versionSession");
|
||||
assertEquals(new VersionTool().getToolName(),
|
||||
outData.getSqoopTool().getToolName());
|
||||
|
||||
storage.close();
|
||||
}
|
||||
}
|
||||
|
@ -40,6 +40,14 @@
|
||||
<Method name="map" />
|
||||
<Bug pattern="NP_ALWAYS_NULL" />
|
||||
</Match>
|
||||
<Match>
|
||||
<!-- createRootTable() allows a user-specified table name retrieved
|
||||
from properties. This since instance is allowed for now.
|
||||
-->
|
||||
<Class name="com.cloudera.sqoop.metastore.hsqldb.HsqldbSessionStorage" />
|
||||
<Method name="createRootTable" />
|
||||
<Bug pattern="SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE" />
|
||||
</Match>
|
||||
|
||||
<!-- The following broad categories suppress warnings in test code that do
|
||||
not need to be rigidly upheld. -->
|
||||
|
Loading…
Reference in New Issue
Block a user