5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-03 20:40:58 +08:00

SQOOP-64. Refactor named FIFO creation into standalone utility.

Add com.cloudera.sqoop.io.NamedFifo class to represent named FIFO objects.
Added TestNamedFifo as unit test.
MySQLExportMapper now uses this utility.

From: Aaron Kimball <aaron@cloudera.com>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149954 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Bayer 2011-07-22 20:04:10 +00:00
parent 6a9eef446b
commit 1749a84f68
4 changed files with 281 additions and 3 deletions

View File

@ -0,0 +1,80 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.io;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.util.Shell;
/**
* A named FIFO channel.
*/
public class NamedFifo {
private File fifoFile;
/** Create a named FIFO object at the local fs path given by 'pathname'. */
public NamedFifo(String pathname) {
this.fifoFile = new File(pathname);
}
/** Create a named FIFO object at the local fs path given by the 'fifo' File
* object. */
public NamedFifo(File fifo) {
this.fifoFile = fifo;
}
/**
* Return the File object representing the FIFO.
*/
public File getFile() {
return this.fifoFile;
}
/**
* Create a named FIFO object.
* The pipe will be created with permissions 0600.
* @throws IOException on failure.
*/
public void create() throws IOException {
create(0600);
}
/**
* Create a named FIFO object with the specified fs permissions.
* This depends on the 'mknod' system utility existing. (for example,
* provided by Linux coreutils). This object will be deleted when
* the process exits.
* @throws IOException on failure.
*/
public void create(int permissions) throws IOException {
String filename = fifoFile.toString();
// Format permissions as a mode string in base 8.
String modeStr = Integer.toString(permissions, 8);
// Create the FIFO itself.
Shell.execCommand("mknod", "--mode=0" + modeStr, filename, "p");
// Schedule the FIFO to be cleaned up when we exit.
this.fifoFile.deleteOnExit();
}
}

View File

