From d920b8a0e88eea08adaa80c25ca24fac02672796 Mon Sep 17 00:00:00 2001 From: Andrew Bayer Date: Fri, 22 Jul 2011 20:04:28 +0000 Subject: [PATCH] SQOOP-140. Control max. number of fetched records. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change adds the ability to specify the maximum number of records fetched from the database at a time (the JDBC fetch size, exposed as the --fetch-size import option). This will solve problems that may arise when importing large tables. (Michael Häusler via ahmed) From: Ahmed Radwan git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1150011 13f79535-47bb-0310-9956-ffa450edef68 --- src/java/com/cloudera/sqoop/SqoopOptions.java | 27 ++++++++ .../sqoop/manager/DirectMySQLManager.java | 2 +- .../cloudera/sqoop/manager/MySQLManager.java | 68 +++++-------------- .../sqoop/manager/PostgresqlManager.java | 49 ------------- .../cloudera/sqoop/manager/SqlManager.java | 31 ++++++++- .../sqoop/mapreduce/DataDrivenImportJob.java | 5 +- .../sqoop/mapreduce/db/DBConfiguration.java | 52 +++++++++++++- .../sqoop/mapreduce/db/DBInputFormat.java | 5 -- .../sqoop/mapreduce/db/DBRecordReader.java | 34 ++++++---- .../mapreduce/db/DataDrivenDBInputFormat.java | 17 ++--- .../mapreduce/db/MySQLDBRecordReader.java | 51 -------------- .../db/MySQLDataDrivenDBRecordReader.java | 52 -------------- .../cloudera/sqoop/tool/BaseSqoopTool.java | 1 + .../com/cloudera/sqoop/tool/ImportTool.java | 36 ++++++---- .../com/cloudera/sqoop/TestSqoopOptions.java | 33 ++++++++- 15 files changed, 206 insertions(+), 257 deletions(-) delete mode 100644 src/java/com/cloudera/sqoop/mapreduce/db/MySQLDBRecordReader.java delete mode 100644 src/java/com/cloudera/sqoop/mapreduce/db/MySQLDataDrivenDBRecordReader.java diff --git a/src/java/com/cloudera/sqoop/SqoopOptions.java b/src/java/com/cloudera/sqoop/SqoopOptions.java index ed52e08f..4327757f 100644 --- a/src/java/com/cloudera/sqoop/SqoopOptions.java +++ b/src/java/com/cloudera/sqoop/SqoopOptions.java @@ -171,6 +171,10 @@ public enum 
IncrementalMode { // to external files on disk. @StoredAsProperty("import.max.inline.lob.size") private long maxInlineLobSize; + // Max number 'n' of rows to fetch from the + // database when more rows are needed. + @StoredAsProperty("import.fetch.size") private Integer fetchSize; + // HDFS path to read from when performing an export @StoredAsProperty("export.source.dir") private String exportDir; @@ -427,6 +431,11 @@ public void loadProperties(Properties props) { getLongProperty(props, propName, f.getLong(this))); } else if (typ.equals(String.class)) { f.set(this, props.getProperty(propName, (String) f.get(this))); + } else if (typ.equals(Integer.class)) { + String value = props.getProperty( + propName, + f.get(this) == null ? "null" : f.get(this).toString()); + f.set(this, value.equals("null") ? null : new Integer(value)); } else if (typ.isEnum()) { f.set(this, Enum.valueOf(typ, props.getProperty(propName, f.get(this).toString()))); @@ -502,6 +511,11 @@ public Properties writeProperties() { putProperty(props, propName, Long.toString(f.getLong(this))); } else if (typ.equals(String.class)) { putProperty(props, propName, (String) f.get(this)); + } else if (typ.equals(Integer.class)) { + putProperty( + props, + propName, + f.get(this) == null ? "null" : f.get(this).toString()); } else if (typ.isEnum()) { putProperty(props, propName, f.get(this).toString()); } else { @@ -675,6 +689,11 @@ private void initDefaults(Configuration baseConfiguration) { this.maxInlineLobSize = LargeObjectLoader.DEFAULT_MAX_LOB_LENGTH; + // Don't set a default value for fetchsize. This allows a JDBCManager to + // provide a database-specific default, if no value is provided by the + // user. 
+ this.fetchSize = null; + if (null == baseConfiguration) { this.conf = new Configuration(); } else { @@ -1327,6 +1346,14 @@ public void setInlineLobLimit(long limit) { this.maxInlineLobSize = limit; } + public Integer getFetchSize() { + return this.fetchSize; + } + + public void setFetchSize(Integer size) { + this.fetchSize = size; + } + /** * @return true if the delimiters have been explicitly set by the user. */ diff --git a/src/java/com/cloudera/sqoop/manager/DirectMySQLManager.java b/src/java/com/cloudera/sqoop/manager/DirectMySQLManager.java index 231ec9dd..7210f88e 100644 --- a/src/java/com/cloudera/sqoop/manager/DirectMySQLManager.java +++ b/src/java/com/cloudera/sqoop/manager/DirectMySQLManager.java @@ -38,7 +38,7 @@ public class DirectMySQLManager extends MySQLManager { DirectMySQLManager.class.getName()); public DirectMySQLManager(final SqoopOptions options) { - super(options, false); + super(options); } /** diff --git a/src/java/com/cloudera/sqoop/manager/MySQLManager.java b/src/java/com/cloudera/sqoop/manager/MySQLManager.java index c172dddd..da2cc48e 100644 --- a/src/java/com/cloudera/sqoop/manager/MySQLManager.java +++ b/src/java/com/cloudera/sqoop/manager/MySQLManager.java @@ -22,9 +22,7 @@ import java.io.PrintWriter; import java.net.URI; import java.net.URISyntaxException; -import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.Statement; import java.sql.SQLException; import java.util.ArrayList; @@ -48,15 +46,24 @@ public class MySQLManager extends GenericJdbcManager { // set to true after we warn the user that we can use direct fastpath. private static boolean warningPrinted = false; - private Statement lastStatement; - public MySQLManager(final SqoopOptions opts) { super(DRIVER_CLASS, opts); } - protected MySQLManager(final SqoopOptions opts, boolean ignored) { - // constructor used by subclasses to avoid the --direct warning. 
- super(DRIVER_CLASS, opts); + @Override + protected void initOptionDefaults() { + if (options.getFetchSize() == null) { + LOG.info("Preparing to use a MySQL streaming resultset."); + options.setFetchSize(Integer.MIN_VALUE); + } else if ( + !options.getFetchSize().equals(Integer.MIN_VALUE) + && !options.getFetchSize().equals(0)) { + LOG.info("Argument '--fetch-size " + options.getFetchSize() + + "' will probably get ignored by MySQL JDBC driver."); + // see also + // http://dev.mysql.com/doc/refman/5.5/en + // /connector-j-reference-implementation-notes.html + } } @Override @@ -182,36 +189,6 @@ private void checkDateTimeBehavior(ImportJobContext context) { } } - /** - * Executes an arbitrary SQL statement. Sets mysql-specific parameter - * to ensure the entire table is not buffered in RAM before reading - * any rows. A consequence of this is that every ResultSet returned - * by this method *MUST* be close()'d, or read to exhaustion before - * another query can be executed from this ConnManager instance. - * - * @param stmt The SQL statement to execute - * @return A ResultSet encapsulating the results or null on error - */ - protected ResultSet execute(String stmt, Object... args) - throws SQLException { - // Free any previous resources. - release(); - - PreparedStatement statement = null; - statement = this.getConnection().prepareStatement(stmt, - ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - this.lastStatement = statement; - statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time. 
- if (null != args) { - for (int i = 0; i < args.length; i++) { - statement.setObject(i + 1, args[i]); - } - } - - LOG.info("Executing SQL statement: " + stmt); - return statement.executeQuery(); - } - @Override public void execAndPrint(String s) { // Override default execAndPrint() with a special version that forces @@ -221,9 +198,8 @@ public void execAndPrint(String s) { ResultSet results = null; try { - // Use default execute() statement which does not issue the - // MySQL-specific setFetchSize() command. - results = super.execute(s); + // Explicitly setting fetchSize to zero disables streaming. + results = super.execute(s, 0); } catch (SQLException sqlE) { LOG.error("Error executing statement: " + StringUtils.stringifyException(sqlE)); @@ -239,18 +215,6 @@ public void execAndPrint(String s) { } } - public void release() { - if (null != this.lastStatement) { - try { - this.lastStatement.close(); - } catch (SQLException e) { - LOG.warn("Exception closing executed Statement: " + e); - } - - this.lastStatement = null; - } - } - /** * When using a column name in a generated SQL query, how (if at all) * should we escape that column name? e.g., a column named "table" diff --git a/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java b/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java index 3526edf1..06a3088d 100644 --- a/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java +++ b/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java @@ -19,9 +19,6 @@ package com.cloudera.sqoop.manager; import java.io.IOException; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.Statement; import java.sql.SQLException; import org.apache.commons.logging.Log; @@ -41,14 +38,9 @@ public class PostgresqlManager extends GenericJdbcManager { // driver class to ensure is loaded when making db connection. private static final String DRIVER_CLASS = "org.postgresql.Driver"; - // Fetch 50 rows at a time. 
- private static final int POSTGRESQL_FETCH_SIZE = 50; - // set to true after we warn the user that we can use direct fastpath. private static boolean warningPrinted = false; - private Statement lastStatement; - public PostgresqlManager(final SqoopOptions opts) { super(DRIVER_CLASS, opts); } @@ -98,47 +90,6 @@ public String getPrimaryKey(String tableName) { return super.getPrimaryKey(tableName.toLowerCase()); } - /** - * Executes an arbitrary SQL statement. Sets the cursor fetch size - * to ensure the entire table is not buffered in RAM before reading - * any rows. A consequence of this is that every ResultSet returned - * by this method *MUST* be close()'d, or read to exhaustion before - * another query can be executed from this ConnManager instance. - * - * @param stmt The SQL statement to execute - * @return A ResultSet encapsulating the results or null on error - */ - protected ResultSet execute(String stmt, Object... args) throws SQLException { - // Free any previous resources used. 
- release(); - - PreparedStatement statement = null; - statement = this.getConnection().prepareStatement(stmt, - ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - this.lastStatement = statement; - statement.setFetchSize(POSTGRESQL_FETCH_SIZE); - if (null != args) { - for (int i = 0; i < args.length; i++) { - statement.setObject(i + 1, args[i]); - } - } - - LOG.info("Executing SQL statement: " + stmt); - return statement.executeQuery(); - } - - public void release() { - if (null != this.lastStatement) { - try { - this.lastStatement.close(); - } catch (SQLException e) { - LOG.warn("Exception closing executed Statement: " + e); - } - - this.lastStatement = null; - } - } - @Override public boolean supportsStagingForExport() { return true; diff --git a/src/java/com/cloudera/sqoop/manager/SqlManager.java b/src/java/com/cloudera/sqoop/manager/SqlManager.java index c59ee617..655c2d6e 100644 --- a/src/java/com/cloudera/sqoop/manager/SqlManager.java +++ b/src/java/com/cloudera/sqoop/manager/SqlManager.java @@ -69,6 +69,8 @@ public abstract class SqlManager extends ConnManager { public static final String SUBSTITUTE_TOKEN = DataDrivenDBInputFormat.SUBSTITUTE_TOKEN; + protected static final int DEFAULT_FETCH_SIZE = 1000; + protected SqoopOptions options; private Statement lastStatement; @@ -78,6 +80,18 @@ public abstract class SqlManager extends ConnManager { */ public SqlManager(final SqoopOptions opts) { this.options = opts; + initOptionDefaults(); + } + + /** + * Sets default values for values that were not provided by the user. + * Only options with database-specific defaults should be configured here. + */ + protected void initOptionDefaults() { + if (options.getFetchSize() == null) { + LOG.info("Using default fetchSize of " + DEFAULT_FETCH_SIZE); + options.setFetchSize(DEFAULT_FETCH_SIZE); + } } /** @@ -422,15 +436,21 @@ public void importQuery(ImportJobContext context) /** * Executes an arbitrary SQL statement. 
* @param stmt The SQL statement to execute + * @param fetchSize Overrides default or parameterized fetch size * @return A ResultSet encapsulating the results or null on error */ - protected ResultSet execute(String stmt, Object... args) throws SQLException { + protected ResultSet execute(String stmt, Integer fetchSize, Object... args) + throws SQLException { // Release any previously-open statement. release(); PreparedStatement statement = null; statement = this.getConnection().prepareStatement(stmt, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + if (fetchSize != null) { + LOG.debug("Using fetchSize for next query: " + fetchSize); + statement.setFetchSize(fetchSize); + } this.lastStatement = statement; if (null != args) { for (int i = 0; i < args.length; i++) { @@ -442,6 +462,15 @@ protected ResultSet execute(String stmt, Object... args) throws SQLException { return statement.executeQuery(); } + /** + * Executes an arbitrary SQL Statement. + * @param stmt The SQL statement to execute + * @return A ResultSet encapsulating the results or null on error + */ + protected ResultSet execute(String stmt, Object... args) throws SQLException { + return execute(stmt, options.getFetchSize(), args); + } + /** * Resolve a database-specific type to the Java type that should contain it. 
* @param sqlType diff --git a/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java index 94714cd5..49e4a3d8 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java +++ b/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java @@ -106,11 +106,12 @@ protected void configureInputFormat(Job job, String tableName, String username = options.getUsername(); if (null == username || username.length() == 0) { DBConfiguration.configureDB(job.getConfiguration(), - mgr.getDriverClass(), options.getConnectString()); + mgr.getDriverClass(), options.getConnectString(), + options.getFetchSize()); } else { DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), options.getConnectString(), - username, options.getPassword()); + username, options.getPassword(), options.getFetchSize()); } if (null != tableName) { diff --git a/src/java/com/cloudera/sqoop/mapreduce/db/DBConfiguration.java b/src/java/com/cloudera/sqoop/mapreduce/db/DBConfiguration.java index 7fcfd5bc..55715a2d 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/db/DBConfiguration.java +++ b/src/java/com/cloudera/sqoop/mapreduce/db/DBConfiguration.java @@ -56,6 +56,9 @@ public class DBConfiguration { /** Password to access the database. */ public static final String PASSWORD_PROPERTY = "mapreduce.jdbc.password"; + /** Fetch size. */ + public static final String FETCH_SIZE = "mapreduce.jdbc.fetchsize"; + /** Input table name. */ public static final String INPUT_TABLE_NAME_PROPERTY = "mapreduce.jdbc.input.table.name"; @@ -103,12 +106,13 @@ public class DBConfiguration { * Sets the DB access related fields in the {@link Configuration}. * @param conf the configuration * @param driverClass JDBC Driver class name - * @param dbUrl JDBC DB access URL. 
+ * @param dbUrl JDBC DB access URL * @param userName DB access username * @param passwd DB access passwd + * @param fetchSize DB fetch size */ public static void configureDB(Configuration conf, String driverClass, - String dbUrl, String userName, String passwd) { + String dbUrl, String userName, String passwd, Integer fetchSize) { conf.set(DRIVER_CLASS_PROPERTY, driverClass); conf.set(URL_PROPERTY, dbUrl); @@ -118,6 +122,34 @@ public static void configureDB(Configuration conf, String driverClass, if (passwd != null) { conf.set(PASSWORD_PROPERTY, passwd); } + if (fetchSize != null) { + conf.setInt(FETCH_SIZE, fetchSize); + } + } + + /** + * Sets the DB access related fields in the JobConf. + * @param job the job + * @param driverClass JDBC Driver class name + * @param dbUrl JDBC DB access URL + * @param fetchSize DB fetch size + */ + public static void configureDB(Configuration job, String driverClass, + String dbUrl, Integer fetchSize) { + configureDB(job, driverClass, dbUrl, null, null, fetchSize); + } + + /** + * Sets the DB access related fields in the {@link Configuration}. 
+ * @param conf the configuration + * @param driverClass JDBC Driver class name + * @param dbUrl JDBC DB access URL + * @param userName DB access username + * @param passwd DB access passwd + */ + public static void configureDB(Configuration conf, String driverClass, + String dbUrl, String userName, String passwd) { + configureDB(conf, driverClass, dbUrl, userName, passwd, null); } /** @@ -128,7 +160,7 @@ public static void configureDB(Configuration conf, String driverClass, */ public static void configureDB(Configuration job, String driverClass, String dbUrl) { - configureDB(job, driverClass, dbUrl, null, null); + configureDB(job, driverClass, dbUrl, null); } private Configuration conf; @@ -160,6 +192,20 @@ public Configuration getConf() { return conf; } + public Integer getFetchSize() { + if (conf.get(DBConfiguration.FETCH_SIZE) == null) { + return null; + } + return conf.getInt(DBConfiguration.FETCH_SIZE, 0); + } + + public void setFetchSize(Integer fetchSize) { + if (fetchSize != null) { + conf.setInt(DBConfiguration.FETCH_SIZE, fetchSize); + } else { + conf.set(FETCH_SIZE, null); + } + } public String getInputTableName() { return conf.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY); } diff --git a/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java b/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java index e70257b6..9c8dc8ec 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java +++ b/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java @@ -208,11 +208,6 @@ protected RecordReader createDBRecordReader( return new OracleDBRecordReader(split, inputClass, conf, getConnection(), getDBConf(), conditions, fieldNames, tableName); - } else if (dbProductName.startsWith("MYSQL")) { - // use MySQL-specific db reader. - return new MySQLDBRecordReader(split, inputClass, - conf, getConnection(), getDBConf(), conditions, fieldNames, - tableName); } else { // Generic reader. 
return new DBRecordReader(split, inputClass, diff --git a/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java b/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java index 0819c5df..d7cd7ef8 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java +++ b/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java @@ -37,8 +37,8 @@ /** * A RecordReader that reads records from a SQL table. - * Emits LongWritables containing the record number as - * key and DBWritables as value. + * Emits LongWritables containing the record number as + * key and DBWritables as value. */ public class DBRecordReader extends RecordReader { @@ -54,9 +54,9 @@ public class DBRecordReader extends private DBInputFormat.DBInputSplit split; private long pos = 0; - + private LongWritable key = null; - + private T value = null; private Connection connection; @@ -73,11 +73,11 @@ public class DBRecordReader extends /** * @param split The InputSplit to read data for - * @throws SQLException + * @throws SQLException */ // CHECKSTYLE:OFF // TODO (aaron): Refactor constructor to take fewer arguments - public DBRecordReader(DBInputFormat.DBInputSplit split, + public DBRecordReader(DBInputFormat.DBInputSplit split, Class inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig, String cond, String [] fields, String table) throws SQLException { @@ -97,10 +97,18 @@ public DBRecordReader(DBInputFormat.DBInputSplit split, protected ResultSet executeQuery(String query) throws SQLException { this.statement = connection.prepareStatement(query, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + + Integer fetchSize = dbConf.getFetchSize(); + if (fetchSize != null) { + LOG.debug("Using fetchSize for next query: " + fetchSize); + statement.setFetchSize(fetchSize); + } + + LOG.debug("Executing query: " + query); return statement.executeQuery(); } - /** Returns the query for selecting the records, + /** Returns the query for selecting the records, * 
subclasses can override this for custom behaviour.*/ protected String getSelectQuery() { StringBuilder query = new StringBuilder(); @@ -109,7 +117,7 @@ protected String getSelectQuery() { // Relies on LIMIT/OFFSET for splits. if(dbConf.getInputQuery() == null) { query.append("SELECT "); - + for (int i = 0; i < fieldNames.length; i++) { query.append(fieldNames[i]); if (i != fieldNames.length -1) { @@ -131,7 +139,7 @@ protected String getSelectQuery() { //PREBUILT QUERY query.append(dbConf.getInputQuery()); } - + try { query.append(" LIMIT ").append(split.getLength()); query.append(" OFFSET ").append(split.getStart()); @@ -161,7 +169,7 @@ public void close() throws IOException { } } - public void initialize(InputSplit inputSplit, TaskAttemptContext context) + public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException { //do nothing } @@ -169,7 +177,7 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext context) @Override /** {@inheritDoc} */ public LongWritable getCurrentKey() { - return key; + return key; } @Override @@ -179,7 +187,7 @@ public T getCurrentValue() { } /** - * @deprecated + * @deprecated */ @Deprecated public T createValue() { @@ -187,7 +195,7 @@ public T createValue() { } /** - * @deprecated + * @deprecated */ @Deprecated public long getPos() throws IOException { diff --git a/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java b/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java index 6708ceed..06fb3b74 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java +++ b/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java @@ -288,19 +288,10 @@ protected RecordReader createDBRecordReader( LOG.debug("Creating db record reader for db product: " + dbProductName); try { - // use database product name to determine appropriate record reader. 
- if (dbProductName.startsWith("MYSQL")) { - // use MySQL-specific db reader. - return new MySQLDataDrivenDBRecordReader(split, inputClass, - conf, getConnection(), dbConf, dbConf.getInputConditions(), - dbConf.getInputFieldNames(), dbConf.getInputTableName()); - } else { - // Generic reader. - return new DataDrivenDBRecordReader(split, inputClass, - conf, getConnection(), dbConf, dbConf.getInputConditions(), - dbConf.getInputFieldNames(), dbConf.getInputTableName(), - dbProductName); - } + return new DataDrivenDBRecordReader(split, inputClass, + conf, getConnection(), dbConf, dbConf.getInputConditions(), + dbConf.getInputFieldNames(), dbConf.getInputTableName(), + dbProductName); } catch (SQLException ex) { throw new IOException(ex.getMessage()); } diff --git a/src/java/com/cloudera/sqoop/mapreduce/db/MySQLDBRecordReader.java b/src/java/com/cloudera/sqoop/mapreduce/db/MySQLDBRecordReader.java deleted file mode 100644 index 155d1bb2..00000000 --- a/src/java/com/cloudera/sqoop/mapreduce/db/MySQLDBRecordReader.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to Cloudera, Inc. under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Cloudera, Inc. licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.cloudera.sqoop.mapreduce.db; - -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.lib.db.DBWritable; - -/** - * A RecordReader that reads records from a MySQL table. - */ -public class MySQLDBRecordReader - extends DBRecordReader { - - // CHECKSTYLE:OFF - // TODO(aaron) - refactor DBRecordReader c'tor to use fewer arguments. - public MySQLDBRecordReader(DBInputFormat.DBInputSplit split, - Class inputClass, Configuration conf, Connection conn, - DBConfiguration dbConfig, String cond, String [] fields, String table) - throws SQLException { - super(split, inputClass, conf, conn, dbConfig, cond, fields, table); - } - // CHECKSTYLE:ON - - // Execute statements for mysql in unbuffered mode. - protected ResultSet executeQuery(String query) throws SQLException { - statement = getConnection().prepareStatement(query, - ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time. - return statement.executeQuery(); - } -} diff --git a/src/java/com/cloudera/sqoop/mapreduce/db/MySQLDataDrivenDBRecordReader.java b/src/java/com/cloudera/sqoop/mapreduce/db/MySQLDataDrivenDBRecordReader.java deleted file mode 100644 index e2fb70b0..00000000 --- a/src/java/com/cloudera/sqoop/mapreduce/db/MySQLDataDrivenDBRecordReader.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to Cloudera, Inc. under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Cloudera, Inc. licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.sqoop.mapreduce.db; - -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.lib.db.DBWritable; - -/** - * A RecordReader that reads records from a MySQL table via - * DataDrivenDBRecordReader. - */ -public class MySQLDataDrivenDBRecordReader - extends DataDrivenDBRecordReader { - - //CHECKSTYLE:OFF - public MySQLDataDrivenDBRecordReader(DBInputFormat.DBInputSplit split, - Class inputClass, Configuration conf, Connection conn, - DBConfiguration dbConfig, String cond, String [] fields, String table) - throws SQLException { - super(split, inputClass, conf, conn, dbConfig, cond, fields, table, - "MYSQL"); - } - //CHECKSTYLE:ON - - // Execute statements for mysql in unbuffered mode. - protected ResultSet executeQuery(String query) throws SQLException { - statement = getConnection().prepareStatement(query, - ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time. 
- return statement.executeQuery(); - } -} diff --git a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java index 22167904..81d90a26 100644 --- a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java +++ b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java @@ -95,6 +95,7 @@ public abstract class BaseSqoopTool extends SqoopTool { public static final String COMPRESS_SHORT_ARG = "z"; public static final String DIRECT_SPLIT_SIZE_ARG = "direct-split-size"; public static final String INLINE_LOB_LIMIT_ARG = "inline-lob-limit"; + public static final String FETCH_SIZE_ARG = "fetch-size"; public static final String EXPORT_PATH_ARG = "export-dir"; public static final String FIELDS_TERMINATED_BY_ARG = "fields-terminated-by"; public static final String LINES_TERMINATED_BY_ARG = "lines-terminated-by"; diff --git a/src/java/com/cloudera/sqoop/tool/ImportTool.java b/src/java/com/cloudera/sqoop/tool/ImportTool.java index 5a64bbec..d51238f5 100644 --- a/src/java/com/cloudera/sqoop/tool/ImportTool.java +++ b/src/java/com/cloudera/sqoop/tool/ImportTool.java @@ -97,7 +97,7 @@ private boolean isIncremental(SqoopOptions options) { return !options.getIncrementalMode().equals( SqoopOptions.IncrementalMode.None); } - + /** * If this is an incremental import, then we should save the * user's state back to the metastore (if this job was run @@ -339,19 +339,19 @@ protected boolean importTable(SqoopOptions options, String tableName, // Do the actual import. ImportJobContext context = new ImportJobContext(tableName, jarFile, options, getOutputPath(options, tableName)); - + // If we're doing an incremental import, set up the // filtering conditions used to get the latest records. 
if (!initIncrementalConstraints(options, context)) { return false; } - + if (null != tableName) { manager.importTable(context); } else { manager.importQuery(context); } - + if (options.isAppendMode()) { AppendUtils app = new AppendUtils(context); app.append(); @@ -366,8 +366,8 @@ protected boolean importTable(SqoopOptions options, String tableName, return true; } - - /** + + /** * @return the output path for the imported files; * in append mode this will point to a temporary folder. * if importing to hbase, this may return null. @@ -382,7 +382,7 @@ private Path getOutputPath(SqoopOptions options, String tableName) { outputPath = AppendUtils.getTempAppendDir(tableName); LOG.debug("Using temporary folder: " + outputPath.getName()); } else { - // Try in this order: target-dir or warehouse-dir + // Try in this order: target-dir or warehouse-dir if (hdfsTargetDir != null) { outputPath = new Path(hdfsTargetDir); } else if (hdfsWarehouseDir != null) { @@ -392,9 +392,9 @@ private Path getOutputPath(SqoopOptions options, String tableName) { } } - return outputPath; + return outputPath; } - + @Override /** {@inheritDoc} */ public int run(SqoopOptions options) { @@ -479,11 +479,11 @@ protected RelatedOptions getImportOptions() { importOpts.addOption(OptionBuilder .withDescription("Imports data in append mode") .withLongOpt(APPEND_ARG) - .create()); + .create()); importOpts.addOption(OptionBuilder.withArgName("dir") .hasArg().withDescription("HDFS plain table destination") .withLongOpt(TARGET_DIR_ARG) - .create()); + .create()); importOpts.addOption(OptionBuilder.withArgName("statement") .hasArg() .withDescription("Import results of SQL 'statement'") @@ -522,6 +522,12 @@ protected RelatedOptions getImportOptions() { .withDescription("Set the maximum size for an inline LOB") .withLongOpt(INLINE_LOB_LIMIT_ARG) .create()); + importOpts.addOption(OptionBuilder.withArgName("n") + .hasArg() + .withDescription("Set number 'n' of rows to fetch from the " + + "database when more rows 
are needed") + .withLongOpt(FETCH_SIZE_ARG) + .create()); return importOpts; } @@ -590,7 +596,7 @@ public void printHelp(ToolOptions toolOptions) { } else { System.out.println( "At minimum, you must specify --connect and --table"); - } + } System.out.println( "Arguments to mysqldump and other subprograms may be supplied"); @@ -656,7 +662,7 @@ public void applyOptions(CommandLine in, SqoopOptions out) if (in.hasOption(TARGET_DIR_ARG)) { out.setTargetDir(in.getOptionValue(TARGET_DIR_ARG)); } - + if (in.hasOption(APPEND_ARG)) { out.setAppendMode(true); } @@ -696,6 +702,10 @@ public void applyOptions(CommandLine in, SqoopOptions out) INLINE_LOB_LIMIT_ARG))); } + if (in.hasOption(FETCH_SIZE_ARG)) { + out.setFetchSize(new Integer(in.getOptionValue(FETCH_SIZE_ARG))); + } + if (in.hasOption(JAR_FILE_NAME_ARG)) { out.setExistingJarName(in.getOptionValue(JAR_FILE_NAME_ARG)); } diff --git a/src/test/com/cloudera/sqoop/TestSqoopOptions.java b/src/test/com/cloudera/sqoop/TestSqoopOptions.java index aad4308c..0546094b 100644 --- a/src/test/com/cloudera/sqoop/TestSqoopOptions.java +++ b/src/test/com/cloudera/sqoop/TestSqoopOptions.java @@ -157,7 +157,7 @@ private SqoopOptions parse(String [] argv) throws Exception { return importTool.parseArguments(argv, null, null, false); } - // test that setting output delimiters also sets input delimiters + // test that setting output delimiters also sets input delimiters public void testDelimitersInherit() throws Exception { String [] args = { "--fields-terminated-by", @@ -236,7 +236,7 @@ public void testGoodNumMappers() throws Exception { assertEquals(4, opts.getNumMappers()); } - public void testPropertySerialization() { + public void testPropertySerialization1() { // Test that if we write a SqoopOptions out to a Properties, // and then read it back in, we get all the same results. 
SqoopOptions out = new SqoopOptions(); @@ -251,6 +251,7 @@ public void testPropertySerialization() { out.setSqlQuery("the query"); out.setPackageName("a.package"); out.setHiveImport(true); + out.setFetchSize(null); Properties outProps = out.writeProperties(); @@ -261,4 +262,32 @@ public void testPropertySerialization() { assertEquals("properties don't match", outProps, inProps); } + + public void testPropertySerialization2() { + // Test that if we write a SqoopOptions out to a Properties, + // and then read it back in, we get all the same results. + SqoopOptions out = new SqoopOptions(); + out.setUsername("user"); + out.setConnectString("bla"); + out.setNumMappers(4); + out.setAppendMode(true); + out.setHBaseTable("hbasetable"); + out.setWarehouseDir("Warehouse"); + out.setClassName("someclass"); + out.setSplitByCol("somecol"); + out.setSqlQuery("the query"); + out.setPackageName("a.package"); + out.setHiveImport(true); + out.setFetchSize(42); + + Properties outProps = out.writeProperties(); + + SqoopOptions in = new SqoopOptions(); + in.loadProperties(outProps); + + Properties inProps = in.writeProperties(); + + assertEquals("properties don't match", outProps, inProps); + } + }