MAPREDUCE-1356. Allow user-specified hive table name in sqoop. Contributed by Aaron Kimball.
From: Thomas White <tomwhite@apache.org>
git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149855 13f79535-47bb-0310-9956-ffa450edef68
parent 69f04fff8b
commit de836d714a
@@ -101,6 +101,9 @@ Import control options
 --table (table-name)::
 The table to import
 
+--hive-table (table-name)::
+When used with --hive-import, overrides the destination table name
+
 --where (clause)::
 Import only the rows for which _clause_ is true.
 e.g.: `--where "user_id > 400 AND hidden == 0"`

@@ -46,6 +46,10 @@ to Hive. If you do not set any delimiters and do use +--hive-import+,
 the field delimiter will be set to +^A+ and the record delimiter will
 be set to +\n+ to be consistent with Hive's defaults.
 
+The table name used in Hive is, by default, the same as that of the
+source table. You can control the output table name with the +--hive-table+
+option.
+
 Hive's Type System
 ~~~~~~~~~~~~~~~~~~
 

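To make the naming rule above concrete, here is a rough sketch (not part of this change) of how the option resolves once the SqoopOptions changes later in this patch are applied. The connect string and table names are invented for illustration; only parse() and getHiveTableName() are taken from the patch itself.

import org.apache.hadoop.sqoop.SqoopOptions;

public class HiveTableNameSketch {
  public static void main(String[] args) throws Exception {
    // With --hive-table, the Hive table name overrides the source table name.
    SqoopOptions withOverride = new SqoopOptions();
    withOverride.parse(new String[] {
        "--connect", "jdbc:mysql://db.example.com/corp",  // invented connect string
        "--table", "employees",
        "--hive-import",
        "--hive-table", "emps" });
    System.out.println(withOverride.getHiveTableName());  // expected: "emps"

    // Without --hive-table, the Hive table name falls back to the source table name.
    SqoopOptions defaulted = new SqoopOptions();
    defaulted.parse(new String[] {
        "--connect", "jdbc:mysql://db.example.com/corp",
        "--table", "employees",
        "--hive-import" });
    System.out.println(defaulted.getHiveTableName());  // expected: "employees"
  }
}
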
@@ -124,7 +124,7 @@ private void importTable(String tableName) throws IOException, ImportException {
 
     // If the user wants this table to be in Hive, perform that post-load.
     if (options.doHiveImport()) {
-      hiveImport.importTable(tableName);
+      hiveImport.importTable(tableName, options.getHiveTableName());
     }
   }
 }

@@ -100,6 +100,7 @@ public enum FileLayout {
   private String tmpDir; // where temp data goes; usually /tmp
   private String hiveHome;
   private boolean hiveImport;
+  private String hiveTableName;
   private String packageName; // package to prepend to auto-named classes.
   private String className; // package+class to apply to individual table import.
                             // also used as an *input* class with existingJarFile.

@@ -314,6 +315,8 @@ public static void printUsage() {
     System.out.println(" (Ignores --table, --columns and --split-by)");
     System.out.println("--hive-import If set, then import the table into Hive.");
     System.out.println(" (Uses Hive's default delimiters if none are set.)");
+    System.out.println("--hive-table (tablename) Sets the table name to use when importing");
+    System.out.println(" to hive.");
     System.out.println("-m, --num-mappers (n) Use 'n' map tasks to import in parallel");
     System.out.println("-z, --compress Enable compression");
     System.out.println("--direct-split-size (n) Split the input stream every 'n' bytes");

@@ -510,6 +513,8 @@ public void parse(String [] args) throws InvalidOptionsException {
         this.hiveHome = args[++i];
       } else if (args[i].equals("--hive-import")) {
         this.hiveImport = true;
+      } else if (args[i].equals("--hive-table")) {
+        this.hiveTableName = args[++i];
       } else if (args[i].equals("--num-mappers") || args[i].equals("-m")) {
         String numMappersStr = args[++i];
         this.numMappers = Integer.valueOf(numMappersStr);

@@ -626,6 +631,15 @@ public void validate() throws InvalidOptionsException {
       // If we're reading all tables, can't set individual class name
       throw new InvalidOptionsException("--class-name and --all-tables are incompatible options."
           + HELP_STR);
+    } else if (this.allTables && this.hiveTableName != null) {
+      // If we're reading all tables, can't set hive target table name
+      throw new InvalidOptionsException(
+          "--hive-table and --all-tables are incompatible options."
+          + HELP_STR);
+    } else if (this.hiveTableName != null && !this.hiveImport) {
+      throw new InvalidOptionsException(
+          "--hive-table is invalid without --hive-import"
+          + HELP_STR);
     } else if (this.connectString == null) {
       throw new InvalidOptionsException("Error: Required argument --connect is missing."
           + HELP_STR);

@@ -935,6 +949,17 @@ public boolean shouldUseCompression() {
     return this.useCompression;
   }
 
+  /**
+   * @return the name of the destination table when importing to Hive
+   */
+  public String getHiveTableName() {
+    if (null != this.hiveTableName) {
+      return this.hiveTableName;
+    } else {
+      return this.tableName;
+    }
+  }
+
   /**
    * @return the file size to split by when using --direct mode.
    */

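As a quick sketch of what the new validate() checks reject (again, not part of this commit; the connect string is invented and the expected message wording is quoted from the hunk above):

import org.apache.hadoop.sqoop.SqoopOptions;

public class HiveTableValidationSketch {
  public static void main(String[] args) throws Exception {
    // --hive-table without --hive-import should now be rejected by validate().
    SqoopOptions opts = new SqoopOptions();
    opts.parse(new String[] {
        "--connect", "jdbc:mysql://db.example.com/corp",  // invented connect string
        "--table", "employees",
        "--hive-table", "emps" });
    try {
      opts.validate();
    } catch (Exception expected) {
      // Message should begin with "--hive-table is invalid without --hive-import".
      System.out.println(expected.getMessage());
    }
  }
}
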
@@ -104,11 +104,24 @@ private void removeTempLogs(String tableName) throws IOException {
     }
   }
 
-  public void importTable(String tableName) throws IOException {
-    removeTempLogs(tableName);
+  /**
+   * Perform the import of data from an HDFS path to a Hive table.
+   *
+   * @param inputTableName the name of the table as loaded into HDFS
+   * @param outputTableName the name of the table to create in Hive.
+   */
+  public void importTable(String inputTableName, String outputTableName)
+      throws IOException {
+    removeTempLogs(inputTableName);
 
     LOG.info("Loading uploaded data into Hive");
 
+    if (null == outputTableName) {
+      outputTableName = inputTableName;
+    }
+    LOG.debug("Hive.inputTable: " + inputTableName);
+    LOG.debug("Hive.outputTable: " + outputTableName);
+
     // For testing purposes against our mock hive implementation,
     // if the sysproperty "expected.script" is set, we set the EXPECTED_SCRIPT
     // environment variable for the child hive process. We also disable

@@ -122,7 +135,8 @@ public void importTable(String tableName) throws IOException {
     }
 
     // generate the HQL statements to run.
-    TableDefWriter tableWriter = new TableDefWriter(options, connManager, tableName,
+    TableDefWriter tableWriter = new TableDefWriter(options, connManager,
+        inputTableName, outputTableName,
         configuration, !debugMode);
     String createTableStr = tableWriter.getCreateTableStmt() + ";\n";
     String loadDataStmtStr = tableWriter.getLoadDataStmt() + ";\n";

@@ -27,6 +27,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Map;
 import java.util.Date;
 import java.text.DateFormat;

@@ -49,7 +50,8 @@ public class TableDefWriter {
   private SqoopOptions options;
   private ConnManager connManager;
   private Configuration configuration;
-  private String tableName;
+  private String inputTableName;
+  private String outputTableName;
   private boolean commentsEnabled;
 
   /**

@@ -62,28 +64,64 @@ public class TableDefWriter {
    * timestamp comment.
    */
   public TableDefWriter(final SqoopOptions opts, final ConnManager connMgr,
-      final String table, final Configuration config, final boolean withComments) {
+      final String inputTable, final String outputTable,
+      final Configuration config, final boolean withComments) {
     this.options = opts;
     this.connManager = connMgr;
-    this.tableName = table;
+    this.inputTableName = inputTable;
+    this.outputTableName = outputTable;
     this.configuration = config;
     this.commentsEnabled = withComments;
   }
 
+  private Map<String, Integer> externalColTypes;
+
+  /**
+   * Set the column type map to be used.
+   * (dependency injection for testing; not used in production.)
+   */
+  void setColumnTypes(Map<String, Integer> colTypes) {
+    this.externalColTypes = colTypes;
+    LOG.debug("Using test-controlled type map");
+  }
+
+  /**
+   * Get the column names to import.
+   */
+  private String [] getColumnNames() {
+    String [] colNames = options.getColumns();
+    if (null != colNames) {
+      return colNames; // user-specified column names.
+    } else if (null != externalColTypes) {
+      // Test-injection column mapping. Extract the col names from this.
+      ArrayList<String> keyList = new ArrayList<String>();
+      for (String key : externalColTypes.keySet()) {
+        keyList.add(key);
+      }
+
+      return keyList.toArray(new String[keyList.size()]);
+    } else {
+      return connManager.getColumnNames(inputTableName);
+    }
+  }
+
   /**
    * @return the CREATE TABLE statement for the table to load into hive.
    */
   public String getCreateTableStmt() throws IOException {
-    Map<String, Integer> columnTypes = connManager.getColumnTypes(tableName);
+    Map<String, Integer> columnTypes;
 
-    String [] colNames = options.getColumns();
-    if (null == colNames) {
-      colNames = connManager.getColumnNames(tableName);
+    if (externalColTypes != null) {
+      // Use pre-defined column types.
+      columnTypes = externalColTypes;
+    } else {
+      // Get these from the database.
+      columnTypes = connManager.getColumnTypes(inputTableName);
     }
 
+    String [] colNames = getColumnNames();
     StringBuilder sb = new StringBuilder();
 
-    sb.append("CREATE TABLE " + tableName + " ( ");
+    sb.append("CREATE TABLE " + outputTableName + " ( ");
 
     boolean first = true;
     for (String col : colNames) {

@@ -138,7 +176,7 @@ public String getLoadDataStmt() throws IOException {
       warehouseDir = warehouseDir + File.separator;
     }
 
-    String tablePath = warehouseDir + tableName;
+    String tablePath = warehouseDir + inputTableName;
     FileSystem fs = FileSystem.get(configuration);
     Path finalPath = new Path(tablePath).makeQualified(fs);
     String finalPathStr = finalPath.toString();

@@ -147,7 +185,7 @@ public String getLoadDataStmt() throws IOException {
     sb.append("LOAD DATA INPATH '");
     sb.append(finalPathStr);
     sb.append("' INTO TABLE ");
-    sb.append(tableName);
+    sb.append(outputTableName);
 
     LOG.debug("Load statement: " + sb.toString());
     return sb.toString();

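To show where the renamed fields end up, here is a hypothetical sketch (not from the commit) of the shape of the two generated statements when the input (source) table is "employees" and the output (Hive) table is "emps". The column list and the warehouse path are invented; they come from code outside the hunks shown above.

public class GeneratedStatementShapes {
  // The CREATE TABLE statement names the output (Hive) table, while the
  // LOAD DATA path is built from the HDFS directory of the input table.
  static final String CREATE_TABLE_SHAPE =
      "CREATE TABLE emps ( /* columns derived from the source schema */ )";
  static final String LOAD_DATA_SHAPE =
      "LOAD DATA INPATH 'hdfs://namenode/user/hive/warehouse/employees' INTO TABLE emps";
}

The new unit test added below asserts this same split using made-up input and output table names.
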
@@ -18,8 +18,13 @@
 
 package org.apache.hadoop.sqoop.hive;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.sqoop.SqoopOptions;
 import org.junit.Test;
 
 import junit.framework.TestCase;

@@ -29,8 +34,7 @@
  */
 public class TestTableDefWriter extends TestCase {
 
-  public static final Log LOG = LogFactory.getLog(TestHiveImport.class.getName());
-
+  public static final Log LOG = LogFactory.getLog(TestTableDefWriter.class.getName());
 
   // Test getHiveOctalCharCode and expect an IllegalArgumentException.
   private void expectExceptionInCharCode(int charCode) {

@@ -52,4 +56,25 @@ public void testHiveOctalCharCode() {
     expectExceptionInCharCode(0200);
     expectExceptionInCharCode(254);
   }
+
+  public void testDifferentTableNames() throws Exception {
+    Configuration conf = new Configuration();
+    SqoopOptions options = new SqoopOptions();
+    TableDefWriter writer = new TableDefWriter(options, null,
+        "inputTable", "outputTable", conf, false);
+
+    Map<String, Integer> colTypes = new HashMap<String, Integer>();
+    writer.setColumnTypes(colTypes);
+
+    String createTable = writer.getCreateTableStmt();
+    String loadData = writer.getLoadDataStmt();
+
+    LOG.debug("Create table stmt: " + createTable);
+    LOG.debug("Load data stmt: " + loadData);
+
+    // Assert that the statements generated have the form we expect.
+    assertTrue(createTable.indexOf("CREATE TABLE outputTable") != -1);
+    assertTrue(loadData.indexOf("INTO TABLE outputTable") != -1);
+    assertTrue(loadData.indexOf("/inputTable'") != -1);
+  }
 }