mirror of
https://github.com/apache/sqoop.git
synced 2025-05-03 04:11:44 +08:00
Add management of the schema in direct netezza import
This commit is contained in:
parent
0bd850396e
commit
60aafe073c
@ -23,6 +23,7 @@
|
|||||||
import java.sql.PreparedStatement;
|
import java.sql.PreparedStatement;
|
||||||
import java.sql.ResultSet;
|
import java.sql.ResultSet;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
import org.apache.commons.cli.CommandLine;
|
import org.apache.commons.cli.CommandLine;
|
||||||
import org.apache.commons.cli.GnuParser;
|
import org.apache.commons.cli.GnuParser;
|
||||||
@ -86,6 +87,8 @@ public class DirectNetezzaManager extends NetezzaManager {
|
|||||||
public static final String NETEZZA_TABLE_ENCODING_LONG_ARG =
|
public static final String NETEZZA_TABLE_ENCODING_LONG_ARG =
|
||||||
"encoding";
|
"encoding";
|
||||||
|
|
||||||
|
public static final String NETEZZA_SCHEMA_OPT = "netezza.schema";
|
||||||
|
public static final String NETEZZA_TABLE_SCHEMA_LONG_ARG = "schema";
|
||||||
|
|
||||||
private static final String QUERY_CHECK_DICTIONARY_FOR_TABLE =
|
private static final String QUERY_CHECK_DICTIONARY_FOR_TABLE =
|
||||||
"SELECT 1 FROM _V_TABLE WHERE OWNER= ? "
|
"SELECT 1 FROM _V_TABLE WHERE OWNER= ? "
|
||||||
@ -293,6 +296,9 @@ protected RelatedOptions getNetezzaExtraOpts() {
|
|||||||
netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_TABLE_ENCODING_OPT)
|
netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_TABLE_ENCODING_OPT)
|
||||||
.hasArg().withDescription("Table encoding")
|
.hasArg().withDescription("Table encoding")
|
||||||
.withLongOpt(NETEZZA_TABLE_ENCODING_LONG_ARG).create());
|
.withLongOpt(NETEZZA_TABLE_ENCODING_LONG_ARG).create());
|
||||||
|
netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_SCHEMA_OPT)
|
||||||
|
.hasArg().withDescription("Allow Schema")
|
||||||
|
.withLongOpt(NETEZZA_TABLE_SCHEMA_LONG_ARG).create());
|
||||||
return netezzaOpts;
|
return netezzaOpts;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -302,6 +308,8 @@ private void handleNetezzaExtraArgs(SqoopOptions opts)
|
|||||||
Configuration conf = opts.getConf();
|
Configuration conf = opts.getConf();
|
||||||
|
|
||||||
String[] extraArgs = opts.getExtraArgs();
|
String[] extraArgs = opts.getExtraArgs();
|
||||||
|
|
||||||
|
LOG.debug("extraArgs " + Arrays.toString(extraArgs));
|
||||||
|
|
||||||
RelatedOptions netezzaOpts = getNetezzaExtraOpts();
|
RelatedOptions netezzaOpts = getNetezzaExtraOpts();
|
||||||
CommandLine cmdLine = new GnuParser().parse(netezzaOpts, extraArgs, true);
|
CommandLine cmdLine = new GnuParser().parse(netezzaOpts, extraArgs, true);
|
||||||
@ -320,6 +328,12 @@ private void handleNetezzaExtraArgs(SqoopOptions opts)
|
|||||||
conf.set(NETEZZA_TABLE_ENCODING_OPT, encoding);
|
conf.set(NETEZZA_TABLE_ENCODING_OPT, encoding);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (cmdLine.hasOption(NETEZZA_TABLE_SCHEMA_LONG_ARG)) {
|
||||||
|
String schemaName = cmdLine.getOptionValue(NETEZZA_TABLE_SCHEMA_LONG_ARG);
|
||||||
|
LOG.info("We will use schema " + schemaName);
|
||||||
|
conf.set(NETEZZA_SCHEMA_OPT, schemaName);
|
||||||
|
}
|
||||||
|
|
||||||
conf.setBoolean(NETEZZA_CTRL_CHARS_OPT,
|
conf.setBoolean(NETEZZA_CTRL_CHARS_OPT,
|
||||||
cmdLine.hasOption(NETEZZA_CTRL_CHARS_LONG_ARG));
|
cmdLine.hasOption(NETEZZA_CTRL_CHARS_LONG_ARG));
|
||||||
|
|
||||||
@ -353,6 +367,20 @@ public boolean isORMFacilitySelfManaged() {
|
|||||||
public boolean isDirectModeHCatSupported() {
|
public boolean isDirectModeHCatSupported() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String escapeTableName(String tableName) {
|
||||||
|
Configuration conf = options.getConf();
|
||||||
|
|
||||||
|
String schema = conf.get(NETEZZA_SCHEMA_OPT);
|
||||||
|
// Return table name including schema if requested
|
||||||
|
if (schema != null && !schema.isEmpty()) {
|
||||||
|
return escapeIdentifier(schema) + "." + escapeIdentifier(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
return escapeIdentifier(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public static String getLocalLogDir(TaskAttemptID attemptId) {
|
public static String getLocalLogDir(TaskAttemptID attemptId) {
|
||||||
|
@ -75,8 +75,15 @@ private String getSqlStatement(int myId) throws IOException {
|
|||||||
char ec = (char) conf.getInt(DelimiterSet.OUTPUT_ESCAPED_BY_KEY, 0);
|
char ec = (char) conf.getInt(DelimiterSet.OUTPUT_ESCAPED_BY_KEY, 0);
|
||||||
|
|
||||||
String nullValue = conf.get(DirectNetezzaManager.NETEZZA_NULL_VALUE);
|
String nullValue = conf.get(DirectNetezzaManager.NETEZZA_NULL_VALUE);
|
||||||
|
String schema = conf.get(DirectNetezzaManager.NETEZZA_SCHEMA_OPT);
|
||||||
|
|
||||||
|
String tableName = dbc.getInputTableName();
|
||||||
|
if (schema!=null)
|
||||||
|
{
|
||||||
|
tableName = schema +"."+tableName;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int errorThreshold = conf.getInt(
|
int errorThreshold = conf.getInt(
|
||||||
DirectNetezzaManager.NETEZZA_ERROR_THRESHOLD_OPT, 1);
|
DirectNetezzaManager.NETEZZA_ERROR_THRESHOLD_OPT, 1);
|
||||||
String logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
|
String logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
|
||||||
@ -129,7 +136,7 @@ private String getSqlStatement(int myId) throws IOException {
|
|||||||
sqlStmt.append(',').append(cols[i]);
|
sqlStmt.append(',').append(cols[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sqlStmt.append(" FROM ").append(dbc.getInputTableName()).append(' ');
|
sqlStmt.append(" FROM ").append(tableName).append(' ');
|
||||||
sqlStmt.append("WHERE (DATASLICEID % ");
|
sqlStmt.append("WHERE (DATASLICEID % ");
|
||||||
sqlStmt.append(numMappers).append(") = ").append(myId);
|
sqlStmt.append(numMappers).append(") = ").append(myId);
|
||||||
if (inputConds != null && inputConds.length() > 0) {
|
if (inputConds != null && inputConds.length() > 0) {
|
||||||
|
Loading…
Reference in New Issue
Block a user