From 973629912ce9b849228e0bce94e245f016dd6101 Mon Sep 17 00:00:00 2001 From: Szabolcs Vasas Date: Wed, 10 Oct 2018 14:41:22 +0200 Subject: [PATCH] SQOOP-3376: Test import into external Hive table backed by S3 (Boglarka Egyed via Szabolcs Vasas) --- .../s3/TestS3ExternalHiveTableImport.java | 130 ++++++++++++++++++ .../sqoop/testutil/HiveServer2TestUtil.java | 9 ++ .../apache/sqoop/testutil/S3TestUtils.java | 81 +++++++++-- 3 files changed, 205 insertions(+), 15 deletions(-) create mode 100644 src/test/org/apache/sqoop/s3/TestS3ExternalHiveTableImport.java diff --git a/src/test/org/apache/sqoop/s3/TestS3ExternalHiveTableImport.java b/src/test/org/apache/sqoop/s3/TestS3ExternalHiveTableImport.java new file mode 100644 index 00000000..0c3161e5 --- /dev/null +++ b/src/test/org/apache/sqoop/s3/TestS3ExternalHiveTableImport.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.s3; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.sqoop.hive.minicluster.HiveMiniCluster; +import org.apache.sqoop.testutil.ArgumentArrayBuilder; +import org.apache.sqoop.testutil.DefaultS3CredentialGenerator; +import org.apache.sqoop.testutil.HiveServer2TestUtil; +import org.apache.sqoop.testutil.ImportJobTestCase; +import org.apache.sqoop.testutil.S3CredentialGenerator; +import org.apache.sqoop.testutil.S3TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import static org.apache.sqoop.tool.BaseSqoopTool.FMT_PARQUETFILE_ARG; +import static org.apache.sqoop.tool.BaseSqoopTool.FMT_TEXTFILE_ARG; +import static org.junit.Assert.assertEquals; + +@RunWith(Parameterized.class) +public class TestS3ExternalHiveTableImport extends ImportJobTestCase { + + @Parameterized.Parameters(name = "fileFormatArg = {0}, expectedResult = {1}") + public static Iterable parameters() { + return Arrays.asList(new Object[] {FMT_TEXTFILE_ARG, S3TestUtils.getExpectedTextOutputAsList()}, + new Object[] {FMT_PARQUETFILE_ARG, S3TestUtils.getExpectedParquetOutput()}); + } + + public static final Log LOG = LogFactory.getLog( + TestS3ExternalHiveTableImport.class.getName()); + + private String fileFormatArg; + + private List expectedResult; + + public TestS3ExternalHiveTableImport(String fileFormatArg, List expectedResult) { + this.fileFormatArg = fileFormatArg; + this.expectedResult = expectedResult; + } + + private static S3CredentialGenerator s3CredentialGenerator; + + private FileSystem s3Client; + + private HiveMiniCluster hiveMiniCluster; + + private static HiveServer2TestUtil hiveServer2TestUtil; + + @BeforeClass + public static void 
setupS3Credentials() throws IOException { + String generatorCommand = S3TestUtils.getGeneratorCommand(); + if (generatorCommand != null) { + s3CredentialGenerator = new DefaultS3CredentialGenerator(generatorCommand); + } + } + + @Before + public void setup() throws IOException { + S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator); + super.setUp(); + S3TestUtils.createTestTableFromInputData(this); + s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator); + hiveMiniCluster = S3TestUtils.setupS3ExternalHiveTableImportTestCase(s3CredentialGenerator); + hiveServer2TestUtil = new HiveServer2TestUtil(hiveMiniCluster.getUrl()); + } + + @After + public void cleanUpTargetDir() { + S3TestUtils.tearDownS3ExternalHiveTableImportTestCase(s3Client); + super.tearDown(); + if (hiveMiniCluster != null) { + hiveMiniCluster.stop(); + } + } + + @Test + public void testS3ImportIntoExternalHiveTable() throws IOException { + String[] args = getExternalHiveTableImportArgs(false); + runImport(args); + + List rows = hiveServer2TestUtil.loadCsvRowsFromTable(getTableName()); + assertEquals(expectedResult, rows); + } + + @Test + public void testS3CreateAndImportIntoExternalHiveTable() throws IOException { + String[] args = getExternalHiveTableImportArgs(true); + runImport(args); + + List rows = hiveServer2TestUtil.loadCsvRowsFromTable(S3TestUtils.HIVE_EXTERNAL_TABLE_NAME); + assertEquals(expectedResult, rows); + } + + private String[] getExternalHiveTableImportArgs(boolean createHiveTable) { + ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this, + s3CredentialGenerator, fileFormatArg); + builder = S3TestUtils.addExternalHiveTableImportArgs(builder, hiveMiniCluster.getUrl()); + if(createHiveTable) { + builder = S3TestUtils.addCreateHiveTableArgs(builder); + } + return builder.build(); + } + +} diff --git a/src/test/org/apache/sqoop/testutil/HiveServer2TestUtil.java 
b/src/test/org/apache/sqoop/testutil/HiveServer2TestUtil.java index 79937081..c0689e63 100644 --- a/src/test/org/apache/sqoop/testutil/HiveServer2TestUtil.java +++ b/src/test/org/apache/sqoop/testutil/HiveServer2TestUtil.java @@ -18,6 +18,7 @@ package org.apache.sqoop.testutil; +import org.apache.commons.lang3.StringUtils; import org.apache.sqoop.hive.HiveServer2ConnectionFactory; import java.sql.Connection; @@ -29,6 +30,8 @@ import java.util.LinkedHashMap; import java.util.List; +import static java.util.stream.Collectors.toList; + public class HiveServer2TestUtil { private static final String SELECT_TABLE_QUERY = "SELECT * FROM %s"; @@ -75,4 +78,10 @@ public List> loadRawRowsFromTable(String tableName) { return result; } + public List loadCsvRowsFromTable(String tableName) { + return loadRawRowsFromTable(tableName).stream() + .map(list -> StringUtils.join(list, ",")) + .collect(toList()); + } + } diff --git a/src/test/org/apache/sqoop/testutil/S3TestUtils.java b/src/test/org/apache/sqoop/testutil/S3TestUtils.java index 0e6ef5bf..c9d17bc7 100644 --- a/src/test/org/apache/sqoop/testutil/S3TestUtils.java +++ b/src/test/org/apache/sqoop/testutil/S3TestUtils.java @@ -24,6 +24,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.Constants; +import org.apache.sqoop.hive.minicluster.HiveMiniCluster; +import org.apache.sqoop.hive.minicluster.NoAuthenticationConfiguration; import org.apache.sqoop.util.FileSystemUtil; import java.io.IOException; @@ -44,12 +46,16 @@ public class S3TestUtils { private static final String TEMPORARY_CREDENTIALS_PROVIDER_CLASS = "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider"; - private static final String BUCKET_TEMP_TEST_DIR = "/tmp/sqooptest/"; + private static final String BUCKET_TEMP_DIR = "/tmp/"; + + private static final String EXTERNAL_TABLE_DIR = "/externaldir"; private static final String TARGET_DIR_NAME_PREFIX = "/testdir"; private static final String 
TEMPORARY_ROOTDIR_SUFFIX = "_temprootdir"; + public static final String HIVE_EXTERNAL_TABLE_NAME = "test_external_table"; + private static String targetDirName = TARGET_DIR_NAME_PREFIX; private static final String[] COLUMN_NAMES = {"ID", "SUPERHERO", "COMICS", "DEBUT"}; @@ -95,15 +101,20 @@ private static String getTargetDirName() { } public static Path getTargetDirPath() { - String targetPathString = getBucketTempTestDirPath() + getTargetDirName(); + String targetPathString = getBucketTempDirPath() + getTargetDirName(); return new Path(targetPathString); } - private static Path getBucketTempTestDirPath() { - String targetPathString = getPropertyBucketUrl() + BUCKET_TEMP_TEST_DIR; + private static Path getBucketTempDirPath() { + String targetPathString = getPropertyBucketUrl() + BUCKET_TEMP_DIR; return new Path(targetPathString); } + public static Path getExternalTableDirPath() { + String externalTableDir = getBucketTempDirPath() + EXTERNAL_TABLE_DIR; + return new Path(externalTableDir); + } + public static void runTestCaseOnlyIfS3CredentialsAreSet(S3CredentialGenerator s3CredentialGenerator) { assumeNotNull(s3CredentialGenerator); assumeNotNull(s3CredentialGenerator.getS3AccessKey()); @@ -112,7 +123,9 @@ public static void runTestCaseOnlyIfS3CredentialsAreSet(S3CredentialGenerator s3 public static FileSystem setupS3ImportTestCase(S3CredentialGenerator s3CredentialGenerator) throws IOException { Configuration hadoopConf = new Configuration(); - S3TestUtils.setS3CredentialsInHadoopConf(hadoopConf, s3CredentialGenerator); + setS3CredentialsInConf(hadoopConf, s3CredentialGenerator); + setHadoopConfigParametersForS3UnitTests(hadoopConf); + FileSystem s3Client = FileSystem.get(hadoopConf); setUniqueTargetDirName(); @@ -122,16 +135,20 @@ public static FileSystem setupS3ImportTestCase(S3CredentialGenerator s3Credentia return s3Client; } - private static void setS3CredentialsInHadoopConf(Configuration hadoopConf, - S3CredentialGenerator s3CredentialGenerator) { - 
hadoopConf.set("fs.defaultFS", getPropertyBucketUrl()); - hadoopConf.set(Constants.ACCESS_KEY, s3CredentialGenerator.getS3AccessKey()); - hadoopConf.set(Constants.SECRET_KEY, s3CredentialGenerator.getS3SecretKey()); + public static void setS3CredentialsInConf(Configuration conf, + S3CredentialGenerator s3CredentialGenerator) { + conf.set(Constants.ACCESS_KEY, s3CredentialGenerator.getS3AccessKey()); + conf.set(Constants.SECRET_KEY, s3CredentialGenerator.getS3SecretKey()); if (s3CredentialGenerator.getS3SessionToken() != null) { - hadoopConf.set(Constants.SESSION_TOKEN, s3CredentialGenerator.getS3SessionToken()); - hadoopConf.set(Constants.AWS_CREDENTIALS_PROVIDER, TEMPORARY_CREDENTIALS_PROVIDER_CLASS); + conf.set(Constants.SESSION_TOKEN, s3CredentialGenerator.getS3SessionToken()); + conf.set(Constants.AWS_CREDENTIALS_PROVIDER, TEMPORARY_CREDENTIALS_PROVIDER_CLASS); } + } + + private static void setHadoopConfigParametersForS3UnitTests(Configuration hadoopConf) { + // Default filesystem needs to be set to S3 for the output verification phase + hadoopConf.set("fs.defaultFS", getPropertyBucketUrl()); // FileSystem has a static cache that should be disabled during tests to make sure // Sqoop relies on the S3 credentials set via the -D system properties. 
@@ -139,6 +156,14 @@ private static void setS3CredentialsInHadoopConf(Configuration hadoopConf, hadoopConf.setBoolean("fs.s3a.impl.disable.cache", true); } + public static HiveMiniCluster setupS3ExternalHiveTableImportTestCase(S3CredentialGenerator s3CredentialGenerator) { + HiveMiniCluster hiveMiniCluster = new HiveMiniCluster(new NoAuthenticationConfiguration()); + hiveMiniCluster.start(); + S3TestUtils.setS3CredentialsInConf(hiveMiniCluster.getConfig(), s3CredentialGenerator); + + return hiveMiniCluster; + } + public static ArgumentArrayBuilder getArgumentArrayBuilderForS3UnitTests(BaseSqoopTestCase testCase, S3CredentialGenerator s3CredentialGenerator) { ArgumentArrayBuilder builder = new ArgumentArrayBuilder(); @@ -169,6 +194,20 @@ public static String[] getArgsForS3UnitTestsWithFileFormatOption(BaseSqoopTestCa return builder.build(); } + public static ArgumentArrayBuilder addExternalHiveTableImportArgs(ArgumentArrayBuilder builder, + String hs2Url) { + return builder + .withOption("hive-import") + .withOption("hs2-url", hs2Url) + .withOption("external-table-dir", getExternalTableDirPath().toString()); + } + + public static ArgumentArrayBuilder addCreateHiveTableArgs(ArgumentArrayBuilder builder) { + return builder + .withOption("create-hive-table") + .withOption("hive-table", HIVE_EXTERNAL_TABLE_NAME); + } + private static Path getTemporaryRootDirPath() { return new Path(getTargetDirPath().toString() + TEMPORARY_ROOTDIR_SUFFIX); } @@ -244,6 +283,10 @@ public static String[] getExpectedTextOutput() { }; } + public static List getExpectedTextOutputAsList() { + return Arrays.asList(getExpectedTextOutput()); + } + public static String[] getExpectedExtraTextOutput() { return new String[] { "5,Black Widow,Marvel,1964" @@ -352,15 +395,23 @@ public static void cleanUpDirectory(FileSystem s3Client, Path directoryPath) { } } - public static void tearDownS3ImportTestCase(FileSystem s3Client) { + private static void cleanUpTargetDir(FileSystem s3Client) { 
cleanUpDirectory(s3Client, getTargetDirPath()); resetTargetDirName(); } + public static void tearDownS3ImportTestCase(FileSystem s3Client) { + cleanUpTargetDir(s3Client); + } + public static void tearDownS3IncrementalImportTestCase(FileSystem s3Client) { - cleanUpDirectory(s3Client, getTargetDirPath()); + cleanUpTargetDir(s3Client); cleanUpDirectory(s3Client, getTemporaryRootDirPath()); - resetTargetDirName(); System.clearProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY); } + + public static void tearDownS3ExternalHiveTableImportTestCase(FileSystem s3Client) { + cleanUpTargetDir(s3Client); + cleanUpDirectory(s3Client, getExternalTableDirPath()); + } }