diff --git a/src/java/com/cloudera/sqoop/SqoopOptions.java b/src/java/com/cloudera/sqoop/SqoopOptions.java
index ed52e08f..4327757f 100644
--- a/src/java/com/cloudera/sqoop/SqoopOptions.java
+++ b/src/java/com/cloudera/sqoop/SqoopOptions.java
@@ -171,6 +171,10 @@ public enum IncrementalMode {
   // to external files on disk.
   @StoredAsProperty("import.max.inline.lob.size") private long maxInlineLobSize;
 
+  // Max number 'n' of rows to fetch from the
+  // database when more rows are needed.
+  @StoredAsProperty("import.fetch.size") private Integer fetchSize;
+
   // HDFS path to read from when performing an export
   @StoredAsProperty("export.source.dir") private String exportDir;
 
@@ -427,6 +431,11 @@ public void loadProperties(Properties props) {
               getLongProperty(props, propName, f.getLong(this)));
         } else if (typ.equals(String.class)) {
           f.set(this, props.getProperty(propName, (String) f.get(this)));
+        } else if (typ.equals(Integer.class)) {
+          String value = props.getProperty(
+              propName,
+              f.get(this) == null ? "null" : f.get(this).toString());
+          f.set(this, value.equals("null") ? null : new Integer(value));
         } else if (typ.isEnum()) {
           f.set(this, Enum.valueOf(typ,
               props.getProperty(propName, f.get(this).toString())));
@@ -502,6 +511,11 @@ public Properties writeProperties() {
           putProperty(props, propName, Long.toString(f.getLong(this)));
         } else if (typ.equals(String.class)) {
           putProperty(props, propName, (String) f.get(this));
+        } else if (typ.equals(Integer.class)) {
+          putProperty(
+              props,
+              propName,
+              f.get(this) == null ? "null" : f.get(this).toString());
         } else if (typ.isEnum()) {
           putProperty(props, propName, f.get(this).toString());
         } else {
@@ -675,6 +689,11 @@ private void initDefaults(Configuration baseConfiguration) {
 
     this.maxInlineLobSize = LargeObjectLoader.DEFAULT_MAX_LOB_LENGTH;
 
+    // Don't set a default value for fetchsize. This allows a JDBCManager to
+    // provide a database-specific default, if no value is provided by the
+    // user.
+    this.fetchSize = null;
+
     if (null == baseConfiguration) {
       this.conf = new Configuration();
     } else {
@@ -1327,6 +1346,14 @@ public void setInlineLobLimit(long limit) {
     this.maxInlineLobSize = limit;
   }
 
+  public Integer getFetchSize() {
+    return this.fetchSize;
+  }
+
+  public void setFetchSize(Integer size) {
+    this.fetchSize = size;
+  }
+
   /**
    * @return true if the delimiters have been explicitly set by the user.
   */
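[Reviewer note — illustration, not part of the patch] The Integer branches above round-trip an unset (null) fetchSize through java.util.Properties by writing the literal string "null", since Properties cannot store null values. A minimal standalone sketch of that sentinel scheme, with an illustrative class name and property key:

import java.util.Properties;

public class NullSentinelDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    Integer fetchSize = null;  // option left unset by the user

    // Write side, mirroring writeProperties(): encode null as "null".
    props.setProperty("import.fetch.size",
        fetchSize == null ? "null" : fetchSize.toString());

    // Read side, mirroring loadProperties(): decode the sentinel.
    String value = props.getProperty("import.fetch.size", "null");
    Integer restored = value.equals("null") ? null : Integer.valueOf(value);

    System.out.println(restored);  // prints: null
  }
}

One consequence of the sentinel: a property explicitly set to the string "null" is indistinguishable from an unset option, which seems acceptable for these numeric keys.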
diff --git a/src/java/com/cloudera/sqoop/manager/DirectMySQLManager.java b/src/java/com/cloudera/sqoop/manager/DirectMySQLManager.java
index 231ec9dd..7210f88e 100644
--- a/src/java/com/cloudera/sqoop/manager/DirectMySQLManager.java
+++ b/src/java/com/cloudera/sqoop/manager/DirectMySQLManager.java
@@ -38,7 +38,7 @@ public class DirectMySQLManager extends MySQLManager {
       DirectMySQLManager.class.getName());
 
   public DirectMySQLManager(final SqoopOptions options) {
-    super(options, false);
+    super(options);
   }
 
   /**
diff --git a/src/java/com/cloudera/sqoop/manager/MySQLManager.java b/src/java/com/cloudera/sqoop/manager/MySQLManager.java
index c172dddd..da2cc48e 100644
--- a/src/java/com/cloudera/sqoop/manager/MySQLManager.java
+++ b/src/java/com/cloudera/sqoop/manager/MySQLManager.java
@@ -22,9 +22,7 @@
 import java.io.PrintWriter;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.Statement;
 import java.sql.SQLException;
 import java.util.ArrayList;
 
@@ -48,15 +46,24 @@ public class MySQLManager extends GenericJdbcManager {
   // set to true after we warn the user that we can use direct fastpath.
   private static boolean warningPrinted = false;
 
-  private Statement lastStatement;
-
   public MySQLManager(final SqoopOptions opts) {
     super(DRIVER_CLASS, opts);
   }
 
-  protected MySQLManager(final SqoopOptions opts, boolean ignored) {
-    // constructor used by subclasses to avoid the --direct warning.
-    super(DRIVER_CLASS, opts);
+  @Override
+  protected void initOptionDefaults() {
+    if (options.getFetchSize() == null) {
+      LOG.info("Preparing to use a MySQL streaming resultset.");
+      options.setFetchSize(Integer.MIN_VALUE);
+    } else if (
+        !options.getFetchSize().equals(Integer.MIN_VALUE)
+        && !options.getFetchSize().equals(0)) {
+      LOG.info("Argument '--fetch-size " + options.getFetchSize()
+          + "' will probably get ignored by MySQL JDBC driver.");
+      // see also
+      // http://dev.mysql.com/doc/refman/5.5/en
+      // /connector-j-reference-implementation-notes.html
+    }
   }
 
   @Override
@@ -182,36 +189,6 @@ private void checkDateTimeBehavior(ImportJobContext context) {
     }
   }
 
-  /**
-   * Executes an arbitrary SQL statement. Sets mysql-specific parameter
-   * to ensure the entire table is not buffered in RAM before reading
-   * any rows. A consequence of this is that every ResultSet returned
-   * by this method *MUST* be close()'d, or read to exhaustion before
-   * another query can be executed from this ConnManager instance.
-   *
-   * @param stmt The SQL statement to execute
-   * @return A ResultSet encapsulating the results or null on error
-   */
-  protected ResultSet execute(String stmt, Object... args)
-      throws SQLException {
-    // Free any previous resources.
-    release();
-
-    PreparedStatement statement = null;
-    statement = this.getConnection().prepareStatement(stmt,
-        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-    this.lastStatement = statement;
-    statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time.
-    if (null != args) {
-      for (int i = 0; i < args.length; i++) {
-        statement.setObject(i + 1, args[i]);
-      }
-    }
-
-    LOG.info("Executing SQL statement: " + stmt);
-    return statement.executeQuery();
-  }
-
   @Override
   public void execAndPrint(String s) {
     // Override default execAndPrint() with a special version that forces
@@ -221,9 +198,8 @@ public void execAndPrint(String s) {
 
     ResultSet results = null;
     try {
-      // Use default execute() statement which does not issue the
-      // MySQL-specific setFetchSize() command.
-      results = super.execute(s);
+      // Explicitly setting fetchSize to zero disables streaming.
+      results = super.execute(s, 0);
     } catch (SQLException sqlE) {
       LOG.error("Error executing statement: "
          + StringUtils.stringifyException(sqlE));
@@ -239,18 +215,6 @@ public void execAndPrint(String s) {
     }
   }
 
-  public void release() {
-    if (null != this.lastStatement) {
-      try {
-        this.lastStatement.close();
-      } catch (SQLException e) {
-        LOG.warn("Exception closing executed Statement: " + e);
-      }
-
-      this.lastStatement = null;
-    }
-  }
-
   /**
    * When using a column name in a generated SQL query, how (if at all)
    * should we escape that column name? e.g., a column named "table"
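[Reviewer note — illustration, not part of the patch] For context on initOptionDefaults() above: MySQL Connector/J only streams results when the statement is TYPE_FORWARD_ONLY and CONCUR_READ_ONLY with a fetch size of Integer.MIN_VALUE; the default fetch size of 0 buffers the entire result set client-side, and other values are ignored unless cursor fetch is enabled on the connection, hence the warning. A minimal sketch of the streaming setup the manager now selects by default (URL and credentials are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class MySQLStreamingDemo {
  public static void main(String[] args) throws SQLException {
    Connection conn = DriverManager.getConnection(
        "jdbc:mysql://localhost/db", "user", "passwd");  // placeholders
    Statement stmt = conn.createStatement(
        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
    stmt.setFetchSize(Integer.MIN_VALUE);  // row-at-a-time streaming
    ResultSet rs = stmt.executeQuery("SELECT * FROM big_table");
    try {
      while (rs.next()) {
        // Process the row. While streaming, this connection cannot run
        // another statement until rs is exhausted or closed.
      }
    } finally {
      rs.close();
      stmt.close();
      conn.close();
    }
  }
}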
diff --git a/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java b/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java
index 3526edf1..06a3088d 100644
--- a/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java
+++ b/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java
@@ -19,9 +19,6 @@ package com.cloudera.sqoop.manager;
 
 import java.io.IOException;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.Statement;
 import java.sql.SQLException;
 
 import org.apache.commons.logging.Log;
@@ -41,14 +38,9 @@ public class PostgresqlManager extends GenericJdbcManager {
   // driver class to ensure is loaded when making db connection.
   private static final String DRIVER_CLASS = "org.postgresql.Driver";
 
-  // Fetch 50 rows at a time.
-  private static final int POSTGRESQL_FETCH_SIZE = 50;
-
   // set to true after we warn the user that we can use direct fastpath.
   private static boolean warningPrinted = false;
 
-  private Statement lastStatement;
-
   public PostgresqlManager(final SqoopOptions opts) {
     super(DRIVER_CLASS, opts);
   }
@@ -98,47 +90,6 @@ public String getPrimaryKey(String tableName) {
     return super.getPrimaryKey(tableName.toLowerCase());
   }
 
-  /**
-   * Executes an arbitrary SQL statement. Sets the cursor fetch size
-   * to ensure the entire table is not buffered in RAM before reading
-   * any rows. A consequence of this is that every ResultSet returned
-   * by this method *MUST* be close()'d, or read to exhaustion before
-   * another query can be executed from this ConnManager instance.
-   *
-   * @param stmt The SQL statement to execute
-   * @return A ResultSet encapsulating the results or null on error
-   */
-  protected ResultSet execute(String stmt, Object... args) throws SQLException {
-    // Free any previous resources used.
-    release();
-
-    PreparedStatement statement = null;
-    statement = this.getConnection().prepareStatement(stmt,
-        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-    this.lastStatement = statement;
-    statement.setFetchSize(POSTGRESQL_FETCH_SIZE);
-    if (null != args) {
-      for (int i = 0; i < args.length; i++) {
-        statement.setObject(i + 1, args[i]);
-      }
-    }
-
-    LOG.info("Executing SQL statement: " + stmt);
-    return statement.executeQuery();
-  }
-
-  public void release() {
-    if (null != this.lastStatement) {
-      try {
-        this.lastStatement.close();
-      } catch (SQLException e) {
-        LOG.warn("Exception closing executed Statement: " + e);
-      }
-
-      this.lastStatement = null;
-    }
-  }
-
   @Override
   public boolean supportsStagingForExport() {
     return true;
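[Reviewer note — illustration, not part of the patch] With its custom execute() gone, PostgreSQL now inherits SqlManager's generic default (DEFAULT_FETCH_SIZE, 1000) instead of the old hard-coded 50. One caveat worth remembering in review: the PostgreSQL JDBC driver only honors setFetchSize() (by declaring a server-side cursor) when autocommit is off and the result set is forward-only; otherwise it buffers everything regardless of the hint. A sketch (URL and credentials are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class PostgresFetchDemo {
  public static void main(String[] args) throws SQLException {
    Connection conn = DriverManager.getConnection(
        "jdbc:postgresql://localhost/db", "user", "passwd");  // placeholders
    conn.setAutoCommit(false);  // required for cursor-based fetching
    Statement stmt = conn.createStatement(
        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
    stmt.setFetchSize(1000);  // rows now arrive in batches of 1000
    ResultSet rs = stmt.executeQuery("SELECT * FROM big_table");
    while (rs.next()) {
      // process row
    }
    rs.close();
    stmt.close();
    conn.close();
  }
}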
diff --git a/src/java/com/cloudera/sqoop/manager/SqlManager.java b/src/java/com/cloudera/sqoop/manager/SqlManager.java
index c59ee617..655c2d6e 100644
--- a/src/java/com/cloudera/sqoop/manager/SqlManager.java
+++ b/src/java/com/cloudera/sqoop/manager/SqlManager.java
@@ -69,6 +69,8 @@ public abstract class SqlManager extends ConnManager {
   public static final String SUBSTITUTE_TOKEN =
       DataDrivenDBInputFormat.SUBSTITUTE_TOKEN;
 
+  protected static final int DEFAULT_FETCH_SIZE = 1000;
+
   protected SqoopOptions options;
 
   private Statement lastStatement;
@@ -78,6 +80,18 @@ public abstract class SqlManager extends ConnManager {
    */
   public SqlManager(final SqoopOptions opts) {
     this.options = opts;
+    initOptionDefaults();
+  }
+
+  /**
+   * Sets default values for values that were not provided by the user.
+   * Only options with database-specific defaults should be configured here.
+   */
+  protected void initOptionDefaults() {
+    if (options.getFetchSize() == null) {
+      LOG.info("Using default fetchSize of " + DEFAULT_FETCH_SIZE);
+      options.setFetchSize(DEFAULT_FETCH_SIZE);
+    }
   }
 
   /**
@@ -422,15 +436,21 @@ public void importQuery(ImportJobContext context)
   /**
    * Executes an arbitrary SQL statement.
    * @param stmt The SQL statement to execute
+   * @param fetchSize Overrides default or parameterized fetch size
    * @return A ResultSet encapsulating the results or null on error
    */
-  protected ResultSet execute(String stmt, Object... args) throws SQLException {
+  protected ResultSet execute(String stmt, Integer fetchSize, Object... args)
+      throws SQLException {
     // Release any previously-open statement.
     release();
 
     PreparedStatement statement = null;
     statement = this.getConnection().prepareStatement(stmt,
        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    if (fetchSize != null) {
+      LOG.debug("Using fetchSize for next query: " + fetchSize);
+      statement.setFetchSize(fetchSize);
+    }
     this.lastStatement = statement;
     if (null != args) {
       for (int i = 0; i < args.length; i++) {
@@ -442,6 +462,15 @@ protected ResultSet execute(String stmt, Object... args) throws SQLException {
     return statement.executeQuery();
   }
 
+  /**
+   * Executes an arbitrary SQL Statement.
+   * @param stmt The SQL statement to execute
+   * @return A ResultSet encapsulating the results or null on error
+   */
+  protected ResultSet execute(String stmt, Object... args) throws SQLException {
+    return execute(stmt, options.getFetchSize(), args);
+  }
+
   /**
    * Resolve a database-specific type to the Java type that should contain it.
   * @param sqlType
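[Reviewer note — illustration, not part of the patch] The two execute() overloads above let call sites either inherit the option-driven fetch size or override it per query, as MySQLManager.execAndPrint() does with 0. A standalone stub (not Sqoop code; the real methods prepare JDBC statements) showing how the calls resolve:

public class ExecuteOverloadDemo {
  private Integer defaultFetchSize = 1000;  // stands in for options.getFetchSize()

  protected void execute(String stmt, Integer fetchSize, Object... args) {
    System.out.println(stmt + " -> fetchSize=" + fetchSize);
  }

  protected void execute(String stmt, Object... args) {
    execute(stmt, defaultFetchSize, args);
  }

  public static void main(String[] args) {
    ExecuteOverloadDemo demo = new ExecuteOverloadDemo();
    demo.execute("SELECT * FROM foo");     // fetchSize=1000, the default
    demo.execute("SELECT * FROM foo", 0);  // fetchSize=0: the boxed int
                                           // selects the more specific
                                           // (String, Integer, Object...)
                                           // overload, as in execAndPrint()
  }
}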
diff --git a/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java
index 94714cd5..49e4a3d8 100644
--- a/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java
@@ -106,11 +106,12 @@ protected void configureInputFormat(Job job, String tableName,
       String username = options.getUsername();
       if (null == username || username.length() == 0) {
         DBConfiguration.configureDB(job.getConfiguration(),
-            mgr.getDriverClass(), options.getConnectString());
+            mgr.getDriverClass(), options.getConnectString(),
+            options.getFetchSize());
       } else {
         DBConfiguration.configureDB(job.getConfiguration(),
             mgr.getDriverClass(), options.getConnectString(),
-            username, options.getPassword());
+            username, options.getPassword(), options.getFetchSize());
       }
 
       if (null != tableName) {
diff --git a/src/java/com/cloudera/sqoop/mapreduce/db/DBConfiguration.java b/src/java/com/cloudera/sqoop/mapreduce/db/DBConfiguration.java
index 7fcfd5bc..55715a2d 100644
--- a/src/java/com/cloudera/sqoop/mapreduce/db/DBConfiguration.java
+++ b/src/java/com/cloudera/sqoop/mapreduce/db/DBConfiguration.java
@@ -56,6 +56,9 @@ public class DBConfiguration {
   /** Password to access the database. */
   public static final String PASSWORD_PROPERTY = "mapreduce.jdbc.password";
 
+  /** Fetch size. */
+  public static final String FETCH_SIZE = "mapreduce.jdbc.fetchsize";
+
   /** Input table name. */
   public static final String INPUT_TABLE_NAME_PROPERTY =
       "mapreduce.jdbc.input.table.name";
@@ -103,12 +106,13 @@ public class DBConfiguration {
    * Sets the DB access related fields in the {@link Configuration}.
    * @param conf the configuration
    * @param driverClass JDBC Driver class name
-   * @param dbUrl JDBC DB access URL.
+   * @param dbUrl JDBC DB access URL
    * @param userName DB access username
    * @param passwd DB access passwd
+   * @param fetchSize DB fetch size
    */
   public static void configureDB(Configuration conf, String driverClass,
-      String dbUrl, String userName, String passwd) {
+      String dbUrl, String userName, String passwd, Integer fetchSize) {
 
     conf.set(DRIVER_CLASS_PROPERTY, driverClass);
     conf.set(URL_PROPERTY, dbUrl);
@@ -118,6 +122,34 @@ public static void configureDB(Configuration conf, String driverClass,
     if (passwd != null) {
       conf.set(PASSWORD_PROPERTY, passwd);
     }
+    if (fetchSize != null) {
+      conf.setInt(FETCH_SIZE, fetchSize);
+    }
+  }
+
+  /**
+   * Sets the DB access related fields in the {@link Configuration}.
+   * @param job the configuration
+   * @param driverClass JDBC Driver class name
+   * @param dbUrl JDBC DB access URL
+   * @param fetchSize DB fetch size
+   */
+  public static void configureDB(Configuration job, String driverClass,
+      String dbUrl, Integer fetchSize) {
+    configureDB(job, driverClass, dbUrl, null, null, fetchSize);
+  }
+
+  /**
+   * Sets the DB access related fields in the {@link Configuration}.
+   * @param conf the configuration
+   * @param driverClass JDBC Driver class name
+   * @param dbUrl JDBC DB access URL
+   * @param userName DB access username
+   * @param passwd DB access passwd
+   */
+  public static void configureDB(Configuration conf, String driverClass,
+      String dbUrl, String userName, String passwd) {
+    configureDB(conf, driverClass, dbUrl, userName, passwd, null);
   }
 
   /**
@@ -128,7 +160,7 @@ public static void configureDB(Configuration conf, String driverClass,
    */
   public static void configureDB(Configuration job, String driverClass,
       String dbUrl) {
-    configureDB(job, driverClass, dbUrl, null, null);
+    configureDB(job, driverClass, dbUrl, null);
   }
 
   private Configuration conf;
@@ -160,6 +192,20 @@ public Configuration getConf() {
     return conf;
   }
 
+  public Integer getFetchSize() {
+    if (conf.get(DBConfiguration.FETCH_SIZE) == null) {
+      return null;
+    }
+    return conf.getInt(DBConfiguration.FETCH_SIZE, 0);
+  }
+
+  public void setFetchSize(Integer fetchSize) {
+    if (fetchSize != null) {
+      conf.setInt(DBConfiguration.FETCH_SIZE, fetchSize);
+    } else {
+      conf.set(FETCH_SIZE, null);
+    }
+  }
   public String getInputTableName() {
     return conf.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY);
   }
diff --git a/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java b/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java
index e70257b6..9c8dc8ec 100644
--- a/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java
+++ b/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java
@@ -208,11 +208,6 @@ protected RecordReader<LongWritable, T> createDBRecordReader(
       return new OracleDBRecordReader<T>(split, inputClass,
           conf, getConnection(), getDBConf(), conditions, fieldNames,
           tableName);
-    } else if (dbProductName.startsWith("MYSQL")) {
-      // use MySQL-specific db reader.
-      return new MySQLDBRecordReader<T>(split, inputClass,
-          conf, getConnection(), getDBConf(), conditions, fieldNames,
-          tableName);
     } else {
       // Generic reader.
       return new DBRecordReader<T>(split, inputClass,
diff --git a/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java b/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java
index 0819c5df..d7cd7ef8 100644
--- a/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java
+++ b/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java
@@ -37,8 +37,8 @@
 
 /**
  * A RecordReader that reads records from a SQL table.
- * Emits LongWritables containing the record number as
- * key and DBWritables as value.
+ * Emits LongWritables containing the record number as
+ * key and DBWritables as value.
 */
 public class DBRecordReader<T extends DBWritable> extends
     RecordReader<LongWritable, T> {
@@ -54,9 +54,9 @@ public class DBRecordReader<T extends DBWritable> extends
   private DBInputFormat.DBInputSplit split;
 
   private long pos = 0;
-
+
   private LongWritable key = null;
-
+
   private T value = null;
 
   private Connection connection;
@@ -73,11 +73,11 @@ public class DBRecordReader<T extends DBWritable> extends
 
   /**
    * @param split The InputSplit to read data for
-   * @throws SQLException
+   * @throws SQLException
    */
   // CHECKSTYLE:OFF
   // TODO (aaron): Refactor constructor to take fewer arguments
-  public DBRecordReader(DBInputFormat.DBInputSplit split,
+  public DBRecordReader(DBInputFormat.DBInputSplit split,
      Class<T> inputClass, Configuration conf, Connection conn,
      DBConfiguration dbConfig, String cond, String [] fields, String table)
      throws SQLException {
@@ -97,10 +97,18 @@ public DBRecordReader(DBInputFormat.DBInputSplit split,
   protected ResultSet executeQuery(String query) throws SQLException {
     this.statement = connection.prepareStatement(query,
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+
+    Integer fetchSize = dbConf.getFetchSize();
+    if (fetchSize != null) {
+      LOG.debug("Using fetchSize for next query: " + fetchSize);
+      statement.setFetchSize(fetchSize);
+    }
+
+    LOG.debug("Executing query: " + query);
     return statement.executeQuery();
   }
 
-  /** Returns the query for selecting the records,
+  /** Returns the query for selecting the records,
    * subclasses can override this for custom behaviour.*/
   protected String getSelectQuery() {
     StringBuilder query = new StringBuilder();
@@ -109,7 +117,7 @@ protected String getSelectQuery() {
     // Relies on LIMIT/OFFSET for splits.
     if(dbConf.getInputQuery() == null) {
       query.append("SELECT ");
-
+
       for (int i = 0; i < fieldNames.length; i++) {
         query.append(fieldNames[i]);
         if (i != fieldNames.length -1) {
@@ -131,7 +139,7 @@ protected String getSelectQuery() {
       //PREBUILT QUERY
       query.append(dbConf.getInputQuery());
     }
-
+
     try {
       query.append(" LIMIT ").append(split.getLength());
       query.append(" OFFSET ").append(split.getStart());
@@ -161,7 +169,7 @@ public void close() throws IOException {
     }
   }
 
-  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
       throws IOException, InterruptedException {
     //do nothing
   }
@@ -169,7 +177,7 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
   @Override
   /** {@inheritDoc} */
   public LongWritable getCurrentKey() {
-    return key;
+    return key;
   }
 
   @Override
@@ -179,7 +187,7 @@ public T getCurrentValue() {
   }
 
   /**
-   * @deprecated
+   * @deprecated
   */
   @Deprecated
   public T createValue() {
@@ -187,7 +195,7 @@ public T createValue() {
   }
 
   /**
-   * @deprecated
+   * @deprecated
  */
  @Deprecated
  public long getPos() throws IOException {
diff --git a/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java b/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java
index 6708ceed..06fb3b74 100644
--- a/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java
+++ b/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java
@@ -288,19 +288,10 @@ protected RecordReader<LongWritable, T> createDBRecordReader(
     LOG.debug("Creating db record reader for db product: " + dbProductName);
 
     try {
-      // use database product name to determine appropriate record reader.
-      if (dbProductName.startsWith("MYSQL")) {
-        // use MySQL-specific db reader.
-        return new MySQLDataDrivenDBRecordReader<T>(split, inputClass,
-            conf, getConnection(), dbConf, dbConf.getInputConditions(),
-            dbConf.getInputFieldNames(), dbConf.getInputTableName());
-      } else {
-        // Generic reader.
-        return new DataDrivenDBRecordReader<T>(split, inputClass,
-            conf, getConnection(), dbConf, dbConf.getInputConditions(),
-            dbConf.getInputFieldNames(), dbConf.getInputTableName(),
-            dbProductName);
-      }
+      return new DataDrivenDBRecordReader<T>(split, inputClass,
+          conf, getConnection(), dbConf, dbConf.getInputConditions(),
+          dbConf.getInputFieldNames(), dbConf.getInputTableName(),
+          dbProductName);
     } catch (SQLException ex) {
       throw new IOException(ex.getMessage());
     }
diff --git a/src/java/com/cloudera/sqoop/mapreduce/db/MySQLDBRecordReader.java b/src/java/com/cloudera/sqoop/mapreduce/db/MySQLDBRecordReader.java
deleted file mode 100644
index 155d1bb2..00000000
--- a/src/java/com/cloudera/sqoop/mapreduce/db/MySQLDBRecordReader.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to Cloudera, Inc. under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  Cloudera, Inc. licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.sqoop.mapreduce.db;
-
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-
-/**
- * A RecordReader that reads records from a MySQL table.
- */
-public class MySQLDBRecordReader<T extends DBWritable>
-    extends DBRecordReader<T> {
-
-  // CHECKSTYLE:OFF
-  // TODO(aaron) - refactor DBRecordReader c'tor to use fewer arguments.
-  public MySQLDBRecordReader(DBInputFormat.DBInputSplit split,
-      Class<T> inputClass, Configuration conf, Connection conn,
-      DBConfiguration dbConfig, String cond, String [] fields, String table)
-      throws SQLException {
-    super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
-  }
-  // CHECKSTYLE:ON
-
-  // Execute statements for mysql in unbuffered mode.
-  protected ResultSet executeQuery(String query) throws SQLException {
-    statement = getConnection().prepareStatement(query,
-        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-    statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time.
-    return statement.executeQuery();
-  }
-}
diff --git a/src/java/com/cloudera/sqoop/mapreduce/db/MySQLDataDrivenDBRecordReader.java b/src/java/com/cloudera/sqoop/mapreduce/db/MySQLDataDrivenDBRecordReader.java
deleted file mode 100644
index e2fb70b0..00000000
--- a/src/java/com/cloudera/sqoop/mapreduce/db/MySQLDataDrivenDBRecordReader.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to Cloudera, Inc. under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  Cloudera, Inc. licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.sqoop.mapreduce.db;
-
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-
-/**
- * A RecordReader that reads records from a MySQL table via
- * DataDrivenDBRecordReader.
- */
-public class MySQLDataDrivenDBRecordReader<T extends DBWritable>
-    extends DataDrivenDBRecordReader<T> {
-
-  //CHECKSTYLE:OFF
-  public MySQLDataDrivenDBRecordReader(DBInputFormat.DBInputSplit split,
-      Class<T> inputClass, Configuration conf, Connection conn,
-      DBConfiguration dbConfig, String cond, String [] fields, String table)
-      throws SQLException {
-    super(split, inputClass, conf, conn, dbConfig, cond, fields, table,
-        "MYSQL");
-  }
-  //CHECKSTYLE:ON
-
-  // Execute statements for mysql in unbuffered mode.
-  protected ResultSet executeQuery(String query) throws SQLException {
-    statement = getConnection().prepareStatement(query,
-        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-    statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time.
-    return statement.executeQuery();
-  }
-}
diff --git a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
index 22167904..81d90a26 100644
--- a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
@@ -95,6 +95,7 @@ public abstract class BaseSqoopTool extends SqoopTool {
   public static final String COMPRESS_SHORT_ARG = "z";
   public static final String DIRECT_SPLIT_SIZE_ARG = "direct-split-size";
   public static final String INLINE_LOB_LIMIT_ARG = "inline-lob-limit";
+  public static final String FETCH_SIZE_ARG = "fetch-size";
   public static final String EXPORT_PATH_ARG = "export-dir";
   public static final String FIELDS_TERMINATED_BY_ARG = "fields-terminated-by";
   public static final String LINES_TERMINATED_BY_ARG = "lines-terminated-by";
diff --git a/src/java/com/cloudera/sqoop/tool/ImportTool.java b/src/java/com/cloudera/sqoop/tool/ImportTool.java
index 5a64bbec..d51238f5 100644
--- a/src/java/com/cloudera/sqoop/tool/ImportTool.java
+++ b/src/java/com/cloudera/sqoop/tool/ImportTool.java
@@ -97,7 +97,7 @@ private boolean isIncremental(SqoopOptions options) {
     return !options.getIncrementalMode().equals(
         SqoopOptions.IncrementalMode.None);
   }
-
+
   /**
    * If this is an incremental import, then we should save the
    * user's state back to the metastore (if this job was run
@@ -339,19 +339,19 @@ protected boolean importTable(SqoopOptions options, String tableName,
       // Do the actual import.
       ImportJobContext context = new ImportJobContext(tableName, jarFile,
           options, getOutputPath(options, tableName));
-
+
       // If we're doing an incremental import, set up the
       // filtering conditions used to get the latest records.
       if (!initIncrementalConstraints(options, context)) {
         return false;
       }
-
+
       if (null != tableName) {
         manager.importTable(context);
       } else {
         manager.importQuery(context);
       }
-
+
       if (options.isAppendMode()) {
         AppendUtils app = new AppendUtils(context);
         app.append();
@@ -366,8 +366,8 @@ protected boolean importTable(SqoopOptions options, String tableName,
 
     return true;
   }
-
-  /**
+
+  /**
   * @return the output path for the imported files;
   * in append mode this will point to a temporary folder.
   * if importing to hbase, this may return null.
@@ -382,7 +382,7 @@ private Path getOutputPath(SqoopOptions options, String tableName) {
       outputPath = AppendUtils.getTempAppendDir(tableName);
       LOG.debug("Using temporary folder: " + outputPath.getName());
     } else {
-      // Try in this order: target-dir or warehouse-dir
+      // Try in this order: target-dir or warehouse-dir
       if (hdfsTargetDir != null) {
         outputPath = new Path(hdfsTargetDir);
       } else if (hdfsWarehouseDir != null) {
@@ -392,9 +392,9 @@ private Path getOutputPath(SqoopOptions options, String tableName) {
       }
     }
 
-    return outputPath;
+    return outputPath;
   }
-
+
   @Override
   /** {@inheritDoc} */
   public int run(SqoopOptions options) {
@@ -479,11 +479,11 @@ protected RelatedOptions getImportOptions() {
     importOpts.addOption(OptionBuilder
         .withDescription("Imports data in append mode")
         .withLongOpt(APPEND_ARG)
-        .create());
+        .create());
     importOpts.addOption(OptionBuilder.withArgName("dir")
         .hasArg().withDescription("HDFS plain table destination")
         .withLongOpt(TARGET_DIR_ARG)
-        .create());
+        .create());
     importOpts.addOption(OptionBuilder.withArgName("statement")
         .hasArg()
         .withDescription("Import results of SQL 'statement'")
@@ -522,6 +522,12 @@ protected RelatedOptions getImportOptions() {
         .withDescription("Set the maximum size for an inline LOB")
         .withLongOpt(INLINE_LOB_LIMIT_ARG)
         .create());
+    importOpts.addOption(OptionBuilder.withArgName("n")
+        .hasArg()
+        .withDescription("Set number 'n' of rows to fetch from the "
+        + "database when more rows are needed")
+        .withLongOpt(FETCH_SIZE_ARG)
+        .create());
 
     return importOpts;
   }
@@ -590,7 +596,7 @@ public void printHelp(ToolOptions toolOptions) {
     } else {
       System.out.println(
           "At minimum, you must specify --connect and --table");
-    }
+    }
 
     System.out.println(
         "Arguments to mysqldump and other subprograms may be supplied");
@@ -656,7 +662,7 @@ public void applyOptions(CommandLine in, SqoopOptions out)
       if (in.hasOption(TARGET_DIR_ARG)) {
         out.setTargetDir(in.getOptionValue(TARGET_DIR_ARG));
       }
-
+
       if (in.hasOption(APPEND_ARG)) {
         out.setAppendMode(true);
       }
@@ -696,6 +702,10 @@ public void applyOptions(CommandLine in, SqoopOptions out)
             INLINE_LOB_LIMIT_ARG)));
       }
 
+      if (in.hasOption(FETCH_SIZE_ARG)) {
+        out.setFetchSize(new Integer(in.getOptionValue(FETCH_SIZE_ARG)));
+      }
+
      if (in.hasOption(JAR_FILE_NAME_ARG)) {
        out.setExistingJarName(in.getOptionValue(JAR_FILE_NAME_ARG));
      }
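[Reviewer note — not part of the patch] End to end, the new option reads like any other import argument; an illustrative invocation:

  sqoop import --connect jdbc:mysql://db.example.com/mydb \
      --table employees --fetch-size 10000

When --fetch-size is omitted, SqoopOptions.fetchSize stays null, so each manager supplies its database-specific default: DEFAULT_FETCH_SIZE (1000) in SqlManager, or Integer.MIN_VALUE (streaming) in MySQLManager.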
diff --git a/src/test/com/cloudera/sqoop/TestSqoopOptions.java b/src/test/com/cloudera/sqoop/TestSqoopOptions.java
index aad4308c..0546094b 100644
--- a/src/test/com/cloudera/sqoop/TestSqoopOptions.java
+++ b/src/test/com/cloudera/sqoop/TestSqoopOptions.java
@@ -157,7 +157,7 @@ private SqoopOptions parse(String [] argv) throws Exception {
     return importTool.parseArguments(argv, null, null, false);
   }
 
-  // test that setting output delimiters also sets input delimiters
+  // test that setting output delimiters also sets input delimiters
   public void testDelimitersInherit() throws Exception {
     String [] args = {
       "--fields-terminated-by",
@@ -236,7 +236,7 @@ public void testGoodNumMappers() throws Exception {
     assertEquals(4, opts.getNumMappers());
   }
 
-  public void testPropertySerialization() {
+  public void testPropertySerialization1() {
     // Test that if we write a SqoopOptions out to a Properties,
     // and then read it back in, we get all the same results.
     SqoopOptions out = new SqoopOptions();
@@ -251,6 +251,7 @@ public void testPropertySerialization() {
     out.setSqlQuery("the query");
     out.setPackageName("a.package");
     out.setHiveImport(true);
+    out.setFetchSize(null);
 
     Properties outProps = out.writeProperties();
 
@@ -261,4 +262,32 @@ public void testPropertySerialization() {
 
     assertEquals("properties don't match", outProps, inProps);
   }
+
+  public void testPropertySerialization2() {
+    // Test that if we write a SqoopOptions out to a Properties,
+    // and then read it back in, we get all the same results.
+    SqoopOptions out = new SqoopOptions();
+    out.setUsername("user");
+    out.setConnectString("bla");
+    out.setNumMappers(4);
+    out.setAppendMode(true);
+    out.setHBaseTable("hbasetable");
+    out.setWarehouseDir("Warehouse");
+    out.setClassName("someclass");
+    out.setSplitByCol("somecol");
+    out.setSqlQuery("the query");
+    out.setPackageName("a.package");
+    out.setHiveImport(true);
+    out.setFetchSize(42);
+
+    Properties outProps = out.writeProperties();
+
+    SqoopOptions in = new SqoopOptions();
+    in.loadProperties(outProps);
+
+    Properties inProps = in.writeProperties();
+
+    assertEquals("properties don't match", outProps, inProps);
+  }
+
 }