
SQOOP-390. PostgreSQL connector for direct export with pg_bulkload.

(Masatake Iwasaki via Jarek Jarcec Cecho)


git-svn-id: https://svn.apache.org/repos/asf/sqoop/trunk@1374923 13f79535-47bb-0310-9956-ffa450edef68
Jarek Jarcec Cecho 2012-08-20 06:21:34 +00:00
parent feddf1d6af
commit 29b29a52f3
13 changed files with 1363 additions and 142 deletions


@ -72,8 +72,9 @@ include::version.txt[]
include::compatibility.txt[]
include::connectors.txt[]
include::support.txt[]
include::troubleshooting.txt[]


@ -0,0 +1,138 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
Notes for specific connectors
-----------------------------
pg_bulkload connector
~~~~~~~~~~~~~~~~~~~~~
Purpose
^^^^^^^
The pg_bulkload connector is a direct connector for exporting data into PostgreSQL.
This connector uses
http://pgbulkload.projects.postgresql.org/index.html[pg_bulkload].
Users benefit from pg_bulkload functionality such as
fast exports that bypass shared buffers and WAL,
flexible handling of error records,
and ETL features with filter functions.
Requirements
^^^^^^^^^^^^
The pg_bulkload connector requires the following conditions for export job
execution (a quick verification is sketched after this list):
* The link:http://pgbulkload.projects.postgresql.org/index.html[pg_bulkload]
must be installed on the DB server and all slave nodes.
RPMs for RedHat and CentOS are available on the
link:http://pgfoundry.org/frs/?group_id=1000261[download page].
* The link:http://jdbc.postgresql.org/index.html[PostgreSQL JDBC] driver
is required on the client node.
* A superuser role of the PostgreSQL database is required for execution of pg_bulkload.
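A quick way to verify these prerequisites is shown below.
This is only a minimal sketch: the +sqooptest+ superuser role and the
+$SQOOP_HOME/lib+ location of the JDBC driver are examples, not requirements.
----
$ which pg_bulkload                       # binary on the DB server and each slave node
$ ls $SQOOP_HOME/lib/postgresql-*.jar     # PostgreSQL JDBC driver on the client node
$ psql -U sqooptest -c "SELECT rolsuper FROM pg_roles WHERE rolname = current_user" sqooptest
----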
Syntax
^^^^^^
Use the +--connection-manager+ option to specify the connection manager classname.
----
$ sqoop export (generic-args) --connection-manager org.apache.sqoop.manager.PGBulkloadManager (export-args)
$ sqoop-export (generic-args) --connection-manager org.apache.sqoop.manager.PGBulkloadManager (export-args)
----
This connector supports the export arguments shown below.
.Supported export control arguments:
[grid="all"]
`----------------------------------------`---------------------------------------
Argument Description
---------------------------------------------------------------------------------
+\--export-dir <dir>+ HDFS source path for the export
+-m,\--num-mappers <n>+ Use 'n' map tasks to export in\
parallel
+\--table <table-name>+ Table to populate
+\--input-null-string <null-string>+ The string to be interpreted as\
null for string columns
+\--clear-staging-table+ Indicates that any data present in\
the staging table can be deleted.
---------------------------------------------------------------------------------
There is additional configuration for pg_bulkload execution,
specified via Hadoop Configuration properties,
which can be given with the +-D <property=value>+ option.
Because Hadoop Configuration properties are generic arguments of Sqoop,
they must precede any export control arguments.
.Supported export control properties:
[grid="all"]
`-----------------------------`----------------------------------------------
Property Description
-----------------------------------------------------------------------------
mapred.reduce.tasks Number of reduce tasks for staging. \
The default value is 1. \
Each task does staging in a single transaction.
pgbulkload.bin Path of the pg_bulkload binary \
installed on each slave node.
pgbulkload.check.constraints Specify whether CHECK constraints are checked \
during the loading. \
The default value is YES.
pgbulkload.parse.errors The maximum number of ignored records \
that cause errors during parsing, \
encoding, filtering, constraints checking, \
and data type conversion. \
Error records are recorded \
in the PARSE BADFILE. \
The default value is INFINITE.
pgbulkload.duplicate.errors Number of ignored records \
that violate unique constraints. \
Duplicated records are recorded in the \
DUPLICATE BADFILE on DB server. \
The default value is INFINITE.
pgbulkload.filter Specify the filter function \
to convert each row in the input file. \
See the pg_bulkload documentation for \
how to write FILTER functions.
-----------------------------------------------------------------------------
Here is an example of a complete command line.
----
$ sqoop export \
-Dmapred.reduce.tasks=2 \
-Dpgbulkload.bin="/usr/local/bin/pg_bulkload" \
-Dpgbulkload.input.field.delim=$'\t' \
-Dpgbulkload.check.constraints="YES" \
-Dpgbulkload.parse.errors="INFINITE" \
-Dpgbulkload.duplicate.errors="INFINITE" \
--connect jdbc:postgresql://pgsql.example.net:5432/sqooptest \
--connection-manager org.apache.sqoop.manager.PGBulkloadManager \
--table test --username sqooptest --export-dir=/test -m 2
----
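The ETL filter feature uses the same form of command line.
The following sketch assumes a filter function named +convert_testdata+
has already been defined in the database as described in the pg_bulkload documentation.
----
$ sqoop export \
-Dpgbulkload.bin="/usr/local/bin/pg_bulkload" \
-Dpgbulkload.filter="convert_testdata" \
--connect jdbc:postgresql://pgsql.example.net:5432/sqooptest \
--connection-manager org.apache.sqoop.manager.PGBulkloadManager \
--table test --username sqooptest --export-dir=/test -m 2
----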
Data Staging
^^^^^^^^^^^^
Each map task of the pg_bulkload connector's export job creates
its own staging table on the fly.
The name of the staging table is derived from the destination table name
and the task attempt id.
For example, the name of the staging table for the "test" table is like
+test_attempt_1345021837431_0001_m_000000_0+ .
Staging tables are automatically dropped when tasks complete successfully
or when map tasks fail.
When a reduce task fails,
the staging tables for that task are left behind for manual retry,
and users must clean them up themselves.
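Leftover staging tables can be inspected and removed manually once their
contents are no longer needed.
The following is a minimal cleanup sketch, assuming the "test" destination table
and the example task attempt id above.
----
$ psql -U sqooptest sqooptest
sqooptest=# SELECT tablename FROM pg_tables WHERE tablename LIKE 'test_attempt_%';
sqooptest=# DROP TABLE test_attempt_1345021837431_0001_m_000000_0;
sqooptest=# \q
----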


@ -32,6 +32,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sqoop.util.PostgreSQLUtils;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.io.SplittableBufferedWriter;
@ -293,26 +294,6 @@ private String writeCopyCommand(String command) throws IOException {
return tempFile.toString();
}
/** Write the user's password to a file that is chmod 0600.
@return the filename.
*/
private String writePasswordFile(String password) throws IOException {
String tmpDir = options.getTempDir();
File tempFile = File.createTempFile("pgpass", ".pgpass", new File(tmpDir));
LOG.debug("Writing password to tempfile: " + tempFile);
// Make sure it's only readable by the current user.
DirectImportUtils.setFilePermissions(tempFile, "0600");
// Actually write the password data into the file.
BufferedWriter w = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(tempFile)));
w.write("*:*:*:*:" + password);
w.close();
return tempFile.toString();
}
// TODO(aaron): Refactor this method to be much shorter.
// CHECKSTYLE:OFF
@Override
@ -380,7 +361,8 @@ public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
args.add(username);
String password = options.getPassword();
if (null != password) {
passwordFilename = writePasswordFile(password);
passwordFilename =
PostgreSQLUtils.writePasswordFile(options.getTempDir(), password);
// Need to send PGPASSFILE environment variable specifying
// location of our postgres file.
envp.add("PGPASSFILE=" + passwordFilename);


@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.manager;
import java.io.IOException;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ExportJobContext;
import com.cloudera.sqoop.util.ExportException;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sqoop.mapreduce.ExportInputFormat;
import org.apache.sqoop.mapreduce.PGBulkloadExportJob;
/**
* Manages direct export connections to PostgreSQL databases using pg_bulkload.
*/
public class PGBulkloadManager extends PostgresqlManager {
public static final Log LOG =
LogFactory.getLog(PGBulkloadManager.class.getName());
public PGBulkloadManager(final SqoopOptions opts) {
super(opts, true);
}
@Override
public void exportTable(ExportJobContext context)
throws IOException, ExportException {
context.setConnManager(this);
options.setStagingTableName(null);
PGBulkloadExportJob jobbase =
new PGBulkloadExportJob(context,
null,
ExportInputFormat.class,
NullOutputFormat.class);
jobbase.runExport();
}
@Override
public boolean supportsStagingForExport() {
return false;
}
}


@ -21,7 +21,6 @@
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
/**
* Identity mapper that continuously reports progress via a background thread.
@ -32,25 +31,6 @@ public class AutoProgressMapper<KEYIN, VALIN, KEYOUT, VALOUT>
public static final Log LOG = LogFactory.getLog(
AutoProgressMapper.class.getName());
/**
* Total number of millis for which progress will be reported by the
* auto-progress thread. If this is zero, then the auto-progress thread will
* never voluntarily exit.
*/
private int maxProgressPeriod;
/**
* Number of milliseconds to sleep for between loop iterations. Must be less
* than report interval.
*/
private int sleepInterval;
/**
* Number of milliseconds between calls to Reporter.progress().
* Should be a multiple of the sleepInterval.
*/
private int reportInterval;
public static final String MAX_PROGRESS_PERIOD_KEY =
"sqoop.mapred.auto.progress.max";
public static final String SLEEP_INTERVAL_KEY =
@ -67,112 +47,14 @@ public class AutoProgressMapper<KEYIN, VALIN, KEYOUT, VALOUT>
// Disable max progress, by default.
public static final int DEFAULT_MAX_PROGRESS = 0;
private class ProgressThread extends Thread {
private volatile boolean keepGoing; // While this is true, thread runs.
private Context context;
private long startTimeMillis;
private long lastReportMillis;
public ProgressThread(final Context ctxt) {
this.context = ctxt;
this.keepGoing = true;
}
public void signalShutdown() {
this.keepGoing = false; // volatile update.
this.interrupt();
}
public void run() {
this.lastReportMillis = System.currentTimeMillis();
this.startTimeMillis = this.lastReportMillis;
final long MAX_PROGRESS = AutoProgressMapper.this.maxProgressPeriod;
final long REPORT_INTERVAL = AutoProgressMapper.this.reportInterval;
final long SLEEP_INTERVAL = AutoProgressMapper.this.sleepInterval;
// In a loop:
// * Check that we haven't run for too long (maxProgressPeriod).
// * If it's been a report interval since we last made progress,
// make more.
// * Sleep for a bit.
// * If the parent thread has signaled for exit, do so.
while (this.keepGoing) {
long curTimeMillis = System.currentTimeMillis();
if (MAX_PROGRESS != 0
&& curTimeMillis - this.startTimeMillis > MAX_PROGRESS) {
this.keepGoing = false;
LOG.info("Auto-progress thread exiting after " + MAX_PROGRESS
+ " ms.");
break;
}
if (curTimeMillis - this.lastReportMillis > REPORT_INTERVAL) {
// It's been a full report interval -- claim progress.
LOG.debug("Auto-progress thread reporting progress");
this.context.progress();
this.lastReportMillis = curTimeMillis;
}
// Unless we got an interrupt while we were working,
// sleep a bit before doing more work.
if (!Thread.interrupted()) {
try {
Thread.sleep(SLEEP_INTERVAL);
} catch (InterruptedException ie) {
// we were notified on something; not necessarily an error.
}
}
}
LOG.info("Auto-progress thread is finished. keepGoing=" + this.keepGoing);
}
}
/**
* Set configuration parameters for the auto-progress thread.
*/
private void configureAutoProgress(Configuration job) {
this.maxProgressPeriod = job.getInt(MAX_PROGRESS_PERIOD_KEY,
DEFAULT_MAX_PROGRESS);
this.sleepInterval = job.getInt(SLEEP_INTERVAL_KEY,
DEFAULT_SLEEP_INTERVAL);
this.reportInterval = job.getInt(REPORT_INTERVAL_KEY,
DEFAULT_REPORT_INTERVAL);
if (this.reportInterval < 1) {
LOG.warn("Invalid " + REPORT_INTERVAL_KEY + "; setting to "
+ DEFAULT_REPORT_INTERVAL);
this.reportInterval = DEFAULT_REPORT_INTERVAL;
}
if (this.sleepInterval > this.reportInterval || this.sleepInterval < 1) {
LOG.warn("Invalid " + SLEEP_INTERVAL_KEY + "; setting to "
+ DEFAULT_SLEEP_INTERVAL);
this.sleepInterval = DEFAULT_SLEEP_INTERVAL;
}
if (this.maxProgressPeriod < 0) {
LOG.warn("Invalid " + MAX_PROGRESS_PERIOD_KEY + "; setting to "
+ DEFAULT_MAX_PROGRESS);
this.maxProgressPeriod = DEFAULT_MAX_PROGRESS;
}
}
// map() method intentionally omitted; Mapper.map() is the identity mapper.
/**
* Run the mapping process for this task, wrapped in an auto-progress system.
*/
@Override
public void run(Context context) throws IOException, InterruptedException {
configureAutoProgress(context.getConfiguration());
ProgressThread thread = this.new ProgressThread(context);
ProgressThread thread = new ProgressThread(context, LOG);
try {
thread.setDaemon(true);
@ -191,7 +73,7 @@ public void run(Context context) throws IOException, InterruptedException {
LOG.debug("Progress thread shutdown detected.");
} catch (InterruptedException ie) {
LOG.warn("Interrupted when waiting on auto-progress thread: "
+ ie.toString());
+ ie.toString(), ie);
}
}
}


@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Identity reducer that continuously reports progress via a background thread.
*/
public class AutoProgressReducer<KEYIN, VALIN, KEYOUT, VALOUT>
extends SqoopReducer<KEYIN, VALIN, KEYOUT, VALOUT> {
public static final Log LOG = LogFactory.getLog(
AutoProgressReducer.class.getName());
// reduce() method intentionally omitted;
// Reducer.reduce() is the identity reducer.
/**
* Run the reduce process for this task, wrapped in an auto-progress system.
*/
@Override
public void run(Context context) throws IOException, InterruptedException {
ProgressThread thread = new ProgressThread(context, LOG);
try {
thread.setDaemon(true);
thread.start();
// use default run() method to actually drive the reduce phase.
super.run(context);
} finally {
// Tell the progress thread to exit.
LOG.debug("Instructing auto-progress thread to quit.");
thread.signalShutdown();
try {
// And wait for that to happen.
LOG.debug("Waiting for progress thread shutdown...");
thread.join();
LOG.debug("Progress thread shutdown detected.");
} catch (InterruptedException ie) {
LOG.warn("Interrupted when waiting on auto-progress thread: "
+ ie.toString(), ie);
}
}
}
}


@ -0,0 +1,207 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import java.io.IOException;
import com.cloudera.sqoop.manager.ExportJobContext;
import com.cloudera.sqoop.util.ExportException;
import com.cloudera.sqoop.SqoopOptions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.sqoop.config.ConfigurationHelper;
import org.apache.sqoop.lib.DelimiterSet;
import org.apache.sqoop.manager.ConnManager;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.apache.sqoop.orm.TableClassName;
/**
* Class that runs an export job using pg_bulkload in the mapper.
*/
public class PGBulkloadExportJob extends ExportJobBase {
public static final Log LOG =
LogFactory.getLog(PGBulkloadExportJob.class.getName());
public PGBulkloadExportJob(final ExportJobContext context) {
super(context);
}
public PGBulkloadExportJob(final ExportJobContext ctxt,
final Class<? extends Mapper> mapperClass,
final Class<? extends InputFormat> inputFormatClass,
final Class<? extends OutputFormat> outputFormatClass) {
super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
}
@Override
protected void configureInputFormat(Job job, String tableName,
String tableClassName, String splitByCol)
throws ClassNotFoundException, IOException {
super.configureInputFormat(job, tableName, tableClassName, splitByCol);
ConnManager mgr = context.getConnManager();
String username = options.getUsername();
if (null == username || username.length() == 0) {
DBConfiguration.configureDB(job.getConfiguration(),
mgr.getDriverClass(),
options.getConnectString(),
options.getFetchSize());
} else {
DBConfiguration.configureDB(job.getConfiguration(),
mgr.getDriverClass(),
options.getConnectString(),
username, options.getPassword(),
options.getFetchSize());
}
}
@Override
protected Class<? extends Mapper> getMapperClass() {
return PGBulkloadExportMapper.class;
}
protected Class<? extends Reducer> getReducerClass() {
return PGBulkloadExportReducer.class;
}
private void setDelimiter(String prop, char val, Configuration conf) {
switch (val) {
case DelimiterSet.NULL_CHAR:
break;
case '\t':
default:
conf.set(prop, String.valueOf(val));
}
}
@Override
protected void propagateOptionsToJob(Job job) {
super.propagateOptionsToJob(job);
SqoopOptions opts = context.getOptions();
Configuration conf = job.getConfiguration();
conf.setIfUnset("pgbulkload.bin", "pg_bulkload");
if (opts.getNullStringValue() != null) {
conf.set("pgbulkload.null.string", opts.getNullStringValue());
}
setDelimiter("pgbulkload.input.field.delim",
opts.getInputFieldDelim(),
conf);
setDelimiter("pgbulkload.input.record.delim",
opts.getInputRecordDelim(),
conf);
setDelimiter("pgbulkload.input.enclosedby",
opts.getInputEnclosedBy(),
conf);
setDelimiter("pgbulkload.input.escapedby",
opts.getInputEscapedBy(),
conf);
conf.setBoolean("pgbulkload.input.encloserequired",
opts.isInputEncloseRequired());
conf.setIfUnset("pgbulkload.check.constraints", "YES");
conf.setIfUnset("pgbulkload.parse.errors", "INFINITE");
conf.setIfUnset("pgbulkload.duplicate.errors", "INFINITE");
conf.set("mapred.jar", context.getJarFile());
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
conf.setInt("mapred.map.max.attempts", 1);
conf.setInt("mapred.reduce.max.attempts", 1);
conf.setIfUnset("mapred.reduce.tasks", "1");
if (context.getOptions().doClearStagingTable()) {
conf.setBoolean("pgbulkload.clear.staging.table", true);
}
}
@Override
public void runExport() throws ExportException, IOException {
ConnManager cmgr = context.getConnManager();
SqoopOptions options = context.getOptions();
Configuration conf = options.getConf();
DBConfiguration dbConf = null;
String outputTableName = context.getTableName();
String tableName = outputTableName;
String tableClassName =
new TableClassName(options).getClassForTable(outputTableName);
LOG.info("Beginning export of " + outputTableName);
loadJars(conf, context.getJarFile(), tableClassName);
try {
Job job = new Job(conf);
dbConf = new DBConfiguration(job.getConfiguration());
dbConf.setOutputTableName(tableName);
configureInputFormat(job, tableName, tableClassName, null);
configureOutputFormat(job, tableName, tableClassName);
configureNumTasks(job);
propagateOptionsToJob(job);
job.setMapperClass(getMapperClass());
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(getReducerClass());
cacheJars(job, context.getConnManager());
setJob(job);
boolean success = runJob(job);
if (!success) {
throw new ExportException("Export job failed!");
}
} catch (InterruptedException ie) {
throw new IOException(ie);
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
} finally {
unloadJars();
}
}
@Override
protected int configureNumTasks(Job job) throws IOException {
SqoopOptions options = context.getOptions();
int numMapTasks = options.getNumMappers();
if (numMapTasks < 1) {
numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS;
LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
}
ConfigurationHelper.setJobNumMaps(job, numMapTasks);
return numMapTasks;
}
private void clearStagingTable(DBConfiguration dbConf, String tableName)
throws IOException {
// Clearing the staging table is done by each map task.
}
}


@ -0,0 +1,290 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.apache.sqoop.util.PostgreSQLUtils;
import org.apache.sqoop.util.Executor;
import org.apache.sqoop.util.JdbcUrl;
/**
* Mapper that starts a 'pg_bulkload' process and uses that to export rows from
* HDFS to a PostgreSQL database at high speed.
*
* map() methods are actually provided by subclasses that read from
* SequenceFiles (containing existing SqoopRecords) or text files
* (containing delimited lines) and deliver these results to the stream
* used to interface with pg_bulkload.
*/
public class PGBulkloadExportMapper
extends AutoProgressMapper<LongWritable, Writable, LongWritable, Text> {
private Configuration conf;
private DBConfiguration dbConf;
private Process process;
private OutputStream out;
protected BufferedWriter writer;
private Thread thread;
protected String tmpTableName;
private String tableName;
private String passwordFilename;
public PGBulkloadExportMapper() {
}
protected void setup(Context context)
throws IOException, InterruptedException {
super.setup(context);
conf = context.getConfiguration();
dbConf = new DBConfiguration(conf);
tableName = dbConf.getOutputTableName();
tmpTableName = tableName + "_" + context.getTaskAttemptID().toString();
Connection conn = null;
try {
conn = dbConf.getConnection();
conn.setAutoCommit(false);
if (conf.getBoolean("pgbulkload.clear.staging.table", false)) {
StringBuffer query = new StringBuffer();
query.append("DROP TABLE IF EXISTS ");
query.append(tmpTableName);
doExecuteUpdate(query.toString());
}
StringBuffer query = new StringBuffer();
query.append("CREATE TABLE ");
query.append(tmpTableName);
query.append("(LIKE ");
query.append(tableName);
query.append(" INCLUDING CONSTRAINTS)");
if (conf.get("pgbulkload.staging.tablespace") != null) {
query.append("TABLESPACE ");
query.append(conf.get("pgbulkload.staging.tablespace"));
}
doExecuteUpdate(query.toString());
conn.commit();
} catch (ClassNotFoundException ex) {
LOG.error("Unable to load JDBC driver class", ex);
throw new IOException(ex);
} catch (SQLException ex) {
LOG.error("Unable to execute statement", ex);
throw new IOException(ex);
} finally {
try {
conn.close();
} catch (SQLException ex) {
LOG.error("Unable to close connection", ex);
}
}
try {
ArrayList<String> args = new ArrayList<String>();
List<String> envp = Executor.getCurEnvpStrings();
args.add(conf.get("pgbulkload.bin", "pg_bulkload"));
args.add("--username="
+ conf.get(DBConfiguration.USERNAME_PROPERTY));
args.add("--dbname="
+ JdbcUrl.getDatabaseName(conf.get(DBConfiguration.URL_PROPERTY)));
args.add("--host="
+ JdbcUrl.getHostName(conf.get(DBConfiguration.URL_PROPERTY)));
args.add("--port="
+ JdbcUrl.getPort(conf.get(DBConfiguration.URL_PROPERTY)));
args.add("--input=stdin");
args.add("--output=" + tmpTableName);
args.add("-o");
args.add("TYPE=CSV");
args.add("-o");
args.add("DELIMITER=" + conf.get("pgbulkload.input.field.delim", ","));
args.add("-o");
args.add("QUOTE=" + conf.get("pgbulkload.input.enclosedby", "\""));
args.add("-o");
args.add("ESCAPE=" + conf.get("pgbulkload.input.escapedby", "\""));
args.add("-o");
args.add("CHECK_CONSTRAINTS=" + conf.get("pgbulkload.check.constraints"));
args.add("-o");
args.add("PARSE_ERRORS=" + conf.get("pgbulkload.parse.errors"));
args.add("-o");
args.add("DUPLICATE_ERRORS=" + conf.get("pgbulkload.duplicate.errors"));
if (conf.get("pgbulkload.null.string") != null) {
args.add("-o");
args.add("NULL=" + conf.get("pgbulkload.null.string"));
}
if (conf.get("pgbulkload.filter") != null) {
args.add("-o");
args.add("FILTER=" + conf.get("pgbulkload.filter"));
}
LOG.debug("Starting pg_bulkload with arguments:");
for (String arg : args) {
LOG.debug(" " + arg);
}
if (conf.get(DBConfiguration.PASSWORD_PROPERTY) != null) {
String tmpDir = System.getProperty("test.build.data", "/tmp/");
if (!tmpDir.endsWith(File.separator)) {
tmpDir = tmpDir + File.separator;
}
tmpDir = conf.get("job.local.dir", tmpDir);
passwordFilename = PostgreSQLUtils.writePasswordFile(tmpDir,
conf.get(DBConfiguration.PASSWORD_PROPERTY));
envp.add("PGPASSFILE=" + passwordFilename);
}
process = Runtime.getRuntime().exec(args.toArray(new String[0]),
envp.toArray(new String[0]));
out = process.getOutputStream();
writer = new BufferedWriter(new OutputStreamWriter(out));
thread = new ReadThread(process.getErrorStream());
thread.start();
} catch (Exception e) {
cleanup(context);
doExecuteUpdate("DROP TABLE " + tmpTableName);
throw new IOException(e);
}
}
public void map(LongWritable key, Writable value, Context context)
throws IOException, InterruptedException {
try {
String str = value.toString();
if (value instanceof Text) {
writer.write(str, 0, str.length());
writer.newLine();
} else if (value instanceof SqoopRecord) {
writer.write(str, 0, str.length());
}
} catch (Exception e) {
doExecuteUpdate("DROP TABLE " + tmpTableName);
cleanup(context);
throw new IOException(e);
}
}
protected void cleanup(Context context)
throws IOException, InterruptedException {
LongWritable taskid =
new LongWritable(context.getTaskAttemptID().getTaskID().getId());
context.write(taskid, new Text(tmpTableName));
writer.close();
out.close();
try {
thread.join();
} finally {
// block until the process is done.
int result = 0;
if (null != process) {
while (true) {
try {
result = process.waitFor();
} catch (InterruptedException ie) {
// interrupted; loop around.
continue;
}
break;
}
}
}
if (null != passwordFilename) {
if (!new File(passwordFilename).delete()) {
LOG.error("Could not remove postgresql password file "
+ passwordFilename);
LOG.error("You should remove this file to protect your credentials.");
}
}
}
protected int doExecuteUpdate(String query) throws IOException {
Connection conn = null;
try {
conn = dbConf.getConnection();
conn.setAutoCommit(false);
} catch (ClassNotFoundException ex) {
LOG.error("Unable to load JDBC driver class", ex);
throw new IOException(ex);
} catch (SQLException ex) {
LOG.error("Unable to connect to database", ex);
throw new IOException(ex);
}
Statement stmt = null;
try {
stmt = conn.createStatement();
int ret = stmt.executeUpdate(query);
conn.commit();
return ret;
} catch (SQLException ex) {
LOG.error("Unable to execute query: " + query, ex);
throw new IOException(ex);
} finally {
if (stmt != null) {
try {
stmt.close();
} catch (SQLException ex) {
LOG.error("Unable to close statement", ex);
}
}
try {
conn.close();
} catch (SQLException ex) {
LOG.error("Unable to close connection", ex);
}
}
}
private class ReadThread extends Thread {
private InputStream in;
ReadThread(InputStream in) {
this.in = in;
}
public void run() {
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line = null;
try {
while((line = reader.readLine()) != null) {
System.out.println(line);
}
reader.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}


@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
/**
* Reducer for transferring data from the temporary tables to the destination.
* The reducer drops each temporary table once its data is successfully transferred.
* Temporary tables are not dropped in the error case, to allow manual retry.
*/
public class PGBulkloadExportReducer
extends AutoProgressReducer<LongWritable, Text,
NullWritable, NullWritable> {
public static final Log LOG =
LogFactory.getLog(PGBulkloadExportReducer.class.getName());
private Configuration conf;
private DBConfiguration dbConf;
private Connection conn;
private String tableName;
protected void setup(Context context)
throws IOException, InterruptedException {
conf = context.getConfiguration();
dbConf = new DBConfiguration(conf);
tableName = dbConf.getOutputTableName();
try {
conn = dbConf.getConnection();
conn.setAutoCommit(false);
} catch (ClassNotFoundException ex) {
LOG.error("Unable to load JDBC driver class", ex);
throw new IOException(ex);
} catch (SQLException ex) {
LOG.error("Unable to connect to database", ex);
throw new IOException(ex);
}
}
@Override
public void reduce(LongWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Statement stmt = null;
try {
stmt = conn.createStatement();
for (Text value : values) {
int inserted = stmt.executeUpdate("INSERT INTO " + tableName
+ " ( SELECT * FROM " + value + " )");
stmt.executeUpdate("DROP TABLE " + value);
}
conn.commit();
} catch (SQLException ex) {
LOG.error("Unable to execute create query.", ex);
throw new IOException(ex);
} finally {
if (stmt != null) {
try {
stmt.close();
} catch (SQLException ex) {
LOG.error("Unable to close statement", ex);
}
}
}
}
protected void cleanup(Context context)
throws IOException, InterruptedException {
try {
conn.close();
} catch (SQLException ex) {
LOG.error("Unable to load JDBC driver class", ex);
throw new IOException(ex);
}
}
}


@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
/**
* Thread that periodically reports progress on behalf of auto-progress tasks.
*/
public class ProgressThread extends Thread {
private static Log log = null;
/**
* Total number of millis for which progress will be reported by the
* auto-progress thread. If this is zero, then the auto-progress thread will
* never voluntarily exit.
*/
private int maxProgressPeriod;
/**
* Number of milliseconds to sleep for between loop iterations. Must be less
* than report interval.
*/
private int sleepInterval;
/**
* Number of milliseconds between calls to Reporter.progress().
* Should be a multiple of the sleepInterval.
*/
private int reportInterval;
public static final String MAX_PROGRESS_PERIOD_KEY =
"sqoop.mapred.auto.progress.max";
public static final String SLEEP_INTERVAL_KEY =
"sqoop.mapred.auto.progress.sleep";
public static final String REPORT_INTERVAL_KEY =
"sqoop.mapred.auto.progress.report";
// Sleep for 10 seconds at a time.
public static final int DEFAULT_SLEEP_INTERVAL = 10000;
// Report progress every 30 seconds.
public static final int DEFAULT_REPORT_INTERVAL = 30000;
// Disable max progress, by default.
public static final int DEFAULT_MAX_PROGRESS = 0;
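// Note: these settings are read from the job configuration, so they can be
// overridden per job with Hadoop generic options, for example (illustrative
// value, not a recommendation):
//   -Dsqoop.mapred.auto.progress.report=60000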
private volatile boolean keepGoing; // While this is true, thread runs.
private TaskInputOutputContext context;
private long startTimeMillis;
private long lastReportMillis;
public ProgressThread(final TaskInputOutputContext ctxt, Log log) {
this.context = ctxt;
this.log = log;
this.keepGoing = true;
configureAutoProgress(ctxt.getConfiguration());
}
/**
* Set configuration parameters for the auto-progress thread.
*/
private void configureAutoProgress(Configuration job) {
this.maxProgressPeriod = job.getInt(MAX_PROGRESS_PERIOD_KEY,
DEFAULT_MAX_PROGRESS);
this.sleepInterval = job.getInt(SLEEP_INTERVAL_KEY,
DEFAULT_SLEEP_INTERVAL);
this.reportInterval = job.getInt(REPORT_INTERVAL_KEY,
DEFAULT_REPORT_INTERVAL);
if (this.reportInterval < 1) {
log.warn("Invalid " + REPORT_INTERVAL_KEY + "; setting to "
+ DEFAULT_REPORT_INTERVAL);
this.reportInterval = DEFAULT_REPORT_INTERVAL;
}
if (this.sleepInterval > this.reportInterval || this.sleepInterval < 1) {
log.warn("Invalid " + SLEEP_INTERVAL_KEY + "; setting to "
+ DEFAULT_SLEEP_INTERVAL);
this.sleepInterval = DEFAULT_SLEEP_INTERVAL;
}
if (this.maxProgressPeriod < 0) {
log.warn("Invalid " + MAX_PROGRESS_PERIOD_KEY + "; setting to "
+ DEFAULT_MAX_PROGRESS);
this.maxProgressPeriod = DEFAULT_MAX_PROGRESS;
}
}
public void signalShutdown() {
this.keepGoing = false; // volatile update.
this.interrupt();
}
public void run() {
this.lastReportMillis = System.currentTimeMillis();
this.startTimeMillis = this.lastReportMillis;
final long MAX_PROGRESS = this.maxProgressPeriod;
final long REPORT_INTERVAL = this.reportInterval;
final long SLEEP_INTERVAL = this.sleepInterval;
// In a loop:
// * Check that we haven't run for too long (maxProgressPeriod).
// * If it's been a report interval since we last made progress,
// make more.
// * Sleep for a bit.
// * If the parent thread has signaled for exit, do so.
while (this.keepGoing) {
long curTimeMillis = System.currentTimeMillis();
if (MAX_PROGRESS != 0
&& curTimeMillis - this.startTimeMillis > MAX_PROGRESS) {
this.keepGoing = false;
log.info("Auto-progress thread exiting after " + MAX_PROGRESS
+ " ms.");
break;
}
if (curTimeMillis - this.lastReportMillis > REPORT_INTERVAL) {
// It's been a full report interval -- claim progress.
log.debug("Auto-progress thread reporting progress");
this.context.progress();
this.lastReportMillis = curTimeMillis;
}
// Unless we got an interrupt while we were working,
// sleep a bit before doing more work.
if (!Thread.interrupted()) {
try {
Thread.sleep(SLEEP_INTERVAL);
} catch (InterruptedException ie) {
// we were notified on something; not necessarily an error.
}
}
}
log.info("Auto-progress thread is finished. keepGoing=" + this.keepGoing);
}
}


@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.sqoop.util.LoggingUtils;
import java.io.IOException;
/**
* Base Sqoop reducer class that is a convenient place for common functionality.
* Other specific reducers are highly encouraged to inherit from this class.
*/
public abstract class SqoopReducer<KI, VI, KO, VO>
extends Reducer<KI, VI, KO, VO> {
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
super.setup(context);
Configuration configuration = context.getConfiguration();
// Propagate verbose flag if needed
if (configuration.getBoolean(JobBase.PROPERTY_VERBOSE, false)) {
LoggingUtils.setDebugLevel();
}
}
}


@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.util;
import java.io.IOException;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Utility methods for PostgreSQL import/export.
*/
public final class PostgreSQLUtils {
public static final Log LOG =
LogFactory.getLog(PostgreSQLUtils.class.getName());
private PostgreSQLUtils() {
}
/** Write the user's password to a file that is chmod 0600.
@return the filename.
*/
public static String writePasswordFile(String tmpDir, String password)
throws IOException {
File tempFile = File.createTempFile("pgpass", ".pgpass", new File(tmpDir));
LOG.debug("Writing password to tempfile: " + tempFile);
// Make sure it's only readable by the current user.
DirectImportUtils.setFilePermissions(tempFile, "0600");
// Actually write the password data into the file.
BufferedWriter w = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(tempFile)));
w.write("*:*:*:*:" + password);
w.close();
return tempFile.toString();
}
}


@ -0,0 +1,220 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.manager;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import com.cloudera.sqoop.TestExport;
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
/**
* Test the PGBulkloadManager implementations.
* PGBulkloadManager uses both the JDBC driver and pg_bulkload to perform the export.
*
* Since this requires a PostgreSQL installation on your local machine to use,
* this class is named in such a way that Hadoop's default QA process does not
* run it.
*
* You need to run this manually with -Dtestcase=PGBulkloadManagerManualTest.
*
* You need to put the PostgreSQL JDBC driver library into the lib dir.
*
* You need to create a sqooptest superuser and database and tablespace,
* and install pg_bulkload for sqooptest database:
*
* $ sudo -u postgres createuser -U postgres -s sqooptest
* $ sudo -u postgres createdb -U sqooptest sqooptest
* $ sudo -u postgres mkdir /var/pgdata/stagingtablespace
* $ psql -U sqooptest
* -f /usr/local/share/postgresql/contrib/pg_bulkload.sql sqooptest
* $ psql -U sqooptest sqooptest
* sqooptest=# CREATE USER sqooptest;
* sqooptest=# CREATE DATABASE sqooptest;
* sqooptest=# CREATE TABLESPACE sqooptest
* LOCATION '/var/pgdata/stagingtablespace';
* sqooptest=# \q
*
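* A sketch of the typical manual invocation from the source tree,
* assuming the Ant build and the JDBC driver jar placed in the lib dir:
*
*   $ ant test -Dtestcase=PGBulkloadManagerManualTest
*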
*/
public class PGBulkloadManagerManualTest extends TestExport {
public static final Log LOG =
LogFactory.getLog(PGBulkloadManagerManualTest.class.getName());
private DBConfiguration dbConf;
public PGBulkloadManagerManualTest() {
Configuration conf = getConf();
DBConfiguration.configureDB(conf,
"org.postgresql.Driver",
getConnectString(),
getUserName(),
null, null);
dbConf = new DBConfiguration(conf);
}
@Override
protected boolean useHsqldbTestServer() {
return false;
}
@Override
protected String getConnectString() {
return "jdbc:postgresql://localhost:5432/sqooptest";
}
protected String getUserName() {
return "sqooptest";
}
@Override
protected String getTablePrefix() {
return super.getTablePrefix().toLowerCase();
}
@Override
protected String getTableName() {
return super.getTableName().toLowerCase();
}
@Override
public String getStagingTableName() {
return super.getStagingTableName().toLowerCase();
}
@Override
protected Connection getConnection() {
try {
Connection conn = dbConf.getConnection();
conn.setAutoCommit(false);
PreparedStatement stmt =
conn.prepareStatement("SET extra_float_digits TO 0");
stmt.executeUpdate();
conn.commit();
return conn;
} catch (SQLException sqlE) {
LOG.error("Could not get connection to test server: " + sqlE);
return null;
} catch (ClassNotFoundException cnfE) {
LOG.error("Could not find driver class: " + cnfE);
return null;
}
}
@Override
protected String getDropTableStatement(String tableName) {
return "DROP TABLE IF EXISTS " + tableName;
}
@Override
protected String[] getArgv(boolean includeHadoopFlags,
int rowsPerStatement,
int statementsPerTx,
String... additionalArgv) {
ArrayList<String> args =
new ArrayList<String>(Arrays.asList(additionalArgv));
args.add("--username");
args.add(getUserName());
args.add("--connection-manager");
args.add("org.apache.sqoop.manager.PGBulkloadManager");
args.add("--staging-table");
args.add("dummy");
args.add("--clear-staging-table");
return super.getArgv(includeHadoopFlags,
rowsPerStatement,
statementsPerTx,
args.toArray(new String[0]));
}
@Override
protected String [] getCodeGenArgv(String... extraArgs) {
ArrayList<String> args = new ArrayList<String>(Arrays.asList(extraArgs));
args.add("--username");
args.add(getUserName());
return super.getCodeGenArgv(args.toArray(new String[0]));
}
@Override
public void testColumnsExport() throws IOException, SQLException {
// PGBulkloadManager does not support --columns option.
}
public void testMultiReduceExport() throws IOException, SQLException {
String[] genericargs = newStrArray(null, "-Dmapred.reduce.tasks=2");
multiFileTestWithGenericArgs(2, 10, 2, genericargs);
}
public void testExportWithTablespace() throws IOException, SQLException {
String[] genericargs =
newStrArray(null, "-Dpgbulkload.staging.tablespace=sqooptest");
multiFileTestWithGenericArgs(1, 10, 1, genericargs);
}
protected void multiFileTestWithGenericArgs(int numFiles,
int recordsPerMap,
int numMaps,
String[] genericargs,
String... argv)
throws IOException, SQLException {
final int TOTAL_RECORDS = numFiles * recordsPerMap;
try {
LOG.info("Beginning test: numFiles=" + numFiles + "; recordsPerMap="
+ recordsPerMap + "; numMaps=" + numMaps);
LOG.info(" with genericargs: ");
for (String arg : genericargs) {
LOG.info(" " + arg);
}
for (int i = 0; i < numFiles; i++) {
createTextFile(i, recordsPerMap, false);
}
createTable();
runExport(getArgv(true, 10, 10,
newStrArray(newStrArray(genericargs, argv),
"-m", "" + numMaps)));
verifyExport(TOTAL_RECORDS);
} finally {
LOG.info("multi-reduce test complete");
}
}
}