mirror of
https://github.com/apache/sqoop.git
synced 2025-05-02 23:21:22 +08:00
Merge 1bbd501153
into f8beae32a0
This commit is contained in:
commit
757f608416
@ -74,6 +74,12 @@ public class MySQLExportMapper<KEYIN, VALIN>
|
||||
|
||||
public static final long DEFAULT_CHECKPOINT_SLEEP_MS = 0;
|
||||
|
||||
public static final String MYSQL_CHARSET_KEY =
|
||||
"sqoop.mysql.export.charset";
|
||||
|
||||
public static final String MYSQL_CLAUSE =
|
||||
"sqoop.mysql.export.clause";
|
||||
|
||||
// Configured value for MYSQL_CHECKPOINT_SLEEP_KEY.
|
||||
protected long checkpointSleepMs;
|
||||
|
||||
@ -111,8 +117,9 @@ public class MySQLExportMapper<KEYIN, VALIN>
|
||||
private void initMySQLImportProcess() throws IOException {
|
||||
File taskAttemptDir = TaskId.getLocalWorkPath(conf);
|
||||
|
||||
String tableName = conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE");
|
||||
this.fifoFile = new File(taskAttemptDir,
|
||||
conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE") + ".txt");
|
||||
tableName + ".txt");
|
||||
String filename = fifoFile.toString();
|
||||
|
||||
// Create the FIFO itself.
|
||||
@ -126,9 +133,6 @@ private void initMySQLImportProcess() throws IOException {
|
||||
"Could not create FIFO to interface with mysqlimport", ioe);
|
||||
}
|
||||
|
||||
// Now open the connection to mysqlimport.
|
||||
ArrayList<String> args = new ArrayList<String>();
|
||||
|
||||
String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
|
||||
String databaseName = JdbcUrl.getDatabaseName(connectString);
|
||||
String hostname = JdbcUrl.getHostName(connectString);
|
||||
@ -138,23 +142,63 @@ private void initMySQLImportProcess() throws IOException {
|
||||
throw new IOException("Could not determine database name");
|
||||
}
|
||||
|
||||
ArrayList<String> args = null;
|
||||
// Use mysql to do the import if sqoop.mysql.export.clause is set
|
||||
// e.g. hadoop jar build/sqoop-1.4.7-SNAPSHOT.jar org.apache.sqoop.Sqoop export -Dsqoop.mysql.export.clause="set @bh_dataformat='mysql'"
|
||||
// --connect "jdbc:mysql://localhost/test?user=xxx&password=xxx" --table countries
|
||||
// --export-dir /user/test/countries.txt --fields-terminated-by '\t' --direct --password xxx --username xxx
|
||||
String mysqlClause = conf.get(MYSQL_CLAUSE);
|
||||
if (mysqlClause != null) {
|
||||
args = getMysqlCommandWithClause(filename, tableName, mysqlClause);
|
||||
} else {
|
||||
args = getMysqlImportCommand(filename);
|
||||
}
|
||||
|
||||
|
||||
// Begin the export in an external process.
|
||||
LOG.info("Starting import with arguments:");
|
||||
for (String arg : args) {
|
||||
LOG.info(" " + arg);
|
||||
}
|
||||
|
||||
// Actually start mysqlimport.
|
||||
mysqlImportProcess = Runtime.getRuntime().exec(args.toArray(new String[0]));
|
||||
|
||||
// Log everything it writes to stderr.
|
||||
// Ignore anything on stdout.
|
||||
this.outSink = new NullAsyncSink();
|
||||
this.outSink.processStream(mysqlImportProcess.getInputStream());
|
||||
|
||||
this.errSink = new LoggingAsyncSink(LOG);
|
||||
this.errSink.processStream(mysqlImportProcess.getErrorStream());
|
||||
|
||||
// Open the named FIFO after starting mysqlimport.
|
||||
this.importStream = new BufferedOutputStream(
|
||||
new FileOutputStream(fifoFile));
|
||||
|
||||
// At this point, mysqlimport is running and hooked up to our FIFO.
|
||||
// The mapper just needs to populate it with data.
|
||||
|
||||
this.bytesWritten = 0;
|
||||
}
|
||||
|
||||
private ArrayList<String> getMysqlCommandWithClause(String filename,
|
||||
String tablename, String clause) throws IOException {
|
||||
ArrayList<String> args = new ArrayList<String>();
|
||||
args.add("mysql"); // needs to be on the path.
|
||||
String databaseName = setupDbHostUserPwd(args);
|
||||
args.add(databaseName);
|
||||
args.add("-e");
|
||||
args.add(clause + ";" + String.format("load data local infile '%s' into table %s", filename, tablename));
|
||||
return args;
|
||||
}
|
||||
|
||||
private ArrayList<String> getMysqlImportCommand(String filename) throws IOException {
|
||||
// Now open the connection to mysqlimport.
|
||||
ArrayList<String> args = new ArrayList<String>();
|
||||
args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path.
|
||||
String password = DBConfiguration.getPassword((JobConf) conf);
|
||||
|
||||
if (null != password && password.length() > 0) {
|
||||
passwordFile = new File(MySQLUtils.writePasswordFile(conf));
|
||||
args.add("--defaults-file=" + passwordFile);
|
||||
}
|
||||
|
||||
String username = conf.get(MySQLUtils.USERNAME_KEY);
|
||||
if (null != username) {
|
||||
args.add("--user=" + username);
|
||||
}
|
||||
|
||||
args.add("--host=" + hostname);
|
||||
if (-1 != port) {
|
||||
args.add("--port=" + Integer.toString(port));
|
||||
}
|
||||
String databaseName = setupDbHostUserPwd(args);
|
||||
|
||||
args.add("--compress");
|
||||
args.add("--local");
|
||||
@ -207,32 +251,61 @@ private void initMySQLImportProcess() throws IOException {
|
||||
// These two arguments are positional and must be last.
|
||||
args.add(databaseName);
|
||||
args.add(filename);
|
||||
return args;
|
||||
}
|
||||
|
||||
// Begin the export in an external process.
|
||||
LOG.debug("Starting mysqlimport with arguments:");
|
||||
for (String arg : args) {
|
||||
LOG.debug(" " + arg);
|
||||
private String setupDbHostUserPwd(ArrayList<String> args) throws IOException {
|
||||
String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
|
||||
String databaseName = JdbcUrl.getDatabaseName(connectString);
|
||||
String hostname = JdbcUrl.getHostName(connectString);
|
||||
int port = JdbcUrl.getPort(connectString);
|
||||
|
||||
if (null == databaseName) {
|
||||
throw new IOException("Could not determine database name");
|
||||
}
|
||||
|
||||
// Actually start mysqlimport.
|
||||
mysqlImportProcess = Runtime.getRuntime().exec(args.toArray(new String[0]));
|
||||
String password = DBConfiguration.getPassword((JobConf) conf);
|
||||
|
||||
// Log everything it writes to stderr.
|
||||
// Ignore anything on stdout.
|
||||
this.outSink = new NullAsyncSink();
|
||||
this.outSink.processStream(mysqlImportProcess.getInputStream());
|
||||
if (null != password && password.length() > 0) {
|
||||
passwordFile = new File(MySQLUtils.writePasswordFile(conf));
|
||||
args.add("--defaults-file=" + passwordFile);
|
||||
}
|
||||
|
||||
this.errSink = new LoggingAsyncSink(LOG);
|
||||
this.errSink.processStream(mysqlImportProcess.getErrorStream());
|
||||
String username = conf.get(MySQLUtils.USERNAME_KEY);
|
||||
if (null != username) {
|
||||
args.add("--user=" + username);
|
||||
}
|
||||
|
||||
// Open the named FIFO after starting mysqlimport.
|
||||
this.importStream = new BufferedOutputStream(
|
||||
new FileOutputStream(fifoFile));
|
||||
args.add("--host=" + hostname);
|
||||
if (-1 != port) {
|
||||
args.add("--port=" + Integer.toString(port));
|
||||
}
|
||||
return databaseName;
|
||||
}
|
||||
|
||||
// At this point, mysqlimport is running and hooked up to our FIFO.
|
||||
// The mapper just needs to populate it with data.
|
||||
private String setupDbNameHostUserPwd(ArrayList<String> args) throws IOException {
|
||||
String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
|
||||
String databaseName = JdbcUrl.getDatabaseName(connectString);
|
||||
String hostname = JdbcUrl.getHostName(connectString);
|
||||
int port = JdbcUrl.getPort(connectString);
|
||||
|
||||
this.bytesWritten = 0;
|
||||
if (null == databaseName) {
|
||||
throw new IOException("Could not determine database name");
|
||||
}
|
||||
|
||||
args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path.
|
||||
String password = DBConfiguration.getPassword((JobConf) conf);
|
||||
|
||||
String username = conf.get(MySQLUtils.USERNAME_KEY);
|
||||
if (null != username) {
|
||||
args.add("--user=" + username);
|
||||
}
|
||||
|
||||
args.add("--host=" + hostname);
|
||||
if (-1 != port) {
|
||||
args.add("--port=" + Integer.toString(port));
|
||||
}
|
||||
return databaseName;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -319,7 +392,8 @@ protected void setup(Context context) {
|
||||
this.conf = context.getConfiguration();
|
||||
|
||||
// TODO: Support additional encodings.
|
||||
this.mysqlCharSet = MySQLUtils.MYSQL_DEFAULT_CHARSET;
|
||||
this.mysqlCharSet = conf.get(MYSQL_CHARSET_KEY,
|
||||
MySQLUtils.MYSQL_DEFAULT_CHARSET);
|
||||
|
||||
this.checkpointDistInBytes = conf.getLong(
|
||||
MYSQL_CHECKPOINT_BYTES_KEY, DEFAULT_CHECKPOINT_BYTES);
|
||||
|
Loading…
Reference in New Issue
Block a user