HADOOP-5887. Sqoop should create tables in Hive metastore after importing to HDFS. Contributed by Aaron Kimball.
From: Thomas White <tomwhite@apache.org>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149807 13f79535-47bb-0310-9956-ffa450edef68
parent bf65299ba1
commit dc191132a5
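For orientation, a minimal sketch of what this change enables: an import run with the new --hive-import flag, driven through the standard Tool entry point. This driver is not part of the commit; the JDBC URL and table name are illustrative.

// Hypothetical driver. ToolRunner.run() invokes Sqoop.run() with the given
// arguments; --hive-import triggers the Hive post-load step added here.
// The connect string and table name are made up for illustration.
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.sqoop.Sqoop;

public class HiveImportExample {
  public static void main(String [] args) throws Exception {
    int ret = ToolRunner.run(new Sqoop(), new String[] {
        "--connect", "jdbc:hsqldb:hsql://localhost/testdb", // assumed URL
        "--table", "EMPLOYEES",                             // assumed table
        "--hive-import"                                     // new flag
    });
    System.exit(ret);
  }
}

After the HDFS import finishes, Sqoop generates CREATE TABLE and LOAD DATA INPATH statements and runs them through the hive CLI; the new classes in the diff below implement that step.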
@@ -70,6 +70,12 @@ to call at top-level: ant deploy-contrib compile-core-test
        <sysproperty key="hadoop.test.localoutputfile" value="${hadoop.test.localoutputfile}"/>
        <sysproperty key="hadoop.log.dir" value="${hadoop.log.dir}"/>

        <!-- we have a mock "hive" shell instance in our testdata directory
             for testing hive integration. Set this property here to ensure
             that the unit tests pick it up.
        -->
        <sysproperty key="hive.home" value="${basedir}/testdata/hive" />

        <!-- tools.jar from Sun JDK also required to invoke javac. -->
        <classpath>
          <path refid="test.classpath"/>
@@ -93,8 +93,9 @@ public enum FileLayout {
  private String warehouseDir;
  private FileLayout layout;
  private boolean local; // if true and conn is mysql, use mysqldump.

  private String tmpDir; // where temp data goes; usually /tmp
  private String hiveHome;
  private boolean hiveImport;

  private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";

@@ -138,11 +139,17 @@ private void loadFromProperties() {
      this.orderByCol = props.getProperty("db.sort.column", this.orderByCol);
      this.driverClassName = props.getProperty("jdbc.driver", this.driverClassName);
      this.warehouseDir = props.getProperty("hdfs.warehouse.dir", this.warehouseDir);
      this.hiveHome = props.getProperty("hive.home", this.hiveHome);

      String localImport = props.getProperty("local.import",
          Boolean.toString(this.local)).toLowerCase();
      this.local = "true".equals(localImport) || "yes".equals(localImport)
          || "1".equals(localImport);

      String hiveImportStr = props.getProperty("hive.import",
          Boolean.toString(this.hiveImport)).toLowerCase();
      this.hiveImport = "true".equals(hiveImportStr) || "yes".equals(hiveImportStr)
          || "1".equals(hiveImportStr);
    } catch (IOException ioe) {
      LOG.error("Could not read properties file " + DEFAULT_CONFIG_FILE + ": " + ioe.toString());
    } finally {
@@ -156,11 +163,25 @@ private void loadFromProperties() {
      }
    }
  }

  /**
   * @return the temp directory to use; this is guaranteed to end with
   * the file separator character (e.g., '/')
   */
  public String getTempDir() {
    return this.tmpDir;
  }

  private void initDefaults() {
    // first, set the true defaults if nothing else happens.
    // default action is to run the full pipeline.
    this.action = ControlAction.FullImport;
    this.hadoopHome = System.getenv("HADOOP_HOME");

    // Set this with $HIVE_HOME, but -Dhive.home can override.
    this.hiveHome = System.getenv("HIVE_HOME");
    this.hiveHome = System.getProperty("hive.home", this.hiveHome);

    // Set this to cwd, but -Dsqoop.src.dir can override.
    this.codeOutputDir = System.getProperty("sqoop.src.dir", ".");

    String myTmpDir = System.getProperty("test.build.data", "/tmp/");
@@ -193,11 +214,13 @@ public static void printUsage() {
    System.out.println("--columns (col,col,col...) Columns to export from table");
    System.out.println("--order-by (column-name) Column of the table used to order results");
    System.out.println("--hadoop-home (dir) Override $HADOOP_HOME");
    System.out.println("--hive-home (dir) Override $HIVE_HOME");
    System.out.println("--warehouse-dir (dir) HDFS path for table destination");
    System.out.println("--as-sequencefile Imports data to SequenceFiles");
    System.out.println("--as-textfile Imports data as plain text (default)");
    System.out.println("--all-tables Import all tables in database");
    System.out.println(" (Ignores --table, --columns and --order-by)");
    System.out.println("--hive-import If set, then import the table into Hive");
    System.out.println("");
    System.out.println("Code generation options:");
    System.out.println("--outdir (dir) Output directory for generated code");
@@ -254,6 +277,10 @@ public void parse(String [] args) throws InvalidOptionsException {
        this.password = args[++i];
      } else if (args[i].equals("--hadoop-home")) {
        this.hadoopHome = args[++i];
      } else if (args[i].equals("--hive-home")) {
        this.hiveHome = args[++i];
      } else if (args[i].equals("--hive-import")) {
        this.hiveImport = true;
      } else if (args[i].equals("--outdir")) {
        this.codeOutputDir = args[++i];
      } else if (args[i].equals("--as-sequencefile")) {
@@ -358,6 +385,15 @@ public boolean isLocal() {
    return local;
  }

  public String getHiveHome() {
    return hiveHome;
  }

  /** @return true if we should import the table into Hive */
  public boolean doHiveImport() {
    return hiveImport;
  }

  /**
   * @return location where .java files go; guaranteed to end with '/'
   */
@@ -26,6 +26,7 @@
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.sqoop.hive.HiveImport;
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.orm.ClassWriter;
import org.apache.hadoop.sqoop.orm.CompilationManager;
@@ -42,6 +43,7 @@ public class Sqoop extends Configured implements Tool {

  private ImportOptions options;
  private ConnManager manager;
  private HiveImport hiveImport;

  public Sqoop() {
  }
@@ -69,12 +71,18 @@ private void importTable(String tableName) throws IOException, ImportError {
    String jarFile = null;

    // Generate the ORM code for the tables.
    // TODO(aaron): Allow this to be bypassed if the user has already generated code
    // TODO(aaron): Allow this to be bypassed if the user has already generated code,
    // or if they're using a non-MapReduce import method (e.g., mysqldump).
    jarFile = generateORM(tableName);

    if (options.getAction() == ImportOptions.ControlAction.FullImport) {
      // Proceed onward to do the import.
      manager.importTable(tableName, jarFile, getConf());

      // If the user wants this table to be in Hive, perform that post-load.
      if (options.doHiveImport()) {
        hiveImport.importTable(tableName);
      }
    }
  }

@@ -101,6 +109,10 @@ public int run(String [] args) {
      return 1;
    }

    if (options.doHiveImport()) {
      hiveImport = new HiveImport(options, manager, getConf());
    }

    ImportOptions.ControlAction action = options.getAction();
    if (action == ImportOptions.ControlAction.ListTables) {
      String [] tables = manager.listTables();
src/java/org/apache/hadoop/sqoop/hive/HiveImport.java  (new file, 177 lines)
@@ -0,0 +1,177 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.sqoop.hive;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.util.Executor;
import org.apache.hadoop.sqoop.util.LoggingStreamHandlerFactory;

/**
 * Utility to import a table into the Hive metastore. Manages the connection
 * to Hive itself as well as orchestrating the use of the other classes in this
 * package.
 */
public class HiveImport {

  public static final Log LOG = LogFactory.getLog(HiveImport.class.getName());

  private ImportOptions options;
  private ConnManager connManager;
  private Configuration configuration;

  public HiveImport(final ImportOptions opts, final ConnManager connMgr, final Configuration conf) {
    this.options = opts;
    this.connManager = connMgr;
    this.configuration = conf;
  }


  /**
   * @return the filename of the hive executable to run to do the import
   */
  private String getHiveBinPath() {
    // If the user has $HIVE_HOME set, then use $HIVE_HOME/bin/hive if it
    // exists.
    // Fall back to just plain 'hive' and hope it's in the path.

    String hiveHome = options.getHiveHome();
    if (null == hiveHome) {
      return "hive";
    }

    Path p = new Path(hiveHome);
    p = new Path(p, "bin");
    p = new Path(p, "hive");
    String hiveBinStr = p.toString();
    if (new File(hiveBinStr).exists()) {
      return hiveBinStr;
    } else {
      return "hive";
    }
  }

  /**
   * If we used a MapReduce-based upload of the data, remove the _logs dir
   * from where we put it, before running Hive LOAD DATA INPATH
   */
  private void removeTempLogs(String tableName) throws IOException {
    FileSystem fs = FileSystem.get(configuration);
    String warehouseDir = options.getWarehouseDir();
    Path tablePath;
    if (warehouseDir != null) {
      tablePath = new Path(new Path(warehouseDir), tableName);
    } else {
      tablePath = new Path(tableName);
    }

    Path logsPath = new Path(tablePath, "_logs");
    if (fs.exists(logsPath)) {
      LOG.info("Removing temporary files from import process: " + logsPath);
      if (!fs.delete(logsPath, true)) {
        LOG.warn("Could not delete temporary files; continuing with import, but it may fail.");
      }
    }
  }

  public void importTable(String tableName) throws IOException {
    removeTempLogs(tableName);

    LOG.info("Loading uploaded data into Hive");

    // For testing purposes against our mock hive implementation,
    // if the sysproperty "expected.script" is set, we set the EXPECTED_SCRIPT
    // environment variable for the child hive process. We also disable
    // timestamp comments so that we have deterministic table creation scripts.
    String expectedScript = System.getProperty("expected.script");
    List<String> env = Executor.getCurEnvpStrings();
    boolean debugMode = expectedScript != null;
    if (debugMode) {
      env.add("EXPECTED_SCRIPT=" + expectedScript);
      env.add("TMPDIR=" + options.getTempDir());
    }

    // generate the HQL statements to run.
    TableDefWriter tableWriter = new TableDefWriter(options, connManager, tableName,
        configuration, !debugMode);
    String createTableStr = tableWriter.getCreateTableStmt() + ";\n";
    String loadDataStmtStr = tableWriter.getLoadDataStmt() + ";\n";

    // write them to a script file.
    File tempFile = File.createTempFile("hive-script-",".txt", new File(options.getTempDir()));
    try {
      String tmpFilename = tempFile.toString();
      BufferedWriter w = null;
      try {
        FileOutputStream fos = new FileOutputStream(tempFile);
        w = new BufferedWriter(new OutputStreamWriter(fos));
        w.write(createTableStr, 0, createTableStr.length());
        w.write(loadDataStmtStr, 0, loadDataStmtStr.length());
      } catch (IOException ioe) {
        LOG.error("Error writing Hive load-in script: " + ioe.toString());
        ioe.printStackTrace();
        throw ioe;
      } finally {
        if (null != w) {
          try {
            w.close();
          } catch (IOException ioe) {
            LOG.warn("IOException closing stream to Hive script: " + ioe.toString());
          }
        }
      }

      // run Hive on the script and note the return code.
      String hiveExec = getHiveBinPath();
      ArrayList<String> args = new ArrayList<String>();
      args.add(hiveExec);
      args.add("-f");
      args.add(tmpFilename);

      LoggingStreamHandlerFactory lshf = new LoggingStreamHandlerFactory(LOG);
      int ret = Executor.exec(args.toArray(new String[0]), env.toArray(new String[0]), lshf, lshf);
      if (0 != ret) {
        throw new IOException("Hive exited with status " + ret);
      }

      LOG.info("Hive import complete.");
    } finally {
      if (!tempFile.delete()) {
        LOG.warn("Could not remove temporary file: " + tempFile.toString());
        // try to delete the file later.
        tempFile.deleteOnExit();
      }
    }
  }
}
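To summarize the flow this class implements, a short hedged sketch; the surrounding objects are assumed to be the ones Sqoop.run() builds above, and the table name is illustrative.

// Sketch only: importTable() removes the _logs directory left by MapReduce,
// writes the generated CREATE TABLE and LOAD DATA INPATH statements to a
// temporary script, and runs `<hive.home>/bin/hive -f <script>` via Executor.
HiveImport hive = new HiveImport(options, manager, conf);
hive.importTable("EMPLOYEES"); // illustrative table name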
src/java/org/apache/hadoop/sqoop/hive/HiveTypes.java  (new file, 95 lines)
@@ -0,0 +1,95 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.sqoop.hive;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.sql.Types;

/**
 * Defines conversion between SQL types and Hive types.
 */
public class HiveTypes {

  public static final Log LOG = LogFactory.getLog(HiveTypes.class.getName());

  /**
   * Given JDBC SQL types coming from another database, what is the best
   * mapping to a Hive-specific type?
   */
  public static String toHiveType(int sqlType) {
    if (sqlType == Types.INTEGER) {
      return "INT";
    } else if (sqlType == Types.VARCHAR) {
      return "STRING";
    } else if (sqlType == Types.CHAR) {
      return "STRING";
    } else if (sqlType == Types.LONGVARCHAR) {
      return "STRING";
    } else if (sqlType == Types.NUMERIC) {
      // Per suggestion on hive-user, this is converted to DOUBLE for now.
      return "DOUBLE";
    } else if (sqlType == Types.DECIMAL) {
      // Per suggestion on hive-user, this is converted to DOUBLE for now.
      return "DOUBLE";
    } else if (sqlType == Types.BIT) {
      return "BOOLEAN";
    } else if (sqlType == Types.BOOLEAN) {
      return "BOOLEAN";
    } else if (sqlType == Types.TINYINT) {
      return "TINYINT";
    } else if (sqlType == Types.SMALLINT) {
      return "INTEGER";
    } else if (sqlType == Types.BIGINT) {
      return "BIGINT";
    } else if (sqlType == Types.REAL) {
      return "DOUBLE";
    } else if (sqlType == Types.FLOAT) {
      return "DOUBLE";
    } else if (sqlType == Types.DOUBLE) {
      return "DOUBLE";
    } else if (sqlType == Types.DATE) {
      // unfortunate type coercion
      return "STRING";
    } else if (sqlType == Types.TIME) {
      // unfortunate type coercion
      return "STRING";
    } else if (sqlType == Types.TIMESTAMP) {
      // unfortunate type coercion
      return "STRING";
    } else {
      // TODO(aaron): Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB,
      // BLOB, ARRAY, STRUCT, REF, JAVA_OBJECT.
      return null;
    }
  }

  /**
   * @return true if a sql type can't be translated to a precise match
   * in Hive, and we have to cast it to something more generic.
   */
  public static boolean isHiveTypeImprovised(int sqlType) {
    return sqlType == Types.DATE || sqlType == Types.TIME
        || sqlType == Types.TIMESTAMP
        || sqlType == Types.DECIMAL
        || sqlType == Types.NUMERIC;
  }
}
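A small usage sketch of the mapping above; the printed values follow directly from the code and are noted in the comments.

import java.sql.Types;
import org.apache.hadoop.sqoop.hive.HiveTypes;

public class HiveTypesDemo {
  public static void main(String [] args) {
    // VARCHAR has a precise Hive equivalent...
    System.out.println(HiveTypes.toHiveType(Types.VARCHAR));           // STRING
    System.out.println(HiveTypes.isHiveTypeImprovised(Types.VARCHAR)); // false
    // ...while TIMESTAMP is coerced to STRING and flagged as improvised.
    System.out.println(HiveTypes.toHiveType(Types.TIMESTAMP));           // STRING
    System.out.println(HiveTypes.isHiveTypeImprovised(Types.TIMESTAMP)); // true
  }
}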
src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java  (new file, 171 lines)
@@ -0,0 +1,171 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.sqoop.hive;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.hive.HiveTypes;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Date;
import java.text.DateFormat;
import java.text.SimpleDateFormat;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Creates (Hive-specific) SQL DDL statements to create tables to hold data
 * we're importing from another source.
 *
 * After we import the database into HDFS, we can inject it into Hive using
 * the CREATE TABLE and LOAD DATA INPATH statements generated by this object.
 */
public class TableDefWriter {

  public static final Log LOG = LogFactory.getLog(TableDefWriter.class.getName());

  private ImportOptions options;
  private ConnManager connManager;
  private Configuration configuration;
  private String tableName;
  private boolean commentsEnabled;

  /**
   * Creates a new TableDefWriter to generate a Hive CREATE TABLE statement.
   * @param opts program-wide options
   * @param connMgr the connection manager used to describe the table.
   * @param table the name of the table to read.
   * @param config the Hadoop configuration to use to connect to the dfs
   * @param withComments if true, then tables will be created with a
   *        timestamp comment.
   */
  public TableDefWriter(final ImportOptions opts, final ConnManager connMgr,
      final String table, final Configuration config, final boolean withComments) {
    this.options = opts;
    this.connManager = connMgr;
    this.tableName = table;
    this.configuration = config;
    this.commentsEnabled = withComments;
  }

  /**
   * @return the CREATE TABLE statement for the table to load into hive.
   */
  public String getCreateTableStmt() throws IOException {
    Map<String, Integer> columnTypes = connManager.getColumnTypes(tableName);

    String [] colNames = options.getColumns();
    if (null == colNames) {
      colNames = connManager.getColumnNames(tableName);
    }

    StringBuilder sb = new StringBuilder();

    sb.append("CREATE TABLE " + tableName + " ( ");

    boolean first = true;
    for (String col : colNames) {
      if (!first) {
        sb.append(", ");
      }

      first = false;

      Integer colType = columnTypes.get(col);
      String hiveColType = HiveTypes.toHiveType(colType);
      if (null == hiveColType) {
        throw new IOException("Hive does not support the SQL type for column " + col);
      }

      sb.append(col + " " + hiveColType);

      if (HiveTypes.isHiveTypeImprovised(colType)) {
        LOG.warn("Column " + col + " had to be cast to a less precise type in Hive");
      }
    }

    sb.append(") ");

    if (commentsEnabled) {
      DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
      String curDateStr = dateFormat.format(new Date());
      sb.append("COMMENT 'Imported by sqoop on " + curDateStr + "' ");
    }

    sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ");
    sb.append("LINES TERMINATED BY '\\n' STORED AS TEXTFILE");

    LOG.debug("Create statement: " + sb.toString());
    return sb.toString();
  }

  private static final int DEFAULT_HDFS_PORT =
      org.apache.hadoop.hdfs.server.namenode.NameNode.DEFAULT_PORT;

  /**
   * @return the LOAD DATA statement to import the data in HDFS into hive
   */
  public String getLoadDataStmt() throws IOException {
    String warehouseDir = options.getWarehouseDir();
    if (null == warehouseDir) {
      warehouseDir = "";
    } else if (!warehouseDir.endsWith(File.separator)) {
      warehouseDir = warehouseDir + File.separator;
    }

    String tablePath = warehouseDir + tableName;
    FileSystem fs = FileSystem.get(configuration);
    Path finalPath = new Path(tablePath).makeQualified(fs);
    String finalPathStr = finalPath.toString();
    if (finalPathStr.startsWith("hdfs://") && finalPathStr.indexOf(":", 7) == -1) {
      // Hadoop removed the port number from the fully-qualified URL.
      // We need to reinsert this or else Hive will complain.
      // Do this right before the third instance of the '/' character.
      int insertPoint = 0;
      for (int i = 0; i < 3; i++) {
        insertPoint = finalPathStr.indexOf("/", insertPoint + 1);
      }

      if (insertPoint == -1) {
        LOG.warn("Fully-qualified HDFS path does not contain a port.");
        LOG.warn("this may cause a Hive error.");
      } else {
        finalPathStr = finalPathStr.substring(0, insertPoint) + ":" + DEFAULT_HDFS_PORT
            + finalPathStr.substring(insertPoint, finalPathStr.length());
      }
    }

    StringBuilder sb = new StringBuilder();
    sb.append("LOAD DATA INPATH '");
    sb.append(finalPathStr);
    sb.append("' INTO TABLE ");
    sb.append(tableName);

    LOG.debug("Load statement: " + sb.toString());
    return sb.toString();
  }
}
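A hedged usage sketch of this class: given an ImportOptions and an already-configured ConnManager (how the manager is built is outside this diff), print the two statements HiveImport will feed to the hive CLI. Comments are disabled here so the output is deterministic, as the unit tests do.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.hive.TableDefWriter;
import org.apache.hadoop.sqoop.manager.ConnManager;

public class TableDefWriterDemo {
  // All arguments are supplied by the caller; this helper is illustrative only.
  static void printHiveScript(ImportOptions opts, ConnManager mgr,
      Configuration conf, String table) throws IOException {
    TableDefWriter writer = new TableDefWriter(opts, mgr, table, conf, false);
    System.out.println(writer.getCreateTableStmt() + ";");
    System.out.println(writer.getLoadDataStmt() + ";");
  }
}

For a typical table this produces statements of the same shape as the expected scripts under testdata/hive/scripts/ further down.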
@@ -75,13 +75,6 @@ public interface ConnManager {
   */
  Connection getConnection() throws SQLException;

  /**
   * Resolve a database-specific type to the Java type that should contain it.
   * @param sqlType
   * @return the name of a Java type to hold the sql datatype, or null if none.
   */
  String toJavaType(int sqlType);

  /**
   * @return a string identifying the driver class to load for this JDBC connection type.
   */
@@ -261,7 +261,12 @@ protected ResultSet execute(String stmt, Object... args) {
    // Or must statement.close() be called too?
  }

  public String toJavaType(int sqlType) {
  /**
   * Resolve a database-specific type to the Java type that should contain it.
   * @param sqlType
   * @return the name of a Java type to hold the sql datatype, or null if none.
   */
  public static String toJavaType(int sqlType) {
    // mappings from http://java.sun.com/j2se/1.3/docs/guide/jdbc/getstart/mapping.html
    if (sqlType == Types.INTEGER) {
      return "Integer";
@@ -20,6 +20,7 @@

import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.manager.SqlManager;
import org.apache.hadoop.sqoop.lib.BigDecimalSerializer;
import org.apache.hadoop.sqoop.lib.JdbcWritableBridge;

@@ -247,7 +248,7 @@ private void generateFields(Map<String, Integer> columnTypes, String [] colNames

    for (String col : colNames) {
      int sqlType = columnTypes.get(col);
      String javaType = connManager.toJavaType(sqlType);
      String javaType = SqlManager.toJavaType(sqlType);
      if (null == javaType) {
        LOG.error("Cannot resolve SQL type " + sqlType);
        continue;
@@ -277,7 +278,7 @@ private void generateDbRead(Map<String, Integer> columnTypes, String [] colNames
      fieldNum++;

      int sqlType = columnTypes.get(col);
      String javaType = connManager.toJavaType(sqlType);
      String javaType = SqlManager.toJavaType(sqlType);
      if (null == javaType) {
        LOG.error("No Java type for SQL type " + sqlType);
        continue;
@@ -314,7 +315,7 @@ private void generateDbWrite(Map<String, Integer> columnTypes, String [] colName
      fieldNum++;

      int sqlType = columnTypes.get(col);
      String javaType = connManager.toJavaType(sqlType);
      String javaType = SqlManager.toJavaType(sqlType);
      if (null == javaType) {
        LOG.error("No Java type for SQL type " + sqlType);
        continue;
@@ -347,7 +348,7 @@ private void generateHadoopRead(Map<String, Integer> columnTypes, String [] colN

    for (String col : colNames) {
      int sqlType = columnTypes.get(col);
      String javaType = connManager.toJavaType(sqlType);
      String javaType = SqlManager.toJavaType(sqlType);
      if (null == javaType) {
        LOG.error("No Java type for SQL type " + sqlType);
        continue;
@@ -380,7 +381,7 @@ private void generateToString(Map<String, Integer> columnTypes, String [] colNam
    boolean first = true;
    for (String col : colNames) {
      int sqlType = columnTypes.get(col);
      String javaType = connManager.toJavaType(sqlType);
      String javaType = SqlManager.toJavaType(sqlType);
      if (null == javaType) {
        LOG.error("No Java type for SQL type " + sqlType);
        continue;
@@ -420,7 +421,7 @@ private void generateHadoopWrite(Map<String, Integer> columnTypes, String [] col

    for (String col : colNames) {
      int sqlType = columnTypes.get(col);
      String javaType = connManager.toJavaType(sqlType);
      String javaType = SqlManager.toJavaType(sqlType);
      if (null == javaType) {
        LOG.error("No Java type for SQL type " + sqlType);
        continue;
src/java/org/apache/hadoop/sqoop/util/Executor.java  (new file, 117 lines)
@@ -0,0 +1,117 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.sqoop.util;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Runs a process via Runtime.exec() and allows handling of stdout/stderr to be
 * deferred to other threads.
 *
 */
public final class Executor {

  public static final Log LOG = LogFactory.getLog(Executor.class.getName());

  private Executor() {
  }

  /**
   * Execute a program defined by the args array with default stream handlers
   * that consume the program's output (to prevent it from blocking on buffers)
   * and then ignore said output.
   */
  public static int exec(String [] args) throws IOException {
    NullStreamHandlerFactory f = new NullStreamHandlerFactory();
    return exec(args, f, f);
  }

  /**
   * Run a command via Runtime.exec(), with its stdout and stderr streams
   * directed to be handled by threads generated by StreamHandlerFactories.
   * Block until the child process terminates.
   *
   * @return the exit status of the ran program
   */
  public static int exec(String [] args, StreamHandlerFactory outHandler,
      StreamHandlerFactory errHandler) throws IOException {
    return exec(args, null, outHandler, errHandler);
  }


  /**
   * Run a command via Runtime.exec(), with its stdout and stderr streams
   * directed to be handled by threads generated by StreamHandlerFactories.
   * Block until the child process terminates. Allows the programmer to
   * specify an environment for the child program.
   *
   * @return the exit status of the ran program
   */
  public static int exec(String [] args, String [] envp, StreamHandlerFactory outHandler,
      StreamHandlerFactory errHandler) throws IOException {

    // launch the process.
    Process p = Runtime.getRuntime().exec(args, envp);

    // dispatch its stdout and stderr to stream handlers if available.
    if (null != outHandler) {
      outHandler.processStream(p.getInputStream());
    }

    if (null != errHandler) {
      errHandler.processStream(p.getErrorStream());
    }

    // wait for the return value.
    while (true) {
      try {
        int ret = p.waitFor();
        return ret;
      } catch (InterruptedException ie) {
        continue;
      }
    }
  }


  /**
   * @return An array formatted correctly for use as an envp based on the
   * current environment for this program.
   */
  public static List<String> getCurEnvpStrings() {
    Map<String, String> curEnv = System.getenv();
    ArrayList<String> array = new ArrayList<String>();

    if (null == curEnv) {
      return null;
    }

    for (Map.Entry<String, String> entry : curEnv.entrySet()) {
      array.add(entry.getKey() + "=" + entry.getValue());
    }

    return array;
  }
}
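A short sketch of how these helpers are meant to be combined; the echo command is illustrative, and HiveImport above uses the same pattern to run `hive -f`.

import java.util.List;
import org.apache.hadoop.sqoop.util.Executor;
import org.apache.hadoop.sqoop.util.LoggingStreamHandlerFactory;

public class ExecutorDemo {
  public static void main(String [] args) throws Exception {
    // Pass the current environment through and log the child's output via log4j.
    List<String> env = Executor.getCurEnvpStrings();
    LoggingStreamHandlerFactory handler = new LoggingStreamHandlerFactory(null);
    int ret = Executor.exec(new String[] { "echo", "hello from Executor" },
        env.toArray(new String[0]), handler, handler);
    System.out.println("child exited with status " + ret);
  }
}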
@@ -0,0 +1,89 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.sqoop.util;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A StreamHandlerFactory that takes the contents of a stream and writes
 * it to log4j.
 *
 */
public class LoggingStreamHandlerFactory implements StreamHandlerFactory {

  public static final Log LOG = LogFactory.getLog(LoggingStreamHandlerFactory.class.getName());

  private Log contextLog;

  public LoggingStreamHandlerFactory(final Log context) {
    if (null == context) {
      this.contextLog = LOG;
    } else {
      this.contextLog = context;
    }
  }

  public void processStream(InputStream is) {
    new LoggingThread(is).start();
  }

  /**
   * Run a background thread that copies the contents of the stream
   * to the output context log.
   */
  private class LoggingThread extends Thread {

    private InputStream stream;

    LoggingThread(final InputStream is) {
      this.stream = is;
    }

    public void run() {
      InputStreamReader isr = new InputStreamReader(this.stream);
      BufferedReader r = new BufferedReader(isr);

      try {
        while (true) {
          String line = r.readLine();
          if (null == line) {
            break; // stream was closed by remote end.
          }

          LoggingStreamHandlerFactory.this.contextLog.info(line);
        }
      } catch (IOException ioe) {
        LOG.error("IOException reading from stream: " + ioe.toString());
      }

      try {
        r.close();
      } catch (IOException ioe) {
        LOG.warn("Error closing stream in LoggingStreamHandler: " + ioe.toString());
      }
    }
  }
}
@@ -0,0 +1,76 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.sqoop.util;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A StreamHandlerFactory that takes the contents of a stream and ignores it.
 *
 */
public class NullStreamHandlerFactory implements StreamHandlerFactory {

  public static final Log LOG = LogFactory.getLog(NullStreamHandlerFactory.class.getName());

  public void processStream(InputStream is) {
    new IgnoringThread(is).start();
  }

  /**
   * Run a background thread that reads and ignores the
   * contents of the stream.
   */
  private class IgnoringThread extends Thread {

    private InputStream stream;

    IgnoringThread(final InputStream is) {
      this.stream = is;
    }

    public void run() {
      InputStreamReader isr = new InputStreamReader(this.stream);
      BufferedReader r = new BufferedReader(isr);

      try {
        while (true) {
          String line = r.readLine();
          if (null == line) {
            break; // stream was closed by remote end.
          }
        }
      } catch (IOException ioe) {
        LOG.warn("IOException reading from (ignored) stream: " + ioe.toString());
      }

      try {
        r.close();
      } catch (IOException ioe) {
        LOG.warn("Error closing stream in NullStreamHandler: " + ioe.toString());
      }
    }
  }
}
@@ -0,0 +1,39 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.sqoop.util;

import java.io.InputStream;

/**
 * An interface describing a factory class for a Thread class that handles
 * input from some sort of stream.
 *
 * When the stream is closed, the thread should terminate.
 *
 */
public interface StreamHandlerFactory {

  /**
   * Create and run a thread to handle input from the provided InputStream.
   * When processStream returns, the thread should be running; it should
   * continue to run until the InputStream is exhausted.
   */
  void processStream(InputStream is);
}
@@ -18,6 +18,7 @@

package org.apache.hadoop.sqoop;

import org.apache.hadoop.sqoop.hive.TestHiveImport;
import org.apache.hadoop.sqoop.manager.LocalMySQLTest;
import org.apache.hadoop.sqoop.manager.TestHsqldbManager;
import org.apache.hadoop.sqoop.manager.TestSqlManager;
@@ -46,6 +47,7 @@ public static Test suite() {
    suite.addTestSuite(TestMultiCols.class);
    suite.addTestSuite(TestOrderBy.class);
    suite.addTestSuite(LocalMySQLTest.class);
    suite.addTestSuite(TestHiveImport.class);

    return suite;
  }
src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java  (new file, 146 lines)
@@ -0,0 +1,146 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.sqoop.hive;

import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;

import junit.framework.TestCase;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;

/**
 * Test HiveImport capability after an import to HDFS.
 */
public class TestHiveImport extends ImportJobTestCase {

  public static final Log LOG = LogFactory.getLog(TestHiveImport.class.getName());

  /**
   * Create the argv to pass to Sqoop
   * @return the argv as an array of strings.
   */
  private String [] getArgv(boolean includeHadoopFlags) {
    ArrayList<String> args = new ArrayList<String>();

    if (includeHadoopFlags) {
      args.add("-D");
      args.add("mapred.job.tracker=local");
      args.add("-D");
      args.add("mapred.map.tasks=1");
      args.add("-D");
      args.add("fs.default.name=file:///");
    }

    args.add("--table");
    args.add(getTableName());
    args.add("--warehouse-dir");
    args.add(getWarehouseDir());
    args.add("--connect");
    args.add(HsqldbTestServer.getUrl());
    args.add("--hive-import");
    args.add("--order-by");
    args.add(getColNames()[0]);

    return args.toArray(new String[0]);
  }

  private ImportOptions getImportOptions() {
    ImportOptions opts = new ImportOptions();
    try {
      opts.parse(getArgv(false));
    } catch (ImportOptions.InvalidOptionsException ioe) {
      fail("Invalid options: " + ioe.toString());
    }

    return opts;
  }

  private void runImportTest(String tableName, String [] types, String [] values,
      String verificationScript) throws IOException {

    // create a table and populate it with a row...
    setCurTableName(tableName);
    createTableWithColTypes(types, values);

    // set up our mock hive shell to compare our generated script
    // against the correct expected one.
    ImportOptions options = getImportOptions();
    String hiveHome = options.getHiveHome();
    assertNotNull("hive.home was not set", hiveHome);
    Path testDataPath = new Path(new Path(hiveHome), "scripts/" + verificationScript);
    System.setProperty("expected.script", testDataPath.toString());

    // verify that we can import it correctly into hive.
    runImport(getArgv(true));
  }

  /** Test that strings and ints are handled in the normal fashion */
  @Test
  public void testNormalHiveImport() throws IOException {
    String [] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
    String [] vals = { "'test'", "42", "'somestring'" };
    runImportTest("NORMAL_HIVE_IMPORT", types, vals, "normalImport.q");
  }

  /** Test that dates are coerced properly to strings */
  @Test
  public void testDate() throws IOException {
    String [] types = { "VARCHAR(32)", "DATE" };
    String [] vals = { "'test'", "'2009-05-12'" };
    runImportTest("DATE_HIVE_IMPORT", types, vals, "dateImport.q");
  }

  /** Test that NUMERICs are coerced to doubles */
  @Test
  public void testNumeric() throws IOException {
    String [] types = { "NUMERIC", "CHAR(64)" };
    String [] vals = { "3.14159", "'foo'" };
    runImportTest("NUMERIC_HIVE_IMPORT", types, vals, "numericImport.q");
  }

  /** If bin/hive returns an error exit status, we should get an IOException */
  @Test
  public void testHiveExitFails() {
    // The expected script is different than the one which would be generated
    // by this, so we expect an IOException out.
    String [] types = { "NUMERIC", "CHAR(64)" };
    String [] vals = { "3.14159", "'foo'" };
    try {
      runImportTest("FAILING_HIVE_IMPORT", types, vals, "failingImport.q");
      // If we get here, then the run succeeded -- which is incorrect.
      fail("FAILING_HIVE_IMPORT test should have thrown IOException");
    } catch (IOException ioe) {
      // expected; ok.
    }
  }

}
@@ -46,9 +46,6 @@
/**
 * Class that implements common methods required for tests which import data
 * from SQL into HDFS and verify correct import.
 *
 *
 *
 */
public class ImportJobTestCase extends TestCase {

@@ -71,6 +68,13 @@ public class ImportJobTestCase extends TestCase {
    LOCAL_WAREHOUSE_DIR = TEMP_BASE_DIR + "sqoop/warehouse";
  }

  // Used if a test manually sets the table name to be used.
  private String curTableName;

  protected void setCurTableName(String curName) {
    this.curTableName = curName;
  }

  /**
   * Because of how classloading works, we don't actually want to name
   * all the tables the same thing -- they'll actually just use the same
@@ -83,7 +87,11 @@ public class ImportJobTestCase extends TestCase {
  static final String TABLE_NAME = "IMPORT_TABLE_";

  protected String getTableName() {
    return TABLE_NAME + Integer.toString(tableNum);
    if (null != curTableName) {
      return curTableName;
    } else {
      return TABLE_NAME + Integer.toString(tableNum);
    }
  }

  protected String getWarehouseDir() {
@@ -140,12 +148,15 @@ public void setUp() {

  @After
  public void tearDown() {
    setCurTableName(null); // clear user-override table name.

    try {
      manager.close();
    } catch (SQLException sqlE) {
      LOG.error("Got SQLException: " + sqlE.toString());
      fail("Got SQLException: " + sqlE.toString());
    }

  }

  static final String BASE_COL_NAME = "DATA_COL";
@@ -385,7 +396,9 @@ protected void runImport(String [] argv) throws IOException {
    }

    // expect a successful return.
    assertEquals("Failure during job", 0, ret);
    if (0 != ret) {
      throw new IOException("Failure during job; return status " + ret);
    }
  }

}
testdata/hive/bin/hive  (new file, vendored, 59 lines)
@@ -0,0 +1,59 @@
#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This is a mock "Hive" shell that validates whether various test imports
# succeeded. It accepts commands of the form 'hive -f scriptname'
# and validates that the script contents match those of an expected script.
# The filename to that expected script is set via the environment variable
# EXPECTED_SCRIPT.

# The script will contain a pathname as part of the LOAD DATA INPATH statement;
# depending on where you run the tests from, this can change. So the expected
# script file actually contains the marker string "BASEPATH" which is replaced
# by this script with the contents of $TMPDIR, which is set to 'test.build.data'.

if [ -z "$EXPECTED_SCRIPT" ]; then
  echo "No expected script set"
  exit 1
elif [ -z "$TMPDIR" ]; then
  TMPDIR=/tmp
elif [ "$1" != "-f" ]; then
  echo "Misunderstood argument: $1"
  echo "Expected '-f'."
  exit 1
elif [ -z "$2" ]; then
  echo "Expected: hive -f filename"
  exit 1
else
  GENERATED_SCRIPT=$2
fi

# Normalize this to an absolute path
TMPDIR=`cd $TMPDIR && pwd`

# Copy the expected script into the tmpdir and replace the marker.
cp "$EXPECTED_SCRIPT" "$TMPDIR"
SCRIPT_BASE=`basename $EXPECTED_SCRIPT`
COPIED_SCRIPT="$TMPDIR/$SCRIPT_BASE"
sed -i -e "s|BASEPATH|$TMPDIR|" $COPIED_SCRIPT

# Actually check to see that the input we got matches up.
diff --ignore-all-space --ignore-blank-lines "$COPIED_SCRIPT" "$GENERATED_SCRIPT"
ret=$?

exit $ret
testdata/hive/scripts/dateImport.q  (new file, vendored, 2 lines)
@@ -0,0 +1,2 @@
CREATE TABLE DATE_HIVE_IMPORT ( DATA_COL0 STRING, DATA_COL1 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/DATE_HIVE_IMPORT' INTO TABLE DATE_HIVE_IMPORT;

testdata/hive/scripts/failingImport.q  (new file, vendored, 2 lines)
@@ -0,0 +1,2 @@
CREATE TABLE DATE_HIVE_IMPORT ( DATA_COL0 STRING, DATA_COL1 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/DATE_HIVE_IMPORT' INTO TABLE DATE_HIVE_IMPORT;

testdata/hive/scripts/normalImport.q  (new file, vendored, 2 lines)
@@ -0,0 +1,2 @@
CREATE TABLE NORMAL_HIVE_IMPORT ( DATA_COL0 STRING, DATA_COL1 INT, DATA_COL2 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/NORMAL_HIVE_IMPORT' INTO TABLE NORMAL_HIVE_IMPORT;

testdata/hive/scripts/numericImport.q  (new file, vendored, 2 lines)
@@ -0,0 +1,2 @@
CREATE TABLE NUMERIC_HIVE_IMPORT ( DATA_COL0 DOUBLE, DATA_COL1 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/NUMERIC_HIVE_IMPORT' INTO TABLE NUMERIC_HIVE_IMPORT;