5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-02 20:52:31 +08:00

SQOOP-1138: incremental lastmodified should re-use output directory

(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2014-07-14 22:24:20 -07:00
parent d4ff097ea3
commit 34e4efd0d7
2 changed files with 369 additions and 7 deletions

View File

@@ -28,12 +28,17 @@
import java.util.List;
import java.util.Map;
import com.cloudera.sqoop.mapreduce.MergeJob;
import com.cloudera.sqoop.orm.TableClassName;
import com.cloudera.sqoop.util.ClassLoaderStack;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import com.cloudera.sqoop.Sqoop;
@@ -66,6 +71,9 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
// store check column type for incremental option
private int checkColumnType;
// Set classloader for local job runner
private ClassLoader prevClassLoader = null;
public ImportTool() {
this("import", false);
}
@@ -90,6 +98,34 @@ public List<String> getGeneratedJarFiles() {
return this.codeGenerator.getGeneratedJarFiles();
}
/**
 * Push a classloader for the generated ORM jar when running with the
 * local job runner, so the generated classes resolve in-process.
 *
 * @param conf job configuration, consulted for the job tracker address
 * @param ormJarFile path of the generated ORM jar
 * @param tableClassName generated class to pre-load from the jar
 * @throws IOException if the jar cannot be added to the classloader stack
 */
private void loadJars(Configuration conf, String ormJarFile,
    String tableClassName) throws IOException {
  // Either the current or the deprecated property may mark local mode.
  boolean localMode = "local".equals(conf.get("mapreduce.jobtracker.address"))
      || "local".equals(conf.get("mapred.job.tracker"));
  if (!localMode) {
    return;
  }
  // The LocalJobRunner executes in the current thread rather than shipping
  // the compiled jar, so stack a classloader that also loads from that jar
  // in addition to everything currently on the classpath.
  this.prevClassLoader =
      ClassLoaderStack.addJarFile(ormJarFile, tableClassName);
}
/**
 * Restore the classloader that was active before {@link #loadJars}, if
 * loadJars actually pushed one.
 */
private void unloadJars() {
  if (this.prevClassLoader == null) {
    return; // loadJars never stacked a classloader; nothing to restore.
  }
  // Unload the special classloader for the ORM jar.
  ClassLoaderStack.setCurrentClassLoader(this.prevClassLoader);
}
/**
* @return true if the supplied options specify an incremental import.
*/
@@ -256,6 +292,7 @@ private boolean initIncrementalConstraints(SqoopOptions options,
return true;
}
FileSystem fs = FileSystem.get(options.getConf());
SqoopOptions.IncrementalMode incrementalMode = options.getIncrementalMode();
String nextIncrementalValue = null;
@@ -280,6 +317,12 @@ private boolean initIncrementalConstraints(SqoopOptions options,
}
break;
case DateLastModified:
if (options.getMergeKeyCol() == null && !options.isAppendMode()
&& fs.exists(getOutputPath(options, context.getTableName(), false))) {
throw new ImportException("--" + MERGE_KEY_ARG + " or " + "--" + APPEND_ARG
+ " is required when using --" + this.INCREMENT_TYPE_ARG
+ " lastmodified and the output directory exists.");
}
checkColumnType = manager.getColumnTypes(options.getTableName(),
options.getSqlQuery()).get(options.getIncrementalTestColumn());
nextVal = manager.getCurrentDbTimestamp();
@@ -381,6 +424,48 @@ private boolean initIncrementalConstraints(SqoopOptions options,
return true;
}
/**
 * Merge freshly imported HDFS data into a pre-existing output directory.
 *
 * If the user's destination directory already exists, a MergeJob combines
 * the old and new data (keyed by --merge-key) into a temporary directory,
 * which then replaces the user's directory. Otherwise the imported data is
 * simply moved into place.
 *
 * @param options job options; merge paths, class name and target dir are
 *        set on it here as a side effect
 * @param context import context describing the just-finished import
 * @throws IOException if an HDFS operation fails, including any rename
 *         that previously failed silently
 */
protected void lastModifiedMerge(SqoopOptions options, ImportJobContext context)
    throws IOException {
  FileSystem fs = FileSystem.get(options.getConf());
  if (context.getDestination() == null
      || !fs.exists(context.getDestination())) {
    return; // Nothing was imported; nothing to merge.
  }

  Path userDestDir = getOutputPath(options, context.getTableName(), false);
  if (!fs.exists(userDestDir)) {
    // First import into this directory: just move the data into place.
    // BUGFIX: rename() reports failure via its return value; the old code
    // ignored it, which could silently drop the imported data.
    if (!fs.rename(context.getDestination(), userDestDir)) {
      throw new IOException("Could not move " + context.getDestination()
          + " to " + userDestDir);
    }
    return;
  }

  // Connectors that manage ORM themselves have no generated table class.
  String tableClassName = null;
  if (!context.getConnManager().isORMFacilitySelfManaged()) {
    tableClassName =
        new TableClassName(options).getClassForTable(context.getTableName());
  }
  Path destDir = getOutputPath(options, context.getTableName());
  options.setExistingJarName(context.getJarFile());
  options.setClassName(tableClassName);
  options.setMergeOldPath(userDestDir.toString());
  options.setMergeNewPath(context.getDestination().toString());
  // Merge to temporary directory so that original directory remains intact.
  options.setTargetDir(destDir.toString());

  // Local job tracker needs the generated jar on the classpath.
  loadJars(options.getConf(), context.getJarFile(), context.getTableName());
  try {
    MergeJob mergeJob = new MergeJob(options);
    if (mergeJob.runMergeJob()) {
      // Swap the merged output into the user's directory:
      // userDestDir -> tmpDir, destDir -> userDestDir, then drop tmpDir.
      Path tmpDir = getOutputPath(options, context.getTableName());
      if (!fs.rename(userDestDir, tmpDir)) {
        throw new IOException("Could not move " + userDestDir
            + " aside to " + tmpDir);
      }
      if (!fs.rename(destDir, userDestDir)) {
        throw new IOException("Could not move merged output " + destDir
            + " to " + userDestDir);
      }
      if (!fs.delete(tmpDir, true)) {
        LOG.warn("Could not delete temporary directory " + tmpDir);
      }
    } else {
      LOG.error("Merge MapReduce job failed!");
    }
  } finally {
    // BUGFIX: always restore the classloader, even if the merge job throws;
    // previously an exception here leaked the stacked classloader.
    unloadJars();
  }
}
/**
* Import a table or query.
* @return true if an import was performed, false otherwise.
@@ -392,9 +477,11 @@ protected boolean importTable(SqoopOptions options, String tableName,
// Generate the ORM code for the tables.
jarFile = codeGenerator.generateORM(options, tableName);
Path outputPath = getOutputPath(options, tableName);
// Do the actual import.
ImportJobContext context = new ImportJobContext(tableName, jarFile,
options, getOutputPath(options, tableName));
options, outputPath);
// If we're doing an incremental import, set up the
// filtering conditions used to get the latest records.
@@ -415,6 +502,8 @@ protected boolean importTable(SqoopOptions options, String tableName,
if (options.isAppendMode()) {
AppendUtils app = new AppendUtils(context);
app.append();
} else if (options.getIncrementalMode() == SqoopOptions.IncrementalMode.DateLastModified) {
lastModifiedMerge(options, context);
}
// If the user wants this table to be in Hive, perform that post-load.
@@ -449,11 +538,20 @@ private void deleteTargetDir(ImportJobContext context) throws IOException {
* if importing to hbase, this may return null.
*/
private Path getOutputPath(SqoopOptions options, String tableName) {
  // A temporary location is used when appending, or when a lastmodified
  // incremental import will later merge results into the final directory.
  boolean useTempDir = options.isAppendMode()
      || options.getIncrementalMode()
          == SqoopOptions.IncrementalMode.DateLastModified;
  return getOutputPath(options, tableName, useTempDir);
}
/**
* @return the output path for the imported files;
* if importing to hbase, this may return null.
*/
private Path getOutputPath(SqoopOptions options, String tableName, boolean temp) {
// Get output directory
String hdfsWarehouseDir = options.getWarehouseDir();
String hdfsTargetDir = options.getTargetDir();
Path outputPath = null;
if (options.isAppendMode()) {
if (temp) {
// Use temporary path, later removed when appending
String salt = tableName;
if(salt == null && options.getSqlQuery() != null) {
@@ -586,6 +684,10 @@ protected RelatedOptions getImportOptions() {
+ " value of the primary key")
.withLongOpt(SQL_QUERY_BOUNDARY)
.create());
importOpts.addOption(OptionBuilder.withArgName("column")
.hasArg().withDescription("Key column to use to join results")
.withLongOpt(MERGE_KEY_ARG)
.create());
addValidationOpts(importOpts);
}
@@ -798,6 +900,10 @@ public void applyOptions(CommandLine in, SqoopOptions out)
out.setBoundaryQuery(in.getOptionValue(SQL_QUERY_BOUNDARY));
}
if (in.hasOption(MERGE_KEY_ARG)) {
out.setMergeKeyCol(in.getOptionValue(MERGE_KEY_ARG));
}
applyValidationOptions(in, out);
}
@@ -941,14 +1047,14 @@ protected void validateImportOptions(SqoopOptions options)
&& options.getHCatTableName() != null) {
throw new InvalidOptionsException("--hcatalog-table cannot be used "
+ " --warehouse-dir or --target-dir options");
} else if (options.isDeleteMode() && options.isAppendMode()) {
} else if (options.isDeleteMode() && options.isAppendMode()) {
throw new InvalidOptionsException("--append and --delete-target-dir can"
+ " not be used together.");
} else if (options.isDeleteMode() && options.getIncrementalMode()
} else if (options.isDeleteMode() && options.getIncrementalMode()
!= SqoopOptions.IncrementalMode.None) {
throw new InvalidOptionsException("--delete-target-dir can not be used"
+ " with incremental imports.");
}
}
}
/**
@@ -969,6 +1075,13 @@ private void validateIncrementalOptions(SqoopOptions options)
"You must specify an incremental import mode with --"
+ INCREMENT_TYPE_ARG + ". " + HELP_STR);
}
if (options.getIncrementalMode() == SqoopOptions.IncrementalMode.DateLastModified
&& options.getFileLayout() == SqoopOptions.FileLayout.AvroDataFile) {
throw new InvalidOptionsException("--"
+ INCREMENT_TYPE_ARG + " lastmodified cannot be used in conjunction with --"
+ FMT_AVRODATAFILE_ARG + "." + HELP_STR);
}
}
@Override

View File

@@ -322,11 +322,63 @@ public void assertDirOfNumbers(String tableName, int expectedNums) {
}
}
/**
 * Look at a directory that should contain files full of an imported 'id'
 * column and 'last_modified' column. Assert that all numbers in
 * [0, expectedNums) are present in order.
 *
 * @param tableName directory (under the local warehouse) to inspect
 * @param expectedNums number of consecutive ids expected, starting at 0
 */
