Merge 7752a95a8b into f8beae32a0
commit 63ca100a49
@@ -19,7 +19,7 @@
 javaSourceCompatibilityVersion=1.8

-avroVersion=1.8.1
+avroVersion=1.8.2
 parquetVersion=1.9.0
 hadoopVersion=2.9.2
 aspectjVersion=1.7.4
@@ -21,6 +21,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -167,12 +168,12 @@ public String getCreateTableStmt() throws IOException {
     }

     boolean first = true;
-    String partitionKey = options.getHivePartitionKey();
-    for (String col : colNames) {
-      if (col.equals(partitionKey)) {
-        throw new IllegalArgumentException("Partition key " + col + " cannot "
-            + "be a column to import.");
-      }
+    final String partitionKey = options.getHivePartitionKey();
+    if (Arrays.asList(colNames).contains(partitionKey)) {
+      throw new IllegalArgumentException(
+          "Partition key " + partitionKey + " cannot be a column to import.");
+    }
+    for (final String col : colNames) {

       if (!first) {
         sb.append(", ");
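
Note (illustration, not part of the patch): the refactored check above can be exercised in isolation. The column and key names below are hypothetical stand-ins.

    import java.util.Arrays;

    public class PartitionKeyCheckSketch {
      public static void main(String[] args) {
        // Hypothetical inputs: the columns selected for import and the Hive partition key.
        String[] colNames = {"DATA_COL0", "DATA_COL1"};
        String partitionKey = "DATA_COL0";

        // Same rule as the refactored code: the partition key may not also be an imported column.
        if (Arrays.asList(colNames).contains(partitionKey)) {
          throw new IllegalArgumentException(
              "Partition key " + partitionKey + " cannot be a column to import.");
        }
      }
    }

Run as-is it throws, because the key collides with the first column; with a non-colliding key it completes silently.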
@@ -180,14 +181,23 @@ public String getCreateTableStmt() throws IOException {

       first = false;

-      String hiveColType;
-      if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
+      final String hiveColType;
+      switch (options.getFileLayout()) {
+      case TextFile:
+      case AvroDataFile:
         Integer colType = columnTypes.get(col);
         hiveColType = getHiveColumnTypeForTextTable(userMapping, col, colType);
-      } else if (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
-        hiveColType = HiveTypes.toHiveType(columnNameToAvroFieldSchema.get(col), options);
-      } else {
-        throw new RuntimeException("File format is not supported for Hive tables.");
+        break;
+      case ParquetFile:
+        hiveColType =
+            HiveTypes.toHiveType(columnNameToAvroFieldSchema.get(col), options);
+        break;
+      case BinaryFile:
+      case SequenceFile:
+      default:
+        throw new RuntimeException(
+            "File format is not supported for Hive tables: "
+                + options.getFileLayout());
       }

       sb.append('`').append(col).append("` ").append(hiveColType);
@@ -208,9 +218,14 @@ public String getCreateTableStmt() throws IOException {
           .append(" STRING) ");
     }

-    if (SqoopOptions.FileLayout.ParquetFile.equals(options.getFileLayout())) {
+    switch(options.getFileLayout()) {
+    case ParquetFile:
       sb.append("STORED AS PARQUET");
-    } else {
+      break;
+    case AvroDataFile:
+      sb.append("STORED AS AVRO");
+      break;
+    case TextFile: {
       sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '");
       sb.append(getHiveOctalCharCode((int) options.getOutputFieldDelim()));
       sb.append("' LINES TERMINATED BY '");
@@ -226,14 +241,21 @@ public String getCreateTableStmt() throws IOException {
       sb.append("' STORED AS TEXTFILE");
-    }
+      }
+      break;
+    default:
+      throw new RuntimeException(
+          "File format is not supported for Hive tables: "
+              + options.getFileLayout());
+    }

     if (isHiveExternalTableSet) {
       // add location
       sb.append(" LOCATION '"+options.getHiveExternalTableDir()+"'");
     }

-    LOG.debug("Create statement: " + sb.toString());
-    return sb.toString();
+    final String tableCreateStatement = sb.toString();
+    LOG.debug("Create statement: " + tableCreateStatement);
+    return tableCreateStatement;
   }

   private Map<String, Schema> getColumnNameToAvroTypeMapping() {
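
Note (illustration, not part of the patch): with the AvroDataFile branch above, getCreateTableStmt() now emits a STORED AS AVRO clause. The new testGetCreateAvroTableStmtWithAvroSchema case further down asserts DDL of exactly this shape:

    // Expected shape of the generated DDL for an Avro-backed Hive table
    // (copied from the expected value in the new test case below).
    String expectedAvroDdl =
        "CREATE TABLE IF NOT EXISTS `outputTable` "
            + "( `id` INT, `value` STRING) STORED AS AVRO";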
@@ -1604,12 +1604,6 @@ protected void validateHiveOptions(SqoopOptions options)
         HCATALOG_EXTERNAL_TABLE_ARG, CREATE_HCATALOG_TABLE_ARG, DROP_AND_CREATE_HCATALOG_TABLE));
     }

-    if (options.doHiveImport()
-        && options.getFileLayout() == SqoopOptions.FileLayout.AvroDataFile) {
-      throw new InvalidOptionsException("Hive import is not compatible with "
-          + "importing into AVRO format.");
-    }
-
     if (options.doHiveImport()
         && options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
       throw new InvalidOptionsException("Hive import is not compatible with "
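
Note (illustration, not part of the patch): removing the block above means validateHiveOptions() no longer rejects a Hive import that targets the Avro layout, while the SequenceFile check kept in the same method still applies. A minimal sketch, assuming SqoopOptions exposes the usual setters (setHiveImport, setFileLayout):

    // Assumed setters; illustrative only.
    SqoopOptions options = new SqoopOptions();
    options.setHiveImport(true);
    options.setFileLayout(SqoopOptions.FileLayout.AvroDataFile);
    // Before this change this combination raised InvalidOptionsException during
    // validation; after it, the options pass and the job runs as an Avro import.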
src/test/org/apache/sqoop/hive/TestHiveAvroImport.java (new file, 576 lines)
@@ -0,0 +1,576 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.sqoop.hive;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.SqoopOptions.InvalidOptionsException;
import org.apache.sqoop.testutil.CommonArgs;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.apache.sqoop.tool.BaseSqoopTool;
import org.apache.sqoop.tool.CodeGenTool;
import org.apache.sqoop.tool.CreateHiveTableTool;
import org.apache.sqoop.tool.ImportTool;
import org.apache.sqoop.tool.SqoopTool;
import org.apache.commons.cli.ParseException;
import org.junit.rules.ExpectedException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Test HiveImport capability after an import to HDFS.
 */
public class TestHiveAvroImport extends ImportJobTestCase {

  public static final Log LOG = LogFactory.getLog(
      TestHiveAvroImport.class.getName());

  @Rule
  public ExpectedException thrown = ExpectedException.none();

  @Before
  public void setUp() {
    super.setUp();
    HiveImport.setTestMode(true);
  }

  @After
  public void tearDown() {
    super.tearDown();
    HiveImport.setTestMode(false);
  }

  /**
   * Sets the expected number of columns in the table being manipulated
   * by the test. Under the hood, this sets the expected column names
   * to DATA_COLi for 0 <= i < numCols.
   * @param numCols the number of columns to be created.
   */
  protected void setNumCols(int numCols) {
    String [] cols = new String[numCols];
    for (int i = 0; i < numCols; i++) {
      cols[i] = "DATA_COL" + i;
    }

    setColNames(cols);
  }

  protected String[] getTypes() {
    String[] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
    return types;
  }

  /**
   * Create the argv to pass to Sqoop.
   * @return the argv as an array of strings.
   */
  protected String [] getArgv(boolean includeHadoopFlags, String [] moreArgs) {
    ArrayList<String> args = new ArrayList<String>();

    if (includeHadoopFlags) {
      CommonArgs.addHadoopFlags(args);
    }

    if (null != moreArgs) {
      for (String arg: moreArgs) {
        args.add(arg);
      }
    }

    args.add("--table");
    args.add(getTableName());
    args.add("--warehouse-dir");
    args.add(getWarehouseDir());
    args.add("--connect");
    args.add(getConnectString());
    args.add("--hive-import");
    String [] colNames = getColNames();
    if (null != colNames) {
      args.add("--split-by");
      args.add(colNames[0]);
    } else {
      fail("Could not determine column names.");
    }

    args.add("--num-mappers");
    args.add("1");

    for (String a : args) {
      LOG.debug("ARG : "+ a);
    }

    return args.toArray(new String[0]);
  }

  /**
   * @return the argv to supply to a create-table only job for Hive imports.
   */
  protected String [] getCreateTableArgv(boolean includeHadoopFlags,
      String [] moreArgs) {

    ArrayList<String> args = new ArrayList<String>();

    if (null != moreArgs) {
      for (String arg: moreArgs) {
        args.add(arg);
      }
    }

    args.add("--table");
    args.add(getTableName());
    args.add("--connect");
    args.add(getConnectString());

    return args.toArray(new String[0]);
  }

  /**
   * @return the argv to supply to a code-gen only job for Hive imports.
   */
  protected String [] getCodeGenArgs() {
    ArrayList<String> args = new ArrayList<String>();

    args.add("--table");
    args.add(getTableName());
    args.add("--connect");
    args.add(getConnectString());
    args.add("--hive-import");

    return args.toArray(new String[0]);
  }

  /**
   * @return the argv to supply to a ddl-executing-only job for Hive imports.
   */
  protected String [] getCreateHiveTableArgs(String [] extraArgs) {
    ArrayList<String> args = new ArrayList<String>();

    args.add("--table");
    args.add(getTableName());
    args.add("--connect");
    args.add(getConnectString());

    if (null != extraArgs) {
      for (String arg : extraArgs) {
        args.add(arg);
      }
    }

    return args.toArray(new String[0]);
  }

  private SqoopOptions getSqoopOptions(String [] args, SqoopTool tool) {
    SqoopOptions opts = null;
    try {
      opts = tool.parseArguments(args, null, null, true);
    } catch (Exception e) {
      fail("Invalid options: " + e.toString());
    }

    return opts;
  }

  private void runImportTest(String tableName, String [] types,
      String [] values, String verificationScript, String [] args,
      SqoopTool tool) throws IOException {

    // create a table and populate it with a row...
    createTableWithColTypes(types, values);

    // set up our mock hive shell to compare our generated script
    // against the correct expected one.
    SqoopOptions options = getSqoopOptions(args, tool);
    String hiveHome = options.getHiveHome();
    assertNotNull("hive.home was not set", hiveHome);
    String testDataPath = new Path(new Path(hiveHome),
        "scripts/" + verificationScript).toString();
    System.setProperty("expected.script",
        new File(testDataPath).getAbsolutePath());

    // verify that we can import it correctly into hive.
    runImport(tool, args);
  }

  /** Test that we can generate a file containing the DDL and not import. */
  @Test
  public void testGenerateOnly() throws IOException {
    final String TABLE_NAME = "GenerateOnly";
    setCurTableName(TABLE_NAME);
    setNumCols(1);

    // Figure out where our target generated .q file is going to be.
    SqoopOptions options = getSqoopOptions(getArgv(false, null),
        new ImportTool());
    Path ddlFile = new Path(new Path(options.getCodeOutputDir()),
        TABLE_NAME + ".q");
    FileSystem fs = FileSystem.getLocal(new Configuration());

    // If it's already there, remove it before running the test to ensure
    // that it's the current test that generated the file.
    if (fs.exists(ddlFile)) {
      if (!fs.delete(ddlFile, false)) {
        LOG.warn("Could not delete previous ddl file: " + ddlFile);
      }
    }

    // Run a basic import, but specify that we're just generating definitions.
    String [] types = { "INTEGER" };
    String [] vals = { "42" };
    runImportTest(TABLE_NAME, types, vals, null, getCodeGenArgs(),
        new CodeGenTool());

    // Test that the generated definition file exists.
    assertTrue("Couldn't find expected ddl file", fs.exists(ddlFile));

    Path hiveImportPath = new Path(new Path(options.getWarehouseDir()),
        TABLE_NAME);
    assertFalse("Import actually happened!", fs.exists(hiveImportPath));
  }

  /** Test that strings and ints are handled in the normal fashion. */
  @Test
  public void testNormalHiveImport() throws IOException {
    final String TABLE_NAME = "NORMAL_HIVE_IMPORT";
    setCurTableName(TABLE_NAME);
    setNumCols(3);
    String [] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
    String [] vals = { "'test'", "42", "'somestring'" };
    runImportTest(TABLE_NAME, types, vals, "normalImport.q",
        getArgv(false, null), new ImportTool());
  }

  /** Test that table is created in hive with no data import. */
  @Test
  public void testCreateOnlyHiveImport() throws IOException {
    final String TABLE_NAME = "CREATE_ONLY_HIVE_IMPORT";
    setCurTableName(TABLE_NAME);
    setNumCols(3);
    String [] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
    String [] vals = { "'test'", "42", "'somestring'" };
    runImportTest(TABLE_NAME, types, vals,
        "createOnlyImport.q", getCreateHiveTableArgs(null),
        new CreateHiveTableTool());
  }

  /**
   * Test that table is created in hive and replaces the existing table if
   * any.
   */
  @Test
  public void testCreateOverwriteHiveImport() throws IOException {
    final String TABLE_NAME = "CREATE_OVERWRITE_HIVE_IMPORT";
    setCurTableName(TABLE_NAME);
    setNumCols(3);
    String [] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
    String [] vals = { "'test'", "42", "'somestring'" };
    String [] extraArgs = {"--hive-overwrite", "--create-hive-table"};
    runImportTest(TABLE_NAME, types, vals,
        "createOverwriteImport.q", getCreateHiveTableArgs(extraArgs),
        new CreateHiveTableTool());
    runImportTest(TABLE_NAME, types, vals,
        "createOverwriteImport.q", getCreateHiveTableArgs(extraArgs),
        new CreateHiveTableTool());
  }

  /** Test that dates are coerced properly to strings. */
  @Test
  public void testDate() throws IOException {
    final String TABLE_NAME = "DATE_HIVE_IMPORT";
    setCurTableName(TABLE_NAME);
    setNumCols(2);
    String [] types = { "VARCHAR(32)", "DATE" };
    String [] vals = { "'test'", "'2009-05-12'" };
    runImportTest(TABLE_NAME, types, vals, "dateImport.q",
        getArgv(false, null), new ImportTool());
  }

  /** Test that NUMERICs are coerced to doubles. */
  @Test
  public void testNumeric() throws IOException {
    final String TABLE_NAME = "NUMERIC_HIVE_IMPORT";
    setCurTableName(TABLE_NAME);
    setNumCols(2);
    String [] types = { "NUMERIC", "CHAR(64)" };
    String [] vals = { "3.14159", "'foo'" };
    runImportTest(TABLE_NAME, types, vals, "numericImport.q",
        getArgv(false, null), new ImportTool());
  }

  /** Test that DECIMALS using --map-column-hive option maps can run without issues. */
  @Test
  public void testDecimalMapColumnHive() throws IOException {
    final String TABLE_NAME = "DECIMAL_MAP_HIVE_IMPORT";
    setCurTableName(TABLE_NAME);
    setNumCols(2);
    String [] types = { "NUMERIC", "CHAR(64)" };
    String [] vals = { "12343.14159", "'foo'" };

    ArrayList<String> args = new ArrayList<String>();
    args.add("--map-column-hive");
    args.add(BASE_COL_NAME + "0=DECIMAL(10,10)");

    runImportTest(TABLE_NAME, types, vals, "decimalMapImport.q",
        getArgv(false, args.toArray(new String[args.size()])), new ImportTool());
  }

  /** If bin/hive returns an error exit status, we should get an IOException. */
  @Test
  public void testHiveExitFails() throws IOException {
    // The expected script is different than the one which would be generated
    // by this, so we expect an IOException out.
    final String TABLE_NAME = "FAILING_HIVE_IMPORT";
    setCurTableName(TABLE_NAME);
    setNumCols(2);
    String [] types = { "NUMERIC", "CHAR(64)" };
    String [] vals = { "3.14159", "'foo'" };

    thrown.expect(IOException.class);
    thrown.reportMissingExceptionWithMessage("Expected IOException on erroneous Hive exit status");
    runImportTest(TABLE_NAME, types, vals, "failingImport.q",
        getArgv(false, null), new ImportTool());
  }

  /** Test that we can set delimiters how we want them. */
  @Test
  public void testCustomDelimiters() throws IOException {
    final String TABLE_NAME = "CUSTOM_DELIM_IMPORT";
    setCurTableName(TABLE_NAME);
    setNumCols(3);
    String [] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
    String [] vals = { "'test'", "42", "'somestring'" };
    String [] extraArgs = {
        "--fields-terminated-by", ",",
        "--lines-terminated-by", "|",
    };
    runImportTest(TABLE_NAME, types, vals, "customDelimImport.q",
        getArgv(false, extraArgs), new ImportTool());
  }

  /**
   * Test hive import with row that has new line in it.
   */
  @Test
  public void testFieldWithHiveDelims() throws IOException,
      InterruptedException {
    final String TABLE_NAME = "FIELD_WITH_NL_HIVE_IMPORT";

    LOG.info("Doing import of single row into FIELD_WITH_NL_HIVE_IMPORT table");
    setCurTableName(TABLE_NAME);
    setNumCols(3);
    String[] types = getTypes();
    String[] vals = { "'test with \n new lines \n'", "42",
        "'oh no " + '\01' + " field delims " + '\01' + "'", };
    String[] moreArgs = { "--"+ BaseSqoopTool.HIVE_DROP_DELIMS_ARG };

    runImportTest(TABLE_NAME, types, vals, "fieldWithNewlineImport.q",
        getArgv(false, moreArgs), new ImportTool());

    LOG.info("Validating data in single row is present in: "
        + "FIELD_WITH_NL_HIVE_IMPORT table");

    // Ideally, we would actually invoke hive code to verify that record with
    // record and field delimiters have values replaced and that we have the
    // proper number of hive records. Unfortunately, this is a non-trivial task,
    // and better dealt with at an integration test level
    //
    // Instead, this assumes the path of the generated table and just validate
    // map job output.

    // Get and read the raw output file
    String whDir = getWarehouseDir();
    File p = new File(new File(whDir, TABLE_NAME), "part-m-00000");
    File f = new File(p.toString());
    FileReader fr = new FileReader(f);
    BufferedReader br = new BufferedReader(fr);
    try {
      // verify the output
      assertEquals(br.readLine(), "test with new lines " + '\01' + "42"
          + '\01' + "oh no field delims ");
      assertEquals(br.readLine(), null); // should only be one line
    } catch (IOException ioe) {
      fail("Unable to read files generated from hive");
    } finally {
      br.close();
    }
  }

  /**
   * Test hive import with row that has new line in it.
   */
  @Test
  public void testFieldWithHiveDelimsReplacement() throws IOException,
      InterruptedException {
    final String TABLE_NAME = "FIELD_WITH_NL_REPLACEMENT_HIVE_IMPORT";

    LOG.info("Doing import of single row into "
        + "FIELD_WITH_NL_REPLACEMENT_HIVE_IMPORT table");
    setCurTableName(TABLE_NAME);
    setNumCols(3);
    String[] types = getTypes();
    String[] vals = { "'test with\nnew lines\n'", "42",
        "'oh no " + '\01' + " field delims " + '\01' + "'", };
    String[] moreArgs = { "--"+BaseSqoopTool.HIVE_DELIMS_REPLACEMENT_ARG, " "};

    runImportTest(TABLE_NAME, types, vals,
        "fieldWithNewlineReplacementImport.q", getArgv(false, moreArgs),
        new ImportTool());

    LOG.info("Validating data in single row is present in: "
        + "FIELD_WITH_NL_REPLACEMENT_HIVE_IMPORT table");

    // Ideally, we would actually invoke hive code to verify that record with
    // record and field delimiters have values replaced and that we have the
    // proper number of hive records. Unfortunately, this is a non-trivial task,
    // and better dealt with at an integration test level
    //
    // Instead, this assumes the path of the generated table and just validate
    // map job output.

    // Get and read the raw output file
    String whDir = getWarehouseDir();
    File p = new File(new File(whDir, TABLE_NAME), "part-m-00000");
    File f = new File(p.toString());
    FileReader fr = new FileReader(f);
    BufferedReader br = new BufferedReader(fr);
    try {
      // verify the output
      assertEquals(br.readLine(), "test with new lines " + '\01' + "42"
          + '\01' + "oh no field delims ");
      assertEquals(br.readLine(), null); // should only be one line
    } catch (IOException ioe) {
      fail("Unable to read files generated from hive");
    } finally {
      br.close();
    }
  }

  /**
   * Test hive drop and replace option validation.
   */
  @Test
  public void testHiveDropAndReplaceOptionValidation() throws ParseException, InvalidOptionsException {
    LOG.info("Testing conflicting Hive delimiter drop/replace options");

    setNumCols(3);
    String[] moreArgs = { "--"+BaseSqoopTool.HIVE_DELIMS_REPLACEMENT_ARG, " ",
        "--"+BaseSqoopTool.HIVE_DROP_DELIMS_ARG, };

    ImportTool tool = new ImportTool();

    thrown.expect(InvalidOptionsException.class);
    thrown.reportMissingExceptionWithMessage("Expected InvalidOptionsException with conflicting Hive delimiter " +
        "drop/replace options");
    tool.validateOptions(tool.parseArguments(getArgv(false, moreArgs), null,
        null, true));
  }

  /**
   * Test hive import with row that has new line in it.
   */
  @Test
  public void testImportHiveWithPartitions() throws IOException,
      InterruptedException {
    final String TABLE_NAME = "PARTITION_HIVE_IMPORT";

    LOG.info("Doing import of single row into PARTITION_HIVE_IMPORT table");
    setCurTableName(TABLE_NAME);
    setNumCols(3);
    String[] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)", };
    String[] vals = { "'whoop'", "42", "'I am a row in a partition'", };
    String[] moreArgs = { "--" + BaseSqoopTool.HIVE_PARTITION_KEY_ARG, "ds",
        "--" + BaseSqoopTool.HIVE_PARTITION_VALUE_ARG, "20110413", };

    runImportTest(TABLE_NAME, types, vals, "partitionImport.q",
        getArgv(false, moreArgs), new ImportTool());
  }

  /**
   * If partition key is set to one of importing columns, we should get an
   * IOException.
   * */
  @Test
  public void testImportWithBadPartitionKey() throws IOException {
    final String TABLE_NAME = "FAILING_PARTITION_HIVE_IMPORT";

    LOG.info("Doing import of single row into " + TABLE_NAME + " table");
    setCurTableName(TABLE_NAME);
    setNumCols(3);
    String[] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)", };
    String[] vals = { "'key'", "42", "'I am a row in a partition'", };

    String partitionKey = getColNames()[0];

    // Specify 1st column as partition key and import every column of the
    // table by default (i.e. no --columns option).
    String[] moreArgs1 = {
        "--" + BaseSqoopTool.HIVE_PARTITION_KEY_ARG,
        partitionKey,
    };

    // Specify 1st column as both partition key and importing column.
    String[] moreArgs2 = {
        "--" + BaseSqoopTool.HIVE_PARTITION_KEY_ARG,
        partitionKey,
        "--" + BaseSqoopTool.COLUMNS_ARG,
        partitionKey,
    };

    // Test hive-import with the 1st args.
    thrown.expect(IOException.class);
    thrown.reportMissingExceptionWithMessage("Expected IOException during Hive import with partition key " +
        "as importing column");
    runImportTest(TABLE_NAME, types, vals, "partitionImport.q",
        getArgv(false, moreArgs1), new ImportTool());

    // Test hive-import with the 2nd args.
    thrown.expect(IOException.class);
    thrown.reportMissingExceptionWithMessage("Expected IOException during Hive import with partition key " +
        "as importing column");
    runImportTest(TABLE_NAME, types, vals, "partitionImport.q",
        getArgv(false, moreArgs2), new ImportTool());

    // Test create-hive-table with the 1st args.
    thrown.expect(IOException.class);
    thrown.reportMissingExceptionWithMessage("Expected IOException during Hive table creation with partition key " +
        "as importing column");
    runImportTest(TABLE_NAME, types, vals, "partitionImport.q",
        getCreateTableArgv(false, moreArgs1), new CreateHiveTableTool());
  }

}
@@ -28,6 +28,7 @@
 import org.apache.sqoop.util.SqlTypeMap;

 import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.SqoopOptions.FileLayout;
 import org.apache.sqoop.testutil.HsqldbTestServer;

 import org.junit.Before;
@@ -266,13 +267,31 @@ public void testGetCreateTableStmtDiscardsConnection() throws Exception {
   }

   @Test
-  public void testGetCreateTableStmtWithAvroSchema() throws Exception {
+  public void testGetCreateParquetTableStmtWithAvroSchema() throws Exception {
     options.setFileLayout(ParquetFile);
     options.getConf().set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, TEST_AVRO_SCHEMA);

     assertEquals(EXPECTED_CREATE_PARQUET_TABLE_STMNT, writer.getCreateTableStmt());
   }

+  @Test
+  public void testGetCreateAvroTableStmtWithAvroSchema() throws Exception {
+    final String EXPECTED_CREATE_AVRO_TABLE_STMNT =
+        "CREATE TABLE IF NOT EXISTS `outputTable` "
+            + "( `id` INT, `value` STRING) STORED AS AVRO";
+
+    Map<String, Integer> colTypes = new SqlTypeMap<String, Integer>();
+    colTypes.put("id", Types.INTEGER);
+    colTypes.put("value", Types.VARCHAR);
+
+    setUpMockConnManager(this.inputTable, colTypes);
+
+    options.setMapColumnHive("id=INT,value=STRING");
+    options.setFileLayout(FileLayout.AvroDataFile);
+
+    assertEquals(EXPECTED_CREATE_AVRO_TABLE_STMNT, writer.getCreateTableStmt());
+  }
+
   private void setUpMockConnManager(String tableName, Map<String, Integer> typeMap) {
     when(connManager.getColumnTypes(tableName)).thenReturn(typeMap);
     when(connManager.getColumnNames(tableName)).thenReturn(typeMap.keySet().toArray(new String[]{}));