5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-06 06:30:37 +08:00

SQOOP-2441: Sqoop2: Generic JDBC: Drop support for specifying custom query when exporting data

(Jarek Jarcec Cecho via Abraham Elmahrek)
This commit is contained in:
Abraham Elmahrek 2015-08-24 13:59:03 -07:00
parent 548dcca423
commit 09a21649d4
5 changed files with 46 additions and 159 deletions

View File

@ -42,14 +42,6 @@ public enum GenericJdbcConnectorError implements ErrorCode {
/** No boundaries are found for partition column. */
GENERIC_JDBC_CONNECTOR_0006("No boundaries are found for partition column"),
/** The table name and the table sql cannot be specified together. */
GENERIC_JDBC_CONNECTOR_0007("The table name and the table sql "
+ "cannot be specified together"),
/** Neither the table name nor the table sql are specified. */
GENERIC_JDBC_CONNECTOR_0008("Neither the table name nor the table sql "
+ "are specified"),
/** No substitute token in the specified sql. */
GENERIC_JDBC_CONNECTOR_0010("No substitute token in the specified sql"),
@ -79,8 +71,6 @@ public enum GenericJdbcConnectorError implements ErrorCode {
GENERIC_JDBC_CONNECTOR_0018("Error occurred while transferring data from " +
"stage table to destination table."),
GENERIC_JDBC_CONNECTOR_0019("Table name extraction not supported."),
GENERIC_JDBC_CONNECTOR_0020("Unknown direction."),
GENERIC_JDBC_CONNECTOR_0021("Schema column size do not match the result set column size"),

View File

@ -65,10 +65,7 @@ public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig
executor = new GenericJdbcExecutor(linkConfig);
String schemaName = executor.encloseIdentifiers(toJobConfig.toJobConfig.schemaName, toJobConfig.toJobConfig.tableName);
if (schemaName == null) {
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0019,
"Table name extraction not supported yet.");
}
assert schemaName != null;
Schema schema = new Schema(schemaName);
try (Statement statement = executor.getConnection().createStatement(
@ -106,86 +103,55 @@ private void configureTableProperties(MutableContext context, LinkConfiguration
false : toJobConfig.toJobConfig.shouldClearStageTable;
final boolean stageEnabled =
stageTableName != null && stageTableName.length() > 0;
String tableSql = toJobConfig.toJobConfig.sql;
String tableColumns = toJobConfig.toJobConfig.columns;
if (tableName != null && tableSql != null) {
// when both fromTable name and fromTable sql are specified:
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
} else if (tableName != null) {
// when fromTable name is specified:
if(stageEnabled) {
LOG.info("Stage has been enabled.");
LOG.info("Use stageTable: " + stageTableName +
" with clearStageTable: " + clearStageTable);
if(clearStageTable) {
executor.deleteTableData(stageTableName);
} else {
long stageRowCount = executor.getTableRowCount(stageTableName);
if(stageRowCount > 0) {
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0017);
}
}
}
// For databases that support schemas (IE: postgresql).
final String tableInUse = stageEnabled ? stageTableName : tableName;
String fullTableName = executor.encloseIdentifiers(schemaName, tableInUse);
if (tableColumns == null) {
String[] columns = executor.getQueryColumns("SELECT * FROM "
+ fullTableName + " WHERE 1 = 0");
StringBuilder builder = new StringBuilder();
builder.append("INSERT INTO ");
builder.append(fullTableName);
builder.append(" VALUES (?");
for (int i = 1; i < columns.length; i++) {
builder.append(",?");
}
builder.append(")");
dataSql = builder.toString();
// when fromTable name is specified:
if(stageEnabled) {
LOG.info("Stage has been enabled.");
LOG.info("Use stageTable: " + stageTableName + " with clearStageTable: " + clearStageTable);
if(clearStageTable) {
executor.deleteTableData(stageTableName);
} else {
String[] columns = StringUtils.split(tableColumns, ',');
StringBuilder builder = new StringBuilder();
builder.append("INSERT INTO ");
builder.append(fullTableName);
builder.append(" (");
builder.append(tableColumns);
builder.append(") VALUES (?");
for (int i = 1; i < columns.length; i++) {
builder.append(",?");
long stageRowCount = executor.getTableRowCount(stageTableName);
if(stageRowCount > 0) {
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0017, "Found rows: " + stageRowCount);
}
builder.append(")");
dataSql = builder.toString();
}
} else if (tableSql != null) {
// when fromTable sql is specified:
if (tableSql.indexOf(
GenericJdbcConnectorConstants.SQL_PARAMETER_MARKER) == -1) {
// make sure parameter marker is in the specified sql
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0013);
}
if (tableColumns == null) {
dataSql = tableSql;
} else {
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0014);
}
} else {
// when neither are specified:
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
}
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL,
dataSql);
// For databases that support schemas (IE: postgresql).
final String tableInUse = stageEnabled ? stageTableName : tableName;
String fullTableName = executor.encloseIdentifiers(schemaName, tableInUse);
if (tableColumns == null) {
String[] columns = executor.getQueryColumns("SELECT * FROM " + fullTableName + " WHERE 1 = 0");
StringBuilder builder = new StringBuilder();
builder.append("INSERT INTO ");
builder.append(fullTableName);
builder.append(" VALUES (?");
for (int i = 1; i < columns.length; i++) {
builder.append(",?");
}
builder.append(")");
dataSql = builder.toString();
} else {
String[] columns = StringUtils.split(tableColumns, ',');
StringBuilder builder = new StringBuilder();
builder.append("INSERT INTO ");
builder.append(fullTableName);
builder.append(" (");
builder.append(tableColumns);
builder.append(") VALUES (?");
for (int i = 1; i < columns.length; i++) {
builder.append(",?");
}
builder.append(")");
dataSql = builder.toString();
}
LOG.info("Using query to insert data: " + dataSql);
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL, dataSql);
}
}

