
SQOOP-3376: Test import into external Hive table backed by S3

(Boglarka Egyed via Szabolcs Vasas)
Committed by Szabolcs Vasas on 2018-10-10 14:41:22 +02:00
parent 932822aa8f
commit 973629912c
3 changed files with 205 additions and 15 deletions
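These S3 tests only run when credentials are supplied from the outside: setupS3Credentials() below leaves s3CredentialGenerator null otherwise, and runTestCaseOnlyIfS3CredentialsAreSet() then skips the test via JUnit assumptions. A hypothetical invocation, assuming the s3.bucket.url and s3.generator.command system properties that S3TestUtils reads elsewhere (neither property name is visible in this diff):

    # Sketch only: the property names and script path are assumptions, not part of this change.
    # The generator command is expected to emit the AWS access key, secret key and,
    # optionally, a session token for the temporary credentials provider.
    mvn clean test \
        -Dtest=TestS3ExternalHiveTableImport \
        -Ds3.bucket.url=s3a://your-test-bucket \
        -Ds3.generator.command=/path/to/generate-aws-credentials.sh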

org/apache/sqoop/s3/TestS3ExternalHiveTableImport.java (new file)

@@ -0,0 +1,130 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.sqoop.s3;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.sqoop.hive.minicluster.HiveMiniCluster;
import org.apache.sqoop.testutil.ArgumentArrayBuilder;
import org.apache.sqoop.testutil.DefaultS3CredentialGenerator;
import org.apache.sqoop.testutil.HiveServer2TestUtil;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.apache.sqoop.testutil.S3CredentialGenerator;
import org.apache.sqoop.testutil.S3TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import static org.apache.sqoop.tool.BaseSqoopTool.FMT_PARQUETFILE_ARG;
import static org.apache.sqoop.tool.BaseSqoopTool.FMT_TEXTFILE_ARG;
import static org.junit.Assert.assertEquals;
@RunWith(Parameterized.class)
public class TestS3ExternalHiveTableImport extends ImportJobTestCase {

  @Parameterized.Parameters(name = "fileFormatArg = {0}, expectedResult = {1}")
  public static Iterable<? extends Object> parameters() {
    return Arrays.asList(
        new Object[] {FMT_TEXTFILE_ARG, S3TestUtils.getExpectedTextOutputAsList()},
        new Object[] {FMT_PARQUETFILE_ARG, S3TestUtils.getExpectedParquetOutput()});
  }

  public static final Log LOG = LogFactory.getLog(
      TestS3ExternalHiveTableImport.class.getName());

  private String fileFormatArg;

  private List<String> expectedResult;

  public TestS3ExternalHiveTableImport(String fileFormatArg, List<String> expectedResult) {
    this.fileFormatArg = fileFormatArg;
    this.expectedResult = expectedResult;
  }

  private static S3CredentialGenerator s3CredentialGenerator;

  private FileSystem s3Client;

  private HiveMiniCluster hiveMiniCluster;

  private static HiveServer2TestUtil hiveServer2TestUtil;

  @BeforeClass
  public static void setupS3Credentials() throws IOException {
    String generatorCommand = S3TestUtils.getGeneratorCommand();
    if (generatorCommand != null) {
      s3CredentialGenerator = new DefaultS3CredentialGenerator(generatorCommand);
    }
  }

  @Before
  public void setup() throws IOException {
    S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
    super.setUp();
    S3TestUtils.createTestTableFromInputData(this);
    s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
    hiveMiniCluster = S3TestUtils.setupS3ExternalHiveTableImportTestCase(s3CredentialGenerator);
    hiveServer2TestUtil = new HiveServer2TestUtil(hiveMiniCluster.getUrl());
  }

  @After
  public void cleanUpTargetDir() {
    S3TestUtils.tearDownS3ExternalHiveTableImportTestCase(s3Client);
    super.tearDown();
    if (hiveMiniCluster != null) {
      hiveMiniCluster.stop();
    }
  }
  @Test
  public void testS3ImportIntoExternalHiveTable() throws IOException {
    String[] args = getExternalHiveTableImportArgs(false);
    runImport(args);

    List<String> rows = hiveServer2TestUtil.loadCsvRowsFromTable(getTableName());
    assertEquals(expectedResult, rows);
  }

  @Test
  public void testS3CreateAndImportIntoExternalHiveTable() throws IOException {
    String[] args = getExternalHiveTableImportArgs(true);
    runImport(args);

    List<String> rows = hiveServer2TestUtil.loadCsvRowsFromTable(S3TestUtils.HIVE_EXTERNAL_TABLE_NAME);
    assertEquals(expectedResult, rows);
  }
  private String[] getExternalHiveTableImportArgs(boolean createHiveTable) {
    ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
        s3CredentialGenerator, fileFormatArg);
    builder = S3TestUtils.addExternalHiveTableImportArgs(builder, hiveMiniCluster.getUrl());
    if (createHiveTable) {
      builder = S3TestUtils.addCreateHiveTableArgs(builder);
    }
    return builder.build();
  }
}

org/apache/sqoop/testutil/HiveServer2TestUtil.java

@@ -18,6 +18,7 @@
 package org.apache.sqoop.testutil;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.sqoop.hive.HiveServer2ConnectionFactory;
 
 import java.sql.Connection;
@@ -29,6 +30,8 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 
+import static java.util.stream.Collectors.toList;
+
 public class HiveServer2TestUtil {
 
   private static final String SELECT_TABLE_QUERY = "SELECT * FROM %s";
@@ -75,4 +78,10 @@ public List<List<Object>> loadRawRowsFromTable(String tableName) {
     return result;
   }
 
+  public List<String> loadCsvRowsFromTable(String tableName) {
+    return loadRawRowsFromTable(tableName).stream()
+        .map(list -> StringUtils.join(list, ","))
+        .collect(toList());
+  }
 }

org/apache/sqoop/testutil/S3TestUtils.java

@@ -24,6 +24,8 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.sqoop.hive.minicluster.HiveMiniCluster;
+import org.apache.sqoop.hive.minicluster.NoAuthenticationConfiguration;
 import org.apache.sqoop.util.FileSystemUtil;
 
 import java.io.IOException;
@@ -44,12 +46,16 @@ public class S3TestUtils {
   private static final String TEMPORARY_CREDENTIALS_PROVIDER_CLASS = "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider";
 
-  private static final String BUCKET_TEMP_TEST_DIR = "/tmp/sqooptest/";
+  private static final String BUCKET_TEMP_DIR = "/tmp/";
+
+  private static final String EXTERNAL_TABLE_DIR = "/externaldir";
 
   private static final String TARGET_DIR_NAME_PREFIX = "/testdir";
   private static final String TEMPORARY_ROOTDIR_SUFFIX = "_temprootdir";
 
+  public static final String HIVE_EXTERNAL_TABLE_NAME = "test_external_table";
+
   private static String targetDirName = TARGET_DIR_NAME_PREFIX;
 
   private static final String[] COLUMN_NAMES = {"ID", "SUPERHERO", "COMICS", "DEBUT"};
@@ -95,15 +101,20 @@ private static String getTargetDirName() {
   }
 
   public static Path getTargetDirPath() {
-    String targetPathString = getBucketTempTestDirPath() + getTargetDirName();
+    String targetPathString = getBucketTempDirPath() + getTargetDirName();
     return new Path(targetPathString);
   }
 
-  private static Path getBucketTempTestDirPath() {
-    String targetPathString = getPropertyBucketUrl() + BUCKET_TEMP_TEST_DIR;
+  private static Path getBucketTempDirPath() {
+    String targetPathString = getPropertyBucketUrl() + BUCKET_TEMP_DIR;
     return new Path(targetPathString);
   }
 
+  public static Path getExternalTableDirPath() {
+    String externalTableDir = getBucketTempDirPath() + EXTERNAL_TABLE_DIR;
+    return new Path(externalTableDir);
+  }
+
   public static void runTestCaseOnlyIfS3CredentialsAreSet(S3CredentialGenerator s3CredentialGenerator) {
     assumeNotNull(s3CredentialGenerator);
     assumeNotNull(s3CredentialGenerator.getS3AccessKey());
@@ -112,7 +123,9 @@ public static void runTestCaseOnlyIfS3CredentialsAreSet(S3CredentialGenerator s3
   public static FileSystem setupS3ImportTestCase(S3CredentialGenerator s3CredentialGenerator) throws IOException {
     Configuration hadoopConf = new Configuration();
-    S3TestUtils.setS3CredentialsInHadoopConf(hadoopConf, s3CredentialGenerator);
+    setS3CredentialsInConf(hadoopConf, s3CredentialGenerator);
+    setHadoopConfigParametersForS3UnitTests(hadoopConf);
     FileSystem s3Client = FileSystem.get(hadoopConf);
 
     setUniqueTargetDirName();
@@ -122,16 +135,20 @@ public static FileSystem setupS3ImportTestCase(S3CredentialGenerator s3Credentia
     return s3Client;
   }
 
-  private static void setS3CredentialsInHadoopConf(Configuration hadoopConf,
-                                                   S3CredentialGenerator s3CredentialGenerator) {
-    hadoopConf.set("fs.defaultFS", getPropertyBucketUrl());
-    hadoopConf.set(Constants.ACCESS_KEY, s3CredentialGenerator.getS3AccessKey());
-    hadoopConf.set(Constants.SECRET_KEY, s3CredentialGenerator.getS3SecretKey());
+  public static void setS3CredentialsInConf(Configuration conf,
+                                            S3CredentialGenerator s3CredentialGenerator) {
+    conf.set(Constants.ACCESS_KEY, s3CredentialGenerator.getS3AccessKey());
+    conf.set(Constants.SECRET_KEY, s3CredentialGenerator.getS3SecretKey());
     if (s3CredentialGenerator.getS3SessionToken() != null) {
-      hadoopConf.set(Constants.SESSION_TOKEN, s3CredentialGenerator.getS3SessionToken());
-      hadoopConf.set(Constants.AWS_CREDENTIALS_PROVIDER, TEMPORARY_CREDENTIALS_PROVIDER_CLASS);
+      conf.set(Constants.SESSION_TOKEN, s3CredentialGenerator.getS3SessionToken());
+      conf.set(Constants.AWS_CREDENTIALS_PROVIDER, TEMPORARY_CREDENTIALS_PROVIDER_CLASS);
     }
   }
 
+  private static void setHadoopConfigParametersForS3UnitTests(Configuration hadoopConf) {
+    // Default filesystem needs to be set to S3 for the output verification phase
+    hadoopConf.set("fs.defaultFS", getPropertyBucketUrl());
     // FileSystem has a static cache that should be disabled during tests to make sure
     // Sqoop relies on the S3 credentials set via the -D system properties.
@@ -139,6 +156,14 @@ private static void setS3CredentialsInHadoopConf(Configuration hadoopConf,
     hadoopConf.setBoolean("fs.s3a.impl.disable.cache", true);
   }
 
+  public static HiveMiniCluster setupS3ExternalHiveTableImportTestCase(S3CredentialGenerator s3CredentialGenerator) {
+    HiveMiniCluster hiveMiniCluster = new HiveMiniCluster(new NoAuthenticationConfiguration());
+    hiveMiniCluster.start();
+    S3TestUtils.setS3CredentialsInConf(hiveMiniCluster.getConfig(), s3CredentialGenerator);
+    return hiveMiniCluster;
+  }
+
   public static ArgumentArrayBuilder getArgumentArrayBuilderForS3UnitTests(BaseSqoopTestCase testCase,
                                                                            S3CredentialGenerator s3CredentialGenerator) {
     ArgumentArrayBuilder builder = new ArgumentArrayBuilder();
@@ -169,6 +194,20 @@ public static String[] getArgsForS3UnitTestsWithFileFormatOption(BaseSqoopTestCa
     return builder.build();
   }
 
+  public static ArgumentArrayBuilder addExternalHiveTableImportArgs(ArgumentArrayBuilder builder,
+                                                                    String hs2Url) {
+    return builder
+        .withOption("hive-import")
+        .withOption("hs2-url", hs2Url)
+        .withOption("external-table-dir", getExternalTableDirPath().toString());
+  }
+
+  public static ArgumentArrayBuilder addCreateHiveTableArgs(ArgumentArrayBuilder builder) {
+    return builder
+        .withOption("create-hive-table")
+        .withOption("hive-table", HIVE_EXTERNAL_TABLE_NAME);
+  }
+
   private static Path getTemporaryRootDirPath() {
     return new Path(getTargetDirPath().toString() + TEMPORARY_ROOTDIR_SUFFIX);
   }
@@ -244,6 +283,10 @@ public static String[] getExpectedTextOutput() {
     };
   }
 
+  public static List<String> getExpectedTextOutputAsList() {
+    return Arrays.asList(getExpectedTextOutput());
+  }
+
   public static String[] getExpectedExtraTextOutput() {
     return new String[] {
       "5,Black Widow,Marvel,1964"
@@ -352,15 +395,23 @@ public static void cleanUpDirectory(FileSystem s3Client, Path directoryPath) {
     }
   }
 
-  public static void tearDownS3ImportTestCase(FileSystem s3Client) {
+  private static void cleanUpTargetDir(FileSystem s3Client) {
     cleanUpDirectory(s3Client, getTargetDirPath());
     resetTargetDirName();
   }
 
+  public static void tearDownS3ImportTestCase(FileSystem s3Client) {
+    cleanUpTargetDir(s3Client);
+  }
+
   public static void tearDownS3IncrementalImportTestCase(FileSystem s3Client) {
-    cleanUpDirectory(s3Client, getTargetDirPath());
+    cleanUpTargetDir(s3Client);
     cleanUpDirectory(s3Client, getTemporaryRootDirPath());
-    resetTargetDirName();
     System.clearProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY);
   }
 
+  public static void tearDownS3ExternalHiveTableImportTestCase(FileSystem s3Client) {
+    cleanUpTargetDir(s3Client);
+    cleanUpDirectory(s3Client, getExternalTableDirPath());
+  }
 }
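
For reference, the options appended by addExternalHiveTableImportArgs() and addCreateHiveTableArgs() map one-to-one to command-line flags, so the testS3CreateAndImportIntoExternalHiveTable case exercises a Sqoop invocation roughly like the sketch below. The connect string, source table, HS2 host and bucket name are placeholders, not values taken from this change; the /tmp/testdir and /tmp/externaldir paths follow BUCKET_TEMP_DIR, TARGET_DIR_NAME_PREFIX and EXTERNAL_TABLE_DIR above.

    sqoop import \
        --connect jdbc:mysql://db.example.com/testdb \
        --table SUPERHEROES \
        --target-dir s3a://your-test-bucket/tmp/testdir \
        --hive-import \
        --hs2-url jdbc:hive2://hs2.example.com:10000/default \
        --external-table-dir s3a://your-test-bucket/tmp/externaldir \
        --create-hive-table \
        --hive-table test_external_table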