5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-09 01:50:07 +08:00

SQOOP-638: Add an optional, simple and extensible validation framework for sqoop

(Venkatesh Seetharam via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2012-12-01 13:00:47 -08:00
parent 0f0066f525
commit 0b465594d2
22 changed files with 871 additions and 7 deletions

View File

@ -54,6 +54,8 @@ include::import-all-tables.txt[]
include::export.txt[]
include::validation.txt[]
include::saved-jobs.txt[]
include::codegen.txt[]
@ -77,4 +79,3 @@ include::connectors.txt[]
include::support.txt[]
include::troubleshooting.txt[]

View File

@ -36,4 +36,3 @@ Argument Description
+\--connection-param-file <filename>+ Optional properties file that\
provides connection parameters
-------------------------------------------------------------------------------

View File

@ -42,6 +42,8 @@ another.
include::common-args.txt[]
include::validation-args.txt[]
.Export control arguments:
[grid="all"]
`----------------------------------------`------------------------------
@ -266,3 +268,10 @@ Sqoop attempts to insert rows which violate constraints in the database
fails.
Another basic export to populate a table named +bar+ with validation enabled:
<<validation,More Details>>
----
$ sqoop export --connect jdbc:mysql://db.example.com/foo --table bar \
--export-dir /results/bar_data --validate
----

View File

@ -48,6 +48,8 @@ include::common-args.txt[]
include::connecting.txt[]
include::validation-args.txt[]
.Import control arguments:
[grid="all"]
`---------------------------------`--------------------------------------
@ -677,4 +679,12 @@ $ sqoop import --connect jdbc:mysql://db.foo.com/somedb --table sometable \
--where "id > 100000" --target-dir /incremental_dataset --append
----
An import of a table named +EMPLOYEES+ in the +corp+ database that uses
validation to validate the import using the table row count and number of
rows copied into HDFS:
<<validation,More Details>>
----
$ sqoop import --connect jdbc:mysql://db.foo.com/corp \
--table EMPLOYEES --validate
----

View File

@ -0,0 +1,32 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
.Validation arguments <<validation,More Details>>
[grid="all"]
`----------------------------------------`-------------------------------------
Argument Description
-------------------------------------------------------------------------------
+\--validate+ Enable validation of data copied, \
supports single table copy only. \
+\--validator <class-name>+ Specify validator class to use.
+\--validation-threshold <class-name>+ Specify validation threshold class \
to use.
+\--validation-failurehandler <class-name>+ Specify validation failure \
handler class to use.
-------------------------------------------------------------------------------

View File

@ -0,0 +1,136 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
[[validation]]
+validation+
--------------
Purpose
~~~~~~~
Validate the data copied, either import or export by comparing the row
counts from the source and the target post copy.
Introduction
~~~~~~~~~~~~
There are 3 basic interfaces:
ValidationThreshold - Determines if the error margin between the source and
target are acceptable: Absolute, Percentage Tolerant, etc.
Default implementation is AbsoluteValidationThreshold which ensures the row
counts from source and targets are the same.
ValidationFailureHandler - Responsible for handling failures: log an
error/warning, abort, etc.
Default implementation is LogOnFailureHandler that logs a warning message to
the configured logger.
Validator - Drives the validation logic by delegating the decision to
ValidationThreshold and delegating failure handling to ValidationFailureHandler.
The default implementation is RowCountValidator which validates the row
counts from source and the target.
Syntax
~~~~~~
----
$ sqoop import (generic-args) (import-args)
$ sqoop export (generic-args) (export-args)
----
Validation arguments are part of import and export arguments.
Configuration
~~~~~~~~~~~~~
The validation framework is extensible and pluggable. It comes with default
implementations but the interfaces can be extended to allow custom
implementations by passing them as part of the command line arguments as
described below.
.Validator
Property: validator
Description: Driver for validation,
must implement org.apache.sqoop.validation.Validator
Supported values: The value has to be a fully qualified class name.
Default value: org.apache.sqoop.validation.RowCountValidator
.Validation Threshold
Property: validation-threshold
Description: Drives the decision based on the validation meeting the
threshold or not. Must implement
org.apache.sqoop.validation.ValidationThreshold
Supported values: The value has to be a fully qualified class name.
Default value: org.apache.sqoop.validation.AbsoluteValidationThreshold
.Validation Failure Handler
Property: validation-failurehandler
Description: Responsible for handling failures, must implement
org.apache.sqoop.validation.ValidationFailureHandler
Supported values: The value has to be a fully qualified class name.
Default value: org.apache.sqoop.validation.LogOnFailureHandler
Limitations
~~~~~~~~~~~
Validation currently only validates data copied from a single table into HDFS.
The following are the limitations in the current implementation:
* all-tables option
* free-form query option
* Data imported into Hive or HBase
* table import with --where argument
* incremental imports
Example Invocations
~~~~~~~~~~~~~~~~~~~
A basic import of a table named +EMPLOYEES+ in the +corp+ database that uses
validation to validate the row counts:
----
$ sqoop import --connect jdbc:mysql://db.foo.com/corp \
--table EMPLOYEES --validate
----
A basic export to populate a table named +bar+ with validation enabled:
----
$ sqoop export --connect jdbc:mysql://db.example.com/foo --table bar \
--export-dir /results/bar_data --validate
----
Another example that overrides the validation args:
----
$ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES \
--validate --validator org.apache.sqoop.validation.RowCountValidator \
--validation-threshold \
org.apache.sqoop.validation.AbsoluteValidationThreshold \
--validation-failurehandler \
org.apache.sqoop.validation.LogOnFailureHandler
----

View File

@ -18,10 +18,19 @@
package com.cloudera.sqoop.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import com.cloudera.sqoop.SqoopOptions;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.sqoop.config.ConfigurationHelper;
import org.apache.sqoop.manager.ConnManager;
import org.apache.sqoop.validation.*;
import java.io.IOException;
import java.sql.SQLException;
/**
* @deprecated Moving to use org.apache.sqoop namespace.
@ -44,4 +53,28 @@ public JobBase(final SqoopOptions opts,
super(opts, mapperClass, inputFormatClass, outputFormatClass);
}
protected long getRowCountFromDB(ConnManager connManager, String tableName)
throws SQLException {
return connManager.getTableRowCount(tableName);
}
protected long getRowCountFromHadoop(Job job)
throws IOException, InterruptedException {
return ConfigurationHelper.getNumMapOutputRecords(job);
}
protected void doValidate(SqoopOptions options, Configuration conf,
ValidationContext validationContext)
throws ValidationException {
Validator validator = (Validator) ReflectionUtils.newInstance(
options.getValidatorClass(), conf);
ValidationThreshold threshold = (ValidationThreshold)
ReflectionUtils.newInstance(options.getValidationThresholdClass(),
conf);
ValidationFailureHandler failureHandler = (ValidationFailureHandler)
ReflectionUtils.newInstance(options.getValidationFailureHandlerClass(),
conf);
validator.validate(validationContext, threshold, failureHandler);
}
}

View File

@ -39,6 +39,9 @@
import com.cloudera.sqoop.util.RandomHash;
import com.cloudera.sqoop.util.StoredAsProperty;
import org.apache.sqoop.util.LoggingUtils;
import org.apache.sqoop.validation.AbsoluteValidationThreshold;
import org.apache.sqoop.validation.LogOnFailureHandler;
import org.apache.sqoop.validation.RowCountValidator;
/**
* Configurable state used by Sqoop tools.
@ -248,6 +251,13 @@ public String toString() {
// (JobBase, etc).
private SqoopTool activeSqoopTool;
// Flag to determine if data copied needs to be validated against the source
private boolean isValidationEnabled;
// These take FQCN as input, convert them to Class in light of failing early
private Class validatorClass; // Class for the validator implementation.
private Class validationThresholdClass; // ValidationThreshold implementation
private Class validationFailureHandlerClass; // FailureHandler implementation
public SqoopOptions() {
initDefaults(null);
}
@ -819,6 +829,10 @@ private void initDefaults(Configuration baseConfiguration) {
// We do not want to be verbose too much if not explicitly needed
this.verbose = false;
this.isValidationEnabled = false; // validation is disabled by default
this.validatorClass = RowCountValidator.class;
this.validationThresholdClass = AbsoluteValidationThreshold.class;
this.validationFailureHandlerClass = LogOnFailureHandler.class;
}
/**
@ -1899,9 +1913,7 @@ public void setMergeKeyCol(String col) {
this.mergeKeyCol = col;
}
/**
* Return the name of the column used to merge an old and new dataset.
*/
/** Return the name of the column used to merge an old and new dataset. */
public String getMergeKeyCol() {
return this.mergeKeyCol;
}
@ -1963,5 +1975,37 @@ public void setConnectionParams(Properties params) {
public Properties getConnectionParams() {
return connectionParams;
}
}
public void setValidationEnabled(boolean validationEnabled) {
isValidationEnabled = validationEnabled;
}
public boolean isValidationEnabled() {
return isValidationEnabled;
}
public Class getValidatorClass() {
return validatorClass;
}
public void setValidatorClass(Class validatorClazz) {
this.validatorClass = validatorClazz;
}
public Class getValidationThresholdClass() {
return validationThresholdClass;
}
public void setValidationThresholdClass(Class validationThresholdClazz) {
this.validationThresholdClass = validationThresholdClazz;
}
public Class getValidationFailureHandlerClass() {
return validationFailureHandlerClass;
}
public void setValidationFailureHandlerClass(
Class validationFailureHandlerClazz) {
this.validationFailureHandlerClass = validationFailureHandlerClazz;
}
}