@ -29,8 +29,9 @@
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper;
import com.cloudera.sqoop.io.NamedFifo;
import com.cloudera.sqoop.mapreduce.db.DBConfiguration; import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
import com.cloudera.sqoop.manager.MySQLUtils; import com.cloudera.sqoop.manager.MySQLUtils;
import com.cloudera.sqoop.shims.HadoopShim; import com.cloudera.sqoop.shims.HadoopShim;
@ -119,7 +120,7 @@ private void initMySQLImportProcess() throws IOException {
// Create the FIFO itself. // Create the FIFO itself.
try { try {
Shell.execCommand("mknod", "--mode=0600", filename, "p"); new NamedFifo(this.fifoFile).create();
} catch (IOException ioe) { } catch (IOException ioe) {
// Command failed. // Command failed.
LOG.error("Could not mknod " + filename); LOG.error("Could not mknod " + filename);

View File

@ -21,6 +21,7 @@
import com.cloudera.sqoop.hive.TestHiveImport; import com.cloudera.sqoop.hive.TestHiveImport;
import com.cloudera.sqoop.hive.TestTableDefWriter; import com.cloudera.sqoop.hive.TestTableDefWriter;
import com.cloudera.sqoop.io.TestLobFile; import com.cloudera.sqoop.io.TestLobFile;
import com.cloudera.sqoop.io.TestNamedFifo;
import com.cloudera.sqoop.io.TestSplittableBufferedWriter; import com.cloudera.sqoop.io.TestSplittableBufferedWriter;
import com.cloudera.sqoop.lib.TestFieldFormatter; import com.cloudera.sqoop.lib.TestFieldFormatter;
import com.cloudera.sqoop.lib.TestRecordParser; import com.cloudera.sqoop.lib.TestRecordParser;
@ -30,7 +31,6 @@
import com.cloudera.sqoop.manager.TestHsqldbManager; import com.cloudera.sqoop.manager.TestHsqldbManager;
import com.cloudera.sqoop.manager.TestSqlManager; import com.cloudera.sqoop.manager.TestSqlManager;
import com.cloudera.sqoop.mapreduce.MapreduceTests; import com.cloudera.sqoop.mapreduce.MapreduceTests;
import com.cloudera.sqoop.metastore.TestSessions; import com.cloudera.sqoop.metastore.TestSessions;
import com.cloudera.sqoop.orm.TestClassWriter; import com.cloudera.sqoop.orm.TestClassWriter;
import com.cloudera.sqoop.orm.TestParseMethods; import com.cloudera.sqoop.orm.TestParseMethods;
@ -77,6 +77,7 @@ public static Test suite() {
suite.addTestSuite(TestLobFile.class); suite.addTestSuite(TestLobFile.class);
suite.addTestSuite(TestExportUpdate.class); suite.addTestSuite(TestExportUpdate.class);
suite.addTestSuite(TestSessions.class); suite.addTestSuite(TestSessions.class);
suite.addTestSuite(TestNamedFifo.class);
suite.addTest(MapreduceTests.suite()); suite.addTest(MapreduceTests.suite());
return suite; return suite;

View File

@ -0,0 +1,196 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.io;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import org.apache.hadoop.util.StringUtils;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* Test the named fifo utility.
*/
public class TestNamedFifo extends TestCase {
public static final Log LOG = LogFactory.getLog(
TestNamedFifo.class.getName());
public static final Path TEMP_BASE_DIR;
static {
String tmpDir = System.getProperty("test.build.data", "/tmp/");
if (!tmpDir.endsWith(File.separator)) {
tmpDir = tmpDir + File.separator;
}
TEMP_BASE_DIR = new Path(new Path(tmpDir), "namedfifo");
}
private Configuration conf;
private FileSystem fs;
public void setUp() throws Exception {
conf = new Configuration();
conf.set("fs.default.name", "file:///");
fs = FileSystem.getLocal(conf);
fs.mkdirs(TEMP_BASE_DIR);
}
static final String MSG = "THIS IS THE MESSAGE\n";
static final String MSG2 = "Here is a follow-up.\n";
private static class ReaderThread extends Thread {
private File file;
private IOException exception;
public ReaderThread(File f) {
this.file = f;
}
/** return any exception during the run method. */
public IOException getException() {
return this.exception;
}
public void run() {
BufferedReader r = null;
try {
r = new BufferedReader(new InputStreamReader(
new FileInputStream(file)));
// Assert that after a flush, we get back what we wrote.
String line = r.readLine();
if (!MSG.trim().equals(line)) {
throw new IOException("Expected " + MSG.trim() + " but got "
+ line);
}
// Assert that after closing the writer, we get back what
// we wrote again.
line = r.readLine();
if (null == line) {
throw new IOException("line2 was null");
} else if (!MSG2.trim().equals(line)) {
throw new IOException("Expected " + MSG2.trim() + " but got "
+ line);
}
} catch (IOException ioe) {
this.exception = ioe;
} finally {
if (null != r) {
try {
r.close();
} catch (IOException ioe) {
LOG.warn("Error closing reader: " + ioe);
}
}
}
}
}
private static class WriterThread extends Thread {
private File file;
private IOException exception;
public WriterThread(File f) {
this.file = f;
}
/** return any exception during the run method. */
public IOException getException() {
return this.exception;
}
public void run() {
BufferedWriter w = null;
try {
w = new BufferedWriter(new OutputStreamWriter(
new FileOutputStream(file)));
w.write(MSG);
w.flush();
w.write(MSG2);
} catch (IOException ioe) {
this.exception = ioe;
} finally {
if (null != w) {
try {
w.close();
} catch (IOException ioe) {
LOG.warn("Error closing writer: " + ioe);
}
}
}
}
}
public void testNamedFifo() throws Exception {
File root = new File(TEMP_BASE_DIR.toString());
File fifo = new File(root, "foo-fifo");
NamedFifo nf = new NamedFifo(fifo);
nf.create();
File returned = nf.getFile();
// These should be the same object.
assertEquals(fifo, returned);
ReaderThread rt = new ReaderThread(returned);
WriterThread wt = new WriterThread(returned);
rt.start();
wt.start();
rt.join();
wt.join();
IOException rex = rt.getException();
IOException wex = wt.getException();
if (null != rex) {
LOG.error("reader exception: " + StringUtils.stringifyException(rex));
}
if (null != wex) {
LOG.error("writer exception: " + StringUtils.stringifyException(wex));
}
assertNull(rex);
assertNull(wex);
}
}