
SQOOP-2235: Sqoop2: Move PrefixContext back to mapreduce execution engine

(Jarek Jarcec Cecho via Abraham Elmahrek)
Abraham Elmahrek 2015-03-25 15:09:01 -07:00
parent 45d1c32d7e
commit 029e8ff56a
9 changed files with 23 additions and 30 deletions
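For background, PrefixContext wraps a Hadoop Configuration and exposes only the keys that share a given prefix, which is why it belongs with the mapreduce execution engine, while the connector-side tests in the diff below switch to the purely map-backed MutableMapContext / MapContext. The following is a minimal sketch of the two usage patterns, not part of the commit itself; it uses only the constructors and setters visible in this diff, and the key name "dir" and the ContextSketch class are illustrative.

import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.job.PrefixContext; // package location after this commit

public class ContextSketch {
  public static void main(String[] args) {
    // PrefixContext scopes a Hadoop Configuration to the keys under one prefix,
    // so it only makes sense where a Configuration exists: the MR engine.
    Configuration conf = new Configuration();
    conf.set("org.apache.sqoop.job.connector.from.context.dir", "/tmp/input");
    PrefixContext prefixContext =
        new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");

    // Connector tests no longer need a Configuration at all; a plain
    // map-backed context carries the same key/value pairs.
    MutableMapContext mapContext = new MutableMapContext(new HashMap<String, String>());
    mapContext.setString("dir", "/tmp/input");
  }
}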

View File

@@ -23,13 +23,13 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
@@ -40,7 +40,6 @@
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Text;
import org.testng.ITest;
import org.testng.annotations.AfterMethod;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
@@ -100,15 +99,14 @@ public void tearDown() throws IOException {
@Test
public void testExtractor() throws Exception {
Configuration conf = new Configuration();
PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
MutableMapContext mutableContext = new MutableMapContext(new HashMap<String, String>());
final boolean[] visited = new boolean[NUMBER_OF_FILES * NUMBER_OF_ROWS_PER_FILE];
Schema schema = new Schema("schema").addColumn(new FixedPoint("col1", 4L, true))
.addColumn(new FloatingPoint("col2", 4L))
.addColumn(new Text("col3"))
.addColumn(new Text("col4"))
.addColumn(new Text("col5"));
ExtractorContext context = new ExtractorContext(prefixContext, new DataWriter() {
ExtractorContext context = new ExtractorContext(mutableContext, new DataWriter() {
@Override
public void writeArrayRecord(Object[] array) {
throw new AssertionError("Should not be writing array.");
@@ -156,15 +154,14 @@ public void writeRecord(Object obj) {
@Test
public void testOverrideNull() throws Exception {
Configuration conf = new Configuration();
PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
MutableMapContext mutableContext = new MutableMapContext(new HashMap<String, String>());
final boolean[] visited = new boolean[NUMBER_OF_FILES * NUMBER_OF_ROWS_PER_FILE];
Schema schema = new Schema("schema").addColumn(new FixedPoint("col1", 4L, true))
.addColumn(new FloatingPoint("col2", 4L))
.addColumn(new Text("col3"))
.addColumn(new Text("col4"))
.addColumn(new Text("col5"));
ExtractorContext context = new ExtractorContext(prefixContext, new DataWriter() {
ExtractorContext context = new ExtractorContext(mutableContext, new DataWriter() {
@Override
public void writeArrayRecord(Object[] array) {
int index;

View File

@@ -24,6 +24,7 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -34,7 +35,7 @@
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToCompression;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
@@ -102,10 +103,9 @@ public void testLoader() throws Exception {
.addColumn(new FloatingPoint("col2", 4L))
.addColumn(new Text("col3"));
Configuration conf = new Configuration();
conf.set("org.apache.sqoop.job.connector.from.context." + HdfsConstants.WORK_DIRECTORY, outputDirectory);
PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
LoaderContext context = new LoaderContext(prefixContext, new DataReader() {
MutableMapContext mutableContext = new MutableMapContext(new HashMap<String, String>());
mutableContext.setString(HdfsConstants.WORK_DIRECTORY, outputDirectory);
LoaderContext context = new LoaderContext(mutableContext, new DataReader() {
private long index = 0L;
@Override
@@ -156,10 +156,9 @@ public void testOverrideNull() throws Exception {
.addColumn(new Text("col3"))
.addColumn(new Text("col4"));
Configuration conf = new Configuration();
conf.set("org.apache.sqoop.job.connector.from.context." + HdfsConstants.WORK_DIRECTORY, outputDirectory);
PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
LoaderContext context = new LoaderContext(prefixContext, new DataReader() {
MutableMapContext mutableContext = new MutableMapContext(new HashMap<String, String>());
mutableContext.setString(HdfsConstants.WORK_DIRECTORY, outputDirectory);
LoaderContext context = new LoaderContext(mutableContext, new DataReader() {
private long index = 0L;
@Override

View File

@@ -23,13 +23,13 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
@@ -94,9 +94,7 @@ public static Object[][] data() {
@Test
public void testPartitioner() {
Configuration conf = new Configuration();
PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
PartitionerContext context = new PartitionerContext(prefixContext, 5, null);
PartitionerContext context = new PartitionerContext(new MapContext(new HashMap<String, String>()), 5, null);
LinkConfiguration linkConf = new LinkConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();

View File

@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.common;
package org.apache.sqoop.job;
import org.apache.sqoop.classification.InterfaceAudience;
import org.apache.sqoop.classification.InterfaceStability;

View File

@@ -20,7 +20,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.connector.matcher.Matcher;
import org.apache.sqoop.connector.matcher.MatcherFactory;
import org.apache.sqoop.job.MRJobConstants;

View File

@@ -30,7 +30,7 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.error.code.MRExecutionError;
import org.apache.sqoop.job.MRJobConstants;

View File

@@ -33,7 +33,7 @@
import org.apache.sqoop.connector.matcher.MatcherFactory;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.error.code.MRExecutionError;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.etl.io.DataWriter;

View File

@@ -38,7 +38,7 @@
import org.apache.sqoop.connector.matcher.MatcherFactory;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.error.code.MRExecutionError;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.etl.io.DataReader;

View File

@@ -15,12 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.common;
package org.apache.sqoop.job;
import org.apache.hadoop.conf.Configuration;
import org.testng.annotations.Test;
import java.util.HashMap;
import java.util.Map;
import static org.testng.Assert.assertEquals;