View File

@ -21,6 +21,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -44,6 +45,7 @@
import com.cloudera.sqoop.orm.TableClassName;
import com.cloudera.sqoop.mapreduce.JobBase;
import com.cloudera.sqoop.util.ExportException;
import org.apache.sqoop.validation.*;
/**
* Base class for running an export MapReduce job.
@ -374,6 +376,10 @@ public void runExport() throws ExportException, IOException {
if (!success) {
throw new ExportException("Export job failed!");
}
if (options.isValidationEnabled()) {
validateExport(tableName, conf, job);
}
} catch (InterruptedException ie) {
throw new IOException(ie);
} catch (ClassNotFoundException cnfe) {
@ -399,6 +405,26 @@ public void runExport() throws ExportException, IOException {
}
}
protected void validateExport(String tableName, Configuration conf, Job job)
throws ExportException {
LOG.debug("Validating exported data.");
try {
ValidationContext validationContext = new ValidationContext(
getRowCountFromHadoop(job),
getRowCountFromDB(context.getConnManager(), tableName));
doValidate(options, conf, validationContext);
} catch (ValidationException e) {
throw new ExportException("Error validating row counts", e);
} catch (SQLException e) {
throw new ExportException("Error retrieving DB target row count", e);
} catch (IOException e) {
throw new ExportException("Error retrieving source row count", e);
} catch (InterruptedException e) {
throw new ExportException("Error retrieving source row count", e);
}
}
/**
* @return true if the input directory contains SequenceFiles.
* @deprecated use {@link #getInputFileType()} instead

View File

@ -19,6 +19,7 @@
package org.apache.sqoop.mapreduce;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.mapred.AvroJob;
@ -44,6 +45,7 @@
import com.cloudera.sqoop.mapreduce.JobBase;
import com.cloudera.sqoop.orm.TableClassName;
import com.cloudera.sqoop.util.ImportException;
import org.apache.sqoop.validation.*;
/**
* Base class for running an import MapReduce job.
@ -212,6 +214,10 @@ public void runImport(String tableName, String ormJarFile, String splitByCol,
if (!success) {
throw new ImportException("Import job failed!");
}
if (options.isValidationEnabled()) {
validateImport(tableName, conf, job);
}
} catch (InterruptedException ie) {
throw new IOException(ie);
} catch (ClassNotFoundException cnfe) {
@ -222,6 +228,26 @@ public void runImport(String tableName, String ormJarFile, String splitByCol,
}
}
protected void validateImport(String tableName, Configuration conf, Job job)
throws ImportException {
LOG.debug("Validating imported data.");
try {
ValidationContext validationContext = new ValidationContext(
getRowCountFromDB(context.getConnManager(), tableName), // source
getRowCountFromHadoop(job)); // target
doValidate(options, conf, validationContext);
} catch (ValidationException e) {
throw new ImportException("Error validating row counts", e);
} catch (SQLException e) {
throw new ImportException("Error retrieving DB source row count", e);
} catch (IOException e) {
throw new ImportException("Error retrieving target row count", e);
} catch (InterruptedException e) {
throw new ImportException("Error retrieving target row count", e);
}
}
/**
* Open-ended "setup" routine that is called after the job is configured
* but just before it is submitted to MapReduce. Subclasses may override

View File

@ -142,6 +142,14 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
public static final String UPDATE_KEY_ARG = "update-key";
public static final String UPDATE_MODE_ARG = "update-mode";
// Arguments for validation.
public static final String VALIDATE_ARG = "validate";
public static final String VALIDATOR_CLASS_ARG = "validator";
public static final String VALIDATION_THRESHOLD_CLASS_ARG =
"validation-threshold";
public static final String VALIDATION_FAILURE_HANDLER_CLASS_ARG =
"validation-failurehandler";
// Arguments for incremental imports.
public static final String INCREMENT_TYPE_ARG = "incremental";
public static final String INCREMENT_COL_ARG = "check-column";
@ -619,7 +627,29 @@ protected RelatedOptions getHBaseOptions() {
return hbaseOpts;
}
@SuppressWarnings("static-access")
protected void addValidationOpts(RelatedOptions validationOptions) {
validationOptions.addOption(OptionBuilder
.withDescription("Validate the copy using the configured validator")
.withLongOpt(VALIDATE_ARG)
.create());
validationOptions.addOption(OptionBuilder
.withArgName(VALIDATOR_CLASS_ARG).hasArg()
.withDescription("Fully qualified class name for the Validator")
.withLongOpt(VALIDATOR_CLASS_ARG)
.create());
validationOptions.addOption(OptionBuilder
.withArgName(VALIDATION_THRESHOLD_CLASS_ARG).hasArg()
.withDescription("Fully qualified class name for ValidationThreshold")
.withLongOpt(VALIDATION_THRESHOLD_CLASS_ARG)
.create());
validationOptions.addOption(OptionBuilder
.withArgName(VALIDATION_FAILURE_HANDLER_CLASS_ARG).hasArg()
.withDescription("Fully qualified class name for "
+ "ValidationFailureHandler")
.withLongOpt(VALIDATION_FAILURE_HANDLER_CLASS_ARG)
.create());
}
/**
* Apply common command-line to the state.
@ -885,6 +915,39 @@ protected void applyHBaseOptions(CommandLine in, SqoopOptions out) {
}
}
protected void applyValidationOptions(CommandLine in, SqoopOptions out)
throws InvalidOptionsException {
if (in.hasOption(VALIDATE_ARG)) {
out.setValidationEnabled(true);
}
// Class Names are converted to Class in light of failing early
if (in.hasOption(VALIDATOR_CLASS_ARG)) {
out.setValidatorClass(
getClassByName(in.getOptionValue(VALIDATOR_CLASS_ARG)));
}
if (in.hasOption(VALIDATION_THRESHOLD_CLASS_ARG)) {
out.setValidationThresholdClass(
getClassByName(in.getOptionValue(VALIDATION_THRESHOLD_CLASS_ARG)));
}
if (in.hasOption(VALIDATION_FAILURE_HANDLER_CLASS_ARG)) {
out.setValidationFailureHandlerClass(getClassByName(
in.getOptionValue(VALIDATION_FAILURE_HANDLER_CLASS_ARG)));
}
}
protected Class<?> getClassByName(String className)
throws InvalidOptionsException {
try {
return Class.forName(className, true,
Thread.currentThread().getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new InvalidOptionsException(e.getMessage());
}
}
protected void validateCommonOptions(SqoopOptions options)
throws InvalidOptionsException {
if (options.getConnectString() == null) {

View File

@ -177,6 +177,8 @@ protected RelatedOptions getExportOptions() {
.withLongOpt(UPDATE_MODE_ARG)
.create());
addValidationOpts(exportOpts);
return exportOpts;
}
@ -271,6 +273,7 @@ public void applyOptions(CommandLine in, SqoopOptions out)
out.setClearStagingTable(true);
}
applyValidationOptions(in, out);
applyNewUpdateOptions(in, out);
applyInputFormatOptions(in, out);
applyOutputFormatOptions(in, out);

View File

@ -508,6 +508,7 @@ public int run(SqoopOptions options) {
* @return the RelatedOptions that can be used to parse the import
* arguments.
*/
@SuppressWarnings("static-access")
protected RelatedOptions getImportOptions() {
// Imports
RelatedOptions importOpts = new RelatedOptions("Import control arguments");
@ -554,6 +555,8 @@ protected RelatedOptions getImportOptions() {
+ " value of the primary key")
.withLongOpt(SQL_QUERY_BOUNDARY)
.create());
addValidationOpts(importOpts);
}
importOpts.addOption(OptionBuilder.withArgName("dir")
@ -756,6 +759,8 @@ public void applyOptions(CommandLine in, SqoopOptions out)
if (in.hasOption(SQL_QUERY_BOUNDARY)) {
out.setBoundaryQuery(in.getOptionValue(SQL_QUERY_BOUNDARY));
}
applyValidationOptions(in, out);
}
if (in.hasOption(WAREHOUSE_DIR_ARG)) {
@ -873,6 +878,20 @@ protected void validateImportOptions(SqoopOptions options)
throw new InvalidOptionsException(
"Direct import currently do not support dropping hive delimiters,"
+ " please remove parameter --hive-drop-import-delims.");
} else if (allTables && options.isValidationEnabled()) {
throw new InvalidOptionsException("Validation is not supported for "
+ "all tables but single table only.");
} else if (options.getSqlQuery() != null && options.isValidationEnabled()) {
throw new InvalidOptionsException("Validation is not supported for "
+ "free from query but single table only.");
} else if (options.getWhereClause() != null
&& options.isValidationEnabled()) {
throw new InvalidOptionsException("Validation is not supported for "
+ "where clause but single table only.");
} else if (options.getIncrementalMode()
!= SqoopOptions.IncrementalMode.None && options.isValidationEnabled()) {
throw new InvalidOptionsException("Validation is not supported for "
+ "incremental imports but single table only.");
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.validation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A specific implementation of ValidationThreshold that validates based on
* two values being the same.
*
* This is used as the default ValidationThreshold implementation unless
* overridden in configuration.
*/
public class AbsoluteValidationThreshold implements ValidationThreshold {
private static final Log LOG =
LogFactory.getLog(AbsoluteValidationThreshold.class.getName());
@Override
public void setThresholdValue(long value) {
}
static final ValidationThreshold INSTANCE = new AbsoluteValidationThreshold();
@Override
@SuppressWarnings("unchecked")
public boolean compare(Comparable left, Comparable right) {
LOG.debug("Absolute Validation threshold comparing "
+ left + " with " + right);
return (Math.abs(left.compareTo(right)) == 0);
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.validation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A specific implementation of ValidationFailureHandler that logs the failure
* message and the reason with the configured logger.
*
* This is used as the default handler unless overridden in configuration.
*/
public class LogOnFailureHandler implements ValidationFailureHandler {
private static final Log LOG =
LogFactory.getLog(LogOnFailureHandler.class.getName());
static final ValidationFailureHandler INSTANCE = new LogOnFailureHandler();
@Override
public boolean handle(ValidationContext context) throws ValidationException {
LOG.warn(context.getMessage() + ", Reason: " + context.getReason());
return true;
}
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.validation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A specific implementation of validator that validates data copied,
* either import or export, using row counts from the data source and
* the target systems.
*
* This is used as the default validator unless overridden in configuration.
*/
public class RowCountValidator implements Validator {
public static final Log LOG = LogFactory.getLog(
RowCountValidator.class.getName());
@Override
public boolean validate(ValidationContext context)
throws ValidationException {
return validate(context,
AbsoluteValidationThreshold.INSTANCE, LogOnFailureHandler.INSTANCE);
}
@Override
public boolean validate(ValidationContext validationContext,
ValidationThreshold validationThreshold,
ValidationFailureHandler validationFailureHandler)
throws ValidationException {
LOG.debug("Validating data using row counts: Source ["
+ validationContext.getSourceRowCount() + "] with Target["
+ validationContext.getTargetRowCount() + "]");
if (validationThreshold.compare(validationContext.getSourceRowCount(),
validationContext.getTargetRowCount())) {
return true;
}
validationContext.setMessage(this.getClass().getSimpleName());
validationContext.setReason("The expected counter value was "
+ validationContext.getSourceRowCount() + " but the actual value was "
+ validationContext.getTargetRowCount());
return validationFailureHandler.handle(validationContext);
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.validation;
/**
* This object encapsulates the context for the validation framework.
* Before validation, the row counts are stored. Post validation,
* the message and failure reason are captured.
*/
public class ValidationContext {
private final long sourceRowCount;
private final long targetRowCount;
private String message;
private String reason;
public ValidationContext(long sourceRowCount, long targetRowCount) {
this.sourceRowCount = sourceRowCount;
this.targetRowCount = targetRowCount;
}
public String getMessage() {
return message;
}
public void setMessage(String aMessage) {
this.message = aMessage;
}
public String getReason() {
return reason;
}
public void setReason(String aReason) {
this.reason = aReason;
}
public long getSourceRowCount() {
return sourceRowCount;
}
public long getTargetRowCount() {
return targetRowCount;
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.validation;
/**
* An implementation of Exception that is used to propagate
* validation related errors or failures.
*/
public class ValidationException extends Exception {
public ValidationException(String s, Throwable throwable) {
super(s, throwable);
}
@Override
public String toString() {
String msg = getMessage();
return (null == msg) ? "ValidationException" : msg;
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.validation;
/**
* This is the primary interface that dictates as to
* how the validation failures are handled.
*/
public interface ValidationFailureHandler {
/**
* Method that handles the validation failure.
*
* @param validationContext validation context
* @return if failure was handled or not
* @throws ValidationException
*/
boolean handle(ValidationContext validationContext)
throws ValidationException;
}

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.validation;
/**
* This is the primary interface that is responsible for driving the actual
* decision on validation based on an optional error margin threshold.
*/
public interface ValidationThreshold {
void setThresholdValue(long value);
boolean compare(Comparable left, Comparable right);
}

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.validation;
/**
* This represents the primary interface that drives the validation logic
* by delegating the decision to ValidationThreshold and failure handling
* to ValidationFailureHandler. Uses ValidationContext to encapsulate
* the various required parameters.
*/
public interface Validator {
/**
* Method to validate the data copy with default implementations
* for ValidationThreshold and ValidationFailureHandler.
*
* @param validationContext validation context
* @return if validation was successful or not
* @throws ValidationException
*/
boolean validate(ValidationContext validationContext)
throws ValidationException;
/**
* Method to validate the data copy with specific implementations
* for ValidationThreshold and ValidationFailureHandler.
*
* @param validationContext validation context
* @param validationThreshold specific implementation of ValidationThreshold
* @param validationFailureHandler specific implementation of
* ValidationFailureHandler
* @return if validation was successful or not
* @throws ValidationException
*/
boolean validate(ValidationContext validationContext,
ValidationThreshold validationThreshold,
ValidationFailureHandler validationFailureHandler)
throws ValidationException;
}

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.validation;
import com.cloudera.sqoop.testutil.ImportJobTestCase;
import org.apache.hadoop.conf.Configuration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* Tests for RowCountValidator.
*/
public class RowCountValidatorImportTest extends ImportJobTestCase {
protected List<String> getExtraArgs(Configuration conf) {
ArrayList<String> list = new ArrayList<String>(1);
list.add("--validate");
return list;
}
/**
* Test the implementation for AbsoluteValidationThreshold.
* Both arguments should be same else fail.
*/
public void testAbsoluteValidationThreshold() {
ValidationThreshold validationThreshold = new AbsoluteValidationThreshold();
assertTrue(validationThreshold.compare(100, 100));
assertFalse(validationThreshold.compare(100, 90));
assertFalse(validationThreshold.compare(90, 100));
}
/**
* Test if teh --validate flag actually made it through the options.
*
* @throws Exception
*/
public void testValidateOptionIsEnabled() throws Exception {
String[] types = {"INT NOT NULL PRIMARY KEY", "VARCHAR(32)", "VARCHAR(32)"};
String[] insertVals = {"1", "'Bob'", "'sales'"};
try {
createTableWithColTypes(types, insertVals);
String[] args = getArgv(true, null, getConf());
ArrayList<String> argsList = new ArrayList<String>();
Collections.addAll(argsList, args);
assertTrue("Validate option missing.", argsList.contains("--validate"));
} finally {
dropTableIfExists(getTableName());
}
}
/**
* Test the validation for a sample import, positive case.
*
* @throws Exception
*/
public void testValidatorForImportTable() throws Exception {
String[] types = {"INT NOT NULL PRIMARY KEY", "VARCHAR(32)", "VARCHAR(32)"};
String[] insertVals = {"1", "'Bob'", "'sales'"};
String validateLine = "1,Bob,sales";
try {
createTableWithColTypes(types, insertVals);
verifyImport(validateLine, null);
LOG.debug("Verified input line as " + validateLine + " -- ok!");
} finally {
dropTableIfExists(getTableName());
}
}
}