From 236b9ef9cc016e81dcd8122616f9014324c3973b Mon Sep 17 00:00:00 2001
From: Abraham Elmahrek
Date: Thu, 18 Dec 2014 09:52:01 -0800
Subject: [PATCH] SQOOP-1869: Sqoop2: Expand schema matching to support two
 schemaless connectors (Gwen Shapira via Abraham Elmahrek)

---
 .../apache/sqoop/schema/ByteArraySchema.java  | 40 +++++++++++
 .../sqoop/connector/matcher/Matcher.java      |  4 +-
 .../org/apache/sqoop/job/TestMatching.java    | 39 +++++++++++
 .../testcases/KafkaConnectorTestCase.java     |  4 +-
 .../connector/kafka/FromHDFSToKafkaTest.java  | 67 +++++++++++++++++++
 5 files changed, 152 insertions(+), 2 deletions(-)
 create mode 100644 common/src/main/java/org/apache/sqoop/schema/ByteArraySchema.java
 create mode 100644 test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java

diff --git a/common/src/main/java/org/apache/sqoop/schema/ByteArraySchema.java b/common/src/main/java/org/apache/sqoop/schema/ByteArraySchema.java
new file mode 100644
index 00000000..4e2ab960
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/schema/ByteArraySchema.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.schema;
+
+import org.apache.sqoop.schema.type.Binary;
+
+/**
+ * Schema holding a single field of Binary data.
+ * Used to support connectors to schemaless / unstructured systems,
+ * such as HDFS or Kafka.
+ */
+public class ByteArraySchema extends Schema {
+
+  public static final ByteArraySchema instance = (ByteArraySchema) new ByteArraySchema()
+      .addColumn(new Binary("ByteArraySchema_Bytes"));
+
+  public static ByteArraySchema getInstance() {
+    return instance;
+  }
+
+  // Private to avoid instantiation
+  private ByteArraySchema() {
+    super("ByteArraySchema");
+  }
+}
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java
index 8ab13181..39e00077 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java
@@ -18,6 +18,7 @@
 package org.apache.sqoop.connector.matcher;
 
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.ByteArraySchema;
 import org.apache.sqoop.schema.Schema;
 
 public abstract class Matcher {
@@ -27,7 +28,8 @@ public abstract class Matcher {
 
   public Matcher(Schema fromSchema, Schema toSchema) {
     if (fromSchema.isEmpty() && toSchema.isEmpty()) {
-      throw new SqoopException(MatcherError.MATCHER_0000, "Neither a FROM or TO schemas been provided.");
+      this.fromSchema = ByteArraySchema.getInstance();
+      this.toSchema = ByteArraySchema.getInstance();
     } else if (toSchema.isEmpty()) {
       this.fromSchema = fromSchema;
       this.toSchema = fromSchema;
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
index 1692ddbf..1b791e30 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
@@ -17,11 +17,14 @@
  */
 package org.apache.sqoop.job;
 
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.BYTE_FIELD_CHARSET;
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.toText;
 import static org.junit.Assert.assertEquals;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -39,6 +42,8 @@
 import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+import org.apache.sqoop.connector.matcher.Matcher;
+import org.apache.sqoop.connector.matcher.MatcherFactory;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.job.etl.Partition;
@@ -49,6 +54,7 @@
 import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
 import org.apache.sqoop.job.util.MRJobTestUtil;
+import org.apache.sqoop.schema.NullSchema;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.schema.type.FixedPoint;
 import org.apache.sqoop.schema.type.FloatingPoint;
@@ -156,6 +162,39 @@ public void testSchemaMatching() throws Exception {
     }
   }
 
+
+  @Test
+  public void testSchemalessFromAndTo() throws UnsupportedEncodingException {
+    CSVIntermediateDataFormat dataFormat = new CSVIntermediateDataFormat();
+    String testData = "\"This is the data you are looking for. It has no structure.\"";
+    Object[] testObject = new Object[] {testData.getBytes(BYTE_FIELD_CHARSET)};
+    Object[] testObjectCopy = new Object[1];
+    System.arraycopy(testObject, 0, testObjectCopy, 0, testObject.length);
+
+    Matcher matcher = MatcherFactory.getMatcher(NullSchema.getInstance(),
+        NullSchema.getInstance());
+    // Checking the FROM side only, because it is currently the only IDF in use
+    dataFormat.setSchema(matcher.getFromSchema());
+
+    // Setting data as CSV and validating getting CSV and object
+    dataFormat.setCSVTextData(testData);
+
+    String validateCSV = dataFormat.getCSVTextData();
+    Object[] validateObj = dataFormat.getObjectData();
+
+    assertEquals(testData, validateCSV);
+    assertEquals(testObject, validateObj);
+
+    // Setting data as Object
+    dataFormat.setObjectData(testObject);
+
+    validateCSV = toText(dataFormat.getCSVTextData());
+    validateObj = dataFormat.getObjectData();
+
+    assertEquals(testData, validateCSV);
+    assertEquals(testObjectCopy, validateObj);
+  }
+
   public static class DummyPartition extends Partition {
     private int id;
 
diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java
index 41d43c02..804516b9 100644
--- a/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java
+++ b/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java
@@ -32,6 +32,8 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.toText;
+
 public class KafkaConnectorTestCase extends ConnectorTestCase {
   private static TestUtil testUtil = TestUtil.getInstance();
   private static final String TOPIC = "mytopic";
@@ -72,7 +74,7 @@ protected void validateContent(String[] content) throws UnsupportedEncodingExcep
 
       MessageAndMetadata fetchedMsg = testUtil.getNextMessageFromConsumer(TOPIC);
       Assert.assertEquals(str,
-          new String(fetchedMsg.message(), "UTF-8"));
+          toText(new String(fetchedMsg.message(), "UTF-8")));
     }
   }
 }
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java
new file mode 100644
index 00000000..dabb69d2
--- /dev/null
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.kafka;
+
+import org.apache.sqoop.model.MDriverConfig;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.test.testcases.KafkaConnectorTestCase;
+import org.junit.Test;
+
+
+public class FromHDFSToKafkaTest extends KafkaConnectorTestCase {
+
+  public static final String[] input = {
+      "A BIRD came down the walk:",
+      "He did not know I saw;",
+      "He bit an angle-worm in halves",
+      "And ate the fellow raw."
+  };
+  @Test
+  public void testBasic() throws Exception {
+    createFromFile("input-0001", input);
+
+    // Create Kafka link
+    MLink kafkaLink = getClient().createLink("kafka-connector");
+    fillKafkaLinkConfig(kafkaLink);
+    saveLink(kafkaLink);
+
+    // Create HDFS link
+    MLink hdfsLink = getClient().createLink("hdfs-connector");
+    saveLink(hdfsLink);
+
+    // Job creation
+    MJob job = getClient().createJob(hdfsLink.getPersistenceId(), kafkaLink.getPersistenceId());
+
+    // Job connector configs
+    fillHdfsFromConfig(job);
+    fillKafkaToConfig(job);
+
+    // Driver config
+    MDriverConfig driverConfig = job.getDriverConfig();
+    driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(3);
+    saveJob(job);
+
+    executeJob(job);
+
+    // This will assert that the content of the array matches the content of the topic
+    validateContent(input);
+  }
+
+
+}
\ No newline at end of file
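
Reviewer note, not part of the patch to apply: below is a minimal sketch of the new fallback behavior. It assumes the Sqoop2 modules touched above are on the classpath and that Matcher exposes a getToSchema() accessor alongside the getFromSchema() call used in TestMatching; the class name SchemalessMatchingSketch is hypothetical. With this change, two empty schemas no longer make the Matcher constructor throw MATCHER_0000; both sides instead fall back to the single-Binary-column ByteArraySchema singleton, so records are carried as one opaque byte-array field:

    import org.apache.sqoop.connector.matcher.Matcher;
    import org.apache.sqoop.connector.matcher.MatcherFactory;
    import org.apache.sqoop.schema.ByteArraySchema;
    import org.apache.sqoop.schema.NullSchema;

    public class SchemalessMatchingSketch {
      public static void main(String[] args) {
        // Two schemaless connectors (e.g. HDFS -> Kafka) both report an empty
        // schema, which previously raised MATCHER_0000 in the Matcher constructor.
        Matcher matcher = MatcherFactory.getMatcher(
            NullSchema.getInstance(), NullSchema.getInstance());

        // After this patch, both sides resolve to the ByteArraySchema singleton.
        System.out.println(matcher.getFromSchema() == ByteArraySchema.getInstance()); // true
        System.out.println(matcher.getToSchema() == ByteArraySchema.getInstance());   // true
      }
    }

This mirrors what testSchemalessFromAndTo exercises through CSVIntermediateDataFormat, without the IDF round-trip.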