From 29b29a52f30fae051496f6add85da9bb88adffc6 Mon Sep 17 00:00:00 2001 From: Jarek Jarcec Cecho Date: Mon, 20 Aug 2012 06:21:34 +0000 Subject: [PATCH] SQOOP-390. PostgreSQL connector for direct export with pg_bulkload. (Masatake Iwasaki via Jarek Jarcec Cecho) git-svn-id: https://svn.apache.org/repos/asf/sqoop/trunk@1374923 13f79535-47bb-0310-9956-ffa450edef68 --- src/docs/user/SqoopUserGuide.txt | 3 +- src/docs/user/connectors.txt | 138 +++++++++ .../manager/DirectPostgresqlManager.java | 24 +- .../sqoop/manager/PGBulkloadManager.java | 66 ++++ .../sqoop/mapreduce/AutoProgressMapper.java | 122 +------- .../sqoop/mapreduce/AutoProgressReducer.java | 65 ++++ .../sqoop/mapreduce/PGBulkloadExportJob.java | 207 +++++++++++++ .../mapreduce/PGBulkloadExportMapper.java | 290 ++++++++++++++++++ .../mapreduce/PGBulkloadExportReducer.java | 106 +++++++ .../sqoop/mapreduce/ProgressThread.java | 160 ++++++++++ .../apache/sqoop/mapreduce/SqoopReducer.java | 46 +++ .../apache/sqoop/util/PostgreSQLUtils.java | 58 ++++ .../manager/PGBulkloadManagerManualTest.java | 220 +++++++++++++ 13 files changed, 1363 insertions(+), 142 deletions(-) create mode 100644 src/docs/user/connectors.txt create mode 100644 src/java/org/apache/sqoop/manager/PGBulkloadManager.java create mode 100644 src/java/org/apache/sqoop/mapreduce/AutoProgressReducer.java create mode 100644 src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java create mode 100644 src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java create mode 100644 src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java create mode 100644 src/java/org/apache/sqoop/mapreduce/ProgressThread.java create mode 100644 src/java/org/apache/sqoop/mapreduce/SqoopReducer.java create mode 100644 src/java/org/apache/sqoop/util/PostgreSQLUtils.java create mode 100644 src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java diff --git a/src/docs/user/SqoopUserGuide.txt b/src/docs/user/SqoopUserGuide.txt index 819c5125..e74cf644 100644 --- a/src/docs/user/SqoopUserGuide.txt +++ b/src/docs/user/SqoopUserGuide.txt @@ -72,8 +72,9 @@ include::version.txt[] include::compatibility.txt[] +include::connectors.txt[] + include::support.txt[] include::troubleshooting.txt[] - diff --git a/src/docs/user/connectors.txt b/src/docs/user/connectors.txt new file mode 100644 index 00000000..a93f14eb --- /dev/null +++ b/src/docs/user/connectors.txt @@ -0,0 +1,138 @@ + +//// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +//// + + +Notes for specific connectors +----------------------------- + +pg_bulkload connector +~~~~~~~~~~~~~~~~~~~~~ + +Purpose +^^^^^^^ +pg_bulkload connector is a direct connector for exporting data into PostgreSQL. +This connector uses +http://pgbulkload.projects.postgresql.org/index.html[pg_bulkload]. 
+Users benefit from pg_bulkload functionality such as
+fast exports that bypass shared buffers and WAL,
+flexible handling of error records,
+and ETL features with filter functions.
+
+Requirements
+^^^^^^^^^^^^
+The pg_bulkload connector requires the following conditions for export
+job execution:
+
+* The link:http://pgbulkload.projects.postgresql.org/index.html[pg_bulkload]
+  must be installed on the DB server and all slave nodes.
+  An RPM for RedHat or CentOS is available on the
+  link:http://pgfoundry.org/frs/?group_id=1000261[download page].
+* The link:http://jdbc.postgresql.org/index.html[PostgreSQL JDBC] driver
+  is required on the client node.
+* A superuser role of the PostgreSQL database is required for execution of
+  pg_bulkload.
+
+Syntax
+^^^^^^
+Use the +--connection-manager+ option to specify the connection manager
+classname.
+----
+$ sqoop export (generic-args) --connection-manager org.apache.sqoop.manager.PGBulkloadManager (export-args)
+$ sqoop-export (generic-args) --connection-manager org.apache.sqoop.manager.PGBulkloadManager (export-args)
+----
+
+This connector supports the export arguments shown below.
+
+.Supported export control arguments:
+[grid="all"]
+`----------------------------------------`---------------------------------------
+Argument                                  Description
+---------------------------------------------------------------------------------
++\--export-dir +                          HDFS source path for the export
++-m,\--num-mappers +                      Use 'n' map tasks to export in\
+                                          parallel
++\--table +                               Table to populate
++\--input-null-string +                   The string to be interpreted as\
+                                          null for string columns
++\--clear-staging-table+                  Indicates that any data present in\
+                                          the staging table can be deleted.
+---------------------------------------------------------------------------------
+
+Additional configuration for pg_bulkload execution can be specified
+via Hadoop Configuration properties,
+which are given with the +-D+ option.
+Because Hadoop Configuration properties are generic arguments of Sqoop,
+they must precede any export control arguments.
+
+.Supported export control properties:
+[grid="all"]
+`-----------------------------`----------------------------------------------
+Property                       Description
+-----------------------------------------------------------------------------
+mapred.reduce.tasks            Number of reduce tasks for staging. \
+                               The default value is 1. \
+                               Each task performs staging in a single \
+                               transaction.
+pgbulkload.bin                 Path of the pg_bulkload binary \
+                               installed on each slave node.
+pgbulkload.check.constraints   Specify whether CHECK constraints are checked \
+                               during the loading. \
+                               The default value is YES.
+pgbulkload.parse.errors        The maximum number of ignored records \
+                               that cause errors during parsing, \
+                               encoding, filtering, constraint checking, \
+                               and data type conversion. \
+                               Error records are recorded \
+                               in the PARSE BADFILE. \
+                               The default value is INFINITE.
+pgbulkload.duplicate.errors    The maximum number of ignored records \
+                               that violate unique constraints. \
+                               Duplicated records are recorded in the \
+                               DUPLICATE BADFILE on the DB server. \
+                               The default value is INFINITE.
+pgbulkload.filter              Specify the filter function \
+                               to convert each row in the input file. \
+                               See the pg_bulkload documentation for how \
+                               to write FILTER functions.
+-----------------------------------------------------------------------------
+
+Here is an example of a complete command line:
+----
+$ sqoop export \
+    -Dmapred.reduce.tasks=2 \
+    -Dpgbulkload.bin="/usr/local/bin/pg_bulkload" \
+    -Dpgbulkload.input.field.delim=$'\t' \
+    -Dpgbulkload.check.constraints="YES" \
+    -Dpgbulkload.parse.errors="INFINITE" \
+    -Dpgbulkload.duplicate.errors="INFINITE" \
+    --connect jdbc:postgresql://pgsql.example.net:5432/sqooptest \
+    --connection-manager org.apache.sqoop.manager.PGBulkloadManager \
+    --table test --username sqooptest --export-dir=/test -m 2
+----
+
+Data Staging
+^^^^^^^^^^^^
+Each map task of the pg_bulkload connector's export job creates
+its own staging table on the fly.
+The names of the staging tables are derived from the destination table
+and the task attempt ids.
+For example, the name of the staging table for the "test" table is like
++test_attempt_1345021837431_0001_m_000000_0+ .
+
+Staging tables are automatically dropped when tasks complete successfully
+and when map tasks fail.
+When a reduce task fails,
+the staging tables for that task are left behind for manual retry, and
+users must take care of them.
diff --git a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
index 9d79cfeb..a557aa13 100644
--- a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
+++ b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
@@ -32,6 +32,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sqoop.util.PostgreSQLUtils;
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.io.SplittableBufferedWriter;
@@ -293,26 +294,6 @@ private String writeCopyCommand(String command) throws IOException {
     return tempFile.toString();
   }
 
-  /** Write the user's password to a file that is chmod 0600.
-      @return the filename.
-    */
-  private String writePasswordFile(String password) throws IOException {
-
-    String tmpDir = options.getTempDir();
-    File tempFile = File.createTempFile("pgpass", ".pgpass", new File(tmpDir));
-    LOG.debug("Writing password to tempfile: " + tempFile);
-
-    // Make sure it's only readable by the current user.
-    DirectImportUtils.setFilePermissions(tempFile, "0600");
-
-    // Actually write the password data into the file.
-    BufferedWriter w = new BufferedWriter(
-        new OutputStreamWriter(new FileOutputStream(tempFile)));
-    w.write("*:*:*:*:" + password);
-    w.close();
-    return tempFile.toString();
-  }
-
   // TODO(aaron): Refactor this method to be much shorter.
   // CHECKSTYLE:OFF
   @Override
@@ -380,7 +361,8 @@ public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
       args.add(username);
       String password = options.getPassword();
       if (null != password) {
-        passwordFilename = writePasswordFile(password);
+        passwordFilename =
+            PostgreSQLUtils.writePasswordFile(options.getTempDir(), password);
         // Need to send PGPASSFILE environment variable specifying
         // location of our postgres file.
         envp.add("PGPASSFILE=" + passwordFilename);
diff --git a/src/java/org/apache/sqoop/manager/PGBulkloadManager.java b/src/java/org/apache/sqoop/manager/PGBulkloadManager.java
new file mode 100644
index 00000000..92174f8e
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/PGBulkloadManager.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.manager; + +import java.io.IOException; +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.manager.ExportJobContext; +import com.cloudera.sqoop.util.ExportException; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sqoop.mapreduce.ExportInputFormat; +import org.apache.sqoop.mapreduce.PGBulkloadExportJob; + + + +/** + * Manages connections to Postgresql databases. + */ +public class PGBulkloadManager extends PostgresqlManager { + + public static final Log LOG = + LogFactory.getLog(PGBulkloadManager.class.getName()); + + + public PGBulkloadManager(final SqoopOptions opts) { + super(opts, true); + } + + + @Override + public void exportTable(ExportJobContext context) + throws IOException, ExportException { + context.setConnManager(this); + options.setStagingTableName(null); + PGBulkloadExportJob jobbase = + new PGBulkloadExportJob(context, + null, + ExportInputFormat.class, + NullOutputFormat.class); + jobbase.runExport(); + } + + + @Override + public boolean supportsStagingForExport() { + return false; + } + +} diff --git a/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java b/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java index 4b61321e..95086417 100644 --- a/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java @@ -21,7 +21,6 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; /** * Identity mapper that continuously reports progress via a background thread. @@ -32,25 +31,6 @@ public class AutoProgressMapper public static final Log LOG = LogFactory.getLog( AutoProgressMapper.class.getName()); - /** - * Total number of millis for which progress will be reported by the - * auto-progress thread. If this is zero, then the auto-progress thread will - * never voluntarily exit. - */ - private int maxProgressPeriod; - - /** - * Number of milliseconds to sleep for between loop iterations. Must be less - * than report interval. - */ - private int sleepInterval; - - /** - * Number of milliseconds between calls to Reporter.progress(). - * Should be a multiple of the sleepInterval. - */ - private int reportInterval; - public static final String MAX_PROGRESS_PERIOD_KEY = "sqoop.mapred.auto.progress.max"; public static final String SLEEP_INTERVAL_KEY = @@ -67,112 +47,14 @@ public class AutoProgressMapper // Disable max progress, by default. public static final int DEFAULT_MAX_PROGRESS = 0; - private class ProgressThread extends Thread { - - private volatile boolean keepGoing; // While this is true, thread runs. 
- - private Context context; - private long startTimeMillis; - private long lastReportMillis; - - public ProgressThread(final Context ctxt) { - this.context = ctxt; - this.keepGoing = true; - } - - public void signalShutdown() { - this.keepGoing = false; // volatile update. - this.interrupt(); - } - - public void run() { - this.lastReportMillis = System.currentTimeMillis(); - this.startTimeMillis = this.lastReportMillis; - - final long MAX_PROGRESS = AutoProgressMapper.this.maxProgressPeriod; - final long REPORT_INTERVAL = AutoProgressMapper.this.reportInterval; - final long SLEEP_INTERVAL = AutoProgressMapper.this.sleepInterval; - - // In a loop: - // * Check that we haven't run for too long (maxProgressPeriod). - // * If it's been a report interval since we last made progress, - // make more. - // * Sleep for a bit. - // * If the parent thread has signaled for exit, do so. - while (this.keepGoing) { - long curTimeMillis = System.currentTimeMillis(); - - if (MAX_PROGRESS != 0 - && curTimeMillis - this.startTimeMillis > MAX_PROGRESS) { - this.keepGoing = false; - LOG.info("Auto-progress thread exiting after " + MAX_PROGRESS - + " ms."); - break; - } - - if (curTimeMillis - this.lastReportMillis > REPORT_INTERVAL) { - // It's been a full report interval -- claim progress. - LOG.debug("Auto-progress thread reporting progress"); - this.context.progress(); - this.lastReportMillis = curTimeMillis; - } - - // Unless we got an interrupt while we were working, - // sleep a bit before doing more work. - if (!Thread.interrupted()) { - try { - Thread.sleep(SLEEP_INTERVAL); - } catch (InterruptedException ie) { - // we were notified on something; not necessarily an error. - } - } - } - - LOG.info("Auto-progress thread is finished. keepGoing=" + this.keepGoing); - } - } - - /** - * Set configuration parameters for the auto-progress thread. - */ - private void configureAutoProgress(Configuration job) { - this.maxProgressPeriod = job.getInt(MAX_PROGRESS_PERIOD_KEY, - DEFAULT_MAX_PROGRESS); - this.sleepInterval = job.getInt(SLEEP_INTERVAL_KEY, - DEFAULT_SLEEP_INTERVAL); - this.reportInterval = job.getInt(REPORT_INTERVAL_KEY, - DEFAULT_REPORT_INTERVAL); - - if (this.reportInterval < 1) { - LOG.warn("Invalid " + REPORT_INTERVAL_KEY + "; setting to " - + DEFAULT_REPORT_INTERVAL); - this.reportInterval = DEFAULT_REPORT_INTERVAL; - } - - if (this.sleepInterval > this.reportInterval || this.sleepInterval < 1) { - LOG.warn("Invalid " + SLEEP_INTERVAL_KEY + "; setting to " - + DEFAULT_SLEEP_INTERVAL); - this.sleepInterval = DEFAULT_SLEEP_INTERVAL; - } - - if (this.maxProgressPeriod < 0) { - LOG.warn("Invalid " + MAX_PROGRESS_PERIOD_KEY + "; setting to " - + DEFAULT_MAX_PROGRESS); - this.maxProgressPeriod = DEFAULT_MAX_PROGRESS; - } - } - - // map() method intentionally omitted; Mapper.map() is the identity mapper. - /** * Run the mapping process for this task, wrapped in an auto-progress system. 
   */
   @Override
   public void run(Context context) throws IOException, InterruptedException {
-    configureAutoProgress(context.getConfiguration());
-    ProgressThread thread = this.new ProgressThread(context);
+    ProgressThread thread = new ProgressThread(context, LOG);
 
     try {
       thread.setDaemon(true);
@@ -191,7 +73,7 @@ public void run(Context context) throws IOException, InterruptedException {
       LOG.debug("Progress thread shutdown detected.");
     } catch (InterruptedException ie) {
       LOG.warn("Interrupted when waiting on auto-progress thread: "
-          + ie.toString());
+          + ie.toString(), ie);
     }
   }
 }
diff --git a/src/java/org/apache/sqoop/mapreduce/AutoProgressReducer.java b/src/java/org/apache/sqoop/mapreduce/AutoProgressReducer.java
new file mode 100644
index 00000000..ee1d8822
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/AutoProgressReducer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Identity reducer that continuously reports progress via a background thread.
+ */
+public class AutoProgressReducer<KEYIN, VALIN, KEYOUT, VALOUT>
+    extends SqoopReducer<KEYIN, VALIN, KEYOUT, VALOUT> {
+
+  public static final Log LOG = LogFactory.getLog(
+      AutoProgressReducer.class.getName());
+
+  // reduce() method intentionally omitted;
+  // Reducer.reduce() is the identity reducer.
+
+  /**
+   * Run the reduce process for this task, wrapped in an auto-progress system.
+   */
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    ProgressThread thread = new ProgressThread(context, LOG);
+
+    try {
+      thread.setDaemon(true);
+      thread.start();
+
+      // use the default run() method to actually drive the reduce phase.
+      super.run(context);
+    } finally {
+      // Tell the progress thread to exit.
+      LOG.debug("Instructing auto-progress thread to quit.");
+      thread.signalShutdown();
+      try {
+        // And wait for that to happen.
+        LOG.debug("Waiting for progress thread shutdown...");
+        thread.join();
+        LOG.debug("Progress thread shutdown detected.");
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted when waiting on auto-progress thread: "
+            + ie.toString(), ie);
+      }
+    }
+  }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java
new file mode 100644
index 00000000..f3f094bd
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce; + +import java.io.IOException; +import com.cloudera.sqoop.manager.ExportJobContext; +import com.cloudera.sqoop.util.ExportException; +import com.cloudera.sqoop.SqoopOptions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.sqoop.config.ConfigurationHelper; +import org.apache.sqoop.lib.DelimiterSet; +import org.apache.sqoop.manager.ConnManager; +import org.apache.sqoop.mapreduce.db.DBConfiguration; +import org.apache.sqoop.orm.TableClassName; + + +/** + * Class that runs an export job using pg_bulkload in the mapper. + */ +public class PGBulkloadExportJob extends ExportJobBase { + + public static final Log LOG = + LogFactory.getLog(PGBulkloadExportJob.class.getName()); + + + public PGBulkloadExportJob(final ExportJobContext context) { + super(context); + } + + + public PGBulkloadExportJob(final ExportJobContext ctxt, + final Class mapperClass, + final Class inputFormatClass, + final Class outputFormatClass) { + super(ctxt, mapperClass, inputFormatClass, outputFormatClass); + } + + + @Override + protected void configureInputFormat(Job job, String tableName, + String tableClassName, String splitByCol) + throws ClassNotFoundException, IOException { + super.configureInputFormat(job, tableName, tableClassName, splitByCol); + ConnManager mgr = context.getConnManager(); + String username = options.getUsername(); + if (null == username || username.length() == 0) { + DBConfiguration.configureDB(job.getConfiguration(), + mgr.getDriverClass(), + options.getConnectString(), + options.getFetchSize()); + } else { + DBConfiguration.configureDB(job.getConfiguration(), + mgr.getDriverClass(), + options.getConnectString(), + username, options.getPassword(), + options.getFetchSize()); + } + } + + + @Override + protected Class getMapperClass() { + return PGBulkloadExportMapper.class; + } + + + protected Class getReducerClass() { + return PGBulkloadExportReducer.class; + } + + + private void setDelimiter(String prop, char val, Configuration conf) { + switch (val) { + case DelimiterSet.NULL_CHAR: + break; + case '\t': + default: + conf.set(prop, String.valueOf(val)); + } + } + + + @Override + protected void propagateOptionsToJob(Job job) { + super.propagateOptionsToJob(job); + SqoopOptions opts = context.getOptions(); + Configuration conf = job.getConfiguration(); + conf.setIfUnset("pgbulkload.bin", "pg_bulkload"); + if (opts.getNullStringValue() != null) { + conf.set("pgbulkload.null.string", opts.getNullStringValue()); + } + setDelimiter("pgbulkload.input.field.delim", + 
opts.getInputFieldDelim(), + conf); + setDelimiter("pgbulkload.input.record.delim", + opts.getInputRecordDelim(), + conf); + setDelimiter("pgbulkload.input.enclosedby", + opts.getInputEnclosedBy(), + conf); + setDelimiter("pgbulkload.input.escapedby", + opts.getInputEscapedBy(), + conf); + conf.setBoolean("pgbulkload.input.encloserequired", + opts.isInputEncloseRequired()); + conf.setIfUnset("pgbulkload.check.constraints", "YES"); + conf.setIfUnset("pgbulkload.parse.errors", "INFINITE"); + conf.setIfUnset("pgbulkload.duplicate.errors", "INFINITE"); + conf.set("mapred.jar", context.getJarFile()); + conf.setBoolean("mapred.map.tasks.speculative.execution", false); + conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); + conf.setInt("mapred.map.max.attempts", 1); + conf.setInt("mapred.reduce.max.attempts", 1); + conf.setIfUnset("mapred.reduce.tasks", "1"); + if (context.getOptions().doClearStagingTable()) { + conf.setBoolean("pgbulkload.clear.staging.table", true); + } + } + + + @Override + public void runExport() throws ExportException, IOException { + ConnManager cmgr = context.getConnManager(); + SqoopOptions options = context.getOptions(); + Configuration conf = options.getConf(); + DBConfiguration dbConf = null; + String outputTableName = context.getTableName(); + String tableName = outputTableName; + String tableClassName = + new TableClassName(options).getClassForTable(outputTableName); + + LOG.info("Beginning export of " + outputTableName); + loadJars(conf, context.getJarFile(), tableClassName); + + try { + Job job = new Job(conf); + dbConf = new DBConfiguration(job.getConfiguration()); + dbConf.setOutputTableName(tableName); + configureInputFormat(job, tableName, tableClassName, null); + configureOutputFormat(job, tableName, tableClassName); + configureNumTasks(job); + propagateOptionsToJob(job); + job.setMapperClass(getMapperClass()); + job.setMapOutputKeyClass(LongWritable.class); + job.setMapOutputValueClass(Text.class); + job.setReducerClass(getReducerClass()); + cacheJars(job, context.getConnManager()); + setJob(job); + + boolean success = runJob(job); + if (!success) { + throw new ExportException("Export job failed!"); + } + } catch (InterruptedException ie) { + throw new IOException(ie); + } catch (ClassNotFoundException cnfe) { + throw new IOException(cnfe); + } finally { + unloadJars(); + } + } + + + @Override + protected int configureNumTasks(Job job) throws IOException { + SqoopOptions options = context.getOptions(); + int numMapTasks = options.getNumMappers(); + if (numMapTasks < 1) { + numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS; + LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers."); + } + + ConfigurationHelper.setJobNumMaps(job, numMapTasks); + return numMapTasks; + } + + + private void clearStagingTable(DBConfiguration dbConf, String tableName) + throws IOException { + // clearing stagingtable is done each mapper tasks + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java new file mode 100644 index 00000000..14d064a5 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.util.PostgreSQLUtils;
+import org.apache.sqoop.util.Executor;
+import org.apache.sqoop.util.JdbcUrl;
+
+
+/**
+ * Mapper that starts a 'pg_bulkload' process and uses that to export rows from
+ * HDFS to a PostgreSQL database at high speed.
+ *
+ * The map() method reads records from SequenceFiles (containing existing
+ * SqoopRecords) or text files (containing delimited lines) and delivers
+ * them to the stream used to interface with pg_bulkload.
+ */ +public class PGBulkloadExportMapper + extends AutoProgressMapper { + private Configuration conf; + private DBConfiguration dbConf; + private Process process; + private OutputStream out; + protected BufferedWriter writer; + private Thread thread; + protected String tmpTableName; + private String tableName; + private String passwordFilename; + + + public PGBulkloadExportMapper() { + } + + + protected void setup(Context context) + throws IOException, InterruptedException { + super.setup(context); + conf = context.getConfiguration(); + dbConf = new DBConfiguration(conf); + tableName = dbConf.getOutputTableName(); + tmpTableName = tableName + "_" + context.getTaskAttemptID().toString(); + + Connection conn = null; + try { + conn = dbConf.getConnection(); + conn.setAutoCommit(false); + if (conf.getBoolean("pgbulkload.clear.staging.table", false)) { + StringBuffer query = new StringBuffer(); + query.append("DROP TABLE IF EXISTS "); + query.append(tmpTableName); + doExecuteUpdate(query.toString()); + } + StringBuffer query = new StringBuffer(); + query.append("CREATE TABLE "); + query.append(tmpTableName); + query.append("(LIKE "); + query.append(tableName); + query.append(" INCLUDING CONSTRAINTS)"); + if (conf.get("pgbulkload.staging.tablespace") != null) { + query.append("TABLESPACE "); + query.append(conf.get("pgbulkload.staging.tablespace")); + } + doExecuteUpdate(query.toString()); + conn.commit(); + } catch (ClassNotFoundException ex) { + LOG.error("Unable to load JDBC driver class", ex); + throw new IOException(ex); + } catch (SQLException ex) { + LOG.error("Unable to execute statement", ex); + throw new IOException(ex); + } finally { + try { + conn.close(); + } catch (SQLException ex) { + LOG.error("Unable to close connection", ex); + } + } + + try { + ArrayList args = new ArrayList(); + List envp = Executor.getCurEnvpStrings(); + args.add(conf.get("pgbulkload.bin", "pg_bulkload")); + args.add("--username=" + + conf.get(DBConfiguration.USERNAME_PROPERTY)); + args.add("--dbname=" + + JdbcUrl.getDatabaseName(conf.get(DBConfiguration.URL_PROPERTY))); + args.add("--host=" + + JdbcUrl.getHostName(conf.get(DBConfiguration.URL_PROPERTY))); + args.add("--port=" + + JdbcUrl.getPort(conf.get(DBConfiguration.URL_PROPERTY))); + args.add("--input=stdin"); + args.add("--output=" + tmpTableName); + args.add("-o"); + args.add("TYPE=CSV"); + args.add("-o"); + args.add("DELIMITER=" + conf.get("pgbulkload.input.field.delim", ",")); + args.add("-o"); + args.add("QUOTE=" + conf.get("pgbulkload.input.enclosedby", "\"")); + args.add("-o"); + args.add("ESCAPE=" + conf.get("pgbulkload.input.escapedby", "\"")); + args.add("-o"); + args.add("CHECK_CONSTRAINTS=" + conf.get("pgbulkload.check.constraints")); + args.add("-o"); + args.add("PARSE_ERRORS=" + conf.get("pgbulkload.parse.errors")); + args.add("-o"); + args.add("DUPLICATE_ERRORS=" + conf.get("pgbulkload.duplicate.errors")); + if (conf.get("pgbulkload.null.string") != null) { + args.add("-o"); + args.add("NULL=" + conf.get("pgbulkload.null.string")); + } + if (conf.get("pgbulkload.filter") != null) { + args.add("-o"); + args.add("FILTER=" + conf.get("pgbulkload.filter")); + } + LOG.debug("Starting pg_bulkload with arguments:"); + for (String arg : args) { + LOG.debug(" " + arg); + } + if (conf.get(DBConfiguration.PASSWORD_PROPERTY) != null) { + String tmpDir = System.getProperty("test.build.data", "/tmp/"); + if (!tmpDir.endsWith(File.separator)) { + tmpDir = tmpDir + File.separator; + } + tmpDir = conf.get("job.local.dir", tmpDir); + passwordFilename = 
PostgreSQLUtils.writePasswordFile(tmpDir, + conf.get(DBConfiguration.PASSWORD_PROPERTY)); + envp.add("PGPASSFILE=" + passwordFilename); + } + process = Runtime.getRuntime().exec(args.toArray(new String[0]), + envp.toArray(new String[0])); + out = process.getOutputStream(); + writer = new BufferedWriter(new OutputStreamWriter(out)); + thread = new ReadThread(process.getErrorStream()); + thread.start(); + } catch (Exception e) { + cleanup(context); + doExecuteUpdate("DROP TABLE " + tmpTableName); + throw new IOException(e); + } + } + + + public void map(LongWritable key, Writable value, Context context) + throws IOException, InterruptedException { + try { + String str = value.toString(); + if (value instanceof Text) { + writer.write(str, 0, str.length()); + writer.newLine(); + } else if (value instanceof SqoopRecord) { + writer.write(str, 0, str.length()); + } + } catch (Exception e) { + doExecuteUpdate("DROP TABLE " + tmpTableName); + cleanup(context); + throw new IOException(e); + } + } + + + protected void cleanup(Context context) + throws IOException, InterruptedException { + LongWritable taskid = + new LongWritable(context.getTaskAttemptID().getTaskID().getId()); + context.write(taskid, new Text(tmpTableName)); + writer.close(); + out.close(); + try { + thread.join(); + } finally { + // block until the process is done. + int result = 0; + if (null != process) { + while (true) { + try { + result = process.waitFor(); + } catch (InterruptedException ie) { + // interrupted; loop around. + continue; + } + break; + } + } + } + if (null != passwordFilename) { + if (!new File(passwordFilename).delete()) { + LOG.error("Could not remove postgresql password file " + + passwordFilename); + LOG.error("You should remove this file to protect your credentials."); + } + } + } + + + protected int doExecuteUpdate(String query) throws IOException { + Connection conn = null; + try { + conn = dbConf.getConnection(); + conn.setAutoCommit(false); + } catch (ClassNotFoundException ex) { + LOG.error("Unable to load JDBC driver class", ex); + throw new IOException(ex); + } catch (SQLException ex) { + LOG.error("Unable to connect to database", ex); + throw new IOException(ex); + } + Statement stmt = null; + try { + stmt = conn.createStatement(); + int ret = stmt.executeUpdate(query); + conn.commit(); + return ret; + } catch (SQLException ex) { + LOG.error("Unable to execute query: " + query, ex); + throw new IOException(ex); + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException ex) { + LOG.error("Unable to close statement", ex); + } + } + try { + conn.close(); + } catch (SQLException ex) { + LOG.error("Unable to close connection", ex); + } + } + } + + + private class ReadThread extends Thread { + private InputStream in; + + ReadThread(InputStream in) { + this.in = in; + } + + public void run() { + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + String line = null; + try { + while((line = reader.readLine()) != null) { + System.out.println(line); + } + reader.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java new file mode 100644 index 00000000..63c52c7b --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+
+
+/**
+ * Reducer for transferring data from the staging tables to the destination
+ * table. The reducer drops each staging table once its data has been
+ * transferred successfully; on failure, staging tables are left in place
+ * for manual retry.
+ */
+public class PGBulkloadExportReducer
+    extends AutoProgressReducer<LongWritable, Text,
+                                NullWritable, NullWritable> {
+
+  public static final Log LOG =
+      LogFactory.getLog(PGBulkloadExportReducer.class.getName());
+  private Configuration conf;
+  private DBConfiguration dbConf;
+  private Connection conn;
+  private String tableName;
+
+
+  protected void setup(Context context)
+    throws IOException, InterruptedException {
+    conf = context.getConfiguration();
+    dbConf = new DBConfiguration(conf);
+    tableName = dbConf.getOutputTableName();
+    try {
+      conn = dbConf.getConnection();
+      conn.setAutoCommit(false);
+    } catch (ClassNotFoundException ex) {
+      LOG.error("Unable to load JDBC driver class", ex);
+      throw new IOException(ex);
+    } catch (SQLException ex) {
+      LOG.error("Unable to connect to database", ex);
+      throw new IOException(ex);
+    }
+  }
+
+
+  @Override
+  public void reduce(LongWritable key, Iterable<Text> values, Context context)
+    throws IOException, InterruptedException {
+    Statement stmt = null;
+    try {
+      stmt = conn.createStatement();
+      for (Text value : values) {
+        stmt.executeUpdate("INSERT INTO " + tableName
+                           + " ( SELECT * FROM " + value + " )");
+        stmt.executeUpdate("DROP TABLE " + value);
+      }
+      conn.commit();
+    } catch (SQLException ex) {
+      LOG.error("Unable to execute query.", ex);
+      throw new IOException(ex);
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException ex) {
+          LOG.error("Unable to close statement", ex);
+        }
+      }
+    }
+  }
+
+
+  protected void cleanup(Context context)
+    throws IOException, InterruptedException {
+    try {
+      conn.close();
+    } catch (SQLException ex) {
+      LOG.error("Unable to close connection", ex);
+      throw new IOException(ex);
+    }
+  }
+
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/ProgressThread.java b/src/java/org/apache/sqoop/mapreduce/ProgressThread.java
new file mode 100644
index 00000000..9d1ff51d
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/ProgressThread.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; + + +/** + * Run the task process for auto-progress tasks. + */ +public class ProgressThread extends Thread { + + private static Log log = null; + + /** + * Total number of millis for which progress will be reported by the + * auto-progress thread. If this is zero, then the auto-progress thread will + * never voluntarily exit. + */ + private int maxProgressPeriod; + + /** + * Number of milliseconds to sleep for between loop iterations. Must be less + * than report interval. + */ + private int sleepInterval; + + /** + * Number of milliseconds between calls to Reporter.progress(). + * Should be a multiple of the sleepInterval. + */ + private int reportInterval; + + public static final String MAX_PROGRESS_PERIOD_KEY = + "sqoop.mapred.auto.progress.max"; + public static final String SLEEP_INTERVAL_KEY = + "sqoop.mapred.auto.progress.sleep"; + public static final String REPORT_INTERVAL_KEY = + "sqoop.mapred.auto.progress.report"; + + // Sleep for 10 seconds at a time. + public static final int DEFAULT_SLEEP_INTERVAL = 10000; + + // Report progress every 30 seconds. + public static final int DEFAULT_REPORT_INTERVAL = 30000; + + // Disable max progress, by default. + public static final int DEFAULT_MAX_PROGRESS = 0; + + private volatile boolean keepGoing; // While this is true, thread runs. + + private TaskInputOutputContext context; + private long startTimeMillis; + private long lastReportMillis; + + public ProgressThread(final TaskInputOutputContext ctxt, Log log) { + this.context = ctxt; + this.log = log; + this.keepGoing = true; + configureAutoProgress(ctxt.getConfiguration()); + } + + /** + * Set configuration parameters for the auto-progress thread. + */ + private void configureAutoProgress(Configuration job) { + this.maxProgressPeriod = job.getInt(MAX_PROGRESS_PERIOD_KEY, + DEFAULT_MAX_PROGRESS); + this.sleepInterval = job.getInt(SLEEP_INTERVAL_KEY, + DEFAULT_SLEEP_INTERVAL); + this.reportInterval = job.getInt(REPORT_INTERVAL_KEY, + DEFAULT_REPORT_INTERVAL); + + if (this.reportInterval < 1) { + log.warn("Invalid " + REPORT_INTERVAL_KEY + "; setting to " + + DEFAULT_REPORT_INTERVAL); + this.reportInterval = DEFAULT_REPORT_INTERVAL; + } + + if (this.sleepInterval > this.reportInterval || this.sleepInterval < 1) { + log.warn("Invalid " + SLEEP_INTERVAL_KEY + "; setting to " + + DEFAULT_SLEEP_INTERVAL); + this.sleepInterval = DEFAULT_SLEEP_INTERVAL; + } + + if (this.maxProgressPeriod < 0) { + log.warn("Invalid " + MAX_PROGRESS_PERIOD_KEY + "; setting to " + + DEFAULT_MAX_PROGRESS); + this.maxProgressPeriod = DEFAULT_MAX_PROGRESS; + } + } + + public void signalShutdown() { + this.keepGoing = false; // volatile update. 
+ this.interrupt(); + } + + public void run() { + this.lastReportMillis = System.currentTimeMillis(); + this.startTimeMillis = this.lastReportMillis; + + final long MAX_PROGRESS = this.maxProgressPeriod; + final long REPORT_INTERVAL = this.reportInterval; + final long SLEEP_INTERVAL = this.sleepInterval; + + // In a loop: + // * Check that we haven't run for too long (maxProgressPeriod). + // * If it's been a report interval since we last made progress, + // make more. + // * Sleep for a bit. + // * If the parent thread has signaled for exit, do so. + while (this.keepGoing) { + long curTimeMillis = System.currentTimeMillis(); + + if (MAX_PROGRESS != 0 + && curTimeMillis - this.startTimeMillis > MAX_PROGRESS) { + this.keepGoing = false; + log.info("Auto-progress thread exiting after " + MAX_PROGRESS + + " ms."); + break; + } + + if (curTimeMillis - this.lastReportMillis > REPORT_INTERVAL) { + // It's been a full report interval -- claim progress. + log.debug("Auto-progress thread reporting progress"); + this.context.progress(); + this.lastReportMillis = curTimeMillis; + } + + // Unless we got an interrupt while we were working, + // sleep a bit before doing more work. + if (!Thread.interrupted()) { + try { + Thread.sleep(SLEEP_INTERVAL); + } catch (InterruptedException ie) { + // we were notified on something; not necessarily an error. + } + } + } + log.info("Auto-progress thread is finished. keepGoing=" + this.keepGoing); + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/SqoopReducer.java b/src/java/org/apache/sqoop/mapreduce/SqoopReducer.java new file mode 100644 index 00000000..7c268069 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/SqoopReducer.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.sqoop.util.LoggingUtils; + +import java.io.IOException; + +/** + * Base sqoop reducer class that is convenient place for common functionality. + * Other specific reducers are highly encouraged to inherit from this class. 
+ */ +public abstract class SqoopReducer + extends Reducer { + + @Override + protected void setup(Context context) + throws IOException, InterruptedException { + super.setup(context); + + Configuration configuration = context.getConfiguration(); + + // Propagate verbose flag if needed + if (configuration.getBoolean(JobBase.PROPERTY_VERBOSE, false)) { + LoggingUtils.setDebugLevel(); + } + } +} diff --git a/src/java/org/apache/sqoop/util/PostgreSQLUtils.java b/src/java/org/apache/sqoop/util/PostgreSQLUtils.java new file mode 100644 index 00000000..f87d744d --- /dev/null +++ b/src/java/org/apache/sqoop/util/PostgreSQLUtils.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.util; + +import java.io.IOException; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Utility methods for PostgreSQL import/export. + */ +public final class PostgreSQLUtils { + + public static final Log LOG = + LogFactory.getLog(PostgreSQLUtils.class.getName()); + + private PostgreSQLUtils() { + } + + /** Write the user's password to a file that is chmod 0600. + @return the filename. + */ + public static String writePasswordFile(String tmpDir, String password) + throws IOException { + File tempFile = File.createTempFile("pgpass", ".pgpass", new File(tmpDir)); + LOG.debug("Writing password to tempfile: " + tempFile); + + // Make sure it's only readable by the current user. + DirectImportUtils.setFilePermissions(tempFile, "0600"); + + // Actually write the password data into the file. + BufferedWriter w = new BufferedWriter( + new OutputStreamWriter(new FileOutputStream(tempFile))); + w.write("*:*:*:*:" + password); + w.close(); + return tempFile.toString(); + } +} diff --git a/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java b/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java new file mode 100644 index 00000000..fff35dcb --- /dev/null +++ b/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.manager;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.PreparedStatement;
+import java.util.Arrays;
+import java.util.ArrayList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import com.cloudera.sqoop.TestExport;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+
+
+/**
+ * Test the PGBulkloadManager implementation.
+ * PGBulkloadManager uses both the JDBC driver and pg_bulkload to perform
+ * exports.
+ *
+ * Since this requires a PostgreSQL installation on your local machine,
+ * this class is named in such a way that Hadoop's default QA process does not
+ * run it.
+ *
+ * You need to run this manually with -Dtestcase=PGBulkloadManagerManualTest.
+ *
+ * You need to put the PostgreSQL JDBC driver library into the lib dir.
+ *
+ * You need to create a sqooptest superuser, database, and tablespace,
+ * and install pg_bulkload for the sqooptest database:
+ *
+ * $ sudo -u postgres createuser -U postgres -s sqooptest
+ * $ sudo -u postgres createdb -U sqooptest sqooptest
+ * $ sudo -u postgres mkdir /var/pgdata/stagingtablespace
+ * $ psql -U sqooptest
+ *      -f /usr/local/share/postgresql/contrib/pg_bulkload.sql sqooptest
+ * $ psql -U sqooptest sqooptest
+ * sqooptest=# CREATE USER sqooptest;
+ * sqooptest=# CREATE DATABASE sqooptest;
+ * sqooptest=# CREATE TABLESPACE sqooptest
+ *               LOCATION '/var/pgdata/stagingtablespace';
+ * sqooptest=# \q
+ *
+ */
+public class PGBulkloadManagerManualTest extends TestExport {
+
+  public static final Log LOG =
+      LogFactory.getLog(PGBulkloadManagerManualTest.class.getName());
+  private DBConfiguration dbConf;
+
+
+  public PGBulkloadManagerManualTest() {
+    Configuration conf = getConf();
+    DBConfiguration.configureDB(conf,
+                                "org.postgresql.Driver",
+                                getConnectString(),
+                                getUserName(),
+                                null, null);
+    dbConf = new DBConfiguration(conf);
+  }
+
+
+  @Override
+  protected boolean useHsqldbTestServer() {
+    return false;
+  }
+
+
+  @Override
+  protected String getConnectString() {
+    return "jdbc:postgresql://localhost:5432/sqooptest";
+  }
+
+
+  protected String getUserName() {
+    return "sqooptest";
+  }
+
+
+  @Override
+  protected String getTablePrefix() {
+    return super.getTablePrefix().toLowerCase();
+  }
+
+
+  @Override
+  protected String getTableName() {
+    return super.getTableName().toLowerCase();
+  }
+
+
+  @Override
+  public String getStagingTableName() {
+    return super.getStagingTableName().toLowerCase();
+  }
+
+
+  @Override
+  protected Connection getConnection() {
+    try {
+      Connection conn = dbConf.getConnection();
+      conn.setAutoCommit(false);
+      PreparedStatement stmt =
+          conn.prepareStatement("SET extra_float_digits TO 0");
+      stmt.executeUpdate();
+      conn.commit();
+      return conn;
+    } catch (SQLException sqlE) {
+      LOG.error("Could not get connection to test server: " + sqlE);
+      return null;
+    } catch (ClassNotFoundException cnfE) {
+      LOG.error("Could not find driver class: " + cnfE);
+      return null;
+    }
+  }
+
+
+  @Override
+  protected String
getDropTableStatement(String tableName) {
+    return "DROP TABLE IF EXISTS " + tableName;
+  }
+
+
+  @Override
+  protected String[] getArgv(boolean includeHadoopFlags,
+                             int rowsPerStatement,
+                             int statementsPerTx,
+                             String... additionalArgv) {
+    ArrayList<String> args =
+        new ArrayList<String>(Arrays.asList(additionalArgv));
+    args.add("--username");
+    args.add(getUserName());
+    args.add("--connection-manager");
+    args.add("org.apache.sqoop.manager.PGBulkloadManager");
+    args.add("--staging-table");
+    args.add("dummy");
+    args.add("--clear-staging-table");
+    return super.getArgv(includeHadoopFlags,
+                         rowsPerStatement,
+                         statementsPerTx,
+                         args.toArray(new String[0]));
+  }
+
+
+  @Override
+  protected String [] getCodeGenArgv(String... extraArgs) {
+    ArrayList<String> args = new ArrayList<String>(Arrays.asList(extraArgs));
+    args.add("--username");
+    args.add(getUserName());
+    return super.getCodeGenArgv(args.toArray(new String[0]));
+  }
+
+
+  @Override
+  public void testColumnsExport() throws IOException, SQLException {
+    // PGBulkloadManager does not support the --columns option.
+  }
+
+
+  public void testMultiReduceExport() throws IOException, SQLException {
+    String[] genericargs = newStrArray(null, "-Dmapred.reduce.tasks=2");
+    multiFileTestWithGenericArgs(2, 10, 2, genericargs);
+  }
+
+
+  public void testExportWithTablespace() throws IOException, SQLException {
+    String[] genericargs =
+        newStrArray(null, "-Dpgbulkload.staging.tablespace=sqooptest");
+    multiFileTestWithGenericArgs(1, 10, 1, genericargs);
+  }
+
+
+  protected void multiFileTestWithGenericArgs(int numFiles,
+                                              int recordsPerMap,
+                                              int numMaps,
+                                              String[] genericargs,
+                                              String... argv)
+    throws IOException, SQLException {
+
+    final int TOTAL_RECORDS = numFiles * recordsPerMap;
+
+    try {
+      LOG.info("Beginning test: numFiles=" + numFiles + "; recordsPerMap="
+          + recordsPerMap + "; numMaps=" + numMaps);
+      LOG.info("  with generic args: ");
+      for (String arg : genericargs) {
+        LOG.info("  " + arg);
+      }
+
+      for (int i = 0; i < numFiles; i++) {
+        createTextFile(i, recordsPerMap, false);
+      }
+
+      createTable();
+
+      runExport(getArgv(true, 10, 10,
+                        newStrArray(newStrArray(genericargs, argv),
+                                    "-m", "" + numMaps)));
+      verifyExport(TOTAL_RECORDS);
+    } finally {
+      LOG.info("multi-file test complete");
+    }
+  }
+}
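
For reference, the following is a minimal sketch (not part of the patch) of
driving this connector programmatically through Sqoop's generic runTool()
entry point, mirroring the command line documented in connectors.txt above.
The connection string, credentials, table name, and export directory are
illustrative assumptions; substitute your own.

----
import org.apache.sqoop.Sqoop;

/** Hypothetical driver for an export via the pg_bulkload connector. */
public class PGBulkloadExportExample {
  public static void main(String[] argv) throws Exception {
    String[] args = new String[] {
      "export",                                        // tool name first
      "-Dpgbulkload.bin=/usr/local/bin/pg_bulkload",   // generic args next
      "-Dmapred.reduce.tasks=1",
      "--connect", "jdbc:postgresql://pgsql.example.net:5432/sqooptest",
      "--connection-manager", "org.apache.sqoop.manager.PGBulkloadManager",
      "--username", "sqooptest",
      "--table", "test",
      "--export-dir", "/test",
      "-m", "2",
    };
    // runTool() parses the generic -D options and dispatches to the export
    // tool, which hands control to PGBulkloadManager.exportTable().
    int ret = Sqoop.runTool(args);
    System.exit(ret);
  }
}
----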