View File

@ -22,6 +22,7 @@
import org.apache.sqoop.model.Validator;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.validators.AbstractValidator;
import org.apache.sqoop.validation.validators.NotEmpty;
/**
*
@ -31,12 +32,9 @@ public class ToJobConfig {
@Input(size = 50)
public String schemaName;
@Input(size = 2000)
@Input(size = 2000, validators = { @Validator(NotEmpty.class)})
public String tableName;
@Input(size = 50)
public String sql;
@Input(size = 50)
public String columns;
@ -49,16 +47,6 @@ public class ToJobConfig {
public static class ConfigValidator extends AbstractValidator<ToJobConfig> {
@Override
public void validate(ToJobConfig config) {
if (config.tableName == null && config.sql == null) {
addMessage(Status.ERROR, "Either table name or SQL must be specified");
}
if (config.tableName != null && config.sql != null) {
addMessage(Status.ERROR, "Both table name and SQL cannot be specified");
}
if (config.tableName == null && config.stageTableName != null) {
addMessage(Status.ERROR,
"Stage table name cannot be specified without specifying table name");
}
if (config.stageTableName == null && config.shouldClearStageTable != null) {
addMessage(Status.ERROR,
"Should Clear stage table cannot be specified without specifying the name of the stage table.");

View File

@ -17,24 +17,16 @@
*/
package org.apache.sqoop.connector.jdbc;
import org.apache.commons.lang.StringUtils;
import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
import org.apache.sqoop.model.ConfigUtils;
import org.apache.sqoop.model.InputEditable;
import org.apache.sqoop.model.MBooleanInput;
import org.apache.sqoop.model.MConfig;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MInput;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MStringInput;
import org.apache.sqoop.model.MToConfig;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.LinkedList;
import static org.testng.Assert.assertEquals;
/**
@ -56,7 +48,6 @@ public void testFromConfig() {
MFromConfig newConfigs = new MFromConfig(ConfigUtils.toConfigs(FromJobConfiguration.class));
originalConfigs.getInput("fromJobConfig.schemaName").setValue("test-schema");
originalConfigs.getInput("fromJobConfig.tableName").setValue("test-tableName");
originalConfigs.getInput("fromJobConfig.sql").setValue("test-sql");
originalConfigs.getInput("fromJobConfig.columns").setValue("test-columns");
originalConfigs.getInput("fromJobConfig.partitionColumn").setValue("test-partitionColumn");
originalConfigs.getInput("fromJobConfig.allowNullValueInPartitionColumn").setValue("test-allowNullValueInPartitionColumn");
@ -64,7 +55,6 @@ public void testFromConfig() {
assertEquals(originalConfigs, newConfigs);
assertEquals("test-schema", newConfigs.getInput("fromJobConfig.schemaName").getValue());
assertEquals("test-tableName", newConfigs.getInput("fromJobConfig.tableName").getValue());
assertEquals("test-sql", newConfigs.getInput("fromJobConfig.sql").getValue());
assertEquals("test-columns", newConfigs.getInput("fromJobConfig.columns").getValue());
assertEquals("test-partitionColumn", newConfigs.getInput("fromJobConfig.partitionColumn").getValue());
assertEquals("test-allowNullValueInPartitionColumn", newConfigs.getInput("fromJobConfig.allowNullValueInPartitionColumn").getValue());
@ -77,7 +67,6 @@ public void testToConfig() {
MToConfig newConfigs = new MToConfig(ConfigUtils.toConfigs(ToJobConfiguration.class));
originalConfigs.getInput("toJobConfig.schemaName").setValue("test-schema");
originalConfigs.getInput("toJobConfig.tableName").setValue("test-tableName");
originalConfigs.getInput("toJobConfig.sql").setValue("test-sql");
originalConfigs.getInput("toJobConfig.columns").setValue("test-columns");
originalConfigs.getInput("toJobConfig.stageTableName").setValue("test-stageTableName");
originalConfigs.getInput("toJobConfig.shouldClearStageTable").setValue("test-shouldClearStageTable");
@ -85,7 +74,6 @@ public void testToConfig() {
assertEquals(originalConfigs, newConfigs);
assertEquals("test-schema", newConfigs.getInput("toJobConfig.schemaName").getValue());
assertEquals("test-tableName", newConfigs.getInput("toJobConfig.tableName").getValue());
assertEquals("test-sql", newConfigs.getInput("toJobConfig.sql").getValue());
assertEquals("test-columns", newConfigs.getInput("toJobConfig.columns").getValue());
assertEquals("test-stageTableName", newConfigs.getInput("toJobConfig.stageTableName").getValue());
assertEquals("test-shouldClearStageTable", newConfigs.getInput("toJobConfig.shouldClearStageTable").getValue());

View File

@ -40,8 +40,6 @@ public class TestToInitializer {
private final String tableName;
private final String schemalessTableName;
private final String stageTableName;
private final String tableSql;
private final String schemalessTableSql;
private final String tableColumns;
private GenericJdbcExecutor executor;
@ -50,10 +48,7 @@ public TestToInitializer() {
schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA";
tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA";
schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE";
stageTableName = getClass().getSimpleName().toUpperCase() +
"_STAGE_TABLE";
tableSql = "INSERT INTO " + tableName + " VALUES (?,?,?)";
schemalessTableSql = "INSERT INTO " + schemalessTableName + " VALUES (?,?,?)";
stageTableName = getClass().getSimpleName().toUpperCase() + "_STAGE_TABLE";
tableColumns = "ICOL,VCOL";
}
@ -123,26 +118,6 @@ public void testTableNameWithTableColumns() throws Exception {
verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)");
}
/**
 * Initializes a {@code GenericJdbcToInitializer} with a job configuration that
 * sets only {@code toJobConfig.sql}, then checks the result against that same
 * SQL statement.
 */
