mirror of https://github.com/apache/sqoop.git synced 2025-05-03 08:00:10 +08:00

SQOOP-2936: Provide Apache Atlas integration for hcatalog based exports

(Balu Vellanki via Venkat Ranganathan)
Venkat Ranganathan 2016-08-20 23:40:28 -07:00
parent 3bd2952a9b
commit b9794f98e3
8 changed files with 233 additions and 91 deletions
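
Before the file-by-file diff, here is a minimal, illustrative sketch of how a job-data listener plugs into this change. It is modeled on the DummyDataPublisher used in the tests further down; the package and class names are hypothetical, and the listener is registered by passing the Hadoop property referenced by ConfigurationConstants.DATA_PUBLISH_CLASS via -D, exactly as the new test cases do.

package org.example.sqoop;  // hypothetical package and class name

import org.apache.sqoop.SqoopJobDataPublisher;

/**
 * Minimal publisher sketch: with this commit, publish() receives export job
 * data for HCatalog exports in addition to the existing Hive/HCat import data.
 */
public class LoggingDataPublisher extends SqoopJobDataPublisher {
  @Override
  public void publish(Data data) throws Exception {
    // Data exposes the operation ("import" or "export"), the JDBC store
    // details and the Hive/HCatalog table involved, as the tests assert below.
    System.out.println("operation=" + data.getOperation()
        + ", storeType=" + data.getStoreType()
        + ", storeTable=" + data.getStoreTable()
        + ", hiveTable=" + data.getHiveTable());
  }
}

Apache Atlas is expected to supply its own SqoopJobDataPublisher implementation (its Sqoop hook); the sketch above only illustrates the listener contract that this commit extends to the export path.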

View File

@@ -19,9 +19,10 @@
package org.apache.sqoop;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.sqoop.mapreduce.ExportJobBase;
import org.apache.sqoop.mapreduce.ImportJobBase;
import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import java.io.IOException;
import java.util.Properties;
/**
@@ -94,40 +95,58 @@ private void init(String operation, String url, String user, String storeType, S
this.url = url;
this.user = user;
this.storeType = storeType;
this.storeTable = storeTable;
this.storeTable = (storeTable == null) ? hiveTable : storeTable;
this.storeQuery = storeQuery;
this.hiveDB = hiveDB;
if (this.hiveDB == null) {
this.hiveDB = SqoopHCatUtilities.DEFHCATDB;
}
this.hiveDB = (hiveDB == null) ? SqoopHCatUtilities.DEFHCATDB : hiveDB;
this.hiveTable = hiveTable;
this.commandLineOpts = commandLineOpts;
this.startTime = startTime;
this.endTime = endTime;
}
public Data(String operation, String url, String user, String storeType, String storeTable,
String storeQuery, String hiveDB, String hiveTable, Properties commandLineOpts,
long startTime, long endTime) {
public Data(String operation, String url, String user, String storeType,
String storeTable, String storeQuery, String hiveDB, String hiveTable,
Properties commandLineOpts, long startTime, long endTime) throws Exception{
init(operation, url, user, storeType, storeTable, storeQuery,
hiveDB, hiveTable, commandLineOpts, startTime, endTime);
}
public Data(SqoopOptions options, String tableName, long startTime, long endTime) throws IOException {
String hiveTableName = options.doHiveImport() ?
options.getHiveTableName() : options.getHCatTableName();
String hiveDatabase = options.doHiveImport() ?
options.getHiveDatabaseName() : options.getHCatDatabaseName();
public Data(String operation, SqoopOptions options, String tableName,
long startTime, long endTime) throws Exception {
String hiveTableName = null;
String hiveDatabase = null;
if (ExportJobBase.OPERATION.equals(operation)) {
// export job data
hiveTableName = options.getHCatTableName();
hiveDatabase = options.getHCatDatabaseName();
} else if (ImportJobBase.OPERATION.equals(operation)){
// import job data
hiveTableName = options.doHiveImport() ?
options.getHiveTableName() : options.getHCatTableName();
hiveDatabase = options.doHiveImport() ?
options.getHiveDatabaseName() : options.getHCatDatabaseName();
} else {
throw new Exception("Data published for unsupported Operation "
+ operation + " in SqoopJobDataPublisher");
}
String dataStoreType = JDBC_STORE;
String[] storeTypeFields = options.getConnectString().split(":");
if (storeTypeFields.length > 2) {
dataStoreType = storeTypeFields[1];
}
init("import", options.getConnectString(), UserGroupInformation.getCurrentUser().getShortUserName(),
init(operation, options.getConnectString(), UserGroupInformation.getCurrentUser().getShortUserName(),
dataStoreType, tableName, options.getSqlQuery(), hiveDatabase, hiveTableName,
options.writeProperties(), startTime, endTime);
}
public String toString() {
return "Operation=" + operation + ", Url=" + url + ", User=" + user + ", StoreType=" + storeType
+ ", StoreTable=" + storeTable + ", StoreQuery=" + storeQuery + ", HiveDB=" + hiveDB
+ ", HiveTable=" + hiveTable + ", StartTime=" + startTime + ", EndTime=" + endTime
+ ", CmdLineArgs=" + commandLineOpts;
}
}
public void publish(Data data) throws Exception{

View File

@@ -18,10 +18,14 @@
package org.apache.sqoop.mapreduce;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.sql.SQLException;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.config.ConfigurationHelper;
import com.cloudera.sqoop.lib.SqoopRecord;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.ExportJobContext;
import com.cloudera.sqoop.mapreduce.JobBase;
import com.cloudera.sqoop.orm.TableClassName;
import com.cloudera.sqoop.util.ExportException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -39,18 +43,13 @@
import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import org.apache.sqoop.util.LoggingUtils;
import org.apache.sqoop.util.PerfCounters;
import org.apache.sqoop.validation.ValidationContext;
import org.apache.sqoop.validation.ValidationException;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
import com.cloudera.sqoop.config.ConfigurationHelper;
import com.cloudera.sqoop.lib.SqoopRecord;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.ExportJobContext;
import com.cloudera.sqoop.orm.TableClassName;
import com.cloudera.sqoop.mapreduce.JobBase;
import com.cloudera.sqoop.util.ExportException;
import org.apache.sqoop.validation.*;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Date;
/**
* Base class for running an export MapReduce job.
@@ -94,6 +93,10 @@ public enum FileType {
private static final String HADOOP_MAP_TASK_MAX_ATTEMTPS =
"mapred.map.max.attempts";
/** Start and endtime captured for export job. */
private long startTime;
public static final String OPERATION = "export";
protected ExportJobContext context;
@@ -107,6 +110,7 @@ public ExportJobBase(final ExportJobContext ctxt,
final Class<? extends OutputFormat> outputFormatClass) {
super(ctxt.getOptions(), mapperClass, inputFormatClass, outputFormatClass);
this.context = ctxt;
this.startTime = new Date().getTime();
}
/**
@@ -439,12 +443,20 @@ public void runExport() throws ExportException, IOException {
setJob(job);
boolean success = runJob(job);
if (!success) {
LOG.error("Export job failed!");
throw new ExportException("Export job failed!");
}
if (options.isValidationEnabled()) {
validateExport(tableName, conf, job);
}
if (isHCatJob) {
// Publish export job data for hcat export operation
LOG.info("Publishing HCatalog export job data to Listeners");
PublishJobData.publishJobData(conf, options, OPERATION, tableName, startTime);
}
} catch (InterruptedException ie) {
throw new IOException(ie);
} catch (ClassNotFoundException cnfe) {

View File

@@ -27,7 +27,6 @@
import com.cloudera.sqoop.util.ImportException;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.mapred.AvroJob;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -42,8 +41,6 @@
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.sqoop.SqoopJobDataPublisher;
import org.apache.sqoop.config.ConfigurationConstants;
import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import org.apache.sqoop.util.PerfCounters;
import org.apache.sqoop.validation.ValidationContext;
@@ -61,7 +58,7 @@ public class ImportJobBase extends JobBase {
private ImportJobContext context;
private long startTime;
private long endTime;
public static final String OPERATION = "import";
public static final Log LOG = LogFactory.getLog(
ImportJobBase.class.getName());
@@ -280,28 +277,13 @@ public void runImport(String tableName, String ormJarFile, String splitByCol,
if (options.isValidationEnabled()) {
validateImport(tableName, conf, job);
}
this.endTime = new Date().getTime();
String publishClassName = conf.get(ConfigurationConstants.DATA_PUBLISH_CLASS);
if (!StringUtils.isEmpty(publishClassName)) {
try {
Class publishClass = Class.forName(publishClassName);
Object obj = publishClass.newInstance();
if (obj instanceof SqoopJobDataPublisher) {
SqoopJobDataPublisher publisher = (SqoopJobDataPublisher) obj;
if (options.doHiveImport() || options.getHCatTableName() != null) {
// We need to publish the details
SqoopJobDataPublisher.Data data =
new SqoopJobDataPublisher.Data(options, tableName, startTime, endTime);
publisher.publish(data);
}
} else {
LOG.warn("Publisher class not an instance of SqoopJobDataPublisher. Ignoring...");
}
} catch (Exception ex) {
LOG.warn("Unable to publish data to publisher " + ex.getMessage(), ex);
}
if (options.doHiveImport() || isHCatJob) {
// Publish data for import job, only hive/hcat import jobs are supported now.
LOG.info("Publishing Hive/Hcat import job data to Listeners for table " + tableName);
PublishJobData.publishJobData(conf, options, OPERATION, tableName, startTime);
}
} catch (InterruptedException ie) {
throw new IOException(ie);
} catch (ClassNotFoundException cnfe) {

View File

@@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.SqoopJobDataPublisher;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.config.ConfigurationConstants;
import java.util.Date;
/**
* Util class to publish job data to listeners.
*/
public final class PublishJobData {
public static final Log LOG = LogFactory.getLog(PublishJobData.class.getName());
private PublishJobData() {}
public static void publishJobData(Configuration conf, SqoopOptions options,
String operation, String tableName, long startTime) {
// Publish metadata about export job to listeners (if they are registered with sqoop)
long endTime = new Date().getTime();
String publishClassName = conf.get(ConfigurationConstants.DATA_PUBLISH_CLASS);
if (!StringUtils.isEmpty(publishClassName)) {
try {
Class publishClass = Class.forName(publishClassName);
Object obj = publishClass.newInstance();
if (obj instanceof SqoopJobDataPublisher) {
SqoopJobDataPublisher publisher = (SqoopJobDataPublisher) obj;
SqoopJobDataPublisher.Data data =
new SqoopJobDataPublisher.Data(operation, options, tableName, startTime, endTime);
LOG.info("Published data is " + data.toString());
publisher.publish(data);
} else {
LOG.warn("Publisher class not an instance of SqoopJobDataPublisher. Ignoring...");
}
} catch (Exception ex) {
LOG.warn("Unable to publish " + operation + " data to publisher " + ex.getMessage(), ex);
}
}
}
}

View File

@@ -18,6 +18,24 @@
package com.cloudera.sqoop.testutil;
import com.cloudera.sqoop.ConnFactory;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.metastore.JobData;
import com.cloudera.sqoop.tool.ImportTool;
import com.google.common.collect.ObjectArrays;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.BasicConfigurator;
import org.apache.sqoop.SqoopJobDataPublisher;
import org.junit.After;
import org.junit.Before;
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
@@ -26,30 +44,26 @@
import java.sql.SQLException;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.BasicConfigurator;
import org.junit.After;
import org.junit.Before;
import com.cloudera.sqoop.ConnFactory;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.metastore.JobData;
import com.cloudera.sqoop.tool.ImportTool;
import com.google.common.collect.ObjectArrays;
import junit.framework.TestCase;
/**
* Class that implements common methods required for tests.
*/
public abstract class BaseSqoopTestCase extends TestCase {
public static class DummyDataPublisher extends SqoopJobDataPublisher {
public static String hiveTable;
public static String storeTable;
public static String storeType;
public static String operation;
@Override
public void publish(Data data) {
hiveTable = data.getHiveTable();
storeTable = data.getStoreTable();
storeType = data.getStoreType();
operation = data.getOperation();
}
}
public static final Log LOG = LogFactory.getLog(
BaseSqoopTestCase.class.getName());

View File

@@ -19,7 +19,6 @@
package org.apache.sqoop;
import com.cloudera.sqoop.hive.HiveImport;
import com.cloudera.sqoop.hive.TestHiveImport;
import com.cloudera.sqoop.testutil.CommonArgs;
import com.cloudera.sqoop.testutil.ImportJobTestCase;
import com.cloudera.sqoop.tool.ImportTool;
@@ -36,22 +35,7 @@
public class TestSqoopJobDataPublisher extends ImportJobTestCase {
public static class DummyDataPublisher extends SqoopJobDataPublisher {
private static String hiveTable;
private static String storeTable;
private static String storeType;
@Override
public void publish(SqoopJobDataPublisher.Data data) {
hiveTable = data.getHiveTable();
storeTable = data.getStoreTable();
storeType = data.getStoreType();
assert (data.getOperation().equals("import"));
}
}
public static final Log LOG = LogFactory.getLog(
TestHiveImport.class.getName());
public static final Log LOG = LogFactory.getLog(TestSqoopJobDataPublisher.class.getName());
public void setUp() {
super.setUp();
@@ -106,6 +90,7 @@ public void tearDown() {
return args.toArray(new String[0]);
}
private void runImportTest(String tableName, String [] types,
String [] values, String verificationScript, String [] args,
SqoopTool tool) throws IOException {
@@ -137,6 +122,7 @@ private com.cloudera.sqoop.SqoopOptions getSqoopOptions(String [] args, SqoopToo
return opts;
}
protected void setNumCols(int numCols) {
String [] cols = new String[numCols];
for (int i = 0; i < numCols; i++) {
@@ -159,7 +145,7 @@ public void testNormalHiveImport() throws IOException {
assert (DummyDataPublisher.hiveTable.equals("NORMAL_HIVE_IMPORT"));
assert (DummyDataPublisher.storeTable.equals("NORMAL_HIVE_IMPORT"));
assert (DummyDataPublisher.storeType.equals("hsqldb"));
assert (DummyDataPublisher.operation.equals("import"));
}
}

View File

@@ -38,6 +38,7 @@
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
import org.apache.sqoop.config.ConfigurationConstants;
import org.apache.sqoop.hcat.HCatalogTestUtils.ColumnGenerator;
import org.apache.sqoop.hcat.HCatalogTestUtils.CreateMode;
import org.apache.sqoop.hcat.HCatalogTestUtils.KeyType;
@@ -441,4 +442,24 @@ public void testTextFile() throws Exception {
utils.setStorageInfo(HCatalogTestUtils.STORED_AS_TEXT);
runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
}
public void testPublishExportJobData() throws Exception {
final int TOTAL_RECORDS = 1 * 10;
String table = getTableName().toUpperCase();
ColumnGenerator[] cols = new ColumnGenerator[] {
HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
"varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, 0, 0,
"1", "1", KeyType.STATIC_KEY),
HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
"varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, 0, 0,
"2", "2", KeyType.DYNAMIC_KEY), };
List<String> addlArgsArray = new ArrayList<String>();
addlArgsArray.add("-D");
addlArgsArray.add(ConfigurationConstants.DATA_PUBLISH_CLASS + "=" + DummyDataPublisher.class.getName());
runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
assert (DummyDataPublisher.storeTable.equals(getTableName()));
assert (DummyDataPublisher.storeType.equals("hsqldb"));
assert (DummyDataPublisher.operation.equals("export"));
}
}

View File

@@ -44,6 +44,7 @@
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.sqoop.config.ConfigurationConstants;
import org.apache.sqoop.hcat.HCatalogTestUtils.ColumnGenerator;
import org.apache.sqoop.hcat.HCatalogTestUtils.CreateMode;
import org.apache.sqoop.hcat.HCatalogTestUtils.KeyType;
@@ -918,4 +919,49 @@ public void testTableCreationWithNonIdentColChars() throws Exception {
runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols,
null, true, false);
}
public void testPublishQueryImportData() throws Exception {
final int TOTAL_RECORDS = 1 * 10;
String table = getTableName().toUpperCase();
ColumnGenerator[] cols = new ColumnGenerator[] {
HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
"varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, 0, 0,
new HiveVarchar("1", 20), "1", KeyType.STATIC_KEY),
HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
"varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, 0, 0,
new HiveVarchar("2", 20), "2", KeyType.DYNAMIC_KEY),
};
List<String> cfgParams = new ArrayList<String>();
cfgParams.add("-D");
cfgParams.add(ConfigurationConstants.DATA_PUBLISH_CLASS + "=" + DummyDataPublisher.class.getName());
setConfigParams(cfgParams);
runHCatQueryImport(new ArrayList<String>(), TOTAL_RECORDS, table, cols, null);
assert (DummyDataPublisher.storeType.equals("hsqldb"));
assert (DummyDataPublisher.operation.equals("import"));
assert (DummyDataPublisher.storeTable.equals(getTableName()));
}
public void testPublishTableImportData() throws Exception {
final int TOTAL_RECORDS = 1 * 10;
String table = getTableName().toUpperCase();
ColumnGenerator[] cols = new ColumnGenerator[] {
HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
"varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, 0, 0,
new HiveVarchar("1", 20), "1", KeyType.STATIC_KEY),
HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
"varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, 0, 0,
new HiveVarchar("2", 20), "2", KeyType.DYNAMIC_KEY),
};
List<String> cfgParams = new ArrayList<String>();
cfgParams.add("-D");
cfgParams.add(ConfigurationConstants.DATA_PUBLISH_CLASS + "=" + DummyDataPublisher.class.getName());
setConfigParams(cfgParams);
List<String> addlArgsArray = new ArrayList<String>();
addlArgsArray.add("--create-hcatalog-table");
setExtraArgs(addlArgsArray);
runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null, true, false);
assert (DummyDataPublisher.storeType.equals("hsqldb"));
assert (DummyDataPublisher.operation.equals("import"));
assert (DummyDataPublisher.storeTable.equals(getTableName()));
}
}