mirror of
https://github.com/apache/sqoop.git
synced 2025-05-21 19:31:13 +08:00
SQOOP-655: Generic JDBC connector for export
(Bilung Lee via Jarek Jarcec Cecho)
This commit is contained in:
parent
2481b7f8d0
commit
0976713f01
@ -18,7 +18,6 @@
|
||||
package org.apache.sqoop.common;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -20,6 +20,7 @@
|
||||
import java.sql.Connection;
|
||||
import java.sql.DatabaseMetaData;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.ResultSetMetaData;
|
||||
import java.sql.SQLException;
|
||||
@ -30,6 +31,7 @@
|
||||
public class GenericJdbcExecutor {
|
||||
|
||||
private Connection connection;
|
||||
private PreparedStatement preparedStatement;
|
||||
|
||||
public GenericJdbcExecutor(String driver, String url,
|
||||
String username, String password) {
|
||||
@ -71,6 +73,52 @@ public void executeUpdate(String sql) {
|
||||
}
|
||||
}
|
||||
|
||||
public void beginBatch(String sql) {
|
||||
try {
|
||||
preparedStatement = connection.prepareStatement(sql,
|
||||
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
||||
|
||||
} catch (SQLException e) {
|
||||
throw new SqoopException(
|
||||
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
|
||||
}
|
||||
}
|
||||
|
||||
public void addBatch(Object[] array) {
|
||||
try {
|
||||
for (int i=0; i<array.length; i++) {
|
||||
preparedStatement.setObject(i+1, array[i]);
|
||||
}
|
||||
preparedStatement.addBatch();
|
||||
} catch (SQLException e) {
|
||||
throw new SqoopException(
|
||||
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
|
||||
}
|
||||
}
|
||||
|
||||
public void executeBatch(boolean commit) {
|
||||
try {
|
||||
preparedStatement.executeBatch();
|
||||
if (commit) {
|
||||
connection.commit();
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new SqoopException(
|
||||
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
|
||||
}
|
||||
}
|
||||
|
||||
public void endBatch() {
|
||||
try {
|
||||
if (preparedStatement != null) {
|
||||
preparedStatement.close();
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new SqoopException(
|
||||
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
|
||||
}
|
||||
}
|
||||
|
||||
public String getPrimaryKey(String table) {
|
||||
try {
|
||||
String[] splitNames = dequalify(table);
|
||||
|
@ -17,13 +17,13 @@
|
||||
*/
|
||||
package org.apache.sqoop.connector.jdbc;
|
||||
|
||||
import org.apache.sqoop.common.MapContext;
|
||||
import org.apache.sqoop.common.ImmutableContext;
|
||||
import org.apache.sqoop.job.etl.Destroyer;
|
||||
|
||||
public class GenericJdbcExportDestroyer extends Destroyer {
|
||||
|
||||
@Override
|
||||
public void run(MapContext context) {
|
||||
public void run(ImmutableContext context) {
|
||||
// TODO Auto-generated method stub
|
||||
}
|
||||
|
||||
|
@ -17,14 +17,178 @@
|
||||
*/
|
||||
package org.apache.sqoop.connector.jdbc;
|
||||
|
||||
import org.apache.sqoop.common.MutableMapContext;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.sqoop.common.ImmutableContext;
|
||||
import org.apache.sqoop.common.MutableContext;
|
||||
import org.apache.sqoop.common.SqoopException;
|
||||
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
|
||||
import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
|
||||
import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
|
||||
import org.apache.sqoop.job.Constants;
|
||||
import org.apache.sqoop.job.etl.Initializer;
|
||||
import org.apache.sqoop.utils.ClassUtils;
|
||||
|
||||
public class GenericJdbcExportInitializer extends Initializer {
|
||||
|
||||
private GenericJdbcExecutor executor;
|
||||
|
||||
@Override
|
||||
public void initialize(MutableMapContext context, Object connectionConfiguration, Object jobConfiguration) {
|
||||
// TODO Auto-generated method stub
|
||||
public void initialize(MutableContext context, Object connectionConfiguration, Object jobConfiguration) {
|
||||
ConnectionConfiguration connectionConfig = (ConnectionConfiguration)connectionConfiguration;
|
||||
ExportJobConfiguration jobConfig = (ExportJobConfiguration)jobConfiguration;
|
||||
|
||||
configureJdbcProperties(context, connectionConfig, jobConfig);
|
||||
try {
|
||||
configureTableProperties(context, connectionConfig, jobConfig);
|
||||
|
||||
} finally {
|
||||
executor.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getJars(ImmutableContext context, Object connectionConfiguration, Object jobConfiguration) {
|
||||
List<String> jars = new LinkedList<String>();
|
||||
|
||||
ConnectionConfiguration connection = (ConnectionConfiguration) connectionConfiguration;
|
||||
jars.add(ClassUtils.jarForClass(connection.jdbcDriver));
|
||||
|
||||
return jars;
|
||||
}
|
||||
|
||||
private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) {
|
||||
String driver = connectionConfig.jdbcDriver;
|
||||
String url = connectionConfig.connectionString;
|
||||
String username = connectionConfig.username;
|
||||
String password = connectionConfig.password;
|
||||
|
||||
if (driver == null) {
|
||||
throw new SqoopException(
|
||||
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0012,
|
||||
GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER);
|
||||
}
|
||||
context.setString(
|
||||
GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER,
|
||||
driver);
|
||||
|
||||
if (url == null) {
|
||||
throw new SqoopException(
|
||||
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0012,
|
||||
GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING);
|
||||
}
|
||||
context.setString(
|
||||
GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL,
|
||||
url);
|
||||
|
||||
if (username != null) {
|
||||
context.setString(
|
||||
GenericJdbcConnectorConstants.CONNECTOR_JDBC_USERNAME,
|
||||
username);
|
||||
}
|
||||
|
||||
if (password != null) {
|
||||
context.setString(
|
||||
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PASSWORD,
|
||||
password);
|
||||
}
|
||||
|
||||
executor = new GenericJdbcExecutor(driver, url, username, password);
|
||||
}
|
||||
|
||||
private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) {
|
||||
String dataSql;
|
||||
String inputDirectory;
|
||||
|
||||
String tableName = connectionConfig.tableName;
|
||||
String tableSql = connectionConfig.sql;
|
||||
String tableColumns = connectionConfig.columns;
|
||||
|
||||
String datadir = connectionConfig.dataDirectory;
|
||||
String warehouse = connectionConfig.warehouse;
|
||||
if (warehouse == null) {
|
||||
warehouse = GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE;
|
||||
} else if (!warehouse.endsWith(GenericJdbcConnectorConstants.FILE_SEPARATOR)) {
|
||||
warehouse += GenericJdbcConnectorConstants.FILE_SEPARATOR;
|
||||
}
|
||||
|
||||
if (tableName != null && tableSql != null) {
|
||||
// when both table name and table sql are specified:
|
||||
throw new SqoopException(
|
||||
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
|
||||
|
||||
} else if (tableName != null) {
|
||||
// when table name is specified:
|
||||
|
||||
if (tableColumns == null) {
|
||||
String[] columns = executor.getQueryColumns("SELECT * FROM "
|
||||
+ executor.delimitIdentifier(tableName) + " WHERE 1 = 0");
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append("INSERT INTO ");
|
||||
builder.append(executor.delimitIdentifier(tableName));
|
||||
builder.append(" VALUES (?");
|
||||
for (int i = 1; i < columns.length; i++) {
|
||||
builder.append(",?");
|
||||
}
|
||||
builder.append(")");
|
||||
dataSql = builder.toString();
|
||||
|
||||
} else {
|
||||
String[] columns = StringUtils.split(tableColumns, ',');
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append("INSERT INTO ");
|
||||
builder.append(executor.delimitIdentifier(tableName));
|
||||
builder.append(" (");
|
||||
builder.append(tableColumns);
|
||||
builder.append(") VALUES (?");
|
||||
for (int i = 1; i < columns.length; i++) {
|
||||
builder.append(",?");
|
||||
}
|
||||
builder.append(")");
|
||||
dataSql = builder.toString();
|
||||
}
|
||||
|
||||
if (datadir == null) {
|
||||
inputDirectory = warehouse + tableName;
|
||||
} else {
|
||||
inputDirectory = warehouse + datadir;
|
||||
}
|
||||
|
||||
} else if (tableSql != null) {
|
||||
// when table sql is specified:
|
||||
|
||||
if (tableSql.indexOf(
|
||||
GenericJdbcConnectorConstants.SQL_PARAMETER_MARKER) == -1) {
|
||||
// make sure parameter marker is in the specified sql
|
||||
throw new SqoopException(
|
||||
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0013);
|
||||
}
|
||||
|
||||
if (tableColumns == null) {
|
||||
dataSql = tableSql;
|
||||
} else {
|
||||
throw new SqoopException(
|
||||
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0014);
|
||||
}
|
||||
|
||||
if (datadir == null) {
|
||||
inputDirectory =
|
||||
warehouse + GenericJdbcConnectorConstants.DEFAULT_DATADIR;
|
||||
} else {
|
||||
inputDirectory = warehouse + datadir;
|
||||
}
|
||||
|
||||
} else {
|
||||
// when neither are specified:
|
||||
throw new SqoopException(
|
||||
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
|
||||
}
|
||||
|
||||
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL,
|
||||
dataSql.toString());
|
||||
context.setString(Constants.JOB_ETL_INPUT_DIRECTORY, inputDirectory);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -23,9 +23,58 @@
|
||||
|
||||
public class GenericJdbcExportLoader extends Loader {
|
||||
|
||||
public static final int DEFAULT_ROWS_PER_BATCH = 100;
|
||||
public static final int DEFAULT_BATCHES_PER_TRANSACTION = 100;
|
||||
private int rowsPerBatch = DEFAULT_ROWS_PER_BATCH;
|
||||
private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION;
|
||||
|
||||
@Override
|
||||
public void run(ImmutableContext context, DataReader reader) {
|
||||
// TODO Auto-generated method stub
|
||||
String driver = context.getString(
|
||||
GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER);
|
||||
String url = context.getString(
|
||||
GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL);
|
||||
String username = context.getString(
|
||||
GenericJdbcConnectorConstants.CONNECTOR_JDBC_USERNAME);
|
||||
String password = context.getString(
|
||||
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PASSWORD);
|
||||
GenericJdbcExecutor executor = new GenericJdbcExecutor(
|
||||
driver, url, username, password);
|
||||
|
||||
String sql = context.getString(
|
||||
GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL);
|
||||
executor.beginBatch(sql);
|
||||
try {
|
||||
int numberOfRows = 0;
|
||||
int numberOfBatches = 0;
|
||||
Object[] array;
|
||||
|
||||
while ((array = reader.readArrayRecord()) != null) {
|
||||
numberOfRows++;
|
||||
executor.addBatch(array);
|
||||
|
||||
if (numberOfRows == rowsPerBatch) {
|
||||
numberOfBatches++;
|
||||
if (numberOfBatches == batchesPerTransaction) {
|
||||
executor.executeBatch(true);
|
||||
numberOfBatches = 0;
|
||||
} else {
|
||||
executor.executeBatch(false);
|
||||
}
|
||||
numberOfRows = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (numberOfRows != 0) {
|
||||
// execute and commit the remaining rows
|
||||
executor.executeBatch(true);
|
||||
}
|
||||
|
||||
executor.endBatch();
|
||||
|
||||
} finally {
|
||||
executor.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -17,13 +17,13 @@
|
||||
*/
|
||||
package org.apache.sqoop.connector.jdbc;
|
||||
|
||||
import org.apache.sqoop.common.MapContext;
|
||||
import org.apache.sqoop.common.ImmutableContext;
|
||||
import org.apache.sqoop.job.etl.Destroyer;
|
||||
|
||||
public class GenericJdbcImportDestroyer extends Destroyer {
|
||||
|
||||
@Override
|
||||
public void run(MapContext context) {
|
||||
public void run(ImmutableContext context) {
|
||||
// TODO Auto-generated method stub
|
||||
}
|
||||
|
||||
|
@ -25,8 +25,8 @@
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.sqoop.common.MapContext;
|
||||
import org.apache.sqoop.common.MutableMapContext;
|
||||
import org.apache.sqoop.common.ImmutableContext;
|
||||
import org.apache.sqoop.common.MutableContext;
|
||||
import org.apache.sqoop.common.SqoopException;
|
||||
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
|
||||
import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
|
||||
@ -42,7 +42,7 @@ public class GenericJdbcImportInitializer extends Initializer {
|
||||
private GenericJdbcExecutor executor;
|
||||
|
||||
@Override
|
||||
public void initialize(MutableMapContext context, Object oConnectionConfig, Object oJobConfig) {
|
||||
public void initialize(MutableContext context, Object oConnectionConfig, Object oJobConfig) {
|
||||
ConnectionConfiguration connectionConfig = (ConnectionConfiguration)oConnectionConfig;
|
||||
ImportJobConfiguration jobConfig = (ImportJobConfiguration)oJobConfig;
|
||||
|
||||
@ -58,7 +58,7 @@ public void initialize(MutableMapContext context, Object oConnectionConfig, Obje
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getJars(MapContext context, Object connectionConfiguration, Object jobConfiguration) {
|
||||
public List<String> getJars(ImmutableContext context, Object connectionConfiguration, Object jobConfiguration) {
|
||||
List<String> jars = new LinkedList<String>();
|
||||
|
||||
ConnectionConfiguration connection = (ConnectionConfiguration) connectionConfiguration;
|
||||
@ -67,7 +67,7 @@ public List<String> getJars(MapContext context, Object connectionConfiguration,
|
||||
return jars;
|
||||
}
|
||||
|
||||
private void configureJdbcProperties(MutableMapContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
|
||||
private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
|
||||
String driver = connectionConfig.jdbcDriver;
|
||||
String url = connectionConfig.connectionString;
|
||||
String username = connectionConfig.username;
|
||||
@ -107,7 +107,7 @@ private void configureJdbcProperties(MutableMapContext context, ConnectionConfig
|
||||
executor = new GenericJdbcExecutor(driver, url, username, password);
|
||||
}
|
||||
|
||||
private void configurePartitionProperties(MutableMapContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
|
||||
private void configurePartitionProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
|
||||
// ----- configure column name -----
|
||||
|
||||
String partitionColumnName = connectionConfig.partitionColumn;
|
||||
@ -207,7 +207,7 @@ private void configurePartitionProperties(MutableMapContext context, ConnectionC
|
||||
}
|
||||
}
|
||||
|
||||
private void configureTableProperties(MutableMapContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
|
||||
private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
|
||||
String dataSql;
|
||||
String fieldNames;
|
||||
String outputDirectory;
|
||||
|
@ -22,7 +22,6 @@
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.sqoop.common.ImmutableContext;
|
||||
import org.apache.sqoop.common.MapContext;
|
||||
import org.apache.sqoop.common.SqoopException;
|
||||
import org.apache.sqoop.job.Constants;
|
||||
import org.apache.sqoop.job.etl.Partition;
|
||||
|
@ -0,0 +1,164 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.sqoop.connector.jdbc;
|
||||
|
||||
import java.util.Hashtable;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.sqoop.job.Constants;
|
||||
import org.apache.sqoop.job.etl.Initializer;
|
||||
//import org.apache.sqoop.job.etl.MutableContext;
|
||||
//import org.apache.sqoop.job.etl.Options;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestExportInitializer extends TestCase {
|
||||
|
||||
private final String tableName;
|
||||
private final String tableSql;
|
||||
private final String tableColumns;
|
||||
|
||||
private GenericJdbcExecutor executor;
|
||||
|
||||
public TestExportInitializer() {
|
||||
tableName = getClass().getSimpleName();
|
||||
tableSql = "INSERT INTO \"" + tableName + "\" VALUES (?,?,?)";
|
||||
tableColumns = "ICOL,VCOL";
|
||||
}
|
||||
|
||||
public void testVoid() { }
|
||||
|
||||
// @Override
|
||||
// public void setUp() {
|
||||
// executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
|
||||
// GenericJdbcTestConstants.URL, null, null);
|
||||
//
|
||||
// if (!executor.existTable(tableName)) {
|
||||
// executor.executeUpdate("CREATE TABLE "
|
||||
// + executor.delimitIdentifier(tableName)
|
||||
// + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void tearDown() {
|
||||
// executor.close();
|
||||
// }
|
||||
//
|
||||
// @Test
|
||||
// public void testTableName() throws Exception {
|
||||
// DummyOptions options = new DummyOptions();
|
||||
// options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER,
|
||||
// GenericJdbcTestConstants.DRIVER);
|
||||
// options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING,
|
||||
// GenericJdbcTestConstants.URL);
|
||||
// options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_NAME,
|
||||
// tableName);
|
||||
//
|
||||
// DummyContext context = new DummyContext();
|
||||
//
|
||||
// Initializer initializer = new GenericJdbcExportInitializer();
|
||||
// initializer.run(context, options);
|
||||
//
|
||||
// verifyResult(context,
|
||||
// "INSERT INTO " + executor.delimitIdentifier(tableName)
|
||||
// + " VALUES (?,?,?)",
|
||||
// GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE + tableName);
|
||||
// }
|
||||
//
|
||||
// @Test
|
||||
// public void testTableNameWithTableColumns() throws Exception {
|
||||
// DummyOptions options = new DummyOptions();
|
||||
// options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER,
|
||||
// GenericJdbcTestConstants.DRIVER);
|
||||
// options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING,
|
||||
// GenericJdbcTestConstants.URL);
|
||||
// options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_NAME,
|
||||
// tableName);
|
||||
// options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_COLUMNS,
|
||||
// tableColumns);
|
||||
//
|
||||
// DummyContext context = new DummyContext();
|
||||
//
|
||||
// Initializer initializer = new GenericJdbcExportInitializer();
|
||||
// initializer.run(context, options);
|
||||
//
|
||||
// verifyResult(context,
|
||||
// "INSERT INTO " + executor.delimitIdentifier(tableName)
|
||||
// + " (" + tableColumns + ") VALUES (?,?)",
|
||||
// GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE + tableName);
|
||||
// }
|
||||
//
|
||||
// @Test
|
||||
// public void testTableSql() throws Exception {
|
||||
// DummyOptions options = new DummyOptions();
|
||||
// options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER,
|
||||
// GenericJdbcTestConstants.DRIVER);
|
||||
// options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING,
|
||||
// GenericJdbcTestConstants.URL);
|
||||
// options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_SQL,
|
||||
// tableSql);
|
||||
//
|
||||
// DummyContext context = new DummyContext();
|
||||
//
|
||||
// Initializer initializer = new GenericJdbcExportInitializer();
|
||||
// initializer.run(context, options);
|
||||
//
|
||||
// verifyResult(context,
|
||||
// "INSERT INTO " + executor.delimitIdentifier(tableName)
|
||||
// + " VALUES (?,?,?)",
|
||||
// GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE
|
||||
// + GenericJdbcConnectorConstants.DEFAULT_DATADIR);
|
||||
// }
|
||||
//
|
||||
// private void verifyResult(DummyContext context,
|
||||
// String dataSql, String inputDirectory) {
|
||||
// assertEquals(dataSql, context.getString(
|
||||
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL));
|
||||
// assertEquals(inputDirectory, context.getString(
|
||||
// Constants.JOB_ETL_INPUT_DIRECTORY));
|
||||
// }
|
||||
//
|
||||
// public class DummyOptions implements Options {
|
||||
// Hashtable<String, String> store = new Hashtable<String, String>();
|
||||
//
|
||||
// public void setOption(String key, String value) {
|
||||
// store.put(key, value);
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public String getOption(String key) {
|
||||
// return store.get(key);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// public class DummyContext implements MutableContext {
|
||||
// Hashtable<String, String> store = new Hashtable<String, String>();
|
||||
//
|
||||
// @Override
|
||||
// public String getString(String key) {
|
||||
// return store.get(key);
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void setString(String key, String value) {
|
||||
// store.put(key, value);
|
||||
// }
|
||||
// }
|
||||
|
||||
}
|
@ -0,0 +1,140 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.sqoop.connector.jdbc;
|
||||
|
||||
import java.sql.ResultSet;
|
||||
import java.util.HashMap;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.sqoop.job.etl.Loader;
|
||||
import org.apache.sqoop.job.io.DataReader;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestExportLoader extends TestCase {
|
||||
|
||||
private final String tableName;
|
||||
|
||||
private GenericJdbcExecutor executor;
|
||||
|
||||
private static final int START = -50;
|
||||
private static final int NUMBER_OF_ROWS = 101;
|
||||
|
||||
public TestExportLoader() {
|
||||
tableName = getClass().getSimpleName();
|
||||
}
|
||||
|
||||
public void testVoid() { }
|
||||
|
||||
// @Override
|
||||
// public void setUp() {
|
||||
// executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
|
||||
// GenericJdbcTestConstants.URL, null, null);
|
||||
//
|
||||
// if (!executor.existTable(tableName)) {
|
||||
// executor.executeUpdate("CREATE TABLE "
|
||||
// + executor.delimitIdentifier(tableName)
|
||||
// + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void tearDown() {
|
||||
// executor.close();
|
||||
// }
|
||||
//
|
||||
// @Test
|
||||
// public void testInsert() throws Exception {
|
||||
// DummyContext context = new DummyContext();
|
||||
// context.setString(
|
||||
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER,
|
||||
// GenericJdbcTestConstants.DRIVER);
|
||||
// context.setString(
|
||||
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL,
|
||||
// GenericJdbcTestConstants.URL);
|
||||
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL,
|
||||
// "INSERT INTO " + executor.delimitIdentifier(tableName)
|
||||
// + " VALUES (?,?,?)");
|
||||
//
|
||||
// Loader loader = new GenericJdbcExportLoader();
|
||||
// DummyReader reader = new DummyReader();
|
||||
//
|
||||
// loader.run(context, reader);
|
||||
//
|
||||
// int index = START;
|
||||
// ResultSet rs = executor.executeQuery("SELECT * FROM "
|
||||
// + executor.delimitIdentifier(tableName) + " ORDER BY ICOL");
|
||||
// while (rs.next()) {
|
||||
// assertEquals(Integer.valueOf(index), rs.getObject(1));
|
||||
// assertEquals(Double.valueOf(index), rs.getObject(2));
|
||||
// assertEquals(String.valueOf(index), rs.getObject(3));
|
||||
// index++;
|
||||
// }
|
||||
// assertEquals(NUMBER_OF_ROWS, index-START);
|
||||
// }
|
||||
//
|
||||
// public class DummyContext implements MutableContext {
|
||||
// HashMap<String, String> store = new HashMap<String, String>();
|
||||
//
|
||||
// @Override
|
||||
// public String getString(String key) {
|
||||
// return store.get(key);
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void setString(String key, String value) {
|
||||
// store.put(key, value);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// public class DummyReader extends DataReader {
|
||||
// int index = 0;
|
||||
//
|
||||
// @Override
|
||||
// public void setFieldDelimiter(char fieldDelimiter) {
|
||||
// // do nothing and use default delimiter
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public Object[] readArrayRecord() {
|
||||
// if (index < NUMBER_OF_ROWS) {
|
||||
// Object[] array = new Object[] {
|
||||
// new Integer(START+index),
|
||||
// new Double(START+index),
|
||||
// String.valueOf(START+index) };
|
||||
// index++;
|
||||
// return array;
|
||||
// } else {
|
||||
// return null;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public String readCsvRecord() {
|
||||
// fail("This method should not be invoked.");
|
||||
// return null;
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public Object readContent(int type) {
|
||||
// fail("This method should not be invoked.");
|
||||
// return null;
|
||||
// }
|
||||
// }
|
||||
|
||||
}
|
@ -34,6 +34,9 @@ public class Constants {
|
||||
public static final String JOB_ETL_OUTPUT_DIRECTORY = PREFIX_CONFIG
|
||||
+ "etl.output.directory";
|
||||
|
||||
public static final String JOB_ETL_INPUT_DIRECTORY = PREFIX_CONFIG
|
||||
+ "etl.input.directory";
|
||||
|
||||
protected Constants() {
|
||||
// Disable explicit object creation
|
||||
}
|
||||
|
@ -17,7 +17,7 @@
|
||||
*/
|
||||
package org.apache.sqoop.job.etl;
|
||||
|
||||
import org.apache.sqoop.common.MapContext;
|
||||
import org.apache.sqoop.common.ImmutableContext;
|
||||
|
||||
/**
|
||||
* This allows connector to define work to complete execution, for example,
|
||||
@ -25,7 +25,6 @@
|
||||
*/
|
||||
public abstract class Destroyer {
|
||||
|
||||
// TODO(Jarcec): This should be called with ImmutableContext
|
||||
public abstract void run(MapContext context);
|
||||
public abstract void run(ImmutableContext context);
|
||||
|
||||
}
|
||||
|
@ -17,8 +17,8 @@
|
||||
*/
|
||||
package org.apache.sqoop.job.etl;
|
||||
|
||||
import org.apache.sqoop.common.MapContext;
|
||||
import org.apache.sqoop.common.MutableMapContext;
|
||||
import org.apache.sqoop.common.ImmutableContext;
|
||||
import org.apache.sqoop.common.MutableContext;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
@ -38,7 +38,7 @@ public abstract class Initializer {
|
||||
* @param connectionConfiguration Connector's connection configuration object
|
||||
* @param jobConfiguration Connector's job configuration object
|
||||
*/
|
||||
public abstract void initialize(MutableMapContext context,
|
||||
public abstract void initialize(MutableContext context,
|
||||
Object connectionConfiguration,
|
||||
Object jobConfiguration);
|
||||
|
||||
@ -49,7 +49,7 @@ public abstract void initialize(MutableMapContext context,
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public List<String> getJars(MapContext context,
|
||||
public List<String> getJars(ImmutableContext context,
|
||||
Object connectionConfiguration,
|
||||
Object jobConfiguration) {
|
||||
return new LinkedList<String>();
|
||||
|
Loading…
Reference in New Issue
Block a user