@Test
@SuppressWarnings("unchecked")
public void testTableSql() throws Exception {
LinkConfiguration linkConfig = new LinkConfiguration();
ToJobConfiguration jobConfig = new ToJobConfiguration();
// Point the link at the JDBC driver and URL from the test constants.
linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
// Only a custom SQL statement is supplied here; no table name is assigned.
jobConfig.toJobConfig.sql = schemalessTableSql;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
// The initializer is deliberately held as a raw type, hence the suppression.
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcToInitializer();
initializer.initialize(initializerContext, linkConfig, jobConfig);
// NOTE(review): verifyResult is defined outside this view; presumably it
// compares the data SQL stored in the context with the expected string — confirm.
verifyResult(context, schemalessTableSql);
}
@Test
@SuppressWarnings("unchecked")
public void testTableNameWithSchema() throws Exception {
@ -286,26 +261,6 @@ public void testClearStageTableValidation() throws Exception {
"toJobConfig"));
}
/**
 * Validates a job configuration that sets a stage table name and an empty
 * {@code sql} value but never assigns a table name, and expects the validation
 * result to have ERROR status with a message under the "toJobConfig" key.
 */
@Test
public void testStageTableWithoutTable() throws Exception {
LinkConfiguration linkConfig = new LinkConfiguration();
ToJobConfiguration jobConfig = new ToJobConfiguration();
// Link settings are populated but only jobConfig is passed to the validator below.
linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
//specifying stage table without specifying table name
jobConfig.toJobConfig.stageTableName = stageTableName;
jobConfig.toJobConfig.sql = "";
ConfigValidationRunner validationRunner = new ConfigValidationRunner();
ConfigValidationResult result = validationRunner.validate(jobConfig);
// The third argument is the failure message shown when the statuses differ.
assertEquals(Status.ERROR, result.getStatus(),
"Stage table name cannot be specified without specifying " +
"table name");
// The validation message is expected to be recorded against the whole config section.
assertTrue(result.getMessages().containsKey(
"toJobConfig"));
}
@Test
@SuppressWarnings("unchecked")
public void testClearStageTable() throws Exception {