5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-03 06:51:49 +08:00

1、集成外挂服务,目前集成了sqoop API

2、增加测试用例
3、sqoop启动脚本支持切换hadoop集群
This commit is contained in:
chenqixu 2019-04-23 13:48:40 +08:00
parent 5b503ea89c
commit 912fbc1c9c
12 changed files with 1262 additions and 559 deletions

View File

@ -98,4 +98,25 @@ bin=`dirname ${prgm}`
bin=`cd ${bin} && pwd`
source ${bin}/configure-sqoop "${bin}"
#exec ${HADOOP_COMMON_HOME}/bin/hadoop org.apache.sqoop.Sqoop "$@"
if [ "--config" = "$1" ]
then
shift
if [[ $1 =~ "--" ]]
then
echo "you need input hadoop-config values."
exit -1
elif [[ $1 = "codegen" ]] || [[ $1 = "create-hive-table" ]] || [[ $1 = "eval" ]] || [[ $1 = "export" ]] || [[ $1 = "help" ]] || [[ $1 = "import" ]] || [[ $1 = "import-all-tables" ]] || [[ $1 = "import-mainframe" ]] || [[ $1 = "job" ]] || [[ $1 = "list-databases" ]] || [[ $1 = "list-tables" ]] || [[ $1 = "merge" ]] || [[ $1 = "metastore" ]] || [[ $1 = "version" ]]
then
echo "you need input hadoop-config values."
exit -1
else
hadoopconfig=$1
shift
fi
fi
if [ ! -n "$hadoopconfig" ] ;then
exec ${HADOOP_COMMON_HOME}/bin/hadoop org.apache.sqoop.Sqoop "$@"
else
exec ${HADOOP_COMMON_HOME}/bin/hadoop --config "$hadoopconfig" org.apache.sqoop.Sqoop "$@"
fi

View File

@ -186,5 +186,10 @@ under the License.
</description>
</property>
-->
<property>
<name>com.newland.component.FujianBI.service.list</name>
<value>com.newland.component.FujianBI.service.impl.KerberosLoginService</value>
<description>service list</description>
</property>
</configuration>

View File

