From 1749a84f68a55bae050210aea8ed7d41c3d1e44a Mon Sep 17 00:00:00 2001 From: Andrew Bayer Date: Fri, 22 Jul 2011 20:04:10 +0000 Subject: [PATCH] SQOOP-64. Refactor named FIFO creation into standalone utility. Add com.cloudera.sqoop.io.NamedFifo class to represent named FIFO objects. Added TestNamedFifo as unit test. MySQLExportMapper now uses this utility. From: Aaron Kimball git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149954 13f79535-47bb-0310-9956-ffa450edef68 --- src/java/com/cloudera/sqoop/io/NamedFifo.java | 80 +++++++ .../sqoop/mapreduce/MySQLExportMapper.java | 5 +- src/test/com/cloudera/sqoop/SmokeTests.java | 3 +- .../com/cloudera/sqoop/io/TestNamedFifo.java | 196 ++++++++++++++++++ 4 files changed, 281 insertions(+), 3 deletions(-) create mode 100644 src/java/com/cloudera/sqoop/io/NamedFifo.java create mode 100644 src/test/com/cloudera/sqoop/io/TestNamedFifo.java diff --git a/src/java/com/cloudera/sqoop/io/NamedFifo.java b/src/java/com/cloudera/sqoop/io/NamedFifo.java new file mode 100644 index 00000000..8e3c54d4 --- /dev/null +++ b/src/java/com/cloudera/sqoop/io/NamedFifo.java @@ -0,0 +1,80 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.io; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.util.Shell; + +/** + * A named FIFO channel. + */ +public class NamedFifo { + + private File fifoFile; + + /** Create a named FIFO object at the local fs path given by 'pathname'. */ + public NamedFifo(String pathname) { + this.fifoFile = new File(pathname); + } + + /** Create a named FIFO object at the local fs path given by the 'fifo' File + * object. */ + public NamedFifo(File fifo) { + this.fifoFile = fifo; + } + + /** + * Return the File object representing the FIFO. + */ + public File getFile() { + return this.fifoFile; + } + + /** + * Create a named FIFO object. + * The pipe will be created with permissions 0600. + * @throws IOException on failure. + */ + public void create() throws IOException { + create(0600); + } + + /** + * Create a named FIFO object with the specified fs permissions. + * This depends on the 'mknod' system utility existing. (for example, + * provided by Linux coreutils). This object will be deleted when + * the process exits. + * @throws IOException on failure. + */ + public void create(int permissions) throws IOException { + String filename = fifoFile.toString(); + + // Format permissions as a mode string in base 8. + String modeStr = Integer.toString(permissions, 8); + + // Create the FIFO itself. + Shell.execCommand("mknod", "--mode=0" + modeStr, filename, "p"); + + // Schedule the FIFO to be cleaned up when we exit. + this.fifoFile.deleteOnExit(); + } +} + diff --git a/src/java/com/cloudera/sqoop/mapreduce/MySQLExportMapper.java b/src/java/com/cloudera/sqoop/mapreduce/MySQLExportMapper.java index 86eb04d8..639a0a80 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/MySQLExportMapper.java +++ b/src/java/com/cloudera/sqoop/mapreduce/MySQLExportMapper.java @@ -29,8 +29,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.util.Shell; import org.apache.hadoop.mapreduce.Mapper; + +import com.cloudera.sqoop.io.NamedFifo; import com.cloudera.sqoop.mapreduce.db.DBConfiguration; import com.cloudera.sqoop.manager.MySQLUtils; import com.cloudera.sqoop.shims.HadoopShim; @@ -119,7 +120,7 @@ private void initMySQLImportProcess() throws IOException { // Create the FIFO itself. try { - Shell.execCommand("mknod", "--mode=0600", filename, "p"); + new NamedFifo(this.fifoFile).create(); } catch (IOException ioe) { // Command failed. LOG.error("Could not mknod " + filename); diff --git a/src/test/com/cloudera/sqoop/SmokeTests.java b/src/test/com/cloudera/sqoop/SmokeTests.java index cebd641e..1757ec90 100644 --- a/src/test/com/cloudera/sqoop/SmokeTests.java +++ b/src/test/com/cloudera/sqoop/SmokeTests.java @@ -21,6 +21,7 @@ import com.cloudera.sqoop.hive.TestHiveImport; import com.cloudera.sqoop.hive.TestTableDefWriter; import com.cloudera.sqoop.io.TestLobFile; +import com.cloudera.sqoop.io.TestNamedFifo; import com.cloudera.sqoop.io.TestSplittableBufferedWriter; import com.cloudera.sqoop.lib.TestFieldFormatter; import com.cloudera.sqoop.lib.TestRecordParser; @@ -30,7 +31,6 @@ import com.cloudera.sqoop.manager.TestHsqldbManager; import com.cloudera.sqoop.manager.TestSqlManager; import com.cloudera.sqoop.mapreduce.MapreduceTests; - import com.cloudera.sqoop.metastore.TestSessions; import com.cloudera.sqoop.orm.TestClassWriter; import com.cloudera.sqoop.orm.TestParseMethods; @@ -77,6 +77,7 @@ public static Test suite() { suite.addTestSuite(TestLobFile.class); suite.addTestSuite(TestExportUpdate.class); suite.addTestSuite(TestSessions.class); + suite.addTestSuite(TestNamedFifo.class); suite.addTest(MapreduceTests.suite()); return suite; diff --git a/src/test/com/cloudera/sqoop/io/TestNamedFifo.java b/src/test/com/cloudera/sqoop/io/TestNamedFifo.java new file mode 100644 index 00000000..02e16b6c --- /dev/null +++ b/src/test/com/cloudera/sqoop/io/TestNamedFifo.java @@ -0,0 +1,196 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.io; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; + +import org.apache.hadoop.util.StringUtils; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/** + * Test the named fifo utility. + */ +public class TestNamedFifo extends TestCase { + + public static final Log LOG = LogFactory.getLog( + TestNamedFifo.class.getName()); + + public static final Path TEMP_BASE_DIR; + + static { + String tmpDir = System.getProperty("test.build.data", "/tmp/"); + if (!tmpDir.endsWith(File.separator)) { + tmpDir = tmpDir + File.separator; + } + + TEMP_BASE_DIR = new Path(new Path(tmpDir), "namedfifo"); + } + + private Configuration conf; + private FileSystem fs; + + public void setUp() throws Exception { + conf = new Configuration(); + conf.set("fs.default.name", "file:///"); + + fs = FileSystem.getLocal(conf); + fs.mkdirs(TEMP_BASE_DIR); + } + + static final String MSG = "THIS IS THE MESSAGE\n"; + static final String MSG2 = "Here is a follow-up.\n"; + + private static class ReaderThread extends Thread { + private File file; + private IOException exception; + + public ReaderThread(File f) { + this.file = f; + } + + /** return any exception during the run method. */ + public IOException getException() { + return this.exception; + } + + public void run() { + BufferedReader r = null; + try { + r = new BufferedReader(new InputStreamReader( + new FileInputStream(file))); + + // Assert that after a flush, we get back what we wrote. + String line = r.readLine(); + if (!MSG.trim().equals(line)) { + throw new IOException("Expected " + MSG.trim() + " but got " + + line); + } + + // Assert that after closing the writer, we get back what + // we wrote again. + line = r.readLine(); + if (null == line) { + throw new IOException("line2 was null"); + } else if (!MSG2.trim().equals(line)) { + throw new IOException("Expected " + MSG2.trim() + " but got " + + line); + } + } catch (IOException ioe) { + this.exception = ioe; + } finally { + if (null != r) { + try { + r.close(); + } catch (IOException ioe) { + LOG.warn("Error closing reader: " + ioe); + } + } + } + } + } + + private static class WriterThread extends Thread { + private File file; + private IOException exception; + + public WriterThread(File f) { + this.file = f; + } + + /** return any exception during the run method. */ + public IOException getException() { + return this.exception; + } + + public void run() { + BufferedWriter w = null; + try { + w = new BufferedWriter(new OutputStreamWriter( + new FileOutputStream(file))); + + w.write(MSG); + w.flush(); + + w.write(MSG2); + } catch (IOException ioe) { + this.exception = ioe; + } finally { + if (null != w) { + try { + w.close(); + } catch (IOException ioe) { + LOG.warn("Error closing writer: " + ioe); + } + } + } + } + } + + public void testNamedFifo() throws Exception { + + File root = new File(TEMP_BASE_DIR.toString()); + File fifo = new File(root, "foo-fifo"); + + NamedFifo nf = new NamedFifo(fifo); + nf.create(); + + File returned = nf.getFile(); + + // These should be the same object. + assertEquals(fifo, returned); + + ReaderThread rt = new ReaderThread(returned); + WriterThread wt = new WriterThread(returned); + + rt.start(); + wt.start(); + + rt.join(); + wt.join(); + + IOException rex = rt.getException(); + IOException wex = wt.getException(); + + if (null != rex) { + LOG.error("reader exception: " + StringUtils.stringifyException(rex)); + } + + if (null != wex) { + LOG.error("writer exception: " + StringUtils.stringifyException(wex)); + } + + assertNull(rex); + assertNull(wex); + } +} +