
SQOOP-1869: Sqoop2: Expand schema matching to support two schemaless connectors

(Gwen Shapira via Abraham Elmahrek)
Abraham Elmahrek 2014-12-18 09:52:01 -08:00
parent 1fa2d6c0de
commit 236b9ef9cc
5 changed files with 152 additions and 2 deletions

ByteArraySchema.java (new file)

@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.sqoop.schema;

import org.apache.sqoop.schema.type.Binary;

/**
 * Schema holding a single field of binary data.
 * Used to support connectors to schemaless / unstructured systems
 * such as HDFS or Kafka.
 */
public class ByteArraySchema extends Schema {

  public static final ByteArraySchema instance = (ByteArraySchema) new ByteArraySchema()
      .addColumn(new Binary("ByteArraySchema_Bytes"));

  public static ByteArraySchema getInstance() {
    return instance;
  }

  // Private constructor to avoid external instantiation
  private ByteArraySchema() {
    super("ByteArraySchema");
  }
}
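
For orientation only (not part of the commit): a hedged sketch of how a connector to a schemaless store is expected to use this class. The SchemalessInitializerSketch class and its getSchema() hook are hypothetical stand-ins for whatever connector API actually reports the schema; the only call taken from the patch is ByteArraySchema.getInstance().

// Illustrative sketch only -- the class and method below are hypothetical.
package org.apache.sqoop.example;

import org.apache.sqoop.schema.ByteArraySchema;
import org.apache.sqoop.schema.Schema;

public class SchemalessInitializerSketch {
  // A schemaless connector does not describe its data; it simply reports the
  // shared one-column byte-array schema and lets the matcher pair it up.
  public Schema getSchema() {
    return ByteArraySchema.getInstance();
  }
}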

Matcher.java (org.apache.sqoop.connector.matcher)

@@ -18,6 +18,7 @@
 package org.apache.sqoop.connector.matcher;

 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.ByteArraySchema;
 import org.apache.sqoop.schema.Schema;

 public abstract class Matcher {
@@ -27,7 +28,8 @@ public abstract class Matcher {

   public Matcher(Schema fromSchema, Schema toSchema) {
     if (fromSchema.isEmpty() && toSchema.isEmpty()) {
-      throw new SqoopException(MatcherError.MATCHER_0000, "Neither a FROM or TO schemas been provided.");
+      this.fromSchema = ByteArraySchema.getInstance();
+      this.toSchema = ByteArraySchema.getInstance();
     } else if (toSchema.isEmpty()) {
       this.fromSchema = fromSchema;
       this.toSchema = fromSchema;

MapReduce test class in org.apache.sqoop.job

@@ -17,11 +17,14 @@
  */
 package org.apache.sqoop.job;

+import static org.apache.sqoop.connector.common.SqoopIDFUtils.BYTE_FIELD_CHARSET;
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.toText;
 import static org.junit.Assert.assertEquals;

 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -39,6 +42,8 @@
 import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+import org.apache.sqoop.connector.matcher.Matcher;
+import org.apache.sqoop.connector.matcher.MatcherFactory;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.job.etl.Partition;
@@ -49,6 +54,7 @@
 import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
 import org.apache.sqoop.job.util.MRJobTestUtil;
+import org.apache.sqoop.schema.NullSchema;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.schema.type.FixedPoint;
 import org.apache.sqoop.schema.type.FloatingPoint;
@@ -156,6 +162,39 @@ public void testSchemaMatching() throws Exception {
     }
   }

+  @Test
+  public void testSchemalessFromAndTo() throws UnsupportedEncodingException {
+    CSVIntermediateDataFormat dataFormat = new CSVIntermediateDataFormat();
+    String testData = "\"This is the data you are looking for. It has no structure.\"";
+
+    Object[] testObject = new Object[] {testData.getBytes(BYTE_FIELD_CHARSET)};
+    Object[] testObjectCopy = new Object[1];
+    System.arraycopy(testObject, 0, testObjectCopy, 0, testObject.length);
+
+    Matcher matcher = MatcherFactory.getMatcher(NullSchema.getInstance(),
+        NullSchema.getInstance());
+
+    // Checking FROM side only because currently that is the only IDF that is used
+    dataFormat.setSchema(matcher.getFromSchema());
+
+    // Setting data as CSV and validating getting CSV and object
+    dataFormat.setCSVTextData(testData);
+    String validateCSV = dataFormat.getCSVTextData();
+    Object[] validateObj = dataFormat.getObjectData();
+    assertEquals(testData, validateCSV);
+    assertEquals(testObject, validateObj);
+
+    // Setting data as Object
+    dataFormat.setObjectData(testObject);
+    validateCSV = toText(dataFormat.getCSVTextData());
+    validateObj = dataFormat.getObjectData();
+    assertEquals(testData, validateCSV);
+    assertEquals(testObjectCopy, validateObj);
+  }
+
   public static class DummyPartition extends Partition {
     private int id;

KafkaConnectorTestCase.java (org.apache.sqoop.test.testcases)

@@ -32,6 +32,8 @@
 import java.util.ArrayList;
 import java.util.List;

+import static org.apache.sqoop.connector.common.SqoopIDFUtils.toText;
+
 public class KafkaConnectorTestCase extends ConnectorTestCase {
   private static TestUtil testUtil = TestUtil.getInstance();
   private static final String TOPIC = "mytopic";
@@ -72,7 +74,7 @@ protected void validateContent(String[] content) throws UnsupportedEncodingException
       MessageAndMetadata<byte[],byte[]> fetchedMsg =
           testUtil.getNextMessageFromConsumer(TOPIC);
       Assert.assertEquals(str,
-          new String(fetchedMsg.message(), "UTF-8"));
+          toText(new String(fetchedMsg.message(), "UTF-8")));
     }
   }
 }
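
A hedged reading of this tweak (an interpretation, not stated in the patch): once the transfer runs through the schemaless ByteArraySchema path, the message bodies landing in Kafka carry the CSV intermediate representation of each record rather than the raw line, so the test recovers the original string by passing the consumed payload through SqoopIDFUtils.toText(), which is assumed here to undo the CSV-level quoting and escaping.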

FromHDFSToKafkaTest.java (new file, org.apache.sqoop.integration.connector.kafka)

@@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.sqoop.integration.connector.kafka;

import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.testcases.KafkaConnectorTestCase;
import org.junit.Test;

public class FromHDFSToKafkaTest extends KafkaConnectorTestCase {

  public static final String[] input = {
      "A BIRD came down the walk:",
      "He did not know I saw;",
      "He bit an angle-worm in halves",
      "And ate the fellow raw."
  };

  @Test
  public void testBasic() throws Exception {
    createFromFile("input-0001", input);

    // Create Kafka link
    MLink kafkaLink = getClient().createLink("kafka-connector");
    fillKafkaLinkConfig(kafkaLink);
    saveLink(kafkaLink);

    // HDFS link
    MLink hdfsLink = getClient().createLink("hdfs-connector");
    saveLink(hdfsLink);

    // Job creation
    MJob job = getClient().createJob(hdfsLink.getPersistenceId(), kafkaLink.getPersistenceId());

    // Job connector configs
    fillHdfsFromConfig(job);
    fillKafkaToConfig(job);

    // driver config
    MDriverConfig driverConfig = job.getDriverConfig();
    driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(3);

    saveJob(job);
    executeJob(job);

    // this will assert the content of the array matches the content of the topic
    validateContent(input);
  }
}