
SQOOP-2525. Sqoop2: Add support for incremental From in HDFS Connector

(Jarcec via Hari)
Hari Shreedharan 2015-09-18 12:52:25 -07:00
parent 208d5daf45
commit bd7252480f
12 changed files with 366 additions and 12 deletions
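For readers skimming the diff below: the patch adds an "incremental" section to the HDFS From job configuration and threads a per-run upper bound (MAX_IMPORT_DATE) from the initializer through the partitioner to the destroyer. A minimal sketch of the new configuration surface, using only classes introduced or touched by this patch (the input directory value is a hypothetical example):

import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
import org.joda.time.DateTime;

public class IncrementalFromConfigSketch {
  public static void main(String[] args) {
    // Configure the From side the same way the tests in this patch do.
    FromJobConfiguration jobConfig = new FromJobConfiguration();
    jobConfig.fromJobConfig.inputDirectory = "/tmp/sqoop-input"; // hypothetical directory
    jobConfig.incremental.incrementalType = IncrementalType.NEW_FILES;

    // Null before the first run; HdfsFromDestroyer.updateConfiguration() fills it in
    // from HdfsConstants.MAX_IMPORT_DATE after each successful execution.
    DateTime lastImported = jobConfig.incremental.lastImportedDate;
    System.out.println("Last imported date: " + lastImported);
  }
}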

View File: HdfsConnectorError.java

@@ -34,7 +34,7 @@ public enum HdfsConnectorError implements ErrorCode{
GENERIC_HDFS_CONNECTOR_0005("Error occurs during loader run"),
GENERIC_HDFS_CONNECTOR_0006("Unknown job type"),
- GENERIC_HDFS_CONNECTOR_0007("Invalid output directory"),
+ GENERIC_HDFS_CONNECTOR_0007("Invalid input/output directory"),
GENERIC_HDFS_CONNECTOR_0008("Error occurs during destroyer run"),

View File: HdfsConstants.java

@@ -33,4 +33,6 @@ public final class HdfsConstants extends Constants {
public static final String PREFIX = "org.apache.sqoop.connector.hdfs.";
public static final String WORK_DIRECTORY = PREFIX + "work_dir";
public static final String MAX_IMPORT_DATE = PREFIX + "max_import_date";
}

View File: HdfsFromDestroyer.java

@@ -17,12 +17,17 @@
*/
package org.apache.sqoop.connector.hdfs;
import org.apache.log4j.Logger;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
import org.joda.time.DateTime;
public class HdfsFromDestroyer extends Destroyer<LinkConfiguration, FromJobConfiguration> {
private static final Logger LOG = Logger.getLogger(HdfsFromDestroyer.class);
/**
* Callback to clean up after job execution.
*
@@ -31,8 +36,14 @@ public class HdfsFromDestroyer extends Destroyer<LinkConfiguration, FromJobConfi
* @param jobConfig FROM job configuration object
*/
@Override
- public void destroy(DestroyerContext context, LinkConfiguration linkConfig,
-     FromJobConfiguration jobConfig) {
-   // do nothing at this point
+ public void destroy(DestroyerContext context, LinkConfiguration linkConfig, FromJobConfiguration jobConfig) {
+   LOG.info("Running HDFS connector destroyer");
}
+ @Override
+ public void updateConfiguration(DestroyerContext context, LinkConfiguration linkConfiguration, FromJobConfiguration jobConfiguration) {
+   LOG.info("Updating HDFS connector options");
+   long epoch = context.getLong(HdfsConstants.MAX_IMPORT_DATE, -1);
+   jobConfiguration.incremental.lastImportedDate = epoch == -1 ? null : new DateTime(epoch);
+ }
}

View File: HdfsFromInitializer.java

@@ -18,13 +18,25 @@
package org.apache.sqoop.connector.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.log4j.Logger;
import java.io.IOException;
public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobConfiguration> {
public static final Logger LOG = Logger.getLogger(HdfsFromInitializer.class);
/**
* Initialize new submission based on given configuration properties. Any
* needed temporary values might be saved to context object and they will be
@@ -36,8 +48,44 @@ public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobC
*/
@Override
public void initialize(InitializerContext context, LinkConfiguration linkConfig, FromJobConfiguration jobConfig) {
assert jobConfig.incremental != null;
Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
HdfsUtils.configurationToContext(configuration, context.getContext());
context.getContext().setAll(linkConfig.linkConfig.configOverrides);
boolean incremental = jobConfig.incremental.incrementalType != null && jobConfig.incremental.incrementalType == IncrementalType.NEW_FILES;
// In case of incremental import, we need to persist the highest last modified
try {
FileSystem fs = FileSystem.get(configuration);
Path path = new Path(jobConfig.fromJobConfig.inputDirectory);
LOG.info("Input directory: " + path.toString());
if(!fs.exists(path)) {
throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Input directory doesn't exists");
}
if(fs.isFile(path)) {
throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Input directory is a file");
}
if(incremental) {
LOG.info("Detected incremental import");
long maxModifiedTime = -1;
FileStatus[] fileStatuses = fs.listStatus(path);
for(FileStatus status : fileStatuses) {
if(maxModifiedTime < status.getModificationTime()) {
maxModifiedTime = status.getModificationTime();
}
}
LOG.info("Maximal age of file is: " + maxModifiedTime);
context.getContext().setLong(HdfsConstants.MAX_IMPORT_DATE, maxModifiedTime);
}
} catch (IOException e) {
throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Unexpected exception", e);
}
}
}
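A note on the handoff above: the initializer records the newest modification time it saw under HdfsConstants.MAX_IMPORT_DATE in the job's mutable context; the partitioner and the destroyer later read it back with -1 as the "not set" default. A standalone sketch of that round trip, assuming the context classes behave as the unit tests in this patch use them (illustration only, not connector code):

import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.hdfs.HdfsConstants;

public class MaxImportDateRoundTripSketch {
  public static void main(String[] args) {
    MutableContext context = new MutableMapContext();

    // Initializer side: persist the highest modification time found in the input directory.
    long maxModifiedTime = System.currentTimeMillis(); // stand-in for the listStatus() scan
    context.setLong(HdfsConstants.MAX_IMPORT_DATE, maxModifiedTime);

    // Partitioner / destroyer side: read it back, -1 meaning no bound was recorded.
    long maxImportDate = context.getLong(HdfsConstants.MAX_IMPORT_DATE, -1);
    System.out.println("Upper bound for this run: " + maxImportDate);
  }
}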

View File: HdfsPartitioner.java

@@ -38,8 +38,10 @@
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.job.etl.Partition;
@@ -52,6 +54,8 @@
*/
public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfiguration> {
public static final Logger LOG = Logger.getLogger(HdfsPartitioner.class);
public static final String SPLIT_MINSIZE_PERNODE =
"mapreduce.input.fileinputformat.split.minsize.per.node";
public static final String SPLIT_MINSIZE_PERRACK =
@@ -70,6 +74,8 @@ public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfi
public List<Partition> getPartitions(PartitionerContext context,
LinkConfiguration linkConfiguration,
FromJobConfiguration fromJobConfig) {
assert fromJobConfig.incremental != null;
Configuration conf = new Configuration();
HdfsUtils.contextToConfiguration(context.getContext(), conf);
@@ -118,6 +124,11 @@ public List<Partition> getPartitions(PartitionerContext context,
"size per rack " + minSizeRack);
}
// Incremental import related options
boolean incremental = fromJobConfig.incremental.incrementalType != null && fromJobConfig.incremental.incrementalType == IncrementalType.NEW_FILES;
long lastImportedDate = fromJobConfig.incremental.lastImportedDate != null ? fromJobConfig.incremental.lastImportedDate.getMillis() : -1;
long maxImportDate = context.getLong(HdfsConstants.MAX_IMPORT_DATE, -1);
// all the files in input set
String indir = fromJobConfig.fromJobConfig.inputDirectory;
FileSystem fs = FileSystem.get(conf);
@@ -125,7 +136,19 @@ public List<Partition> getPartitions(PartitionerContext context,
List<Path> paths = new LinkedList<Path>();
for(FileStatus status : fs.listStatus(new Path(indir))) {
if(!status.isDir()) {
paths.add(status.getPath());
if(incremental) {
long modifiedDate = status.getModificationTime();
if(lastImportedDate < modifiedDate && modifiedDate <= maxImportDate) {
LOG.info("Will process input file: " + status.getPath() + " with modification date " + modifiedDate);
paths.add(status.getPath());
} else {
LOG.info("Skipping input file: " + status.getPath() + " with modification date " + modifiedDate);
}
} else {
// Without incremental mode, we're processing all files
LOG.info("Will process input file: " + status.getPath());
paths.add(status.getPath());
}
}
}
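To spell out the window applied in the loop above: a file is processed only when its modification time is strictly greater than the last imported date and no greater than the bound the initializer recorded. A hypothetical helper (not part of the patch) restating that predicate:

public class IncrementalWindowSketch {
  // Mirrors the condition in HdfsPartitioner.getPartitions() when NEW_FILES mode is enabled.
  static boolean inWindow(long modifiedDate, long lastImportedDate, long maxImportDate) {
    return lastImportedDate < modifiedDate && modifiedDate <= maxImportDate;
  }

  public static void main(String[] args) {
    long lastImportedDate = 1000L; // end of the previous run
    long maxImportDate = 2000L;    // captured by HdfsFromInitializer before this run
    System.out.println(inWindow(1500L, lastImportedDate, maxImportDate)); // true: processed
    System.out.println(inWindow(900L,  lastImportedDate, maxImportDate)); // false: already imported
    System.out.println(inWindow(2500L, lastImportedDate, maxImportDate)); // false: left for the next run
  }
}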

View File: FromJobConfiguration.java

@@ -22,10 +22,14 @@
@ConfigurationClass
public class FromJobConfiguration {
@Config public FromJobConfig fromJobConfig;
@Config public IncrementalRead incremental;
public FromJobConfiguration() {
fromJobConfig = new FromJobConfig();
incremental = new IncrementalRead();
}
}

View File: IncrementalRead.java

@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs.configuration;
import org.apache.sqoop.model.ConfigClass;
import org.apache.sqoop.model.Input;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.validators.AbstractValidator;
import org.joda.time.DateTime;
@ConfigClass
public class IncrementalRead {
@Input
public IncrementalType incrementalType;
@Input
public DateTime lastImportedDate;
public static class ConfigValidator extends AbstractValidator<IncrementalRead> {
@Override
public void validate(IncrementalRead conf) {
if(conf.incrementalType != IncrementalType.NEW_FILES && conf.lastImportedDate != null) {
addMessage(Status.ERROR, "Can't specify last imported date without enabling incremental import.");
}
}
}
}

View File: IncrementalType.java

@@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs.configuration;
public enum IncrementalType {
NONE,
NEW_FILES,
}

View File: HDFS connector config resource bundle (.properties)

@@ -63,6 +63,15 @@ toJobConfig.nullValue.label = Null value
toJobConfig.nullValue.help = Use this particular character or sequence of characters \
as a value representing null when outputting to a file.
incremental.label = Incremental import
incremental.help = Information relevant for incremental import from HDFS
incremental.incrementalType.label = Incremental type
incremental.incrementalType.help = Type of incremental import
incremental.lastImportedDate.label = Last imported date
incremental.lastImportedDate.help = Date when last import happened
# From Job Config
#
fromJobConfig.label = From HDFS configuration

View File: TestFromDestroyer.java

@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
import org.testng.annotations.Test;
import org.joda.time.DateTime;
import static org.testng.AssertJUnit.assertEquals;
public class TestFromDestroyer {
Destroyer<LinkConfiguration, FromJobConfiguration> destroyer;
LinkConfiguration linkConfig;
FromJobConfiguration jobConfig;
MutableContext context;
public TestFromDestroyer() {
linkConfig = new LinkConfiguration();
jobConfig = new FromJobConfiguration();
context = new MutableMapContext();
destroyer = new HdfsFromDestroyer();
}
@Test
public void testUpdateConfiguration() {
DateTime dt = new DateTime();
context.setLong(HdfsConstants.MAX_IMPORT_DATE, dt.getMillis());
destroyer.updateConfiguration(new DestroyerContext(context, true, null), linkConfig, jobConfig);
assertEquals(jobConfig.incremental.lastImportedDate, dt);
}
}

View File: TestFromInitializer.java

@@ -18,30 +18,77 @@
*/
package org.apache.sqoop.connector.hdfs;
import com.google.common.io.Files;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.testng.annotations.Test;
import java.io.File;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
public class TestFromInitializer {
Initializer<LinkConfiguration, FromJobConfiguration> initializer;
InitializerContext initializerContext;
LinkConfiguration linkConfig;
FromJobConfiguration jobConfig;
MutableContext context;
public TestFromInitializer() {
linkConfig = new LinkConfiguration();
jobConfig = new FromJobConfiguration();
context = new MutableMapContext();
initializer = new HdfsFromInitializer();
initializerContext = new InitializerContext(context);
}
@Test
public void testConfigOverrides() {
LinkConfiguration linkConfig = new LinkConfiguration();
FromJobConfiguration jobConfig = new FromJobConfiguration();
linkConfig.linkConfig.uri = "file:///";
linkConfig.linkConfig.configOverrides.put("key", "value");
jobConfig.fromJobConfig.inputDirectory = "/tmp";
InitializerContext initializerContext = new InitializerContext(new MutableMapContext());
Initializer initializer = new HdfsFromInitializer();
initializer.initialize(initializerContext, linkConfig, jobConfig);
assertEquals(initializerContext.getString("key"), "value");
}
@Test(expectedExceptions = SqoopException.class)
public void testFailIfInputDirectoryDoNotExists() {
jobConfig.fromJobConfig.inputDirectory = "/tmp/this/directory/definitely/do/not/exists";
initializer.initialize(initializerContext, linkConfig, jobConfig);
}
@Test(expectedExceptions = SqoopException.class)
public void testFailIfInputDirectoryIsFile() throws Exception {
File workDir = Files.createTempDir();
File inputFile = File.createTempFile("part-01-", ".txt", workDir);
inputFile.createNewFile();
jobConfig.fromJobConfig.inputDirectory = inputFile.getAbsolutePath();
initializer.initialize(initializerContext, linkConfig, jobConfig);
}
@Test
public void testIncremental() throws Exception {
File workDir = Files.createTempDir();
File.createTempFile("part-01-", ".txt", workDir).createNewFile();
File.createTempFile("part-02-", ".txt", workDir).createNewFile();
jobConfig.fromJobConfig.inputDirectory = workDir.getAbsolutePath();
jobConfig.incremental.incrementalType = IncrementalType.NEW_FILES;
initializer.initialize(initializerContext, linkConfig, jobConfig);
// Max import date must be defined if we are running incremental
assertNotNull(context.getString(HdfsConstants.MAX_IMPORT_DATE));
}
}

View File: HdfsIncrementalReadTest.java

@@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.integration.connector.hdfs;
import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
public class HdfsIncrementalReadTest extends ConnectorTestCase {
@BeforeMethod(alwaysRun = true)
public void createTable() {
createTableCities();
}
@AfterMethod(alwaysRun = true)
public void dropTable() {
super.dropTable();
}
@Test
public void testBasic() throws Exception {
createFromFile("input-0001",
"1,'USA','2004-10-23','San Francisco'"
);
// RDBMS link
MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
fillRdbmsLinkConfig(rdbmsLink);
saveLink(rdbmsLink);
// HDFS link
MLink hdfsLink = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsLink);
saveLink(hdfsLink);
// Job creation
MJob job = getClient().createJob(hdfsLink.getPersistenceId(), rdbmsLink.getPersistenceId());
fillHdfsFromConfig(job);
job.getFromJobConfig().getEnumInput("incremental.incrementalType").setValue(IncrementalType.NEW_FILES);
fillRdbmsToConfig(job);
saveJob(job);
// Execute for the first time
executeJob(job);
assertEquals(provider.rowCount(getTableName()), 1);
assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
// Second execution
createFromFile("input-0002",
"2,'USA','2004-10-24','Sunnyvale'",
"3,'Czech Republic','2004-10-25','Brno'"
);
executeJob(job);
assertEquals(provider.rowCount(getTableName()), 3);
assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale");
assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno");
// And last execution
createFromFile("input-0003",
"4,'USA','2004-10-26','Palo Alto'"
);
executeJob(job);
assertEquals(provider.rowCount(getTableName()), 4);
assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale");
assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno");
assertRowInCities(4, "USA", "2004-10-26", "Palo Alto");
}
}