
SQOOP-3363: Test incremental import with S3

(Boglarka Egyed via Szabolcs Vasas)
Szabolcs Vasas 2018-08-28 14:17:55 +02:00
parent 1f73341070
commit 816146df56
18 changed files with 1298 additions and 126 deletions

View File

@@ -44,6 +44,7 @@ public class AppendUtils {
private static final String FILEEXT_SEPARATOR = ".";
public static final String DATA_PART_PATTERN_PREFIX = "part";
public static final String MAPREDUCE_OUTPUT_BASENAME_PROPERTY = "mapreduce.output.basename";
private ImportJobContext context = null;
@@ -285,7 +286,7 @@ public static Path getTempAppendDir(String salt, SqoopOptions options) {
* @return Pattern
*/
private Pattern getDataFileNamePattern() {
String prefix = context.getOptions().getConf().get("mapreduce.output.basename");
String prefix = context.getOptions().getConf().get(MAPREDUCE_OUTPUT_BASENAME_PROPERTY);
if(null == prefix || prefix.length() == 0) {
prefix = DATA_PART_PATTERN_PREFIX;
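
The hunk above swaps the hard-coded "mapreduce.output.basename" literal for the newly extracted MAPREDUCE_OUTPUT_BASENAME_PROPERTY constant, so the tests added later in this commit can reference the same property name. A minimal sketch of the behaviour this lookup enables, assuming an illustrative regex rather than the exact pattern AppendUtils builds:

import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;

public class BasenamePatternSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Without an override the default "part" prefix applies (part-m-00000, ...);
        // overriding the property changes the prefix the matcher expects.
        conf.set("mapreduce.output.basename", "custom");
        String prefix = conf.get("mapreduce.output.basename");
        if (prefix == null || prefix.isEmpty()) {
            prefix = "part"; // DATA_PART_PATTERN_PREFIX
        }
        // Illustrative data-file pattern: matches custom-m-00001, custom-r-00000, ...
        Pattern dataFilePattern = Pattern.compile(prefix + "-[mr]-[0-9]{5}.*");
        System.out.println(dataFilePattern.matcher("custom-m-00001").matches()); // true
    }
}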

View File

@@ -62,4 +62,15 @@ public static List<Path> listFiles(Path path, Configuration conf) throws IOException {
}
return result;
}
public static List<Path> findFilesWithPathContainingPattern(Path path, Configuration conf, String pattern) throws IOException {
List<Path> result = new ArrayList<>();
List<Path> filePaths = listFiles(path, conf);
for (Path filePath : filePaths) {
if (filePath.toString().contains(pattern)) {
result.add(filePath);
}
}
return result;
}
}
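
The new findFilesWithPathContainingPattern helper backs the assertion utilities added to S3TestUtils further down in this diff. A hedged usage sketch, with a hypothetical bucket URL and search substring:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.util.FileSystemUtil;

public class FindFilesSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Hypothetical target directory; any Hadoop FileSystem URI behaves the same.
        Path targetDir = new Path("s3a://examplebucket/tmp/sqooptest/testdir");
        // Collects every file under targetDir whose full path contains "custom-m-",
        // e.g. map outputs written with mapreduce.output.basename=custom.
        List<Path> matches = FileSystemUtil.findFilesWithPathContainingPattern(targetDir, conf, "custom-m-");
        for (Path match : matches) {
            System.out.println(match);
        }
    }
}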

View File

@@ -43,6 +43,7 @@
import org.apache.sqoop.util.AppendUtils;
import org.junit.Test;
import static org.apache.sqoop.util.AppendUtils.MAPREDUCE_OUTPUT_BASENAME_PROPERTY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -330,7 +331,7 @@ public void testAppendWithMapreduceOutputBasename() throws IOException {
ArrayList<String> args = new ArrayList<>();
args.add("-D");
args.add("mapreduce.output.basename=" + prefix);
args.add(MAPREDUCE_OUTPUT_BASENAME_PROPERTY + "=" + prefix);
args.addAll(getOutputlessArgv(false, true, HsqldbTestServer.getFieldNames(), getConf()));
String targetDir = getWarehouseDir() + "/tempTargetDirOutputBaseNameTest";
args.add("--target-dir");

View File

@@ -60,52 +60,50 @@ public static void setupS3Credentials() throws IOException {
public void setup() throws IOException {
S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
super.setUp();
s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator, this);
S3TestUtils.createTestTableFromInputData(this);
s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
}
@After
public void clearOutputDir() throws IOException {
S3TestUtils.clearTargetDir(s3Client);
S3TestUtils.resetTargetDirName();
public void cleanUpTargetDir() {
S3TestUtils.tearDownS3ImportTestCase(s3Client);
super.tearDown();
}
protected ArgumentArrayBuilder getArgumentArrayBuilder() {
ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForUnitTests(this, s3CredentialGenerator);
return builder;
}
@Test
public void testImportAsAvroDataFileWithoutDeleteTargetDirOptionWhenTargetDirDoesNotExist() throws IOException {
ArgumentArrayBuilder builder = getArgumentArrayBuilder();
builder.withOption("as-avrodatafile");
String[] args = builder.build();
public void testS3ImportAsAvroDataFileWithoutDeleteTargetDirOptionWhenTargetDirDoesNotExist() throws IOException {
String[] args = getArgsWithAsAvroDataFileOption();
runImport(args);
AvroTestUtils.verify(S3TestUtils.getExpectedAvroOutput(), s3Client.getConf(), S3TestUtils.getTargetDirPath());
}
@Test
public void testImportAsAvroDataFileWithDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws IOException {
ArgumentArrayBuilder builder = getArgumentArrayBuilder();
builder.withOption("as-avrodatafile");
builder.withOption("delete-target-dir");
String[] args = builder.build();
public void testS3ImportAsAvroDataFileWithDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws IOException {
String[] args = getArgsWithAsAvroDataFileOption();
runImport(args);
args = getArgsWithAsAvroDataFileAndDeleteTargetDirOption();
runImport(args);
AvroTestUtils.verify(S3TestUtils.getExpectedAvroOutput(), s3Client.getConf(), S3TestUtils.getTargetDirPath());
runImport(args);
}
@Test
public void testImportAsAvroDataFileWithoutDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws IOException {
ArgumentArrayBuilder builder = getArgumentArrayBuilder();
builder.withOption("as-avrodatafile");
String[] args = builder.build();
public void testS3ImportAsAvroDataFileWithoutDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws IOException {
String[] args = getArgsWithAsAvroDataFileOption();
runImport(args);
AvroTestUtils.verify(S3TestUtils.getExpectedAvroOutput(), s3Client.getConf(), S3TestUtils.getTargetDirPath());
thrown.expect(IOException.class);
runImport(args);
}
private String[] getArgsWithAsAvroDataFileOption() {
return S3TestUtils.getArgsForS3UnitTestsWithFileFormatOption(this, s3CredentialGenerator, "as-avrodatafile");
}
private String[] getArgsWithAsAvroDataFileAndDeleteTargetDirOption() {
ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
s3CredentialGenerator,"as-avrodatafile");
builder.withOption("delete-target-dir");
return builder.build();
}
}

View File

@@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.s3;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.sqoop.testutil.ArgumentArrayBuilder;
import org.apache.sqoop.testutil.AvroTestUtils;
import org.apache.sqoop.testutil.DefaultS3CredentialGenerator;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.apache.sqoop.testutil.S3CredentialGenerator;
import org.apache.sqoop.testutil.S3TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import static org.apache.sqoop.util.AppendUtils.MAPREDUCE_OUTPUT_BASENAME_PROPERTY;
public class TestS3IncrementalAppendAvroImport extends ImportJobTestCase {
public static final Log LOG = LogFactory.getLog(
TestS3IncrementalAppendAvroImport.class.getName());
private static S3CredentialGenerator s3CredentialGenerator;
private FileSystem s3Client;
@Rule
public ExpectedException thrown = ExpectedException.none();
@BeforeClass
public static void setupS3Credentials() throws IOException {
String generatorCommand = S3TestUtils.getGeneratorCommand();
if (generatorCommand != null) {
s3CredentialGenerator = new DefaultS3CredentialGenerator(generatorCommand);
}
}
@Before
public void setup() throws IOException {
S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
super.setUp();
S3TestUtils.createTestTableFromInputData(this);
s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
}
@After
public void cleanUpOutputDirectories() {
S3TestUtils.tearDownS3IncrementalImportTestCase(s3Client);
super.tearDown();
}
@Test
public void testS3IncrementalAppendAsAvroDataFileWhenNoNewRowIsImported() throws IOException {
String[] args = getArgsWithAsAvroDataFileOption(false);
runImport(args);
args = getIncrementalAppendArgsWithAsAvroDataFileOption(false);
runImport(args);
S3TestUtils.failIfOutputFilePathContainingPatternExists(s3Client, MAP_OUTPUT_FILE_00001);
}
@Test
public void testS3IncrementalAppendAsAvroDataFile() throws IOException {
String[] args = getArgsWithAsAvroDataFileOption(false);
runImport(args);
S3TestUtils.insertInputDataIntoTable(this, S3TestUtils.getExtraInputData());
args = getIncrementalAppendArgsWithAsAvroDataFileOption(false);
runImport(args);
AvroTestUtils.verify(S3TestUtils.getExpectedExtraAvroOutput(), s3Client.getConf(), S3TestUtils.getTargetDirPath(), MAP_OUTPUT_FILE_00001);
}
@Test
public void testS3IncrementalAppendAsAvroDataFileWithMapreduceOutputBasenameProperty() throws IOException {
String[] args = getArgsWithAsAvroDataFileOption(true);
runImport(args);
S3TestUtils.insertInputDataIntoTable(this, S3TestUtils.getExtraInputData());
args = getIncrementalAppendArgsWithAsAvroDataFileOption(true);
runImport(args);
AvroTestUtils.verify(S3TestUtils.getExpectedExtraAvroOutput(), s3Client.getConf(), S3TestUtils.getTargetDirPath(), S3TestUtils.CUSTOM_MAP_OUTPUT_FILE_00001);
}
private String[] getArgsWithAsAvroDataFileOption(boolean withMapreduceOutputBasenameProperty) {
ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
s3CredentialGenerator,"as-avrodatafile");
if (withMapreduceOutputBasenameProperty) {
builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
}
return builder.build();
}
private String[] getIncrementalAppendArgsWithAsAvroDataFileOption(boolean withMapreduceOutputBasenameProperty) {
ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
s3CredentialGenerator,"as-avrodatafile");
builder = S3TestUtils.addIncrementalAppendImportArgs(builder);
if (withMapreduceOutputBasenameProperty) {
builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
}
return builder.build();
}
}
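
The incremental flags these tests rely on come from S3TestUtils.addIncrementalAppendImportArgs, shown near the end of this diff. As a hedged sketch, the append-specific portion of the built argument array is expected to look like this (the bucket URL and <uuid> placeholder stand in for the unique target directory S3TestUtils generates):

public class IncrementalAppendArgsSketch {
    public static String[] incrementalAppendArgs() {
        return new String[] {
            "--incremental", "append",
            "--check-column", "ID",
            "--last-value", "4",
            // temporary root = target dir + "_temprootdir" suffix (placeholder URL)
            "--temporary-rootdir", "s3a://examplebucket/tmp/sqooptest/testdir-<uuid>_temprootdir"
        };
    }
}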

View File

@@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.s3;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.sqoop.testutil.ArgumentArrayBuilder;
import org.apache.sqoop.testutil.DefaultS3CredentialGenerator;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.apache.sqoop.testutil.S3CredentialGenerator;
import org.apache.sqoop.testutil.S3TestUtils;
import org.apache.sqoop.util.ParquetReader;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.List;
import static org.apache.sqoop.util.AppendUtils.MAPREDUCE_OUTPUT_BASENAME_PROPERTY;
import static org.junit.Assert.assertEquals;
public class TestS3IncrementalAppendParquetImport extends ImportJobTestCase {
public static final Log LOG = LogFactory.getLog(
TestS3IncrementalAppendParquetImport.class.getName());
private static S3CredentialGenerator s3CredentialGenerator;
private FileSystem s3Client;
@Rule
public ExpectedException thrown = ExpectedException.none();
@BeforeClass
public static void setupS3Credentials() throws IOException {
String generatorCommand = S3TestUtils.getGeneratorCommand();
if (generatorCommand != null) {
s3CredentialGenerator = new DefaultS3CredentialGenerator(generatorCommand);
}
}
@Before
public void setup() throws IOException {
S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
super.setUp();
S3TestUtils.createTestTableFromInputData(this);
s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
}
@After
public void cleanUpOutputDirectories() {
S3TestUtils.tearDownS3IncrementalImportTestCase(s3Client);
super.tearDown();
}
@Test
public void testS3IncrementalAppendAsParquetFileWhenNoNewRowIsImported() throws Exception {
String[] args = getArgsWithAsParquetFileOption(false);
runImport(args);
args = getIncrementalAppendArgsWithAsParquetFileOption(false);
runImport(args);
List<String> result = new ParquetReader(S3TestUtils.getTargetDirPath(), s3Client.getConf()).readAllInCsvSorted();
assertEquals(S3TestUtils.getExpectedParquetOutput(), result);
}
@Test
public void testS3IncrementalAppendAsParquetFile() throws Exception {
String[] args = getArgsWithAsParquetFileOption(false);
runImport(args);
S3TestUtils.insertInputDataIntoTable(this, S3TestUtils.getExtraInputData());
args = getIncrementalAppendArgsWithAsParquetFileOption(false);
runImport(args);
List<String> result = new ParquetReader(S3TestUtils.getTargetDirPath(), s3Client.getConf()).readAllInCsvSorted();
assertEquals(S3TestUtils.getExpectedParquetOutputAfterAppend(), result);
}
@Test
public void testS3IncrementalAppendAsParquetFileWithMapreduceOutputBasenameProperty() throws Exception {
String[] args = getArgsWithAsParquetFileOption(true);
runImport(args);
S3TestUtils.insertInputDataIntoTable(this, S3TestUtils.getExtraInputData());
args = getIncrementalAppendArgsWithAsParquetFileOption(true);
runImport(args);
S3TestUtils.failIfOutputFilePathContainingPatternDoesNotExists(s3Client, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
List<String> result = new ParquetReader(S3TestUtils.getTargetDirPath(), s3Client.getConf()).readAllInCsvSorted();
assertEquals(S3TestUtils.getExpectedParquetOutputAfterAppend(), result);
}
private String[] getArgsWithAsParquetFileOption(boolean withMapreduceOutputBasenameProperty) {
ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
s3CredentialGenerator,"as-parquetfile");
if (withMapreduceOutputBasenameProperty) {
builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
}
return builder.build();
}
private String[] getIncrementalAppendArgsWithAsParquetFileOption(boolean withMapreduceOutputBasenameProperty) {
ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
s3CredentialGenerator,"as-parquetfile");
builder = S3TestUtils.addIncrementalAppendImportArgs(builder);
if (withMapreduceOutputBasenameProperty) {
builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
}
return builder.build();
}
}

View File

@@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.s3;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.sqoop.testutil.ArgumentArrayBuilder;
import org.apache.sqoop.testutil.DefaultS3CredentialGenerator;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.apache.sqoop.testutil.S3CredentialGenerator;
import org.apache.sqoop.testutil.S3TestUtils;
import org.apache.sqoop.testutil.SequenceFileTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import static org.apache.sqoop.util.AppendUtils.MAPREDUCE_OUTPUT_BASENAME_PROPERTY;
public class TestS3IncrementalAppendSequenceFileImport extends ImportJobTestCase {
public static final Log LOG = LogFactory.getLog(
TestS3IncrementalAppendSequenceFileImport.class.getName());
private static S3CredentialGenerator s3CredentialGenerator;
private FileSystem s3Client;
@Rule
public ExpectedException thrown = ExpectedException.none();
@BeforeClass
public static void setupS3Credentials() throws IOException {
String generatorCommand = S3TestUtils.getGeneratorCommand();
if (generatorCommand != null) {
s3CredentialGenerator = new DefaultS3CredentialGenerator(generatorCommand);
}
}
@Before
public void setup() throws IOException {
S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
super.setUp();
S3TestUtils.createTestTableFromInputData(this);
s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
}
@After
public void cleanUpOutputDirectories() {
S3TestUtils.tearDownS3IncrementalImportTestCase(s3Client);
super.tearDown();
}
@Test
public void testS3IncrementalAppendAsSequenceFileWhenNoNewRowIsImported() throws Exception {
String[] args = getArgsWithAsSequenceFileOption(false);
runImport(args);
args = getIncrementalAppendArgsWithAsSequenceFileOption(false);
runImport(args);
S3TestUtils.failIfOutputFilePathContainingPatternExists(s3Client, MAP_OUTPUT_FILE_00001);
}
@Test
public void testS3IncrementalAppendAsSequenceFile() throws Exception {
String[] args = getArgsWithAsSequenceFileOption(false);
runImport(args);
S3TestUtils.insertInputDataIntoTable(this, S3TestUtils.getExtraInputData());
args = getIncrementalAppendArgsWithAsSequenceFileOption(false);
runImport(args);
SequenceFileTestUtils.verify(this, S3TestUtils.getExpectedExtraSequenceFileOutput(), s3Client, S3TestUtils.getTargetDirPath(), MAP_OUTPUT_FILE_00001);
}
@Test
public void testS3IncrementalAppendAsSequenceFileWithMapreduceOutputBasenameProperty() throws Exception {
String[] args = getArgsWithAsSequenceFileOption(true);
runImport(args);
S3TestUtils.insertInputDataIntoTable(this, S3TestUtils.getExtraInputData());
args = getIncrementalAppendArgsWithAsSequenceFileOption(true);
runImport(args);
SequenceFileTestUtils.verify(this, S3TestUtils.getExpectedExtraSequenceFileOutput(), s3Client, S3TestUtils.getTargetDirPath(), S3TestUtils.CUSTOM_MAP_OUTPUT_FILE_00001);
}
private String[] getArgsWithAsSequenceFileOption(boolean withMapreduceOutputBasenameProperty) {
ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
s3CredentialGenerator,"as-sequencefile");
if (withMapreduceOutputBasenameProperty) {
builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
}
return builder.build();
}
private String[] getIncrementalAppendArgsWithAsSequenceFileOption(boolean withMapreduceOutputBasenameProperty) {
ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
s3CredentialGenerator,"as-sequencefile");
builder = S3TestUtils.addIncrementalAppendImportArgs(builder);
if (withMapreduceOutputBasenameProperty) {
builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
}
return builder.build();
}
}

View File

@@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.s3;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.sqoop.testutil.ArgumentArrayBuilder;
import org.apache.sqoop.testutil.DefaultS3CredentialGenerator;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.apache.sqoop.testutil.S3CredentialGenerator;
import org.apache.sqoop.testutil.S3TestUtils;
import org.apache.sqoop.testutil.TextFileTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import static org.apache.sqoop.util.AppendUtils.MAPREDUCE_OUTPUT_BASENAME_PROPERTY;
public class TestS3IncrementalAppendTextImport extends ImportJobTestCase {
public static final Log LOG = LogFactory.getLog(
TestS3IncrementalAppendTextImport.class.getName());
private static S3CredentialGenerator s3CredentialGenerator;
private FileSystem s3Client;
@Rule
public ExpectedException thrown = ExpectedException.none();
@BeforeClass
public static void setupS3Credentials() throws IOException {
String generatorCommand = S3TestUtils.getGeneratorCommand();
if (generatorCommand != null) {
s3CredentialGenerator = new DefaultS3CredentialGenerator(generatorCommand);
}
}
@Before
public void setup() throws IOException {
S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
super.setUp();
S3TestUtils.createTestTableFromInputData(this);
s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
}
@After
public void cleanUpOutputDirectories() {
S3TestUtils.tearDownS3IncrementalImportTestCase(s3Client);
super.tearDown();
}
@Test
public void testS3IncrementalAppendAsTextFileWhenNoNewRowIsImported() throws IOException {
String[] args = getArgs(false);
runImport(args);
args = getIncrementalAppendArgs(false);
runImport(args);
S3TestUtils.failIfOutputFilePathContainingPatternExists(s3Client, MAP_OUTPUT_FILE_00001);
}
@Test
public void testS3IncrementalAppendAsTextFile() throws IOException {
String[] args = getArgs(false);
runImport(args);
S3TestUtils.insertInputDataIntoTable(this, S3TestUtils.getExtraInputData());
args = getIncrementalAppendArgs(false);
runImport(args);
TextFileTestUtils.verify(S3TestUtils.getExpectedExtraTextOutput(), s3Client, S3TestUtils.getTargetDirPath(), MAP_OUTPUT_FILE_00001);
}
@Test
public void testS3IncrementalAppendAsTextFileWithMapreduceOutputBasenameProperty() throws IOException {
String[] args = getArgs(true);
runImport(args);
S3TestUtils.insertInputDataIntoTable(this, S3TestUtils.getExtraInputData());
args = getIncrementalAppendArgs(true);
runImport(args);
TextFileTestUtils.verify(S3TestUtils.getExpectedExtraTextOutput(), s3Client, S3TestUtils.getTargetDirPath(), S3TestUtils.CUSTOM_MAP_OUTPUT_FILE_00001);
}
private String[] getArgs(boolean withMapreduceOutputBasenameProperty) {
ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTests(this, s3CredentialGenerator);
if (withMapreduceOutputBasenameProperty) {
builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
}
return builder.build();
}
private String[] getIncrementalAppendArgs(boolean withMapreduceOutputBasenameProperty) {
ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTests(this, s3CredentialGenerator);
builder = S3TestUtils.addIncrementalAppendImportArgs(builder);
if (withMapreduceOutputBasenameProperty) {
builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
}
return builder.build();
}
}

View File

@@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.s3;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.sqoop.testutil.ArgumentArrayBuilder;
import org.apache.sqoop.testutil.DefaultS3CredentialGenerator;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.apache.sqoop.testutil.S3CredentialGenerator;
import org.apache.sqoop.testutil.S3TestUtils;
import org.apache.sqoop.util.ParquetReader;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.List;
import static org.apache.sqoop.util.AppendUtils.MAPREDUCE_OUTPUT_BASENAME_PROPERTY;
import static org.junit.Assert.assertEquals;
public class TestS3IncrementalMergeParquetImport extends ImportJobTestCase {
public static final Log LOG = LogFactory.getLog(
TestS3IncrementalMergeParquetImport.class.getName());
private static S3CredentialGenerator s3CredentialGenerator;
private FileSystem s3Client;
@Rule
public ExpectedException thrown = ExpectedException.none();
@BeforeClass
public static void setupS3Credentials() throws IOException {
String generatorCommand = S3TestUtils.getGeneratorCommand();
if (generatorCommand != null) {
s3CredentialGenerator = new DefaultS3CredentialGenerator(generatorCommand);
}
}
@Before
public void setup() throws IOException {
S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
super.setUp();
S3TestUtils.createTestTableFromInitialInputDataForMerge(this);
s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
}
@After
public void cleanUpOutputDirectories() {
S3TestUtils.tearDownS3IncrementalImportTestCase(s3Client);
super.tearDown();
}
@Test
public void testS3IncrementalMergeAsParquetFileWhenNoNewRowIsImported() throws Exception {
String[] args = getArgsWithAsParquetFileOption(false);
runImport(args);
clearTable(getTableName());
args = getIncrementalMergeArgsWithAsParquetFileOption(false);
runImport(args);
List<String> result = new ParquetReader(S3TestUtils.getTargetDirPath(), s3Client.getConf()).readAllInCsvSorted();
assertEquals(S3TestUtils.getExpectedParquetOutputWithTimestampColumn(this), result);
}
@Test
public void testS3IncrementalMergeAsParquetFile() throws Exception {
String[] args = getArgsWithAsParquetFileOption(false);
runImport(args);
clearTable(getTableName());
S3TestUtils.insertInputDataIntoTableForMerge(this, S3TestUtils.getNewInputDataForMerge());
args = getIncrementalMergeArgsWithAsParquetFileOption(false);
runImport(args);
List<String> result = new ParquetReader(S3TestUtils.getTargetDirPath(), s3Client.getConf()).readAllInCsvSorted();
assertEquals(S3TestUtils.getExpectedParquetOutputWithTimestampColumnAfterMerge(this), result);
}
@Test
public void testS3IncrementalMergeAsParquetFileWithMapreduceOutputBasenameProperty() throws Exception {
String[] args = getArgsWithAsParquetFileOption(true);
runImport(args);
clearTable(getTableName());
S3TestUtils.insertInputDataIntoTableForMerge(this, S3TestUtils.getNewInputDataForMerge());
args = getIncrementalMergeArgsWithAsParquetFileOption(true);
runImport(args);
S3TestUtils.failIfOutputFilePathContainingPatternDoesNotExists(s3Client, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
List<String> result = new ParquetReader(S3TestUtils.getTargetDirPath(), s3Client.getConf()).readAllInCsvSorted();
assertEquals(S3TestUtils.getExpectedParquetOutputWithTimestampColumnAfterMerge(this), result);
}
private String[] getArgsWithAsParquetFileOption(boolean withMapreduceOutputBasenameProperty) {
ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
s3CredentialGenerator,"as-parquetfile");
if (withMapreduceOutputBasenameProperty) {
builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
}
return builder.build();
}
private String[] getIncrementalMergeArgsWithAsParquetFileOption(boolean withMapreduceOutputBasenameProperty) {
ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
s3CredentialGenerator,"as-parquetfile");
builder = S3TestUtils.addIncrementalMergeImportArgs(builder);
if (withMapreduceOutputBasenameProperty) {
builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
}
return builder.build();
}
}
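
The merge tests build on S3TestUtils.addIncrementalMergeImportArgs, also shown near the end of this diff. A hedged sketch of the merge-specific arguments, using the INITIAL_TIMESTAMP_FOR_MERGE value defined in S3TestUtils and a placeholder bucket URL:

public class IncrementalMergeArgsSketch {
    public static String[] incrementalMergeArgs() {
        return new String[] {
            "--incremental", "lastmodified",
            "--check-column", "RECORD_DATE",
            "--merge-key", "DEBUT",
            "--last-value", "2018-07-23 15:00:00.000", // INITIAL_TIMESTAMP_FOR_MERGE
            // temporary root = target dir + "_temprootdir" suffix (placeholder URL)
            "--temporary-rootdir", "s3a://examplebucket/tmp/sqooptest/testdir-<uuid>_temprootdir"
        };
    }
}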

View File

@@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.s3;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.sqoop.testutil.ArgumentArrayBuilder;
import org.apache.sqoop.testutil.DefaultS3CredentialGenerator;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.apache.sqoop.testutil.S3CredentialGenerator;
import org.apache.sqoop.testutil.S3TestUtils;
import org.apache.sqoop.testutil.TextFileTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import static org.apache.sqoop.util.AppendUtils.MAPREDUCE_OUTPUT_BASENAME_PROPERTY;
public class TestS3IncrementalMergeTextImport extends ImportJobTestCase {
public static final Log LOG = LogFactory.getLog(
TestS3IncrementalMergeTextImport.class.getName());
private static S3CredentialGenerator s3CredentialGenerator;
private FileSystem s3Client;
@Rule
public ExpectedException thrown = ExpectedException.none();
@BeforeClass
public static void setupS3Credentials() throws IOException {
String generatorCommand = S3TestUtils.getGeneratorCommand();
if (generatorCommand != null) {
s3CredentialGenerator = new DefaultS3CredentialGenerator(generatorCommand);
}
}
@Before
public void setup() throws IOException {
S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
super.setUp();
S3TestUtils.createTestTableFromInitialInputDataForMerge(this);
s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
}
@After
public void cleanUpOutputDirectories() {
S3TestUtils.tearDownS3IncrementalImportTestCase(s3Client);
super.tearDown();
}
@Test
public void testS3IncrementalMergeAsTextFileWhenNoNewRowIsImported() throws Exception {
String[] args = getArgs(false);
runImport(args);
clearTable(getTableName());
args = getIncrementalMergeArgs(false);
runImport(args);
TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutputBeforeMerge(), s3Client, S3TestUtils.getTargetDirPath(), REDUCE_OUTPUT_FILE_00000);
}
@Test
public void testS3IncrementalMergeAsTextFile() throws Exception {
String[] args = getArgs(false);
runImport(args);
clearTable(getTableName());
S3TestUtils.insertInputDataIntoTableForMerge(this, S3TestUtils.getNewInputDataForMerge());
args = getIncrementalMergeArgs(false);
runImport(args);
TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutputAfterMerge(), s3Client, S3TestUtils.getTargetDirPath(), REDUCE_OUTPUT_FILE_00000);
}
@Test
public void testS3IncrementalMergeAsTextFileWithMapreduceOutputBasenameProperty() throws Exception {
String[] args = getArgs(true);
runImport(args);
clearTable(getTableName());
S3TestUtils.insertInputDataIntoTableForMerge(this, S3TestUtils.getNewInputDataForMerge());
args = getIncrementalMergeArgs(true);
runImport(args);
TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutputAfterMerge(), s3Client, S3TestUtils.getTargetDirPath(), S3TestUtils.CUSTOM_REDUCE_OUTPUT_FILE_00000);
}
private String[] getArgs(boolean withMapreduceOutputBasenameProperty) {
ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTests(this, s3CredentialGenerator);
if (withMapreduceOutputBasenameProperty) {
builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
}
return builder.build();
}
private String[] getIncrementalMergeArgs(boolean withMapreduceOutputBasenameProperty) {
ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTests(this, s3CredentialGenerator);
builder = S3TestUtils.addIncrementalMergeImportArgs(builder);
if (withMapreduceOutputBasenameProperty) {
builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
}
return builder.build();
}
}

View File

@@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.s3;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.sqoop.testutil.ArgumentArrayBuilder;
import org.apache.sqoop.testutil.DefaultS3CredentialGenerator;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.apache.sqoop.testutil.S3CredentialGenerator;
import org.apache.sqoop.testutil.S3TestUtils;
import org.apache.sqoop.util.ParquetReader;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class TestS3ParquetImport extends ImportJobTestCase {
public static final Log LOG = LogFactory.getLog(
TestS3ParquetImport.class.getName());
private static S3CredentialGenerator s3CredentialGenerator;
private FileSystem s3Client;
@Rule
public ExpectedException thrown = ExpectedException.none();
@BeforeClass
public static void setupS3Credentials() throws IOException {
String generatorCommand = S3TestUtils.getGeneratorCommand();
if (generatorCommand != null) {
s3CredentialGenerator = new DefaultS3CredentialGenerator(generatorCommand);
}
}
@Before
public void setup() throws IOException {
S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
super.setUp();
S3TestUtils.createTestTableFromInputData(this);
s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
}
@After
public void cleanUpTargetDir() {
S3TestUtils.tearDownS3ImportTestCase(s3Client);
super.tearDown();
}
@Test
public void testS3ImportAsParquetFileWithoutDeleteTargetDirOptionWhenTargetDirDoesNotExist() throws Exception {
String[] args = getArgsWithAsParquetFileOption();
runImport(args);
List<String> result = new ParquetReader(S3TestUtils.getTargetDirPath(), s3Client.getConf()).readAllInCsvSorted();
assertEquals(S3TestUtils.getExpectedParquetOutput(), result);
}
@Test
public void testS3ImportAsParquetFileWithDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws Exception {
String[] args = getArgsWithAsParquetFileOption();
runImport(args);
args = getArgsWithAsParquetFileAndDeleteTargetDirOption();
runImport(args);
List<String> result = new ParquetReader(S3TestUtils.getTargetDirPath(), s3Client.getConf()).readAllInCsvSorted();
assertEquals(S3TestUtils.getExpectedParquetOutput(), result);
}
@Test
public void testS3ImportAsParquetFileWithoutDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws Exception {
String[] args = getArgsWithAsParquetFileOption();
runImport(args);
thrown.expect(IOException.class);
runImport(args);
}
private String[] getArgsWithAsParquetFileOption() {
return S3TestUtils.getArgsForS3UnitTestsWithFileFormatOption(this, s3CredentialGenerator, "as-parquetfile");
}
private String[] getArgsWithAsParquetFileAndDeleteTargetDirOption() {
ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
s3CredentialGenerator,"as-parquetfile");
builder.withOption("delete-target-dir");
return builder.build();
}
}

View File

@@ -60,52 +60,50 @@ public static void setupS3Credentials() throws IOException {
public void setup() throws IOException {
S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
super.setUp();
s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator, this);
S3TestUtils.createTestTableFromInputData(this);
s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
}
@After
public void clearOutputDir() throws IOException {
S3TestUtils.clearTargetDir(s3Client);
S3TestUtils.resetTargetDirName();
public void cleanUpTargetDir() {
S3TestUtils.tearDownS3ImportTestCase(s3Client);
super.tearDown();
}
protected ArgumentArrayBuilder getArgumentArrayBuilder() {
ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForUnitTests(this, s3CredentialGenerator);
return builder;
}
@Test
public void testImportAsSequenceFileWithoutDeleteTargetDirOptionWhenTargetDirDoesNotExist() throws Exception {
ArgumentArrayBuilder builder = getArgumentArrayBuilder();
builder.withOption("as-sequencefile");
String[] args = builder.build();
public void testS3ImportAsSequenceFileWithoutDeleteTargetDirOptionWhenTargetDirDoesNotExist() throws Exception {
String[] args = getArgsWithAsSequenceFileOption();
runImport(args);
SequenceFileTestUtils.verify(this, S3TestUtils.getExpectedSequenceFileOutput(), s3Client, S3TestUtils.getTargetDirPath());
}
@Test
public void testImportAsSequenceFileWithDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws Exception {
ArgumentArrayBuilder builder = getArgumentArrayBuilder();
builder.withOption("as-sequencefile");
builder.withOption("delete-target-dir");
String[] args = builder.build();
public void testS3ImportAsSequenceFileWithDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws Exception {
String[] args = getArgsWithAsSequenceFileOption();
runImport(args);
args = getArgsWithAsSequenceFileAndDeleteTargetDirOption();
runImport(args);
SequenceFileTestUtils.verify(this, S3TestUtils.getExpectedSequenceFileOutput(), s3Client, S3TestUtils.getTargetDirPath());
runImport(args);
}
@Test
public void testImportAsSequenceWithoutDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws Exception {
ArgumentArrayBuilder builder = getArgumentArrayBuilder();
builder.withOption("as-sequencefile");
String[] args = builder.build();
public void testS3ImportAsSequenceWithoutDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws Exception {
String[] args = getArgsWithAsSequenceFileOption();
runImport(args);
SequenceFileTestUtils.verify(this, S3TestUtils.getExpectedSequenceFileOutput(), s3Client, S3TestUtils.getTargetDirPath());
thrown.expect(IOException.class);
runImport(args);
}
private String[] getArgsWithAsSequenceFileOption() {
return S3TestUtils.getArgsForS3UnitTestsWithFileFormatOption(this, s3CredentialGenerator, "as-sequencefile");
}
private String[] getArgsWithAsSequenceFileAndDeleteTargetDirOption() {
ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
s3CredentialGenerator,"as-sequencefile");
builder.withOption("delete-target-dir");
return builder.build();
}
}

View File

@@ -60,81 +60,82 @@ public static void setupS3Credentials() throws IOException {
public void setup() throws IOException {
S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
super.setUp();
s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator, this);
S3TestUtils.createTestTableFromInputData(this);
s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
}
@After
public void clearOutputDir() throws IOException {
S3TestUtils.clearTargetDir(s3Client);
S3TestUtils.resetTargetDirName();
public void cleanUpTargetDir() {
S3TestUtils.tearDownS3ImportTestCase(s3Client);
super.tearDown();
}
protected ArgumentArrayBuilder getArgumentArrayBuilder() {
ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForUnitTests(this, s3CredentialGenerator);
return builder;
}
@Test
public void testImportWithoutDeleteTargetDirOptionWhenTargetDirDoesNotExist() throws IOException {
ArgumentArrayBuilder builder = getArgumentArrayBuilder();
String[] args = builder.build();
String[] args = getArgs(false);
runImport(args);
TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutput(), s3Client, S3TestUtils.getTargetDirPath());
}
@Test
public void testImportWithDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws IOException {
ArgumentArrayBuilder builder = getArgumentArrayBuilder();
builder.withOption("delete-target-dir");
String[] args = builder.build();
String[] args = getArgs(false);
runImport(args);
args = getArgsWithDeleteTargetOption(false);
runImport(args);
TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutput(), s3Client, S3TestUtils.getTargetDirPath());
runImport(args);
}
@Test
public void testImportWithoutDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws IOException {
ArgumentArrayBuilder builder = getArgumentArrayBuilder();
String[] args = builder.build();
String[] args = getArgs(false);
runImport(args);
TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutput(), s3Client, S3TestUtils.getTargetDirPath());
thrown.expect(IOException.class);
runImport(args);
}
@Test
public void testImportAsTextFile() throws IOException {
ArgumentArrayBuilder builder = getArgumentArrayBuilder();
builder.withOption("as-textfile");
String[] args = builder.build();
public void testS3ImportAsTextFile() throws IOException {
String[] args = getArgs(true);
runImport(args);
TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutput(), s3Client, S3TestUtils.getTargetDirPath());
}
@Test
public void testImportAsTextFileWithDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws IOException {
ArgumentArrayBuilder builder = getArgumentArrayBuilder();
builder.withOption("as-textfile");
public void testS3ImportAsTextFileWithDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws IOException {
String[] args = getArgs(true);
runImport(args);
args = getArgsWithDeleteTargetOption(true);
runImport(args);
TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutput(), s3Client, S3TestUtils.getTargetDirPath());
}
@Test
public void testS3ImportAsTextFileWithoutDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws IOException {
String[] args = getArgs(true);
runImport(args);
thrown.expect(IOException.class);
runImport(args);
}
private String[] getArgs(boolean withAsTextFileOption) {
ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTests(this, s3CredentialGenerator);
if (withAsTextFileOption) {
builder.withOption("as-textfile");
}
return builder.build();
}
private String[] getArgsWithDeleteTargetOption(boolean withAsTextFileOption) {
ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTests(this, s3CredentialGenerator);
builder.withOption("delete-target-dir");
String[] args = builder.build();
runImport(args);
TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutput(), s3Client, S3TestUtils.getTargetDirPath());
runImport(args);
}
@Test
public void testImportAsTextFileWithoutDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws IOException {
ArgumentArrayBuilder builder = getArgumentArrayBuilder();
builder.withOption("as-textfile");
String[] args = builder.build();
runImport(args);
TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutput(), s3Client, S3TestUtils.getTargetDirPath());
thrown.expect(IOException.class);
runImport(args);
if (withAsTextFileOption) {
builder.withOption("as-textfile");
}
return builder.build();
}
}

View File

@@ -76,17 +76,28 @@ public static void registerDecimalConversionUsageForVerification() {
public static void verify(String[] expectedResults, Configuration conf, Path tablePath) {
Path outputFile = new Path(tablePath, OUTPUT_FILE_NAME);
readAndVerify(expectedResults, conf, outputFile);
}
public static void verify(String[] expectedResults, Configuration conf, Path tablePath, String outputFileName) {
Path outputFile = new Path(tablePath, outputFileName + ".avro");
readAndVerify(expectedResults, conf, outputFile);
}
private static void readAndVerify(String[] expectedResults, Configuration conf, Path outputFile) {
try (DataFileReader<GenericRecord> reader = read(outputFile, conf)) {
GenericRecord record;
if (!reader.hasNext() && expectedResults != null && expectedResults.length > 0) {
fail("empty file was not expected");
fail("Empty file was not expected");
}
int i = 0;
while (reader.hasNext()){
record = reader.next();
assertEquals(expectedResults[i++], record.toString());
}
if (expectedResults != null && expectedResults.length > i) {
fail("More output data was expected");
}
}
catch (IOException ioe) {
LOG.error("Issue with verifying the output", ioe);

View File

@@ -81,6 +81,9 @@ private static void setOnPhysicalCluster(boolean value) {
private static boolean onPhysicalCluster = false;
public static final String MAP_OUTPUT_FILE_00001 = "part-m-00001";
public static final String REDUCE_OUTPUT_FILE_00000 = "part-r-00000";
/** Base directory for all temporary data. */
public static final String TEMP_BASE_DIR;
@@ -447,6 +450,12 @@ protected void insertRecordsIntoTable(String[] colTypes, List<List<Object>> records) {
}
}
protected void insertRecordsIntoTableWithColTypesAndNames(String[] columns, String[] colTypes, List<List<Object>> records) {
for (List<Object> record : records) {
insertIntoTable(columns, colTypes, record);
}
}
protected void insertIntoTable(String[] columns, String[] colTypes, List<Object> record) {
insertIntoTable(columns, colTypes, toStringArray(record));
}
@@ -610,6 +619,13 @@ protected void createTableWithRecords(String [] colTypes, List<List<Object>> records) {
}
}
protected void createTableWithRecordsWithColTypesAndNames(String [] columns, String [] colTypes, List<List<Object>> records) {
createTableWithColTypesAndNames(columns, colTypes, records.get(0));
for (int i = 1; i < records.size(); i++) {
insertIntoTable(columns, colTypes, records.get(i));
}
}
/**
* Create a table with a single column and put a data element in it.
* @param colType the type of the column to create

View File

@@ -24,12 +24,17 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.sqoop.util.FileSystemUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import static java.util.Arrays.asList;
import static org.apache.sqoop.util.AppendUtils.MAPREDUCE_OUTPUT_BASENAME_PROPERTY;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeNotNull;
public class S3TestUtils {
@@ -39,12 +44,28 @@ public class S3TestUtils {
private static final String TEMPORARY_CREDENTIALS_PROVIDER_CLASS = "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider";
private static final String BUCKET_TEMP_DIR = "/tmp/";
private static final String BUCKET_TEMP_TEST_DIR = "/tmp/sqooptest/";
private static final String TARGET_DIR_NAME_PREFIX = "testdir";
private static final String TARGET_DIR_NAME_PREFIX = "/testdir";
private static final String TEMPORARY_ROOTDIR_SUFFIX = "_temprootdir";
private static String targetDirName = TARGET_DIR_NAME_PREFIX;
private static final String[] COLUMN_NAMES = {"ID", "SUPERHERO", "COMICS", "DEBUT"};
private static final String[] COLUMN_TYPES = { "INT", "VARCHAR(25)", "VARCHAR(25)", "INT"};
private static final String[] COLUMN_NAMES_FOR_MERGE = { "DEBUT", "SUPERHERO1", "SUPERHERO2", "RECORD_DATE"};
private static final String[] COLUMN_TYPES_FOR_MERGE = { "INT", "VARCHAR(25)", "VARCHAR(25)", "TIMESTAMP"};
private static final String INITIAL_TIMESTAMP_FOR_MERGE = "2018-07-23 15:00:00.000";
private static final String NEW_TIMESTAMP_FOR_MERGE = "2018-08-16 16:30:09.000";
private static final String EXPECTED_INITIAL_TIMESTAMP_FOR_MERGE = "2018-07-23 15:00:00.0";
private static final String EXPECTED_NEW_TIMESTAMP_FOR_MERGE = "2018-08-16 16:30:09.0";
public static final String MAPREDUCE_OUTPUT_BASENAME = "custom";
public static final String CUSTOM_MAP_OUTPUT_FILE_00001 = MAPREDUCE_OUTPUT_BASENAME + "-m-00001";
public static final String CUSTOM_REDUCE_OUTPUT_FILE_00000 = MAPREDUCE_OUTPUT_BASENAME + "-r-00000";
public static final Log LOG = LogFactory.getLog(
S3TestUtils.class.getName());
@@ -65,7 +86,7 @@ private static void setUniqueTargetDirName() {
targetDirName = targetDirName + "-" + uuid;
}
public static void resetTargetDirName() {
private static void resetTargetDirName() {
targetDirName = TARGET_DIR_NAME_PREFIX;
}
@@ -74,7 +95,12 @@ private static String getTargetDirName() {
}
public static Path getTargetDirPath() {
String targetPathString = getPropertyBucketUrl() + BUCKET_TEMP_DIR + getTargetDirName();
String targetPathString = getBucketTempTestDirPath() + getTargetDirName();
return new Path(targetPathString);
}
private static Path getBucketTempTestDirPath() {
String targetPathString = getPropertyBucketUrl() + BUCKET_TEMP_TEST_DIR;
return new Path(targetPathString);
}
@@ -84,17 +110,14 @@ public static void runTestCaseOnlyIfS3CredentialsAreSet(S3CredentialGenerator s3CredentialGenerator) {
assumeNotNull(s3CredentialGenerator.getS3SecretKey());
}
public static FileSystem setupS3ImportTestCase(S3CredentialGenerator s3CredentialGenerator,
BaseSqoopTestCase testCase) throws IOException {
createTestTableFromInputData(testCase);
public static FileSystem setupS3ImportTestCase(S3CredentialGenerator s3CredentialGenerator) throws IOException {
Configuration hadoopConf = new Configuration();
S3TestUtils.setS3CredentialsInHadoopConf(hadoopConf, s3CredentialGenerator);
FileSystem s3Client = FileSystem.get(hadoopConf);
setUniqueTargetDirName();
clearTargetDir(s3Client);
cleanUpDirectory(s3Client, getTargetDirPath());
return s3Client;
}
@@ -111,8 +134,8 @@ private static void setS3CredentialsInHadoopConf(Configuration hadoopConf,
}
}
public static ArgumentArrayBuilder getArgumentArrayBuilderForUnitTests(BaseSqoopTestCase testCase,
S3CredentialGenerator s3CredentialGenerator) {
public static ArgumentArrayBuilder getArgumentArrayBuilderForS3UnitTests(BaseSqoopTestCase testCase,
S3CredentialGenerator s3CredentialGenerator) {
ArgumentArrayBuilder builder = new ArgumentArrayBuilder();
return builder.withCommonHadoopFlags()
.withProperty(Constants.ACCESS_KEY, s3CredentialGenerator.getS3AccessKey())
@@ -125,6 +148,43 @@ public static ArgumentArrayBuilder getArgumentArrayBuilderForUnitTests(BaseSqoopTestCase testCase,
.withOption("target-dir", getTargetDirPath().toString());
}
public static ArgumentArrayBuilder getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(BaseSqoopTestCase testCase,
S3CredentialGenerator s3CredentialGenerator,
String fileFormat) {
ArgumentArrayBuilder builder = getArgumentArrayBuilderForS3UnitTests(testCase, s3CredentialGenerator);
builder.withOption(fileFormat);
return builder;
}
public static String[] getArgsForS3UnitTestsWithFileFormatOption(BaseSqoopTestCase testCase,
S3CredentialGenerator s3CredentialGenerator,
String fileFormat) {
ArgumentArrayBuilder builder = getArgumentArrayBuilderForS3UnitTests(testCase, s3CredentialGenerator);
builder.withOption(fileFormat);
return builder.build();
}
private static Path getTemporaryRootDirPath() {
return new Path(getTargetDirPath().toString() + TEMPORARY_ROOTDIR_SUFFIX);
}
public static ArgumentArrayBuilder addIncrementalAppendImportArgs(ArgumentArrayBuilder builder) {
return builder
.withOption("incremental", "append")
.withOption("check-column", "ID")
.withOption("last-value", "4")
.withOption("temporary-rootdir", getTemporaryRootDirPath().toString());
}
public static ArgumentArrayBuilder addIncrementalMergeImportArgs(ArgumentArrayBuilder builder) {
return builder
.withOption("incremental", "lastmodified")
.withOption("check-column", "RECORD_DATE")
.withOption("merge-key", "DEBUT")
.withOption("last-value", INITIAL_TIMESTAMP_FOR_MERGE)
.withOption("temporary-rootdir", getTemporaryRootDirPath().toString());
}
private static List<String[]> getInputData() {
List<String[]> data = new ArrayList<>();
data.add(new String[]{"1", "'Ironman'", "'Marvel'", "1963"});
@@ -134,15 +194,40 @@ private static List<String[]> getInputData() {
return data;
}
private static void createTestTableFromInputData(BaseSqoopTestCase testCase) {
String[] names = {"ID", "SUPERHERO", "COMICS", "DEBUT"};
String[] types = { "INT", "VARCHAR(25)", "VARCHAR(25)", "INT"};
public static String[] getExtraInputData() {
return new String[]{"5", "'Black Widow'", "'Marvel'", "1964"};
}
public static List<List<Object>> getInitialInputDataForMerge() {
return Arrays.<List<Object>>asList(
Arrays.<Object>asList(1940, "Black Widow", "Falcon", INITIAL_TIMESTAMP_FOR_MERGE),
Arrays.<Object>asList(1974, "Iron Fist", "The Punisher", INITIAL_TIMESTAMP_FOR_MERGE));
}
public static List<List<Object>> getNewInputDataForMerge() {
return Arrays.<List<Object>>asList(
Arrays.<Object>asList(1962, "Spiderman", "Thor", NEW_TIMESTAMP_FOR_MERGE),
Arrays.<Object>asList(1974, "Wolverine", "The Punisher", NEW_TIMESTAMP_FOR_MERGE));
}
public static void createTestTableFromInputData(BaseSqoopTestCase testCase) {
List<String[]> inputData = getInputData();
testCase.createTableWithColTypesAndNames(names, types, new String[0]);
testCase.insertIntoTable(names, types, inputData.get(0));
testCase.insertIntoTable(names, types, inputData.get(1));
testCase.insertIntoTable(names, types, inputData.get(2));
testCase.insertIntoTable(names, types, inputData.get(3));
testCase.createTableWithColTypesAndNames(COLUMN_NAMES, COLUMN_TYPES, new String[0]);
for (String[] dataRow : inputData) {
insertInputDataIntoTable(testCase, dataRow);
}
}
public static void insertInputDataIntoTable(BaseSqoopTestCase testCase, String[] inputData) {
testCase.insertIntoTable(COLUMN_NAMES, COLUMN_TYPES, inputData);
}
public static void createTestTableFromInitialInputDataForMerge(BaseSqoopTestCase testCase) {
testCase.createTableWithRecordsWithColTypesAndNames(COLUMN_NAMES_FOR_MERGE, COLUMN_TYPES_FOR_MERGE, getInitialInputDataForMerge());
}
public static void insertInputDataIntoTableForMerge(BaseSqoopTestCase testCase, List<List<Object>> inputData) {
testCase.insertRecordsIntoTableWithColTypesAndNames(COLUMN_NAMES_FOR_MERGE, COLUMN_TYPES_FOR_MERGE, inputData);
}
public static String[] getExpectedTextOutput() {
@@ -154,6 +239,27 @@ public static String[] getExpectedTextOutput() {
};
}
public static String[] getExpectedExtraTextOutput() {
return new String[] {
"5,Black Widow,Marvel,1964"
};
}
public static String[] getExpectedTextOutputBeforeMerge() {
return new String[] {
"1940,Black Widow,Falcon," + EXPECTED_INITIAL_TIMESTAMP_FOR_MERGE,
"1974,Iron Fist,The Punisher," + EXPECTED_INITIAL_TIMESTAMP_FOR_MERGE
};
}
public static String[] getExpectedTextOutputAfterMerge() {
return new String[] {
"1940,Black Widow,Falcon," + EXPECTED_INITIAL_TIMESTAMP_FOR_MERGE,
"1962,Spiderman,Thor," + EXPECTED_NEW_TIMESTAMP_FOR_MERGE,
"1974,Wolverine,The Punisher," + EXPECTED_NEW_TIMESTAMP_FOR_MERGE
};
}
public static String[] getExpectedSequenceFileOutput() {
return new String[] {
"1,Ironman,Marvel,1963\n",
@@ -163,6 +269,12 @@ public static String[] getExpectedSequenceFileOutput() {
};
}
public static String[] getExpectedExtraSequenceFileOutput() {
return new String[] {
"5,Black Widow,Marvel,1964\n"
};
}
public static String[] getExpectedAvroOutput() {
return new String[] {
"{\"ID\": 1, \"SUPERHERO\": \"Ironman\", \"COMICS\": \"Marvel\", \"DEBUT\": 1963}",
@@ -172,14 +284,78 @@ public static String[] getExpectedAvroOutput() {
};
}
public static void clearTargetDir(FileSystem s3Client) throws IOException{
public static String[] getExpectedExtraAvroOutput() {
return new String[] {
"{\"ID\": 5, \"SUPERHERO\": \"Black Widow\", \"COMICS\": \"Marvel\", \"DEBUT\": 1964}"
};
}
try {
if (s3Client.exists(getTargetDirPath())) {
s3Client.delete(getTargetDirPath(), true);
}
} catch (Exception e) {
LOG.error("Issue with cleaning up output directory", e);
public static List<String> getExpectedParquetOutput() {
return asList(
"1,Ironman,Marvel,1963",
"2,Wonder Woman,DC,1941",
"3,Batman,DC,1939",
"4,Hulk,Marvel,1962");
}
public static List<String> getExpectedParquetOutputAfterAppend() {
return asList(
"1,Ironman,Marvel,1963",
"2,Wonder Woman,DC,1941",
"3,Batman,DC,1939",
"4,Hulk,Marvel,1962",
"5,Black Widow,Marvel,1964");
}
public static List<String> getExpectedParquetOutputWithTimestampColumn(BaseSqoopTestCase testCase) {
return asList(
"1940,Black Widow,Falcon," + testCase.timeFromString(INITIAL_TIMESTAMP_FOR_MERGE),
"1974,Iron Fist,The Punisher," + testCase.timeFromString(INITIAL_TIMESTAMP_FOR_MERGE));
}
public static List<String> getExpectedParquetOutputWithTimestampColumnAfterMerge(BaseSqoopTestCase testCase) {
return asList(
"1940,Black Widow,Falcon," + testCase.timeFromString(INITIAL_TIMESTAMP_FOR_MERGE),
"1962,Spiderman,Thor," + testCase.timeFromString(NEW_TIMESTAMP_FOR_MERGE),
"1974,Wolverine,The Punisher," + testCase.timeFromString(NEW_TIMESTAMP_FOR_MERGE));
}
public static void failIfOutputFilePathContainingPatternExists(FileSystem s3Client, String pattern) throws IOException {
List<Path> outputFilesWithPathContainingPattern = FileSystemUtil.findFilesWithPathContainingPattern(getTargetDirPath(),
s3Client.getConf(), pattern);
if (outputFilesWithPathContainingPattern.size() != 0) {
fail("No output file was expected with pattern" + pattern);
}
}
public static void failIfOutputFilePathContainingPatternDoesNotExists(FileSystem s3Client, String pattern) throws IOException {
List<Path> outputFilesWithPathContainingPattern = FileSystemUtil.findFilesWithPathContainingPattern(getTargetDirPath(),
s3Client.getConf(), pattern);
if (outputFilesWithPathContainingPattern.size() == 0) {
fail("No output file was found with pattern" + pattern);
}
}
public static void cleanUpDirectory(FileSystem s3Client, Path directoryPath) {
try {
if (s3Client.exists(directoryPath)) {
s3Client.delete(directoryPath, true);
}
} catch (Exception e) {
LOG.error("Issue with cleaning up directory", e);
}
}
public static void tearDownS3ImportTestCase(FileSystem s3Client) {
cleanUpDirectory(s3Client, getTargetDirPath());
resetTargetDirName();
}
public static void tearDownS3IncrementalImportTestCase(FileSystem s3Client) {
cleanUpDirectory(s3Client, getTargetDirPath());
cleanUpDirectory(s3Client, getTemporaryRootDirPath());
resetTargetDirName();
System.clearProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY);
}
}

View File

@@ -50,6 +50,23 @@ public class SequenceFileTestUtils {
*/
public static void verify(BaseSqoopTestCase testCase, String[] expectedResults, FileSystem fileSystem, Path tablePath) throws Exception{
String outputFilePathString = tablePath.toString() + OUTPUT_FILE_NAME;
readAndVerify(testCase, expectedResults, fileSystem, outputFilePathString);
}
/**
* Verify results at the given tablePath.
* @param testCase current instance of BaseSqoopTestCase
* @param expectedResults string array of expected results
* @param fileSystem current fileSystem
* @param tablePath path of the output table
* @param outputFileName MapReduce output filename
*/
public static void verify(BaseSqoopTestCase testCase, String[] expectedResults, FileSystem fileSystem, Path tablePath, String outputFileName) throws Exception{
String outputFilePathString = tablePath.toString() + "/" + outputFileName;
readAndVerify(testCase, expectedResults, fileSystem, outputFilePathString);
}
private static void readAndVerify(BaseSqoopTestCase testCase, String[] expectedResults, FileSystem fileSystem, String outputFilePathString) throws Exception {
Path outputFilePath = new Path(outputFilePathString);
Configuration conf = fileSystem.getConf();
@@ -75,6 +92,10 @@ public static void verify(BaseSqoopTestCase testCase, String[] expectedResults,
assertEquals(expectedResults[i++], value.toString());
hasNextRecord = reader.next(key, value);
}
if (expectedResults != null && expectedResults.length > i) {
fail("More output data was expected");
}
} catch (IOException ioe) {
LOG.error("Issue with verifying the output", ioe);
throw new RuntimeException(ioe);

View File

@@ -33,7 +33,7 @@
public class TextFileTestUtils {
private static final String OUTPUT_FILE_NAME = "/part-m-00000";
private static final String DEFAULT_OUTPUT_FILE_NAME = "/part-m-00000";
public static final Log LOG = LogFactory.getLog(
TextFileTestUtils.class.getName());
@@ -45,7 +45,23 @@ public class TextFileTestUtils {
* @param tablePath path of the output table
*/
public static void verify(String[] expectedResults, FileSystem fileSystem, Path tablePath) throws IOException {
String outputFilePathString = tablePath.toString() + OUTPUT_FILE_NAME;
String outputFilePathString = tablePath.toString() + DEFAULT_OUTPUT_FILE_NAME;
readAndVerify(expectedResults, fileSystem, outputFilePathString);
}
/**
* Verify results at the given tablePath.
* @param expectedResults string array of expected results
* @param fileSystem current filesystem
* @param tablePath path of the output table
* @param outputFileName MapReduce output filename
*/
public static void verify(String[] expectedResults, FileSystem fileSystem, Path tablePath, String outputFileName) {
String outputFilePathString = tablePath.toString() + "/" + outputFileName;
readAndVerify(expectedResults, fileSystem, outputFilePathString);
}
private static void readAndVerify(String[] expectedResults, FileSystem fileSystem, String outputFilePathString) {
Path outputFilePath = new Path(outputFilePathString);
try (BufferedReader br = new BufferedReader(new InputStreamReader(fileSystem.open(outputFilePath), Charset.forName("UTF-8")))) {
@@ -61,6 +77,10 @@ public static void verify(String[] expectedResults, FileSystem fileSystem, Path tablePath) throws IOException {
line = br.readLine();
}
if (expectedResults != null && expectedResults.length > i) {
fail("More output data was expected");
}
} catch (IOException ioe) {
LOG.error("Issue with verifying the output", ioe);
throw new RuntimeException(ioe);