From 4511fc1a99ffcef4b8a1c3d60a40e2f5049f93a8 Mon Sep 17 00:00:00 2001 From: Abraham Elmahrek Date: Mon, 11 Aug 2014 11:45:40 -0700 Subject: [PATCH] SQOOP-1428: Sqoop2: From/To: Rebase against Sqoop2 branch for SQOOP-777 --- .../apache/sqoop/framework/JobManager.java | 4 ++-- .../mapreduce/MapreduceExecutionEngine.java | 4 +++- .../sqoop/job/mr/ConfigurationUtils.java | 20 +++++++------------ .../org/apache/sqoop/job/mr/SqoopMapper.java | 10 +++++----- .../job/mr/SqoopOutputFormatLoadExecutor.java | 2 +- .../sqoop/job/mr/TestConfigurationUtils.java | 4 ++-- .../mapreduce/MapreduceSubmissionEngine.java | 3 ++- 7 files changed, 22 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java index e0bf0110..d7d89624 100644 --- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java @@ -351,8 +351,8 @@ public MSubmission submit(long jobId, HttpEventContext ctx) { request.setJobId(job.getPersistenceId()); request.setNotificationUrl(notificationBaseUrl + jobId); Class> dataFormatClass = - connector.getIntermediateDataFormat(); - request.setIntermediateDataFormat(connector.getIntermediateDataFormat()); + fromConnector.getIntermediateDataFormat(); + request.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat()); // Create request object // Let's register all important jars diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java index 82b195a2..ff328cb0 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java @@ -45,7 +45,9 @@ public SubmissionRequest createSubmissionRequest() { return new MRSubmissionRequest(); } - public void prepareSubmission(MRSubmissionRequest request) { + public void prepareSubmission(SubmissionRequest gRequest) { + MRSubmissionRequest request = (MRSubmissionRequest)gRequest; + // Add jar dependencies addDependencies(request); diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java index c60ae688..476689aa 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java @@ -235,21 +235,15 @@ public static Object getConfigFrameworkJob(Configuration configuration) { * @param job MapReduce Job object * @param schema Schema */ - public static void setFromConnectorSchema(Job job, Schema schema) { + public static void setConnectorSchema(ConnectorType type, Job job, Schema schema) { if(schema != null) { - job.getCredentials().addSecretKey(SCHEMA_FROM_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes()); - } - } + switch (type) { + case FROM: + job.getCredentials().addSecretKey(SCHEMA_FROM_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes()); - /** - * Persist To Connector generated schema. - * - * @param job MapReduce Job object - * @param schema Schema - */ - public static void setToConnectorSchema(Job job, Schema schema) { - if(schema != null) { - job.getCredentials().addSecretKey(SCHEMA_TO_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes()); + case TO: + job.getCredentials().addSecretKey(SCHEMA_TO_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes()); + } } } diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index 2daaee36..c3b6ae93 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -65,10 +65,14 @@ public void run(Context context) throws IOException, InterruptedException { String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR); Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName); + // Propagate connector schema in every case for now + // TODO: Change to coditional choosing between Connector schemas. + Schema schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf); + String intermediateDataFormatName = conf.get(JobConstants .INTERMEDIATE_DATA_FORMAT); data = (IntermediateDataFormat) ClassUtils.instantiate(intermediateDataFormatName); - data.setSchema(ConfigurationUtils.getConnectorSchema(conf)); + data.setSchema(schema); dataOut = new SqoopWritable(); // Objects that should be pass to the Executor execution @@ -76,10 +80,6 @@ public void run(Context context) throws IOException, InterruptedException { Object configConnection = null; Object configJob = null; - // Propagate connector schema in every case for now - // TODO: Change to coditional choosing between Connector schemas. - Schema schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf); - // Get configs for extractor subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT); configConnection = ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.FROM, conf); diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index 123737e1..bed99a20 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -73,7 +73,7 @@ public SqoopOutputFormatLoadExecutor(JobContext jobctx) { producer = new SqoopRecordWriter(); data = (IntermediateDataFormat) ClassUtils.instantiate(context .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT)); - data.setSchema(ConfigurationUtils.getConnectorSchema(context.getConfiguration())); + data.setSchema(ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, context.getConfiguration())); } public RecordWriter getRecordWriter() { diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java index 7e434b70..09f56954 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java @@ -96,13 +96,13 @@ public class TestConfigurationUtils { // // @Test // public void testConnectorSchema() throws Exception { -// ConfigurationUtils.setFromConnectorSchema(job, getSchema("a")); +// ConfigurationUtils.setConnectorSchema(job, getSchema("a")); // assertEquals(getSchema("a"), ConfigurationUtils.getFromConnectorSchema(jobConf)); // } // // @Test // public void testConnectorSchemaNull() throws Exception { -// ConfigurationUtils.setFromConnectorSchema(job, null); +// ConfigurationUtils.setConnectorSchema(job, null); // assertNull(ConfigurationUtils.getFromConnectorSchema(jobConf)); // } // diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java index 3c214210..fd423cbd 100644 --- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java +++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java @@ -207,7 +207,8 @@ public boolean submit(SubmissionRequest generalRequest) { ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.FROM, job, request.getFrameworkConnectionConfig(ConnectorType.FROM)); ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.TO, job, request.getFrameworkConnectionConfig(ConnectorType.TO)); ConfigurationUtils.setConfigFrameworkJob(job, request.getConfigFrameworkJob()); - ConfigurationUtils.setConnectorSchema(job, request.getSummary().getConnectorSchema()); + // @TODO(Abe): Persist TO schema. + ConfigurationUtils.setConnectorSchema(ConnectorType.FROM, job, request.getSummary().getConnectorSchema()); if(request.getJobName() != null) { job.setJobName("Sqoop: " + request.getJobName());