@ -26,7 +26,7 @@ case that someone actually needs it. We strongly encourage you to use
ant instead. You have been warned!
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
@ -38,7 +38,8 @@ ant instead. You have been warned!
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop</artifactId>
<version>1.4.0-incubating-SNAPSHOT</version>
<!-- <version>1.4.0-incubating-SNAPSHOT</version> -->
<version>1.4.7</version>
<packaging>jar</packaging>
<name>Apache Sqoop</name>
@ -86,21 +87,138 @@ ant instead. You have been warned!
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compile.source>1.6</maven.compile.source>
<maven.compile.target>1.6</maven.compile.target>
<avroVersion>1.5.2</avroVersion>
<maven.compile.source>1.7</maven.compile.source>
<maven.compile.target>1.7</maven.compile.target>
<avroVersion>1.8.1</avroVersion>
<!--
<hadoopVersion>0.20.203.0</hadoopVersion>
-->
<!-- FIXME Cloudera Distribution dependency version -->
<hadoopVersion>0.20.2-cdh3u1</hadoopVersion>
<hbaseVersion>0.90.3-cdh3u1</hbaseVersion>
<!-- <hadoopVersion>0.20.2-cdh3u1</hadoopVersion>
<hbaseVersion>0.90.3-cdh3u1</hbaseVersion> -->
<hadoopVersion>2.6.0</hadoopVersion>
<hadoopMRVersion>2.6.0-mr1-cdh5.7.2</hadoopMRVersion>
<hbaseVersion>1.1.1</hbaseVersion>
<hiveVersion>1.0.0</hiveVersion>
<log4j.version>1.2.16</log4j.version>
<accumulo.version>1.6.0</accumulo.version>
<kitesdk.version>1.0.0</kitesdk.version>
</properties>
<dependencies>
<!-- 丢失的依赖 -->
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
<version>${accumulo.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-minicluster</artifactId>
<version>${accumulo.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-core</artifactId>
<version>${hiveVersion}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.6</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>org.kitesdk</groupId>
<artifactId>kite-data-hive</artifactId>
<version>${kitesdk.version}</version>
</dependency>
<dependency>
<groupId>org.kitesdk</groupId>
<artifactId>kite-data-mapreduce</artifactId>
<version>${kitesdk.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>org.json</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>9.4-1201-jdbc41</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.197</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc7</artifactId>
<version>12.1.0.2</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 服务 -->
<dependency>
<groupId>com.newland.bi.component</groupId>
<artifactId>nl-component-FujianBI-common</artifactId>
<version>1.0</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 丢失的依赖 -->
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
@ -165,7 +283,7 @@ ant instead. You have been warned!
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>${hadoopVersion}</version>
<version>${hadoopMRVersion}</version>
<exclusions>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
@ -173,7 +291,97 @@ ant instead. You have been warned!
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoopVersion}</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<!-- 修改hbase依赖 -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbaseVersion}</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.thrift</groupId>
<artifactId>thrift</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.0.1</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbaseVersion}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbaseVersion}</version>
<classifier>tests</classifier>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbaseVersion}</version>
<classifier>tests</classifier>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
@ -212,34 +420,63 @@ ant instead. You have been warned!
<version>${hbaseVersion}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency> -->
<!-- 修改hbase依赖 -->
<!-- 增加hive依赖 -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hiveVersion}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 增加hive依赖 -->
<dependency>
<groupId>hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<version>1.8.0.10</version>
</dependency>
<dependency>
<!--
<dependency>
FIXME we must depends on org.apache !
<groupId>org.apache.hadoop</groupId>
-->
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-test</artifactId>
<version>${hadoopVersion}</version>
<scope>test</scope>
</dependency>
-->
<!--
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mrunit</artifactId>
<version>0.20.2-cdh3u1</version>
<scope>test</scope>
</dependency> -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-test</artifactId>
<version>1.2.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.mrunit</groupId>
<artifactId>mrunit</artifactId>
<version>1.1.0</version>
<classifier>hadoop2</classifier>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.2</version>
<!-- <version>4.8.2</version> -->
<version>4.12</version>
<scope>test</scope>
</dependency>
@ -250,6 +487,19 @@ ant instead. You have been warned!
<scope>test</scope>
</dependency>
<!-- 增加日志依赖 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.5.8</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 增加日志依赖 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@ -323,16 +573,18 @@ ant instead. You have been warned!
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<!-- <executions>
<execution>
<id>generate-version-tool</id>
<phase>generate-sources</phase>
<goals><goal>run</goal></goals>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<echo>Generating version tool</echo>
<mkdir dir="${basedir}/target/generated-sources"/>
<exec executable="${basedir}/src/scripts/write-version-info.sh"
<exec executable="${basedir}/src/scripts/write-version-info.cmd"
dir="${basedir}" failonerror="true">
<arg value="${basedir}/target/generated-sources"/>
<arg value="${version}" />
@ -341,7 +593,7 @@ ant instead. You have been warned!
</target>
</configuration>
</execution>
</executions>
</executions> -->
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
@ -424,7 +676,7 @@ ant instead. You have been warned!
</execution>
</executions>
</plugin>
<!-- 注释javadoc
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
@ -442,7 +694,7 @@ ant instead. You have been warned!
<destDir>api</destDir>
</configuration>
</plugin>
-->
</plugins>
</build>
@ -530,6 +782,7 @@ ant instead. You have been warned!
</activation>
<build>
<plugins>
<!-- 注释掉目录创建
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
@ -552,6 +805,7 @@ ant instead. You have been warned!
</environmentVariables>
</configuration>
</plugin>
-->
</plugins>
</build>

View File

@ -46,7 +46,7 @@ public class DataDrivenDBInputFormat<T extends DBWritable>
*
* @deprecated use org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.
* DataDrivenDBInputSplit instead.
* @see org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.
* @see org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat
* DataDrivenDBInputSplit
*/
public static class DataDrivenDBInputSplit extends

View File

