From b9794f98e397065771fb99a30e5c85ac7b84bdd0 Mon Sep 17 00:00:00 2001 From: Venkat Ranganathan Date: Sat, 20 Aug 2016 23:40:28 -0700 Subject: [PATCH] SQOOP-2936: Provide Apache Atlas integration for hcatalog based exports (Balu Vellanki via Venkat Ranganathan) --- .../apache/sqoop/SqoopJobDataPublisher.java | 49 ++++++++++----- .../apache/sqoop/mapreduce/ExportJobBase.java | 42 ++++++++----- .../apache/sqoop/mapreduce/ImportJobBase.java | 30 ++------- .../sqoop/mapreduce/PublishJobData.java | 62 +++++++++++++++++++ .../sqoop/testutil/BaseSqoopTestCase.java | 52 ++++++++++------ .../sqoop/TestSqoopJobDataPublisher.java | 22 ++----- .../apache/sqoop/hcat/HCatalogExportTest.java | 21 +++++++ .../apache/sqoop/hcat/HCatalogImportTest.java | 46 ++++++++++++++ 8 files changed, 233 insertions(+), 91 deletions(-) create mode 100644 src/java/org/apache/sqoop/mapreduce/PublishJobData.java diff --git a/src/java/org/apache/sqoop/SqoopJobDataPublisher.java b/src/java/org/apache/sqoop/SqoopJobDataPublisher.java index d77125f3..93be02fa 100644 --- a/src/java/org/apache/sqoop/SqoopJobDataPublisher.java +++ b/src/java/org/apache/sqoop/SqoopJobDataPublisher.java @@ -19,9 +19,10 @@ package org.apache.sqoop; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.sqoop.mapreduce.ExportJobBase; +import org.apache.sqoop.mapreduce.ImportJobBase; import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities; -import java.io.IOException; import java.util.Properties; /** @@ -94,40 +95,58 @@ private void init(String operation, String url, String user, String storeType, S this.url = url; this.user = user; this.storeType = storeType; - this.storeTable = storeTable; + this.storeTable = (storeTable == null) ? hiveTable : storeTable; this.storeQuery = storeQuery; - this.hiveDB = hiveDB; - if (this.hiveDB == null) { - this.hiveDB = SqoopHCatUtilities.DEFHCATDB; - } + this.hiveDB = (hiveDB == null) ? SqoopHCatUtilities.DEFHCATDB : hiveDB; this.hiveTable = hiveTable; this.commandLineOpts = commandLineOpts; this.startTime = startTime; this.endTime = endTime; } - public Data(String operation, String url, String user, String storeType, String storeTable, - String storeQuery, String hiveDB, String hiveTable, Properties commandLineOpts, - long startTime, long endTime) { + public Data(String operation, String url, String user, String storeType, + String storeTable, String storeQuery, String hiveDB, String hiveTable, + Properties commandLineOpts, long startTime, long endTime) throws Exception{ init(operation, url, user, storeType, storeTable, storeQuery, hiveDB, hiveTable, commandLineOpts, startTime, endTime); } - public Data(SqoopOptions options, String tableName, long startTime, long endTime) throws IOException { - String hiveTableName = options.doHiveImport() ? - options.getHiveTableName() : options.getHCatTableName(); - String hiveDatabase = options.doHiveImport() ? - options.getHiveDatabaseName() : options.getHCatDatabaseName(); + public Data(String operation, SqoopOptions options, String tableName, + long startTime, long endTime) throws Exception { + String hiveTableName = null; + String hiveDatabase = null; + if (ExportJobBase.OPERATION.equals(operation)) { + // export job data + hiveTableName = options.getHCatTableName(); + hiveDatabase = options.getHCatDatabaseName(); + } else if (ImportJobBase.OPERATION.equals(operation)){ + // import job data + hiveTableName = options.doHiveImport() ? + options.getHiveTableName() : options.getHCatTableName(); + hiveDatabase = options.doHiveImport() ? + options.getHiveDatabaseName() : options.getHCatDatabaseName(); + } else { + throw new Exception("Data published for unsupported Operation " + + operation + " in SqoopJobDataPublisher"); + } + String dataStoreType = JDBC_STORE; String[] storeTypeFields = options.getConnectString().split(":"); if (storeTypeFields.length > 2) { dataStoreType = storeTypeFields[1]; } - init("import", options.getConnectString(), UserGroupInformation.getCurrentUser().getShortUserName(), + init(operation, options.getConnectString(), UserGroupInformation.getCurrentUser().getShortUserName(), dataStoreType, tableName, options.getSqlQuery(), hiveDatabase, hiveTableName, options.writeProperties(), startTime, endTime); } + + public String toString() { + return "Operation=" + operation + ", Url=" + url + ", User=" + user + ", StoreType=" + storeType + + ", StoreTable=" + storeTable + ", StoreQuery=" + storeQuery + ", HiveDB=" + hiveDB + + ", HiveTable=" + hiveTable + ", StartTime=" + startTime + ", EndTime=" + endTime + + ", CmdLineArgs=" + commandLineOpts; + } } public void publish(Data data) throws Exception{ diff --git a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java index 32de92a2..27f84dad 100644 --- a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java +++ b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java @@ -18,10 +18,14 @@ package org.apache.sqoop.mapreduce; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.sql.SQLException; - +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.config.ConfigurationHelper; +import com.cloudera.sqoop.lib.SqoopRecord; +import com.cloudera.sqoop.manager.ConnManager; +import com.cloudera.sqoop.manager.ExportJobContext; +import com.cloudera.sqoop.mapreduce.JobBase; +import com.cloudera.sqoop.orm.TableClassName; +import com.cloudera.sqoop.util.ExportException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -39,18 +43,13 @@ import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities; import org.apache.sqoop.util.LoggingUtils; import org.apache.sqoop.util.PerfCounters; +import org.apache.sqoop.validation.ValidationContext; +import org.apache.sqoop.validation.ValidationException; -import com.cloudera.sqoop.SqoopOptions; -import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException; -import com.cloudera.sqoop.config.ConfigurationHelper; -import com.cloudera.sqoop.lib.SqoopRecord; -import com.cloudera.sqoop.manager.ConnManager; -import com.cloudera.sqoop.manager.ExportJobContext; -import com.cloudera.sqoop.orm.TableClassName; -import com.cloudera.sqoop.mapreduce.JobBase; -import com.cloudera.sqoop.util.ExportException; - -import org.apache.sqoop.validation.*; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.sql.SQLException; +import java.util.Date; /** * Base class for running an export MapReduce job. @@ -94,6 +93,10 @@ public enum FileType { private static final String HADOOP_MAP_TASK_MAX_ATTEMTPS = "mapred.map.max.attempts"; + /** Start and endtime captured for export job. */ + private long startTime; + public static final String OPERATION = "export"; + protected ExportJobContext context; @@ -107,6 +110,7 @@ public ExportJobBase(final ExportJobContext ctxt, final Class outputFormatClass) { super(ctxt.getOptions(), mapperClass, inputFormatClass, outputFormatClass); this.context = ctxt; + this.startTime = new Date().getTime(); } /** @@ -439,12 +443,20 @@ public void runExport() throws ExportException, IOException { setJob(job); boolean success = runJob(job); if (!success) { + LOG.error("Export job failed!"); throw new ExportException("Export job failed!"); } if (options.isValidationEnabled()) { validateExport(tableName, conf, job); } + + if (isHCatJob) { + // Publish export job data for hcat export operation + LOG.info("Publishing HCatalog export job data to Listeners"); + PublishJobData.publishJobData(conf, options, OPERATION, tableName, startTime); + } + } catch (InterruptedException ie) { throw new IOException(ie); } catch (ClassNotFoundException cnfe) { diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java index 9b6e1a02..105917c3 100644 --- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java +++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java @@ -27,7 +27,6 @@ import com.cloudera.sqoop.util.ImportException; import org.apache.avro.file.DataFileConstants; import org.apache.avro.mapred.AvroJob; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -42,8 +41,6 @@ import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.sqoop.SqoopJobDataPublisher; -import org.apache.sqoop.config.ConfigurationConstants; import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities; import org.apache.sqoop.util.PerfCounters; import org.apache.sqoop.validation.ValidationContext; @@ -61,7 +58,7 @@ public class ImportJobBase extends JobBase { private ImportJobContext context; private long startTime; - private long endTime; + public static final String OPERATION = "import"; public static final Log LOG = LogFactory.getLog( ImportJobBase.class.getName()); @@ -280,28 +277,13 @@ public void runImport(String tableName, String ormJarFile, String splitByCol, if (options.isValidationEnabled()) { validateImport(tableName, conf, job); } - this.endTime = new Date().getTime(); - String publishClassName = conf.get(ConfigurationConstants.DATA_PUBLISH_CLASS); - if (!StringUtils.isEmpty(publishClassName)) { - try { - Class publishClass = Class.forName(publishClassName); - Object obj = publishClass.newInstance(); - if (obj instanceof SqoopJobDataPublisher) { - SqoopJobDataPublisher publisher = (SqoopJobDataPublisher) obj; - if (options.doHiveImport() || options.getHCatTableName() != null) { - // We need to publish the details - SqoopJobDataPublisher.Data data = - new SqoopJobDataPublisher.Data(options, tableName, startTime, endTime); - publisher.publish(data); - } - } else { - LOG.warn("Publisher class not an instance of SqoopJobDataPublisher. Ignoring..."); - } - } catch (Exception ex) { - LOG.warn("Unable to publish data to publisher " + ex.getMessage(), ex); - } + if (options.doHiveImport() || isHCatJob) { + // Publish data for import job, only hive/hcat import jobs are supported now. + LOG.info("Publishing Hive/Hcat import job data to Listeners for table " + tableName); + PublishJobData.publishJobData(conf, options, OPERATION, tableName, startTime); } + } catch (InterruptedException ie) { throw new IOException(ie); } catch (ClassNotFoundException cnfe) { diff --git a/src/java/org/apache/sqoop/mapreduce/PublishJobData.java b/src/java/org/apache/sqoop/mapreduce/PublishJobData.java new file mode 100644 index 00000000..fc18188f --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/PublishJobData.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.mapreduce; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.sqoop.SqoopJobDataPublisher; +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.config.ConfigurationConstants; + +import java.util.Date; + +/** + * Util class to publish job data to listeners. + */ +public final class PublishJobData { + + public static final Log LOG = LogFactory.getLog(PublishJobData.class.getName()); + + private PublishJobData() {} + + public static void publishJobData(Configuration conf, SqoopOptions options, + String operation, String tableName, long startTime) { + // Publish metadata about export job to listeners (if they are registered with sqoop) + long endTime = new Date().getTime(); + String publishClassName = conf.get(ConfigurationConstants.DATA_PUBLISH_CLASS); + if (!StringUtils.isEmpty(publishClassName)) { + try { + Class publishClass = Class.forName(publishClassName); + Object obj = publishClass.newInstance(); + if (obj instanceof SqoopJobDataPublisher) { + SqoopJobDataPublisher publisher = (SqoopJobDataPublisher) obj; + SqoopJobDataPublisher.Data data = + new SqoopJobDataPublisher.Data(operation, options, tableName, startTime, endTime); + LOG.info("Published data is " + data.toString()); + publisher.publish(data); + } else { + LOG.warn("Publisher class not an instance of SqoopJobDataPublisher. Ignoring..."); + } + } catch (Exception ex) { + LOG.warn("Unable to publish " + operation + " data to publisher " + ex.getMessage(), ex); + } + } + } +} diff --git a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java index 99e42931..8dddd026 100644 --- a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java +++ b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java @@ -18,6 +18,24 @@ package com.cloudera.sqoop.testutil; +import com.cloudera.sqoop.ConnFactory; +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.manager.ConnManager; +import com.cloudera.sqoop.metastore.JobData; +import com.cloudera.sqoop.tool.ImportTool; +import com.google.common.collect.ObjectArrays; +import junit.framework.TestCase; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.StringUtils; +import org.apache.log4j.BasicConfigurator; +import org.apache.sqoop.SqoopJobDataPublisher; +import org.junit.After; +import org.junit.Before; + import java.io.File; import java.io.IOException; import java.sql.Connection; @@ -26,30 +44,26 @@ import java.sql.SQLException; import java.util.Arrays; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.util.StringUtils; -import org.apache.log4j.BasicConfigurator; -import org.junit.After; -import org.junit.Before; - -import com.cloudera.sqoop.ConnFactory; -import com.cloudera.sqoop.SqoopOptions; -import com.cloudera.sqoop.manager.ConnManager; -import com.cloudera.sqoop.metastore.JobData; -import com.cloudera.sqoop.tool.ImportTool; -import com.google.common.collect.ObjectArrays; - -import junit.framework.TestCase; - /** * Class that implements common methods required for tests. */ public abstract class BaseSqoopTestCase extends TestCase { + public static class DummyDataPublisher extends SqoopJobDataPublisher { + public static String hiveTable; + public static String storeTable; + public static String storeType; + public static String operation; + + @Override + public void publish(Data data) { + hiveTable = data.getHiveTable(); + storeTable = data.getStoreTable(); + storeType = data.getStoreType(); + operation = data.getOperation(); + } + } + public static final Log LOG = LogFactory.getLog( BaseSqoopTestCase.class.getName()); diff --git a/src/test/org/apache/sqoop/TestSqoopJobDataPublisher.java b/src/test/org/apache/sqoop/TestSqoopJobDataPublisher.java index 99bcae07..e9698be3 100644 --- a/src/test/org/apache/sqoop/TestSqoopJobDataPublisher.java +++ b/src/test/org/apache/sqoop/TestSqoopJobDataPublisher.java @@ -19,7 +19,6 @@ package org.apache.sqoop; import com.cloudera.sqoop.hive.HiveImport; -import com.cloudera.sqoop.hive.TestHiveImport; import com.cloudera.sqoop.testutil.CommonArgs; import com.cloudera.sqoop.testutil.ImportJobTestCase; import com.cloudera.sqoop.tool.ImportTool; @@ -36,22 +35,7 @@ public class TestSqoopJobDataPublisher extends ImportJobTestCase { - public static class DummyDataPublisher extends SqoopJobDataPublisher { - private static String hiveTable; - private static String storeTable; - private static String storeType; - - @Override - public void publish(SqoopJobDataPublisher.Data data) { - hiveTable = data.getHiveTable(); - storeTable = data.getStoreTable(); - storeType = data.getStoreType(); - assert (data.getOperation().equals("import")); - } - } - - public static final Log LOG = LogFactory.getLog( - TestHiveImport.class.getName()); + public static final Log LOG = LogFactory.getLog(TestSqoopJobDataPublisher.class.getName()); public void setUp() { super.setUp(); @@ -106,6 +90,7 @@ public void tearDown() { return args.toArray(new String[0]); } + private void runImportTest(String tableName, String [] types, String [] values, String verificationScript, String [] args, SqoopTool tool) throws IOException { @@ -137,6 +122,7 @@ private com.cloudera.sqoop.SqoopOptions getSqoopOptions(String [] args, SqoopToo return opts; } + protected void setNumCols(int numCols) { String [] cols = new String[numCols]; for (int i = 0; i < numCols; i++) { @@ -159,7 +145,7 @@ public void testNormalHiveImport() throws IOException { assert (DummyDataPublisher.hiveTable.equals("NORMAL_HIVE_IMPORT")); assert (DummyDataPublisher.storeTable.equals("NORMAL_HIVE_IMPORT")); assert (DummyDataPublisher.storeType.equals("hsqldb")); - + assert (DummyDataPublisher.operation.equals("import")); } } diff --git a/src/test/org/apache/sqoop/hcat/HCatalogExportTest.java b/src/test/org/apache/sqoop/hcat/HCatalogExportTest.java index 8aa0725f..6f87a182 100644 --- a/src/test/org/apache/sqoop/hcat/HCatalogExportTest.java +++ b/src/test/org/apache/sqoop/hcat/HCatalogExportTest.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.common.type.HiveVarchar; import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; +import org.apache.sqoop.config.ConfigurationConstants; import org.apache.sqoop.hcat.HCatalogTestUtils.ColumnGenerator; import org.apache.sqoop.hcat.HCatalogTestUtils.CreateMode; import org.apache.sqoop.hcat.HCatalogTestUtils.KeyType; @@ -441,4 +442,24 @@ public void testTextFile() throws Exception { utils.setStorageInfo(HCatalogTestUtils.STORED_AS_TEXT); runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols); } + + public void testPublishExportJobData() throws Exception { + final int TOTAL_RECORDS = 1 * 10; + String table = getTableName().toUpperCase(); + ColumnGenerator[] cols = new ColumnGenerator[] { + HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0), + "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, 0, 0, + "1", "1", KeyType.STATIC_KEY), + HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1), + "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, 0, 0, + "2", "2", KeyType.DYNAMIC_KEY), }; + + List addlArgsArray = new ArrayList(); + addlArgsArray.add("-D"); + addlArgsArray.add(ConfigurationConstants.DATA_PUBLISH_CLASS + "=" + DummyDataPublisher.class.getName()); + runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols); + assert (DummyDataPublisher.storeTable.equals(getTableName())); + assert (DummyDataPublisher.storeType.equals("hsqldb")); + assert (DummyDataPublisher.operation.equals("export")); + } } diff --git a/src/test/org/apache/sqoop/hcat/HCatalogImportTest.java b/src/test/org/apache/sqoop/hcat/HCatalogImportTest.java index 67b7a787..fe5295a4 100644 --- a/src/test/org/apache/sqoop/hcat/HCatalogImportTest.java +++ b/src/test/org/apache/sqoop/hcat/HCatalogImportTest.java @@ -44,6 +44,7 @@ import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.apache.sqoop.config.ConfigurationConstants; import org.apache.sqoop.hcat.HCatalogTestUtils.ColumnGenerator; import org.apache.sqoop.hcat.HCatalogTestUtils.CreateMode; import org.apache.sqoop.hcat.HCatalogTestUtils.KeyType; @@ -918,4 +919,49 @@ public void testTableCreationWithNonIdentColChars() throws Exception { runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null, true, false); } + + public void testPublishQueryImportData() throws Exception { + final int TOTAL_RECORDS = 1 * 10; + String table = getTableName().toUpperCase(); + ColumnGenerator[] cols = new ColumnGenerator[] { + HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0), + "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, 0, 0, + new HiveVarchar("1", 20), "1", KeyType.STATIC_KEY), + HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1), + "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, 0, 0, + new HiveVarchar("2", 20), "2", KeyType.DYNAMIC_KEY), + }; + List cfgParams = new ArrayList(); + cfgParams.add("-D"); + cfgParams.add(ConfigurationConstants.DATA_PUBLISH_CLASS + "=" + DummyDataPublisher.class.getName()); + setConfigParams(cfgParams); + runHCatQueryImport(new ArrayList(), TOTAL_RECORDS, table, cols, null); + assert (DummyDataPublisher.storeType.equals("hsqldb")); + assert (DummyDataPublisher.operation.equals("import")); + assert (DummyDataPublisher.storeTable.equals(getTableName())); + } + + public void testPublishTableImportData() throws Exception { + final int TOTAL_RECORDS = 1 * 10; + String table = getTableName().toUpperCase(); + ColumnGenerator[] cols = new ColumnGenerator[] { + HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0), + "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, 0, 0, + new HiveVarchar("1", 20), "1", KeyType.STATIC_KEY), + HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1), + "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, 0, 0, + new HiveVarchar("2", 20), "2", KeyType.DYNAMIC_KEY), + }; + List cfgParams = new ArrayList(); + cfgParams.add("-D"); + cfgParams.add(ConfigurationConstants.DATA_PUBLISH_CLASS + "=" + DummyDataPublisher.class.getName()); + setConfigParams(cfgParams); + List addlArgsArray = new ArrayList(); + addlArgsArray.add("--create-hcatalog-table"); + setExtraArgs(addlArgsArray); + runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null, true, false); + assert (DummyDataPublisher.storeType.equals("hsqldb")); + assert (DummyDataPublisher.operation.equals("import")); + assert (DummyDataPublisher.storeTable.equals(getTableName())); + } }