5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-05 04:20:08 +08:00

SQOOP-1428: Sqoop2: From/To: Rebase against Sqoop2 branch for SQOOP-777

This commit is contained in:
Abraham Elmahrek 2014-08-11 11:45:40 -07:00
parent f01ab4d8d7
commit 4511fc1a99
7 changed files with 22 additions and 25 deletions

View File

@ -351,8 +351,8 @@ public MSubmission submit(long jobId, HttpEventContext ctx) {
request.setJobId(job.getPersistenceId()); request.setJobId(job.getPersistenceId());
request.setNotificationUrl(notificationBaseUrl + jobId); request.setNotificationUrl(notificationBaseUrl + jobId);
Class<? extends IntermediateDataFormat<?>> dataFormatClass = Class<? extends IntermediateDataFormat<?>> dataFormatClass =
connector.getIntermediateDataFormat(); fromConnector.getIntermediateDataFormat();
request.setIntermediateDataFormat(connector.getIntermediateDataFormat()); request.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat());
// Create request object // Create request object
// Let's register all important jars // Let's register all important jars

View File

@ -45,7 +45,9 @@ public SubmissionRequest createSubmissionRequest() {
return new MRSubmissionRequest(); return new MRSubmissionRequest();
} }
public void prepareSubmission(MRSubmissionRequest request) { public void prepareSubmission(SubmissionRequest gRequest) {
MRSubmissionRequest request = (MRSubmissionRequest)gRequest;
// Add jar dependencies // Add jar dependencies
addDependencies(request); addDependencies(request);

View File

@ -235,23 +235,17 @@ public static Object getConfigFrameworkJob(Configuration configuration) {
* @param job MapReduce Job object * @param job MapReduce Job object
* @param schema Schema * @param schema Schema
*/ */
public static void setFromConnectorSchema(Job job, Schema schema) { public static void setConnectorSchema(ConnectorType type, Job job, Schema schema) {
if(schema != null) { if(schema != null) {
switch (type) {
case FROM:
job.getCredentials().addSecretKey(SCHEMA_FROM_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes()); job.getCredentials().addSecretKey(SCHEMA_FROM_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
}
}
/** case TO:
* Persist To Connector generated schema.
*
* @param job MapReduce Job object
* @param schema Schema
*/
public static void setToConnectorSchema(Job job, Schema schema) {
if(schema != null) {
job.getCredentials().addSecretKey(SCHEMA_TO_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes()); job.getCredentials().addSecretKey(SCHEMA_TO_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
} }
} }
}
/** /**
* Persist Framework generated schema. * Persist Framework generated schema.

View File

@ -65,10 +65,14 @@ public void run(Context context) throws IOException, InterruptedException {
String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR); String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName); Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
// Propagate connector schema in every case for now
// TODO: Change to coditional choosing between Connector schemas.
Schema schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf);
String intermediateDataFormatName = conf.get(JobConstants String intermediateDataFormatName = conf.get(JobConstants
.INTERMEDIATE_DATA_FORMAT); .INTERMEDIATE_DATA_FORMAT);
data = (IntermediateDataFormat) ClassUtils.instantiate(intermediateDataFormatName); data = (IntermediateDataFormat) ClassUtils.instantiate(intermediateDataFormatName);
data.setSchema(ConfigurationUtils.getConnectorSchema(conf)); data.setSchema(schema);
dataOut = new SqoopWritable(); dataOut = new SqoopWritable();
// Objects that should be pass to the Executor execution // Objects that should be pass to the Executor execution
@ -76,10 +80,6 @@ public void run(Context context) throws IOException, InterruptedException {
Object configConnection = null; Object configConnection = null;
Object configJob = null; Object configJob = null;
// Propagate connector schema in every case for now
// TODO: Change to coditional choosing between Connector schemas.
Schema schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf);
// Get configs for extractor // Get configs for extractor
subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT); subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
configConnection = ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.FROM, conf); configConnection = ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.FROM, conf);

View File

@ -73,7 +73,7 @@ public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
producer = new SqoopRecordWriter(); producer = new SqoopRecordWriter();
data = (IntermediateDataFormat) ClassUtils.instantiate(context data = (IntermediateDataFormat) ClassUtils.instantiate(context
.getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT)); .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
data.setSchema(ConfigurationUtils.getConnectorSchema(context.getConfiguration())); data.setSchema(ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, context.getConfiguration()));
} }
public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() { public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {

View File

@ -96,13 +96,13 @@ public class TestConfigurationUtils {
// //
// @Test // @Test
// public void testConnectorSchema() throws Exception { // public void testConnectorSchema() throws Exception {
// ConfigurationUtils.setFromConnectorSchema(job, getSchema("a")); // ConfigurationUtils.setConnectorSchema(job, getSchema("a"));
// assertEquals(getSchema("a"), ConfigurationUtils.getFromConnectorSchema(jobConf)); // assertEquals(getSchema("a"), ConfigurationUtils.getFromConnectorSchema(jobConf));
// } // }
// //
// @Test // @Test
// public void testConnectorSchemaNull() throws Exception { // public void testConnectorSchemaNull() throws Exception {
// ConfigurationUtils.setFromConnectorSchema(job, null); // ConfigurationUtils.setConnectorSchema(job, null);
// assertNull(ConfigurationUtils.getFromConnectorSchema(jobConf)); // assertNull(ConfigurationUtils.getFromConnectorSchema(jobConf));
// } // }
// //

View File

@ -207,7 +207,8 @@ public boolean submit(SubmissionRequest generalRequest) {
ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.FROM, job, request.getFrameworkConnectionConfig(ConnectorType.FROM)); ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.FROM, job, request.getFrameworkConnectionConfig(ConnectorType.FROM));
ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.TO, job, request.getFrameworkConnectionConfig(ConnectorType.TO)); ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.TO, job, request.getFrameworkConnectionConfig(ConnectorType.TO));
ConfigurationUtils.setConfigFrameworkJob(job, request.getConfigFrameworkJob()); ConfigurationUtils.setConfigFrameworkJob(job, request.getConfigFrameworkJob());
ConfigurationUtils.setConnectorSchema(job, request.getSummary().getConnectorSchema()); // @TODO(Abe): Persist TO schema.
ConfigurationUtils.setConnectorSchema(ConnectorType.FROM, job, request.getSummary().getConnectorSchema());
if(request.getJobName() != null) { if(request.getJobName() != null) {
job.setJobName("Sqoop: " + request.getJobName()); job.setJobName("Sqoop: " + request.getJobName());