diff --git a/build.xml b/build.xml index eb82ff75..2181586e 100644 --- a/build.xml +++ b/build.xml @@ -484,6 +484,14 @@ + + + diff --git a/conf/.gitignore b/conf/.gitignore new file mode 100644 index 00000000..c0b9de64 --- /dev/null +++ b/conf/.gitignore @@ -0,0 +1,15 @@ +# Licensed to Cloudera, Inc. under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# Cloudera, Inc. licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +/sqoop-site.xml diff --git a/conf/sqoop-default.xml b/conf/sqoop-default.xml deleted file mode 100644 index 5355acde..00000000 --- a/conf/sqoop-default.xml +++ /dev/null @@ -1,33 +0,0 @@ - - - - - - - - - - sqoop.connection.factories - com.cloudera.sqoop.manager.DefaultManagerFactory - A comma-delimited list of ManagerFactory implementations - which are consulted, in order, to instantiate ConnManager instances - used to drive connections to databases. - - - - diff --git a/conf/sqoop-site-template.xml b/conf/sqoop-site-template.xml new file mode 100644 index 00000000..0f06f84a --- /dev/null +++ b/conf/sqoop-site-template.xml @@ -0,0 +1,124 @@ + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/java/com/cloudera/sqoop/Sqoop.java b/src/java/com/cloudera/sqoop/Sqoop.java index 83521dbc..c33e609d 100644 --- a/src/java/com/cloudera/sqoop/Sqoop.java +++ b/src/java/com/cloudera/sqoop/Sqoop.java @@ -46,7 +46,6 @@ public class Sqoop extends Configured implements Tool { public static final String SQOOP_RETHROW_PROPERTY = "sqoop.throwOnError"; static { - Configuration.addDefaultResource("sqoop-default.xml"); Configuration.addDefaultResource("sqoop-site.xml"); } diff --git a/src/java/com/cloudera/sqoop/SqoopOptions.java b/src/java/com/cloudera/sqoop/SqoopOptions.java index 961b007f..37e18337 100644 --- a/src/java/com/cloudera/sqoop/SqoopOptions.java +++ b/src/java/com/cloudera/sqoop/SqoopOptions.java @@ -20,9 +20,7 @@ package com.cloudera.sqoop; import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; +import java.util.ArrayList; import java.util.Arrays; import java.util.Properties; @@ -39,6 +37,15 @@ public class SqoopOptions { public static final Log LOG = LogFactory.getLog(SqoopOptions.class.getName()); + /** + * Set to true in configuration if you want to put db passwords + * in the metastore. + */ + public static final String METASTORE_PASSWORD_KEY = + "sqoop.metastore.client.record.password"; + + public static final boolean METASTORE_PASSWORD_DEFAULT = false; + /** * Thrown when invalid cmdline options are given. */ @@ -68,19 +75,20 @@ public enum FileLayout { // TODO(aaron): Adding something here? Add a setter and a getter. - // Add a default value in initDefaults() if you need one. - // If you want to load from a properties file, add an entry in the - // loadFromProperties() method. - // Then add command-line arguments in the appropriate tools. The - // names of all command-line args are stored as constants in BaseSqoopTool. + // Add a default value in initDefaults() if you need one. If this value + // needs to be serialized in the metastore for this session, you need to add + // an appropriate line to loadProperties() and writeProperties(). Then add + // command-line arguments in the appropriate tools. The names of all + // command-line args are stored as constants in BaseSqoopTool. + private String connectString; private String tableName; private String [] columns; private String username; - private String password; + private String password; // May not be serialized, based on configuration. private String codeOutputDir; private String jarOutputDir; - private String hadoopHome; + private String hadoopHome; // not serialized to metastore. private String splitByCol; private String whereClause; private String sqlQuery; @@ -90,8 +98,8 @@ public enum FileLayout { private boolean append; private FileLayout layout; private boolean direct; // if true and conn is mysql, use mysqldump. - private String tmpDir; // where temp data goes; usually /tmp - private String hiveHome; + private String tmpDir; // where temp data goes; usually /tmp; not serialized. + private String hiveHome; // not serialized to metastore. private boolean hiveImport; private boolean overwriteHiveTable; private String hiveTableName; @@ -99,6 +107,7 @@ public enum FileLayout { // An ordered list of column names denoting what order columns are // serialized to a PreparedStatement from a generated record type. + // Not serialized to metastore. private String [] dbOutColumns; // package+class to apply to individual table import. @@ -133,8 +142,6 @@ public enum FileLayout { public static final int DEFAULT_NUM_MAPPERS = 4; - private static final String DEFAULT_CONFIG_FILE = "sqoop.properties"; - private String [] extraArgs; private String hbaseTable; // HBase table to import into. @@ -182,62 +189,300 @@ private long getLongProperty(Properties props, String propName, } } - private void loadFromProperties() { - File configFile = new File(DEFAULT_CONFIG_FILE); - if (!configFile.canRead()) { - return; //can't do this. + private int getIntProperty(Properties props, String propName, + int defaultVal) { + long longVal = getLongProperty(props, propName, defaultVal); + return (int) longVal; + } + + private char getCharProperty(Properties props, String propName, + char defaultVal) { + int intVal = getIntProperty(props, propName, (int) defaultVal); + return (char) intVal; + } + + private DelimiterSet getDelimiterProperties(Properties props, + String prefix, DelimiterSet defaults) { + + if (null == defaults) { + defaults = new DelimiterSet(); } - Properties props = new Properties(); - InputStream istream = null; - try { - LOG.info("Loading properties from " + configFile.getAbsolutePath()); - istream = new FileInputStream(configFile); - props.load(istream); + char field = getCharProperty(props, prefix + ".field", + defaults.getFieldsTerminatedBy()); + char record = getCharProperty(props, prefix + ".record", + defaults.getLinesTerminatedBy()); + char enclose = getCharProperty(props, prefix + ".enclose", + defaults.getEnclosedBy()); + char escape = getCharProperty(props, prefix + ".escape", + defaults.getEscapedBy()); + boolean required = getBooleanProperty(props, prefix +".enclose.required", + defaults.isEncloseRequired()); - this.hadoopHome = props.getProperty("hadoop.home", this.hadoopHome); - this.codeOutputDir = props.getProperty("out.dir", this.codeOutputDir); - this.jarOutputDir = props.getProperty("bin.dir", this.jarOutputDir); - this.username = props.getProperty("db.username", this.username); - this.password = props.getProperty("db.password", this.password); - this.tableName = props.getProperty("db.table", this.tableName); - this.connectString = props.getProperty("db.connect.url", - this.connectString); - this.splitByCol = props.getProperty("db.split.column", this.splitByCol); - this.whereClause = props.getProperty("db.where.clause", this.whereClause); - this.driverClassName = props.getProperty("jdbc.driver", - this.driverClassName); - this.warehouseDir = props.getProperty("hdfs.warehouse.dir", - this.warehouseDir); - this.hiveHome = props.getProperty("hive.home", this.hiveHome); - this.className = props.getProperty("java.classname", this.className); - this.packageName = props.getProperty("java.packagename", - this.packageName); - this.existingJarFile = props.getProperty("java.jar.file", - this.existingJarFile); - this.exportDir = props.getProperty("export.dir", this.exportDir); + return new DelimiterSet(field, record, enclose, escape, required); + } - this.direct = getBooleanProperty(props, "direct.import", this.direct); - this.hiveImport = getBooleanProperty(props, "hive.import", - this.hiveImport); - this.overwriteHiveTable = getBooleanProperty(props, - "hive.overwrite.table", this.overwriteHiveTable); - this.useCompression = getBooleanProperty(props, "compression", - this.useCompression); - this.directSplitSize = getLongProperty(props, "direct.split.size", - this.directSplitSize); - } catch (IOException ioe) { - LOG.error("Could not read properties file " + DEFAULT_CONFIG_FILE + ": " - + ioe.toString()); - } finally { - if (null != istream) { - try { - istream.close(); - } catch (IOException ioe) { - // Ignore this; we're closing. - } + private void setDelimiterProperties(Properties props, + String prefix, DelimiterSet values) { + putProperty(props, prefix + ".field", + Integer.toString((int) values.getFieldsTerminatedBy())); + putProperty(props, prefix + ".record", + Integer.toString((int) values.getLinesTerminatedBy())); + putProperty(props, prefix + ".enclose", + Integer.toString((int) values.getEnclosedBy())); + putProperty(props, prefix + ".escape", + Integer.toString((int) values.getEscapedBy())); + putProperty(props, prefix + ".enclose.required", + Boolean.toString(values.isEncloseRequired())); + } + + /** Take a comma-delimited list of input and split the elements + * into an output array. */ + private String [] listToArray(String strList) { + return strList.split(","); + } + + private String arrayToList(String [] array) { + if (null == array) { + return null; + } + + StringBuilder sb = new StringBuilder(); + boolean first = true; + for (String elem : array) { + if (!first) { + sb.append(","); } + sb.append(elem); + first = false; } + + return sb.toString(); + } + + /** + * A put() method for Properties that is tolerent of 'null' values. + * If a null value is specified, the property is unset. + */ + private void putProperty(Properties props, String k, String v) { + if (null == v) { + props.remove(k); + } else { + props.setProperty(k, v); + } + } + + /** + * Given a property prefix that denotes a set of numbered properties, + * return an array containing all the properties. + * + * For instance, if prefix is "foo", then return properties "foo.0", + * "foo.1", "foo.2", and so on as an array. If no such properties + * exist, return 'defaults'. + */ + private String [] getArgArrayProperty(Properties props, String prefix, + String [] defaults) { + int cur = 0; + ArrayList al = new ArrayList(); + while (true) { + String curProp = prefix + "." + cur; + String curStr = props.getProperty(curProp, null); + if (null == curStr) { + break; + } + + al.add(curStr); + cur++; + } + + if (cur == 0) { + // Couldn't find an array here; return the defaults. + return defaults; + } + + return al.toArray(new String[0]); + } + + private void setArgArrayProperties(Properties props, String prefix, + String [] values) { + if (null == values) { + return; + } + + for (int i = 0; i < values.length; i++) { + putProperty(props, prefix + "." + i, values[i]); + } + } + + /** + * Given a set of properties, load this into the current SqoopOptions + * instance. + */ + public void loadProperties(Properties props) { + + this.connectString = props.getProperty("db.connect.string", + this.connectString); + this.username = props.getProperty("db.username", this.username); + + if (getBooleanProperty(props, "db.require.password", false)) { + // The user's password was stripped out from the metastore. + // Require that the user enter it now. + setPasswordFromConsole(); + } else { + this.password = props.getProperty("db.password", this.password); + } + + this.tableName = props.getProperty("db.table", this.tableName); + String colListStr = props.getProperty("db.column.list", null); + if (null != colListStr) { + this.columns = listToArray(colListStr); + } + + this.codeOutputDir = props.getProperty("codegen.output.dir", + this.codeOutputDir); + this.jarOutputDir = props.getProperty("codegen.compile.dir", + this.jarOutputDir); + + this.splitByCol = props.getProperty("db.split.column", this.splitByCol); + this.whereClause = props.getProperty("db.where.clause", this.whereClause); + this.sqlQuery = props.getProperty("db.query", this.sqlQuery); + + this.driverClassName = props.getProperty("jdbc.driver.class", + this.driverClassName); + + this.warehouseDir = props.getProperty("hdfs.warehouse.dir", + this.warehouseDir); + this.targetDir = props.getProperty("hdfs.target.dir", + this.targetDir); + this.append = getBooleanProperty(props, "hdfs.append.dir", this.append); + + String fileFmtStr = props.getProperty("hdfs.file.format", "text"); + if (fileFmtStr.equals("seq")) { + this.layout = FileLayout.SequenceFile; + } else { + this.layout = FileLayout.TextFile; + } + + this.direct = getBooleanProperty(props, "direct.import", this.direct); + + this.hiveImport = getBooleanProperty(props, "hive.import", + this.hiveImport); + this.overwriteHiveTable = getBooleanProperty(props, + "hive.overwrite.table", this.overwriteHiveTable); + this.hiveTableName = props.getProperty("hive.table.name", + this.hiveTableName); + + this.className = props.getProperty("codegen.java.classname", + this.className); + this.packageName = props.getProperty("codegen.java.packagename", + this.packageName); + this.existingJarFile = props.getProperty("codegen.jar.file", + this.existingJarFile); + + this.numMappers = getIntProperty(props, "mapreduce.num.mappers", + this.numMappers); + + this.useCompression = getBooleanProperty(props, "enable.compression", + this.useCompression); + + this.directSplitSize = getLongProperty(props, "import.direct.split.size", + this.directSplitSize); + + this.maxInlineLobSize = getLongProperty(props, + "import.max.inline.lob.size", this.maxInlineLobSize); + + this.exportDir = props.getProperty("export.source.dir", this.exportDir); + this.updateKeyCol = props.getProperty("export.update.col", + this.updateKeyCol); + + this.inputDelimiters = getDelimiterProperties(props, + "codegen.input.delimiters", this.inputDelimiters); + this.outputDelimiters = getDelimiterProperties(props, + "codegen.output.delimiters", this.outputDelimiters); + + this.extraArgs = getArgArrayProperty(props, "tool.arguments", + this.extraArgs); + + this.hbaseTable = props.getProperty("hbase.table", this.hbaseTable); + this.hbaseColFamily = props.getProperty("hbase.col.family", + this.hbaseColFamily); + this.hbaseRowKeyCol = props.getProperty("hbase.row.key.col", + this.hbaseRowKeyCol); + this.hbaseCreateTable = getBooleanProperty(props, "hbase.create.table", + this.hbaseCreateTable); + } + + /** + * Return a Properties instance that encapsulates all the "sticky" + * state of this SqoopOptions that should be written to a metastore + * to restore the session later. + */ + public Properties writeProperties() { + Properties props = new Properties(); + + putProperty(props, "db.connect.string", this.connectString); + putProperty(props, "db.username", this.username); + + if (this.getConf().getBoolean( + METASTORE_PASSWORD_KEY, METASTORE_PASSWORD_DEFAULT)) { + // If the user specifies, we may store the password in the metastore. + putProperty(props, "db.password", this.password); + putProperty(props, "db.require.password", "false"); + } else if (this.password != null) { + // Otherwise, if the user has set a password, we just record + // a flag stating that the password will need to be reentered. + putProperty(props, "db.require.password", "true"); + } else { + // No password saved or required. + putProperty(props, "db.require.password", "false"); + } + + putProperty(props, "db.table", this.tableName); + putProperty(props, "db.column.list", arrayToList(this.columns)); + putProperty(props, "codegen.output.dir", this.codeOutputDir); + putProperty(props, "codegen.compile.dir", this.jarOutputDir); + putProperty(props, "db.split.column", this.splitByCol); + putProperty(props, "db.where.clause", this.whereClause); + putProperty(props, "db.query", this.sqlQuery); + putProperty(props, "jdbc.driver.class", this.driverClassName); + putProperty(props, "hdfs.warehouse.dir", this.warehouseDir); + putProperty(props, "hdfs.target.dir", this.targetDir); + putProperty(props, "hdfs.append.dir", Boolean.toString(this.append)); + if (this.layout == FileLayout.SequenceFile) { + putProperty(props, "hdfs.file.format", "seq"); + } else { + putProperty(props, "hdfs.file.format", "text"); + } + putProperty(props, "direct.import", Boolean.toString(this.direct)); + putProperty(props, "hive.import", Boolean.toString(this.hiveImport)); + putProperty(props, "hive.overwrite.table", + Boolean.toString(this.overwriteHiveTable)); + putProperty(props, "hive.table.name", this.hiveTableName); + putProperty(props, "codegen.java.classname", this.className); + putProperty(props, "codegen.java.packagename", this.packageName); + putProperty(props, "codegen.jar.file", this.existingJarFile); + putProperty(props, "mapreduce.num.mappers", + Integer.toString(this.numMappers)); + putProperty(props, "enable.compression", + Boolean.toString(this.useCompression)); + putProperty(props, "import.direct.split.size", + Long.toString(this.directSplitSize)); + putProperty(props, "import.max.inline.lob.size", + Long.toString(this.maxInlineLobSize)); + putProperty(props, "export.source.dir", this.exportDir); + putProperty(props, "export.update.col", this.updateKeyCol); + setDelimiterProperties(props, "codegen.input.delimiters", + this.inputDelimiters); + setDelimiterProperties(props, "codegen.output.delimiters", + this.outputDelimiters); + setArgArrayProperties(props, "tool.arguments", this.extraArgs); + putProperty(props, "hbase.table", this.hbaseTable); + putProperty(props, "hbase.col.family", this.hbaseColFamily); + putProperty(props, "hbase.row.key.col", this.hbaseRowKeyCol); + putProperty(props, "hbase.create.table", + Boolean.toString(this.hbaseCreateTable)); + + return props; } /** @@ -291,8 +536,6 @@ private void initDefaults(Configuration baseConfiguration) { this.extraArgs = null; this.dbOutColumns = null; - - loadFromProperties(); } /** diff --git a/src/java/com/cloudera/sqoop/metastore/SessionData.java b/src/java/com/cloudera/sqoop/metastore/SessionData.java new file mode 100644 index 00000000..52b4ce7f --- /dev/null +++ b/src/java/com/cloudera/sqoop/metastore/SessionData.java @@ -0,0 +1,69 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.metastore; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.tool.SqoopTool; + +/** + * Container for all session data that should be stored to a + * permanent resource. + */ +public class SessionData { + private SqoopOptions opts; + private SqoopTool tool; + + public SessionData() { + } + + public SessionData(SqoopOptions options, SqoopTool sqoopTool) { + this.opts = options; + this.tool = sqoopTool; + } + + /** + * Gets the SqoopOptions. + */ + public SqoopOptions getSqoopOptions() { + return this.opts; + } + + /** + * Gets the SqoopTool. + */ + public SqoopTool getSqoopTool() { + return this.tool; + } + + /** + * Sets the SqoopOptions. + */ + public void setSqoopOptions(SqoopOptions options) { + this.opts = options; + } + + /** + * Sets the SqoopTool. + */ + public void setSqoopTool(SqoopTool sqoopTool) { + this.tool = sqoopTool; + } + +} + diff --git a/src/java/com/cloudera/sqoop/metastore/SessionStorage.java b/src/java/com/cloudera/sqoop/metastore/SessionStorage.java new file mode 100644 index 00000000..3291acda --- /dev/null +++ b/src/java/com/cloudera/sqoop/metastore/SessionStorage.java @@ -0,0 +1,94 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.metastore; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configured; + +/** + * API that defines how sessions are saved, restored, and manipulated. + * + *

