diff --git a/common/src/main/java/org/apache/sqoop/common/MapContext.java b/common/src/main/java/org/apache/sqoop/common/MapContext.java index c1d24ad9..b2451483 100644 --- a/common/src/main/java/org/apache/sqoop/common/MapContext.java +++ b/common/src/main/java/org/apache/sqoop/common/MapContext.java @@ -18,7 +18,6 @@ package org.apache.sqoop.common; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; /** diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java index 226fcd3b..2dba8afe 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java @@ -20,6 +20,7 @@ import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -30,6 +31,7 @@ public class GenericJdbcExecutor { private Connection connection; + private PreparedStatement preparedStatement; public GenericJdbcExecutor(String driver, String url, String username, String password) { @@ -71,6 +73,52 @@ public void executeUpdate(String sql) { } } + public void beginBatch(String sql) { + try { + preparedStatement = connection.prepareStatement(sql, + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + + } catch (SQLException e) { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e); + } + } + + public void addBatch(Object[] array) { + try { + for (int i=0; i getJars(ImmutableContext context, Object connectionConfiguration, Object jobConfiguration) { + List jars = new LinkedList(); + + ConnectionConfiguration connection = (ConnectionConfiguration) connectionConfiguration; + jars.add(ClassUtils.jarForClass(connection.jdbcDriver)); + + return jars; + } + + private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) { + String driver = connectionConfig.jdbcDriver; + String url = connectionConfig.connectionString; + String username = connectionConfig.username; + String password = connectionConfig.password; + + if (driver == null) { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0012, + GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER); + } + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER, + driver); + + if (url == null) { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0012, + GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING); + } + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL, + url); + + if (username != null) { + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_USERNAME, + username); + } + + if (password != null) { + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PASSWORD, + password); + } + + executor = new GenericJdbcExecutor(driver, url, username, password); + } + + private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) { + String dataSql; + String inputDirectory; + + String tableName = connectionConfig.tableName; + String tableSql = connectionConfig.sql; + String tableColumns = connectionConfig.columns; + + String datadir = connectionConfig.dataDirectory; + String warehouse = connectionConfig.warehouse; + if (warehouse == null) { + warehouse = GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE; + } else if (!warehouse.endsWith(GenericJdbcConnectorConstants.FILE_SEPARATOR)) { + warehouse += GenericJdbcConnectorConstants.FILE_SEPARATOR; + } + + if (tableName != null && tableSql != null) { + // when both table name and table sql are specified: + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007); + + } else if (tableName != null) { + // when table name is specified: + + if (tableColumns == null) { + String[] columns = executor.getQueryColumns("SELECT * FROM " + + executor.delimitIdentifier(tableName) + " WHERE 1 = 0"); + StringBuilder builder = new StringBuilder(); + builder.append("INSERT INTO "); + builder.append(executor.delimitIdentifier(tableName)); + builder.append(" VALUES (?"); + for (int i = 1; i < columns.length; i++) { + builder.append(",?"); + } + builder.append(")"); + dataSql = builder.toString(); + + } else { + String[] columns = StringUtils.split(tableColumns, ','); + StringBuilder builder = new StringBuilder(); + builder.append("INSERT INTO "); + builder.append(executor.delimitIdentifier(tableName)); + builder.append(" ("); + builder.append(tableColumns); + builder.append(") VALUES (?"); + for (int i = 1; i < columns.length; i++) { + builder.append(",?"); + } + builder.append(")"); + dataSql = builder.toString(); + } + + if (datadir == null) { + inputDirectory = warehouse + tableName; + } else { + inputDirectory = warehouse + datadir; + } + + } else if (tableSql != null) { + // when table sql is specified: + + if (tableSql.indexOf( + GenericJdbcConnectorConstants.SQL_PARAMETER_MARKER) == -1) { + // make sure parameter marker is in the specified sql + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0013); + } + + if (tableColumns == null) { + dataSql = tableSql; + } else { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0014); + } + + if (datadir == null) { + inputDirectory = + warehouse + GenericJdbcConnectorConstants.DEFAULT_DATADIR; + } else { + inputDirectory = warehouse + datadir; + } + + } else { + // when neither are specified: + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008); + } + + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL, + dataSql.toString()); + context.setString(Constants.JOB_ETL_INPUT_DIRECTORY, inputDirectory); } } diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java index 4cf0595c..ff7384c6 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java @@ -23,9 +23,58 @@ public class GenericJdbcExportLoader extends Loader { + public static final int DEFAULT_ROWS_PER_BATCH = 100; + public static final int DEFAULT_BATCHES_PER_TRANSACTION = 100; + private int rowsPerBatch = DEFAULT_ROWS_PER_BATCH; + private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION; + @Override public void run(ImmutableContext context, DataReader reader) { - // TODO Auto-generated method stub + String driver = context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER); + String url = context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL); + String username = context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_USERNAME); + String password = context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PASSWORD); + GenericJdbcExecutor executor = new GenericJdbcExecutor( + driver, url, username, password); + + String sql = context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL); + executor.beginBatch(sql); + try { + int numberOfRows = 0; + int numberOfBatches = 0; + Object[] array; + + while ((array = reader.readArrayRecord()) != null) { + numberOfRows++; + executor.addBatch(array); + + if (numberOfRows == rowsPerBatch) { + numberOfBatches++; + if (numberOfBatches == batchesPerTransaction) { + executor.executeBatch(true); + numberOfBatches = 0; + } else { + executor.executeBatch(false); + } + numberOfRows = 0; + } + } + + if (numberOfRows != 0) { + // execute and commit the remaining rows + executor.executeBatch(true); + } + + executor.endBatch(); + + } finally { + executor.close(); + } } } diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java index 3f6718de..a53fa595 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java @@ -17,13 +17,13 @@ */ package org.apache.sqoop.connector.jdbc; -import org.apache.sqoop.common.MapContext; +import org.apache.sqoop.common.ImmutableContext; import org.apache.sqoop.job.etl.Destroyer; public class GenericJdbcImportDestroyer extends Destroyer { @Override - public void run(MapContext context) { + public void run(ImmutableContext context) { // TODO Auto-generated method stub } diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java index 2075d996..f8e941cb 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java @@ -25,8 +25,8 @@ import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; -import org.apache.sqoop.common.MapContext; -import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.common.ImmutableContext; +import org.apache.sqoop.common.MutableContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; @@ -42,7 +42,7 @@ public class GenericJdbcImportInitializer extends Initializer { private GenericJdbcExecutor executor; @Override - public void initialize(MutableMapContext context, Object oConnectionConfig, Object oJobConfig) { + public void initialize(MutableContext context, Object oConnectionConfig, Object oJobConfig) { ConnectionConfiguration connectionConfig = (ConnectionConfiguration)oConnectionConfig; ImportJobConfiguration jobConfig = (ImportJobConfiguration)oJobConfig; @@ -58,7 +58,7 @@ public void initialize(MutableMapContext context, Object oConnectionConfig, Obje } @Override - public List getJars(MapContext context, Object connectionConfiguration, Object jobConfiguration) { + public List getJars(ImmutableContext context, Object connectionConfiguration, Object jobConfiguration) { List jars = new LinkedList(); ConnectionConfiguration connection = (ConnectionConfiguration) connectionConfiguration; @@ -67,7 +67,7 @@ public List getJars(MapContext context, Object connectionConfiguration, return jars; } - private void configureJdbcProperties(MutableMapContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) { + private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) { String driver = connectionConfig.jdbcDriver; String url = connectionConfig.connectionString; String username = connectionConfig.username; @@ -107,7 +107,7 @@ private void configureJdbcProperties(MutableMapContext context, ConnectionConfig executor = new GenericJdbcExecutor(driver, url, username, password); } - private void configurePartitionProperties(MutableMapContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) { + private void configurePartitionProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) { // ----- configure column name ----- String partitionColumnName = connectionConfig.partitionColumn; @@ -207,7 +207,7 @@ private void configurePartitionProperties(MutableMapContext context, ConnectionC } } - private void configureTableProperties(MutableMapContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) { + private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) { String dataSql; String fieldNames; String outputDirectory; diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java index 50714719..a6d3b52f 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java @@ -22,7 +22,6 @@ import java.util.List; import org.apache.sqoop.common.ImmutableContext; -import org.apache.sqoop.common.MapContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.job.Constants; import org.apache.sqoop.job.etl.Partition; diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java new file mode 100644 index 00000000..532e6fdc --- /dev/null +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import java.util.Hashtable; + +import junit.framework.TestCase; + +import org.apache.sqoop.job.Constants; +import org.apache.sqoop.job.etl.Initializer; +//import org.apache.sqoop.job.etl.MutableContext; +//import org.apache.sqoop.job.etl.Options; +import org.junit.Test; + +public class TestExportInitializer extends TestCase { + + private final String tableName; + private final String tableSql; + private final String tableColumns; + + private GenericJdbcExecutor executor; + + public TestExportInitializer() { + tableName = getClass().getSimpleName(); + tableSql = "INSERT INTO \"" + tableName + "\" VALUES (?,?,?)"; + tableColumns = "ICOL,VCOL"; + } + + public void testVoid() { } + +// @Override +// public void setUp() { +// executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, +// GenericJdbcTestConstants.URL, null, null); +// +// if (!executor.existTable(tableName)) { +// executor.executeUpdate("CREATE TABLE " +// + executor.delimitIdentifier(tableName) +// + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); +// } +// } +// +// @Override +// public void tearDown() { +// executor.close(); +// } +// +// @Test +// public void testTableName() throws Exception { +// DummyOptions options = new DummyOptions(); +// options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER, +// GenericJdbcTestConstants.DRIVER); +// options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING, +// GenericJdbcTestConstants.URL); +// options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_NAME, +// tableName); +// +// DummyContext context = new DummyContext(); +// +// Initializer initializer = new GenericJdbcExportInitializer(); +// initializer.run(context, options); +// +// verifyResult(context, +// "INSERT INTO " + executor.delimitIdentifier(tableName) +// + " VALUES (?,?,?)", +// GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE + tableName); +// } +// +// @Test +// public void testTableNameWithTableColumns() throws Exception { +// DummyOptions options = new DummyOptions(); +// options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER, +// GenericJdbcTestConstants.DRIVER); +// options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING, +// GenericJdbcTestConstants.URL); +// options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_NAME, +// tableName); +// options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_COLUMNS, +// tableColumns); +// +// DummyContext context = new DummyContext(); +// +// Initializer initializer = new GenericJdbcExportInitializer(); +// initializer.run(context, options); +// +// verifyResult(context, +// "INSERT INTO " + executor.delimitIdentifier(tableName) +// + " (" + tableColumns + ") VALUES (?,?)", +// GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE + tableName); +// } +// +// @Test +// public void testTableSql() throws Exception { +// DummyOptions options = new DummyOptions(); +// options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER, +// GenericJdbcTestConstants.DRIVER); +// options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING, +// GenericJdbcTestConstants.URL); +// options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_SQL, +// tableSql); +// +// DummyContext context = new DummyContext(); +// +// Initializer initializer = new GenericJdbcExportInitializer(); +// initializer.run(context, options); +// +// verifyResult(context, +// "INSERT INTO " + executor.delimitIdentifier(tableName) +// + " VALUES (?,?,?)", +// GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE +// + GenericJdbcConnectorConstants.DEFAULT_DATADIR); +// } +// +// private void verifyResult(DummyContext context, +// String dataSql, String inputDirectory) { +// assertEquals(dataSql, context.getString( +// GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL)); +// assertEquals(inputDirectory, context.getString( +// Constants.JOB_ETL_INPUT_DIRECTORY)); +// } +// +// public class DummyOptions implements Options { +// Hashtable store = new Hashtable(); +// +// public void setOption(String key, String value) { +// store.put(key, value); +// } +// +// @Override +// public String getOption(String key) { +// return store.get(key); +// } +// } +// +// public class DummyContext implements MutableContext { +// Hashtable store = new Hashtable(); +// +// @Override +// public String getString(String key) { +// return store.get(key); +// } +// +// @Override +// public void setString(String key, String value) { +// store.put(key, value); +// } +// } + +} diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java new file mode 100644 index 00000000..649808d4 --- /dev/null +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import java.sql.ResultSet; +import java.util.HashMap; + +import junit.framework.TestCase; + +import org.apache.sqoop.job.etl.Loader; +import org.apache.sqoop.job.io.DataReader; +import org.junit.Test; + +public class TestExportLoader extends TestCase { + + private final String tableName; + + private GenericJdbcExecutor executor; + + private static final int START = -50; + private static final int NUMBER_OF_ROWS = 101; + + public TestExportLoader() { + tableName = getClass().getSimpleName(); + } + + public void testVoid() { } + +// @Override +// public void setUp() { +// executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, +// GenericJdbcTestConstants.URL, null, null); +// +// if (!executor.existTable(tableName)) { +// executor.executeUpdate("CREATE TABLE " +// + executor.delimitIdentifier(tableName) +// + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); +// } +// } +// +// @Override +// public void tearDown() { +// executor.close(); +// } +// +// @Test +// public void testInsert() throws Exception { +// DummyContext context = new DummyContext(); +// context.setString( +// GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER, +// GenericJdbcTestConstants.DRIVER); +// context.setString( +// GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL, +// GenericJdbcTestConstants.URL); +// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL, +// "INSERT INTO " + executor.delimitIdentifier(tableName) +// + " VALUES (?,?,?)"); +// +// Loader loader = new GenericJdbcExportLoader(); +// DummyReader reader = new DummyReader(); +// +// loader.run(context, reader); +// +// int index = START; +// ResultSet rs = executor.executeQuery("SELECT * FROM " +// + executor.delimitIdentifier(tableName) + " ORDER BY ICOL"); +// while (rs.next()) { +// assertEquals(Integer.valueOf(index), rs.getObject(1)); +// assertEquals(Double.valueOf(index), rs.getObject(2)); +// assertEquals(String.valueOf(index), rs.getObject(3)); +// index++; +// } +// assertEquals(NUMBER_OF_ROWS, index-START); +// } +// +// public class DummyContext implements MutableContext { +// HashMap store = new HashMap(); +// +// @Override +// public String getString(String key) { +// return store.get(key); +// } +// +// @Override +// public void setString(String key, String value) { +// store.put(key, value); +// } +// } +// +// public class DummyReader extends DataReader { +// int index = 0; +// +// @Override +// public void setFieldDelimiter(char fieldDelimiter) { +// // do nothing and use default delimiter +// } +// +// @Override +// public Object[] readArrayRecord() { +// if (index < NUMBER_OF_ROWS) { +// Object[] array = new Object[] { +// new Integer(START+index), +// new Double(START+index), +// String.valueOf(START+index) }; +// index++; +// return array; +// } else { +// return null; +// } +// } +// +// @Override +// public String readCsvRecord() { +// fail("This method should not be invoked."); +// return null; +// } +// +// @Override +// public Object readContent(int type) { +// fail("This method should not be invoked."); +// return null; +// } +// } + +} diff --git a/spi/src/main/java/org/apache/sqoop/job/Constants.java b/spi/src/main/java/org/apache/sqoop/job/Constants.java index 927950d9..90935cfb 100644 --- a/spi/src/main/java/org/apache/sqoop/job/Constants.java +++ b/spi/src/main/java/org/apache/sqoop/job/Constants.java @@ -34,6 +34,9 @@ public class Constants { public static final String JOB_ETL_OUTPUT_DIRECTORY = PREFIX_CONFIG + "etl.output.directory"; + public static final String JOB_ETL_INPUT_DIRECTORY = PREFIX_CONFIG + + "etl.input.directory"; + protected Constants() { // Disable explicit object creation } diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java index 37b9f1b5..c8dc7c3d 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java @@ -17,7 +17,7 @@ */ package org.apache.sqoop.job.etl; -import org.apache.sqoop.common.MapContext; +import org.apache.sqoop.common.ImmutableContext; /** * This allows connector to define work to complete execution, for example, @@ -25,7 +25,6 @@ */ public abstract class Destroyer { - // TODO(Jarcec): This should be called with ImmutableContext - public abstract void run(MapContext context); + public abstract void run(ImmutableContext context); } diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java index 20928154..685378fb 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java @@ -17,8 +17,8 @@ */ package org.apache.sqoop.job.etl; -import org.apache.sqoop.common.MapContext; -import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.common.ImmutableContext; +import org.apache.sqoop.common.MutableContext; import java.util.LinkedList; import java.util.List; @@ -38,7 +38,7 @@ public abstract class Initializer { * @param connectionConfiguration Connector's connection configuration object * @param jobConfiguration Connector's job configuration object */ - public abstract void initialize(MutableMapContext context, + public abstract void initialize(MutableContext context, Object connectionConfiguration, Object jobConfiguration); @@ -49,7 +49,7 @@ public abstract void initialize(MutableMapContext context, * * @return */ - public List getJars(MapContext context, + public List getJars(ImmutableContext context, Object connectionConfiguration, Object jobConfiguration) { return new LinkedList();