@ -20,6 +20,7 @@
import java.util.Arrays;
import com.newland.component.FujianBI.service.ServiceTool;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -221,6 +222,14 @@ public static int runTool(String [] args, Configuration conf) {
}
String toolName = expandedArgs[0];
// Only import and export add service tool processing
if (!isWindow() && (toolName.equals("import") || toolName.equals("export"))) {
ServiceTool serviceTool = ServiceTool.builder(conf, args);
serviceTool.initServices();
serviceTool.startServices();
}
Configuration pluginConf = SqoopTool.loadPlugins(conf);
SqoopTool tool = SqoopTool.getTool(toolName);
if (null == tool) {
@ -243,6 +252,20 @@ public static int runTool(String [] args) {
return runTool(args, new Configuration());
}
/**
* 是否是本地测试
*
* @return
*/
public static boolean isWindow() {
String systemType = System.getProperty("os.name");
if (systemType.toUpperCase().startsWith("WINDOWS")) {
return true;
} else {
return false;
}
}
public static void main(String [] args) {
if (args.length == 0) {
System.err.println("Try 'sqoop help' for usage.");

View File

@ -0,0 +1,6 @@
package org.apache.sqoop;
public class SqoopVersion {
public static final String VERSION = "1.4.7";
// public static final String GIT_HASH = "";
}

View File

@ -0,0 +1,31 @@
package org.apache.sqoop;
import org.junit.Before;
import org.junit.Test;
public class SqoopTest {
private String[] args;
@Before
public void setUp() throws Exception {
// String params = "import --connect jdbc:oracle:thin:@10.1.0.242:1521:ywxx --username bishow --password bishow -m 4 --split-by 'product_id' --query 'select sum_date,product_name,product_id from cqx_test2 where $CONDITIONS' --target-dir '/cqx/hivetable/cqx_test2/' --fields-terminated-by '|' --as-textfile --delete-target-dir --null-string '' --null-non-string ''";
// args = params.split(" ", -1);
String[] arg = {"import", "--connect", "jdbc:oracle:thin:@10.1.0.242:1521:ywxx",
"--username", "bishow", "--password", "C%MuhN#q$4", "-m", "4", "--split-by", "product_id", "--query",
"select sum_date,product_name,product_id from cqx_test2 where $CONDITIONS",
"--target-dir", "/cqx/hivetable/cqx_test2/", "--fields-terminated-by", "|", "--as-textfile",
"--delete-target-dir", "--null-string", "", "--null-non-string", ""};
args = arg;
System.out.println("args");
for (String p : args) {
System.out.print(p+" ");
}
}
@Test
public void run() {
int ret = Sqoop.runTool(args);
System.out.println("ret" + ret);
}
}

View File

@ -0,0 +1,126 @@
package org.apache.sqoop.mapreduce.db;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.junit.Test;
import java.sql.Types;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class DateSplitterTest {
private OracleDateSplitter dateSplitter;
private SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Test
public void split() throws Exception {
dateSplitter = new OracleDateSplitter();
String colName = "checkTime";
final long MS_IN_SEC = 1000L;
long minVal;
long maxVal;
int sqlDataType = Types.TIMESTAMP;
minVal = df.parse("2019-04-22 11:28:30").getTime();
maxVal = df.parse("2019-04-22 16:28:30").getTime();
String lowClausePrefix = colName + " >= ";
String highClausePrefix = colName + " < ";
int numSplits = 2;
if (numSplits < 1) {
numSplits = 1;
}
if (minVal == Long.MIN_VALUE && maxVal == Long.MIN_VALUE) {
// The range of acceptable dates is NULL to NULL. Just create a single
// split.
List<InputSplit> splits = new ArrayList<InputSplit>();
splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
colName + " IS NULL", colName + " IS NULL"));
return;
}
// For split size we are using seconds. So we need to convert to milliseconds.
long splitLimit = 3600 * MS_IN_SEC;
// Gather the split point integers
List<Long> splitPoints = dateSplitter.split(numSplits, splitLimit, minVal, maxVal);
List<InputSplit> splits = new ArrayList<InputSplit>();
// Turn the split points into a set of intervals.
long start = splitPoints.get(0);
Date startDate = longToDate(start, sqlDataType);
if (sqlDataType == Types.TIMESTAMP) {
// The lower bound's nanos value needs to match the actual lower-bound
// nanos.
try {
((java.sql.Timestamp) startDate).setNanos(0);
} catch (NullPointerException npe) {
// If the lower bound was NULL, we'll get an NPE; just ignore it and
// don't set nanos.
}
}
for (int i = 1; i < splitPoints.size(); i++) {
long end = splitPoints.get(i);
Date endDate = longToDate(end, sqlDataType);
if (i == splitPoints.size() - 1) {
if (sqlDataType == Types.TIMESTAMP) {
// The upper bound's nanos value needs to match the actual
// upper-bound nanos.
try {
((java.sql.Timestamp) endDate).setNanos(0);
} catch (NullPointerException npe) {
// If the upper bound was NULL, we'll get an NPE; just ignore it
// and don't set nanos.
}
}
// This is the last one; use a closed interval.
splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
lowClausePrefix + dateSplitter.dateToString(startDate),
colName + " <= " + dateSplitter.dateToString(endDate)));
} else {
// Normal open-interval case.
splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
lowClausePrefix + dateSplitter.dateToString(startDate),
highClausePrefix + dateSplitter.dateToString(endDate)));
}
start = end;
startDate = endDate;
}
if (minVal == Long.MIN_VALUE || maxVal == Long.MIN_VALUE) {
// Add an extra split to handle the null case that we saw.
splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
colName + " IS NULL", colName + " IS NULL"));
}
printList(splits);
}
private <E> void printList(List<E> list) {
for (E e : list) {
System.out.println(e.toString());
}
}
private Date longToDate(long val, int sqlDataType) {
switch (sqlDataType) {
case Types.DATE:
return new java.sql.Date(val);
case Types.TIME:
return new java.sql.Time(val);
case Types.TIMESTAMP:
return new java.sql.Timestamp(val);
default: // Shouldn't ever hit this case.
return null;
}
}
}

