diff --git a/src/docs/user/SqoopUserGuide.txt b/src/docs/user/SqoopUserGuide.txt
index 819c5125..e74cf644 100644
--- a/src/docs/user/SqoopUserGuide.txt
+++ b/src/docs/user/SqoopUserGuide.txt
@@ -72,8 +72,9 @@ include::version.txt[]
include::compatibility.txt[]
+include::connectors.txt[]
+
include::support.txt[]
include::troubleshooting.txt[]
-
diff --git a/src/docs/user/connectors.txt b/src/docs/user/connectors.txt
new file mode 100644
index 00000000..a93f14eb
--- /dev/null
+++ b/src/docs/user/connectors.txt
@@ -0,0 +1,138 @@
+
+////
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+////
+
+
+Notes for specific connectors
+-----------------------------
+
+pg_bulkload connector
+~~~~~~~~~~~~~~~~~~~~~
+
+Purpose
+^^^^^^^
+The pg_bulkload connector is a direct connector for exporting data into PostgreSQL.
+This connector uses
+http://pgbulkload.projects.postgresql.org/index.html[pg_bulkload].
+Users benefit from pg_bulkload functionality such as
+fast exports that bypass shared buffers and WAL,
+flexible handling of error records,
+and ETL features with filter functions.
+
+Requirements
+^^^^^^^^^^^^
+The pg_bulkload connector requires the following conditions for export job execution:
+
+* The link:http://pgbulkload.projects.postgresql.org/index.html[pg_bulkload]
+  must be installed on the DB server and on all slave nodes.
+  RPMs for RedHat and CentOS are available on the
+  link:http://pgfoundry.org/frs/?group_id=1000261[download page].
+* The link:http://jdbc.postgresql.org/index.html[PostgreSQL JDBC driver]
+  is required on the client node.
+* A superuser role of the PostgreSQL database is required for execution of pg_bulkload.
+
+Syntax
+^^^^^^
+Use the +--connection-manager+ option to specify the connection manager class name.
+----
+$ sqoop export (generic-args) --connection-manager org.apache.sqoop.manager.PGBulkloadManager (export-args)
+$ sqoop-export (generic-args) --connection-manager org.apache.sqoop.manager.PGBulkloadManager (export-args)
+----
+
+This connector supports the export arguments shown below.
+
+.Supported export control arguments:
+[grid="all"]
+`----------------------------------------`---------------------------------------
+Argument                                 Description
+---------------------------------------------------------------------------------
++\--export-dir <dir>+                    HDFS source path for the export
++-m,\--num-mappers <n>+                  Use 'n' map tasks to export in\
+                                         parallel
++\--table <table-name>+                  Table to populate
++\--input-null-string <null-string>+     The string to be interpreted as\
+                                         null for string columns
++\--clear-staging-table+                 Indicates that any data present in\
+                                         the staging table can be deleted.
+---------------------------------------------------------------------------------
+
+Additional configuration for pg_bulkload execution can be
+specified via Hadoop Configuration properties,
+which can be given with the +-D <property=value>+ option.
+Because Hadoop Configuration properties are generic arguments of sqoop,
+they must precede any export control arguments.
+
+.Supported export control properties:
+[grid="all"]
+`-----------------------------`----------------------------------------------
+Property                      Description
+-----------------------------------------------------------------------------
+mapred.reduce.tasks           Number of reduce tasks for staging. \
+                              The default value is 1. \
+                              Each task does staging in a single transaction.
+pgbulkload.bin                Path of the pg_bulkload binary \
+                              installed on each slave node.
+pgbulkload.check.constraints  Specify whether CHECK constraints are checked \
+                              during the loading. \
+                              The default value is YES.
+pgbulkload.parse.errors       The maximum number of ignored records \
+                              that cause errors during parsing, \
+                              encoding, filtering, constraints checking, \
+                              and data type conversion. \
+                              Error records are recorded \
+                              in the PARSE BADFILE. \
+                              The default value is INFINITE.
+pgbulkload.duplicate.errors   The maximum number of ignored records \
+                              that violate unique constraints. \
+                              Duplicated records are recorded in the \
+                              DUPLICATE BADFILE on the DB server. \
+                              The default value is INFINITE.
+pgbulkload.filter             Specify the filter function \
+                              to convert each row in the input file. \
+                              See the pg_bulkload documentation to learn \
+                              how to write FILTER functions.
+-----------------------------------------------------------------------------
+
+Here is an example of a complete command line.
+----
+$ sqoop export \
+    -Dmapred.reduce.tasks=2 \
+ -Dpgbulkload.bin="/usr/local/bin/pg_bulkload" \
+ -Dpgbulkload.input.field.delim=$'\t' \
+ -Dpgbulkload.check.constraints="YES" \
+ -Dpgbulkload.parse.errors="INFINITE" \
+ -Dpgbulkload.duplicate.errors="INFINITE" \
+ --connect jdbc:postgresql://pgsql.example.net:5432/sqooptest \
+ --connection-manager org.apache.sqoop.manager.PGBulkloadManager \
+ --table test --username sqooptest --export-dir=/test -m 2
+----
+
+Data Staging
+^^^^^^^^^^^^
+Each map task of the pg_bulkload connector's export job creates
+its own staging table on the fly.
+The name of the staging table is derived from the destination table name
+and the task attempt ID.
+For example, the staging table for the "test" table is named like
++test_attempt_1345021837431_0001_m_000000_0+ .
+
+Staging tables are automatically dropped when tasks complete successfully
+and when map tasks fail.
+When a reduce task fails,
+the staging tables for that task are left behind for manual retry, and
+users must take care of them.
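+
+In that case, a leftover staging table can be merged into the destination
+and dropped by hand. The following psql sketch mirrors what the reduce task
+would have executed, using the example staging table name from above
+(your table name will differ):
+
+----
+sqooptest=# INSERT INTO test
+              ( SELECT * FROM test_attempt_1345021837431_0001_m_000000_0 );
+sqooptest=# DROP TABLE test_attempt_1345021837431_0001_m_000000_0;
+----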
diff --git a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
index 9d79cfeb..a557aa13 100644
--- a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
+++ b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
@@ -32,6 +32,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.sqoop.util.PostgreSQLUtils;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.io.SplittableBufferedWriter;
@@ -293,26 +294,6 @@ private String writeCopyCommand(String command) throws IOException {
return tempFile.toString();
}
- /** Write the user's password to a file that is chmod 0600.
- @return the filename.
- */
- private String writePasswordFile(String password) throws IOException {
-
- String tmpDir = options.getTempDir();
- File tempFile = File.createTempFile("pgpass", ".pgpass", new File(tmpDir));
- LOG.debug("Writing password to tempfile: " + tempFile);
-
- // Make sure it's only readable by the current user.
- DirectImportUtils.setFilePermissions(tempFile, "0600");
-
- // Actually write the password data into the file.
- BufferedWriter w = new BufferedWriter(
- new OutputStreamWriter(new FileOutputStream(tempFile)));
- w.write("*:*:*:*:" + password);
- w.close();
- return tempFile.toString();
- }
-
// TODO(aaron): Refactor this method to be much shorter.
// CHECKSTYLE:OFF
@Override
@@ -380,7 +361,8 @@ public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
args.add(username);
String password = options.getPassword();
if (null != password) {
- passwordFilename = writePasswordFile(password);
+ passwordFilename =
+ PostgreSQLUtils.writePasswordFile(options.getTempDir(), password);
// Need to send PGPASSFILE environment variable specifying
// location of our postgres file.
envp.add("PGPASSFILE=" + passwordFilename);
diff --git a/src/java/org/apache/sqoop/manager/PGBulkloadManager.java b/src/java/org/apache/sqoop/manager/PGBulkloadManager.java
new file mode 100644
index 00000000..92174f8e
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/PGBulkloadManager.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.io.IOException;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.util.ExportException;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sqoop.mapreduce.ExportInputFormat;
+import org.apache.sqoop.mapreduce.PGBulkloadExportJob;
+
+
+
+/**
+ * Manages direct connections to PostgreSQL databases using pg_bulkload.
+ */
+public class PGBulkloadManager extends PostgresqlManager {
+
+ public static final Log LOG =
+ LogFactory.getLog(PGBulkloadManager.class.getName());
+
+
+ public PGBulkloadManager(final SqoopOptions opts) {
+ super(opts, true);
+ }
+
+
+ @Override
+ public void exportTable(ExportJobContext context)
+ throws IOException, ExportException {
+ context.setConnManager(this);
+ options.setStagingTableName(null);
+ PGBulkloadExportJob jobbase =
+ new PGBulkloadExportJob(context,
+ null,
+ ExportInputFormat.class,
+ NullOutputFormat.class);
+ jobbase.runExport();
+ }
+
+
+ @Override
+ public boolean supportsStagingForExport() {
+ return false;
+ }
+
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java b/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java
index 4b61321e..95086417 100644
--- a/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
/**
* Identity mapper that continuously reports progress via a background thread.
@@ -32,25 +31,6 @@ public class AutoProgressMapper
public static final Log LOG = LogFactory.getLog(
AutoProgressMapper.class.getName());
- /**
- * Total number of millis for which progress will be reported by the
- * auto-progress thread. If this is zero, then the auto-progress thread will
- * never voluntarily exit.
- */
- private int maxProgressPeriod;
-
- /**
- * Number of milliseconds to sleep for between loop iterations. Must be less
- * than report interval.
- */
- private int sleepInterval;
-
- /**
- * Number of milliseconds between calls to Reporter.progress().
- * Should be a multiple of the sleepInterval.
- */
- private int reportInterval;
-
public static final String MAX_PROGRESS_PERIOD_KEY =
"sqoop.mapred.auto.progress.max";
public static final String SLEEP_INTERVAL_KEY =
@@ -67,112 +47,14 @@ public class AutoProgressMapper
// Disable max progress, by default.
public static final int DEFAULT_MAX_PROGRESS = 0;
- private class ProgressThread extends Thread {
-
- private volatile boolean keepGoing; // While this is true, thread runs.
-
- private Context context;
- private long startTimeMillis;
- private long lastReportMillis;
-
- public ProgressThread(final Context ctxt) {
- this.context = ctxt;
- this.keepGoing = true;
- }
-
- public void signalShutdown() {
- this.keepGoing = false; // volatile update.
- this.interrupt();
- }
-
- public void run() {
- this.lastReportMillis = System.currentTimeMillis();
- this.startTimeMillis = this.lastReportMillis;
-
- final long MAX_PROGRESS = AutoProgressMapper.this.maxProgressPeriod;
- final long REPORT_INTERVAL = AutoProgressMapper.this.reportInterval;
- final long SLEEP_INTERVAL = AutoProgressMapper.this.sleepInterval;
-
- // In a loop:
- // * Check that we haven't run for too long (maxProgressPeriod).
- // * If it's been a report interval since we last made progress,
- // make more.
- // * Sleep for a bit.
- // * If the parent thread has signaled for exit, do so.
- while (this.keepGoing) {
- long curTimeMillis = System.currentTimeMillis();
-
- if (MAX_PROGRESS != 0
- && curTimeMillis - this.startTimeMillis > MAX_PROGRESS) {
- this.keepGoing = false;
- LOG.info("Auto-progress thread exiting after " + MAX_PROGRESS
- + " ms.");
- break;
- }
-
- if (curTimeMillis - this.lastReportMillis > REPORT_INTERVAL) {
- // It's been a full report interval -- claim progress.
- LOG.debug("Auto-progress thread reporting progress");
- this.context.progress();
- this.lastReportMillis = curTimeMillis;
- }
-
- // Unless we got an interrupt while we were working,
- // sleep a bit before doing more work.
- if (!Thread.interrupted()) {
- try {
- Thread.sleep(SLEEP_INTERVAL);
- } catch (InterruptedException ie) {
- // we were notified on something; not necessarily an error.
- }
- }
- }
-
- LOG.info("Auto-progress thread is finished. keepGoing=" + this.keepGoing);
- }
- }
-
- /**
- * Set configuration parameters for the auto-progress thread.
- */
- private void configureAutoProgress(Configuration job) {
- this.maxProgressPeriod = job.getInt(MAX_PROGRESS_PERIOD_KEY,
- DEFAULT_MAX_PROGRESS);
- this.sleepInterval = job.getInt(SLEEP_INTERVAL_KEY,
- DEFAULT_SLEEP_INTERVAL);
- this.reportInterval = job.getInt(REPORT_INTERVAL_KEY,
- DEFAULT_REPORT_INTERVAL);
-
- if (this.reportInterval < 1) {
- LOG.warn("Invalid " + REPORT_INTERVAL_KEY + "; setting to "
- + DEFAULT_REPORT_INTERVAL);
- this.reportInterval = DEFAULT_REPORT_INTERVAL;
- }
-
- if (this.sleepInterval > this.reportInterval || this.sleepInterval < 1) {
- LOG.warn("Invalid " + SLEEP_INTERVAL_KEY + "; setting to "
- + DEFAULT_SLEEP_INTERVAL);
- this.sleepInterval = DEFAULT_SLEEP_INTERVAL;
- }
-
- if (this.maxProgressPeriod < 0) {
- LOG.warn("Invalid " + MAX_PROGRESS_PERIOD_KEY + "; setting to "
- + DEFAULT_MAX_PROGRESS);
- this.maxProgressPeriod = DEFAULT_MAX_PROGRESS;
- }
- }
-
-
// map() method intentionally omitted; Mapper.map() is the identity mapper.
-
/**
* Run the mapping process for this task, wrapped in an auto-progress system.
*/
@Override
public void run(Context context) throws IOException, InterruptedException {
- configureAutoProgress(context.getConfiguration());
- ProgressThread thread = this.new ProgressThread(context);
+ ProgressThread thread = new ProgressThread(context, LOG);
try {
thread.setDaemon(true);
@@ -191,7 +73,7 @@ public void run(Context context) throws IOException, InterruptedException {
LOG.debug("Progress thread shutdown detected.");
} catch (InterruptedException ie) {
LOG.warn("Interrupted when waiting on auto-progress thread: "
- + ie.toString());
+ + ie.toString(), ie);
}
}
}
diff --git a/src/java/org/apache/sqoop/mapreduce/AutoProgressReducer.java b/src/java/org/apache/sqoop/mapreduce/AutoProgressReducer.java
new file mode 100644
index 00000000..ee1d8822
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/AutoProgressReducer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Identity reducer that continuously reports progress via a background thread.
+ */
+public class AutoProgressReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
+    extends SqoopReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+ public static final Log LOG = LogFactory.getLog(
+ AutoProgressReducer.class.getName());
+
+ // reduce() method intentionally omitted;
+ // Reducer.reduce() is the identity reducer.
+
+ /**
+   * Run the reduce process for this task, wrapped in an auto-progress system.
+ */
+ @Override
+ public void run(Context context) throws IOException, InterruptedException {
+ ProgressThread thread = new ProgressThread(context, LOG);
+
+ try {
+ thread.setDaemon(true);
+ thread.start();
+
+      // use default run() method to actually drive the reduce.
+ super.run(context);
+ } finally {
+ // Tell the progress thread to exit..
+ LOG.debug("Instructing auto-progress thread to quit.");
+ thread.signalShutdown();
+ try {
+ // And wait for that to happen.
+ LOG.debug("Waiting for progress thread shutdown...");
+ thread.join();
+ LOG.debug("Progress thread shutdown detected.");
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted when waiting on auto-progress thread: "
+ + ie.toString(), ie);
+ }
+ }
+ }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java
new file mode 100644
index 00000000..f3f094bd
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.util.ExportException;
+import com.cloudera.sqoop.SqoopOptions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.config.ConfigurationHelper;
+import org.apache.sqoop.lib.DelimiterSet;
+import org.apache.sqoop.manager.ConnManager;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.orm.TableClassName;
+
+
+/**
+ * Class that runs an export job using pg_bulkload in the mapper.
+ */
+public class PGBulkloadExportJob extends ExportJobBase {
+
+ public static final Log LOG =
+ LogFactory.getLog(PGBulkloadExportJob.class.getName());
+
+
+ public PGBulkloadExportJob(final ExportJobContext context) {
+ super(context);
+ }
+
+
+ public PGBulkloadExportJob(final ExportJobContext ctxt,
+                             final Class<? extends Mapper> mapperClass,
+                             final Class<? extends InputFormat> inputFormatClass,
+                             final Class<? extends OutputFormat> outputFormatClass) {
+ super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+ }
+
+
+ @Override
+ protected void configureInputFormat(Job job, String tableName,
+ String tableClassName, String splitByCol)
+ throws ClassNotFoundException, IOException {
+ super.configureInputFormat(job, tableName, tableClassName, splitByCol);
+ ConnManager mgr = context.getConnManager();
+ String username = options.getUsername();
+ if (null == username || username.length() == 0) {
+ DBConfiguration.configureDB(job.getConfiguration(),
+ mgr.getDriverClass(),
+ options.getConnectString(),
+ options.getFetchSize());
+ } else {
+ DBConfiguration.configureDB(job.getConfiguration(),
+ mgr.getDriverClass(),
+ options.getConnectString(),
+ username, options.getPassword(),
+ options.getFetchSize());
+ }
+ }
+
+
+ @Override
+  protected Class<? extends Mapper> getMapperClass() {
+ return PGBulkloadExportMapper.class;
+ }
+
+
+  protected Class<? extends Reducer> getReducerClass() {
+ return PGBulkloadExportReducer.class;
+ }
+
+
+ private void setDelimiter(String prop, char val, Configuration conf) {
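+    // NULL_CHAR marks an unset delimiter and is skipped; every other
+    // character (tab included, via fall-through) is propagated as a
+    // single-character string property.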
+ switch (val) {
+ case DelimiterSet.NULL_CHAR:
+ break;
+ case '\t':
+ default:
+ conf.set(prop, String.valueOf(val));
+ }
+ }
+
+
+ @Override
+ protected void propagateOptionsToJob(Job job) {
+ super.propagateOptionsToJob(job);
+ SqoopOptions opts = context.getOptions();
+ Configuration conf = job.getConfiguration();
+ conf.setIfUnset("pgbulkload.bin", "pg_bulkload");
+ if (opts.getNullStringValue() != null) {
+ conf.set("pgbulkload.null.string", opts.getNullStringValue());
+ }
+ setDelimiter("pgbulkload.input.field.delim",
+ opts.getInputFieldDelim(),
+ conf);
+ setDelimiter("pgbulkload.input.record.delim",
+ opts.getInputRecordDelim(),
+ conf);
+ setDelimiter("pgbulkload.input.enclosedby",
+ opts.getInputEnclosedBy(),
+ conf);
+ setDelimiter("pgbulkload.input.escapedby",
+ opts.getInputEscapedBy(),
+ conf);
+ conf.setBoolean("pgbulkload.input.encloserequired",
+ opts.isInputEncloseRequired());
+ conf.setIfUnset("pgbulkload.check.constraints", "YES");
+ conf.setIfUnset("pgbulkload.parse.errors", "INFINITE");
+ conf.setIfUnset("pgbulkload.duplicate.errors", "INFINITE");
+ conf.set("mapred.jar", context.getJarFile());
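+    // Staging tables are named per task attempt, so speculative or retried
+    // attempts would leave orphaned staging tables behind; run every map
+    // and reduce task at most once.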
+ conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+ conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
+ conf.setInt("mapred.map.max.attempts", 1);
+ conf.setInt("mapred.reduce.max.attempts", 1);
+ conf.setIfUnset("mapred.reduce.tasks", "1");
+ if (context.getOptions().doClearStagingTable()) {
+ conf.setBoolean("pgbulkload.clear.staging.table", true);
+ }
+ }
+
+
+ @Override
+ public void runExport() throws ExportException, IOException {
+ ConnManager cmgr = context.getConnManager();
+ SqoopOptions options = context.getOptions();
+ Configuration conf = options.getConf();
+ DBConfiguration dbConf = null;
+ String outputTableName = context.getTableName();
+ String tableName = outputTableName;
+ String tableClassName =
+ new TableClassName(options).getClassForTable(outputTableName);
+
+ LOG.info("Beginning export of " + outputTableName);
+ loadJars(conf, context.getJarFile(), tableClassName);
+
+ try {
+ Job job = new Job(conf);
+ dbConf = new DBConfiguration(job.getConfiguration());
+ dbConf.setOutputTableName(tableName);
+ configureInputFormat(job, tableName, tableClassName, null);
+ configureOutputFormat(job, tableName, tableClassName);
+ configureNumTasks(job);
+ propagateOptionsToJob(job);
+ job.setMapperClass(getMapperClass());
+ job.setMapOutputKeyClass(LongWritable.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setReducerClass(getReducerClass());
+ cacheJars(job, context.getConnManager());
+ setJob(job);
+
+ boolean success = runJob(job);
+ if (!success) {
+ throw new ExportException("Export job failed!");
+ }
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException(cnfe);
+ } finally {
+ unloadJars();
+ }
+ }
+
+
+ @Override
+ protected int configureNumTasks(Job job) throws IOException {
+ SqoopOptions options = context.getOptions();
+ int numMapTasks = options.getNumMappers();
+ if (numMapTasks < 1) {
+ numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS;
+ LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
+ }
+
+ ConfigurationHelper.setJobNumMaps(job, numMapTasks);
+ return numMapTasks;
+ }
+
+
+ private void clearStagingTable(DBConfiguration dbConf, String tableName)
+ throws IOException {
+    // Clearing the staging table is done in each map task.
+ }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java
new file mode 100644
index 00000000..14d064a5
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.util.PostgreSQLUtils;
+import org.apache.sqoop.util.Executor;
+import org.apache.sqoop.util.JdbcUrl;
+
+
+/**
+ * Mapper that starts a 'pg_bulkload' process and uses that to export rows from
+ * HDFS to a PostgreSQL database at high speed.
+ *
+ * map() methods are actually provided by subclasses that read from
+ * SequenceFiles (containing existing SqoopRecords) or text files
+ * (containing delimited lines) and deliver these results to the stream
+ * used to interface with pg_bulkload.
+ */
+public class PGBulkloadExportMapper
+    extends AutoProgressMapper<LongWritable, Writable, LongWritable, Text> {
+ private Configuration conf;
+ private DBConfiguration dbConf;
+ private Process process;
+ private OutputStream out;
+ protected BufferedWriter writer;
+ private Thread thread;
+ protected String tmpTableName;
+ private String tableName;
+ private String passwordFilename;
+
+
+ public PGBulkloadExportMapper() {
+ }
+
+
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ super.setup(context);
+ conf = context.getConfiguration();
+ dbConf = new DBConfiguration(conf);
+ tableName = dbConf.getOutputTableName();
+ tmpTableName = tableName + "_" + context.getTaskAttemptID().toString();
+
+ Connection conn = null;
+ try {
+ conn = dbConf.getConnection();
+ conn.setAutoCommit(false);
+ if (conf.getBoolean("pgbulkload.clear.staging.table", false)) {
+ StringBuffer query = new StringBuffer();
+ query.append("DROP TABLE IF EXISTS ");
+ query.append(tmpTableName);
+ doExecuteUpdate(query.toString());
+ }
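+      // Build the staging table as a structural copy of the destination;
+      // the generated statement resembles (names vary per task attempt):
+      //   CREATE TABLE test_attempt_..._m_000000_0
+      //     (LIKE test INCLUDING CONSTRAINTS)
+      // with a trailing TABLESPACE clause only when
+      // pgbulkload.staging.tablespace is set.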
+ StringBuffer query = new StringBuffer();
+ query.append("CREATE TABLE ");
+ query.append(tmpTableName);
+ query.append("(LIKE ");
+ query.append(tableName);
+ query.append(" INCLUDING CONSTRAINTS)");
+ if (conf.get("pgbulkload.staging.tablespace") != null) {
+        query.append(" TABLESPACE ");
+ query.append(conf.get("pgbulkload.staging.tablespace"));
+ }
+ doExecuteUpdate(query.toString());
+ conn.commit();
+ } catch (ClassNotFoundException ex) {
+ LOG.error("Unable to load JDBC driver class", ex);
+ throw new IOException(ex);
+ } catch (SQLException ex) {
+ LOG.error("Unable to execute statement", ex);
+ throw new IOException(ex);
+    } finally {
+      try {
+        if (conn != null) {
+          conn.close();
+        }
+      } catch (SQLException ex) {
+        LOG.error("Unable to close connection", ex);
+      }
+    }
+
+ try {
+      ArrayList<String> args = new ArrayList<String>();
+      List<String> envp = Executor.getCurEnvpStrings();
+ args.add(conf.get("pgbulkload.bin", "pg_bulkload"));
+ args.add("--username="
+ + conf.get(DBConfiguration.USERNAME_PROPERTY));
+ args.add("--dbname="
+ + JdbcUrl.getDatabaseName(conf.get(DBConfiguration.URL_PROPERTY)));
+ args.add("--host="
+ + JdbcUrl.getHostName(conf.get(DBConfiguration.URL_PROPERTY)));
+ args.add("--port="
+ + JdbcUrl.getPort(conf.get(DBConfiguration.URL_PROPERTY)));
+ args.add("--input=stdin");
+ args.add("--output=" + tmpTableName);
+ args.add("-o");
+ args.add("TYPE=CSV");
+ args.add("-o");
+ args.add("DELIMITER=" + conf.get("pgbulkload.input.field.delim", ","));
+ args.add("-o");
+ args.add("QUOTE=" + conf.get("pgbulkload.input.enclosedby", "\""));
+ args.add("-o");
+ args.add("ESCAPE=" + conf.get("pgbulkload.input.escapedby", "\""));
+ args.add("-o");
+ args.add("CHECK_CONSTRAINTS=" + conf.get("pgbulkload.check.constraints"));
+ args.add("-o");
+ args.add("PARSE_ERRORS=" + conf.get("pgbulkload.parse.errors"));
+ args.add("-o");
+ args.add("DUPLICATE_ERRORS=" + conf.get("pgbulkload.duplicate.errors"));
+ if (conf.get("pgbulkload.null.string") != null) {
+ args.add("-o");
+ args.add("NULL=" + conf.get("pgbulkload.null.string"));
+ }
+ if (conf.get("pgbulkload.filter") != null) {
+ args.add("-o");
+ args.add("FILTER=" + conf.get("pgbulkload.filter"));
+ }
+ LOG.debug("Starting pg_bulkload with arguments:");
+ for (String arg : args) {
+ LOG.debug(" " + arg);
+ }
+ if (conf.get(DBConfiguration.PASSWORD_PROPERTY) != null) {
+ String tmpDir = System.getProperty("test.build.data", "/tmp/");
+ if (!tmpDir.endsWith(File.separator)) {
+ tmpDir = tmpDir + File.separator;
+ }
+ tmpDir = conf.get("job.local.dir", tmpDir);
+ passwordFilename = PostgreSQLUtils.writePasswordFile(tmpDir,
+ conf.get(DBConfiguration.PASSWORD_PROPERTY));
+ envp.add("PGPASSFILE=" + passwordFilename);
+ }
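+      // At this point args typically resembles the following invocation
+      // (values come from the job configuration; this is illustrative,
+      // not exhaustive):
+      //   pg_bulkload --username=sqooptest --dbname=sqooptest
+      //     --host=pgsql.example.net --port=5432 --input=stdin
+      //     --output=test_attempt_..._m_000000_0 -o TYPE=CSV -o DELIMITER=,
+      //     -o QUOTE=" -o ESCAPE=" -o CHECK_CONSTRAINTS=YES
+      //     -o PARSE_ERRORS=INFINITE -o DUPLICATE_ERRORS=INFINITE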
+ process = Runtime.getRuntime().exec(args.toArray(new String[0]),
+ envp.toArray(new String[0]));
+ out = process.getOutputStream();
+ writer = new BufferedWriter(new OutputStreamWriter(out));
+ thread = new ReadThread(process.getErrorStream());
+ thread.start();
+ } catch (Exception e) {
+ cleanup(context);
+ doExecuteUpdate("DROP TABLE " + tmpTableName);
+ throw new IOException(e);
+ }
+ }
+
+
+ public void map(LongWritable key, Writable value, Context context)
+ throws IOException, InterruptedException {
+ try {
+ String str = value.toString();
+ if (value instanceof Text) {
+ writer.write(str, 0, str.length());
+ writer.newLine();
+ } else if (value instanceof SqoopRecord) {
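+        // A SqoopRecord's toString() is expected to already end with its
+        // record delimiter, so no extra newline is appended here.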
+ writer.write(str, 0, str.length());
+ }
+ } catch (Exception e) {
+ doExecuteUpdate("DROP TABLE " + tmpTableName);
+ cleanup(context);
+ throw new IOException(e);
+ }
+ }
+
+
+ protected void cleanup(Context context)
+ throws IOException, InterruptedException {
+ LongWritable taskid =
+ new LongWritable(context.getTaskAttemptID().getTaskID().getId());
+ context.write(taskid, new Text(tmpTableName));
+    if (writer != null) {
+      writer.close();
+    }
+    if (out != null) {
+      out.close();
+    }
+    try {
+      if (thread != null) {
+        thread.join();
+      }
+ } finally {
+ // block until the process is done.
+ int result = 0;
+ if (null != process) {
+ while (true) {
+ try {
+ result = process.waitFor();
+ } catch (InterruptedException ie) {
+ // interrupted; loop around.
+ continue;
+ }
+ break;
+ }
+ }
+ }
+ if (null != passwordFilename) {
+ if (!new File(passwordFilename).delete()) {
+ LOG.error("Could not remove postgresql password file "
+ + passwordFilename);
+ LOG.error("You should remove this file to protect your credentials.");
+ }
+ }
+ }
+
+
+ protected int doExecuteUpdate(String query) throws IOException {
+ Connection conn = null;
+ try {
+ conn = dbConf.getConnection();
+ conn.setAutoCommit(false);
+ } catch (ClassNotFoundException ex) {
+ LOG.error("Unable to load JDBC driver class", ex);
+ throw new IOException(ex);
+ } catch (SQLException ex) {
+ LOG.error("Unable to connect to database", ex);
+ throw new IOException(ex);
+ }
+ Statement stmt = null;
+ try {
+ stmt = conn.createStatement();
+ int ret = stmt.executeUpdate(query);
+ conn.commit();
+ return ret;
+ } catch (SQLException ex) {
+ LOG.error("Unable to execute query: " + query, ex);
+ throw new IOException(ex);
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException ex) {
+ LOG.error("Unable to close statement", ex);
+ }
+ }
+ try {
+ conn.close();
+ } catch (SQLException ex) {
+ LOG.error("Unable to close connection", ex);
+ }
+ }
+ }
+
+
+ private class ReadThread extends Thread {
+ private InputStream in;
+
+ ReadThread(InputStream in) {
+ this.in = in;
+ }
+
+ public void run() {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+ String line = null;
+ try {
+        while ((line = reader.readLine()) != null) {
+ System.out.println(line);
+ }
+ reader.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java
new file mode 100644
index 00000000..63c52c7b
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+
+
+/**
+ * Reducer for transferring data from the temporary tables to the destination.
+ * The reducer drops each temporary table once its data has been successfully
+ * transferred. Temporary tables are not dropped on error, to allow manual retry.
+ */
+public class PGBulkloadExportReducer
+    extends AutoProgressReducer<LongWritable, Text, NullWritable, NullWritable> {
+
+ public static final Log LOG =
+ LogFactory.getLog(PGBulkloadExportReducer.class.getName());
+ private Configuration conf;
+ private DBConfiguration dbConf;
+ private Connection conn;
+ private String tableName;
+
+
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ conf = context.getConfiguration();
+ dbConf = new DBConfiguration(conf);
+ tableName = dbConf.getOutputTableName();
+ try {
+ conn = dbConf.getConnection();
+ conn.setAutoCommit(false);
+ } catch (ClassNotFoundException ex) {
+ LOG.error("Unable to load JDBC driver class", ex);
+ throw new IOException(ex);
+ } catch (SQLException ex) {
+ LOG.error("Unable to connect to database", ex);
+ throw new IOException(ex);
+ }
+ }
+
+
+ @Override
+  public void reduce(LongWritable key, Iterable<Text> values, Context context)
+ throws IOException, InterruptedException {
+ Statement stmt = null;
+ try {
+ stmt = conn.createStatement();
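+      // Each incoming value names one staging table: append its rows to
+      // the destination, then drop it. The statements look like:
+      //   INSERT INTO test ( SELECT * FROM test_attempt_..._m_000000_0 )
+      //   DROP TABLE test_attempt_..._m_000000_0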
+ for (Text value : values) {
+        stmt.executeUpdate("INSERT INTO " + tableName
+            + " ( SELECT * FROM " + value + " )");
+ stmt.executeUpdate("DROP TABLE " + value);
+ }
+ conn.commit();
+ } catch (SQLException ex) {
+      LOG.error("Unable to execute statement.", ex);
+ throw new IOException(ex);
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException ex) {
+ LOG.error("Unable to close statement", ex);
+ }
+ }
+ }
+ }
+
+
+ protected void cleanup(Context context)
+ throws IOException, InterruptedException {
+ try {
+ conn.close();
+ } catch (SQLException ex) {
+      LOG.error("Unable to close connection", ex);
+ throw new IOException(ex);
+ }
+ }
+
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/ProgressThread.java b/src/java/org/apache/sqoop/mapreduce/ProgressThread.java
new file mode 100644
index 00000000..9d1ff51d
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/ProgressThread.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+
+/**
+ * Background thread that periodically reports progress for auto-progress tasks.
+ */
+public class ProgressThread extends Thread {
+
+  private final Log log;
+
+ /**
+ * Total number of millis for which progress will be reported by the
+ * auto-progress thread. If this is zero, then the auto-progress thread will
+ * never voluntarily exit.
+ */
+ private int maxProgressPeriod;
+
+ /**
+ * Number of milliseconds to sleep for between loop iterations. Must be less
+ * than report interval.
+ */
+ private int sleepInterval;
+
+ /**
+ * Number of milliseconds between calls to Reporter.progress().
+ * Should be a multiple of the sleepInterval.
+ */
+ private int reportInterval;
+
+ public static final String MAX_PROGRESS_PERIOD_KEY =
+ "sqoop.mapred.auto.progress.max";
+ public static final String SLEEP_INTERVAL_KEY =
+ "sqoop.mapred.auto.progress.sleep";
+ public static final String REPORT_INTERVAL_KEY =
+ "sqoop.mapred.auto.progress.report";
+
+ // Sleep for 10 seconds at a time.
+ public static final int DEFAULT_SLEEP_INTERVAL = 10000;
+
+ // Report progress every 30 seconds.
+ public static final int DEFAULT_REPORT_INTERVAL = 30000;
+
+ // Disable max progress, by default.
+ public static final int DEFAULT_MAX_PROGRESS = 0;
+
+ private volatile boolean keepGoing; // While this is true, thread runs.
+
+ private TaskInputOutputContext context;
+ private long startTimeMillis;
+ private long lastReportMillis;
+
+ public ProgressThread(final TaskInputOutputContext ctxt, Log log) {
+ this.context = ctxt;
+ this.log = log;
+ this.keepGoing = true;
+ configureAutoProgress(ctxt.getConfiguration());
+ }
+
+ /**
+ * Set configuration parameters for the auto-progress thread.
+ */
+ private void configureAutoProgress(Configuration job) {
+ this.maxProgressPeriod = job.getInt(MAX_PROGRESS_PERIOD_KEY,
+ DEFAULT_MAX_PROGRESS);
+ this.sleepInterval = job.getInt(SLEEP_INTERVAL_KEY,
+ DEFAULT_SLEEP_INTERVAL);
+ this.reportInterval = job.getInt(REPORT_INTERVAL_KEY,
+ DEFAULT_REPORT_INTERVAL);
+
+ if (this.reportInterval < 1) {
+ log.warn("Invalid " + REPORT_INTERVAL_KEY + "; setting to "
+ + DEFAULT_REPORT_INTERVAL);
+ this.reportInterval = DEFAULT_REPORT_INTERVAL;
+ }
+
+ if (this.sleepInterval > this.reportInterval || this.sleepInterval < 1) {
+ log.warn("Invalid " + SLEEP_INTERVAL_KEY + "; setting to "
+ + DEFAULT_SLEEP_INTERVAL);
+ this.sleepInterval = DEFAULT_SLEEP_INTERVAL;
+ }
+
+ if (this.maxProgressPeriod < 0) {
+ log.warn("Invalid " + MAX_PROGRESS_PERIOD_KEY + "; setting to "
+ + DEFAULT_MAX_PROGRESS);
+ this.maxProgressPeriod = DEFAULT_MAX_PROGRESS;
+ }
+ }
+
+ public void signalShutdown() {
+ this.keepGoing = false; // volatile update.
+ this.interrupt();
+ }
+
+ public void run() {
+ this.lastReportMillis = System.currentTimeMillis();
+ this.startTimeMillis = this.lastReportMillis;
+
+ final long MAX_PROGRESS = this.maxProgressPeriod;
+ final long REPORT_INTERVAL = this.reportInterval;
+ final long SLEEP_INTERVAL = this.sleepInterval;
+
+ // In a loop:
+ // * Check that we haven't run for too long (maxProgressPeriod).
+ // * If it's been a report interval since we last made progress,
+ // make more.
+ // * Sleep for a bit.
+ // * If the parent thread has signaled for exit, do so.
+ while (this.keepGoing) {
+ long curTimeMillis = System.currentTimeMillis();
+
+ if (MAX_PROGRESS != 0
+ && curTimeMillis - this.startTimeMillis > MAX_PROGRESS) {
+ this.keepGoing = false;
+ log.info("Auto-progress thread exiting after " + MAX_PROGRESS
+ + " ms.");
+ break;
+ }
+
+ if (curTimeMillis - this.lastReportMillis > REPORT_INTERVAL) {
+ // It's been a full report interval -- claim progress.
+ log.debug("Auto-progress thread reporting progress");
+ this.context.progress();
+ this.lastReportMillis = curTimeMillis;
+ }
+
+ // Unless we got an interrupt while we were working,
+ // sleep a bit before doing more work.
+ if (!Thread.interrupted()) {
+ try {
+ Thread.sleep(SLEEP_INTERVAL);
+ } catch (InterruptedException ie) {
+ // we were notified on something; not necessarily an error.
+ }
+ }
+ }
+ log.info("Auto-progress thread is finished. keepGoing=" + this.keepGoing);
+ }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/SqoopReducer.java b/src/java/org/apache/sqoop/mapreduce/SqoopReducer.java
new file mode 100644
index 00000000..7c268069
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/SqoopReducer.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.sqoop.util.LoggingUtils;
+
+import java.io.IOException;
+
+/**
+ * Base sqoop reducer class that is convenient place for common functionality.
+ * Other specific reducers are highly encouraged to inherit from this class.
+ */
+public abstract class SqoopReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
+    extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ super.setup(context);
+
+ Configuration configuration = context.getConfiguration();
+
+ // Propagate verbose flag if needed
+ if (configuration.getBoolean(JobBase.PROPERTY_VERBOSE, false)) {
+ LoggingUtils.setDebugLevel();
+ }
+ }
+}
diff --git a/src/java/org/apache/sqoop/util/PostgreSQLUtils.java b/src/java/org/apache/sqoop/util/PostgreSQLUtils.java
new file mode 100644
index 00000000..f87d744d
--- /dev/null
+++ b/src/java/org/apache/sqoop/util/PostgreSQLUtils.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.util;
+
+import java.io.IOException;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Utility methods for PostgreSQL import/export.
+ */
+public final class PostgreSQLUtils {
+
+ public static final Log LOG =
+ LogFactory.getLog(PostgreSQLUtils.class.getName());
+
+ private PostgreSQLUtils() {
+ }
+
+ /** Write the user's password to a file that is chmod 0600.
+ @return the filename.
+ */
+ public static String writePasswordFile(String tmpDir, String password)
+ throws IOException {
+ File tempFile = File.createTempFile("pgpass", ".pgpass", new File(tmpDir));
+ LOG.debug("Writing password to tempfile: " + tempFile);
+
+ // Make sure it's only readable by the current user.
+ DirectImportUtils.setFilePermissions(tempFile, "0600");
+
+ // Actually write the password data into the file.
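+  // A pgpass line has the form host:port:database:user:password;
+  // the leading "*:*:*:*:" wildcards match any host, port, database,
+  // and user.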
+ BufferedWriter w = new BufferedWriter(
+ new OutputStreamWriter(new FileOutputStream(tempFile)));
+ w.write("*:*:*:*:" + password);
+ w.close();
+ return tempFile.toString();
+ }
+}
diff --git a/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java b/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java
new file mode 100644
index 00000000..fff35dcb
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.manager;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.PreparedStatement;
+import java.util.Arrays;
+import java.util.ArrayList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import com.cloudera.sqoop.TestExport;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+
+
+/**
+ * Test the PGBulkloadManager implementation.
+ * PGBulkloadManager uses both the JDBC driver and pg_bulkload to perform
+ * the export.
+ *
+ * Since this requires a Postgresql installation on your local machine to use,
+ * this class is named in such a way that Hadoop's default QA process does not
+ * run it.
+ *
+ * You need to run this manually with -Dtestcase=PGBulkloadManagerManualTest.
+ *
+ * You need to put Postgresql's JDBC driver library into lib dir.
+ *
+ * You need to create a sqooptest superuser and database and tablespace,
+ * and install pg_bulkload for sqooptest database:
+ *
+ * $ sudo -u postgres createuser -U postgres -s sqooptest
+ * $ sudo -u postgres createdb -U sqooptest sqooptest
+ * $ sudo -u postgres mkdir /var/pgdata/stagingtablespace
+ * $ psql -U sqooptest
+ * -f /usr/local/share/postgresql/contrib/pg_bulkload.sql sqooptest
+ * $ psql -U sqooptest sqooptest
+ * sqooptest=# CREATE USER sqooptest;
+ * sqooptest=# CREATE DATABASE sqooptest;
+ * sqooptest=# CREATE TABLESPACE sqooptest
+ * LOCATION '/var/pgdata/stagingtablespace';
+ * sqooptest=# \q
+ *
+ */
+public class PGBulkloadManagerManualTest extends TestExport {
+
+ public static final Log LOG =
+ LogFactory.getLog(PGBulkloadManagerManualTest.class.getName());
+ private DBConfiguration dbConf;
+
+
+ public PGBulkloadManagerManualTest() {
+ Configuration conf = getConf();
+ DBConfiguration.configureDB(conf,
+ "org.postgresql.Driver",
+ getConnectString(),
+ getUserName(),
+ null, null);
+ dbConf = new DBConfiguration(conf);
+ }
+
+
+ @Override
+ protected boolean useHsqldbTestServer() {
+ return false;
+ }
+
+
+ @Override
+ protected String getConnectString() {
+ return "jdbc:postgresql://localhost:5432/sqooptest";
+ }
+
+
+ protected String getUserName() {
+ return "sqooptest";
+ }
+
+
+ @Override
+ protected String getTablePrefix() {
+ return super.getTablePrefix().toLowerCase();
+ }
+
+
+ @Override
+ protected String getTableName() {
+ return super.getTableName().toLowerCase();
+ }
+
+ @Override
+ public String getStagingTableName() {
+ return super.getStagingTableName().toLowerCase();
+ }
+
+
+ @Override
+ protected Connection getConnection() {
+ try {
+ Connection conn = dbConf.getConnection();
+ conn.setAutoCommit(false);
+ PreparedStatement stmt =
+ conn.prepareStatement("SET extra_float_digits TO 0");
+ stmt.executeUpdate();
+ conn.commit();
+ return conn;
+ } catch (SQLException sqlE) {
+ LOG.error("Could not get connection to test server: " + sqlE);
+ return null;
+ } catch (ClassNotFoundException cnfE) {
+ LOG.error("Could not find driver class: " + cnfE);
+ return null;
+ }
+ }
+
+
+ @Override
+ protected String getDropTableStatement(String tableName) {
+ return "DROP TABLE IF EXISTS " + tableName;
+ }
+
+
+ @Override
+ protected String[] getArgv(boolean includeHadoopFlags,
+ int rowsPerStatement,
+ int statementsPerTx,
+ String... additionalArgv) {
+    ArrayList<String> args =
+      new ArrayList<String>(Arrays.asList(additionalArgv));
+ args.add("--username");
+ args.add(getUserName());
+ args.add("--connection-manager");
+ args.add("org.apache.sqoop.manager.PGBulkloadManager");
+ args.add("--staging-table");
+ args.add("dummy");
+ args.add("--clear-staging-table");
+ return super.getArgv(includeHadoopFlags,
+ rowsPerStatement,
+ statementsPerTx,
+ args.toArray(new String[0]));
+ }
+
+
+ @Override
+ protected String [] getCodeGenArgv(String... extraArgs) {
+    ArrayList<String> args = new ArrayList<String>(Arrays.asList(extraArgs));
+ args.add("--username");
+ args.add(getUserName());
+ return super.getCodeGenArgv(args.toArray(new String[0]));
+ }
+
+
+ @Override
+ public void testColumnsExport() throws IOException, SQLException {
+ // PGBulkloadManager does not support --columns option.
+ }
+
+
+ public void testMultiReduceExport() throws IOException, SQLException {
+ String[] genericargs = newStrArray(null, "-Dmapred.reduce.tasks=2");
+ multiFileTestWithGenericArgs(2, 10, 2, genericargs);
+ }
+
+
+ public void testExportWithTablespace() throws IOException, SQLException {
+ String[] genericargs =
+ newStrArray(null, "-Dpgbulkload.staging.tablespace=sqooptest");
+ multiFileTestWithGenericArgs(1, 10, 1, genericargs);
+ }
+
+
+ protected void multiFileTestWithGenericArgs(int numFiles,
+ int recordsPerMap,
+ int numMaps,
+ String[] genericargs,
+ String... argv)
+ throws IOException, SQLException {
+
+ final int TOTAL_RECORDS = numFiles * recordsPerMap;
+
+ try {
+ LOG.info("Beginning test: numFiles=" + numFiles + "; recordsPerMap="
+ + recordsPerMap + "; numMaps=" + numMaps);
+ LOG.info(" with genericargs: ");
+ for (String arg : genericargs) {
+ LOG.info(" " + arg);
+ }
+
+ for (int i = 0; i < numFiles; i++) {
+ createTextFile(i, recordsPerMap, false);
+ }
+
+ createTable();
+
+ runExport(getArgv(true, 10, 10,
+ newStrArray(newStrArray(genericargs, argv),
+ "-m", "" + numMaps)));
+ verifyExport(TOTAL_RECORDS);
+ } finally {
+ LOG.info("multi-reduce test complete");
+ }
+ }
+}