5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-02 18:30:15 +08:00

Use mysql to import data from hdfs to the database when sqoop.mysql.export.clause is set

This commit is contained in:
rgan 2016-08-01 22:27:54 -04:00
parent 7808c6bc3e
commit 1bbd501153

View File

@ -74,6 +74,12 @@ public class MySQLExportMapper<KEYIN, VALIN>
public static final long DEFAULT_CHECKPOINT_SLEEP_MS = 0;
public static final String MYSQL_CHARSET_KEY =
"sqoop.mysql.export.charset";
public static final String MYSQL_CLAUSE =
"sqoop.mysql.export.clause";
// Configured value for MYSQL_CHECKPOINT_SLEEP_KEY.
protected long checkpointSleepMs;
@ -111,8 +117,9 @@ public class MySQLExportMapper<KEYIN, VALIN>
private void initMySQLImportProcess() throws IOException {
File taskAttemptDir = TaskId.getLocalWorkPath(conf);
String tableName = conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE");
this.fifoFile = new File(taskAttemptDir,
conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE") + ".txt");
tableName + ".txt");
String filename = fifoFile.toString();
// Create the FIFO itself.
@ -126,9 +133,6 @@ private void initMySQLImportProcess() throws IOException {
"Could not create FIFO to interface with mysqlimport", ioe);
}
// Now open the connection to mysqlimport.
ArrayList<String> args = new ArrayList<String>();
String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
String databaseName = JdbcUrl.getDatabaseName(connectString);
String hostname = JdbcUrl.getHostName(connectString);
@ -138,23 +142,63 @@ private void initMySQLImportProcess() throws IOException {
throw new IOException("Could not determine database name");
}
ArrayList<String> args = null;
// Use mysql to do the import if sqoop.mysql.export.clause is set
// e.g. hadoop jar build/sqoop-1.4.7-SNAPSHOT.jar org.apache.sqoop.Sqoop export -Dsqoop.mysql.export.clause="set @bh_dataformat='mysql'"
// --connect "jdbc:mysql://localhost/test?user=xxx&password=xxx" --table countries
// --export-dir /user/test/countries.txt --fields-terminated-by '\t' --direct --password xxx --username xxx
String mysqlClause = conf.get(MYSQL_CLAUSE);
if (mysqlClause != null) {
args = getMysqlCommandWithClause(filename, tableName, mysqlClause);
} else {
args = getMysqlImportCommand(filename);
}
// Begin the export in an external process.
LOG.info("Starting import with arguments:");
for (String arg : args) {
LOG.info(" " + arg);
}
// Actually start mysqlimport.
mysqlImportProcess = Runtime.getRuntime().exec(args.toArray(new String[0]));
// Log everything it writes to stderr.
// Ignore anything on stdout.
this.outSink = new NullAsyncSink();
this.outSink.processStream(mysqlImportProcess.getInputStream());
this.errSink = new LoggingAsyncSink(LOG);
this.errSink.processStream(mysqlImportProcess.getErrorStream());
// Open the named FIFO after starting mysqlimport.
this.importStream = new BufferedOutputStream(
new FileOutputStream(fifoFile));
// At this point, mysqlimport is running and hooked up to our FIFO.
// The mapper just needs to populate it with data.
this.bytesWritten = 0;
}
private ArrayList<String> getMysqlCommandWithClause(String filename,
String tablename, String clause) throws IOException {
ArrayList<String> args = new ArrayList<String>();
args.add("mysql"); // needs to be on the path.
String databaseName = setupDbHostUserPwd(args);
args.add(databaseName);
args.add("-e");
args.add(clause + ";" + String.format("load data local infile '%s' into table %s", filename, tablename));
return args;
}
private ArrayList<String> getMysqlImportCommand(String filename) throws IOException {
// Now open the connection to mysqlimport.
ArrayList<String> args = new ArrayList<String>();
args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path.
String password = DBConfiguration.getPassword((JobConf) conf);
if (null != password && password.length() > 0) {
passwordFile = new File(MySQLUtils.writePasswordFile(conf));
args.add("--defaults-file=" + passwordFile);
}
String username = conf.get(MySQLUtils.USERNAME_KEY);
if (null != username) {
args.add("--user=" + username);
}
args.add("--host=" + hostname);
if (-1 != port) {
args.add("--port=" + Integer.toString(port));
}
String databaseName = setupDbHostUserPwd(args);
args.add("--compress");
args.add("--local");
@ -207,32 +251,61 @@ private void initMySQLImportProcess() throws IOException {
// These two arguments are positional and must be last.
args.add(databaseName);
args.add(filename);
return args;
}
// Begin the export in an external process.
LOG.debug("Starting mysqlimport with arguments:");
for (String arg : args) {
LOG.debug(" " + arg);
private String setupDbHostUserPwd(ArrayList<String> args) throws IOException {
String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
String databaseName = JdbcUrl.getDatabaseName(connectString);
String hostname = JdbcUrl.getHostName(connectString);
int port = JdbcUrl.getPort(connectString);
if (null == databaseName) {
throw new IOException("Could not determine database name");
}
// Actually start mysqlimport.
mysqlImportProcess = Runtime.getRuntime().exec(args.toArray(new String[0]));
String password = DBConfiguration.getPassword((JobConf) conf);
// Log everything it writes to stderr.
// Ignore anything on stdout.
this.outSink = new NullAsyncSink();
this.outSink.processStream(mysqlImportProcess.getInputStream());
if (null != password && password.length() > 0) {
passwordFile = new File(MySQLUtils.writePasswordFile(conf));
args.add("--defaults-file=" + passwordFile);
}
this.errSink = new LoggingAsyncSink(LOG);
this.errSink.processStream(mysqlImportProcess.getErrorStream());
String username = conf.get(MySQLUtils.USERNAME_KEY);
if (null != username) {
args.add("--user=" + username);
}
// Open the named FIFO after starting mysqlimport.
this.importStream = new BufferedOutputStream(
new FileOutputStream(fifoFile));
args.add("--host=" + hostname);
if (-1 != port) {
args.add("--port=" + Integer.toString(port));
}
return databaseName;
}
// At this point, mysqlimport is running and hooked up to our FIFO.
// The mapper just needs to populate it with data.
private String setupDbNameHostUserPwd(ArrayList<String> args) throws IOException {
String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
String databaseName = JdbcUrl.getDatabaseName(connectString);
String hostname = JdbcUrl.getHostName(connectString);
int port = JdbcUrl.getPort(connectString);
this.bytesWritten = 0;
if (null == databaseName) {
throw new IOException("Could not determine database name");
}
args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path.
String password = DBConfiguration.getPassword((JobConf) conf);
String username = conf.get(MySQLUtils.USERNAME_KEY);
if (null != username) {
args.add("--user=" + username);
}
args.add("--host=" + hostname);
if (-1 != port) {
args.add("--port=" + Integer.toString(port));
}
return databaseName;
}
@Override
@ -319,7 +392,8 @@ protected void setup(Context context) {
this.conf = context.getConfiguration();
// TODO: Support additional encodings.
this.mysqlCharSet = MySQLUtils.MYSQL_DEFAULT_CHARSET;
this.mysqlCharSet = conf.get(MYSQL_CHARSET_KEY,
MySQLUtils.MYSQL_DEFAULT_CHARSET);
this.checkpointDistInBytes = conf.getLong(
MYSQL_CHECKPOINT_BYTES_KEY, DEFAULT_CHECKPOINT_BYTES);