+ * SessionStorage instances may be created and then not used; the + * SessionStorage factory may create additional SessionStorage instances + * that return false from accept() and then discard them. The close() + * method will only be triggered for a SessionStorage if the connect() + * method is called. Connection should not be triggered by a call to + * accept().

+ */ +public abstract class SessionStorage extends Configured implements Closeable { + + /** + * Returns true if the SessionStorage system can use the metadata in + * the descriptor to connect to an underlying session resource. + */ + public abstract boolean canAccept(Map descriptor); + + + /** + * Opens / connects to the underlying storage resource specified by the + * descriptor. + */ + public abstract void open(Map descriptor) + throws IOException; + + /** + * Given a session name, reconstitute a SessionData that contains all + * configuration information required for the session. Returns null if the + * session name does not match an available session. + */ + public abstract SessionData read(String sessionName) + throws IOException; + + /** + * Forget about a saved session. + */ + public abstract void delete(String sessionName) throws IOException; + + /** + * Given a session name and the data describing a configured + * session, record the session information to the storage medium. + */ + public abstract void create(String sessionName, SessionData data) + throws IOException; + + /** + * Given a session descriptor and a configured session + * update the underlying resource to match the current session + * configuration. + */ + public abstract void update(String sessionName, SessionData data) + throws IOException; + + /** + * Close any resources opened by the SessionStorage system. + */ + public void close() throws IOException { + } + + /** + * Enumerate all sessions held in the connected resource. + */ + public abstract List list() throws IOException; +} + diff --git a/src/java/com/cloudera/sqoop/metastore/SessionStorageFactory.java b/src/java/com/cloudera/sqoop/metastore/SessionStorageFactory.java new file mode 100644 index 00000000..3ae75bb8 --- /dev/null +++ b/src/java/com/cloudera/sqoop/metastore/SessionStorageFactory.java @@ -0,0 +1,72 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.metastore; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; + +/** + * Factory that produces the correct SessionStorage system to work with + * a particular session descriptor. + */ +public class SessionStorageFactory { + + private Configuration conf; + + /** + * Configuration key describing the list of SessionStorage implementations + * to use to handle sessions. + */ + public static final String AVAILABLE_STORAGES_KEY = + "sqoop.session.storage.implementations"; + + /** The default list of available SessionStorage implementations. */ + private static final String DEFAULT_AVAILABLE_STORAGES = + "com.cloudera.sqoop.metastore.hsqldb.HsqldbSessionStorage," + + "com.cloudera.sqoop.metastore.hsqldb.AutoHsqldbStorage"; + + public SessionStorageFactory(Configuration config) { + this.conf = config; + + // Ensure that we always have an available storages list. + if (this.conf.get(AVAILABLE_STORAGES_KEY) == null) { + this.conf.set(AVAILABLE_STORAGES_KEY, DEFAULT_AVAILABLE_STORAGES); + } + } + + /** + * Given a session descriptor, determine the correct SessionStorage + * implementation to use to handle the session and return an instance + * of it -- or null if no SessionStorage instance is appropriate. + */ + public SessionStorage getSessionStorage(Map descriptor) { + List storages = this.conf.getInstances( + AVAILABLE_STORAGES_KEY, SessionStorage.class); + for (SessionStorage stor : storages) { + if (stor.canAccept(descriptor)) { + return stor; + } + } + + return null; + } +} + diff --git a/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java b/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java new file mode 100644 index 00000000..ffcde79e --- /dev/null +++ b/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java @@ -0,0 +1,113 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.metastore.hsqldb; + +import java.io.File; +import java.io.IOException; + +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; + +/** + * SessionStorage implementation that auto-configures an HSQLDB + * local-file-based instance to hold sessions. + */ +public class AutoHsqldbStorage extends HsqldbSessionStorage { + + public static final Log LOG = LogFactory.getLog( + AutoHsqldbStorage.class.getName()); + + /** + * Configuration key specifying whether this storage agent is active. + * Defaults to "on" to allow zero-conf local users. + */ + public static final String AUTO_STORAGE_IS_ACTIVE_KEY = + "sqoop.metastore.client.enable.autoconnect"; + + /** + * Configuration key specifying the connect string used by this + * storage agent. + */ + public static final String AUTO_STORAGE_CONNECT_STRING_KEY = + "sqoop.metastore.client.autoconnect.url"; + + /** + * Configuration key specifying the username to bind with. + */ + public static final String AUTO_STORAGE_USER_KEY = + "sqoop.metastore.client.autoconnect.username"; + + + /** HSQLDB default user is named 'SA'. */ + private static final String DEFAULT_AUTO_USER = "SA"; + + /** + * Configuration key specifying the password to bind with. + */ + public static final String AUTO_STORAGE_PASS_KEY = + "sqoop.metastore.client.autoconnect.password"; + + /** HSQLDB default user has an empty password. */ + public static final String DEFAULT_AUTO_PASSWORD = ""; + + @Override + /** {@inheritDoc} */ + public boolean canAccept(Map descriptor) { + Configuration conf = this.getConf(); + return conf.getBoolean(AUTO_STORAGE_IS_ACTIVE_KEY, true); + } + + /** + * Determine the user's home directory and return a connect + * string to HSQLDB that uses ~/.sqoop/ as the storage location + * for the metastore database. + */ + private String getHomeDirFileConnectStr() { + String homeDir = System.getProperty("user.home"); + + File homeDirObj = new File(homeDir); + File sqoopDataDirObj = new File(homeDirObj, ".sqoop"); + File databaseFileObj = new File(sqoopDataDirObj, "metastore.db"); + + String dbFileStr = databaseFileObj.toString(); + return "jdbc:hsqldb:file:" + dbFileStr + + ";hsqldb.write_delay=false;shutdown=true"; + } + + @Override + /** + * Set the connection information to use the auto-inferred connection + * string. + */ + public void open(Map descriptor) throws IOException { + Configuration conf = getConf(); + setMetastoreConnectStr(conf.get(AUTO_STORAGE_CONNECT_STRING_KEY, + getHomeDirFileConnectStr())); + setMetastoreUser(conf.get(AUTO_STORAGE_USER_KEY, DEFAULT_AUTO_USER)); + setMetastorePassword(conf.get(AUTO_STORAGE_PASS_KEY, + DEFAULT_AUTO_PASSWORD)); + + init(); + } +} + diff --git a/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbMetaStore.java b/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbMetaStore.java new file mode 100644 index 00000000..235695c7 --- /dev/null +++ b/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbMetaStore.java @@ -0,0 +1,182 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package com.cloudera.sqoop.metastore.hsqldb; + +import java.io.File; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.util.StringUtils; + +import org.hsqldb.Server; +import org.hsqldb.ServerConstants; + +import com.cloudera.sqoop.SqoopOptions; + +import com.cloudera.sqoop.manager.HsqldbManager; + +/** + * Container for an HSQLDB-backed metastore. + */ +public class HsqldbMetaStore { + + public static final Log LOG = LogFactory.getLog( + HsqldbMetaStore.class.getName()); + + /** Where on the local fs does the metastore put files? */ + public static final String META_STORAGE_LOCATION_KEY = + "sqoop.metastore.server.location"; + + /** + * What port does the metastore listen on? + */ + public static final String META_SERVER_PORT_KEY = + "sqoop.metastore.server.port"; + + /** Default to this port if unset. */ + public static final int DEFAULT_PORT = 16000; + + private int port; + private String fileLocation; + private Server server; + private Configuration conf; + + public HsqldbMetaStore(Configuration config) { + this.conf = config; + init(); + } + + /** + * Determine the user's home directory and return a file path + * under this root where the shared metastore can be placed. + */ + private String getHomeDirFilePath() { + String homeDir = System.getProperty("user.home"); + + File homeDirObj = new File(homeDir); + File sqoopDataDirObj = new File(homeDirObj, ".sqoop"); + File databaseFileObj = new File(sqoopDataDirObj, "shared-metastore.db"); + + return databaseFileObj.toString(); + } + + private void init() { + if (null != server) { + LOG.debug("init(): server already exists."); + return; + } + + fileLocation = conf.get(META_STORAGE_LOCATION_KEY, null); + if (null == fileLocation) { + fileLocation = getHomeDirFilePath(); + LOG.warn("The location for metastore data has not been explicitly set. " + + "Placing shared metastore files in " + fileLocation); + } + + this.port = conf.getInt(META_SERVER_PORT_KEY, DEFAULT_PORT); + } + + + public void start() { + try { + if (server != null) { + server.checkRunning(false); + } + } catch (RuntimeException re) { + LOG.info("Server is already started."); + return; + } + + server = new Server(); + server.setDatabasePath(0, "file:" + fileLocation); + server.setDatabaseName(0, "sqoop"); + server.putPropertiesFromString("hsqldb.write_delay=false"); + server.setPort(port); + server.setSilent(true); + server.setNoSystemExit(true); + + server.start(); + LOG.info("Server started on port " + port + " with protocol " + + server.getProtocol()); + } + + /** + * Blocks the current thread until the server is shut down. + */ + public void waitForServer() { + while (true) { + int curState = server.getState(); + if (curState == ServerConstants.SERVER_STATE_SHUTDOWN) { + LOG.info("Got shutdown notification"); + break; + } + + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("Interrupted while blocking for server:" + + StringUtils.stringifyException(ie)); + } + } + } + + /** + * Connects to the server and instructs it to shutdown. + */ + public void shutdown() { + // Send the SHUTDOWN command to the server via SQL. + SqoopOptions options = new SqoopOptions(conf); + options.setConnectString("jdbc:hsqldb:hsql://localhost:" + + port + "/sqoop"); + options.setUsername("SA"); + options.setPassword(""); + HsqldbManager manager = new HsqldbManager(options); + Statement s = null; + try { + Connection c = manager.getConnection(); + s = c.createStatement(); + s.execute("SHUTDOWN"); + } catch (SQLException sqlE) { + LOG.warn("Exception shutting down database: " + + StringUtils.stringifyException(sqlE)); + } finally { + if (null != s) { + try { + s.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing statement: " + sqlE); + } + } + + try { + manager.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing manager: " + sqlE); + } + } + } +} + diff --git a/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbSessionStorage.java b/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbSessionStorage.java new file mode 100644 index 00000000..09d9da9f --- /dev/null +++ b/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbSessionStorage.java @@ -0,0 +1,796 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package com.cloudera.sqoop.metastore.hsqldb; + +import java.io.IOException; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.cloudera.sqoop.SqoopOptions; + +import com.cloudera.sqoop.metastore.SessionData; +import com.cloudera.sqoop.metastore.SessionStorage; + +import com.cloudera.sqoop.tool.SqoopTool; + +/** + * SessionStorage implementation that uses an HSQLDB-backed database to + * hold session information. + */ +public class HsqldbSessionStorage extends SessionStorage { + + public static final Log LOG = LogFactory.getLog( + HsqldbSessionStorage.class.getName()); + + /** descriptor key identifying the connect string for the metastore. */ + public static final String META_CONNECT_KEY = "metastore.connect.string"; + + /** descriptor key identifying the username to use when connecting + * to the metastore. + */ + public static final String META_USERNAME_KEY = "metastore.username"; + + /** descriptor key identifying the password to use when connecting + * to the metastore. + */ + public static final String META_PASSWORD_KEY = "metastore.password"; + + + /** Default name for the root metadata table in HSQLDB. */ + private static final String DEFAULT_ROOT_TABLE_NAME = "SQOOP_ROOT"; + + /** Configuration key used to override root table name. */ + public static final String ROOT_TABLE_NAME_KEY = + "sqoop.hsqldb.root.table.name"; + + /** root metadata table key used to define the current schema version. */ + private static final String STORAGE_VERSION_KEY = + "sqoop.hsqldb.session.storage.version"; + + /** The current version number for the schema edition. */ + private static final int CUR_STORAGE_VERSION = 0; + + /** root metadata table key used to define the session table name. */ + private static final String SESSION_TABLE_KEY = + "sqoop.hsqldb.session.info.table"; + + /** Default value for SESSION_TABLE_KEY. */ + private static final String DEFAULT_SESSION_TABLE_NAME = + "SQOOP_SESSIONS"; + + /** Per-session key with propClass 'schema' that defines the set of + * properties valid to be defined for propClass 'SqoopOptions'. */ + private static final String PROPERTY_SET_KEY = + "sqoop.property.set.id"; + + /** Current value for PROPERTY_SET_KEY. */ + private static final String CUR_PROPERTY_SET_ID = "0"; + + // The following are values for propClass in the v0 schema which + // describe different aspects of the stored metadata. + + /** Property class for properties about the stored data itself. */ + private static final String PROPERTY_CLASS_SCHEMA = "schema"; + + /** Property class for properties that are loaded into SqoopOptions. */ + private static final String PROPERTY_CLASS_SQOOP_OPTIONS = "SqoopOptions"; + + /** Property class for properties that are loaded into a Configuration. */ + private static final String PROPERTY_CLASS_CONFIG = "config"; + + /** + * Per-session key with propClass 'schema' that specifies the SqoopTool + * to load. + */ + private static final String SQOOP_TOOL_KEY = "sqoop.tool"; + + + private String metastoreConnectStr; + private String metastoreUser; + private String metastorePassword; + private Connection connection; + + protected Connection getConnection() { + return this.connection; + } + + // After connection to the database and initialization of the + // schema, this holds the name of the session table. + private String sessionTableName; + + protected void setMetastoreConnectStr(String connectStr) { + this.metastoreConnectStr = connectStr; + } + + protected void setMetastoreUser(String user) { + this.metastoreUser = user; + } + + protected void setMetastorePassword(String pass) { + this.metastorePassword = pass; + } + + private static final String DB_DRIVER_CLASS = "org.hsqldb.jdbcDriver"; + + @Override + /** + * Initialize the connection to the database. + */ + public void open(Map descriptor) throws IOException { + setMetastoreConnectStr(descriptor.get(META_CONNECT_KEY)); + setMetastoreUser(descriptor.get(META_USERNAME_KEY)); + setMetastorePassword(descriptor.get(META_PASSWORD_KEY)); + + init(); + } + + protected void init() throws IOException { + try { + // Load/initialize the JDBC driver. + Class.forName(DB_DRIVER_CLASS); + } catch (ClassNotFoundException cnfe) { + throw new IOException("Could not load HSQLDB JDBC driver", cnfe); + } + + try { + if (null == metastoreUser) { + this.connection = DriverManager.getConnection(metastoreConnectStr); + } else { + this.connection = DriverManager.getConnection(metastoreConnectStr, + metastoreUser, metastorePassword); + } + + connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); + connection.setAutoCommit(false); + + // Initialize the root schema. + if (!rootTableExists()) { + createRootTable(); + } + + // Check the schema version. + String curStorageVerStr = getRootProperty(STORAGE_VERSION_KEY, null); + int actualStorageVer = -1; + try { + actualStorageVer = Integer.valueOf(curStorageVerStr); + } catch (NumberFormatException nfe) { + LOG.warn("Could not interpret as a number: " + curStorageVerStr); + } + if (actualStorageVer != CUR_STORAGE_VERSION) { + LOG.error("Can not interpret metadata schema"); + LOG.error("The metadata schema version is " + curStorageVerStr); + LOG.error("The highest version supported is " + CUR_STORAGE_VERSION); + LOG.error("To use this version of Sqoop, " + + "you must downgrade your metadata schema."); + throw new IOException("Invalid metadata version."); + } + + // Initialize the versioned schema. + initV0Schema(); + } catch (SQLException sqle) { + if (null != connection) { + try { + connection.rollback(); + } catch (SQLException e2) { + LOG.warn("Error rolling back transaction in error handler: " + e2); + } + } + + throw new IOException("Exception creating SQL connection", sqle); + } + } + + @Override + public void close() throws IOException { + if (null != this.connection) { + try { + LOG.debug("Flushing current transaction"); + this.connection.commit(); + } catch (SQLException sqlE) { + throw new IOException("Exception committing connection", sqlE); + } + + try { + LOG.debug("Closing connection"); + this.connection.close(); + } catch (SQLException sqlE) { + throw new IOException("Exception closing connection", sqlE); + } finally { + this.connection = null; + } + } + } + + @Override + /** {@inheritDoc} */ + public boolean canAccept(Map descriptor) { + // We return true if the desciptor contains a connect string to find + // the database. + return descriptor.get(META_CONNECT_KEY) != null; + } + + @Override + /** {@inheritDoc} */ + public SessionData read(String sessionName) throws IOException { + try { + if (!sessionExists(sessionName)) { + LOG.error("Cannot restore session: " + sessionName); + LOG.error("(No such session)"); + throw new IOException("Cannot restore missing session " + sessionName); + } + + LOG.debug("Restoring session: " + sessionName); + Properties schemaProps = getV0Properties(sessionName, + PROPERTY_CLASS_SCHEMA); + Properties sqoopOptProps = getV0Properties(sessionName, + PROPERTY_CLASS_SQOOP_OPTIONS); + Properties configProps = getV0Properties(sessionName, + PROPERTY_CLASS_CONFIG); + + // Check that we're not using a saved session from a previous + // version whose functionality has been deprecated. + String thisPropSetId = schemaProps.getProperty(PROPERTY_SET_KEY); + LOG.debug("System property set: " + CUR_PROPERTY_SET_ID); + LOG.debug("Stored property set: " + thisPropSetId); + if (!CUR_PROPERTY_SET_ID.equals(thisPropSetId)) { + LOG.warn("The property set present in this database was written by"); + LOG.warn("an incompatible version of Sqoop. This may result in an"); + LOG.warn("incomplete operation."); + // TODO(aaron): Should this fail out-right? + } + + String toolName = schemaProps.getProperty(SQOOP_TOOL_KEY); + if (null == toolName) { + // Don't know what tool to create. + throw new IOException("Incomplete metadata; missing " + + SQOOP_TOOL_KEY); + } + + SqoopTool tool = SqoopTool.getTool(toolName); + if (null == tool) { + throw new IOException("Error in session metadata: invalid tool " + + toolName); + } + + Configuration conf = new Configuration(); + for (Map.Entry entry : configProps.entrySet()) { + conf.set(entry.getKey().toString(), entry.getValue().toString()); + } + + SqoopOptions opts = new SqoopOptions(); + opts.setConf(conf); + opts.loadProperties(sqoopOptProps); + + return new SessionData(opts, tool); + } catch (SQLException sqlE) { + throw new IOException("Error communicating with database", sqlE); + } + } + + private boolean sessionExists(String sessionName) throws SQLException { + PreparedStatement s = connection.prepareStatement( + "SELECT COUNT(session_name) FROM " + this.sessionTableName + + " WHERE session_name = ? GROUP BY session_name"); + ResultSet rs = null; + try { + s.setString(1, sessionName); + rs = s.executeQuery(); + if (rs.next()) { + return true; // We got a result, meaning the session exists. + } + } finally { + if (null != rs) { + try { + rs.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing result set: " + sqlE); + } + } + + s.close(); + } + + return false; // No result. + } + + @Override + /** {@inheritDoc} */ + public void delete(String sessionName) throws IOException { + try { + if (!sessionExists(sessionName)) { + LOG.error("No such session: " + sessionName); + } else { + LOG.debug("Deleting session: " + sessionName); + PreparedStatement s = connection.prepareStatement("DELETE FROM " + + this.sessionTableName + " WHERE session_name = ?"); + try { + s.setString(1, sessionName); + s.executeUpdate(); + } finally { + s.close(); + } + connection.commit(); + } + } catch (SQLException sqlEx) { + try { + connection.rollback(); + } catch (SQLException e2) { + LOG.warn("Error rolling back transaction in error handler: " + e2); + } + throw new IOException("Error communicating with database", sqlEx); + } + } + + @Override + /** {@inheritDoc} */ + public void create(String sessionName, SessionData data) + throws IOException { + try { + if (sessionExists(sessionName)) { + LOG.error("Cannot create session " + sessionName + + ": it already exists"); + throw new IOException("Session " + sessionName + " already exists"); + } + } catch (SQLException sqlE) { + throw new IOException("Error communicating with database", sqlE); + } + + createInternal(sessionName, data); + } + + /** + * Actually insert/update the resources for this session. + */ + private void createInternal(String sessionName, SessionData data) + throws IOException { + try { + LOG.debug("Creating session: " + sessionName); + + // Save the name of the Sqoop tool. + setV0Property(sessionName, PROPERTY_CLASS_SCHEMA, SQOOP_TOOL_KEY, + data.getSqoopTool().getToolName()); + + // Save the property set id. + setV0Property(sessionName, PROPERTY_CLASS_SCHEMA, PROPERTY_SET_KEY, + CUR_PROPERTY_SET_ID); + + // Save all properties of the SqoopOptions. + Properties props = data.getSqoopOptions().writeProperties(); + setV0Properties(sessionName, PROPERTY_CLASS_SQOOP_OPTIONS, props); + + // And save all unique properties of the configuration. + Configuration saveConf = data.getSqoopOptions().getConf(); + Configuration baseConf = new Configuration(); + + for (Map.Entry entry : saveConf) { + String key = entry.getKey(); + String rawVal = saveConf.getRaw(key); + String baseVal = baseConf.getRaw(key); + if (baseVal != null && rawVal.equals(baseVal)) { + continue; // Don't save this; it's set in the base configuration. + } + + LOG.debug("Saving " + key + " => " + rawVal + " / " + baseVal); + setV0Property(sessionName, PROPERTY_CLASS_CONFIG, key, rawVal); + } + + connection.commit(); + } catch (SQLException sqlE) { + try { + connection.rollback(); + } catch (SQLException sqlE2) { + LOG.warn("Exception rolling back transaction during error handling: " + + sqlE2); + } + throw new IOException("Error communicating with database", sqlE); + } + } + + @Override + /** {@inheritDoc} */ + public void update(String sessionName, SessionData data) + throws IOException { + try { + if (!sessionExists(sessionName)) { + LOG.error("Cannot update session " + sessionName + ": not found"); + throw new IOException("Session " + sessionName + " does not exist"); + } + } catch (SQLException sqlE) { + throw new IOException("Error communicating with database", sqlE); + } + + // Since we set properties with update-or-insert, this is the same + // as create on this system. + createInternal(sessionName, data); + } + + @Override + /** {@inheritDoc} */ + public List list() throws IOException { + ResultSet rs = null; + try { + PreparedStatement s = connection.prepareStatement( + "SELECT DISTINCT session_name FROM " + this.sessionTableName); + try { + rs = s.executeQuery(); + ArrayList sessions = new ArrayList(); + while (rs.next()) { + sessions.add(rs.getString(1)); + } + + return sessions; + } finally { + if (null != rs) { + try { + rs.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing resultset: " + sqlE); + } + } + + if (null != s) { + s.close(); + } + } + } catch (SQLException sqlE) { + throw new IOException("Error communicating with database", sqlE); + } + } + + // Determine the name to use for the root metadata table. + private String getRootTableName() { + Configuration conf = getConf(); + return conf.get(ROOT_TABLE_NAME_KEY, DEFAULT_ROOT_TABLE_NAME); + } + + private boolean tableExists(String table) throws SQLException { + LOG.debug("Checking for table: " + table); + DatabaseMetaData dbmd = connection.getMetaData(); + String [] tableTypes = { "TABLE" }; + ResultSet rs = dbmd.getTables(null, null, null, tableTypes); + if (null != rs) { + try { + while (rs.next()) { + if (table.equalsIgnoreCase(rs.getString("TABLE_NAME"))) { + LOG.debug("Found table: " + table); + return true; + } + } + } finally { + rs.close(); + } + } + + LOG.debug("Could not find table."); + return false; + } + + private boolean rootTableExists() throws SQLException { + String rootTableName = getRootTableName(); + return tableExists(rootTableName); + } + + private void createRootTable() throws SQLException { + String rootTableName = getRootTableName(); + LOG.debug("Creating root table: " + rootTableName); + + // TODO: Sanity-check the value of rootTableName to ensure it is + // not a SQL-injection attack vector. + Statement s = connection.createStatement(); + try { + s.executeUpdate("CREATE TABLE " + rootTableName + " (" + + "version INT, " + + "propname VARCHAR(128) NOT NULL, " + + "propval VARCHAR(256), " + + "CONSTRAINT " + rootTableName + "_unq UNIQUE (version, propname))"); + } finally { + s.close(); + } + + setRootProperty(STORAGE_VERSION_KEY, null, + Integer.toString(CUR_STORAGE_VERSION)); + + LOG.debug("Saving root table."); + connection.commit(); + } + + /** + * Look up a value for the specified version (may be null) in the + * root metadata table. + */ + private String getRootProperty(String propertyName, Integer version) + throws SQLException { + LOG.debug("Looking up property " + propertyName + " for version " + + version); + PreparedStatement s = null; + ResultSet rs = null; + + try { + if (null == version) { + s = connection.prepareStatement( + "SELECT propval FROM " + getRootTableName() + + " WHERE version IS NULL AND propname = ?"); + s.setString(1, propertyName); + } else { + s = connection.prepareStatement( + "SELECT propval FROM " + getRootTableName() + " WHERE version = ? " + + " AND propname = ?"); + s.setInt(1, version); + s.setString(2, propertyName); + } + + rs = s.executeQuery(); + if (!rs.next()) { + LOG.debug(" => (no result)"); + return null; // No such result. + } else { + String result = rs.getString(1); // Return the only result col. + LOG.debug(" => " + result); + return result; + } + } finally { + if (null != rs) { + try { + rs.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing resultset: " + sqlE); + } + } + + if (null != s) { + s.close(); + } + } + } + + /** + * Set a value for the specified version (may be null) in the root + * metadata table. + */ + private void setRootProperty(String propertyName, Integer version, + String val) throws SQLException { + LOG.debug("Setting property " + propertyName + " for version " + + version + " => " + val); + + PreparedStatement s; + String curVal = getRootProperty(propertyName, version); + if (null == curVal) { + // INSERT the row. + s = connection.prepareStatement("INSERT INTO " + getRootTableName() + + " (propval, propname, version) VALUES ( ? , ? , ? )"); + } else if (version == null) { + // UPDATE an existing row with a null version + s = connection.prepareStatement("UPDATE " + getRootTableName() + + " SET propval = ? WHERE propname = ? AND version IS NULL"); + } else { + // UPDATE an existing row with non-null version. + s = connection.prepareStatement("UPDATE " + getRootTableName() + + " SET propval = ? WHERE propname = ? AND version = ?"); + } + + try { + s.setString(1, val); + s.setString(2, propertyName); + if (null != version) { + s.setInt(3, version); + } + s.executeUpdate(); + } finally { + s.close(); + } + } + + /** + * Create the sessions table in the V0 schema. + */ + private void createSessionTable() throws SQLException { + String curTableName = DEFAULT_SESSION_TABLE_NAME; + int tableNum = -1; + while (true) { + if (tableExists(curTableName)) { + tableNum++; + curTableName = DEFAULT_SESSION_TABLE_NAME + "_" + tableNum; + } else { + break; + } + } + + // curTableName contains a table name that does not exist. + // Create this table. + LOG.debug("Creating session storage table: " + curTableName); + Statement s = connection.createStatement(); + try { + s.executeUpdate("CREATE TABLE " + curTableName + " (" + + "session_name VARCHAR(64) NOT NULL, " + + "propname VARCHAR(128) NOT NULL, " + + "propval VARCHAR(1024), " + + "propclass VARCHAR(32) NOT NULL, " + + "CONSTRAINT " + curTableName + "_unq UNIQUE " + + "(session_name, propname, propclass))"); + + // Then set a property in the root table pointing to it. + setRootProperty(SESSION_TABLE_KEY, 0, curTableName); + connection.commit(); + } finally { + s.close(); + } + + this.sessionTableName = curTableName; + } + + /** + * Given a root schema that exists, + * initialize a version-0 key/value storage schema on top of it, + * if it does not already exist. + */ + private void initV0Schema() throws SQLException { + this.sessionTableName = getRootProperty(SESSION_TABLE_KEY, 0); + if (null == this.sessionTableName) { + createSessionTable(); + } + if (!tableExists(this.sessionTableName)) { + LOG.debug("Could not find session table: " + sessionTableName); + createSessionTable(); + } + } + + /** + * INSERT or UPDATE a single (session, propname, class) to point + * to the specified property value. + */ + private void setV0Property(String sessionName, String propClass, + String propName, String propVal) throws SQLException { + LOG.debug("Session: " + sessionName + "; Setting property " + + propName + " with class " + propClass + " => " + propVal); + + PreparedStatement s = null; + try { + String curValue = getV0Property(sessionName, propClass, propName); + if (null == curValue) { + // Property is not yet set. + s = connection.prepareStatement("INSERT INTO " + this.sessionTableName + + " (propval, session_name, propclass, propname) " + + "VALUES (?, ?, ?, ?)"); + } else { + // Overwrite existing property. + s = connection.prepareStatement("UPDATE " + this.sessionTableName + + " SET propval = ? WHERE session_name = ? AND propclass = ? " + + "AND propname = ?"); + } + + s.setString(1, propVal); + s.setString(2, sessionName); + s.setString(3, propClass); + s.setString(4, propName); + + s.executeUpdate(); + } finally { + if (null != s) { + s.close(); + } + } + } + + /** + * Return a string containing the value of a specified property, + * or null if it is not set. + */ + private String getV0Property(String sessionName, String propClass, + String propertyName) throws SQLException { + LOG.debug("Session: " + sessionName + "; Getting property " + + propertyName + " with class " + propClass); + + ResultSet rs = null; + PreparedStatement s = connection.prepareStatement( + "SELECT propval FROM " + this.sessionTableName + + " WHERE session_name = ? AND propclass = ? AND propname = ?"); + + try { + s.setString(1, sessionName); + s.setString(2, propClass); + s.setString(3, propertyName); + rs = s.executeQuery(); + + if (!rs.next()) { + LOG.debug(" => (no result)"); + return null; + } + + String result = rs.getString(1); + LOG.debug(" => " + result); + return result; + } finally { + if (null != rs) { + try { + rs.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing resultset: " + sqlE); + } + } + + s.close(); + } + } + + /** + * Get a java.util.Properties containing all propName -> propVal + * bindings for a given (sessionName, propClass). + */ + private Properties getV0Properties(String sessionName, String propClass) + throws SQLException { + LOG.debug("Session: " + sessionName + + "; Getting properties with class " + propClass); + + ResultSet rs = null; + PreparedStatement s = connection.prepareStatement( + "SELECT propname, propval FROM " + this.sessionTableName + + " WHERE session_name = ? AND propclass = ?"); + try { + s.setString(1, sessionName); + s.setString(2, propClass); + rs = s.executeQuery(); + + Properties p = new Properties(); + while (rs.next()) { + p.setProperty(rs.getString(1), rs.getString(2)); + } + + return p; + } finally { + if (null != rs) { + try { + rs.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing result set: " + sqlE); + } + } + + s.close(); + } + } + + private void setV0Properties(String sessionName, String propClass, + Properties properties) throws SQLException { + LOG.debug("Session: " + sessionName + + "; Setting bulk properties for class " + propClass); + + for (Map.Entry entry : properties.entrySet()) { + String key = entry.getKey().toString(); + String val = entry.getValue().toString(); + setV0Property(sessionName, propClass, key, val); + } + } +} + diff --git a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java index 6ed2114f..250317db 100644 --- a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java +++ b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java @@ -22,7 +22,9 @@ import java.util.Arrays; import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.OptionGroup; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.StringUtils; @@ -119,6 +121,17 @@ public abstract class BaseSqoopTool extends SqoopTool { public static final String HBASE_CREATE_TABLE_ARG = "hbase-create-table"; + // Arguments for the session management system. + public static final String SESSION_METASTORE_ARG = "meta-connect"; + public static final String SESSION_CMD_CREATE_ARG = "create"; + public static final String SESSION_CMD_DELETE_ARG = "delete"; + public static final String SESSION_CMD_EXEC_ARG = "exec"; + public static final String SESSION_CMD_LIST_ARG = "list"; + public static final String SESSION_CMD_SHOW_ARG = "show"; + + // Arguments for the metastore. + public static final String METASTORE_SHUTDOWN_ARG = "shutdown"; + public BaseSqoopTool() { } @@ -234,6 +247,65 @@ protected boolean hasUnrecognizedArgs(String [] argv) { return null; } + /** + * @return RelatedOptions used by session management tools. + */ + protected RelatedOptions getSessionOptions() { + RelatedOptions relatedOpts = new RelatedOptions( + "Session management arguments"); + relatedOpts.addOption(OptionBuilder.withArgName("jdbc-uri") + .hasArg() + .withDescription("Specify JDBC connect string for the metastore") + .withLongOpt(SESSION_METASTORE_ARG) + .create()); + + // Create an option-group surrounding the operations a user + // can perform on sessions. + OptionGroup group = new OptionGroup(); + group.addOption(OptionBuilder.withArgName("session-id") + .hasArg() + .withDescription("Create a new session") + .withLongOpt(SESSION_CMD_CREATE_ARG) + .create()); + group.addOption(OptionBuilder.withArgName("session-id") + .hasArg() + .withDescription("Delete a saved session") + .withLongOpt(SESSION_CMD_DELETE_ARG) + .create()); + group.addOption(OptionBuilder.withArgName("session-id") + .hasArg() + .withDescription("Show the parameters for a saved session") + .withLongOpt(SESSION_CMD_SHOW_ARG) + .create()); + + Option execOption = OptionBuilder.withArgName("session-id") + .hasArg() + .withDescription("Run a saved session") + .withLongOpt(SESSION_CMD_EXEC_ARG) + .create(); + group.addOption(execOption); + + group.addOption(OptionBuilder + .withDescription("List saved sessions") + .withLongOpt(SESSION_CMD_LIST_ARG) + .create()); + + relatedOpts.addOptionGroup(group); + + // Since the "common" options aren't used in the session tool, + // add these settings here. + relatedOpts.addOption(OptionBuilder + .withDescription("Print more information while working") + .withLongOpt(VERBOSE_ARG) + .create()); + relatedOpts.addOption(OptionBuilder + .withDescription("Print usage instructions") + .withLongOpt(HELP_ARG) + .create()); + + return relatedOpts; + } + /** * @return RelatedOptions used by most/all Sqoop tools. */ @@ -690,5 +762,22 @@ protected void validateHBaseOptions(SqoopOptions options) + HELP_STR); } } + + /** + * Given an array of extra arguments (usually populated via + * this.extraArguments), determine the offset of the first '--' + * argument in the list. Return 'extra.length' if there is none. + */ + protected int getDashPosition(String [] extra) { + int dashPos = extra.length; + for (int i = 0; i < extra.length; i++) { + if (extra[i].equals("--")) { + dashPos = i; + break; + } + } + + return dashPos; + } } diff --git a/src/java/com/cloudera/sqoop/tool/ImportTool.java b/src/java/com/cloudera/sqoop/tool/ImportTool.java index b69c17d8..0ec5310d 100644 --- a/src/java/com/cloudera/sqoop/tool/ImportTool.java +++ b/src/java/com/cloudera/sqoop/tool/ImportTool.java @@ -445,13 +445,7 @@ public void validateOptions(SqoopOptions options) // If extraArguments is full, check for '--' followed by args for // mysqldump or other commands we rely on. options.setExtraArgs(getSubcommandArgs(extraArguments)); - int dashPos = extraArguments.length; - for (int i = 0; i < extraArguments.length; i++) { - if (extraArguments[i].equals("--")) { - dashPos = i; - break; - } - } + int dashPos = getDashPosition(extraArguments); if (hasUnrecognizedArgs(extraArguments, 0, dashPos)) { throw new InvalidOptionsException(HELP_STR); diff --git a/src/java/com/cloudera/sqoop/tool/MetastoreTool.java b/src/java/com/cloudera/sqoop/tool/MetastoreTool.java new file mode 100644 index 00000000..95edb90c --- /dev/null +++ b/src/java/com/cloudera/sqoop/tool/MetastoreTool.java @@ -0,0 +1,92 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.tool; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException; +import com.cloudera.sqoop.cli.RelatedOptions; +import com.cloudera.sqoop.cli.ToolOptions; + +import com.cloudera.sqoop.metastore.hsqldb.HsqldbMetaStore; + +/** + * Tool that runs a standalone Sqoop metastore. + */ +public class MetastoreTool extends BaseSqoopTool { + + public static final Log LOG = LogFactory.getLog( + MetastoreTool.class.getName()); + + private HsqldbMetaStore metastore; + + // If set to true, shut an existing metastore down. + private boolean shutdown = false; + + public MetastoreTool() { + super("metastore"); + } + + @Override + /** {@inheritDoc} */ + public int run(SqoopOptions options) { + metastore = new HsqldbMetaStore(options.getConf()); + if (shutdown) { + LOG.info("Shutting down metastore..."); + metastore.shutdown(); + } else { + metastore.start(); + metastore.waitForServer(); + LOG.info("Server thread has quit."); + } + return 0; + } + + @Override + /** Configure the command-line arguments we expect to receive */ + public void configureOptions(ToolOptions toolOptions) { + RelatedOptions opts = new RelatedOptions("metastore arguments"); + opts.addOption(OptionBuilder + .withDescription("Cleanly shut down a running metastore") + .withLongOpt(METASTORE_SHUTDOWN_ARG) + .create()); + + toolOptions.addUniqueOptions(opts); + } + + @Override + /** {@inheritDoc} */ + public void applyOptions(CommandLine in, SqoopOptions out) + throws InvalidOptionsException { + if (in.hasOption(METASTORE_SHUTDOWN_ARG)) { + this.shutdown = true; + } + } + + @Override + /** {@inheritDoc} */ + public void validateOptions(SqoopOptions options) + throws InvalidOptionsException { + } +} + diff --git a/src/java/com/cloudera/sqoop/tool/SessionTool.java b/src/java/com/cloudera/sqoop/tool/SessionTool.java new file mode 100644 index 00000000..af6acc4a --- /dev/null +++ b/src/java/com/cloudera/sqoop/tool/SessionTool.java @@ -0,0 +1,400 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.tool; + +import java.io.IOException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.TreeMap; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.ParseException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.ToolRunner; + +import org.apache.log4j.Category; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +import com.cloudera.sqoop.Sqoop; +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException; +import com.cloudera.sqoop.cli.ToolOptions; + +import com.cloudera.sqoop.metastore.hsqldb.HsqldbSessionStorage; +import com.cloudera.sqoop.metastore.SessionData; +import com.cloudera.sqoop.metastore.SessionStorage; +import com.cloudera.sqoop.metastore.SessionStorageFactory; + +/** + * Tool that creates and executes saved sessions. + */ +public class SessionTool extends BaseSqoopTool { + + public static final Log LOG = LogFactory.getLog( + SessionTool.class.getName()); + + private enum SessionOp { + SessionCreate, + SessionDelete, + SessionExecute, + SessionList, + SessionShow, + }; + + private Map sessionDescriptor; + private String sessionName; + private SessionOp operation; + private SessionStorage storage; + + public SessionTool() { + super("session"); + } + + /** + * Given an array of strings, return all elements of this + * array up to (but not including) the first instance of "--". + */ + private String [] getElementsUpToDoubleDash(String [] array) { + String [] parseableChildArgv = null; + for (int i = 0; i < array.length; i++) { + if ("--".equals(array[i])) { + parseableChildArgv = Arrays.copyOfRange(array, 0, i); + break; + } + } + + if (parseableChildArgv == null) { + // Didn't find any nested '--'. + parseableChildArgv = array; + } + + return parseableChildArgv; + } + + /** + * Given an array of strings, return the first instance + * of "--" and all following elements. + * If no "--" exists, return null. + */ + private String [] getElementsAfterDoubleDash(String [] array) { + String [] extraChildArgv = null; + for (int i = 0; i < array.length; i++) { + if ("--".equals(array[i])) { + extraChildArgv = Arrays.copyOfRange(array, i, array.length); + break; + } + } + + return extraChildArgv; + } + + private int configureChildTool(SqoopOptions childOptions, + SqoopTool childTool, String [] childArgv) { + // Within the child arguments there may be a '--' followed by + // dependent args. Stash them off to the side. + + // Everything up to the '--'. + String [] parseableChildArgv = getElementsUpToDoubleDash(childArgv); + + // The '--' and any subsequent args. + String [] extraChildArgv = getElementsAfterDoubleDash(childArgv); + + // Now feed the arguments into the tool itself. + try { + childOptions = childTool.parseArguments(parseableChildArgv, + null, childOptions, false); + childTool.appendArgs(extraChildArgv); + childTool.validateOptions(childOptions); + } catch (ParseException pe) { + LOG.error("Error parsing arguments to the session-specific tool."); + LOG.error("See 'sqoop help ' for usage."); + return 1; + } catch (SqoopOptions.InvalidOptionsException e) { + System.err.println(e.getMessage()); + return 1; + } + + return 0; // Success. + } + + private int createSession(SqoopOptions options) throws IOException { + // In our extraArguments array, we should have a '--' followed by + // a tool name, and any tool-specific arguments. + // Create an instance of the named tool and then configure it to + // get a SqoopOptions out which we will serialize into a session. + int dashPos = getDashPosition(extraArguments); + int toolArgPos = dashPos + 1; + if (null == extraArguments || toolArgPos < 0 + || toolArgPos >= extraArguments.length) { + LOG.error("No tool specified; cannot create a session."); + LOG.error("Use: sqoop create-session [session-args] " + + "-- [tool-args]"); + return 1; + } + + String sessionToolName = extraArguments[toolArgPos]; + SqoopTool sessionTool = SqoopTool.getTool(sessionToolName); + if (null == sessionTool) { + LOG.error("No such tool available: " + sessionToolName); + return 1; + } + + // Create a SqoopOptions and Configuration based on the current one, + // but deep-copied. This will be populated within the session. + SqoopOptions sessionOptions = new SqoopOptions(); + sessionOptions.setConf(new Configuration(options.getConf())); + + // Get the arguments to feed to the child tool. + String [] childArgs = Arrays.copyOfRange(extraArguments, toolArgPos + 1, + extraArguments.length); + + int confRet = configureChildTool(sessionOptions, sessionTool, childArgs); + if (0 != confRet) { + // Error. + return confRet; + } + + // Now that the tool is fully configured, materialize the session. + SessionData sessionData = new SessionData(sessionOptions, sessionTool); + this.storage.create(sessionName, sessionData); + return 0; // Success. + } + + private int listSessions(SqoopOptions opts) throws IOException { + List sessionNames = storage.list(); + System.out.println("Available sessions:"); + for (String name : sessionNames) { + System.out.println(" " + name); + } + return 0; + } + + private int deleteSession(SqoopOptions opts) throws IOException { + this.storage.delete(sessionName); + return 0; + } + + private int execSession(SqoopOptions opts) throws IOException { + SessionData data = this.storage.read(sessionName); + if (null == data) { + LOG.error("No such session: " + sessionName); + return 1; + } + + SqoopOptions childOpts = data.getSqoopOptions(); + SqoopTool childTool = data.getSqoopTool(); + + int dashPos = getDashPosition(extraArguments); + String [] childArgv; + if (dashPos >= extraArguments.length) { + childArgv = new String[0]; + } else { + childArgv = Arrays.copyOfRange(extraArguments, dashPos + 1, + extraArguments.length); + } + + int confRet = configureChildTool(childOpts, childTool, childArgv); + if (0 != confRet) { + // Error. + return confRet; + } + + return childTool.run(childOpts); + } + + private int showSession(SqoopOptions opts) throws IOException { + SessionData data = this.storage.read(sessionName); + if (null == data) { + LOG.error("No such session: " + sessionName); + return 1; + } + + SqoopOptions childOpts = data.getSqoopOptions(); + SqoopTool childTool = data.getSqoopTool(); + + System.out.println("Session: " + sessionName); + System.out.println("Tool: " + childTool.getToolName()); + + System.out.println("Options:"); + System.out.println("----------------------------"); + Properties props = childOpts.writeProperties(); + for (Map.Entry entry : props.entrySet()) { + System.out.println(entry.getKey().toString() + " = " + entry.getValue()); + } + + // TODO: This does not show entries in the Configuration + // (SqoopOptions.getConf()) which were stored as different from the + // default. + + return 0; + } + + @Override + /** {@inheritDoc} */ + public int run(SqoopOptions options) { + // Get a SessionStorage instance to use to materialize this session. + SessionStorageFactory ssf = new SessionStorageFactory(options.getConf()); + this.storage = ssf.getSessionStorage(sessionDescriptor); + if (null == this.storage) { + LOG.error("There is no SessionStorage implementation available"); + LOG.error("that can read your specified session descriptor."); + LOG.error("Don't know where to save this session info! You may"); + LOG.error("need to specify the connect string with --meta-connect."); + return 1; + } + + try { + // Open the storage layer. + this.storage.open(this.sessionDescriptor); + + // And now determine what operation to perform with it. + switch (operation) { + case SessionCreate: + return createSession(options); + case SessionDelete: + return deleteSession(options); + case SessionExecute: + return execSession(options); + case SessionList: + return listSessions(options); + case SessionShow: + return showSession(options); + default: + LOG.error("Undefined session operation: " + operation); + return 1; + } + } catch (IOException ioe) { + LOG.error("I/O error performing session operation: " + + StringUtils.stringifyException(ioe)); + return 1; + } finally { + if (null != this.storage) { + try { + storage.close(); + } catch (IOException ioe) { + LOG.warn("IOException closing SessionStorage: " + + StringUtils.stringifyException(ioe)); + } + } + } + } + + @Override + /** Configure the command-line arguments we expect to receive */ + public void configureOptions(ToolOptions toolOptions) { + toolOptions.addUniqueOptions(getSessionOptions()); + } + + @Override + /** {@inheritDoc} */ + public void applyOptions(CommandLine in, SqoopOptions out) + throws InvalidOptionsException { + + if (in.hasOption(VERBOSE_ARG)) { + // Immediately switch into DEBUG logging. + Category sqoopLogger = Logger.getLogger( + Sqoop.class.getName()).getParent(); + sqoopLogger.setLevel(Level.DEBUG); + LOG.debug("Enabled debug logging."); + } + + if (in.hasOption(HELP_ARG)) { + ToolOptions toolOpts = new ToolOptions(); + configureOptions(toolOpts); + printHelp(toolOpts); + throw new InvalidOptionsException(""); + } + + this.sessionDescriptor = new TreeMap(); + + if (in.hasOption(SESSION_METASTORE_ARG)) { + this.sessionDescriptor.put(HsqldbSessionStorage.META_CONNECT_KEY, + in.getOptionValue(SESSION_METASTORE_ARG)); + } + + // These are generated via an option group; exactly one + // of this exhaustive list will always be selected. + if (in.hasOption(SESSION_CMD_CREATE_ARG)) { + this.operation = SessionOp.SessionCreate; + this.sessionName = in.getOptionValue(SESSION_CMD_CREATE_ARG); + } else if (in.hasOption(SESSION_CMD_DELETE_ARG)) { + this.operation = SessionOp.SessionDelete; + this.sessionName = in.getOptionValue(SESSION_CMD_DELETE_ARG); + } else if (in.hasOption(SESSION_CMD_EXEC_ARG)) { + this.operation = SessionOp.SessionExecute; + this.sessionName = in.getOptionValue(SESSION_CMD_EXEC_ARG); + } else if (in.hasOption(SESSION_CMD_LIST_ARG)) { + this.operation = SessionOp.SessionList; + } else if (in.hasOption(SESSION_CMD_SHOW_ARG)) { + this.operation = SessionOp.SessionShow; + this.sessionName = in.getOptionValue(SESSION_CMD_SHOW_ARG); + } + } + + @Override + /** {@inheritDoc} */ + public void validateOptions(SqoopOptions options) + throws InvalidOptionsException { + + if (null == operation + || (null == this.sessionName && operation != SessionOp.SessionList)) { + throw new InvalidOptionsException("No session operation specified" + + HELP_STR); + } + + if (operation == SessionOp.SessionCreate) { + // Check that we have a '--' followed by at least a tool name. + if (extraArguments == null || extraArguments.length == 0) { + throw new InvalidOptionsException( + "Expected: -- [tool-args] " + + HELP_STR); + } + } + + int dashPos = getDashPosition(extraArguments); + if (hasUnrecognizedArgs(extraArguments, 0, dashPos)) { + throw new InvalidOptionsException(HELP_STR); + } + } + + @Override + /** {@inheritDoc} */ + public void printHelp(ToolOptions opts) { + System.out.println("usage: sqoop " + getToolName() + + " [GENERIC-ARGS] [SESSION-ARGS] [-- [] [TOOL-ARGS]]"); + System.out.println(""); + + opts.printHelp(); + + System.out.println(""); + System.out.println("Generic Hadoop command-line arguments:"); + System.out.println("(must preceed any tool-specific arguments)"); + ToolRunner.printGenericCommandUsage(System.out); + } +} + diff --git a/src/java/com/cloudera/sqoop/tool/SqoopTool.java b/src/java/com/cloudera/sqoop/tool/SqoopTool.java index b7f68cb4..e24f0320 100644 --- a/src/java/com/cloudera/sqoop/tool/SqoopTool.java +++ b/src/java/com/cloudera/sqoop/tool/SqoopTool.java @@ -73,6 +73,10 @@ public abstract class SqoopTool { "List available databases on a server"); registerTool("list-tables", ListTablesTool.class, "List available tables in a database"); + registerTool("metastore", MetastoreTool.class, + "Run a standalone Sqoop metastore"); + registerTool("session", SessionTool.class, + "Work with saved sessions"); registerTool("version", VersionTool.class, "Display version information"); } diff --git a/src/test/com/cloudera/sqoop/SmokeTests.java b/src/test/com/cloudera/sqoop/SmokeTests.java index e3b48ae5..cebd641e 100644 --- a/src/test/com/cloudera/sqoop/SmokeTests.java +++ b/src/test/com/cloudera/sqoop/SmokeTests.java @@ -30,6 +30,8 @@ import com.cloudera.sqoop.manager.TestHsqldbManager; import com.cloudera.sqoop.manager.TestSqlManager; import com.cloudera.sqoop.mapreduce.MapreduceTests; + +import com.cloudera.sqoop.metastore.TestSessions; import com.cloudera.sqoop.orm.TestClassWriter; import com.cloudera.sqoop.orm.TestParseMethods; import com.cloudera.sqoop.util.TestDirectImportUtils; @@ -74,6 +76,7 @@ public static Test suite() { suite.addTestSuite(TestDirectImportUtils.class); suite.addTestSuite(TestLobFile.class); suite.addTestSuite(TestExportUpdate.class); + suite.addTestSuite(TestSessions.class); suite.addTest(MapreduceTests.suite()); return suite; diff --git a/src/test/com/cloudera/sqoop/TestSqoopOptions.java b/src/test/com/cloudera/sqoop/TestSqoopOptions.java index 0ae87be9..aad4308c 100644 --- a/src/test/com/cloudera/sqoop/TestSqoopOptions.java +++ b/src/test/com/cloudera/sqoop/TestSqoopOptions.java @@ -18,12 +18,13 @@ package com.cloudera.sqoop; +import java.util.Properties; + import junit.framework.TestCase; import com.cloudera.sqoop.lib.DelimiterSet; import com.cloudera.sqoop.tool.ImportTool; - /** * Test aspects of the SqoopOptions class. */ @@ -234,4 +235,30 @@ public void testGoodNumMappers() throws Exception { SqoopOptions opts = parse(args); assertEquals(4, opts.getNumMappers()); } + + public void testPropertySerialization() { + // Test that if we write a SqoopOptions out to a Properties, + // and then read it back in, we get all the same results. + SqoopOptions out = new SqoopOptions(); + out.setUsername("user"); + out.setConnectString("bla"); + out.setNumMappers(4); + out.setAppendMode(true); + out.setHBaseTable("hbasetable"); + out.setWarehouseDir("Warehouse"); + out.setClassName("someclass"); + out.setSplitByCol("somecol"); + out.setSqlQuery("the query"); + out.setPackageName("a.package"); + out.setHiveImport(true); + + Properties outProps = out.writeProperties(); + + SqoopOptions in = new SqoopOptions(); + in.loadProperties(outProps); + + Properties inProps = in.writeProperties(); + + assertEquals("properties don't match", outProps, inProps); + } } diff --git a/src/test/com/cloudera/sqoop/metastore/TestSessions.java b/src/test/com/cloudera/sqoop/metastore/TestSessions.java new file mode 100644 index 00000000..b4c3fac4 --- /dev/null +++ b/src/test/com/cloudera/sqoop/metastore/TestSessions.java @@ -0,0 +1,205 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.metastore; + +import java.sql.SQLException; +import java.sql.Statement; + +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.conf.Configuration; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.manager.HsqldbManager; +import com.cloudera.sqoop.metastore.hsqldb.AutoHsqldbStorage; +import com.cloudera.sqoop.tool.VersionTool; + +import junit.framework.TestCase; + +import java.io.IOException; +import java.sql.Connection; + +/** + * Test the metastore and session-handling features. + * + * These all make use of the auto-connect hsqldb-based metastore. + * The metastore URL is configured to be in-memory, and drop all + * state between individual tests. + */ +public class TestSessions extends TestCase { + + public static final String TEST_AUTOCONNECT_URL = "jdbc:hsqldb:mem:testdb"; + public static final String TEST_AUTOCONNECT_USER = "SA"; + public static final String TEST_AUTOCONNECT_PASS = ""; + + @Override + public void setUp() throws Exception { + // Delete db state between tests. + resetSessionSchema(); + } + + public static void resetSessionSchema() throws SQLException { + SqoopOptions options = new SqoopOptions(); + options.setConnectString(TEST_AUTOCONNECT_URL); + options.setUsername(TEST_AUTOCONNECT_USER); + options.setPassword(TEST_AUTOCONNECT_PASS); + + HsqldbManager manager = new HsqldbManager(options); + Connection c = manager.getConnection(); + Statement s = c.createStatement(); + try { + String [] tables = manager.listTables(); + for (String table : tables) { + s.executeUpdate("DROP TABLE " + table); + } + + c.commit(); + } finally { + s.close(); + } + } + + public static Configuration newConf() { + Configuration conf = new Configuration(); + conf.set(AutoHsqldbStorage.AUTO_STORAGE_USER_KEY, TEST_AUTOCONNECT_USER); + conf.set(AutoHsqldbStorage.AUTO_STORAGE_PASS_KEY, TEST_AUTOCONNECT_PASS); + conf.set(AutoHsqldbStorage.AUTO_STORAGE_CONNECT_STRING_KEY, + TEST_AUTOCONNECT_URL); + + return conf; + } + + public void testAutoConnect() throws IOException { + // By default, we should be able to auto-connect with an + // empty connection descriptor. We should see an empty + // session set. + + Configuration conf = newConf(); + SessionStorageFactory ssf = new SessionStorageFactory(conf); + + Map descriptor = new TreeMap(); + SessionStorage storage = ssf.getSessionStorage(descriptor); + + storage.open(descriptor); + List sessions = storage.list(); + assertEquals(0, sessions.size()); + storage.close(); + } + + public void testCreateDeleteSession() throws IOException { + Configuration conf = newConf(); + SessionStorageFactory ssf = new SessionStorageFactory(conf); + + Map descriptor = new TreeMap(); + SessionStorage storage = ssf.getSessionStorage(descriptor); + + storage.open(descriptor); + + // Session list should start out empty. + List sessions = storage.list(); + assertEquals(0, sessions.size()); + + // Create a session that displays the version. + SessionData data = new SessionData(new SqoopOptions(), new VersionTool()); + storage.create("versionSession", data); + + sessions = storage.list(); + assertEquals(1, sessions.size()); + assertEquals("versionSession", sessions.get(0)); + + // Try to create that same session name again. This should fail. + try { + storage.create("versionSession", data); + fail("Expected IOException; this session already exists."); + } catch (IOException ioe) { + // This is expected; continue operation. + } + + sessions = storage.list(); + assertEquals(1, sessions.size()); + + // Restore our session, check that it exists. + SessionData outData = storage.read("versionSession"); + assertEquals(new VersionTool().getToolName(), + outData.getSqoopTool().getToolName()); + + // Try to restore a session that doesn't exist. Watch it fail. + try { + storage.read("DoesNotExist"); + fail("Expected IOException"); + } catch (IOException ioe) { + // This is expected. Continue. + } + + // Now delete the session. + storage.delete("versionSession"); + + // After delete, we should have no sessions. + sessions = storage.list(); + assertEquals(0, sessions.size()); + + storage.close(); + } + + public void testMultiConnections() throws IOException { + // Ensure that a session can be retrieved when the storage is + // closed and reopened. + + Configuration conf = newConf(); + SessionStorageFactory ssf = new SessionStorageFactory(conf); + + Map descriptor = new TreeMap(); + SessionStorage storage = ssf.getSessionStorage(descriptor); + + storage.open(descriptor); + + // Session list should start out empty. + List sessions = storage.list(); + assertEquals(0, sessions.size()); + + // Create a session that displays the version. + SessionData data = new SessionData(new SqoopOptions(), new VersionTool()); + storage.create("versionSession", data); + + sessions = storage.list(); + assertEquals(1, sessions.size()); + assertEquals("versionSession", sessions.get(0)); + + storage.close(); // Close the existing connection + + // Now re-open the storage. + ssf = new SessionStorageFactory(newConf()); + storage = ssf.getSessionStorage(descriptor); + storage.open(descriptor); + + sessions = storage.list(); + assertEquals(1, sessions.size()); + assertEquals("versionSession", sessions.get(0)); + + // Restore our session, check that it exists. + SessionData outData = storage.read("versionSession"); + assertEquals(new VersionTool().getToolName(), + outData.getSqoopTool().getToolName()); + + storage.close(); + } +} + diff --git a/src/test/findbugsExcludeFile.xml b/src/test/findbugsExcludeFile.xml index d6284550..16bba2b8 100644 --- a/src/test/findbugsExcludeFile.xml +++ b/src/test/findbugsExcludeFile.xml @@ -40,6 +40,14 @@ + + + + + +