diff --git a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java index 1d93ae3f..2b3171c3 100644 --- a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java +++ b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java @@ -369,7 +369,7 @@ public MJob newJob(long fromXid, long toXid) { fromConnection.getPersistenceId(), toConnection.getPersistenceId(), getConnector(fromConnection.getConnectorId()).getJobForms(Direction.FROM), - getConnector(fromConnection.getConnectorId()).getJobForms(Direction.TO), + getConnector(toConnection.getConnectorId()).getJobForms(Direction.TO), getFramework().getJobForms() ); } diff --git a/common/pom.xml b/common/pom.xml index 9bfa07d4..151a649f 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -106,5 +106,71 @@ limitations under the License. + + + + + + hadoop100 + + + + hadoop.profile + 100 + + + + + + com.google.guava + guava + + + + org.apache.hadoop + hadoop-core + provided + + + + + + + hadoop200 + + + true + + hadoop.profile + 200 + + + + + 200 + + + + + org.apache.hadoop + hadoop-common + provided + + + + org.apache.hadoop + hadoop-mapreduce-client-core + provided + + + + org.apache.hadoop + hadoop-mapreduce-client-jobclient + provided + + + + + diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java b/common/src/main/java/org/apache/sqoop/common/PrefixContext.java similarity index 98% rename from execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java rename to common/src/main/java/org/apache/sqoop/common/PrefixContext.java index c3beed7f..6434e6d6 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java +++ b/common/src/main/java/org/apache/sqoop/common/PrefixContext.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.sqoop.job; +package org.apache.sqoop.common; import org.apache.hadoop.conf.Configuration; import org.apache.sqoop.common.ImmutableContext; diff --git a/connector/connector-hdfs/pom.xml b/connector/connector-hdfs/pom.xml new file mode 100644 index 00000000..8df9f11a --- /dev/null +++ b/connector/connector-hdfs/pom.xml @@ -0,0 +1,83 @@ + + + + + 4.0.0 + + + org.apache.sqoop + connector + 2.0.0-SNAPSHOT + + + org.apache.sqoop.connector + sqoop-connector-hdfs + Sqoop HDFS Connector + + + + + + org.apache.sqoop + sqoop-spi + + + + org.apache.sqoop + connector-sdk + + + + org.apache.hadoop + hadoop-common + provided + + + + org.apache.hadoop + hadoop-mapreduce-client-core + provided + + + + org.apache.hadoop + hadoop-mapreduce-client-jobclient + provided + + + + + sqoop + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + + \ No newline at end of file diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java new file mode 100644 index 00000000..557091ea --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.connector.hdfs; + +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.common.VersionInfo; +import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; +import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.spi.MetadataUpgrader; +import org.apache.sqoop.connector.spi.SqoopConnector; +import org.apache.sqoop.job.etl.From; +import org.apache.sqoop.job.etl.To; +import org.apache.sqoop.validation.Validator; + +import java.util.Locale; +import java.util.ResourceBundle; + +public class HdfsConnector extends SqoopConnector { + + private static final From FROM = new From( + HdfsInitializer.class, + HdfsPartitioner.class, + HdfsExtractor.class, + HdfsDestroyer.class); + + private static final To TO = new To( + HdfsInitializer.class, + HdfsLoader.class, + HdfsDestroyer.class); + + private static final HdfsValidator hdfsValidator = new HdfsValidator(); + + /** + * Retrieve connector version. + * + * @return Version encoded as a string + */ + @Override + public String getVersion() { + return VersionInfo.getVersion(); + } + + /** + * @param locale + * @return the resource bundle associated with the given locale. + */ + @Override + public ResourceBundle getBundle(Locale locale) { + return ResourceBundle.getBundle( + HdfsConstants.RESOURCE_BUNDLE_NAME, locale); + } + + /** + * @return Get connection configuration class + */ + @Override + public Class getConnectionConfigurationClass() { + return ConnectionConfiguration.class; + } + + /** + * @param jobType + * @return Get job configuration class for given type or null if not supported + */ + @Override + public Class getJobConfigurationClass(Direction jobType) { + switch (jobType) { + case FROM: + return FromJobConfiguration.class; + case TO: + return ToJobConfiguration.class; + default: + return null; + } + } + + /** + * @return an From that provides classes for performing import. + */ + @Override + public From getFrom() { + return FROM; + } + + /** + * @return an To that provides classes for performing export. + */ + @Override + public To getTo() { + return TO; + } + + /** + * Returns validation object that Sqoop framework can use to validate user + * supplied forms before accepting them. This object will be used both for + * connection and job forms. + * + * @return Validator object + */ + @Override + public Validator getValidator() { + return hdfsValidator; + } + + /** + * Returns an {@linkplain org.apache.sqoop.connector.spi.MetadataUpgrader} object that can upgrade the + * connection and job metadata. 
+ * + * @return MetadataUpgrader object + */ + @Override + public MetadataUpgrader getMetadataUpgrader() { + return null; + } +} diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java new file mode 100644 index 00000000..8a095d26 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hdfs; + +import org.apache.sqoop.common.ErrorCode; + +public enum HdfsConnectorError implements ErrorCode{ + /** Error occurs during partitioner run */ + GENERIC_HDFS_CONNECTOR_0000("Error occurs during partitioner run"), + /** Error occurs during extractor run */ + GENERIC_HDFS_CONNECTOR_0001("Error occurs during extractor run"), + /** Unsupported output format type found **/ + GENERIC_HDFS_CONNECTOR_0002("Unknown output format type"), + /** The system was unable to load the specified class. */ + GENERIC_HDFS_CONNECTOR_0003("Unable to load the specified class"), + /** The system was unable to instantiate the specified class. */ + GENERIC_HDFS_CONNECTOR_0004("Unable to instantiate the specified class"), + /** Error occurs during loader run */ + GENERIC_HDFS_CONNECTOR_0005("Error occurs during loader run") + + ; + + private final String message; + + private HdfsConnectorError(String message) { + this.message = message; + } + + public String getCode() { + return name(); + } + + public String getMessage() { + return message; + } + +} diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java new file mode 100644 index 00000000..a27aff19 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.connector.hdfs; + +import org.apache.sqoop.job.Constants; + +public final class HdfsConstants extends Constants { + + // Resource bundle name + public static final String RESOURCE_BUNDLE_NAME = + "hdfs-connector-resources"; + + public static final char DEFAULT_RECORD_DELIMITER = '\n'; + +} diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java new file mode 100644 index 00000000..74b1cb8b --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hdfs; + +import org.apache.sqoop.job.etl.Destroyer; +import org.apache.sqoop.job.etl.DestroyerContext; + +public class HdfsDestroyer extends Destroyer { + /** + * Callback to clean up after job execution. + * + * @param context Destroyer context + * @param o Connection configuration object + * @param o2 Job configuration object + */ + @Override + public void destroy(DestroyerContext context, Object o, Object o2) { + //TODO: Add a "success" flag? + + } +} diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java new file mode 100644 index 00000000..fc12381a --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.hdfs; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.util.LineReader; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; +import org.apache.sqoop.etl.io.DataWriter; +import org.apache.sqoop.job.etl.Extractor; +import org.apache.sqoop.job.etl.ExtractorContext; +import org.apache.log4j.Logger; +import org.apache.hadoop.conf.Configuration; +import org.apache.sqoop.common.PrefixContext; + +import java.io.IOException; + +/** + * Extract from HDFS. + * Default field delimiter of a record is comma. + */ + + +public class HdfsExtractor extends Extractor { + + public static final Logger LOG = Logger.getLogger(HdfsExtractor.class); + + private Configuration conf; + private DataWriter dataWriter; + private long rowRead = 0; + + @Override + public void extract(ExtractorContext context, + ConnectionConfiguration connectionConfiguration, + FromJobConfiguration jobConfiguration, HdfsPartition partition) { + + conf = ((PrefixContext) context.getContext()).getConfiguration(); + dataWriter = context.getDataWriter(); + + try { + HdfsPartition p = partition; + LOG.info("Working on partition: " + p); + int numFiles = p.getNumberOfFiles(); + for (int i = 0; i < numFiles; i++) { + extractFile(p.getFile(i), p.getOffset(i), p.getLength(i)); + } + } catch (IOException e) { + throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0001, e); + } + } + + private void extractFile(Path file, long start, long length) + throws IOException { + long end = start + length; + LOG.info("Extracting file " + file); + LOG.info("\t from offset " + start); + LOG.info("\t to offset " + end); + LOG.info("\t of length " + length); + if(isSequenceFile(file)) { + extractSequenceFile(file, start, length); + } else { + extractTextFile(file, start, length); + } + } + + /** + * Extracts Sequence file + * @param file + * @param start + * @param length + * @throws IOException + */ + private void extractSequenceFile(Path file, long start, long length) + throws IOException { + LOG.info("Extracting sequence file"); + long end = start + length; + SequenceFile.Reader filereader = new SequenceFile.Reader( + file.getFileSystem(conf), file, conf); + + if (start > filereader.getPosition()) { + filereader.sync(start); // sync to start + } + + Text line = new Text(); + boolean hasNext = filereader.next(line); + while (hasNext) { + rowRead++; + dataWriter.writeStringRecord(line.toString()); + line = new Text(); + hasNext = filereader.next(line); + if (filereader.getPosition() >= end && filereader.syncSeen()) { + break; + } + } + filereader.close(); + } + + /** + * Extracts Text file + * @param file + * @param start + * @param length + * @throws IOException + */ + private void extractTextFile(Path file, long start, long length) + throws IOException { + LOG.info("Extracting text file"); + long end = start + length; + FileSystem fs = file.getFileSystem(conf); + FSDataInputStream filestream = fs.open(file); + CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file); + LineReader filereader; + Seekable 
fileseeker = filestream; + + // Hadoop 1.0 does not have support for custom record delimiter and thus + // we + // are supporting only default one. + // We might add another "else if" case for SplittableCompressionCodec once + // we drop support for Hadoop 1.0. + if (codec == null) { + filestream.seek(start); + filereader = new LineReader(filestream); + } else { + filereader = new LineReader(codec.createInputStream(filestream, + codec.createDecompressor()), conf); + fileseeker = filestream; + } + if (start != 0) { + // always throw away first record because + // one extra line is read in previous split + start += filereader.readLine(new Text(), 0); + } + int size; + LOG.info("Start position: " + String.valueOf(start)); + long next = start; + while (next <= end) { + Text line = new Text(); + size = filereader.readLine(line, Integer.MAX_VALUE); + if (size == 0) { + break; + } + if (codec == null) { + next += size; + } else { + next = fileseeker.getPos(); + } + rowRead++; + dataWriter.writeStringRecord(line.toString()); + } + LOG.info("Extracting ended on position: " + fileseeker.getPos()); + filestream.close(); + } + + @Override + public long getRowsRead() { + return rowRead; + } + + /** + * Returns true if given file is sequence + * @param file + * @return boolean + */ + private boolean isSequenceFile(Path file) { + SequenceFile.Reader filereader = null; + try { + filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf); + filereader.close(); + } catch (IOException e) { + return false; + } + return true; + } + + +} diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java new file mode 100644 index 00000000..d2d12a82 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hdfs; + +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.InitializerContext; +import org.apache.sqoop.schema.Schema; + + +public class HdfsInitializer extends Initializer { + /** + * Initialize new submission based on given configuration properties. Any + * needed temporary values might be saved to context object and they will be + * promoted to all other part of the workflow automatically. 
+ * + * @param context Initializer context object + * @param connection Connector's connection configuration object + * @param job Connector's job configuration object + */ + @Override + public void initialize(InitializerContext context, Object connection, Object job) { + + } + + + @Override + public Schema getSchema(InitializerContext context, Object connection, Object job) { + return null; + } +} diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java new file mode 100644 index 00000000..5a924f91 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hdfs; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.hdfs.configuration.OutputFormat; +import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter; +import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsSequenceWriter; +import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter; +import org.apache.sqoop.etl.io.DataReader; +import org.apache.sqoop.job.etl.Loader; +import org.apache.sqoop.job.etl.LoaderContext; +import org.apache.sqoop.utils.ClassUtils; + +import java.io.IOException; +import java.util.UUID; + +public class HdfsLoader extends Loader { + /** + * Load data to target. 
+ * + * @param context Loader context object + * @param connection Connection configuration + * @param job Job configuration + * @throws Exception + */ + @Override + public void load(LoaderContext context, ConnectionConfiguration connection, ToJobConfiguration job) throws Exception { + + DataReader reader = context.getDataReader(); + + Configuration conf = new Configuration(); + + String directoryName = job.output.outputDirectory; + String codecname = getCompressionCodecName(job); + + CompressionCodec codec = null; + if (codecname != null) { + Class clz = ClassUtils.loadClass(codecname); + if (clz == null) { + throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0003, codecname); + } + + try { + codec = (CompressionCodec) clz.newInstance(); + if (codec instanceof Configurable) { + ((Configurable) codec).setConf(conf); + } + } catch (Exception e) { + throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0004, codecname, e); + } + } + + String filename = directoryName + "/" + UUID.randomUUID() + getExtension(job,codec); + + try { + Path filepath = new Path(filename); + + GenericHdfsWriter filewriter = getWriter(job); + + filewriter.initialize(filepath,conf,codec); + + String csv; + + while ((csv = reader.readTextRecord()) != null) { + filewriter.write(csv); + } + filewriter.destroy(); + + } catch (IOException e) { + throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0005, e); + } + + } + + private GenericHdfsWriter getWriter(ToJobConfiguration job) { + if (job.output.outputFormat == OutputFormat.SEQUENCE_FILE) + return new HdfsSequenceWriter(); + else + return new HdfsTextWriter(); + } + + + private String getCompressionCodecName(ToJobConfiguration jobConf) { + if(jobConf.output.compression == null) + return null; + switch(jobConf.output.compression) { + case NONE: + return null; + case DEFAULT: + return "org.apache.hadoop.io.compress.DefaultCodec"; + case DEFLATE: + return "org.apache.hadoop.io.compress.DeflateCodec"; + case GZIP: + return "org.apache.hadoop.io.compress.GzipCodec"; + case BZIP2: + return "org.apache.hadoop.io.compress.BZip2Codec"; + case LZO: + return "com.hadoop.compression.lzo.LzoCodec"; + case LZ4: + return "org.apache.hadoop.io.compress.Lz4Codec"; + case SNAPPY: + return "org.apache.hadoop.io.compress.SnappyCodec"; + case CUSTOM: + return jobConf.output.customCompression.trim(); + } + return null; + } + + //TODO: We should probably support configurable extensions at some point + private static String getExtension(ToJobConfiguration job, CompressionCodec codec) { + if (job.output.outputFormat == OutputFormat.SEQUENCE_FILE) + return ".seq"; + if (codec == null) + return ".txt"; + return codec.getDefaultExtension(); + } + +} diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java similarity index 92% rename from execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java rename to connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java index cdbdaa8d..b801356e 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java @@ -16,19 +16,20 @@ * limitations under the License. 
*/ -package org.apache.sqoop.job.etl; +package org.apache.sqoop.connector.hdfs; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.fs.Path; +import org.apache.sqoop.job.etl.Partition; /** * This class derives mostly from CombineFileSplit of Hadoop, i.e. * org.apache.hadoop.mapreduce.lib.input.CombineFileSplit. */ -public class HdfsExportPartition extends Partition { +public class HdfsPartition extends Partition { private long lenFiles; private int numFiles; @@ -37,10 +38,10 @@ public class HdfsExportPartition extends Partition { private long[] lengths; private String[] locations; - public HdfsExportPartition() {} + public HdfsPartition() {} - public HdfsExportPartition(Path[] files, long[] offsets, - long[] lengths, String[] locations) { + public HdfsPartition(Path[] files, long[] offsets, + long[] lengths, String[] locations) { for(long length : lengths) { this.lenFiles += length; } diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java similarity index 95% rename from execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java rename to connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java index b3590dc3..df764d20 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.sqoop.job.etl; +package org.apache.sqoop.connector.hdfs; import java.io.IOException; import java.util.ArrayList; @@ -39,15 +39,18 @@ import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.NetworkTopology; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.job.JobConstants; -import org.apache.sqoop.job.MapreduceExecutionError; -import org.apache.sqoop.job.PrefixContext; +import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; +import org.apache.sqoop.job.etl.Partition; +import org.apache.sqoop.job.etl.Partitioner; +import org.apache.sqoop.job.etl.PartitionerContext; +import org.apache.sqoop.common.PrefixContext; /** * This class derives mostly from CombineFileInputFormat of Hadoop, i.e. * org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat. 
*/ -public class HdfsExportPartitioner extends Partitioner { +public class HdfsPartitioner extends Partitioner { public static final String SPLIT_MINSIZE_PERNODE = "mapreduce.input.fileinputformat.split.minsize.per.node"; @@ -65,12 +68,12 @@ public class HdfsExportPartitioner extends Partitioner { @Override public List getPartitions(PartitionerContext context, - Object connectionConfiguration, Object jobConfiguration) { + ConnectionConfiguration connectionConfiguration, FromJobConfiguration jobConfiguration) { Configuration conf = ((PrefixContext)context.getContext()).getConfiguration(); try { - long numInputBytes = getInputSize(conf); + long numInputBytes = getInputSize(conf, jobConfiguration.input.inputDirectory); maxSplitSize = numInputBytes / context.getMaxPartitions(); if(numInputBytes % context.getMaxPartitions() != 0 ) { @@ -115,7 +118,7 @@ public List getPartitions(PartitionerContext context, } // all the files in input set - String indir = conf.get(JobConstants.HADOOP_INPUTDIR); + String indir = jobConfiguration.input.inputDirectory; FileSystem fs = FileSystem.get(conf); List paths = new LinkedList(); @@ -140,12 +143,12 @@ public List getPartitions(PartitionerContext context, return partitions; } catch (IOException e) { - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0021, e); + throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0000, e); } } - private long getInputSize(Configuration conf) throws IOException { - String indir = conf.get(JobConstants.HADOOP_INPUTDIR); + //TODO: Perhaps get the FS from connection configuration so we can support remote HDFS + private long getInputSize(Configuration conf, String indir) throws IOException { FileSystem fs = FileSystem.get(conf); FileStatus[] files = fs.listStatus(new Path(indir)); long count = 0; @@ -372,7 +375,7 @@ private void addCreatedSplit(List partitions, } // add this split to the list that is returned - HdfsExportPartition partition = new HdfsExportPartition( + HdfsPartition partition = new HdfsPartition( files, offsets, lengths, locations.toArray(new String[0])); partitions.add(partition); } diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java new file mode 100644 index 00000000..4efbd334 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.hdfs; + +import org.apache.sqoop.connector.hdfs.configuration.*; +import org.apache.sqoop.validation.Status; +import org.apache.sqoop.validation.Validation; +import org.apache.sqoop.validation.Validator; + +/** + * Validate framework configuration objects + */ +public class HdfsValidator extends Validator { + + @Override + public Validation validateConnection(Object connectionConfiguration) { + Validation validation = new Validation(ConnectionConfiguration.class); + // No validation on connection object + return validation; + } + + + @Override + public Validation validateJob(Object jobConfiguration) { + //TODO: I'm pretty sure this needs to call either validateExportJob or validateImportJob, depending on context + return super.validateJob(jobConfiguration); + } + + private Validation validateExportJob(Object jobConfiguration) { + Validation validation = new Validation(FromJobConfiguration.class); + FromJobConfiguration configuration = (FromJobConfiguration)jobConfiguration; + + validateInputForm(validation, configuration.input); + + + return validation; + } + + private Validation validateImportJob(Object jobConfiguration) { + Validation validation = new Validation(ToJobConfiguration.class); + ToJobConfiguration configuration = (ToJobConfiguration)jobConfiguration; + + validateOutputForm(validation, configuration.output); + + return validation; + } + + private void validateInputForm(Validation validation, InputForm input) { + if(input.inputDirectory == null || input.inputDirectory.isEmpty()) { + validation.addMessage(Status.UNACCEPTABLE, "input", "inputDirectory", "Input directory is empty"); + } + } + + private void validateOutputForm(Validation validation, OutputForm output) { + if(output.outputDirectory == null || output.outputDirectory.isEmpty()) { + validation.addMessage(Status.UNACCEPTABLE, "output", "outputDirectory", "Output directory is empty"); + } + if(output.customCompression != null && + output.customCompression.trim().length() > 0 && + output.compression != OutputCompression.CUSTOM) { + validation.addMessage(Status.UNACCEPTABLE, "output", "compression", + "custom compression should be blank as " + output.compression + " is being used."); + } + if(output.compression == OutputCompression.CUSTOM && + (output.customCompression == null || + output.customCompression.trim().length() == 0) + ) { + validation.addMessage(Status.UNACCEPTABLE, "output", "compression", + "custom compression is blank."); + } + } + + +} diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionConfiguration.java similarity index 76% rename from core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java rename to connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionConfiguration.java index 2a35eb98..6dd79d55 100644 --- a/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionConfiguration.java @@ -15,23 +15,17 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.sqoop.framework.configuration; +package org.apache.sqoop.connector.hdfs.configuration; import org.apache.sqoop.model.ConfigurationClass; import org.apache.sqoop.model.Form; -/** - * - */ @ConfigurationClass -public class ImportJobConfiguration { +public class ConnectionConfiguration { + @Form + public ConnectionForm connection; - @Form public OutputForm output; - - @Form public ThrottlingForm throttling; - - public ImportJobConfiguration() { - output = new OutputForm(); - throttling = new ThrottlingForm(); + public ConnectionConfiguration() { + connection = new ConnectionForm(); } } diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionForm.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionForm.java new file mode 100644 index 00000000..7dad2a29 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionForm.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hdfs.configuration; + +import org.apache.sqoop.model.FormClass; +import org.apache.sqoop.model.Input; + +@FormClass +public class ConnectionForm { + //Todo: Didn't find anything that belongs here... + // Since empty forms don't work (DERBYREPO_0008:The form contains no input metadata), I'm putting a dummy form here + + @Input(size = 255) public String dummy; +} diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java similarity index 82% rename from core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java rename to connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java index 66654291..bccb99d9 100644 --- a/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java @@ -15,23 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.sqoop.framework.configuration; +package org.apache.sqoop.connector.hdfs.configuration; import org.apache.sqoop.model.ConfigurationClass; import org.apache.sqoop.model.Form; -/** - * - */ @ConfigurationClass -public class ExportJobConfiguration { - +public class FromJobConfiguration { @Form public InputForm input; - @Form public ThrottlingForm throttling; - public ExportJobConfiguration() { + public FromJobConfiguration() { input = new InputForm(); - throttling = new ThrottlingForm(); + } } diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/InputForm.java similarity index 94% rename from core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java rename to connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/InputForm.java index d5cbeec4..413f04c1 100644 --- a/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/InputForm.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.sqoop.framework.configuration; +package org.apache.sqoop.connector.hdfs.configuration; import org.apache.sqoop.model.FormClass; import org.apache.sqoop.model.Input; diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputCompression.java similarity index 94% rename from core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java rename to connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputCompression.java index 6cac46de..55db1bc4 100644 --- a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputCompression.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.sqoop.framework.configuration; +package org.apache.sqoop.connector.hdfs.configuration; /** * Supported compressions diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputForm.java similarity index 92% rename from core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java rename to connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputForm.java index b2cdb44f..d57b4c29 100644 --- a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputForm.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.sqoop.framework.configuration; +package org.apache.sqoop.connector.hdfs.configuration; import org.apache.sqoop.model.FormClass; import org.apache.sqoop.model.Input; @@ -26,8 +26,6 @@ @FormClass public class OutputForm { - @Input public StorageType storageType; - @Input public OutputFormat outputFormat; @Input public OutputCompression compression; diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputFormat.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputFormat.java similarity index 94% rename from core/src/main/java/org/apache/sqoop/framework/configuration/OutputFormat.java rename to connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputFormat.java index 4cd3589b..676c33cf 100644 --- a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputFormat.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputFormat.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.sqoop.framework.configuration; +package org.apache.sqoop.connector.hdfs.configuration; /** * Various supported formats on disk diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/StorageType.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/StorageType.java similarity index 94% rename from core/src/main/java/org/apache/sqoop/framework/configuration/StorageType.java rename to connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/StorageType.java index dbe9f952..d4aaa0aa 100644 --- a/core/src/main/java/org/apache/sqoop/framework/configuration/StorageType.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/StorageType.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.sqoop.framework.configuration; +package org.apache.sqoop.connector.hdfs.configuration; /** * Various storage types that Sqoop is supporting diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java new file mode 100644 index 00000000..65ee8a7f --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.hdfs.configuration; + +import org.apache.sqoop.model.ConfigurationClass; +import org.apache.sqoop.model.Form; + +@ConfigurationClass +public class ToJobConfiguration { + @Form + public OutputForm output; + + public ToJobConfiguration() { + output = new OutputForm(); + } +} diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java new file mode 100644 index 00000000..2ccccc4a --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hdfs.hdfsWriter; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; + +import java.io.IOException; + +public abstract class GenericHdfsWriter { + + public abstract void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException; + + public abstract void write(String csv) throws IOException; + + public abstract void destroy() throws IOException; + +} diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java new file mode 100644 index 00000000..eb801212 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.hdfs.hdfsWriter; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; + + +import java.io.IOException; + +public class HdfsSequenceWriter extends GenericHdfsWriter { + + private SequenceFile.Writer filewriter; + private Text text; + + public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException { + if (codec != null) { + filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf), + conf, filepath, Text.class, NullWritable.class, + SequenceFile.CompressionType.BLOCK, codec); + } else { + filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf), + conf, filepath, Text.class, NullWritable.class, SequenceFile.CompressionType.NONE); + } + + text = new Text(); + } + + @Override + public void write(String csv) throws IOException { + text.set(csv); + filewriter.append(text, NullWritable.get()); + } + + public void destroy() throws IOException { + filewriter.close(); + } +} diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java new file mode 100644 index 00000000..78cf9732 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.hdfs.hdfsWriter; + +import com.google.common.base.Charsets; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.sqoop.connector.hdfs.HdfsConstants; + +import java.io.BufferedWriter; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; + +public class HdfsTextWriter extends GenericHdfsWriter { + + private BufferedWriter filewriter; + + @Override + public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException { + FileSystem fs = filepath.getFileSystem(conf); + + DataOutputStream filestream = fs.create(filepath, false); + if (codec != null) { + filewriter = new BufferedWriter(new OutputStreamWriter( + codec.createOutputStream(filestream, codec.createCompressor()), + Charsets.UTF_8)); + } else { + filewriter = new BufferedWriter(new OutputStreamWriter( + filestream, Charsets.UTF_8)); + } + } + + @Override + public void write(String csv) throws IOException { + filewriter.write(csv + HdfsConstants.DEFAULT_RECORD_DELIMITER); + + } + + @Override + public void destroy() throws IOException { + filewriter.close(); + } +} diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-resources.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-resources.properties new file mode 100644 index 00000000..31259117 --- /dev/null +++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-resources.properties @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Generic HDFS Connector Resources + +############################ +# Connection Form +# +connection.label = Connection configuration +connection.help = You must supply the information requested in order to \ + create a connection object. + +connection.dummy.label = Dummy parameter needed to get HDFS connector to register +connection.dummy.help = You can write anything here. Doesn't matter. + +# Output From +# +output.label = Output configuration +output.help = You must supply the information requested in order to \ + get information where you want to store your data. 
+ +output.storageType.label = Storage type +output.storageType.help = Target on Hadoop ecosystem where to store data + +output.outputFormat.label = Output format +output.outputFormat.help = Format in which data should be serialized + +output.compression.label = Compression format +output.compression.help = Compression that should be used for the data + +output.customCompression.label = Custom compression format +output.customCompression.help = Full class name of the custom compression + +output.outputDirectory.label = Output directory +output.outputDirectory.help = Output directory for final data + +output.ignored.label = Ignored +output.ignored.help = This value is ignored + +# Input Form +# +input.label = Input configuration +input.help = Specifies information required to get data from Hadoop ecosystem + +input.inputDirectory.label = Input directory +input.inputDirectory.help = Directory that should be exported diff --git a/connector/connector-hdfs/src/main/resources/sqoopconnector.properties b/connector/connector-hdfs/src/main/resources/sqoopconnector.properties new file mode 100644 index 00000000..fa4e5e13 --- /dev/null +++ b/connector/connector-hdfs/src/main/resources/sqoopconnector.properties @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Generic HDFS Connector Properties +org.apache.sqoop.connector.class = org.apache.sqoop.connector.hdfs.HdfsConnector +org.apache.sqoop.connector.name = hdfs-connector \ No newline at end of file diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java index 39d48c79..1e8ab52e 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java @@ -180,6 +180,10 @@ public Object[] getObjectData() { return null; } + if (schema == null) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006); + } + if (fields.length != schema.getColumns().size()) { throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005, "The data " + getTextData() + " has the wrong number of fields."); @@ -189,7 +193,8 @@ public Object[] getObjectData() { Column[] cols = schema.getColumns().toArray(new Column[fields.length]); for (int i = 0; i < fields.length; i++) { Type colType = cols[i].getType(); - if (fields[i].equals("NULL")) { + //TODO: Replace with proper isNull method. 
Actually the entire content of the loop should be a parse method + if (fields[i].equals("NULL") || fields[i].equals("null") || fields[i].equals("'null'") || fields[i].isEmpty()) { out[i] = null; continue; } diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java index 92190744..4d41679d 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java @@ -37,7 +37,9 @@ public enum IntermediateDataFormatError implements ErrorCode { INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."), /** Number of fields. */ - INTERMEDIATE_DATA_FORMAT_0005("Wrong number of fields.") + INTERMEDIATE_DATA_FORMAT_0005("Wrong number of fields."), + + INTERMEDIATE_DATA_FORMAT_0006("Schema missing.") ; diff --git a/connector/pom.xml b/connector/pom.xml index d642c3ee..e98a0fcf 100644 --- a/connector/pom.xml +++ b/connector/pom.xml @@ -35,10 +35,11 @@ limitations under the License. connector-sdk connector-generic-jdbc - + connector-hdfs + diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java index f19a23e3..46257f27 100644 --- a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java +++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java @@ -18,20 +18,13 @@ package org.apache.sqoop.framework; import org.apache.sqoop.framework.configuration.ConnectionConfiguration; -import org.apache.sqoop.framework.configuration.InputForm; import org.apache.sqoop.framework.configuration.JobConfiguration; -import org.apache.sqoop.framework.configuration.OutputCompression; -import org.apache.sqoop.framework.configuration.OutputForm; import org.apache.sqoop.framework.configuration.ThrottlingForm; import org.apache.sqoop.validation.Status; import org.apache.sqoop.validation.Validation; import org.apache.sqoop.validation.Validator; -/** - * Validate framework configuration objects - */ public class FrameworkValidator extends Validator { - @Override public Validation validateConnection(Object connectionConfiguration) { Validation validation = new Validation(ConnectionConfiguration.class); @@ -39,61 +32,16 @@ public Validation validateConnection(Object connectionConfiguration) { return validation; } - @Override public Validation validateJob(Object jobConfiguration) { - JobConfiguration configuration = (JobConfiguration)jobConfiguration; Validation validation = new Validation(JobConfiguration.class); - validateThrottingForm(validation, configuration.throttling); - return super.validateJob(jobConfiguration); - } + JobConfiguration conf = (JobConfiguration)jobConfiguration; + validateThrottlingForm(validation,conf.throttling); -// private Validation validateExportJob(Object jobConfiguration) { -// Validation validation = new Validation(ExportJobConfiguration.class); -// ExportJobConfiguration configuration = (ExportJobConfiguration)jobConfiguration; -// -// validateInputForm(validation, configuration.input); -// validateThrottingForm(validation, configuration.throttling); -// -// return validation; -// } -// -// private Validation validateImportJob(Object jobConfiguration) { -// Validation validation = new Validation(ImportJobConfiguration.class); -// ImportJobConfiguration configuration = 
(ImportJobConfiguration)jobConfiguration; -// -// validateOutputForm(validation, configuration.output); -// validateThrottingForm(validation, configuration.throttling); -// -// return validation; -// } + return validation; + }; -// private void validateInputForm(Validation validation, InputForm input) { -// if(input.inputDirectory == null || input.inputDirectory.isEmpty()) { -// validation.addMessage(Status.UNACCEPTABLE, "input", "inputDirectory", "Input directory is empty"); -// } -// } -// -// private void validateOutputForm(Validation validation, OutputForm output) { -// if(output.outputDirectory == null || output.outputDirectory.isEmpty()) { -// validation.addMessage(Status.UNACCEPTABLE, "output", "outputDirectory", "Output directory is empty"); -// } -// if(output.customCompression != null && -// output.customCompression.trim().length() > 0 && -// output.compression != OutputCompression.CUSTOM) { -// validation.addMessage(Status.UNACCEPTABLE, "output", "compression", -// "custom compression should be blank as " + output.compression + " is being used."); -// } -// if(output.compression == OutputCompression.CUSTOM && -// (output.customCompression == null || -// output.customCompression.trim().length() == 0) -// ) { -// validation.addMessage(Status.UNACCEPTABLE, "output", "compression", -// "custom compression is blank."); -// } -// } - - private void validateThrottingForm(Validation validation, ThrottlingForm throttling) { + private void validateThrottlingForm(Validation validation, ThrottlingForm throttling) { if(throttling.extractors != null && throttling.extractors < 1) { validation.addMessage(Status.UNACCEPTABLE, "throttling", "extractors", "You need to specify more than one extractor"); } @@ -102,4 +50,5 @@ private void validateThrottingForm(Validation validation, ThrottlingForm throttl validation.addMessage(Status.UNACCEPTABLE, "throttling", "loaders", "You need to specify more than one loader"); } } + } diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java index 55719286..b1b37f61 100644 --- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java @@ -36,6 +36,7 @@ import org.apache.sqoop.model.MSubmission; import org.apache.sqoop.repository.Repository; import org.apache.sqoop.repository.RepositoryManager; +import org.apache.sqoop.schema.Schema; import org.apache.sqoop.submission.SubmissionStatus; import org.apache.sqoop.submission.counter.Counters; import org.apache.sqoop.utils.ClassUtils; @@ -434,12 +435,17 @@ public MSubmission submit(long jobId, HttpEventContext ctx) { request.getConnectorJobConfig(Direction.FROM))); // @TODO(Abe): Alter behavior of Schema here. Need from Schema. - // Retrieve and persist the schema - request.getSummary().setConnectorSchema(initializer.getSchema( - initializerContext, - request.getConnectorConnectionConfig(Direction.FROM), - request.getConnectorJobConfig(Direction.FROM) - )); + + + Schema fromSchema = initializer.getSchema(initializerContext, + request.getConnectorConnectionConfig(Direction.FROM), + request.getConnectorJobConfig(Direction.FROM)); + + // request.getSummary().setConnectorSchema(initializer.getSchema( + // initializerContext, + // request.getConnectorConnectionConfig(ConnectorType.FROM), + // request.getConnectorJobConfig(ConnectorType.FROM) + // )); // Initialize To Connector callback. 
baseCallback = request.getToCallback(); @@ -468,6 +474,11 @@ public MSubmission submit(long jobId, HttpEventContext ctx) { request.getConnectorJobConfig(Direction.TO))); // @TODO(Abe): Alter behavior of Schema here. Need To Schema. + + Schema toSchema = initializer.getSchema(initializerContext, + request.getConnectorConnectionConfig(Direction.TO), + request.getConnectorJobConfig(Direction.TO)); + // Retrieve and persist the schema // request.getSummary().setConnectorSchema(initializer.getSchema( // initializerContext, @@ -475,6 +486,12 @@ public MSubmission submit(long jobId, HttpEventContext ctx) { // request.getConnectorJobConfig(ConnectorType.TO) // )); + //TODO: Need better logic here + if (fromSchema != null) + request.getSummary().setConnectorSchema(fromSchema); + else + request.getSummary().setConnectorSchema(toSchema); + // Bootstrap job from framework perspective prepareSubmission(request); diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java index be6099ec..bf3f7852 100644 --- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java +++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java @@ -176,9 +176,11 @@ public void setConnector(Direction type, SqoopConnector connector) { switch(type) { case FROM: fromConnector = connector; + break; case TO: toConnector = connector; + break; default: throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); @@ -238,10 +240,10 @@ public void setConnectorConnectionConfig(Direction type, Object config) { switch(type) { case FROM: fromConnectorConnectionConfig = config; - + break; case TO: toConnectorConnectionConfig = config; - + break; default: throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); } @@ -264,10 +266,10 @@ public void setConnectorJobConfig(Direction type, Object config) { switch(type) { case FROM: fromConnectorJobConfig = config; - + break; case TO: toConnectorJobConfig = config; - + break; default: throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); } @@ -290,10 +292,10 @@ public void setFrameworkConnectionConfig(Direction type, Object config) { switch(type) { case FROM: fromFrameworkConnectionConfig = config; - + break; case TO: toFrameworkConnectionConfig = config; - + break; default: throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); } diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java index 7c653bf6..0abc6115 100644 --- a/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java +++ b/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java @@ -22,8 +22,8 @@ @ConfigurationClass public class JobConfiguration { - - @Form public ThrottlingForm throttling; + @Form + public ThrottlingForm throttling; public JobConfiguration() { throttling = new ThrottlingForm(); diff --git a/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java b/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java index f875ceb6..90395acd 100644 --- a/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java +++ b/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java @@ -17,15 +17,7 @@ */ package org.apache.sqoop.framework; -import 
org.apache.sqoop.framework.configuration.ConnectionConfiguration; -import org.apache.sqoop.framework.configuration.ExportJobConfiguration; -import org.apache.sqoop.framework.configuration.ImportJobConfiguration; -import org.apache.sqoop.framework.configuration.OutputCompression; -import org.apache.sqoop.model.MJob; -import org.apache.sqoop.validation.Status; -import org.apache.sqoop.validation.Validation; -import org.junit.Before; -import org.junit.Test; +//import org.apache.sqoop.framework.configuration.OutputCompression; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; diff --git a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java index 31df04c1..f19e01c2 100644 --- a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java +++ b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java @@ -28,7 +28,7 @@ import org.apache.sqoop.connector.spi.MetadataUpgrader; import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.framework.FrameworkManager; -import org.apache.sqoop.framework.configuration.ImportJobConfiguration; +//import org.apache.sqoop.framework.configuration.ImportJobConfiguration; import org.apache.sqoop.model.ConfigurationClass; import org.apache.sqoop.model.FormUtils; import org.apache.sqoop.model.MConnection; diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java index ff328cb0..b05954bb 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java @@ -80,49 +80,9 @@ public void prepareSubmission(SubmissionRequest gRequest) { if(request.getExtractors() != null) { context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors()); } - - // @TODO(Abe): Move to HDFS connector. -// if(jobConf.output.outputFormat == OutputFormat.TEXT_FILE) { -// context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName()); -// } else if(jobConf.output.outputFormat == OutputFormat.SEQUENCE_FILE) { -// context.setString(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName()); -// } else { -// throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0024, -// "Format: " + jobConf.output.outputFormat); -// } -// if(getCompressionCodecName(jobConf) != null) { -// context.setString(JobConstants.HADOOP_COMPRESS_CODEC, -// getCompressionCodecName(jobConf)); -// context.setBoolean(JobConstants.HADOOP_COMPRESS, true); -// } } - // @TODO(Abe): Move to HDFS connector. 
-// private String getCompressionCodecName(ImportJobConfiguration jobConf) { -// if(jobConf.output.compression == null) -// return null; -// switch(jobConf.output.compression) { -// case NONE: -// return null; -// case DEFAULT: -// return "org.apache.hadoop.io.compress.DefaultCodec"; -// case DEFLATE: -// return "org.apache.hadoop.io.compress.DeflateCodec"; -// case GZIP: -// return "org.apache.hadoop.io.compress.GzipCodec"; -// case BZIP2: -// return "org.apache.hadoop.io.compress.BZip2Codec"; -// case LZO: -// return "com.hadoop.compression.lzo.LzoCodec"; -// case LZ4: -// return "org.apache.hadoop.io.compress.Lz4Codec"; -// case SNAPPY: -// return "org.apache.hadoop.io.compress.SnappyCodec"; -// case CUSTOM: -// return jobConf.output.customCompression.trim(); -// } -// return null; -// } + /** * Our execution engine have additional dependencies that needs to be available diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java deleted file mode 100644 index 27afd8c2..00000000 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.job.etl; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.Seekable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.hadoop.util.LineReader; -import org.apache.log4j.Logger; -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.etl.io.DataWriter; -import org.apache.sqoop.framework.configuration.ConnectionConfiguration; -import org.apache.sqoop.framework.configuration.ExportJobConfiguration; -import org.apache.sqoop.job.MapreduceExecutionError; -import org.apache.sqoop.job.PrefixContext; - -/** - * Extract from HDFS. - * Default field delimiter of a record is comma. 
- */ -//public class HdfsExportExtractor extends Extractor { -// -// public static final Logger LOG = Logger.getLogger(HdfsExportExtractor.class); -// -// private Configuration conf; -// private DataWriter dataWriter; -// private long rowRead = 0; -// -// @Override -// public void extract(ExtractorContext context, -// ConnectionConfiguration connectionConfiguration, -// ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) { -// -// conf = ((PrefixContext) context.getContext()).getConfiguration(); -// dataWriter = context.getDataWriter(); -// -// try { -// HdfsExportPartition p = partition; -// LOG.info("Working on partition: " + p); -// int numFiles = p.getNumberOfFiles(); -// for (int i = 0; i < numFiles; i++) { -// extractFile(p.getFile(i), p.getOffset(i), p.getLength(i)); -// } -// } catch (IOException e) { -// throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e); -// } -// } -// -// private void extractFile(Path file, long start, long length) -// throws IOException { -// long end = start + length; -// LOG.info("Extracting file " + file); -// LOG.info("\t from offset " + start); -// LOG.info("\t to offset " + end); -// LOG.info("\t of length " + length); -// if(isSequenceFile(file)) { -// extractSequenceFile(file, start, length); -// } else { -// extractTextFile(file, start, length); -// } -// } -// -// /** -// * Extracts Sequence file -// * @param file -// * @param start -// * @param length -// * @throws IOException -// */ -// private void extractSequenceFile(Path file, long start, long length) -// throws IOException { -// LOG.info("Extracting sequence file"); -// long end = start + length; -// SequenceFile.Reader filereader = new SequenceFile.Reader( -// file.getFileSystem(conf), file, conf); -// -// if (start > filereader.getPosition()) { -// filereader.sync(start); // sync to start -// } -// -// Text line = new Text(); -// boolean hasNext = filereader.next(line); -// while (hasNext) { -// rowRead++; -// dataWriter.writeStringRecord(line.toString()); -// line = new Text(); -// hasNext = filereader.next(line); -// if (filereader.getPosition() >= end && filereader.syncSeen()) { -// break; -// } -// } -// filereader.close(); -// } -// -// /** -// * Extracts Text file -// * @param file -// * @param start -// * @param length -// * @throws IOException -// */ -// private void extractTextFile(Path file, long start, long length) -// throws IOException { -// LOG.info("Extracting text file"); -// long end = start + length; -// FileSystem fs = file.getFileSystem(conf); -// FSDataInputStream filestream = fs.open(file); -// CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file); -// LineReader filereader; -// Seekable fileseeker = filestream; -// -// // Hadoop 1.0 does not have support for custom record delimiter and thus -// // we -// // are supporting only default one. -// // We might add another "else if" case for SplittableCompressionCodec once -// // we drop support for Hadoop 1.0. 
-// if (codec == null) { -// filestream.seek(start); -// filereader = new LineReader(filestream); -// } else { -// filereader = new LineReader(codec.createInputStream(filestream, -// codec.createDecompressor()), conf); -// fileseeker = filestream; -// } -// if (start != 0) { -// // always throw away first record because -// // one extra line is read in previous split -// start += filereader.readLine(new Text(), 0); -// } -// int size; -// LOG.info("Start position: " + String.valueOf(start)); -// long next = start; -// while (next <= end) { -// Text line = new Text(); -// size = filereader.readLine(line, Integer.MAX_VALUE); -// if (size == 0) { -// break; -// } -// if (codec == null) { -// next += size; -// } else { -// next = fileseeker.getPos(); -// } -// rowRead++; -// dataWriter.writeStringRecord(line.toString()); -// } -// LOG.info("Extracting ended on position: " + fileseeker.getPos()); -// filestream.close(); -// } -// -// @Override -// public long getRowsRead() { -// return rowRead; -// } -// -// /** -// * Returns true if given file is sequence -// * @param file -// * @return boolean -// */ -// private boolean isSequenceFile(Path file) { -// SequenceFile.Reader filereader = null; -// try { -// filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf); -// filereader.close(); -// } catch (IOException e) { -// return false; -// } -// return true; -// } -//} diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java deleted file mode 100644 index d4ffb130..00000000 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sqoop.job.etl; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.job.JobConstants; -import org.apache.sqoop.job.MapreduceExecutionError; -import org.apache.sqoop.etl.io.DataReader; -import org.apache.sqoop.utils.ClassUtils; - -public class HdfsSequenceImportLoader extends Loader { - - public static final String EXTENSION = ".seq"; - - @Override - public void load(LoaderContext context, Object oc, Object oj) throws Exception { - DataReader reader = context.getDataReader(); - - Configuration conf = new Configuration(); -// Configuration conf = ((EtlContext)context).getConfiguration(); - String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE); - String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC); - - CompressionCodec codec = null; - if (codecname != null) { - Class clz = ClassUtils.loadClass(codecname); - if (clz == null) { - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0009, codecname); - } - - try { - codec = (CompressionCodec) clz.newInstance(); - if (codec instanceof Configurable) { - ((Configurable) codec).setConf(conf); - } - } catch (Exception e) { - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0010, codecname, e); - } - } - - filename += EXTENSION; - - try { - Path filepath = new Path(filename); - SequenceFile.Writer filewriter; - if (codec != null) { - filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf), - conf, filepath, Text.class, NullWritable.class, - CompressionType.BLOCK, codec); - } else { - filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf), - conf, filepath, Text.class, NullWritable.class, CompressionType.NONE); - } - - String csv; - Text text = new Text(); - while ((csv = reader.readTextRecord()) != null) { - text.set(csv); - filewriter.append(text, NullWritable.get()); - } - filewriter.close(); - - } catch (IOException e) { - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, e); - } - - } - -} diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java deleted file mode 100644 index 7b799ca6..00000000 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.job.etl; - -import java.io.BufferedWriter; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; - -import com.google.common.base.Charsets; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.job.JobConstants; -import org.apache.sqoop.job.MapreduceExecutionError; -import org.apache.sqoop.job.io.Data; -import org.apache.sqoop.etl.io.DataReader; -import org.apache.sqoop.utils.ClassUtils; - -public class HdfsTextImportLoader extends Loader { - - private final char recordDelimiter; - - public HdfsTextImportLoader() { - recordDelimiter = Data.DEFAULT_RECORD_DELIMITER; - } - - @Override - public void load(LoaderContext context, Object oc, Object oj) throws Exception{ - DataReader reader = context.getDataReader(); - - Configuration conf = new Configuration(); -// Configuration conf = ((EtlContext)context).getConfiguration(); - String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE); - String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC); - - CompressionCodec codec = null; - if (codecname != null) { - Class clz = ClassUtils.loadClass(codecname); - if (clz == null) { - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0009, codecname); - } - - try { - codec = (CompressionCodec) clz.newInstance(); - if (codec instanceof Configurable) { - ((Configurable) codec).setConf(conf); - } - } catch (Exception e) { - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0010, codecname, e); - } - - filename += codec.getDefaultExtension(); - } - - try { - Path filepath = new Path(filename); - FileSystem fs = filepath.getFileSystem(conf); - - BufferedWriter filewriter; - DataOutputStream filestream = fs.create(filepath, false); - if (codec != null) { - filewriter = new BufferedWriter(new OutputStreamWriter( - codec.createOutputStream(filestream, codec.createCompressor()), - Charsets.UTF_8)); - } else { - filewriter = new BufferedWriter(new OutputStreamWriter( - filestream, Charsets.UTF_8)); - } - - String csv; - while ((csv = reader.readTextRecord()) != null) { - filewriter.write(csv + recordDelimiter); - } - filewriter.close(); - - } catch (IOException e) { - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, e); - } - - } - -} diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java index 8e31ef5f..59431f43 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java @@ -21,7 +21,7 @@ import org.apache.log4j.Logger; import org.apache.sqoop.common.Direction; import org.apache.sqoop.job.JobConstants; -import org.apache.sqoop.job.PrefixContext; +import org.apache.sqoop.common.PrefixContext; import org.apache.sqoop.job.etl.Destroyer; import org.apache.sqoop.job.etl.DestroyerContext; import org.apache.sqoop.schema.Schema; diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java 
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java index e96909a9..1c1133a8 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java @@ -33,7 +33,7 @@ import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.MapreduceExecutionError; -import org.apache.sqoop.job.PrefixContext; +import org.apache.sqoop.common.PrefixContext; import org.apache.sqoop.job.etl.Partition; import org.apache.sqoop.job.etl.Partitioner; import org.apache.sqoop.job.etl.PartitionerContext; diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index 6e2cfbf9..1d60ba37 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -31,7 +31,7 @@ import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.MapreduceExecutionError; -import org.apache.sqoop.job.PrefixContext; +import org.apache.sqoop.common.PrefixContext; import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.etl.ExtractorContext; import org.apache.sqoop.etl.io.DataWriter; @@ -66,7 +66,16 @@ public void run(Context context) throws IOException, InterruptedException { // Propagate connector schema in every case for now // TODO: Change to coditional choosing between Connector schemas. + Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf); + if (schema==null) { + schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf); + } + + if (schema==null) { + LOG.info("setting an empty schema"); + } + String intermediateDataFormatName = conf.get(JobConstants .INTERMEDIATE_DATA_FORMAT); diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index 01c32e4b..e457cfff 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -36,7 +36,7 @@ import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.MapreduceExecutionError; -import org.apache.sqoop.job.PrefixContext; +import org.apache.sqoop.common.PrefixContext; import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.LoaderContext; import org.apache.sqoop.etl.io.DataReader; @@ -72,7 +72,13 @@ public SqoopOutputFormatLoadExecutor(JobContext jobctx) { producer = new SqoopRecordWriter(); data = (IntermediateDataFormat) ClassUtils.instantiate(context .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT)); - data.setSchema(ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration())); + + Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()); + if (schema==null) { + schema = ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()); + } + + data.setSchema(schema); } public RecordWriter getRecordWriter() { diff --git 
a/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java index 0f2a8826..f70e9bd3 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java @@ -17,18 +17,9 @@ */ package org.apache.sqoop.execution.mapreduce; -import org.apache.sqoop.common.MutableMapContext; //import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; -import org.apache.sqoop.framework.SubmissionRequest; -import org.apache.sqoop.framework.configuration.ImportJobConfiguration; -import org.apache.sqoop.framework.configuration.OutputCompression; -import org.apache.sqoop.framework.configuration.OutputFormat; -import org.apache.sqoop.job.JobConstants; -import org.apache.sqoop.job.etl.Destroyer; -import org.apache.sqoop.job.etl.Extractor; -import org.apache.sqoop.job.etl.Initializer; -import org.apache.sqoop.job.etl.Partitioner; -import org.junit.Test; +//import org.apache.sqoop.framework.configuration.OutputCompression; +//import org.apache.sqoop.framework.configuration.OutputFormat; import static junit.framework.TestCase.assertEquals; diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java index e460c3e6..2accf771 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java @@ -17,40 +17,11 @@ */ package org.apache.sqoop.job; -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.util.List; - import junit.framework.TestCase; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.BZip2Codec; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.mapreduce.Job; //import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; //import org.apache.sqoop.job.etl.HdfsExportExtractor; -import org.apache.sqoop.job.etl.HdfsExportPartitioner; -import org.apache.sqoop.job.etl.HdfsSequenceImportLoader; -import org.apache.sqoop.job.etl.Loader; -import org.apache.sqoop.job.etl.LoaderContext; -import org.apache.sqoop.job.etl.Partition; -import org.apache.sqoop.job.etl.PartitionerContext; -import org.apache.sqoop.job.io.Data; -import org.apache.sqoop.job.mr.ConfigurationUtils; -import org.apache.sqoop.job.mr.SqoopFileOutputFormat; -import org.apache.sqoop.model.MJob; -import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.schema.type.FixedPoint; -import org.apache.sqoop.schema.type.FloatingPoint; -import org.junit.Test; +//import org.apache.sqoop.job.etl.HdfsExportPartitioner; public class TestHdfsExtract extends TestCase { diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java index 65e82b16..8eba0490 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java +++ 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java @@ -17,40 +17,11 @@ */ package org.apache.sqoop.job; -import java.io.BufferedReader; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.LinkedList; -import java.util.List; - -import com.google.common.base.Charsets; import junit.framework.TestCase; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.util.ReflectionUtils; //import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; -import org.apache.sqoop.job.etl.Extractor; -import org.apache.sqoop.job.etl.ExtractorContext; -import org.apache.sqoop.job.etl.HdfsSequenceImportLoader; -import org.apache.sqoop.job.etl.HdfsTextImportLoader; -import org.apache.sqoop.job.etl.Partition; -import org.apache.sqoop.job.etl.Partitioner; -import org.apache.sqoop.job.etl.PartitionerContext; -import org.apache.sqoop.job.io.Data; -import org.apache.sqoop.job.mr.ConfigurationUtils; -import org.apache.sqoop.job.mr.SqoopFileOutputFormat; -import org.apache.sqoop.model.MJob; -import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.schema.type.FixedPoint; -import org.apache.sqoop.schema.type.FloatingPoint; +//import org.apache.sqoop.job.etl.HdfsSequenceImportLoader; +//import org.apache.sqoop.job.etl.HdfsTextImportLoader; public class TestHdfsLoad extends TestCase { diff --git a/pom.xml b/pom.xml index 38b49741..fae9fe8f 100644 --- a/pom.xml +++ b/pom.xml @@ -299,6 +299,17 @@ limitations under the License. 
test-jar ${project.version} + + org.apache.sqoop.connector + sqoop-connector-hdfs + ${project.version} + + + org.apache.sqoop.connector + sqoop-connector-hdfs + test-jar + ${project.version} + org.apache.sqoop.connector sqoop-connector-mysql-jdbc diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java index f355ceb6..88be9fbd 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java @@ -27,12 +27,7 @@ import java.sql.Statement; import java.sql.Timestamp; import java.sql.Types; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; +import java.util.*; import javax.sql.DataSource; @@ -1711,14 +1706,16 @@ private List loadJobs(PreparedStatement stmt, throws SQLException { List jobs = new ArrayList(); ResultSet rsJob = null; - PreparedStatement formConnectorFetchStmt = null; + PreparedStatement toFormConnectorFetchStmt = null; + PreparedStatement fromFormConnectorFetchStmt = null; PreparedStatement formFrameworkFetchStmt = null; PreparedStatement inputFetchStmt = null; try { rsJob = stmt.executeQuery(); - formConnectorFetchStmt = conn.prepareStatement(STMT_FETCH_FORM_CONNECTOR); + toFormConnectorFetchStmt = conn.prepareStatement(STMT_FETCH_FORM_CONNECTOR); + fromFormConnectorFetchStmt = conn.prepareStatement(STMT_FETCH_FORM_CONNECTOR); formFrameworkFetchStmt = conn.prepareStatement(STMT_FETCH_FORM_FRAMEWORK); inputFetchStmt = conn.prepareStatement(STMT_FETCH_JOB_INPUT); @@ -1735,28 +1732,47 @@ private List loadJobs(PreparedStatement stmt, String updateBy = rsJob.getString(10); Date lastUpdateDate = rsJob.getTimestamp(11); - formConnectorFetchStmt.setLong(1, fromConnectorId); + fromFormConnectorFetchStmt.setLong(1, fromConnectorId); + toFormConnectorFetchStmt.setLong(1,toConnectorId); inputFetchStmt.setLong(1, id); //inputFetchStmt.setLong(1, XXX); // Will be filled by loadForms inputFetchStmt.setLong(3, id); - List connectorConnForms = new ArrayList(); + List toConnectorConnForms = new ArrayList(); + List fromConnectorConnForms = new ArrayList(); + List frameworkConnForms = new ArrayList(); List frameworkJobForms = new ArrayList(); - List fromJobForms = new ArrayList(); - List toJobForms = new ArrayList(); - loadConnectorForms(connectorConnForms, fromJobForms, toJobForms, - formConnectorFetchStmt, inputFetchStmt, 2); + // This looks confusing but our job has 2 connectors, each connector has two job forms + // To define the job, we need to TO job form of the TO connector + // and the FROM job form of the FROM connector + List fromConnectorFromJobForms = new ArrayList(); + List fromConnectorToJobForms = new ArrayList(); + List toConnectorFromJobForms = new ArrayList(); + List toConnectorToJobForms = new ArrayList(); + + + loadConnectorForms(fromConnectorConnForms, + fromConnectorFromJobForms, + fromConnectorToJobForms, + fromFormConnectorFetchStmt, + inputFetchStmt, + 2); + loadConnectorForms(toConnectorConnForms, + toConnectorFromJobForms, + toConnectorToJobForms, + toFormConnectorFetchStmt, inputFetchStmt, 2); + loadForms(frameworkConnForms, frameworkJobForms, formFrameworkFetchStmt, inputFetchStmt, 2); MJob job = new MJob( fromConnectorId, toConnectorId, 
fromConnectionId, toConnectionId, - new MJobForms(fromJobForms), - new MJobForms(toJobForms), + new MJobForms(fromConnectorFromJobForms), + new MJobForms(toConnectorToJobForms), new MJobForms(frameworkJobForms)); job.setPersistenceId(id); @@ -1771,7 +1787,7 @@ private List loadJobs(PreparedStatement stmt, } } finally { closeResultSets(rsJob); - closeStatements(formConnectorFetchStmt, formFrameworkFetchStmt, inputFetchStmt); + closeStatements(fromFormConnectorFetchStmt, toFormConnectorFetchStmt, formFrameworkFetchStmt, inputFetchStmt); } return jobs; diff --git a/server/pom.xml b/server/pom.xml index dc894099..67baaa57 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -64,6 +64,11 @@ limitations under the License. sqoop-connector-generic-jdbc + + org.apache.sqoop.connector + sqoop-connector-hdfs + +
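
The HdfsTextWriter introduced in connector-hdfs follows a small initialize/write/destroy lifecycle defined by its GenericHdfsWriter base class. A minimal usage sketch of that lifecycle follows; the driver class, target path and records are hypothetical and only illustrate the call order, they are not part of this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;
import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter;

import java.io.IOException;

public class HdfsTextWriterUsageSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Hypothetical target file; a real loader would derive this from the job's output directory form.
    Path target = new Path("/tmp/sqoop-hdfs-sketch/part-0001.txt");

    GenericHdfsWriter writer = new HdfsTextWriter();
    // A null codec produces plain UTF-8 text; passing a CompressionCodec would wrap the stream instead.
    writer.initialize(target, conf, null);
    try {
      writer.write("1,'hello',3.14");   // one CSV record per call; the writer appends DEFAULT_RECORD_DELIMITER
      writer.write("2,'world',2.72");
    } finally {
      writer.destroy();                 // closes the BufferedWriter and the underlying HDFS stream
    }
  }
}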
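
sqoopconnector.properties is the descriptor that makes the new connector discoverable: org.apache.sqoop.connector.class names the SqoopConnector implementation and org.apache.sqoop.connector.name gives it its registry name. Below is a rough sketch of how such a descriptor can be read and instantiated with plain JDK calls; the actual resolution is done by Sqoop's connector-loading machinery, so this class is purely illustrative.

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class ConnectorDescriptorSketch {
  public static void main(String[] args) throws IOException, ReflectiveOperationException {
    Properties descriptor = new Properties();
    try (InputStream in = ConnectorDescriptorSketch.class.getClassLoader()
        .getResourceAsStream("sqoopconnector.properties")) {
      if (in == null) {
        throw new IOException("No sqoopconnector.properties found on the classpath");
      }
      descriptor.load(in);
    }

    String className = descriptor.getProperty("org.apache.sqoop.connector.class");
    String name = descriptor.getProperty("org.apache.sqoop.connector.name");

    // For the descriptor added above this resolves to
    // org.apache.sqoop.connector.hdfs.HdfsConnector under the name "hdfs-connector".
    Object connector = Class.forName(className).getDeclaredConstructor().newInstance();
    System.out.println(name + " -> " + connector.getClass().getName());
  }
}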
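
The CSVIntermediateDataFormat change makes getObjectData() fail fast with INTERMEDIATE_DATA_FORMAT_0006 when no schema is attached, and decodes "NULL", "null", "'null'" and empty fields to a Java null. A rough illustration of the resulting behaviour follows; the driver class is hypothetical, the two-column schema is assumed to be built with the name-taking column constructors used elsewhere in Sqoop 2, and the text/schema setters are assumed to mirror the getTextData()/setSchema() calls visible in this patch.

import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;

public class CsvNullHandlingSketch {
  public static void main(String[] args) {
    // Assumed schema construction: one integer column and one text column.
    Schema schema = new Schema("sketch");
    schema.addColumn(new FixedPoint("id"));
    schema.addColumn(new Text("name"));

    CSVIntermediateDataFormat data = new CSVIntermediateDataFormat();
    data.setSchema(schema);        // omitting this now raises INTERMEDIATE_DATA_FORMAT_0006 ("Schema missing.")
    data.setTextData("1,null");    // any recognised null spelling becomes a Java null on the object side

    Object[] fields = data.getObjectData();
    System.out.println(fields[0] + ", " + fields[1]);   // prints "1, null"
  }
}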
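
Three hunks above repeat the same interim rule for schema propagation: JobManager, SqoopMapper and SqoopOutputFormatLoadExecutor all prefer the FROM connector's schema and fall back to the TO schema when the FROM side reports none, each with a TODO asking for better logic. A small helper capturing that rule might look like the sketch below; the class and method names are hypothetical and nothing in the patch defines such a utility.

import org.apache.sqoop.schema.Schema;

final class SchemaSelectionSketch {
  private SchemaSelectionSketch() {
  }

  /**
   * Interim rule used by this patch: prefer the schema reported by the FROM
   * connector and fall back to the TO connector's schema when the FROM side
   * does not provide one. Returns null only when neither side has a schema.
   */
  static Schema pick(Schema fromSchema, Schema toSchema) {
    return fromSchema != null ? fromSchema : toSchema;
  }
}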