View File

@ -19,14 +19,21 @@
package org.apache.sqoop.mapreduce.db;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import com.cloudera.sqoop.mapreduce.db.BigDecimalSplitter;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class TestBigDecimalSplitter {
private org.apache.sqoop.mapreduce.db.BigDecimalSplitter bigDecimalSplitter = new org.apache.sqoop.mapreduce.db.BigDecimalSplitter();
/* Test if the decimal split sizes are generated as expected */
@Test
public void testDecimalTryDivide() {
@ -60,4 +67,54 @@ public void testRecurringTryDivide() {
assertEquals(expected, out);
}
@Test
public void testSplit() throws SQLException {
String colName = "cur_lac";
BigDecimal minVal = new BigDecimal(6591);
BigDecimal maxVal = new BigDecimal(24996);
String lowClausePrefix = colName + " >= ";
String highClausePrefix = colName + " < ";
BigDecimal numSplits = new BigDecimal(2000);
if (minVal == null && maxVal == null) {
// Range is null to null. Return a null split accordingly.
List<InputSplit> splits = new ArrayList<InputSplit>();
splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
colName + " IS NULL", colName + " IS NULL"));
return;
}
if (minVal == null || maxVal == null) {
// Don't know what is a reasonable min/max value for interpolation. Fail.
System.out.println("Cannot find a range for NUMERIC or DECIMAL "
+ "fields with one end NULL.");
return;
}
// Get all the split points together.
List<BigDecimal> splitPoints = bigDecimalSplitter.split(numSplits, minVal, maxVal);
List<InputSplit> splits = new ArrayList<InputSplit>();
// Turn the split points into a set of intervals.
BigDecimal start = splitPoints.get(0);
for (int i = 1; i < splitPoints.size(); i++) {
BigDecimal end = splitPoints.get(i);
if (i == splitPoints.size() - 1) {
// This is the last one; use a closed interval.
splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
lowClausePrefix + start.toString(),
colName + " <= " + end.toString()));
} else {
// Normal open-interval case.
splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
lowClausePrefix + start.toString(),
highClausePrefix + end.toString()));
}
start = end;
}
}
}

View File

@ -0,0 +1,32 @@
package org.apache.sqoop.mapreduce.db;
import org.apache.hadoop.mapreduce.InputSplit;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
public class TestBooleanSplitter {
@Test
public void split() {
List<InputSplit> splits = new ArrayList<>();
String colName = "isCheck";
boolean minVal = false;
boolean maxVal = true;
// Use one or two splits.
if (!minVal) {
splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
colName + " = FALSE", colName + " = FALSE"));
}
if (maxVal) {
splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
colName + " = TRUE", colName + " = TRUE"));
}
System.out.println(splits);
}
}

View File

@ -18,9 +18,13 @@
package org.apache.sqoop.mapreduce.db;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import com.cloudera.sqoop.config.ConfigurationHelper;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
import com.cloudera.sqoop.mapreduce.db.IntegerSplitter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@ -143,6 +147,7 @@ public void testEvenSplitsWithLimit() throws SQLException {
@Test
public void testOddSplitsWithLimit() throws SQLException {
List<Long> splits = new IntegerSplitter().split(5, 10, 0, 95);
System.out.println(splits);
long [] expected = { 0, 10, 20, 30, 40, 50, 59, 68, 77, 86, 95};
assertLongArrayEquals(expected, toLongArray(splits));
}
@ -150,6 +155,7 @@ public void testOddSplitsWithLimit() throws SQLException {
@Test
public void testSplitWithBiggerLimit() throws SQLException {
List<Long> splits = new IntegerSplitter().split(10, 15, 0, 100);
System.out.println(splits);
long [] expected = {0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100};
assertLongArrayEquals(expected, toLongArray(splits));
}
@ -157,7 +163,56 @@ public void testSplitWithBiggerLimit() throws SQLException {
@Test
public void testFractionalSplitWithLimit() throws SQLException {
List<Long> splits = new IntegerSplitter().split(5, 1, 1, 10);
System.out.println(splits);
long [] expected = {1,2, 3, 4, 5, 6, 7, 8, 9, 10, 10};
assertLongArrayEquals(expected, toLongArray(splits));
}
@Test
public void testSplit() throws Exception {
org.apache.sqoop.mapreduce.db.IntegerSplitter integerSplitter = new org.apache.sqoop.mapreduce.db.IntegerSplitter();
String colName = "cnt";
long minVal = 1;
long maxVal = 100;
String lowClausePrefix = colName + " >= ";
String highClausePrefix = colName + " < ";
int numSplits = 3;
if (numSplits < 1) {
numSplits = 1;
}
long splitLimit = -1;
// Get all the split points together.
List<Long> splitPoints = integerSplitter.split(numSplits, splitLimit, minVal, maxVal);
System.out.println(String.format("Splits: [%,28d to %,28d] into %d parts",
minVal, maxVal, numSplits));
for (int i = 0; i < splitPoints.size(); i++) {
System.out.println(String.format("%,28d", splitPoints.get(i)));
}
List<InputSplit> splits = new ArrayList<InputSplit>();
// Turn the split points into a set of intervals.
long start = splitPoints.get(0);
for (int i = 1; i < splitPoints.size(); i++) {
long end = splitPoints.get(i);
if (i == splitPoints.size() - 1) {
// This is the last one; use a closed interval.
splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
lowClausePrefix + Long.toString(start),
colName + " <= " + Long.toString(end)));
} else {
// Normal open-interval case.
splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
lowClausePrefix + Long.toString(start),
highClausePrefix + Long.toString(end)));
}
start = end;
}
System.out.println(splits);
}
}

