
SQOOP-1647: Sqoop2: Read data from HDFS in KiteConnector

(Qian Xu via Abraham Elmahrek)
Abraham Elmahrek 2015-01-07 21:11:33 -08:00
parent 2b59860da1
commit f073cf6938
20 changed files with 783 additions and 66 deletions

File: KiteConnector.java

@@ -20,6 +20,7 @@
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.VersionInfo;
+import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
 import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
@@ -27,8 +28,6 @@
 import org.apache.sqoop.job.etl.From;
 import org.apache.sqoop.job.etl.To;

-import java.util.Arrays;
-import java.util.List;
 import java.util.Locale;
 import java.util.ResourceBundle;
@@ -44,6 +43,12 @@ public class KiteConnector extends SqoopConnector {
       KiteLoader.class,
       KiteToDestroyer.class);

+  private static final From FROM = new From(
+      KiteFromInitializer.class,
+      KiteDatasetPartitioner.class,
+      KiteExtractor.class,
+      KiteFromDestroyer.class);
+
   @Override
   public String getVersion() {
     return VersionInfo.getBuildVersion();
@@ -64,8 +69,7 @@ public Class getLinkConfigurationClass() {
   public Class getJobConfigurationClass(Direction jobType) {
     switch (jobType) {
       case FROM:
-        // TODO: SQOOP-1647
-        return null;
+        return FromJobConfiguration.class;
       case TO:
         return ToJobConfiguration.class;
       default:
@@ -73,16 +77,9 @@ public Class getJobConfigurationClass(Direction jobType) {
     }
   }

-  @Override
-  public List<Direction> getSupportedDirections() {
-    // TODO: No need to override, when SQOOP-1647 is done
-    return Arrays.asList(Direction.TO);
-  }
-
   @Override
   public From getFrom() {
-    // TODO: SQOOP-1647
-    return null;
+    return FROM;
   }

   @Override
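Taken together, the KiteConnector changes mean a caller can now ask the connector for its FROM side. A minimal illustrative sketch (not part of the commit; the class name KiteFromWiringSketch is invented):

import org.apache.sqoop.common.Direction;
import org.apache.sqoop.connector.kite.KiteConnector;
import org.apache.sqoop.job.etl.From;

public class KiteFromWiringSketch {
  public static void main(String[] args) {
    KiteConnector connector = new KiteConnector();
    // Before this commit, getFrom() returned null and FROM had no job configuration class.
    From from = connector.getFrom();
    Class<?> fromConfig = connector.getJobConfigurationClass(Direction.FROM);
    System.out.println(from + " -> " + fromConfig.getSimpleName()); // FromJobConfiguration
  }
}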

File: KiteConnectorError.java

@@ -24,9 +24,15 @@ public enum KiteConnectorError implements ErrorCode {
   /** Unsupported dataset URI scheme */
   GENERIC_KITE_CONNECTOR_0000("Unsupported dataset URI scheme"),

-  /** Destination is not empty */
+  /** Target dataset is not empty */
   GENERIC_KITE_CONNECTOR_0001("Dataset is not empty"),

+  /** Dataset does not exist */
+  GENERIC_KITE_CONNECTOR_0002("Dataset does not exist"),
+
+  /** Error occurred while creating partitions */
+  GENERIC_KITE_CONNECTOR_0003("Error occurred while creating partitions"),
+
   ;

   private final String message;

File: KiteDatasetExecutor.java

@@ -23,9 +23,11 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.common.FileFormat;
+import org.apache.sqoop.connector.common.AvroDataTypeUtil;
 import org.apache.sqoop.connector.kite.util.KiteDataTypeUtil;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.DatasetReader;
 import org.kitesdk.data.DatasetWriter;
 import org.kitesdk.data.Datasets;
 import org.kitesdk.data.Format;
@@ -47,10 +49,12 @@ public class KiteDatasetExecutor {
   private DatasetWriter<GenericRecord> writer;

+  private DatasetReader<GenericRecord> reader;
+
   /**
    * Creates a new dataset.
    */
-  public KiteDatasetExecutor(String uri, org.apache.sqoop.schema.Schema schema,
+  public static Dataset<GenericRecord> createDataset(String uri, org.apache.sqoop.schema.Schema schema,
       FileFormat format) {
     Schema datasetSchema = KiteDataTypeUtil.createAvroSchema(schema);
     Format datasetFormat = KiteDataTypeUtil.toFormat(format);
@@ -59,11 +63,10 @@ public KiteDatasetExecutor(String uri, org.apache.sqoop.schema.Schema schema,
         .schema(datasetSchema)
         .format(datasetFormat)
         .build();
-    dataset = Datasets.create(uri, descriptor);
+    return Datasets.create(uri, descriptor);
   }

-  @VisibleForTesting
-  protected KiteDatasetExecutor(Dataset<GenericRecord> dataset) {
+  public KiteDatasetExecutor(Dataset<GenericRecord> dataset) {
     this.dataset = dataset;
   }

@@ -87,7 +90,7 @@ private DatasetWriter<GenericRecord> getOrNewWriter() {
   }

   @VisibleForTesting
-  protected boolean isWriterClosed() {
+  boolean isWriterClosed() {
     return writer == null || !writer.isOpen();
   }

@@ -101,25 +104,38 @@ public void closeWriter() {
     }
   }

-  /**
-   * Checks the existence by a specified dataset URI.
-   */
-  public static boolean datasetExists(String uri) {
-    return Datasets.exists(uri);
+  public Object[] readRecord() {
+    if (getOrNewReader().hasNext()) {
+      GenericRecord record = getOrNewReader().next();
+      return AvroDataTypeUtil.extractGenericRecord(record);
+    }
+    return null;
+  }
+
+  private DatasetReader<GenericRecord> getOrNewReader() {
+    if (reader == null) {
+      reader = dataset.newReader();
+    }
+    return reader;
+  }
+
+  @VisibleForTesting
+  boolean isReaderClosed() {
+    return reader == null || !reader.isOpen();
+  }
+
+  public void closeReader() {
+    if (reader != null) {
+      Closeables.closeQuietly(reader);
+      reader = null;
+    }
   }

   /**
    * Deletes current dataset physically.
    */
   public void deleteDataset() {
-    deleteDataset(dataset.getUri().toString());
-  }
-
-  /**
-   * Deletes particular dataset physically.
-   */
-  public static boolean deleteDataset(String uri) {
-    return Datasets.delete(uri);
+    Datasets.delete(dataset.getUri().toString());
   }

   /**
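The new read path (readRecord/getOrNewReader/closeReader) is driven the same way KiteExtractor drives it. A minimal sketch, assuming the dataset already exists; the URI and class name are hypothetical:

import java.util.Arrays;
import org.apache.avro.generic.GenericRecord;
import org.apache.sqoop.connector.kite.KiteDatasetExecutor;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.Datasets;

public class KiteReadSketch {
  public static void main(String[] args) {
    // Hypothetical dataset URI; Datasets.load() requires an existing dataset.
    Dataset<GenericRecord> dataset = Datasets.load("dataset:hdfs://namenode:8020/data/ns/users");
    KiteDatasetExecutor executor = new KiteDatasetExecutor(dataset);
    try {
      Object[] row;
      // readRecord() returns one Avro record as an Object[] and null once the reader is drained.
      while ((row = executor.readRecord()) != null) {
        System.out.println(Arrays.toString(row));
      }
    } finally {
      executor.closeReader();
    }
  }
}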

File: KiteDatasetPartition.java (new)

@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kite;
import org.apache.sqoop.job.etl.Partition;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* A part of the input data partitioned by the Partitioner.
*/
public class KiteDatasetPartition extends Partition {
/** The uri to the dataset */
private String uri;
public KiteDatasetPartition() {
}
public String getUri() {
return uri;
}
public void setUri(String uri) {
this.uri = uri;
}
@Override
public void readFields(DataInput in) throws IOException {
uri = in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(uri);
}
@Override
public String toString() {
return String.format("{uri=%s}", uri);
}
}
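Since the partition travels from the partitioner to each extractor as Writable-style bytes, the write/readFields pair above is its entire wire format. A small round-trip sketch (the URI and class name are hypothetical):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.sqoop.connector.kite.KiteDatasetPartition;

public class PartitionRoundTripSketch {
  public static void main(String[] args) throws IOException {
    KiteDatasetPartition partition = new KiteDatasetPartition();
    partition.setUri("dataset:hdfs://namenode:8020/data/ns/users"); // hypothetical

    // write(): the partition is serialized as a single UTF string.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    partition.write(new DataOutputStream(bytes));

    // readFields(): the receiving side reconstructs the same URI.
    KiteDatasetPartition copy = new KiteDatasetPartition();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy.getUri());
  }
}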

File: KiteDatasetPartitioner.java (new)

@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kite;
import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
import java.util.LinkedList;
import java.util.List;
/**
* This allows connector to define how input data from the FROM source can be
* partitioned. The number of data partitions also determines the degree of
* parallelism.
*/
public class KiteDatasetPartitioner extends Partitioner<LinkConfiguration,
FromJobConfiguration> {
@Override
public List<Partition> getPartitions(PartitionerContext context,
LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfig) {
// There is no way to create partitions of an un-partitioned dataset.
// TODO: SQOOP-1942 will create partitions, if dataset is partitioned.
KiteDatasetPartition partition = new KiteDatasetPartition();
partition.setUri(fromJobConfig.fromJobConfig.uri);
List<Partition> partitions = new LinkedList<Partition>();
partitions.add(partition);
return partitions;
}
}
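As the TODO notes, real partitioning is deferred to SQOOP-1942, so for now every job gets exactly one partition carrying the configured URI. A usage sketch (the URI is hypothetical; passing a null context is acceptable here only because getPartitions never touches it):

import java.util.List;
import org.apache.sqoop.connector.kite.KiteDatasetPartition;
import org.apache.sqoop.connector.kite.KiteDatasetPartitioner;
import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.job.etl.Partition;

public class PartitionerSketch {
  public static void main(String[] args) {
    FromJobConfiguration fromConfig = new FromJobConfiguration();
    fromConfig.fromJobConfig.uri = "dataset:hive://default/users"; // hypothetical
    KiteDatasetPartitioner partitioner = new KiteDatasetPartitioner();
    List<Partition> partitions =
        partitioner.getPartitions(null, new LinkConfiguration(), fromConfig);
    // Always a single partition until SQOOP-1942 lands.
    System.out.println(((KiteDatasetPartition) partitions.get(0)).getUri());
  }
}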

File: KiteExtractor.java (new)

@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kite;
import com.google.common.annotations.VisibleForTesting;
import org.apache.avro.generic.GenericRecord;
import org.apache.log4j.Logger;
import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.Datasets;
/**
* This allows connector to extract data from a source system based on each
* partition.
*/
public class KiteExtractor extends Extractor<LinkConfiguration,
FromJobConfiguration, KiteDatasetPartition> {
private static final Logger LOG = Logger.getLogger(KiteExtractor.class);
private long rowsRead = 0L;
@VisibleForTesting
KiteDatasetExecutor getExecutor(String uri) {
Dataset<GenericRecord> dataset = Datasets.load(uri);
return new KiteDatasetExecutor(dataset);
}
@Override
public void extract(ExtractorContext context, LinkConfiguration linkConfig,
FromJobConfiguration fromJobConfig, KiteDatasetPartition partition) {
String uri = partition.getUri();
LOG.info("Loading data from " + uri);
KiteDatasetExecutor executor = getExecutor(uri);
DataWriter writer = context.getDataWriter();
Object[] array;
rowsRead = 0L;
try {
while ((array = executor.readRecord()) != null) {
// TODO: SQOOP-1616 will cover more column data types. Use schema and do data type conversion (e.g. datatime).
writer.writeArrayRecord(array);
rowsRead++;
}
} finally {
executor.closeReader();
}
}
@Override
public long getRowsRead() {
return rowsRead;
}
}

File: KiteFromDestroyer.java (new)

@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kite;
import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
/**
* This classes allows connector to define work to complete execution.
*/
public class KiteFromDestroyer extends Destroyer<LinkConfiguration,
FromJobConfiguration> {
@Override
public void destroy(DestroyerContext context,
LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
}
}

File: KiteFromInitializer.java (new)

@@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kite;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.connector.common.AvroDataTypeUtil;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.utils.ClassUtils;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.Datasets;
import java.util.Set;
/**
* This class allows connector to define initialization work for execution.
*/
public class KiteFromInitializer extends Initializer<LinkConfiguration,
FromJobConfiguration> {
private static final Logger LOG = Logger.getLogger(KiteFromInitializer.class);
@Override
public void initialize(InitializerContext context,
LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
if (!Datasets.exists(fromJobConfig.fromJobConfig.uri)) {
LOG.error("Dataset does not exist");
throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0002);
}
}
@Override
public Set<String> getJars(InitializerContext context,
LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
Set<String> jars = super.getJars(context, linkConfig, fromJobConfig);
jars.add(ClassUtils.jarForClass("org.kitesdk.data.Datasets"));
jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.databind.JsonNode"));
jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.core.TreeNode"));
return jars;
}
@Override
public Schema getSchema(InitializerContext context,
LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
Dataset dataset = Datasets.load(fromJobConfig.fromJobConfig.uri);
org.apache.avro.Schema avroSchema = dataset.getDescriptor().getSchema();
return AvroDataTypeUtil.createSqoopSchema(avroSchema);
}
}

File: KiteLoader.java

@@ -18,6 +18,7 @@
 package org.apache.sqoop.connector.kite;

 import com.google.common.annotations.VisibleForTesting;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.connector.common.FileFormat;
 import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
@@ -27,6 +28,7 @@
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.schema.Schema;
+import org.kitesdk.data.Dataset;

 /**
  * This class allows Kite connector to load data into a target system.
@@ -36,8 +38,9 @@ public class KiteLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
   private static final Logger LOG = Logger.getLogger(KiteLoader.class);
   private long rowsWritten = 0;

   @VisibleForTesting
-  protected KiteDatasetExecutor getExecutor(String uri, Schema schema,
+  KiteDatasetExecutor getExecutor(String uri, Schema schema,
       FileFormat format) {
     // Note that instead of creating a dataset at destination, we create a
     // temporary dataset by every KiteLoader instance. They will be merged when
@@ -45,8 +48,9 @@ protected KiteDatasetExecutor getExecutor(String uri, Schema schema,
     // not able to pass the temporary dataset uri to KiteToDestroyer. So we
     // delegate KiteDatasetExecutor to manage name convention for datasets.
     uri = KiteDatasetExecutor.suggestTemporaryDatasetUri(uri);
-    return new KiteDatasetExecutor(uri, schema, format);
+    Dataset<GenericRecord> dataset =
+        KiteDatasetExecutor.createDataset(uri, schema, format);
+    return new KiteDatasetExecutor(dataset);
   }

   @Override

File: KiteToDestroyer.java

@@ -18,6 +18,7 @@
 package org.apache.sqoop.connector.kite;

 import com.google.common.annotations.VisibleForTesting;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.connector.common.FileFormat;
 import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
@@ -26,6 +27,8 @@
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
 import org.apache.sqoop.schema.Schema;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.Datasets;

 /**
  * This classes allows connector to define work to complete execution.
@@ -54,7 +57,7 @@ public void destroy(DestroyerContext context,
       }
     } else {
       for (String tempUri : tempUris) {
-        KiteDatasetExecutor.deleteDataset(tempUri);
+        Datasets.delete(tempUri);
         LOG.warn(String.format("Failed to import. " +
             "Temporary dataset %s has been deleted", tempUri));
       }
@@ -62,9 +65,11 @@ public void destroy(DestroyerContext context,
   }

   @VisibleForTesting
-  protected KiteDatasetExecutor getExecutor(String uri, Schema schema,
+  KiteDatasetExecutor getExecutor(String uri, Schema schema,
       FileFormat format) {
-    return new KiteDatasetExecutor(uri, schema, format);
+    Dataset<GenericRecord> dataset =
+        KiteDatasetExecutor.createDataset(uri, schema, format);
+    return new KiteDatasetExecutor(dataset);
   }
 }

File: KiteToInitializer.java

@@ -28,6 +28,7 @@
 import org.apache.sqoop.schema.NullSchema;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.utils.ClassUtils;
+import org.kitesdk.data.Datasets;

 import java.util.Set;
@@ -46,7 +47,7 @@ public void initialize(InitializerContext context,
       LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
     String uri = ConfigUtil.buildDatasetUri(
         linkConfig.linkConfig, toJobConfig.toJobConfig);
-    if (KiteDatasetExecutor.datasetExists(uri)) {
+    if (Datasets.exists(uri)) {
       LOG.error("Overwrite an existing dataset is not expected in new create mode.");
       throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0001);
     }

File: FromJobConfig.java (new)

@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kite.configuration;
import org.apache.sqoop.model.ConfigClass;
import org.apache.sqoop.model.Input;
import org.apache.sqoop.model.Validator;
import org.apache.sqoop.validation.validators.DatasetURIValidator;
@ConfigClass
public class FromJobConfig {
@Input(size = 255, validators = {@Validator(DatasetURIValidator.class)})
public String uri;
}

File: FromJobConfiguration.java (new)

@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kite.configuration;
import org.apache.sqoop.model.Config;
import org.apache.sqoop.model.ConfigurationClass;
@ConfigurationClass
public class FromJobConfiguration {
@Config public FromJobConfig fromJobConfig;
public FromJobConfiguration() {
fromJobConfig = new FromJobConfig();
}
}

File: Kite connector config resource bundle (.properties)

@@ -37,4 +37,15 @@ toJobConfig.uri.help = Location to store dataset (i.e. \
           "dataset:hive://<namespace>/<dataset>")
 toJobConfig.fileFormat.label = File format
 toJobConfig.fileFormat.help = Specify storage format to create a dataset and cannot be changed.
+
+# From Job Config
+#
+fromJobConfig.label = From Kite Dataset Configuration
+fromJobConfig.help = You must supply the information requested in order to \
+  get information where you want to store your data.
+
+fromJobConfig.uri.label = Dataset URI
+fromJobConfig.uri.help = Location to load dataset (i.e. \
+  "dataset:hdfs://<host>[:port]/<path>/<namespace>/<dataset>", \
+  "dataset:hive://<namespace>/<dataset>")

File: TestKiteExecutor.java

@@ -19,39 +19,47 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.DatasetReader;
 import org.kitesdk.data.DatasetWriter;

+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertNull;
 import static junit.framework.TestCase.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
-import static org.mockito.MockitoAnnotations.Mock;
 import static org.mockito.MockitoAnnotations.initMocks;

 public class TestKiteExecutor {

-  @Mock
+  @org.mockito.Mock
   private Dataset<GenericRecord> datasetMock;

-  @Mock
+  @org.mockito.Mock
   private DatasetDescriptor descriptorMock;

-  @Mock
+  @org.mockito.Mock
   private DatasetWriter<GenericRecord> writerMock;

+  @org.mockito.Mock
+  private DatasetReader<GenericRecord> readerMock;
+
   private KiteDatasetExecutor executor;

   @Before
   public void setUp() {
     initMocks(this);
     when(datasetMock.newWriter()).thenReturn(writerMock);
+    when(datasetMock.newReader()).thenReturn(readerMock);
     when(datasetMock.getDescriptor()).thenReturn(descriptorMock);
     when(descriptorMock.getSchema()).thenReturn(
         new Schema.Parser().parse("{\"name\":\"test\",\"type\":\"record\"," +
@@ -63,25 +71,16 @@ public void setUp() {
   @After
   public void tearDown() {
     executor.closeWriter();
+    executor.closeReader();
     assertTrue(executor.isWriterClosed());
+    assertTrue(executor.isReaderClosed());
   }

   @Test
   public void testWriteRecord() {
-    // setup
+    // setup & exercise
     final int NUMBER_OF_ROWS = 10;
-    when(descriptorMock.getSchema()).thenReturn(
-        new Schema.Parser().parse("{" +
-            "\"name\":\"test\",\"type\":\"record\"," +
-            "\"fields\":[" +
-            "{\"name\":\"f1\",\"type\":\"int\"}," +
-            "{\"name\":\"f2\",\"type\":\"string\"}" +
-            "]}"));
-
-    // exercise
-    for (int i = 0; i < NUMBER_OF_ROWS; i++) {
-      executor.writeRecord(new Object[]{42, "foo"});
-    }
+    createDatasetWithRecords(NUMBER_OF_ROWS);

     // verify
     verify(writerMock, times(NUMBER_OF_ROWS)).write(any(GenericRecord.class));
@@ -103,4 +102,62 @@ public void testCloseWriter() {
     assertTrue(executor.isWriterClosed());
   }

+  @Test
+  public void testReaderRecord() {
+    // setup
+    final int NUMBER_OF_ROWS = 10;
+    createDatasetWithRecords(NUMBER_OF_ROWS);
+    when(readerMock.next()).thenReturn(
+        new GenericRecordBuilder(createTwoFieldSchema())
+            .set("f1", 1)
+            .set("f2", "foo")
+            .build());
+    when(readerMock.hasNext()).thenReturn(true);
+
+    // exercise & verify
+    for (int i = 0; i < NUMBER_OF_ROWS; i++) {
+      Object[] actual = executor.readRecord();
+      assertNotNull(actual);
+      assertEquals(2, actual.length);
+      assertEquals(1, actual[0]);
+      assertEquals("foo", actual[1]);
+    }
+    when(readerMock.hasNext()).thenReturn(false);
+    Object[] actual = executor.readRecord();
+    assertNull(actual);
+  }
+
+  @Test
+  public void testCloseReader() {
+    // setup
+    when(readerMock.isOpen()).thenReturn(true);
+    executor.readRecord();
+    assertTrue(!executor.isReaderClosed());
+
+    // exercise
+    executor.closeReader();
+
+    // verify
+    verify(readerMock, times(1)).close();
+    assertTrue(executor.isReaderClosed());
+  }
+
+  private static Schema createTwoFieldSchema() {
+    return new Schema.Parser().parse("{" +
+        "\"name\":\"test\",\"type\":\"record\"," +
+        "\"fields\":[" +
+        "{\"name\":\"f1\",\"type\":\"int\"}," +
+        "{\"name\":\"f2\",\"type\":\"string\"}" +
+        "]}");
+  }
+
+  private void createDatasetWithRecords(int numberOfRecords) {
+    when(descriptorMock.getSchema()).thenReturn(createTwoFieldSchema());
+
+    // exercise
+    for (int i = 0; i < numberOfRecords; i++) {
+      executor.writeRecord(new Object[]{i, "foo" + i});
+    }
+  }
 }

File: TestKiteExtractor.java (new)

@@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kite;
import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Text;
import org.junit.Before;
import org.junit.Test;
import org.mockito.stubbing.OngoingStubbing;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
public class TestKiteExtractor {
private KiteExtractor extractor;
@org.mockito.Mock
private KiteDatasetExecutor executorMock;
@org.mockito.Mock
private DataWriter writerMock = new DataWriter() {
@Override
public void writeArrayRecord(Object[] array) {
}
@Override
public void writeStringRecord(String text) {
}
@Override
public void writeRecord(Object obj) {
}
};
@Before
public void setUp() {
initMocks(this);
extractor = new KiteExtractor() {
@Override
protected KiteDatasetExecutor getExecutor(String uri) {
return executorMock;
}
};
}
@Test
public void testExtractor() throws Exception {
// setup
Schema schema = new Schema("testExtractor");
schema.addColumn(new Text("TextCol"));
ExtractorContext context = new ExtractorContext(null, writerMock, schema);
LinkConfiguration linkConfig = new LinkConfiguration();
FromJobConfiguration jobConfig = new FromJobConfiguration();
KiteDatasetPartition partition = new KiteDatasetPartition();
partition.setUri("dataset:hdfs:/path/to/dataset");
OngoingStubbing<Object[]> readRecordMethodStub = when(executorMock.readRecord());
final int NUMBER_OF_ROWS = 1000;
for (int i = 0; i < NUMBER_OF_ROWS; i++) {
// TODO: SQOOP-1616 will cover more column data types
readRecordMethodStub = readRecordMethodStub.thenReturn(new Object[]{});
}
readRecordMethodStub.thenReturn(null);
// exercise
extractor.extract(context, linkConfig, jobConfig, partition);
// verify
verify(writerMock, times(NUMBER_OF_ROWS)).writeArrayRecord(
any(Object[].class));
}
}

File: TestKiteFromInitializer.java (new)

@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kite;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.kitesdk.data.Datasets;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
@RunWith(PowerMockRunner.class)
@PrepareForTest(Datasets.class)
public class TestKiteFromInitializer {
private KiteFromInitializer initializer;
@Before
public void setUp() {
initMocks(this);
mockStatic(Datasets.class);
initializer = new KiteFromInitializer();
}
@Test
public void testInitializePassed() {
// setup
FromJobConfiguration jobConfig = new FromJobConfiguration();
jobConfig.fromJobConfig.uri = "dataset:file:/ds/exist";
when(Datasets.exists(jobConfig.fromJobConfig.uri)).thenReturn(true);
// exercise
initializer.initialize(null, null, jobConfig);
}
@Test(expected=SqoopException.class)
public void testInitializeFailed() {
// setup
FromJobConfiguration jobConfig = new FromJobConfiguration();
jobConfig.fromJobConfig.uri = "dataset:file:/ds/not/exist";
when(Datasets.exists(jobConfig.fromJobConfig.uri)).thenReturn(false);
// exercise
initializer.initialize(null, null, jobConfig);
}
}

File: TestKiteToDestroyer.java

@@ -26,6 +26,7 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.kitesdk.data.Datasets;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -37,7 +38,7 @@
 import static org.powermock.api.mockito.PowerMockito.verifyStatic;

 @RunWith(PowerMockRunner.class)
-@PrepareForTest(KiteDatasetExecutor.class)
+@PrepareForTest({KiteDatasetExecutor.class, Datasets.class})
 public class TestKiteToDestroyer {

   private KiteToDestroyer destroyer;
@@ -55,6 +56,7 @@ public class TestKiteToDestroyer {
   public void setUp() {
     initMocks(this);
     mockStatic(KiteDatasetExecutor.class);
+    mockStatic(Datasets.class);

     destroyer = new KiteToDestroyer() {
       @Override
@@ -93,7 +95,7 @@ public void testDestroyForFailedJob() {
     when(KiteDatasetExecutor.listTemporaryDatasetUris(toJobConfig.toJobConfig.uri))
         .thenReturn(expectedUris);
     for (String uri : expectedUris) {
-      when(KiteDatasetExecutor.deleteDataset(uri)).thenReturn(true);
+      when(Datasets.delete(uri)).thenReturn(true);
     }

     // exercise
@@ -102,7 +104,7 @@ public void testDestroyForFailedJob() {
     // verify
     for (String uri : expectedUris) {
       verifyStatic(times(1));
-      KiteDatasetExecutor.deleteDataset(uri);
+      Datasets.delete(uri);
     }
   }

File: TestKiteToInitializer.java

@@ -25,6 +25,7 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.kitesdk.data.Datasets;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -35,18 +36,15 @@
 import static org.powermock.api.mockito.PowerMockito.mockStatic;

 @RunWith(PowerMockRunner.class)
-@PrepareForTest(KiteDatasetExecutor.class)
+@PrepareForTest(Datasets.class)
 public class TestKiteToInitializer {

   private KiteToInitializer initializer;

-  @org.mockito.Mock
-  private KiteDatasetExecutor executorMock;
-
   @Before
   public void setUp() {
     initMocks(this);
-    mockStatic(KiteDatasetExecutor.class);
+    mockStatic(Datasets.class);
     initializer = new KiteToInitializer();
   }
@@ -57,7 +55,7 @@ public void testInitializePassed() {
     LinkConfiguration linkConfig = new LinkConfiguration();
     ToJobConfiguration toJobConfig = new ToJobConfiguration();
     toJobConfig.toJobConfig.uri = "dataset:file:/ds/not/exist";
-    when(KiteDatasetExecutor.datasetExists(toJobConfig.toJobConfig.uri))
+    when(Datasets.exists(toJobConfig.toJobConfig.uri))
         .thenReturn(false);

     // exercise
@@ -70,7 +68,7 @@ public void testInitializeFailed() {
     LinkConfiguration linkConfig = new LinkConfiguration();
     ToJobConfiguration toJobConfig = new ToJobConfiguration();
     toJobConfig.toJobConfig.uri = "dataset:file:/ds/exist";
-    when(KiteDatasetExecutor.datasetExists(toJobConfig.toJobConfig.uri))
+    when(Datasets.exists(toJobConfig.toJobConfig.uri))
         .thenReturn(true);

     // exercise

File: AvroDataTypeUtil.java (new)

@@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.common;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.sqoop.schema.type.Binary;
import org.apache.sqoop.schema.type.Bit;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Text;
import org.apache.sqoop.schema.type.Unknown;
import java.util.List;
/**
* The helper class provides methods to convert Sqoop data types to Avro
* supported data types.
*/
public class AvroDataTypeUtil {
public static org.apache.sqoop.schema.Schema createSqoopSchema(
Schema avroSchema) {
org.apache.sqoop.schema.Schema schema =
new org.apache.sqoop.schema.Schema(avroSchema.getName());
schema.setNote(avroSchema.getDoc());
for (Schema.Field field : avroSchema.getFields()) {
Column column = avroTypeToSchemaType(field);
schema.addColumn(column);
}
return schema;
}
private static Column avroTypeToSchemaType(Schema.Field field) {
Schema.Type schemaType = field.schema().getType();
if (schemaType == Schema.Type.UNION) {
List<Schema> unionSchema = field.schema().getTypes();
if (unionSchema.size() == 2) {
Schema.Type first = unionSchema.get(0).getType();
Schema.Type second = unionSchema.get(1).getType();
if ((first == Schema.Type.NULL && second != Schema.Type.NULL) ||
(first != Schema.Type.NULL && second == Schema.Type.NULL)) {
return avroPrimitiveTypeToSchemaType(field.name(),
first != Schema.Type.NULL ? first : second);
}
}
// This is an unsupported complex data type
return new Unknown(field.name());
}
return avroPrimitiveTypeToSchemaType(field.name(), schemaType);
}
private static Column avroPrimitiveTypeToSchemaType(String name,
Schema.Type type) {
assert type != Schema.Type.UNION;
switch (type) {
case INT:
case LONG:
return new FixedPoint(name);
case STRING:
return new Text(name);
case DOUBLE:
return new FloatingPoint(name);
case BOOLEAN:
return new Bit(name);
case BYTES:
return new Binary(name);
default:
return new Unknown(name);
}
}
public static Object[] extractGenericRecord(GenericRecord data) {
List<Schema.Field> fields = data.getSchema().getFields();
Object[] record = new Object[fields.size()];
for (int i = 0; i < fields.size(); i++) {
record[i] = data.get(i);
}
return record;
}
}
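AvroDataTypeUtil is what ties the FROM side together: KiteFromInitializer uses createSqoopSchema to publish the dataset's schema, and KiteDatasetExecutor uses extractGenericRecord to flatten each Avro record into the Object[] handed to the DataWriter. A small sketch of both calls (the schema, values, and class name are made up):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.sqoop.connector.common.AvroDataTypeUtil;

public class AvroConversionSketch {
  public static void main(String[] args) {
    Schema avroSchema = new Schema.Parser().parse(
        "{\"name\":\"user\",\"type\":\"record\",\"fields\":[" +
        "{\"name\":\"id\",\"type\":\"long\"}," +
        "{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}");

    // Yields FixedPoint("id") and Text("name"): the nullable union collapses to its non-null branch.
    org.apache.sqoop.schema.Schema sqoopSchema =
        AvroDataTypeUtil.createSqoopSchema(avroSchema);
    System.out.println(sqoopSchema);

    GenericRecord record = new GenericData.Record(avroSchema);
    record.put("id", 42L);
    record.put("name", "alice");
    Object[] row = AvroDataTypeUtil.extractGenericRecord(record); // {42L, "alice"}
    System.out.println(java.util.Arrays.toString(row));
  }
}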