mirror of
https://github.com/apache/sqoop.git
synced 2025-05-19 02:10:54 +08:00
SQOOP-2230: Sqoop2: Kite connector can use URIBuilder
(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
parent
700d55ac74
commit
598460f4a5
@ -21,6 +21,7 @@
|
||||
import com.google.common.io.Closeables;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.sqoop.common.SqoopException;
|
||||
import org.apache.sqoop.connector.common.FileFormat;
|
||||
import org.apache.sqoop.connector.common.AvroDataTypeUtil;
|
||||
@ -162,7 +163,25 @@ public void mergeDataset(String uri) {
|
||||
*/
|
||||
public static String suggestTemporaryDatasetUri(String uri) {
|
||||
if (uri.startsWith("dataset:hdfs:")) {
|
||||
return uri + TEMPORARY_DATASET_PREFIX + UUID.randomUUID().toString().replace("-", "");
|
||||
int pathStart = uri.lastIndexOf(":") + 1;
|
||||
int pathEnd = uri.lastIndexOf("?");
|
||||
String[] uriParts = null;
|
||||
|
||||
// Get URI parts
|
||||
if (pathEnd > -1) {
|
||||
uriParts = new String[3];
|
||||
uriParts[2] = uri.substring(pathEnd, uri.length());
|
||||
} else {
|
||||
pathEnd = uri.length();
|
||||
uriParts = new String[2];
|
||||
}
|
||||
uriParts[1] = uri.substring(pathStart, pathEnd);
|
||||
uriParts[0] = uri.substring(0, pathStart);
|
||||
|
||||
// Add to path
|
||||
uriParts[1] += TEMPORARY_DATASET_PREFIX + UUID.randomUUID().toString().replace("-", "");
|
||||
|
||||
return StringUtils.join(uriParts, "");
|
||||
} else {
|
||||
throw new SqoopException(
|
||||
KiteConnectorError.GENERIC_KITE_CONNECTOR_0000, uri);
|
||||
|
@ -18,6 +18,7 @@
|
||||
package org.apache.sqoop.connector.kite.configuration;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import org.kitesdk.data.URIBuilder;
|
||||
|
||||
public class ConfigUtil {
|
||||
|
||||
@ -27,10 +28,19 @@ public class ConfigUtil {
|
||||
*/
|
||||
public static String buildDatasetUri(String fsLocation, String uri) {
|
||||
if (!Strings.isNullOrEmpty(fsLocation) && !uri.contains("://")) {
|
||||
// Add fsLocation after the second colon
|
||||
int p = uri.indexOf(":", uri.indexOf(":") + 1);
|
||||
return uri.substring(0, p + 1) + "//" + fsLocation + uri.substring(p + 1);
|
||||
URIBuilder builder = new URIBuilder(uri);
|
||||
|
||||
String[] parts = fsLocation.split(":");
|
||||
if (parts.length > 0) {
|
||||
builder.with("auth:host", parts[0]);
|
||||
}
|
||||
if (parts.length > 1) {
|
||||
builder.with("auth:port", parts[1]);
|
||||
}
|
||||
|
||||
return builder.build().toString().replaceFirst("view:", "dataset:");
|
||||
}
|
||||
|
||||
return uri;
|
||||
}
|
||||
|
||||
|
@ -147,6 +147,19 @@ public void testCloseReader() {
|
||||
assertTrue(executor.isReaderClosed());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuggestTemporaryDatasetUri() {
|
||||
String uri = "dataset:hdfs:/tmp/sqoop/test";
|
||||
String suggestedUri = KiteDatasetExecutor.suggestTemporaryDatasetUri(uri);
|
||||
assertTrue(suggestedUri.length() > uri.length());
|
||||
assertTrue(suggestedUri.contains(uri));
|
||||
|
||||
uri = "dataset:hdfs://namenode:8020/tmp/sqoop/test";
|
||||
suggestedUri = KiteDatasetExecutor.suggestTemporaryDatasetUri(uri);
|
||||
assertTrue(suggestedUri.length() > uri.length());
|
||||
assertTrue(suggestedUri.contains(uri));
|
||||
}
|
||||
|
||||
private static Schema createTwoFieldSchema() {
|
||||
return new Schema.Parser().parse("{" +
|
||||
"\"name\":\"test\",\"type\":\"record\"," +
|
||||
|
Loading…
Reference in New Issue
Block a user