Mirror of https://github.com/apache/sqoop.git (synced 2025-05-03 00:42:27 +08:00)
Merge e0538e2bb1 into 2328971411
commit 3c92a02ca3
bin/sqoop (21 lines changed)
@@ -98,4 +98,25 @@ bin=`dirname ${prgm}`
bin=`cd ${bin} && pwd`

source ${bin}/configure-sqoop "${bin}"

#exec ${HADOOP_COMMON_HOME}/bin/hadoop org.apache.sqoop.Sqoop "$@"
if [ "--config" = "$1" ]
then
  shift
  if [[ $1 =~ "--" ]]
  then
    echo "You must supply a Hadoop configuration directory after --config."
    exit -1
  elif [[ $1 = "codegen" ]] || [[ $1 = "create-hive-table" ]] || [[ $1 = "eval" ]] || [[ $1 = "export" ]] || [[ $1 = "help" ]] || [[ $1 = "import" ]] || [[ $1 = "import-all-tables" ]] || [[ $1 = "import-mainframe" ]] || [[ $1 = "job" ]] || [[ $1 = "list-databases" ]] || [[ $1 = "list-tables" ]] || [[ $1 = "merge" ]] || [[ $1 = "metastore" ]] || [[ $1 = "version" ]]
  then
    echo "You must supply a Hadoop configuration directory after --config."
    exit -1
  else
    hadoopconfig=$1
    shift
  fi
fi
if [ ! -n "$hadoopconfig" ]; then
  exec ${HADOOP_COMMON_HOME}/bin/hadoop org.apache.sqoop.Sqoop "$@"
else
  exec ${HADOOP_COMMON_HOME}/bin/hadoop --config "$hadoopconfig" org.apache.sqoop.Sqoop "$@"
fi
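With this change, bin/sqoop can forward a Hadoop client configuration directory straight through to the hadoop launcher. A minimal usage sketch (paths and the connect string below are placeholders, not taken from this commit):

  # Without --config: launches with the default Hadoop configuration.
  sqoop import --connect jdbc:mysql://db.example.com/sales --table orders

  # With --config: the directory is forwarded as 'hadoop --config <dir>'.
  sqoop --config /etc/hadoop/conf.cluster2 import --connect jdbc:mysql://db.example.com/sales --table orders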
@@ -186,5 +186,10 @@ under the License.
    </description>
  </property>
  -->
  <property>
    <name>com.newland.component.FujianBI.service.list</name>
    <value>com.newland.component.FujianBI.service.impl.KerberosLoginService</value>
    <description>service list</description>
  </property>

</configuration>
pom-old.xml (1370 lines changed): file diff suppressed because it is too large
@@ -46,7 +46,7 @@ public class DataDrivenDBInputFormat<T extends DBWritable>
   *
   * @deprecated use org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.
   * DataDrivenDBInputSplit instead.
   * @see org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.
   * @see org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat
   * DataDrivenDBInputSplit
   */
  public static class DataDrivenDBInputSplit extends
@@ -20,6 +20,7 @@

import java.util.Arrays;

import com.newland.component.FujianBI.service.ServiceTool;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -221,6 +222,14 @@ public static int runTool(String [] args, Configuration conf) {
    }

    String toolName = expandedArgs[0];

    // Only the import and export tools go through the service tool processing.
    if (!isWindow() && (toolName.equals("import") || toolName.equals("export"))) {
      ServiceTool serviceTool = ServiceTool.builder(conf, args);
      serviceTool.initServices();
      serviceTool.startServices();
    }

    Configuration pluginConf = SqoopTool.loadPlugins(conf);
    SqoopTool tool = SqoopTool.getTool(toolName);
    if (null == tool) {
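With this hunk, only the import and export tools (and only on non-Windows hosts) start the Newland ServiceTool before the selected SqoopTool runs; the bundled sqoop-site.xml change wires that service list to KerberosLoginService. A hedged illustration (connect strings are placeholders):

  # Runs ServiceTool (e.g. the configured KerberosLoginService) before the tool starts.
  sqoop export --connect jdbc:oracle:thin:@dbhost:1521:orcl --table T1 --export-dir /data/t1

  # Skips ServiceTool: the tool name is neither 'import' nor 'export'.
  sqoop list-tables --connect jdbc:oracle:thin:@dbhost:1521:orcl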
@@ -243,6 +252,20 @@ public static int runTool(String [] args) {
    return runTool(args, new Configuration());
  }

  /**
   * Whether this is a local test run (i.e. running on Windows).
   *
   * @return true if the JVM is running on Windows
   */
  public static boolean isWindow() {
    String systemType = System.getProperty("os.name");
    if (systemType.toUpperCase().startsWith("WINDOWS")) {
      return true;
    } else {
      return false;
    }
  }

  public static void main(String [] args) {
    if (args.length == 0) {
      System.err.println("Try 'sqoop help' for usage.");
src/java/org/apache/sqoop/SqoopVersion.java (new file, 6 lines)
@@ -0,0 +1,6 @@
package org.apache.sqoop;

public class SqoopVersion {
  public static final String VERSION = "1.4.7";
  // public static final String GIT_HASH = "";
}
@@ -31,6 +31,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import static org.apache.sqoop.tool.BaseSqoopTool.ENCODE;

/**
 * Converts an input record from a string representation to a parsed Sqoop
 * record and emits that DBWritable to the OutputFormat for writeback to the
@@ -44,6 +46,7 @@ public class TextExportMapper
  public static final Log LOG =
    LogFactory.getLog(TextExportMapper.class.getName());

  private String encoding;
  private SqoopRecord recordImpl;

  boolean enableDataDumpOnError;
@@ -80,13 +83,21 @@ protected void setup(Context context)
    }

    enableDataDumpOnError = conf.getBoolean(DUMP_DATA_ON_ERROR_KEY, false);

    encoding = conf.get(ENCODE);
  }


  public void map(LongWritable key, Text val, Context context)
      throws IOException, InterruptedException {
    try {
      recordImpl.parse(val);
      // Transcoding is said to be relatively expensive.
      if (encoding != null) {
        String newValue = new String(val.getBytes(), 0, val.getLength(), encoding);
        recordImpl.parse(newValue);
      } else {
        recordImpl.parse(val);
      }
      context.write(recordImpl, NullWritable.get());
    } catch (Exception e) {
      // Something bad has happened
@@ -87,7 +87,8 @@ public List<InputSplit> split(Configuration conf, ResultSet results,
    // Catch any overage and create the closed interval for the last split.
    if (curLower <= maxVal || splits.size() == 1) {
      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          lowClausePrefix + Double.toString(curUpper),
          // lowClausePrefix + Double.toString(curUpper),
          lowClausePrefix + Double.toString(curLower),
          colName + " <= " + Double.toString(maxVal)));
    }

@@ -174,6 +174,8 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
  public static final String THROW_ON_ERROR_ARG = "throw-on-error";
  public static final String ORACLE_ESCAPING_DISABLED = "oracle-escaping-disabled";
  public static final String ESCAPE_MAPPING_COLUMN_NAMES_ENABLED = "escape-mapping-column-names";
  public static final String FILE_ENCODING = "fileencoding"; // file encoding
  public static final String ENCODE = "sqoop.mapreduce.export.encode";

  // Arguments for validation.
  public static final String VALIDATE_ARG = "validate";
@@ -208,6 +208,23 @@ public void configureOptions(ToolOptions toolOptions) {

    toolOptions.addUniqueOptions(codeGenOpts);
    toolOptions.addUniqueOptions(getHCatalogOptions());

    toolOptions.addUniqueOptions(getFileencodingOptions());
  }

  /**
   * File encoding options.
   *
   * @return the RelatedOptions holding the --fileencoding argument
   */
  protected RelatedOptions getFileencodingOptions() {
    RelatedOptions fileencodingOptions = new RelatedOptions("fileencoding arguments");
    fileencodingOptions.addOption(OptionBuilder
        .hasArg()
        .withDescription("fileencoding")
        .withLongOpt("fileencoding")
        .create());
    return fileencodingOptions;
  }

  @Override
@@ -279,6 +296,11 @@ public void applyOptions(CommandLine in, SqoopOptions out)
      out.setCall(in.getOptionValue(CALL_ARG));
    }

    // Set the file encoding.
    if (in.hasOption(FILE_ENCODING)) {
      out.getConf().set(ENCODE, in.getOptionValue(FILE_ENCODING));
    }

    applyValidationOptions(in, out);
    applyNewUpdateOptions(in, out);
    applyInputFormatOptions(in, out);
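The new --fileencoding option is stored in the sqoop.mapreduce.export.encode property, which TextExportMapper reads to re-decode each input line before parsing. A hedged usage sketch (the option wiring suggests it applies to the export tool; host, table, and encoding below are placeholders):

  # Export HDFS text files that were written in GBK rather than the platform default.
  sqoop export --connect jdbc:oracle:thin:@dbhost:1521:orcl --table T1 \
    --export-dir /data/t1 --fileencoding GBK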
src/test/org/apache/sqoop/SqoopTest.java (new file, 31 lines)
@@ -0,0 +1,31 @@
package org.apache.sqoop;

import org.junit.Before;
import org.junit.Test;

public class SqoopTest {

  private String[] args;

  @Before
  public void setUp() throws Exception {
    // String params = "import --connect jdbc:oracle:thin:@10.1.0.242:1521:ywxx --username bishow --password bishow -m 4 --split-by 'product_id' --query 'select sum_date,product_name,product_id from cqx_test2 where $CONDITIONS' --target-dir '/cqx/hivetable/cqx_test2/' --fields-terminated-by '|' --as-textfile --delete-target-dir --null-string '' --null-non-string ''";
    // args = params.split(" ", -1);
    String[] arg = {"import", "--connect", "jdbc:oracle:thin:@10.1.0.242:1521:ywxx",
        "--username", "bishow", "--password", "C%MuhN#q$4", "-m", "4", "--split-by", "product_id", "--query",
        "select sum_date,product_name,product_id from cqx_test2 where $CONDITIONS",
        "--target-dir", "/cqx/hivetable/cqx_test2/", "--fields-terminated-by", "|", "--as-textfile",
        "--delete-target-dir", "--null-string", "", "--null-non-string", ""};
    args = arg;
    System.out.println("args:");
    for (String p : args) {
      System.out.print(p + " ");
    }
  }

  @Test
  public void run() {
    int ret = Sqoop.runTool(args);
    System.out.println("ret:" + ret);
  }
}
src/test/org/apache/sqoop/mapreduce/db/DateSplitterTest.java (new file, 126 lines)
@@ -0,0 +1,126 @@
package org.apache.sqoop.mapreduce.db;

import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.junit.Test;

import java.sql.Types;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class DateSplitterTest {

  private OracleDateSplitter dateSplitter;
  private SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

  @Test
  public void split() throws Exception {
    dateSplitter = new OracleDateSplitter();
    String colName = "checkTime";
    final long MS_IN_SEC = 1000L;
    long minVal;
    long maxVal;

    int sqlDataType = Types.TIMESTAMP;
    minVal = df.parse("2019-04-22 00:00:00").getTime();
    maxVal = df.parse("2019-04-22 23:59:59").getTime();

    String lowClausePrefix = colName + " >= ";
    String highClausePrefix = colName + " < ";

    int numSplits = 1440;
    if (numSplits < 1) {
      numSplits = 1;
    }

    if (minVal == Long.MIN_VALUE && maxVal == Long.MIN_VALUE) {
      // The range of acceptable dates is NULL to NULL. Just create a single
      // split.
      List<InputSplit> splits = new ArrayList<InputSplit>();
      splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          colName + " IS NULL", colName + " IS NULL"));
      return;
    }

    // For split size we are using seconds. So we need to convert to milliseconds.
    long splitLimit = -1 * MS_IN_SEC;

    // Gather the split point integers.
    List<Long> splitPoints = dateSplitter.split(numSplits, splitLimit, minVal, maxVal);
    List<InputSplit> splits = new ArrayList<InputSplit>();

    // Turn the split points into a set of intervals.
    long start = splitPoints.get(0);
    Date startDate = longToDate(start, sqlDataType);
    if (sqlDataType == Types.TIMESTAMP) {
      // The lower bound's nanos value needs to match the actual lower-bound
      // nanos.
      try {
        ((java.sql.Timestamp) startDate).setNanos(0);
      } catch (NullPointerException npe) {
        // If the lower bound was NULL, we'll get an NPE; just ignore it and
        // don't set nanos.
      }
    }

    for (int i = 1; i < splitPoints.size(); i++) {
      long end = splitPoints.get(i);
      Date endDate = longToDate(end, sqlDataType);

      if (i == splitPoints.size() - 1) {
        if (sqlDataType == Types.TIMESTAMP) {
          // The upper bound's nanos value needs to match the actual
          // upper-bound nanos.
          try {
            ((java.sql.Timestamp) endDate).setNanos(0);
          } catch (NullPointerException npe) {
            // If the upper bound was NULL, we'll get an NPE; just ignore it
            // and don't set nanos.
          }
        }
        // This is the last one; use a closed interval.
        splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
            lowClausePrefix + dateSplitter.dateToString(startDate),
            colName + " <= " + dateSplitter.dateToString(endDate)));
      } else {
        // Normal open-interval case.
        splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
            lowClausePrefix + dateSplitter.dateToString(startDate),
            highClausePrefix + dateSplitter.dateToString(endDate)));
      }

      start = end;
      startDate = endDate;
    }

    if (minVal == Long.MIN_VALUE || maxVal == Long.MIN_VALUE) {
      // Add an extra split to handle the null case that we saw.
      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          colName + " IS NULL", colName + " IS NULL"));
    }

    printList(splits);
  }

  private <E> void printList(List<E> list) {
    for (E e : list) {
      System.out.println(e.toString());
    }
  }

  private Date longToDate(long val, int sqlDataType) {
    switch (sqlDataType) {
    case Types.DATE:
      return new java.sql.Date(val);
    case Types.TIME:
      return new java.sql.Time(val);
    case Types.TIMESTAMP:
      return new java.sql.Timestamp(val);
    default: // Shouldn't ever hit this case.
      return null;
    }
  }

}
src/test/org/apache/sqoop/mapreduce/db/FloatSplitterTest.java (new file, 59 lines)
@@ -0,0 +1,59 @@
package org.apache.sqoop.mapreduce.db;

import org.apache.hadoop.mapreduce.InputSplit;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

public class FloatSplitterTest {

  @Test
  public void split() {
    double MIN_INCREMENT = 10000 * Double.MIN_VALUE;

    System.out.println("Generating splits for a floating-point index column. Due to the");
    System.out.println("imprecise representation of floating-point values in Java, this");
    System.out.println("may result in an incomplete import.");
    System.out.println("You are strongly encouraged to choose an integral split column.");

    List<InputSplit> splits = new ArrayList<InputSplit>();
    String colName = "float_code";
    double minVal = 1.111;
    double maxVal = 133.333;

    // Use this as a hint. May need an extra task if the size doesn't
    // divide cleanly.
    int numSplits = 2;
    double splitSize = (maxVal - minVal) / (double) numSplits;

    if (splitSize < MIN_INCREMENT) {
      splitSize = MIN_INCREMENT;
    }

    String lowClausePrefix = colName + " >= ";
    String highClausePrefix = colName + " < ";

    double curLower = minVal;
    double curUpper = curLower + splitSize;

    while (curUpper < maxVal) {
      splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          lowClausePrefix + Double.toString(curLower),
          highClausePrefix + Double.toString(curUpper)));

      curLower = curUpper;
      curUpper += splitSize;
    }

    // Catch any overage and create the closed interval for the last split.
    if (curLower <= maxVal || splits.size() == 1) {
      splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          // lowClausePrefix + Double.toString(curUpper),
          lowClausePrefix + Double.toString(curLower),
          colName + " <= " + Double.toString(maxVal)));
    }

    System.out.println(splits);
  }
}
@@ -19,14 +19,21 @@
package org.apache.sqoop.mapreduce.db;

import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import com.cloudera.sqoop.mapreduce.db.BigDecimalSplitter;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class TestBigDecimalSplitter {

  private org.apache.sqoop.mapreduce.db.BigDecimalSplitter bigDecimalSplitter = new org.apache.sqoop.mapreduce.db.BigDecimalSplitter();

  /* Test if the decimal split sizes are generated as expected */
  @Test
  public void testDecimalTryDivide() {
@@ -60,4 +67,54 @@ public void testRecurringTryDivide() {
    assertEquals(expected, out);
  }

  @Test
  public void testSplit() throws SQLException {
    String colName = "cur_lac";

    BigDecimal minVal = new BigDecimal(6591);
    BigDecimal maxVal = new BigDecimal(24996);

    String lowClausePrefix = colName + " >= ";
    String highClausePrefix = colName + " < ";

    BigDecimal numSplits = new BigDecimal(2000);

    if (minVal == null && maxVal == null) {
      // Range is null to null. Return a null split accordingly.
      List<InputSplit> splits = new ArrayList<InputSplit>();
      splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          colName + " IS NULL", colName + " IS NULL"));
      return;
    }

    if (minVal == null || maxVal == null) {
      // Don't know what is a reasonable min/max value for interpolation. Fail.
      System.out.println("Cannot find a range for NUMERIC or DECIMAL "
          + "fields with one end NULL.");
      return;
    }

    // Get all the split points together.
    List<BigDecimal> splitPoints = bigDecimalSplitter.split(numSplits, minVal, maxVal);
    List<InputSplit> splits = new ArrayList<InputSplit>();

    // Turn the split points into a set of intervals.
    BigDecimal start = splitPoints.get(0);
    for (int i = 1; i < splitPoints.size(); i++) {
      BigDecimal end = splitPoints.get(i);

      if (i == splitPoints.size() - 1) {
        // This is the last one; use a closed interval.
        splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
            lowClausePrefix + start.toString(),
            colName + " <= " + end.toString()));
      } else {
        // Normal open-interval case.
        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
            lowClausePrefix + start.toString(),
            highClausePrefix + end.toString()));
      }
      start = end;
    }
  }
}

src/test/org/apache/sqoop/mapreduce/db/TestBooleanSplitter.java (new file, 32 lines)
@@ -0,0 +1,32 @@
package org.apache.sqoop.mapreduce.db;

import org.apache.hadoop.mapreduce.InputSplit;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

public class TestBooleanSplitter {

  @Test
  public void split() {
    List<InputSplit> splits = new ArrayList<>();
    String colName = "isCheck";

    boolean minVal = false;
    boolean maxVal = true;

    // Use one or two splits.
    if (!minVal) {
      splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          colName + " = FALSE", colName + " = FALSE"));
    }

    if (maxVal) {
      splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          colName + " = TRUE", colName + " = TRUE"));
    }

    System.out.println(splits);
  }
}
@@ -18,9 +18,13 @@
package org.apache.sqoop.mapreduce.db;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import com.cloudera.sqoop.config.ConfigurationHelper;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
import com.cloudera.sqoop.mapreduce.db.IntegerSplitter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
@@ -143,6 +147,7 @@ public void testEvenSplitsWithLimit() throws SQLException {
  @Test
  public void testOddSplitsWithLimit() throws SQLException {
    List<Long> splits = new IntegerSplitter().split(5, 10, 0, 95);
    System.out.println(splits);
    long [] expected = { 0, 10, 20, 30, 40, 50, 59, 68, 77, 86, 95};
    assertLongArrayEquals(expected, toLongArray(splits));
  }
@@ -150,6 +155,7 @@ public void testOddSplitsWithLimit() throws SQLException {
  @Test
  public void testSplitWithBiggerLimit() throws SQLException {
    List<Long> splits = new IntegerSplitter().split(10, 15, 0, 100);
    System.out.println(splits);
    long [] expected = {0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100};
    assertLongArrayEquals(expected, toLongArray(splits));
  }
@@ -157,7 +163,56 @@ public void testSplitWithBiggerLimit() throws SQLException {
  @Test
  public void testFractionalSplitWithLimit() throws SQLException {
    List<Long> splits = new IntegerSplitter().split(5, 1, 1, 10);
    System.out.println(splits);
    long [] expected = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10};
    assertLongArrayEquals(expected, toLongArray(splits));
  }

  @Test
  public void testSplit() throws Exception {
    org.apache.sqoop.mapreduce.db.IntegerSplitter integerSplitter = new org.apache.sqoop.mapreduce.db.IntegerSplitter();
    String colName = "cnt";
    long minVal = 1;
    long maxVal = 100;

    String lowClausePrefix = colName + " >= ";
    String highClausePrefix = colName + " < ";

    int numSplits = 3;
    if (numSplits < 1) {
      numSplits = 1;
    }

    long splitLimit = -1;

    // Get all the split points together.
    List<Long> splitPoints = integerSplitter.split(numSplits, splitLimit, minVal, maxVal);
    System.out.println(String.format("Splits: [%,28d to %,28d] into %d parts",
        minVal, maxVal, numSplits));
    for (int i = 0; i < splitPoints.size(); i++) {
      System.out.println(String.format("%,28d", splitPoints.get(i)));
    }
    List<InputSplit> splits = new ArrayList<InputSplit>();

    // Turn the split points into a set of intervals.
    long start = splitPoints.get(0);
    for (int i = 1; i < splitPoints.size(); i++) {
      long end = splitPoints.get(i);

      if (i == splitPoints.size() - 1) {
        // This is the last one; use a closed interval.
        splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
            lowClausePrefix + Long.toString(start),
            colName + " <= " + Long.toString(end)));
      } else {
        // Normal open-interval case.
        splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
            lowClausePrefix + Long.toString(start),
            highClausePrefix + Long.toString(end)));
      }

      start = end;
    }
    System.out.println(splits);
  }
}

@@ -19,9 +19,13 @@

import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import com.cloudera.sqoop.config.ConfigurationHelper;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
import com.cloudera.sqoop.mapreduce.db.TextSplitter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.sqoop.validation.ValidationException;
import org.junit.Test;

@@ -173,4 +177,93 @@ public void testNChar() throws SQLException {
    assertEquals(false, splitter2.isUseNCharStrings());
  }

  @Test
  public void testSplit() throws Exception {
    System.out.println("Generating splits for a textual index column.");
    System.out.println("If your database sorts in a case-insensitive order, "
        + "this may result in a partial import or duplicate records.");
    System.out.println("You are strongly encouraged to choose an integral split column.");

    org.apache.sqoop.mapreduce.db.TextSplitter textSplitter = new org.apache.sqoop.mapreduce.db.TextSplitter();
    boolean useNCharStrings = false;
    String colName = "produce_name";
    String minString = "1231";
    String maxString = "12324";

    boolean minIsNull = false;

    // If the min value is null, switch it to an empty string instead for
    // purposes of interpolation. Then add [null, null] as a special case
    // split.
    if (null == minString) {
      minString = "";
      minIsNull = true;
    }

    if (null == maxString) {
      // If the max string is null, then the min string has to be null too.
      // Just return a special split for this case.
      List<InputSplit> splits = new ArrayList<InputSplit>();
      splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          colName + " IS NULL", colName + " IS NULL"));
      return;
    }

    // Use this as a hint. May need an extra task if the size doesn't
    // divide cleanly.
    // Locally this is 1.
    // The remote default is 2.
    int numSplits = 3;

    String lowClausePrefix = colName + " >= " + (useNCharStrings ? "N'" : "'");
    String highClausePrefix = colName + " < " + (useNCharStrings ? "N'" : "'");

    // If there is a common prefix between minString and maxString, establish
    // it and pull it out of minString and maxString.
    int maxPrefixLen = Math.min(minString.length(), maxString.length());
    int sharedLen;
    for (sharedLen = 0; sharedLen < maxPrefixLen; sharedLen++) {
      char c1 = minString.charAt(sharedLen);
      char c2 = maxString.charAt(sharedLen);
      if (c1 != c2) {
        break;
      }
    }

    // The common prefix has length 'sharedLen'. Extract it from both.
    String commonPrefix = minString.substring(0, sharedLen);
    minString = minString.substring(sharedLen);
    maxString = maxString.substring(sharedLen);

    List<String> splitStrings = textSplitter.split(numSplits, minString, maxString,
        commonPrefix);
    List<InputSplit> splits = new ArrayList<InputSplit>();

    // Convert the list of split point strings into an actual set of
    // InputSplits.
    String start = splitStrings.get(0);
    for (int i = 1; i < splitStrings.size(); i++) {
      String end = splitStrings.get(i);

      if (i == splitStrings.size() - 1) {
        // This is the last one; use a closed interval.
        splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
            lowClausePrefix + start + "'", colName
            + " <= " + (useNCharStrings ? "N'" : "'") + end + "'"));
      } else {
        // Normal open-interval case.
        splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
            lowClausePrefix + start + "'", highClausePrefix + end + "'"));
      }

      start = end;
    }

    if (minIsNull) {
      // Add the special null split at the end.
      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          colName + " IS NULL", colName + " IS NULL"));
    }
    System.out.println(splits);
  }
}
