
SQOOP-762 Improve logging in mapreduce execution engine

(Jarek Jarcec Cecho)
Bilung Lee 2012-12-13 17:33:29 -08:00 committed by Jarek Jarcec Cecho
parent 5375fc8e99
commit 993c6e42af
7 changed files with 68 additions and 7 deletions

View File

@@ -139,9 +139,19 @@ public void write(DataOutput out) throws IOException {
   public String toString() {
     StringBuilder sb = new StringBuilder();
     sb.append("{");
-    sb.append(files[0]);
-    for(int i = 1; i < files.length; i++) {
-      sb.append(", " + files[i]);
+    boolean first = true;
+    for(int i = 0; i < files.length; i++) {
+      if(first) {
+        first = false;
+      } else {
+        sb.append(", ");
+      }
+      sb.append(files[i]);
+      sb.append(" (offset=").append(offsets[i]);
+      sb.append(", end=").append(offsets[i] + lengths[i]);
+      sb.append(", length=").append(lengths[i]);
+      sb.append(")");
     }
     sb.append("}");
     return sb.toString();
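
For reference, the reworked toString() renders a partition roughly like the following (paths and sizes are illustrative), which is what the new "Working on partition:" log statements in the extractors print:

{/user/demo/part-00000 (offset=0, end=67108864, length=67108864), /user/demo/part-00001 (offset=0, end=1048576, length=1048576)}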

View File

@@ -19,6 +19,8 @@
 import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
@@ -32,6 +34,9 @@
 public class HdfsSequenceExportExtractor extends Extractor {
+  public static final Log LOG =
+    LogFactory.getLog(HdfsSequenceExportExtractor.class.getName());
   private Configuration conf;
   private DataWriter datawriter;
@@ -51,6 +56,7 @@ public void run(ImmutableContext context, Object connectionConfiguration,
     try {
       HdfsExportPartition p = (HdfsExportPartition)partition;
+      LOG.info("Working on partition: " + p);
       int numFiles = p.getNumberOfFiles();
       for (int i=0; i<numFiles; i++) {
         extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
@@ -60,9 +66,13 @@ public void run(ImmutableContext context, Object connectionConfiguration,
     }
   }
-  private void extractFile(Path file, long offset, long length)
+  private void extractFile(Path file, long start, long length)
       throws IOException {
-    long start = offset;
     long end = start + length;
+    LOG.info("Extracting file " + file);
+    LOG.info("\t from offset " + start);
+    LOG.info("\t to offset " + end);
+    LOG.info("\t of length " + length);
     SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
       SequenceFile.Reader.file(file));

View File

@@ -20,6 +20,8 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -40,6 +42,9 @@
 public class HdfsTextExportExtractor extends Extractor {
+  public static final Log LOG =
+    LogFactory.getLog(HdfsTextExportExtractor.class.getName());
   private Configuration conf;
   private DataWriter datawriter;
@@ -59,6 +64,7 @@ public void run(ImmutableContext context, Object connectionConfiguration,
     try {
       HdfsExportPartition p = (HdfsExportPartition)partition;
+      LOG.info("Working on partition: " + p);
       int numFiles = p.getNumberOfFiles();
       for (int i=0; i<numFiles; i++) {
         extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
@@ -68,10 +74,13 @@ public void run(ImmutableContext context, Object connectionConfiguration,
     }
   }
-  private void extractFile(Path file, long offset, long length)
+  private void extractFile(Path file, long start, long length)
       throws IOException {
-    long start = offset;
     long end = start + length;
+    LOG.info("Extracting file " + file);
+    LOG.info("\t from offset " + start);
+    LOG.info("\t to offset " + end);
+    LOG.info("\t of length " + length);
     FileSystem fs = file.getFileSystem(conf);
     FSDataInputStream filestream = fs.open(file);
@@ -129,6 +138,7 @@ private void extractFile(Path file, long offset, long length)
       datawriter.writeCsvRecord(line.toString());
     }
+    LOG.info("Extracting ended on position: " + fileseeker.getPos());
   }
   @Override
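
Taken together, extracting one file slice now leaves a log trail along these lines (paths, offsets, and positions are illustrative):

Working on partition: {/user/demo/part-00000 (offset=0, end=67108864, length=67108864)}
Extracting file /user/demo/part-00000
	 from offset 0
	 to offset 67108864
	 of length 67108864
Extracting ended on position: 67108871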

View File

@@ -54,6 +54,7 @@ public static void executeDestroyer(boolean success, Configuration configuration
     Object configConnection = ConfigurationUtils.getConnectorConnection(configuration);
     Object configJob = ConfigurationUtils.getConnectorJob(configuration);
+    LOG.info("Executing destroyer class " + destroyer.getClass());
     destroyer.destroy(success, subContext, configConnection, configJob);
   }

View File

@@ -74,8 +74,10 @@ public void run(Context context) throws IOException, InterruptedException {
     SqoopSplit split = context.getCurrentKey();
     try {
+      LOG.info("Running extractor class " + extractorName);
       extractor.run(subContext, configConnection, configJob, split.getPartition(),
         new MapDataWriter(context));
+      LOG.info("Extractor has finished");
       context.getCounter(SqoopCounters.ROWS_READ)
         .increment(extractor.getRowsRead());
     } catch (Exception e) {

View File

@@ -85,10 +85,12 @@ public void write(Data key, NullWritable value) throws InterruptedException {
     @Override
     public void close(TaskAttemptContext context)
         throws InterruptedException, IOException {
+      LOG.info("SqoopOutputFormatLoadExecutor::SqoopRecordWriter is about to be closed");
       free.acquire();
       writerFinished = true;
       filled.release();
       waitForConsumer();
+      LOG.info("SqoopOutputFormatLoadExecutor::SqoopRecordWriter is closed");
     }
   }
@@ -194,7 +196,9 @@ public void run() {
       }
       try {
+        LOG.info("Running loader class " + loaderName);
         loader.load(subContext, configConnection, configJob, reader);
+        LOG.info("Loader has finished");
       } catch (Throwable t) {
         readerFinished = true;
         LOG.error("Error while loading data out of MR job.", t);

View File

@ -0,0 +1,24 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=DEBUG, A1
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
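
With this ConversionPattern (%-4r elapsed milliseconds, %t thread name, %-5p level, %c logger name, %x NDC, %m message), one of the new statements would appear on the console roughly as follows; the timing, thread, and class names below are illustrative:

1187 [main] INFO  org.apache.sqoop.job.mr.SqoopMapper  - Running extractor class org.apache.sqoop.job.etl.HdfsTextExportExtractor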