diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnector.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnector.java
index 982d6dde..5f58a90a 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnector.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnector.java
@@ -20,6 +20,7 @@
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.VersionInfo;
+import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
 import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
@@ -27,8 +28,6 @@
 import org.apache.sqoop.job.etl.From;
 import org.apache.sqoop.job.etl.To;
 
-import java.util.Arrays;
-import java.util.List;
 import java.util.Locale;
 import java.util.ResourceBundle;
@@ -44,6 +43,12 @@ public class KiteConnector extends SqoopConnector {
       KiteLoader.class,
       KiteToDestroyer.class);
 
+  private static final From FROM = new From(
+      KiteFromInitializer.class,
+      KiteDatasetPartitioner.class,
+      KiteExtractor.class,
+      KiteFromDestroyer.class);
+
   @Override
   public String getVersion() {
     return VersionInfo.getBuildVersion();
@@ -64,8 +69,7 @@ public Class getLinkConfigurationClass() {
   public Class getJobConfigurationClass(Direction jobType) {
     switch (jobType) {
       case FROM:
-        // TODO: SQOOP-1647
-        return null;
+        return FromJobConfiguration.class;
       case TO:
         return ToJobConfiguration.class;
       default:
@@ -73,16 +77,9 @@ public Class getJobConfigurationClass(Direction jobType) {
     }
   }
 
-  @Override
-  public List<Direction> getSupportedDirections() {
-    // TODO: No need to override, when SQOOP-1647 is done
-    return Arrays.asList(Direction.TO);
-  }
-
   @Override
   public From getFrom() {
-    // TODO: SQOOP-1647
-    return null;
+    return FROM;
   }
 
   @Override
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorError.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorError.java
index d67c8de3..5775fcf6 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorError.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorError.java
@@ -24,9 +24,15 @@ public enum KiteConnectorError implements ErrorCode {
   /** Unsupported dataset URI scheme */
   GENERIC_KITE_CONNECTOR_0000("Unsupported dataset URI scheme"),
 
-  /** Destination is not empty */
+  /** Target dataset is not empty */
   GENERIC_KITE_CONNECTOR_0001("Dataset is not empty"),
 
+  /** Dataset does not exist */
+  GENERIC_KITE_CONNECTOR_0002("Dataset does not exist"),
+
+  /** Error occurred while creating partitions */
+  GENERIC_KITE_CONNECTOR_0003("Error occurred while creating partitions"),
+
   ;
 
   private final String message;
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetExecutor.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetExecutor.java
index 9432e4bf..e4514b57 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetExecutor.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetExecutor.java
@@ -23,9 +23,11 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.common.FileFormat;
+import org.apache.sqoop.connector.common.AvroDataTypeUtil;
 import org.apache.sqoop.connector.kite.util.KiteDataTypeUtil;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.DatasetReader;
 import org.kitesdk.data.DatasetWriter;
 import org.kitesdk.data.Datasets;
 import org.kitesdk.data.Format;
@@ -47,10 +49,12 @@ public class KiteDatasetExecutor {
 
   private DatasetWriter<GenericRecord> writer;
 
+  private DatasetReader<GenericRecord> reader;
+
   /**
    * Creates a new dataset.
    */
-  public KiteDatasetExecutor(String uri, org.apache.sqoop.schema.Schema schema,
+  public static Dataset<GenericRecord> createDataset(String uri, org.apache.sqoop.schema.Schema schema,
       FileFormat format) {
     Schema datasetSchema = KiteDataTypeUtil.createAvroSchema(schema);
     Format datasetFormat = KiteDataTypeUtil.toFormat(format);
@@ -59,11 +63,10 @@
         .schema(datasetSchema)
         .format(datasetFormat)
         .build();
-    dataset = Datasets.create(uri, descriptor);
+    return Datasets.create(uri, descriptor);
   }
 
-  @VisibleForTesting
-  protected KiteDatasetExecutor(Dataset<GenericRecord> dataset) {
+  public KiteDatasetExecutor(Dataset<GenericRecord> dataset) {
     this.dataset = dataset;
   }
 
@@ -87,7 +90,7 @@ private DatasetWriter<GenericRecord> getOrNewWriter() {
   }
 
   @VisibleForTesting
-  protected boolean isWriterClosed() {
+  boolean isWriterClosed() {
     return writer == null || !writer.isOpen();
   }
@@ -101,25 +104,38 @@ public void closeWriter() {
     }
   }
 
-  /**
-   * Checks the existence by a specified dataset URI.
-   */
-  public static boolean datasetExists(String uri) {
-    return Datasets.exists(uri);
+  public Object[] readRecord() {
+    if (getOrNewReader().hasNext()) {
+      GenericRecord record = getOrNewReader().next();
+      return AvroDataTypeUtil.extractGenericRecord(record);
+    }
+    return null;
+  }
+
+  private DatasetReader<GenericRecord> getOrNewReader() {
+    if (reader == null) {
+      reader = dataset.newReader();
+    }
+    return reader;
+  }
+
+  @VisibleForTesting
+  boolean isReaderClosed() {
+    return reader == null || !reader.isOpen();
+  }
+
+  public void closeReader() {
+    if (reader != null) {
+      Closeables.closeQuietly(reader);
+      reader = null;
+    }
   }
 
   /**
    * Deletes current dataset physically.
    */
   public void deleteDataset() {
-    deleteDataset(dataset.getUri().toString());
-  }
-
-  /**
-   * Deletes particular dataset physically.
-   */
-  public static boolean deleteDataset(String uri) {
-    return Datasets.delete(uri);
+    Datasets.delete(dataset.getUri().toString());
   }
 
   /**
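
The new read path mirrors the existing write path: records come back as Object[] via AvroDataTypeUtil.extractGenericRecord, and the lazily opened reader must be closed by the caller. A minimal usage sketch; the dataset URI is hypothetical and Datasets.load is the same Kite call the extractor below uses:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.sqoop.connector.kite.KiteDatasetExecutor;
    import org.kitesdk.data.Dataset;
    import org.kitesdk.data.Datasets;

    class ReadSketch {
      public static void main(String[] args) {
        // Hypothetical URI; any existing Kite dataset URI works here.
        Dataset<GenericRecord> dataset = Datasets.load("dataset:hdfs://namenode:8020/data/users");
        KiteDatasetExecutor executor = new KiteDatasetExecutor(dataset);
        try {
          Object[] record;
          // readRecord() returns null once the underlying DatasetReader is exhausted.
          while ((record = executor.readRecord()) != null) {
            System.out.println(java.util.Arrays.toString(record));
          }
        } finally {
          executor.closeReader(); // releases the lazily created reader
        }
      }
    }
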
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetPartition.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetPartition.java
new file mode 100644
index 00000000..9399a915
--- /dev/null
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetPartition.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.kite;
+
+import org.apache.sqoop.job.etl.Partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A part of the input data partitioned by the Partitioner.
+ */
+public class KiteDatasetPartition extends Partition {
+
+  /** The URI of the dataset */
+  private String uri;
+
+  public KiteDatasetPartition() {
+  }
+
+  public String getUri() {
+    return uri;
+  }
+
+  public void setUri(String uri) {
+    this.uri = uri;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    uri = in.readUTF();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(uri);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("{uri=%s}", uri);
+  }
+
+}
\ No newline at end of file
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetPartitioner.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetPartitioner.java
new file mode 100644
index 00000000..41fe7e16
--- /dev/null
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetPartitioner.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.kite;
+
+import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.etl.PartitionerContext;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This allows the connector to define how input data from the FROM source is
+ * partitioned. The number of data partitions also determines the degree of
+ * parallelism.
+ */
+public class KiteDatasetPartitioner extends Partitioner<LinkConfiguration,
+    FromJobConfiguration> {
+
+  @Override
+  public List<Partition> getPartitions(PartitionerContext context,
+      LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfig) {
+    // There is no way to create partitions of an un-partitioned dataset.
+    // TODO: SQOOP-1942 will create multiple partitions if the dataset is partitioned.
+    KiteDatasetPartition partition = new KiteDatasetPartition();
+    partition.setUri(fromJobConfig.fromJobConfig.uri);
+
+    List<Partition> partitions = new LinkedList<Partition>();
+    partitions.add(partition);
+    return partitions;
+  }
+
+}
\ No newline at end of file
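
The partitioner hands each KiteDatasetPartition to the execution engine, which serializes it with write() and rebuilds it with readFields(). A round-trip sketch using plain java.io streams; the stream plumbing and the dataset URI are illustrative, not Sqoop framework code:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.sqoop.connector.kite.KiteDatasetPartition;

    class PartitionRoundTrip {
      public static void main(String[] args) throws IOException {
        KiteDatasetPartition partition = new KiteDatasetPartition();
        partition.setUri("dataset:hive://metastore/users"); // hypothetical URI

        // Serialize the partition the way the execution engine would.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        partition.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance and check the URI survived.
        KiteDatasetPartition copy = new KiteDatasetPartition();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy); // prints {uri=dataset:hive://metastore/users}
      }
    }
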
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java
new file mode 100644
index 00000000..d4a8a771
--- /dev/null
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kite;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
+import org.apache.sqoop.etl.io.DataWriter;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.ExtractorContext;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.Datasets;
+
+/**
+ * This allows the connector to extract data from a source system, based on
+ * each partition.
+ */
+public class KiteExtractor extends Extractor<LinkConfiguration,
+    FromJobConfiguration, KiteDatasetPartition> {
+
+  private static final Logger LOG = Logger.getLogger(KiteExtractor.class);
+
+  private long rowsRead = 0L;
+
+  @VisibleForTesting
+  KiteDatasetExecutor getExecutor(String uri) {
+    Dataset<GenericRecord> dataset = Datasets.load(uri);
+    return new KiteDatasetExecutor(dataset);
+  }
+
+  @Override
+  public void extract(ExtractorContext context, LinkConfiguration linkConfig,
+      FromJobConfiguration fromJobConfig, KiteDatasetPartition partition) {
+    String uri = partition.getUri();
+    LOG.info("Extracting data from " + uri);
+
+    KiteDatasetExecutor executor = getExecutor(uri);
+    DataWriter writer = context.getDataWriter();
+    Object[] array;
+    rowsRead = 0L;
+
+    try {
+      while ((array = executor.readRecord()) != null) {
+        // TODO: SQOOP-1616 will cover more column data types. Use schema and do data type conversion (e.g. datetime).
+        writer.writeArrayRecord(array);
+        rowsRead++;
+      }
+    } finally {
+      executor.closeReader();
+    }
+  }
+
+  @Override
+  public long getRowsRead() {
+    return rowsRead;
+  }
+
+}
\ No newline at end of file
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromDestroyer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromDestroyer.java
new file mode 100644
index 00000000..8d0a4952
--- /dev/null
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromDestroyer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kite;
+
+import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+
+/**
+ * This class allows the connector to define work to complete the execution.
+ */
+public class KiteFromDestroyer extends Destroyer<LinkConfiguration,
+    FromJobConfiguration> {
+
+  @Override
+  public void destroy(DestroyerContext context,
+      LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
+  }
+
+}
\ No newline at end of file
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java
new file mode 100644
index 00000000..2f82eaa7
--- /dev/null
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kite;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.common.AvroDataTypeUtil;
+import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.utils.ClassUtils;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.Datasets;
+
+import java.util.Set;
+
+/**
+ * This class allows the connector to define initialization work for the
+ * execution.
+ */
+public class KiteFromInitializer extends Initializer<LinkConfiguration,
+    FromJobConfiguration> {
+
+  private static final Logger LOG = Logger.getLogger(KiteFromInitializer.class);
+
+  @Override
+  public void initialize(InitializerContext context,
+      LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
+    if (!Datasets.exists(fromJobConfig.fromJobConfig.uri)) {
+      LOG.error("Dataset does not exist");
+      throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0002);
+    }
+  }
+
+  @Override
+  public Set<String> getJars(InitializerContext context,
+      LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
+    Set<String> jars = super.getJars(context, linkConfig, fromJobConfig);
+    jars.add(ClassUtils.jarForClass("org.kitesdk.data.Datasets"));
+    jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.databind.JsonNode"));
+    jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.core.TreeNode"));
+    return jars;
+  }
+
+  @Override
+  public Schema getSchema(InitializerContext context,
+      LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
+    Dataset dataset = Datasets.load(fromJobConfig.fromJobConfig.uri);
+    org.apache.avro.Schema avroSchema = dataset.getDescriptor().getSchema();
+    return AvroDataTypeUtil.createSqoopSchema(avroSchema);
+  }
+
+}
\ No newline at end of file
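
Taken together, the FROM initializer fails fast on a missing dataset and derives the job schema from the dataset's Avro schema. A sketch of that flow; the URI is hypothetical and null stands in for the InitializerContext that the Sqoop framework normally supplies:

    import org.apache.sqoop.connector.kite.KiteFromInitializer;
    import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
    import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
    import org.apache.sqoop.schema.Schema;

    class FromInitSketch {
      static Schema describe(String uri) {
        FromJobConfiguration jobConfig = new FromJobConfiguration();
        jobConfig.fromJobConfig.uri = uri; // e.g. "dataset:hdfs://namenode:8020/data/users"
        KiteFromInitializer initializer = new KiteFromInitializer();
        // Throws SqoopException (GENERIC_KITE_CONNECTOR_0002) if the dataset is missing.
        initializer.initialize(null, new LinkConfiguration(), jobConfig);
        // The job schema is derived from the dataset's Avro schema.
        return initializer.getSchema(null, new LinkConfiguration(), jobConfig);
      }
    }
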
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
index b1152429..1710969c 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
@@ -18,6 +18,7 @@ package org.apache.sqoop.connector.kite;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.connector.common.FileFormat;
 import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
@@ -27,6 +28,7 @@
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.schema.Schema;
+import org.kitesdk.data.Dataset;
 
 /**
  * This class allows Kite connector to load data into a target system.
@@ -36,8 +38,9 @@ public class KiteLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
 
   private static final Logger LOG = Logger.getLogger(KiteLoader.class);
   private long rowsWritten = 0;
+
   @VisibleForTesting
-  protected KiteDatasetExecutor getExecutor(String uri, Schema schema,
+  KiteDatasetExecutor getExecutor(String uri, Schema schema,
       FileFormat format) {
     // Note that instead of creating a dataset at destination, we create a
     // temporary dataset by every KiteLoader instance. They will be merged when
@@ -45,8 +48,9 @@ protected KiteDatasetExecutor getExecutor(String uri, Schema schema,
     // not able to pass the temporary dataset uri to KiteToDestroyer. So we
     // delegate KiteDatasetExecutor to manage name convention for datasets.
     uri = KiteDatasetExecutor.suggestTemporaryDatasetUri(uri);
-
-    return new KiteDatasetExecutor(uri, schema, format);
+    Dataset<GenericRecord> dataset =
+        KiteDatasetExecutor.createDataset(uri, schema, format);
+    return new KiteDatasetExecutor(dataset);
   }
 
   @Override
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
index 3b36f1d2..704c8e9a 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
@@ -18,6 +18,7 @@ package org.apache.sqoop.connector.kite;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.connector.common.FileFormat;
 import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
@@ -26,6 +27,8 @@
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
 import org.apache.sqoop.schema.Schema;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.Datasets;
 
 /**
  * This classes allows connector to define work to complete execution.
@@ -54,7 +57,7 @@ public void destroy(DestroyerContext context,
       }
     } else {
       for (String tempUri : tempUris) {
-        KiteDatasetExecutor.deleteDataset(tempUri);
+        Datasets.delete(tempUri);
         LOG.warn(String.format("Failed to import. "
             + "Temporary dataset %s has been deleted", tempUri));
       }
@@ -62,9 +65,11 @@
   }
 
   @VisibleForTesting
-  protected KiteDatasetExecutor getExecutor(String uri, Schema schema,
+  KiteDatasetExecutor getExecutor(String uri, Schema schema,
       FileFormat format) {
-    return new KiteDatasetExecutor(uri, schema, format);
+    Dataset<GenericRecord> dataset =
+        KiteDatasetExecutor.createDataset(uri, schema, format);
+    return new KiteDatasetExecutor(dataset);
   }
 }
\ No newline at end of file
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
index ad5898f8..ef94d48b 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
@@ -28,6 +28,7 @@
 import org.apache.sqoop.schema.NullSchema;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.utils.ClassUtils;
+import org.kitesdk.data.Datasets;
 
 import java.util.Set;
 
@@ -46,7 +47,7 @@ public void initialize(InitializerContext context,
       LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
     String uri = ConfigUtil.buildDatasetUri(
         linkConfig.linkConfig, toJobConfig.toJobConfig);
-    if (KiteDatasetExecutor.datasetExists(uri)) {
+    if (Datasets.exists(uri)) {
       LOG.error("Overwrite an existing dataset is not expected in new create mode.");
       throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0001);
     }
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/FromJobConfig.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/FromJobConfig.java
new file mode 100644
index 00000000..68a1d7ac
--- /dev/null
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/FromJobConfig.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kite.configuration;
+
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.Validator;
+import org.apache.sqoop.validation.validators.DatasetURIValidator;
+
+@ConfigClass
+public class FromJobConfig {
+
+  @Input(size = 255, validators = {@Validator(DatasetURIValidator.class)})
+  public String uri;
+
+}
\ No newline at end of file
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/FromJobConfiguration.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/FromJobConfiguration.java
new file mode 100644
index 00000000..e4e297f7
--- /dev/null
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/FromJobConfiguration.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kite.configuration;
+
+import org.apache.sqoop.model.Config;
+import org.apache.sqoop.model.ConfigurationClass;
+
+@ConfigurationClass
+public class FromJobConfiguration {
+
+  @Config public FromJobConfig fromJobConfig;
+
+  public FromJobConfiguration() {
+    fromJobConfig = new FromJobConfig();
+  }
+
+}
\ No newline at end of file
diff --git a/connector/connector-kite/src/main/resources/kite-connector-config.properties b/connector/connector-kite/src/main/resources/kite-connector-config.properties
index 65541c55..23d0e285 100644
--- a/connector/connector-kite/src/main/resources/kite-connector-config.properties
+++ b/connector/connector-kite/src/main/resources/kite-connector-config.properties
@@ -37,4 +37,15 @@ toJobConfig.uri.help = Location to store dataset (i.e. \
   "dataset:hive://<metastore>/<dataset-name>")
 
 toJobConfig.fileFormat.label = File format
-toJobConfig.fileFormat.help = Specify storage format to create a dataset and cannot be changed.
\ No newline at end of file
+toJobConfig.fileFormat.help = Specify the storage format used to create a dataset; it cannot be changed later.
+
+# From Job Config
+#
+fromJobConfig.label = From Kite Dataset Configuration
+fromJobConfig.help = You must supply the information requested in order to \
+  read data from the dataset you specify.
+
+fromJobConfig.uri.label = Dataset URI
+fromJobConfig.uri.help = Location of the dataset to read from (e.g. \
+  "dataset:hdfs://<namenode>[:port]/<path>/<namespace>/<dataset-name>", \
+  "dataset:hive://<metastore>/<dataset-name>")
\ No newline at end of file
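
The new FromJobConfiguration carries just the dataset URI, which the partitioner turns into a single partition until SQOOP-1942 lands. A short wiring sketch; the URI is hypothetical and null stands in for the PartitionerContext:

    import java.util.List;
    import org.apache.sqoop.connector.kite.KiteDatasetPartitioner;
    import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
    import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
    import org.apache.sqoop.job.etl.Partition;

    class PartitionerSketch {
      static List<Partition> partitionsFor(String uri) {
        FromJobConfiguration jobConfig = new FromJobConfiguration();
        jobConfig.fromJobConfig.uri = uri; // e.g. "dataset:hive://metastore/users"
        // Until SQOOP-1942, the whole dataset is read as one partition.
        return new KiteDatasetPartitioner()
            .getPartitions(null, new LinkConfiguration(), jobConfig);
      }
    }
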
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExecutor.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExecutor.java
index 5e4edc50..43736ccb 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExecutor.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExecutor.java
@@ -19,39 +19,47 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.DatasetReader;
 import org.kitesdk.data.DatasetWriter;
 
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertNull;
 import static junit.framework.TestCase.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
-import static org.mockito.MockitoAnnotations.Mock;
 import static org.mockito.MockitoAnnotations.initMocks;
 
 public class TestKiteExecutor {
 
-  @Mock
+  @org.mockito.Mock
   private Dataset<GenericRecord> datasetMock;
 
-  @Mock
+  @org.mockito.Mock
   private DatasetDescriptor descriptorMock;
 
-  @Mock
+  @org.mockito.Mock
   private DatasetWriter<GenericRecord> writerMock;
 
+  @org.mockito.Mock
+  private DatasetReader<GenericRecord> readerMock;
+
   private KiteDatasetExecutor executor;
 
   @Before
   public void setUp() {
     initMocks(this);
     when(datasetMock.newWriter()).thenReturn(writerMock);
+    when(datasetMock.newReader()).thenReturn(readerMock);
     when(datasetMock.getDescriptor()).thenReturn(descriptorMock);
     when(descriptorMock.getSchema()).thenReturn(
         new Schema.Parser().parse("{\"name\":\"test\",\"type\":\"record\"," +
@@ -63,25 +71,16 @@ public void tearDown() {
     executor.closeWriter();
+    executor.closeReader();
     assertTrue(executor.isWriterClosed());
+    assertTrue(executor.isReaderClosed());
   }
 
   @Test
   public void testWriteRecord() {
-    // setup
+    // setup & exercise
     final int NUMBER_OF_ROWS = 10;
-    when(descriptorMock.getSchema()).thenReturn(
-        new Schema.Parser().parse("{" +
-            "\"name\":\"test\",\"type\":\"record\"," +
-            "\"fields\":[" +
-            "{\"name\":\"f1\",\"type\":\"int\"}," +
-            "{\"name\":\"f2\",\"type\":\"string\"}" +
-            "]}"));
-
-    // exercise
-    for (int i = 0; i < NUMBER_OF_ROWS; i++) {
-      executor.writeRecord(new Object[]{42, "foo"});
-    }
+    createDatasetWithRecords(NUMBER_OF_ROWS);
 
     // verify
     verify(writerMock, times(NUMBER_OF_ROWS)).write(any(GenericRecord.class));
@@ -103,4 +102,62 @@ public void testCloseWriter() {
     assertTrue(executor.isWriterClosed());
   }
 
+  @Test
+  public void testReaderRecord() {
+    // setup
+    final int NUMBER_OF_ROWS = 10;
+    createDatasetWithRecords(NUMBER_OF_ROWS);
+    when(readerMock.next()).thenReturn(
+        new GenericRecordBuilder(createTwoFieldSchema())
+            .set("f1", 1)
+            .set("f2", "foo")
+            .build());
+    when(readerMock.hasNext()).thenReturn(true);
+
+    // exercise & verify
+    for (int i = 0; i < NUMBER_OF_ROWS; i++) {
+      Object[] actual = executor.readRecord();
+      assertNotNull(actual);
+      assertEquals(2, actual.length);
+      assertEquals(1, actual[0]);
+      assertEquals("foo", actual[1]);
+    }
+    when(readerMock.hasNext()).thenReturn(false);
+    Object[] actual = executor.readRecord();
+    assertNull(actual);
+  }
+
+  @Test
+  public void testCloseReader() {
+    // setup
+    when(readerMock.isOpen()).thenReturn(true);
+    executor.readRecord();
+    assertTrue(!executor.isReaderClosed());
+
+    // exercise
+    executor.closeReader();
+
+    // verify
+    verify(readerMock, times(1)).close();
+    assertTrue(executor.isReaderClosed());
+  }
+
+  private static Schema createTwoFieldSchema() {
+    return new Schema.Parser().parse("{" +
+        "\"name\":\"test\",\"type\":\"record\"," +
+        "\"fields\":[" +
+        "{\"name\":\"f1\",\"type\":\"int\"}," +
+        "{\"name\":\"f2\",\"type\":\"string\"}" +
+        "]}");
+  }
+
+  private void createDatasetWithRecords(int numberOfRecords) {
+    when(descriptorMock.getSchema()).thenReturn(createTwoFieldSchema());
+
+    for (int i = 0; i < numberOfRecords; i++) {
+      executor.writeRecord(new Object[]{i, "foo" + i});
+    }
+  }
+
 }
\ No newline at end of file
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java
new file mode 100644
index 00000000..0e2c8652
--- /dev/null
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.kite;
+
+import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
+import org.apache.sqoop.etl.io.DataWriter;
+import org.apache.sqoop.job.etl.ExtractorContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Text;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.OngoingStubbing;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class TestKiteExtractor {
+
+  private KiteExtractor extractor;
+
+  @org.mockito.Mock
+  private KiteDatasetExecutor executorMock;
+
+  // Note: initMocks(this) replaces this field with a Mockito mock, so no
+  // hand-written DataWriter implementation is needed here.
+  @org.mockito.Mock
+  private DataWriter writerMock;
+
+  @Before
+  public void setUp() {
+    initMocks(this);
+
+    extractor = new KiteExtractor() {
+      @Override
+      protected KiteDatasetExecutor getExecutor(String uri) {
+        return executorMock;
+      }
+    };
+  }
+
+  @Test
+  public void testExtractor() throws Exception {
+    // setup
+    Schema schema = new Schema("testExtractor");
+    schema.addColumn(new Text("TextCol"));
+    ExtractorContext context = new ExtractorContext(null, writerMock, schema);
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
+    KiteDatasetPartition partition = new KiteDatasetPartition();
+    partition.setUri("dataset:hdfs:/path/to/dataset");
+    OngoingStubbing<Object[]> readRecordMethodStub = when(executorMock.readRecord());
+    final int NUMBER_OF_ROWS = 1000;
+    for (int i = 0; i < NUMBER_OF_ROWS; i++) {
+      // TODO: SQOOP-1616 will cover more column data types
+      readRecordMethodStub = readRecordMethodStub.thenReturn(new Object[]{});
+    }
+    readRecordMethodStub.thenReturn(null);
+
+    // exercise
+    extractor.extract(context, linkConfig, jobConfig, partition);
+
+    // verify
+    verify(writerMock, times(NUMBER_OF_ROWS)).writeArrayRecord(
+        any(Object[].class));
+  }
+
+}
\ No newline at end of file
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java
new file mode 100644
index 00000000..557a3c25
--- /dev/null
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.kite;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.kitesdk.data.Datasets;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Datasets.class)
+public class TestKiteFromInitializer {
+
+  private KiteFromInitializer initializer;
+
+  @Before
+  public void setUp() {
+    initMocks(this);
+    mockStatic(Datasets.class);
+
+    initializer = new KiteFromInitializer();
+  }
+
+  @Test
+  public void testInitializePassed() {
+    // setup
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
+    jobConfig.fromJobConfig.uri = "dataset:file:/ds/exist";
+    when(Datasets.exists(jobConfig.fromJobConfig.uri)).thenReturn(true);
+
+    // exercise
+    initializer.initialize(null, null, jobConfig);
+  }
+
+  @Test(expected = SqoopException.class)
+  public void testInitializeFailed() {
+    // setup
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
+    jobConfig.fromJobConfig.uri = "dataset:file:/ds/not/exist";
+    when(Datasets.exists(jobConfig.fromJobConfig.uri)).thenReturn(false);
+
+    // exercise
+    initializer.initialize(null, null, jobConfig);
+  }
+
+}
\ No newline at end of file
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
index 87ed9068..92424c47 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
@@ -26,6 +26,7 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.kitesdk.data.Datasets;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -37,7 +38,7 @@
 import static org.powermock.api.mockito.PowerMockito.verifyStatic;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(KiteDatasetExecutor.class)
+@PrepareForTest({KiteDatasetExecutor.class, Datasets.class})
 public class TestKiteToDestroyer {
 
   private KiteToDestroyer destroyer;
@@ -55,6 +56,7 @@ public class TestKiteToDestroyer {
   public void setUp() {
     initMocks(this);
     mockStatic(KiteDatasetExecutor.class);
+    mockStatic(Datasets.class);
 
     destroyer = new KiteToDestroyer() {
       @Override
@@ -93,7 +95,7 @@ public void testDestroyForFailedJob() {
     when(KiteDatasetExecutor.listTemporaryDatasetUris(toJobConfig.toJobConfig.uri))
        .thenReturn(expectedUris);
     for (String uri : expectedUris) {
-      when(KiteDatasetExecutor.deleteDataset(uri)).thenReturn(true);
+      when(Datasets.delete(uri)).thenReturn(true);
    }
 
    // exercise
@@ -102,7 +104,7 @@ public void testDestroyForFailedJob() {
    // verify
    for (String uri : expectedUris) {
      verifyStatic(times(1));
-      KiteDatasetExecutor.deleteDataset(uri);
+      Datasets.delete(uri);
    }
  }
 
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
index fab31f95..50c30641 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
@@ -25,6 +25,7 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.kitesdk.data.Datasets;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -35,18 +36,15 @@
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(KiteDatasetExecutor.class)
+@PrepareForTest(Datasets.class)
 public class TestKiteToInitializer {
 
   private KiteToInitializer initializer;
 
-  @org.mockito.Mock
-  private KiteDatasetExecutor executorMock;
-
   @Before
   public void setUp() {
     initMocks(this);
-    mockStatic(KiteDatasetExecutor.class);
+    mockStatic(Datasets.class);
 
     initializer = new KiteToInitializer();
   }
@@ -57,7 +55,7 @@ public void testInitializePassed() {
     LinkConfiguration linkConfig = new LinkConfiguration();
     ToJobConfiguration toJobConfig = new ToJobConfiguration();
     toJobConfig.toJobConfig.uri = "dataset:file:/ds/not/exist";
-    when(KiteDatasetExecutor.datasetExists(toJobConfig.toJobConfig.uri))
+    when(Datasets.exists(toJobConfig.toJobConfig.uri))
         .thenReturn(false);
 
     // exercise
@@ -70,7 +68,7 @@ public void testInitializeFailed() {
     LinkConfiguration linkConfig = new LinkConfiguration();
     ToJobConfiguration toJobConfig = new ToJobConfiguration();
     toJobConfig.toJobConfig.uri = "dataset:file:/ds/exist";
-    when(KiteDatasetExecutor.datasetExists(toJobConfig.toJobConfig.uri))
+    when(Datasets.exists(toJobConfig.toJobConfig.uri))
         .thenReturn(true);
 
     // exercise
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/AvroDataTypeUtil.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/AvroDataTypeUtil.java
new file mode 100644
index 00000000..a71385b1
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/AvroDataTypeUtil.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.common;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.sqoop.schema.type.Binary;
+import org.apache.sqoop.schema.type.Bit;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.apache.sqoop.schema.type.Unknown;
+
+import java.util.List;
+
+/**
+ * This helper class provides methods to convert Avro data types to Sqoop
+ * data types.
+ */
+public class AvroDataTypeUtil {
+
+  public static org.apache.sqoop.schema.Schema createSqoopSchema(
+      Schema avroSchema) {
+    org.apache.sqoop.schema.Schema schema =
+        new org.apache.sqoop.schema.Schema(avroSchema.getName());
+    schema.setNote(avroSchema.getDoc());
+    for (Schema.Field field : avroSchema.getFields()) {
+      Column column = avroTypeToSchemaType(field);
+      schema.addColumn(column);
+    }
+    return schema;
+  }
+
+  private static Column avroTypeToSchemaType(Schema.Field field) {
+    Schema.Type schemaType = field.schema().getType();
+    if (schemaType == Schema.Type.UNION) {
+      List<Schema> unionSchema = field.schema().getTypes();
+      if (unionSchema.size() == 2) {
+        Schema.Type first = unionSchema.get(0).getType();
+        Schema.Type second = unionSchema.get(1).getType();
+        if ((first == Schema.Type.NULL && second != Schema.Type.NULL) ||
+            (first != Schema.Type.NULL && second == Schema.Type.NULL)) {
+          return avroPrimitiveTypeToSchemaType(field.name(),
+              first != Schema.Type.NULL ? first : second);
+        }
+      }
+      // This is an unsupported complex data type
+      return new Unknown(field.name());
+    }
+
+    return avroPrimitiveTypeToSchemaType(field.name(), schemaType);
+  }
+
+  private static Column avroPrimitiveTypeToSchemaType(String name,
+      Schema.Type type) {
+    assert type != Schema.Type.UNION;
+    switch (type) {
+      case INT:
+      case LONG:
+        return new FixedPoint(name);
+      case STRING:
+        return new Text(name);
+      case DOUBLE:
+        return new FloatingPoint(name);
+      case BOOLEAN:
+        return new Bit(name);
+      case BYTES:
+        return new Binary(name);
+      default:
+        return new Unknown(name);
+    }
+  }
+
+  public static Object[] extractGenericRecord(GenericRecord data) {
+    List<Schema.Field> fields = data.getSchema().getFields();
+    Object[] record = new Object[fields.size()];
+    for (int i = 0; i < fields.size(); i++) {
+      record[i] = data.get(i);
+    }
+    return record;
+  }
+
+}
\ No newline at end of file
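
The union handling above means a nullable Avro field maps to the Sqoop column type of its non-null branch, while complex types fall back to Unknown. A small illustration; the schema literal is made up for the example:

    import org.apache.avro.Schema;
    import org.apache.sqoop.connector.common.AvroDataTypeUtil;

    class SchemaMappingExample {
      public static void main(String[] args) {
        Schema avroSchema = new Schema.Parser().parse(
            "{\"name\":\"user\",\"type\":\"record\",\"fields\":[" +
            "{\"name\":\"id\",\"type\":\"long\"}," +
            "{\"name\":\"name\",\"type\":[\"null\",\"string\"]}," +
            "{\"name\":\"props\",\"type\":{\"type\":\"map\",\"values\":\"string\"}}]}");
        org.apache.sqoop.schema.Schema schema =
            AvroDataTypeUtil.createSqoopSchema(avroSchema);
        // id -> FixedPoint, name -> Text (via the ["null","string"] union),
        // props -> Unknown (complex types are not mapped yet)
        System.out.println(schema);
      }
    }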