
SQOOP-1375: Sqoop2: From/To: Create HDFS connector

Gwen Shapira 2014-08-20 13:23:41 -07:00 committed by Abraham Elmahrek
parent 71279480e8
commit 5c29a2a291
57 changed files with 1337 additions and 687 deletions

View File

@ -369,7 +369,7 @@ public MJob newJob(long fromXid, long toXid) {
fromConnection.getPersistenceId(),
toConnection.getPersistenceId(),
getConnector(fromConnection.getConnectorId()).getJobForms(Direction.FROM),
getConnector(fromConnection.getConnectorId()).getJobForms(Direction.TO),
getConnector(toConnection.getConnectorId()).getJobForms(Direction.TO),
getFramework().getJobForms()
);
}

View File

@ -106,5 +106,71 @@ limitations under the License.
</plugin>
</plugins>
</build>
<!-- Profiles for various supported Hadoop distributions -->
<profiles>
<!-- Hadoop 1.x -->
<profile>
<id>hadoop100</id>
<activation>
<property>
<name>hadoop.profile</name>
<value>100</value>
</property>
</activation>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>
<!-- Hadoop 2.x (active by default) -->
<profile>
<id>hadoop200</id>
<activation>
<activeByDefault>true</activeByDefault>
<property>
<name>hadoop.profile</name>
<value>200</value>
</property>
</activation>
<properties>
<hadoop.profile>200</hadoop.profile>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
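
The profiles above key off the hadoop.profile property, so the build can be pointed at a specific Hadoop line from the command line. A minimal sketch, assuming a standard Maven invocation from the module root:

# Build against Hadoop 1.x (activates the hadoop100 profile)
mvn clean install -Dhadoop.profile=100

# Hadoop 2.x is the default, so a plain build selects the hadoop200 profile
mvn clean install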

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job;
package org.apache.sqoop.common;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.common.ImmutableContext;

View File

@ -0,0 +1,83 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.sqoop</groupId>
<artifactId>connector</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-hdfs</artifactId>
<name>Sqoop HDFS Connector</name>
<!-- TODO: Hardcoding Hadoop200 for now -->
<dependencies>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-spi</artifactId>
</dependency>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>connector-sdk</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<finalName>sqoop</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.VersionInfo;
import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.spi.MetadataUpgrader;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.job.etl.From;
import org.apache.sqoop.job.etl.To;
import org.apache.sqoop.validation.Validator;
import java.util.Locale;
import java.util.ResourceBundle;
public class HdfsConnector extends SqoopConnector {
private static final From FROM = new From(
HdfsInitializer.class,
HdfsPartitioner.class,
HdfsExtractor.class,
HdfsDestroyer.class);
private static final To TO = new To(
HdfsInitializer.class,
HdfsLoader.class,
HdfsDestroyer.class);
private static final HdfsValidator hdfsValidator = new HdfsValidator();
/**
* Retrieve connector version.
*
* @return Version encoded as a string
*/
@Override
public String getVersion() {
return VersionInfo.getVersion();
}
/**
* @param locale
* @return the resource bundle associated with the given locale.
*/
@Override
public ResourceBundle getBundle(Locale locale) {
return ResourceBundle.getBundle(
HdfsConstants.RESOURCE_BUNDLE_NAME, locale);
}
/**
* @return Get connection configuration class
*/
@Override
public Class getConnectionConfigurationClass() {
return ConnectionConfiguration.class;
}
/**
* @param jobType
* @return Get job configuration class for given type or null if not supported
*/
@Override
public Class getJobConfigurationClass(Direction jobType) {
switch (jobType) {
case FROM:
return FromJobConfiguration.class;
case TO:
return ToJobConfiguration.class;
default:
return null;
}
}
/**
* @return a <tt>From</tt> that provides classes for performing import.
*/
@Override
public From getFrom() {
return FROM;
}
/**
* @return a <tt>To</tt> that provides classes for performing export.
*/
@Override
public To getTo() {
return TO;
}
/**
* Returns a validation object that the Sqoop framework can use to validate
* user-supplied forms before accepting them. This object is used for both
* connection and job forms.
*
* @return Validator object
*/
@Override
public Validator getValidator() {
return hdfsValidator;
}
/**
* Returns an {@linkplain org.apache.sqoop.connector.spi.MetadataUpgrader} object that can upgrade the
* connection and job metadata.
*
* @return MetadataUpgrader object
*/
@Override
public MetadataUpgrader getMetadataUpgrader() {
return null;
}
}
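
As a usage note, here is a minimal hedged sketch of how a caller resolves the direction-specific job configuration classes exposed above (the class below is illustrative only and not part of this change):

import org.apache.sqoop.common.Direction;
import org.apache.sqoop.connector.hdfs.HdfsConnector;

// Illustrative only: FROM resolves to FromJobConfiguration and TO to
// ToJobConfiguration, mirroring the switch in getJobConfigurationClass().
final class DirectionLookupSketch {
  public static void main(String[] args) {
    HdfsConnector connector = new HdfsConnector();
    System.out.println(connector.getJobConfigurationClass(Direction.FROM));
    System.out.println(connector.getJobConfigurationClass(Direction.TO));
  }
}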

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs;
import org.apache.sqoop.common.ErrorCode;
public enum HdfsConnectorError implements ErrorCode {
/** Error occurs during partitioner run */
GENERIC_HDFS_CONNECTOR_0000("Error occurs during partitioner run"),
/** Error occurs during extractor run */
GENERIC_HDFS_CONNECTOR_0001("Error occurs during extractor run"),
/** Unsupported output format type found */
GENERIC_HDFS_CONNECTOR_0002("Unknown output format type"),
/** The system was unable to load the specified class. */
GENERIC_HDFS_CONNECTOR_0003("Unable to load the specified class"),
/** The system was unable to instantiate the specified class. */
GENERIC_HDFS_CONNECTOR_0004("Unable to instantiate the specified class"),
/** Error occurs during loader run */
GENERIC_HDFS_CONNECTOR_0005("Error occurs during loader run")
;
private final String message;
private HdfsConnectorError(String message) {
this.message = message;
}
public String getCode() {
return name();
}
public String getMessage() {
return message;
}
}
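
These codes are surfaced by wrapping the underlying failure in a SqoopException, as the extractor and loader later in this change do. A minimal hedged sketch of that pattern (the simulated IOException is illustrative only):

import java.io.IOException;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.hdfs.HdfsConnectorError;

// Illustrative only: report an extractor failure under its error code.
final class ErrorCodeSketch {
  static void extractOrFail() {
    try {
      throw new IOException("simulated read failure"); // stand-in for a real HDFS read
    } catch (IOException e) {
      throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0001, e);
    }
  }
}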

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs;
import org.apache.sqoop.job.Constants;
public final class HdfsConstants extends Constants {
// Resource bundle name
public static final String RESOURCE_BUNDLE_NAME =
"hdfs-connector-resources";
public static final char DEFAULT_RECORD_DELIMITER = '\n';
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
public class HdfsDestroyer extends Destroyer {
/**
* Callback to clean up after job execution.
*
* @param context Destroyer context
* @param o Connection configuration object
* @param o2 Job configuration object
*/
@Override
public void destroy(DestroyerContext context, Object o, Object o2) {
//TODO: Add a "success" flag?
}
}

View File

@ -0,0 +1,199 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.LineReader;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.log4j.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.common.PrefixContext;
import java.io.IOException;
/**
* Extract from HDFS.
* Default field delimiter of a record is comma.
*/
public class HdfsExtractor extends Extractor<ConnectionConfiguration, FromJobConfiguration, HdfsPartition> {
public static final Logger LOG = Logger.getLogger(HdfsExtractor.class);
private Configuration conf;
private DataWriter dataWriter;
private long rowRead = 0;
@Override
public void extract(ExtractorContext context,
ConnectionConfiguration connectionConfiguration,
FromJobConfiguration jobConfiguration, HdfsPartition partition) {
conf = ((PrefixContext) context.getContext()).getConfiguration();
dataWriter = context.getDataWriter();
try {
HdfsPartition p = partition;
LOG.info("Working on partition: " + p);
int numFiles = p.getNumberOfFiles();
for (int i = 0; i < numFiles; i++) {
extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
}
} catch (IOException e) {
throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0001, e);
}
}
private void extractFile(Path file, long start, long length)
throws IOException {
long end = start + length;
LOG.info("Extracting file " + file);
LOG.info("\t from offset " + start);
LOG.info("\t to offset " + end);
LOG.info("\t of length " + length);
if(isSequenceFile(file)) {
extractSequenceFile(file, start, length);
} else {
extractTextFile(file, start, length);
}
}
/**
* Extracts Sequence file
* @param file
* @param start
* @param length
* @throws IOException
*/
private void extractSequenceFile(Path file, long start, long length)
throws IOException {
LOG.info("Extracting sequence file");
long end = start + length;
SequenceFile.Reader filereader = new SequenceFile.Reader(
file.getFileSystem(conf), file, conf);
if (start > filereader.getPosition()) {
filereader.sync(start); // sync to start
}
Text line = new Text();
boolean hasNext = filereader.next(line);
while (hasNext) {
rowRead++;
dataWriter.writeStringRecord(line.toString());
line = new Text();
hasNext = filereader.next(line);
if (filereader.getPosition() >= end && filereader.syncSeen()) {
break;
}
}
filereader.close();
}
/**
* Extracts Text file
* @param file
* @param start
* @param length
* @throws IOException
*/
private void extractTextFile(Path file, long start, long length)
throws IOException {
LOG.info("Extracting text file");
long end = start + length;
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream filestream = fs.open(file);
CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
LineReader filereader;
Seekable fileseeker = filestream;
// Hadoop 1.0 does not support custom record delimiters, so we support
// only the default one.
// We might add another "else if" case for SplittableCompressionCodec once
// we drop support for Hadoop 1.0.
if (codec == null) {
filestream.seek(start);
filereader = new LineReader(filestream);
} else {
filereader = new LineReader(codec.createInputStream(filestream,
codec.createDecompressor()), conf);
fileseeker = filestream;
}
if (start != 0) {
// always throw away first record because
// one extra line is read in previous split
start += filereader.readLine(new Text(), 0);
}
int size;
LOG.info("Start position: " + String.valueOf(start));
long next = start;
while (next <= end) {
Text line = new Text();
size = filereader.readLine(line, Integer.MAX_VALUE);
if (size == 0) {
break;
}
if (codec == null) {
next += size;
} else {
next = fileseeker.getPos();
}
rowRead++;
dataWriter.writeStringRecord(line.toString());
}
LOG.info("Extracting ended on position: " + fileseeker.getPos());
filestream.close();
}
@Override
public long getRowsRead() {
return rowRead;
}
/**
* Returns true if the given file is a SequenceFile
* @param file
* @return boolean
*/
private boolean isSequenceFile(Path file) {
SequenceFile.Reader filereader = null;
try {
filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
filereader.close();
} catch (IOException e) {
return false;
}
return true;
}
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.schema.Schema;
public class HdfsInitializer extends Initializer {
/**
* Initialize a new submission based on the given configuration properties. Any
* needed temporary values might be saved to the context object and they will be
* promoted to all other parts of the workflow automatically.
*
* @param context Initializer context object
* @param connection Connector's connection configuration object
* @param job Connector's job configuration object
*/
@Override
public void initialize(InitializerContext context, Object connection, Object job) {
}
@Override
public Schema getSchema(InitializerContext context, Object connection, Object job) {
return null;
}
}

View File

@ -0,0 +1,140 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.OutputFormat;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;
import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsSequenceWriter;
import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.utils.ClassUtils;
import java.io.IOException;
import java.util.UUID;
public class HdfsLoader extends Loader<ConnectionConfiguration, ToJobConfiguration> {
/**
* Load data to target.
*
* @param context Loader context object
* @param connection Connection configuration
* @param job Job configuration
* @throws Exception
*/
@Override
public void load(LoaderContext context, ConnectionConfiguration connection, ToJobConfiguration job) throws Exception {
DataReader reader = context.getDataReader();
Configuration conf = new Configuration();
String directoryName = job.output.outputDirectory;
String codecname = getCompressionCodecName(job);
CompressionCodec codec = null;
if (codecname != null) {
Class<?> clz = ClassUtils.loadClass(codecname);
if (clz == null) {
throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0003, codecname);
}
try {
codec = (CompressionCodec) clz.newInstance();
if (codec instanceof Configurable) {
((Configurable) codec).setConf(conf);
}
} catch (Exception e) {
throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0004, codecname, e);
}
}
String filename = directoryName + "/" + UUID.randomUUID() + getExtension(job,codec);
try {
Path filepath = new Path(filename);
GenericHdfsWriter filewriter = getWriter(job);
filewriter.initialize(filepath,conf,codec);
String csv;
while ((csv = reader.readTextRecord()) != null) {
filewriter.write(csv);
}
filewriter.destroy();
} catch (IOException e) {
throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0005, e);
}
}
private GenericHdfsWriter getWriter(ToJobConfiguration job) {
if (job.output.outputFormat == OutputFormat.SEQUENCE_FILE)
return new HdfsSequenceWriter();
else
return new HdfsTextWriter();
}
private String getCompressionCodecName(ToJobConfiguration jobConf) {
if(jobConf.output.compression == null)
return null;
switch(jobConf.output.compression) {
case NONE:
return null;
case DEFAULT:
return "org.apache.hadoop.io.compress.DefaultCodec";
case DEFLATE:
return "org.apache.hadoop.io.compress.DeflateCodec";
case GZIP:
return "org.apache.hadoop.io.compress.GzipCodec";
case BZIP2:
return "org.apache.hadoop.io.compress.BZip2Codec";
case LZO:
return "com.hadoop.compression.lzo.LzoCodec";
case LZ4:
return "org.apache.hadoop.io.compress.Lz4Codec";
case SNAPPY:
return "org.apache.hadoop.io.compress.SnappyCodec";
case CUSTOM:
return jobConf.output.customCompression.trim();
}
return null;
}
//TODO: We should probably support configurable extensions at some point
private static String getExtension(ToJobConfiguration job, CompressionCodec codec) {
if (job.output.outputFormat == OutputFormat.SEQUENCE_FILE)
return ".seq";
if (codec == null)
return ".txt";
return codec.getDefaultExtension();
}
}

View File

@ -16,19 +16,20 @@
* limitations under the License.
*/
package org.apache.sqoop.job.etl;
package org.apache.sqoop.connector.hdfs;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.job.etl.Partition;
/**
* This class derives mostly from CombineFileSplit of Hadoop, i.e.
* org.apache.hadoop.mapreduce.lib.input.CombineFileSplit.
*/
public class HdfsExportPartition extends Partition {
public class HdfsPartition extends Partition {
private long lenFiles;
private int numFiles;
@ -37,10 +38,10 @@ public class HdfsExportPartition extends Partition {
private long[] lengths;
private String[] locations;
public HdfsExportPartition() {}
public HdfsPartition() {}
public HdfsExportPartition(Path[] files, long[] offsets,
long[] lengths, String[] locations) {
public HdfsPartition(Path[] files, long[] offsets,
long[] lengths, String[] locations) {
for(long length : lengths) {
this.lenFiles += length;
}

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.sqoop.job.etl;
package org.apache.sqoop.connector.hdfs;
import java.io.IOException;
import java.util.ArrayList;
@ -39,15 +39,18 @@
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
import org.apache.sqoop.common.PrefixContext;
/**
* This class derives mostly from CombineFileInputFormat of Hadoop, i.e.
* org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.
*/
public class HdfsExportPartitioner extends Partitioner {
public class HdfsPartitioner extends Partitioner<ConnectionConfiguration, FromJobConfiguration> {
public static final String SPLIT_MINSIZE_PERNODE =
"mapreduce.input.fileinputformat.split.minsize.per.node";
@ -65,12 +68,12 @@ public class HdfsExportPartitioner extends Partitioner {
@Override
public List<Partition> getPartitions(PartitionerContext context,
Object connectionConfiguration, Object jobConfiguration) {
ConnectionConfiguration connectionConfiguration, FromJobConfiguration jobConfiguration) {
Configuration conf = ((PrefixContext)context.getContext()).getConfiguration();
try {
long numInputBytes = getInputSize(conf);
long numInputBytes = getInputSize(conf, jobConfiguration.input.inputDirectory);
maxSplitSize = numInputBytes / context.getMaxPartitions();
if(numInputBytes % context.getMaxPartitions() != 0 ) {
@ -115,7 +118,7 @@ public List<Partition> getPartitions(PartitionerContext context,
}
// all the files in input set
String indir = conf.get(JobConstants.HADOOP_INPUTDIR);
String indir = jobConfiguration.input.inputDirectory;
FileSystem fs = FileSystem.get(conf);
List<Path> paths = new LinkedList<Path>();
@ -140,12 +143,12 @@ public List<Partition> getPartitions(PartitionerContext context,
return partitions;
} catch (IOException e) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0021, e);
throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0000, e);
}
}
private long getInputSize(Configuration conf) throws IOException {
String indir = conf.get(JobConstants.HADOOP_INPUTDIR);
//TODO: Perhaps get the FS from connection configuration so we can support remote HDFS
private long getInputSize(Configuration conf, String indir) throws IOException {
FileSystem fs = FileSystem.get(conf);
FileStatus[] files = fs.listStatus(new Path(indir));
long count = 0;
@ -372,7 +375,7 @@ private void addCreatedSplit(List<Partition> partitions,
}
// add this split to the list that is returned
HdfsExportPartition partition = new HdfsExportPartition(
HdfsPartition partition = new HdfsPartition(
files, offsets, lengths, locations.toArray(new String[0]));
partitions.add(partition);
}

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs;
import org.apache.sqoop.connector.hdfs.configuration.*;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.Validation;
import org.apache.sqoop.validation.Validator;
/**
* Validate HDFS connector configuration objects
*/
public class HdfsValidator extends Validator {
@Override
public Validation validateConnection(Object connectionConfiguration) {
Validation validation = new Validation(ConnectionConfiguration.class);
// No validation on connection object
return validation;
}
@Override
public Validation validateJob(Object jobConfiguration) {
//TODO: I'm pretty sure this needs to call either validateExportJob or validateImportJob, depending on context
return super.validateJob(jobConfiguration);
}
private Validation validateExportJob(Object jobConfiguration) {
Validation validation = new Validation(FromJobConfiguration.class);
FromJobConfiguration configuration = (FromJobConfiguration)jobConfiguration;
validateInputForm(validation, configuration.input);
return validation;
}
private Validation validateImportJob(Object jobConfiguration) {
Validation validation = new Validation(ToJobConfiguration.class);
ToJobConfiguration configuration = (ToJobConfiguration)jobConfiguration;
validateOutputForm(validation, configuration.output);
return validation;
}
private void validateInputForm(Validation validation, InputForm input) {
if(input.inputDirectory == null || input.inputDirectory.isEmpty()) {
validation.addMessage(Status.UNACCEPTABLE, "input", "inputDirectory", "Input directory is empty");
}
}
private void validateOutputForm(Validation validation, OutputForm output) {
if(output.outputDirectory == null || output.outputDirectory.isEmpty()) {
validation.addMessage(Status.UNACCEPTABLE, "output", "outputDirectory", "Output directory is empty");
}
if(output.customCompression != null &&
output.customCompression.trim().length() > 0 &&
output.compression != OutputCompression.CUSTOM) {
validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
"custom compression should be blank as " + output.compression + " is being used.");
}
if(output.compression == OutputCompression.CUSTOM &&
(output.customCompression == null ||
output.customCompression.trim().length() == 0)
) {
validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
"custom compression is blank.");
}
}
}

View File

@ -15,23 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.framework.configuration;
package org.apache.sqoop.connector.hdfs.configuration;
import org.apache.sqoop.model.ConfigurationClass;
import org.apache.sqoop.model.Form;
/**
*
*/
@ConfigurationClass
public class ImportJobConfiguration {
public class ConnectionConfiguration {
@Form
public ConnectionForm connection;
@Form public OutputForm output;
@Form public ThrottlingForm throttling;
public ImportJobConfiguration() {
output = new OutputForm();
throttling = new ThrottlingForm();
public ConnectionConfiguration() {
connection = new ConnectionForm();
}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs.configuration;
import org.apache.sqoop.model.FormClass;
import org.apache.sqoop.model.Input;
@FormClass
public class ConnectionForm {
//Todo: Didn't find anything that belongs here...
// Since empty forms don't work (DERBYREPO_0008:The form contains no input metadata), I'm putting a dummy form here
@Input(size = 255) public String dummy;
}

View File

@ -15,23 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.framework.configuration;
package org.apache.sqoop.connector.hdfs.configuration;
import org.apache.sqoop.model.ConfigurationClass;
import org.apache.sqoop.model.Form;
/**
*
*/
@ConfigurationClass
public class ExportJobConfiguration {
public class FromJobConfiguration {
@Form public InputForm input;
@Form public ThrottlingForm throttling;
public ExportJobConfiguration() {
public FromJobConfiguration() {
input = new InputForm();
throttling = new ThrottlingForm();
}
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.framework.configuration;
package org.apache.sqoop.connector.hdfs.configuration;
import org.apache.sqoop.model.FormClass;
import org.apache.sqoop.model.Input;

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.framework.configuration;
package org.apache.sqoop.connector.hdfs.configuration;
/**
* Supported compressions

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.framework.configuration;
package org.apache.sqoop.connector.hdfs.configuration;
import org.apache.sqoop.model.FormClass;
import org.apache.sqoop.model.Input;
@ -26,8 +26,6 @@
@FormClass
public class OutputForm {
@Input public StorageType storageType;
@Input public OutputFormat outputFormat;
@Input public OutputCompression compression;

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.framework.configuration;
package org.apache.sqoop.connector.hdfs.configuration;
/**
* Various supported formats on disk

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.framework.configuration;
package org.apache.sqoop.connector.hdfs.configuration;
/**
* Various storage types that Sqoop is supporting

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs.configuration;
import org.apache.sqoop.model.ConfigurationClass;
import org.apache.sqoop.model.Form;
@ConfigurationClass
public class ToJobConfiguration {
@Form
public OutputForm output;
public ToJobConfiguration() {
output = new OutputForm();
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs.hdfsWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import java.io.IOException;
public abstract class GenericHdfsWriter {
public abstract void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException;
public abstract void write(String csv) throws IOException;
public abstract void destroy() throws IOException;
}

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs.hdfsWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import java.io.IOException;
public class HdfsSequenceWriter extends GenericHdfsWriter {
private SequenceFile.Writer filewriter;
private Text text;
public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException {
if (codec != null) {
filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
conf, filepath, Text.class, NullWritable.class,
SequenceFile.CompressionType.BLOCK, codec);
} else {
filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
conf, filepath, Text.class, NullWritable.class, SequenceFile.CompressionType.NONE);
}
text = new Text();
}
@Override
public void write(String csv) throws IOException {
text.set(csv);
filewriter.append(text, NullWritable.get());
}
public void destroy() throws IOException {
filewriter.close();
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs.hdfsWriter;
import com.google.common.base.Charsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.sqoop.connector.hdfs.HdfsConstants;
import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
public class HdfsTextWriter extends GenericHdfsWriter {
private BufferedWriter filewriter;
@Override
public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException {
FileSystem fs = filepath.getFileSystem(conf);
DataOutputStream filestream = fs.create(filepath, false);
if (codec != null) {
filewriter = new BufferedWriter(new OutputStreamWriter(
codec.createOutputStream(filestream, codec.createCompressor()),
Charsets.UTF_8));
} else {
filewriter = new BufferedWriter(new OutputStreamWriter(
filestream, Charsets.UTF_8));
}
}
@Override
public void write(String csv) throws IOException {
filewriter.write(csv + HdfsConstants.DEFAULT_RECORD_DELIMITER);
}
@Override
public void destroy() throws IOException {
filewriter.close();
}
}
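
Together with GenericHdfsWriter and HdfsSequenceWriter above, this completes the writer contract that HdfsLoader drives: initialize once, write per record, destroy to flush and close. A minimal hedged sketch of that lifecycle (the target path is illustrative, and a null codec means no compression):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;
import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter;

// Illustrative only: exercise the initialize/write/destroy lifecycle directly.
final class WriterLifecycleSketch {
  public static void main(String[] args) throws Exception {
    GenericHdfsWriter writer = new HdfsTextWriter();
    writer.initialize(new Path("/tmp/sqoop-writer-sketch.txt"), new Configuration(), null);
    writer.write("1,'example'"); // each record is terminated with HdfsConstants.DEFAULT_RECORD_DELIMITER
    writer.destroy();            // flushes and closes the underlying stream
  }
}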

View File

@ -0,0 +1,58 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Generic HDFS Connector Resources
############################
# Connection Form
#
connection.label = Connection configuration
connection.help = You must supply the information requested in order to \
create a connection object.
connection.dummy.label = Dummy parameter needed to get HDFS connector to register
connection.dummy.help = You can write anything here. Doesn't matter.
# Output Form
#
output.label = Output configuration
output.help = You must supply the information requested in order to \
get information where you want to store your data.
output.storageType.label = Storage type
output.storageType.help = Target on Hadoop ecosystem where to store data
output.outputFormat.label = Output format
output.outputFormat.help = Format in which data should be serialized
output.compression.label = Compression format
output.compression.help = Compression that should be used for the data
output.customCompression.label = Custom compression format
output.customCompression.help = Full class name of the custom compression
output.outputDirectory.label = Output directory
output.outputDirectory.help = Output directory for final data
output.ignored.label = Ignored
output.ignored.help = This value is ignored
# Input Form
#
input.label = Input configuration
input.help = Specifies information required to get data from Hadoop ecosystem
input.inputDirectory.label = Input directory
input.inputDirectory.help = Directory that should be exported

View File

@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Generic HDFS Connector Properties
org.apache.sqoop.connector.class = org.apache.sqoop.connector.hdfs.HdfsConnector
org.apache.sqoop.connector.name = hdfs-connector

View File

@ -180,6 +180,10 @@ public Object[] getObjectData() {
return null;
}
if (schema == null) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
}
if (fields.length != schema.getColumns().size()) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
"The data " + getTextData() + " has the wrong number of fields.");
@ -189,7 +193,8 @@ public Object[] getObjectData() {
Column[] cols = schema.getColumns().toArray(new Column[fields.length]);
for (int i = 0; i < fields.length; i++) {
Type colType = cols[i].getType();
if (fields[i].equals("NULL")) {
//TODO: Replace with proper isNull method. Actually the entire content of the loop should be a parse method
if (fields[i].equals("NULL") || fields[i].equals("null") || fields[i].equals("'null'") || fields[i].isEmpty()) {
out[i] = null;
continue;
}

View File

@ -37,7 +37,9 @@ public enum IntermediateDataFormatError implements ErrorCode {
INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."),
/** Number of fields. */
INTERMEDIATE_DATA_FORMAT_0005("Wrong number of fields.")
INTERMEDIATE_DATA_FORMAT_0005("Wrong number of fields."),
INTERMEDIATE_DATA_FORMAT_0006("Schema missing.")
;

View File

@ -35,10 +35,11 @@ limitations under the License.
<modules>
<module>connector-sdk</module>
<module>connector-generic-jdbc</module>
<!-- Uncomment and finish connectors after sqoop framework will become stable
<module>connector-mysql-jdbc</module>
<module>connector-mysql-fastpath</module>
-->
<module>connector-hdfs</module>
<!-- Uncomment and finish connectors after sqoop framework will become stable
<module>connector-mysql-jdbc</module>
<module>connector-mysql-fastpath</module>
-->
</modules>
</project>

View File

@ -18,20 +18,13 @@
package org.apache.sqoop.framework;
import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
import org.apache.sqoop.framework.configuration.InputForm;
import org.apache.sqoop.framework.configuration.JobConfiguration;
import org.apache.sqoop.framework.configuration.OutputCompression;
import org.apache.sqoop.framework.configuration.OutputForm;
import org.apache.sqoop.framework.configuration.ThrottlingForm;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.Validation;
import org.apache.sqoop.validation.Validator;
/**
* Validate framework configuration objects
*/
public class FrameworkValidator extends Validator {
@Override
public Validation validateConnection(Object connectionConfiguration) {
Validation validation = new Validation(ConnectionConfiguration.class);
@ -39,61 +32,16 @@ public Validation validateConnection(Object connectionConfiguration) {
return validation;
}
@Override
public Validation validateJob(Object jobConfiguration) {
JobConfiguration configuration = (JobConfiguration)jobConfiguration;
Validation validation = new Validation(JobConfiguration.class);
validateThrottingForm(validation, configuration.throttling);
return super.validateJob(jobConfiguration);
}
JobConfiguration conf = (JobConfiguration)jobConfiguration;
validateThrottlingForm(validation,conf.throttling);
// private Validation validateExportJob(Object jobConfiguration) {
// Validation validation = new Validation(ExportJobConfiguration.class);
// ExportJobConfiguration configuration = (ExportJobConfiguration)jobConfiguration;
//
// validateInputForm(validation, configuration.input);
// validateThrottingForm(validation, configuration.throttling);
//
// return validation;
// }
//
// private Validation validateImportJob(Object jobConfiguration) {
// Validation validation = new Validation(ImportJobConfiguration.class);
// ImportJobConfiguration configuration = (ImportJobConfiguration)jobConfiguration;
//
// validateOutputForm(validation, configuration.output);
// validateThrottingForm(validation, configuration.throttling);
//
// return validation;
// }
return validation;
};
// private void validateInputForm(Validation validation, InputForm input) {
// if(input.inputDirectory == null || input.inputDirectory.isEmpty()) {
// validation.addMessage(Status.UNACCEPTABLE, "input", "inputDirectory", "Input directory is empty");
// }
// }
//
// private void validateOutputForm(Validation validation, OutputForm output) {
// if(output.outputDirectory == null || output.outputDirectory.isEmpty()) {
// validation.addMessage(Status.UNACCEPTABLE, "output", "outputDirectory", "Output directory is empty");
// }
// if(output.customCompression != null &&
// output.customCompression.trim().length() > 0 &&
// output.compression != OutputCompression.CUSTOM) {
// validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
// "custom compression should be blank as " + output.compression + " is being used.");
// }
// if(output.compression == OutputCompression.CUSTOM &&
// (output.customCompression == null ||
// output.customCompression.trim().length() == 0)
// ) {
// validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
// "custom compression is blank.");
// }
// }
private void validateThrottingForm(Validation validation, ThrottlingForm throttling) {
private void validateThrottlingForm(Validation validation, ThrottlingForm throttling) {
if(throttling.extractors != null && throttling.extractors < 1) {
validation.addMessage(Status.UNACCEPTABLE, "throttling", "extractors", "You need to specify more than one extractor");
}
@ -102,4 +50,5 @@ private void validateThrottingForm(Validation validation, ThrottlingForm throttl
validation.addMessage(Status.UNACCEPTABLE, "throttling", "loaders", "You need to specify more than one loader");
}
}
}

View File

@ -36,6 +36,7 @@
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.utils.ClassUtils;
@ -434,12 +435,17 @@ public MSubmission submit(long jobId, HttpEventContext ctx) {
request.getConnectorJobConfig(Direction.FROM)));
// @TODO(Abe): Alter behavior of Schema here. Need from Schema.
// Retrieve and persist the schema
request.getSummary().setConnectorSchema(initializer.getSchema(
initializerContext,
request.getConnectorConnectionConfig(Direction.FROM),
request.getConnectorJobConfig(Direction.FROM)
));
Schema fromSchema = initializer.getSchema(initializerContext,
request.getConnectorConnectionConfig(Direction.FROM),
request.getConnectorJobConfig(Direction.FROM));
// request.getSummary().setConnectorSchema(initializer.getSchema(
// initializerContext,
// request.getConnectorConnectionConfig(ConnectorType.FROM),
// request.getConnectorJobConfig(ConnectorType.FROM)
// ));
// Initialize To Connector callback.
baseCallback = request.getToCallback();
@ -468,6 +474,11 @@ public MSubmission submit(long jobId, HttpEventContext ctx) {
request.getConnectorJobConfig(Direction.TO)));
// @TODO(Abe): Alter behavior of Schema here. Need To Schema.
Schema toSchema = initializer.getSchema(initializerContext,
request.getConnectorConnectionConfig(Direction.TO),
request.getConnectorJobConfig(Direction.TO));
// Retrieve and persist the schema
// request.getSummary().setConnectorSchema(initializer.getSchema(
// initializerContext,
@ -475,6 +486,12 @@ public MSubmission submit(long jobId, HttpEventContext ctx) {
// request.getConnectorJobConfig(ConnectorType.TO)
// ));
//TODO: Need better logic here
if (fromSchema != null)
request.getSummary().setConnectorSchema(fromSchema);
else
request.getSummary().setConnectorSchema(toSchema);
// Bootstrap job from framework perspective
prepareSubmission(request);

View File

@ -176,9 +176,11 @@ public void setConnector(Direction type, SqoopConnector connector) {
switch(type) {
case FROM:
fromConnector = connector;
break;
case TO:
toConnector = connector;
break;
default:
throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
@ -238,10 +240,10 @@ public void setConnectorConnectionConfig(Direction type, Object config) {
switch(type) {
case FROM:
fromConnectorConnectionConfig = config;
break;
case TO:
toConnectorConnectionConfig = config;
break;
default:
throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
}
@ -264,10 +266,10 @@ public void setConnectorJobConfig(Direction type, Object config) {
switch(type) {
case FROM:
fromConnectorJobConfig = config;
break;
case TO:
toConnectorJobConfig = config;
break;
default:
throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
}
@ -290,10 +292,10 @@ public void setFrameworkConnectionConfig(Direction type, Object config) {
switch(type) {
case FROM:
fromFrameworkConnectionConfig = config;
break;
case TO:
toFrameworkConnectionConfig = config;
break;
default:
throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
}

View File

@ -22,8 +22,8 @@
@ConfigurationClass
public class JobConfiguration {
@Form public ThrottlingForm throttling;
@Form
public ThrottlingForm throttling;
public JobConfiguration() {
throttling = new ThrottlingForm();

View File

@ -17,15 +17,7 @@
*/
package org.apache.sqoop.framework;
import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
import org.apache.sqoop.framework.configuration.OutputCompression;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.Validation;
import org.junit.Before;
import org.junit.Test;
//import org.apache.sqoop.framework.configuration.OutputCompression;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

View File

@ -28,7 +28,7 @@
import org.apache.sqoop.connector.spi.MetadataUpgrader;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.framework.FrameworkManager;
import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
//import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
import org.apache.sqoop.model.ConfigurationClass;
import org.apache.sqoop.model.FormUtils;
import org.apache.sqoop.model.MConnection;

View File

@ -80,49 +80,9 @@ public void prepareSubmission(SubmissionRequest gRequest) {
if(request.getExtractors() != null) {
context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
}
// @TODO(Abe): Move to HDFS connector.
// if(jobConf.output.outputFormat == OutputFormat.TEXT_FILE) {
// context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
// } else if(jobConf.output.outputFormat == OutputFormat.SEQUENCE_FILE) {
// context.setString(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
// } else {
// throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0024,
// "Format: " + jobConf.output.outputFormat);
// }
// if(getCompressionCodecName(jobConf) != null) {
// context.setString(JobConstants.HADOOP_COMPRESS_CODEC,
// getCompressionCodecName(jobConf));
// context.setBoolean(JobConstants.HADOOP_COMPRESS, true);
// }
}
// @TODO(Abe): Move to HDFS connector.
// private String getCompressionCodecName(ImportJobConfiguration jobConf) {
// if(jobConf.output.compression == null)
// return null;
// switch(jobConf.output.compression) {
// case NONE:
// return null;
// case DEFAULT:
// return "org.apache.hadoop.io.compress.DefaultCodec";
// case DEFLATE:
// return "org.apache.hadoop.io.compress.DeflateCodec";
// case GZIP:
// return "org.apache.hadoop.io.compress.GzipCodec";
// case BZIP2:
// return "org.apache.hadoop.io.compress.BZip2Codec";
// case LZO:
// return "com.hadoop.compression.lzo.LzoCodec";
// case LZ4:
// return "org.apache.hadoop.io.compress.Lz4Codec";
// case SNAPPY:
// return "org.apache.hadoop.io.compress.SnappyCodec";
// case CUSTOM:
// return jobConf.output.customCompression.trim();
// }
// return null;
// }
/**
* Our execution engine has additional dependencies that need to be available

View File

@ -1,194 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.etl;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.LineReader;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext;
/**
* Extract from HDFS.
* Default field delimiter of a record is comma.
*/
//public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
//
// public static final Logger LOG = Logger.getLogger(HdfsExportExtractor.class);
//
// private Configuration conf;
// private DataWriter dataWriter;
// private long rowRead = 0;
//
// @Override
// public void extract(ExtractorContext context,
// ConnectionConfiguration connectionConfiguration,
// ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
//
// conf = ((PrefixContext) context.getContext()).getConfiguration();
// dataWriter = context.getDataWriter();
//
// try {
// HdfsExportPartition p = partition;
// LOG.info("Working on partition: " + p);
// int numFiles = p.getNumberOfFiles();
// for (int i = 0; i < numFiles; i++) {
// extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
// }
// } catch (IOException e) {
// throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
// }
// }
//
// private void extractFile(Path file, long start, long length)
// throws IOException {
// long end = start + length;
// LOG.info("Extracting file " + file);
// LOG.info("\t from offset " + start);
// LOG.info("\t to offset " + end);
// LOG.info("\t of length " + length);
// if(isSequenceFile(file)) {
// extractSequenceFile(file, start, length);
// } else {
// extractTextFile(file, start, length);
// }
// }
//
// /**
// * Extracts Sequence file
// * @param file
// * @param start
// * @param length
// * @throws IOException
// */
// private void extractSequenceFile(Path file, long start, long length)
// throws IOException {
// LOG.info("Extracting sequence file");
// long end = start + length;
// SequenceFile.Reader filereader = new SequenceFile.Reader(
// file.getFileSystem(conf), file, conf);
//
// if (start > filereader.getPosition()) {
// filereader.sync(start); // sync to start
// }
//
// Text line = new Text();
// boolean hasNext = filereader.next(line);
// while (hasNext) {
// rowRead++;
// dataWriter.writeStringRecord(line.toString());
// line = new Text();
// hasNext = filereader.next(line);
// if (filereader.getPosition() >= end && filereader.syncSeen()) {
// break;
// }
// }
// filereader.close();
// }
//
// /**
// * Extracts Text file
// * @param file
// * @param start
// * @param length
// * @throws IOException
// */
// private void extractTextFile(Path file, long start, long length)
// throws IOException {
// LOG.info("Extracting text file");
// long end = start + length;
// FileSystem fs = file.getFileSystem(conf);
// FSDataInputStream filestream = fs.open(file);
// CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
// LineReader filereader;
// Seekable fileseeker = filestream;
//
// // Hadoop 1.0 does not have support for custom record delimiters and thus
// // we support only the default one.
// // We might add another "else if" case for SplittableCompressionCodec once
// // we drop support for Hadoop 1.0.
// if (codec == null) {
// filestream.seek(start);
// filereader = new LineReader(filestream);
// } else {
// filereader = new LineReader(codec.createInputStream(filestream,
// codec.createDecompressor()), conf);
// fileseeker = filestream;
// }
// if (start != 0) {
// // always throw away first record because
// // one extra line is read in previous split
// start += filereader.readLine(new Text(), 0);
// }
// int size;
// LOG.info("Start position: " + String.valueOf(start));
// long next = start;
// while (next <= end) {
// Text line = new Text();
// size = filereader.readLine(line, Integer.MAX_VALUE);
// if (size == 0) {
// break;
// }
// if (codec == null) {
// next += size;
// } else {
// next = fileseeker.getPos();
// }
// rowRead++;
// dataWriter.writeStringRecord(line.toString());
// }
// LOG.info("Extracting ended on position: " + fileseeker.getPos());
// filestream.close();
// }
//
// @Override
// public long getRowsRead() {
// return rowRead;
// }
//
// /**
// * Returns true if given file is sequence
// * @param file
// * @return boolean
// */
// private boolean isSequenceFile(Path file) {
// SequenceFile.Reader filereader = null;
// try {
// filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
// filereader.close();
// } catch (IOException e) {
// return false;
// }
// return true;
// }
//}

View File

@ -1,94 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.etl;
import java.io.IOException;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.utils.ClassUtils;
public class HdfsSequenceImportLoader extends Loader {
public static final String EXTENSION = ".seq";
@Override
public void load(LoaderContext context, Object oc, Object oj) throws Exception {
DataReader reader = context.getDataReader();
Configuration conf = new Configuration();
// Configuration conf = ((EtlContext)context).getConfiguration();
String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
CompressionCodec codec = null;
if (codecname != null) {
Class<?> clz = ClassUtils.loadClass(codecname);
if (clz == null) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0009, codecname);
}
try {
codec = (CompressionCodec) clz.newInstance();
if (codec instanceof Configurable) {
((Configurable) codec).setConf(conf);
}
} catch (Exception e) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0010, codecname, e);
}
}
filename += EXTENSION;
try {
Path filepath = new Path(filename);
SequenceFile.Writer filewriter;
if (codec != null) {
filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
conf, filepath, Text.class, NullWritable.class,
CompressionType.BLOCK, codec);
} else {
filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
conf, filepath, Text.class, NullWritable.class, CompressionType.NONE);
}
String csv;
Text text = new Text();
while ((csv = reader.readTextRecord()) != null) {
text.set(csv);
filewriter.append(text, NullWritable.get());
}
filewriter.close();
} catch (IOException e) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, e);
}
}
}

View File

@ -1,101 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.etl;
import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import com.google.common.base.Charsets;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.utils.ClassUtils;
public class HdfsTextImportLoader extends Loader {
private final char recordDelimiter;
public HdfsTextImportLoader() {
recordDelimiter = Data.DEFAULT_RECORD_DELIMITER;
}
@Override
public void load(LoaderContext context, Object oc, Object oj) throws Exception{
DataReader reader = context.getDataReader();
Configuration conf = new Configuration();
// Configuration conf = ((EtlContext)context).getConfiguration();
String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
CompressionCodec codec = null;
if (codecname != null) {
Class<?> clz = ClassUtils.loadClass(codecname);
if (clz == null) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0009, codecname);
}
try {
codec = (CompressionCodec) clz.newInstance();
if (codec instanceof Configurable) {
((Configurable) codec).setConf(conf);
}
} catch (Exception e) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0010, codecname, e);
}
filename += codec.getDefaultExtension();
}
try {
Path filepath = new Path(filename);
FileSystem fs = filepath.getFileSystem(conf);
BufferedWriter filewriter;
DataOutputStream filestream = fs.create(filepath, false);
if (codec != null) {
filewriter = new BufferedWriter(new OutputStreamWriter(
codec.createOutputStream(filestream, codec.createCompressor()),
Charsets.UTF_8));
} else {
filewriter = new BufferedWriter(new OutputStreamWriter(
filestream, Charsets.UTF_8));
}
String csv;
while ((csv = reader.readTextRecord()) != null) {
filewriter.write(csv + recordDelimiter);
}
filewriter.close();
} catch (IOException e) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, e);
}
}
}

View File

@ -21,7 +21,7 @@
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
import org.apache.sqoop.schema.Schema;

View File

@ -33,7 +33,7 @@
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;

View File

@ -31,7 +31,7 @@
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.etl.io.DataWriter;
@ -66,7 +66,16 @@ public void run(Context context) throws IOException, InterruptedException {
// Propagate connector schema in every case for now
// TODO: Change to conditionally choose between Connector schemas.
Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
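// Fall back to the TO connector schema when the FROM connector does not provide one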
if (schema == null) {
schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
}
if (schema == null) {
LOG.info("setting an empty schema");
}
String intermediateDataFormatName = conf.get(JobConstants
.INTERMEDIATE_DATA_FORMAT);

View File

@ -36,7 +36,7 @@
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.etl.io.DataReader;
@ -72,7 +72,13 @@ public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
producer = new SqoopRecordWriter();
data = (IntermediateDataFormat) ClassUtils.instantiate(context
.getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
data.setSchema(ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()));
Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration());
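// Prefer the FROM connector schema; fall back to the TO connector schema when it is absent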
if (schema == null) {
schema = ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration());
}
data.setSchema(schema);
}
public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {

View File

@ -17,18 +17,9 @@
*/
package org.apache.sqoop.execution.mapreduce;
import org.apache.sqoop.common.MutableMapContext;
//import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.framework.SubmissionRequest;
import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
import org.apache.sqoop.framework.configuration.OutputCompression;
import org.apache.sqoop.framework.configuration.OutputFormat;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.Partitioner;
import org.junit.Test;
//import org.apache.sqoop.framework.configuration.OutputCompression;
//import org.apache.sqoop.framework.configuration.OutputFormat;
import static junit.framework.TestCase.assertEquals;

View File

@ -17,40 +17,11 @@
*/
package org.apache.sqoop.job;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.List;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
//import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
//import org.apache.sqoop.job.etl.HdfsExportExtractor;
import org.apache.sqoop.job.etl.HdfsExportPartitioner;
import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.PartitionerContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.mr.ConfigurationUtils;
import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.junit.Test;
//import org.apache.sqoop.job.etl.HdfsExportPartitioner;
public class TestHdfsExtract extends TestCase {

View File

@ -17,40 +17,11 @@
*/
package org.apache.sqoop.job;
import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.LinkedList;
import java.util.List;
import com.google.common.base.Charsets;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ReflectionUtils;
//import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
import org.apache.sqoop.job.etl.HdfsTextImportLoader;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.mr.ConfigurationUtils;
import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
//import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
//import org.apache.sqoop.job.etl.HdfsTextImportLoader;
public class TestHdfsLoad extends TestCase {

pom.xml
View File

@ -299,6 +299,17 @@ limitations under the License.
<type>test-jar</type>
<version>${project.version}</version>
</dependency>
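<!-- HDFS connector artifacts (main and test-jar) added by this change -->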
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-hdfs</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-hdfs</artifactId>
<type>test-jar</type>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-mysql-jdbc</artifactId>

View File

@ -27,12 +27,7 @@
import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.*;
import javax.sql.DataSource;
@ -1711,14 +1706,16 @@ private List<MJob> loadJobs(PreparedStatement stmt,
throws SQLException {
List<MJob> jobs = new ArrayList<MJob>();
ResultSet rsJob = null;
PreparedStatement formConnectorFetchStmt = null;
PreparedStatement toFormConnectorFetchStmt = null;
PreparedStatement fromFormConnectorFetchStmt = null;
PreparedStatement formFrameworkFetchStmt = null;
PreparedStatement inputFetchStmt = null;
try {
rsJob = stmt.executeQuery();
formConnectorFetchStmt = conn.prepareStatement(STMT_FETCH_FORM_CONNECTOR);
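// Two statements are needed because the FROM and TO forms are fetched for two (possibly different) connectors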
toFormConnectorFetchStmt = conn.prepareStatement(STMT_FETCH_FORM_CONNECTOR);
fromFormConnectorFetchStmt = conn.prepareStatement(STMT_FETCH_FORM_CONNECTOR);
formFrameworkFetchStmt = conn.prepareStatement(STMT_FETCH_FORM_FRAMEWORK);
inputFetchStmt = conn.prepareStatement(STMT_FETCH_JOB_INPUT);
@ -1735,28 +1732,47 @@ private List<MJob> loadJobs(PreparedStatement stmt,
String updateBy = rsJob.getString(10);
Date lastUpdateDate = rsJob.getTimestamp(11);
formConnectorFetchStmt.setLong(1, fromConnectorId);
fromFormConnectorFetchStmt.setLong(1, fromConnectorId);
toFormConnectorFetchStmt.setLong(1, toConnectorId);
inputFetchStmt.setLong(1, id);
//inputFetchStmt.setLong(1, XXX); // Will be filled by loadForms
inputFetchStmt.setLong(3, id);
List<MForm> connectorConnForms = new ArrayList<MForm>();
List<MForm> toConnectorConnForms = new ArrayList<MForm>();
List<MForm> fromConnectorConnForms = new ArrayList<MForm>();
List<MForm> frameworkConnForms = new ArrayList<MForm>();
List<MForm> frameworkJobForms = new ArrayList<MForm>();
List<MForm> fromJobForms = new ArrayList<MForm>();
List<MForm> toJobForms = new ArrayList<MForm>();
loadConnectorForms(connectorConnForms, fromJobForms, toJobForms,
formConnectorFetchStmt, inputFetchStmt, 2);
// This looks confusing, but our job has two connectors, and each connector has two job forms.
// To define the job, we need the TO job form of the TO connector
// and the FROM job form of the FROM connector.
List<MForm> fromConnectorFromJobForms = new ArrayList<MForm>();
List<MForm> fromConnectorToJobForms = new ArrayList<MForm>();
List<MForm> toConnectorFromJobForms = new ArrayList<MForm>();
List<MForm> toConnectorToJobForms = new ArrayList<MForm>();
loadConnectorForms(fromConnectorConnForms,
fromConnectorFromJobForms,
fromConnectorToJobForms,
fromFormConnectorFetchStmt,
inputFetchStmt,
2);
loadConnectorForms(toConnectorConnForms,
toConnectorFromJobForms,
toConnectorToJobForms,
toFormConnectorFetchStmt, inputFetchStmt, 2);
loadForms(frameworkConnForms, frameworkJobForms,
formFrameworkFetchStmt, inputFetchStmt, 2);
MJob job = new MJob(
fromConnectorId, toConnectorId,
fromConnectionId, toConnectionId,
new MJobForms(fromJobForms),
new MJobForms(toJobForms),
new MJobForms(fromConnectorFromJobForms),
new MJobForms(toConnectorToJobForms),
new MJobForms(frameworkJobForms));
job.setPersistenceId(id);
@ -1771,7 +1787,7 @@ private List<MJob> loadJobs(PreparedStatement stmt,
}
} finally {
closeResultSets(rsJob);
closeStatements(formConnectorFetchStmt, formFrameworkFetchStmt, inputFetchStmt);
closeStatements(fromFormConnectorFetchStmt, toFormConnectorFetchStmt, formFrameworkFetchStmt, inputFetchStmt);
}
return jobs;

View File

@ -64,6 +64,11 @@ limitations under the License.
<artifactId>sqoop-connector-generic-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-hdfs</artifactId>
</dependency>
<!--
<dependency>
<groupId>org.apache.sqoop.connector</groupId>

View File

@ -283,6 +283,7 @@ private JsonBean getJobs(RequestContext ctx) {
MJob job = repository.findJob(jid);
// @TODO(Abe): From/To
long connectorId = job.getConnectorId(Direction.FROM);
bean = new JobBean(job);

View File

@ -67,6 +67,11 @@ limitations under the License.
<artifactId>sqoop-connector-generic-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-hdfs</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.cargo</groupId>
<artifactId>cargo-core-container-tomcat</artifactId>

View File

@ -21,8 +21,8 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Logger;
import org.apache.sqoop.client.SubmissionCallback;
import org.apache.sqoop.framework.configuration.OutputFormat;
import org.apache.sqoop.framework.configuration.StorageType;
import org.apache.sqoop.connector.hdfs.configuration.OutputFormat;
import org.apache.sqoop.connector.hdfs.configuration.StorageType;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MFormList;
import org.apache.sqoop.model.MJob;

View File

@ -17,15 +17,7 @@
*/
package org.apache.sqoop.integration.connector.jdbc.generic;
import org.apache.log4j.Logger;
import org.apache.sqoop.framework.configuration.OutputFormat;
import org.apache.sqoop.framework.configuration.StorageType;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MFormList;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MSubmission;
import org.junit.Test;
import static org.junit.Assert.assertTrue;

View File

@ -17,15 +17,7 @@
*/
package org.apache.sqoop.integration.connector.jdbc.generic.imports;
import org.apache.log4j.Logger;
import org.apache.sqoop.framework.configuration.OutputFormat;
import org.apache.sqoop.framework.configuration.StorageType;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MFormList;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.utils.ParametrizedUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

View File

@ -17,21 +17,10 @@
*/
package org.apache.sqoop.integration.server;
import org.apache.sqoop.client.ClientError;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.framework.FrameworkError;
import org.apache.sqoop.framework.configuration.OutputFormat;
import org.apache.sqoop.framework.configuration.StorageType;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MFormList;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;