View File

@ -19,9 +19,13 @@
import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import com.cloudera.sqoop.config.ConfigurationHelper;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
import com.cloudera.sqoop.mapreduce.db.TextSplitter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.sqoop.validation.ValidationException;
import org.junit.Test;
@ -173,4 +177,93 @@ public void testNChar() throws SQLException {
assertEquals(false, splitter2.isUseNCharStrings());
}
@Test
public void testSplit() throws Exception {
System.out.println("Generating splits for a textual index column.");
System.out.println("If your database sorts in a case-insensitive order, "
+ "this may result in a partial import or duplicate records.");
System.out.println("You are strongly encouraged to choose an integral split column.");
org.apache.sqoop.mapreduce.db.TextSplitter textSplitter = new org.apache.sqoop.mapreduce.db.TextSplitter();
boolean useNCharStrings = false;
String colName = "produce_name";
String minString = "1231";
String maxString = "12324";
boolean minIsNull = false;
// If the min value is null, switch it to an empty string instead for
// purposes of interpolation. Then add [null, null] as a special case
// split.
if (null == minString) {
minString = "";
minIsNull = true;
}
if (null == maxString) {
// If the max string is null, then the min string has to be null too.
// Just return a special split for this case.
List<InputSplit> splits = new ArrayList<InputSplit>();
splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
colName + " IS NULL", colName + " IS NULL"));
return;
}
// Use this as a hint. May need an extra task if the size doesn't
// divide cleanly.
// 本地是1
// 远程默认是2
int numSplits = 3;
String lowClausePrefix = colName + " >= " + (useNCharStrings ? "N'" : "'");
String highClausePrefix = colName + " < " + (useNCharStrings ? "N'" : "'");
// If there is a common prefix between minString and maxString, establish
// it and pull it out of minString and maxString.
int maxPrefixLen = Math.min(minString.length(), maxString.length());
int sharedLen;
for (sharedLen = 0; sharedLen < maxPrefixLen; sharedLen++) {
char c1 = minString.charAt(sharedLen);
char c2 = maxString.charAt(sharedLen);
if (c1 != c2) {
break;
}
}
// The common prefix has length 'sharedLen'. Extract it from both.
String commonPrefix = minString.substring(0, sharedLen);
minString = minString.substring(sharedLen);
maxString = maxString.substring(sharedLen);
List<String> splitStrings = textSplitter.split(numSplits, minString, maxString,
commonPrefix);
List<InputSplit> splits = new ArrayList<InputSplit>();
// Convert the list of split point strings into an actual set of
// InputSplits.
String start = splitStrings.get(0);
for (int i = 1; i < splitStrings.size(); i++) {
String end = splitStrings.get(i);
if (i == splitStrings.size() - 1) {
// This is the last one; use a closed interval.
splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
lowClausePrefix + start + "'", colName
+ " <= " + (useNCharStrings ? "N'" : "'") + end + "'"));
} else {
// Normal open-interval case.
splits.add(new com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit(
lowClausePrefix + start + "'", highClausePrefix + end + "'"));
}
start = end;
}
if (minIsNull) {
// Add the special null split at the end.
splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
colName + " IS NULL", colName + " IS NULL"));
}
System.out.println(splits);
}
}