public void assertDirOfNumbersAndTimestamps(String tableName, int expectedNums) {
  try {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
    Path tableDir = new Path(warehouse, tableName);
    FileStatus [] stats = fs.listStatus(tableDir);
    String [] fileNames = new String[stats.length];
    for (int i = 0; i < stats.length; i++) {
      fileNames[i] = stats[i].getPath().toString();
    }

    Arrays.sort(fileNames);

    // Read all the files in sorted order, adding the value lines to the list.
    List<String> receivedNums = new ArrayList<String>();
    for (String fileName : fileNames) {
      // BUGFIX: test the file's base name, not the full URI. The full path
      // begins with the scheme (e.g. "file:..."), so startsWith("_") / "."
      // never matched and hidden files such as _SUCCESS were not skipped.
      String baseName = new Path(fileName).getName();
      if (baseName.startsWith("_") || baseName.startsWith(".")) {
        continue;
      }

      BufferedReader r = new BufferedReader(
          new InputStreamReader(fs.open(new Path(fileName))));
      try {
        while (true) {
          String s = r.readLine();
          if (null == s) {
            break;
          }
          receivedNums.add(s.trim());
        }
      } finally {
        r.close();
      }
    }

    assertEquals(expectedNums, receivedNums.size());

    // Compare the received values with the expected set.
    for (int i = 0; i < expectedNums; i++) {
      // Each line is "id,last_modified"; only the id column is checked.
      assertEquals(i, Integer.parseInt(receivedNums.get(i).split(",")[0]));
    }
  } catch (Exception e) {
    fail("Got unexpected exception: " + StringUtils.stringifyException(e));
  }
}
/**
* Assert that a directory contains a file with exactly one line
* in it, containing the prescribed number 'val'.
*/
public void assertSpecificNumber(String tableName, int val) {
public void assertFirstSpecificNumber(String tableName, int val) {
try {
FileSystem fs = FileSystem.getLocal(new Configuration());
Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
@@ -375,6 +427,53 @@ public void assertSpecificNumber(String tableName, int val) {
}
}
/**
 * Assert that a directory contains exactly one data file, and that its
 * first line carries the prescribed number 'val' in its first
 * (comma-separated) column.
 *
 * @param tableName directory (under the local warehouse) to inspect
 * @param val the id expected in the single data file
 */
public void assertSpecificNumber(String tableName, int val) {
  try {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
    Path tableDir = new Path(warehouse, tableName);
    FileStatus [] stats = fs.listStatus(tableDir);
    String [] filePaths = new String[stats.length];
    for (int i = 0; i < stats.length; i++) {
      filePaths[i] = stats[i].getPath().toString();
    }

    // Read the first file that is not a hidden file.
    boolean foundVal = false;
    for (String filePath : filePaths) {
      String fileName = new Path(filePath).getName();
      if (fileName.startsWith("_") || fileName.startsWith(".")) {
        continue;
      }
      if (foundVal) {
        // Make sure we don't have two or more "real" files in the dir.
        fail("Got an extra data-containing file in this directory.");
      }

      BufferedReader r = new BufferedReader(
          new InputStreamReader(fs.open(new Path(filePath))));
      try {
        String s = r.readLine();
        // BUGFIX: guard against an empty data file; readLine() returns
        // null at EOF and the old code would NPE on s.trim(). The old
        // inner duplicate foundVal check was unreachable and is removed.
        if (s != null && val == Integer.parseInt(s.trim().split(",")[0])) {
          foundVal = true;
        }
      } finally {
        r.close();
      }
    }
    // BUGFIX: previously the method passed silently when no file contained
    // 'val'; the documented contract requires the value to be present.
    assertTrue("Expected to find value " + val + " in " + tableDir, foundVal);
  } catch (IOException e) {
    fail("Got unexpected exception: " + StringUtils.stringifyException(e));
  }
}
public void runImport(SqoopOptions options, List<String> args) {
try {
Sqoop importer = new Sqoop(new ImportTool(), options.getConf(), options);
@@ -465,7 +564,7 @@ private List<String> getArgListForQuery(String query, String directoryName,
args.add("--incremental");
args.add("lastmodified");
args.add("--check-column");
args.add("last_modified");
args.add("LAST_MODIFIED");
}
args.add("-m");
args.add("1");
@@ -858,6 +957,156 @@ public void testModifyWithTimestamp() throws Exception {
// Import only the new row.
clearDir(TABLE_NAME);
runJob(TABLE_NAME);
assertFirstSpecificNumber(TABLE_NAME, 4000);
}
public void testUpdateModifyWithTimestamp() throws Exception {
  // Create a table with data in it; import it.
  // Then modify an existing row, and verify that a lastmodified
  // incremental import with --merge-key folds the change in.
  final String TABLE_NAME = "updateModifyTimestamp";
  Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
  createTimestampTable(TABLE_NAME, 10, thePast);

  List<String> args = getArgListForTable(TABLE_NAME, false, false);

  Configuration conf = newConf();
  SqoopOptions options = new SqoopOptions();
  options.setConf(conf);
  runImport(options, args);
  assertDirOfNumbers(TABLE_NAME, 10);

  // Modify a row; timestamps bracket the first import so the update is
  // guaranteed to fall inside the incremental window.
  long importWasBefore = System.currentTimeMillis();
  Thread.sleep(50);
  long rowsAddedTime = System.currentTimeMillis() - 5;
  assertTrue(rowsAddedTime > importWasBefore);
  assertTrue(rowsAddedTime < System.currentTimeMillis());
  SqoopOptions options2 = new SqoopOptions();
  options2.setConnectString(SOURCE_DB_URL);
  HsqldbManager manager = new HsqldbManager(options2);
  Connection c = manager.getConnection();
  PreparedStatement s = null;
  try {
    s = c.prepareStatement("UPDATE " + TABLE_NAME
        + " SET id=?, last_modified=? WHERE id=?");
    s.setInt(1, 4000); // the first row should have '4000' in it now.
    s.setTimestamp(2, new Timestamp(rowsAddedTime));
    s.setInt(3, 0);
    s.executeUpdate();
    c.commit();
  } finally {
    // BUGFIX: prepareStatement may throw before 's' is assigned; guard
    // against the resulting NullPointerException in the finally block.
    if (s != null) {
      s.close();
    }
  }

  // Re-import with --merge-key; the merged output must contain the update.
  args.add("--last-value");
  args.add(new Timestamp(importWasBefore).toString());
  args.add("--merge-key");
  args.add("id");
  conf = newConf();
  options = new SqoopOptions();
  options.setConf(conf);
  runImport(options, args);
  assertSpecificNumber(TABLE_NAME, 4000);
}
public void testUpdateModifyWithTimestampWithQuery() throws Exception {
  // Same as testUpdateModifyWithTimestamp, but driving the import with a
  // free-form query instead of a table name.
  final String TABLE_NAME = "UpdateModifyWithTimestampWithQuery";
  Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
  createTimestampTable(TABLE_NAME, 10, thePast);

  final String QUERY = "SELECT id, last_modified FROM UpdateModifyWithTimestampWithQuery WHERE $CONDITIONS";

  List<String> args = getArgListForQuery(QUERY, TABLE_NAME,
      true, false, false);

  Configuration conf = newConf();
  SqoopOptions options = new SqoopOptions();
  options.setConf(conf);
  runImport(options, args);
  assertDirOfNumbersAndTimestamps(TABLE_NAME, 10);

  // Modify a row; timestamps bracket the first import so the update is
  // guaranteed to fall inside the incremental window.
  long importWasBefore = System.currentTimeMillis();
  Thread.sleep(50);
  long rowsAddedTime = System.currentTimeMillis() - 5;
  assertTrue(rowsAddedTime > importWasBefore);
  assertTrue(rowsAddedTime < System.currentTimeMillis());
  SqoopOptions options2 = new SqoopOptions();
  options2.setConnectString(SOURCE_DB_URL);
  HsqldbManager manager = new HsqldbManager(options2);
  Connection c = manager.getConnection();
  PreparedStatement s = null;
  try {
    s = c.prepareStatement("UPDATE " + TABLE_NAME
        + " SET id=?, last_modified=? WHERE id=?");
    s.setInt(1, 4000); // the first row should have '4000' in it now.
    s.setTimestamp(2, new Timestamp(rowsAddedTime));
    s.setInt(3, 0);
    s.executeUpdate();
    c.commit();
  } finally {
    // BUGFIX: prepareStatement may throw before 's' is assigned; guard
    // against the resulting NullPointerException in the finally block.
    if (s != null) {
      s.close();
    }
  }

  // Re-import with --merge-key; the merged output must contain the update.
  args.add("--last-value");
  args.add(new Timestamp(importWasBefore).toString());
  args.add("--merge-key");
  args.add("id");
  conf = newConf();
  options = new SqoopOptions();
  options.setConf(conf);
  runImport(options, args);
  assertSpecificNumber(TABLE_NAME, 4000);
}
public void testUpdateModifyWithTimestampJob() throws Exception {
  // Same as testUpdateModifyWithTimestamp, but exercising the saved-job
  // path (--merge-key stored in the job definition).
  final String TABLE_NAME = "updateModifyTimestampJob";
  Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
  createTimestampTable(TABLE_NAME, 10, thePast);

  List<String> args = getArgListForTable(TABLE_NAME, false, false);
  args.add("--merge-key");
  args.add("id");
  createJob(TABLE_NAME, args);
  runJob(TABLE_NAME);
  assertDirOfNumbers(TABLE_NAME, 10);

  // Modify a row; timestamps bracket the first import so the update is
  // guaranteed to fall inside the incremental window.
  long importWasBefore = System.currentTimeMillis();
  Thread.sleep(50);
  long rowsAddedTime = System.currentTimeMillis() - 5;
  assertTrue(rowsAddedTime > importWasBefore);
  assertTrue(rowsAddedTime < System.currentTimeMillis());
  SqoopOptions options2 = new SqoopOptions();
  options2.setConnectString(SOURCE_DB_URL);
  HsqldbManager manager = new HsqldbManager(options2);
  Connection c = manager.getConnection();
  PreparedStatement s = null;
  try {
    s = c.prepareStatement("UPDATE " + TABLE_NAME
        + " SET id=?, last_modified=? WHERE id=?");
    s.setInt(1, 4000); // the first row should have '4000' in it now.
    s.setTimestamp(2, new Timestamp(rowsAddedTime));
    s.setInt(3, 0);
    s.executeUpdate();
    c.commit();
  } finally {
    // BUGFIX: prepareStatement may throw before 's' is assigned; guard
    // against the resulting NullPointerException in the finally block.
    if (s != null) {
      s.close();
    }
  }

  // Re-run the saved job; the merged output must contain the update.
  runJob(TABLE_NAME);
  assertSpecificNumber(TABLE_NAME, 4000);
}