5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-06 04:51:51 +08:00

SQOOP-1591: Sqoop2: PostgreSQL integration tests

(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2015-02-03 20:25:04 -08:00
parent 27d87b4f2f
commit 332a7bdd80
22 changed files with 1543 additions and 97 deletions

View File

@ -22,6 +22,7 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@ -88,6 +89,20 @@ abstract public class DatabaseProvider {
*/
abstract public String escapeTableName(String tableName);
/**
* Escape schema name based on specific database requirements.
*
* @param schemaName Schema name
* @return Escaped schemaname
*/
public String escapeSchemaName(String schemaName) {
if (!isSupportingScheme()) {
throw new UnsupportedOperationException("Schema is not supported in this database");
}
return schemaName;
}
/**
* Escape string value that can be safely used in the queries.
*
@ -118,6 +133,26 @@ public String getJdbcDriver() {
return null;
}
/**
* Get full table name with qualifications
* @param schemaName
* @param tableName
* @param escape
* @return String table name
*/
public String getTableName(String schemaName, String tableName, boolean escape) {
StringBuilder sb = new StringBuilder();
if (schemaName != null) {
sb.append(escape ? escapeSchemaName(schemaName) : schemaName);
sb.append(".");
}
sb.append(escape ? escapeTableName(tableName) : tableName);
return sb.toString();
}
/**
* Start the handler.
*/
@ -216,6 +251,46 @@ public ResultSet executeQuery(String query) {
}
}
/**
* Execute given insert query in a new statement object and return
* generated IDs.
*
* @param query Query to execute
* @return Generated ID.
*/
public Long executeInsertQuery(String query, Object... args) {
LOG.info("Executing query: " + query);
ResultSet rs = null;
try {
PreparedStatement stmt = databaseConnection.prepareStatement(query, PreparedStatement.RETURN_GENERATED_KEYS);
for (int i = 0; i < args.length; ++i) {
if (args[i] instanceof String) {
stmt.setString(i + 1, (String) args[i]);
} else if (args[i] instanceof Long) {
stmt.setLong(i + 1, (Long) args[i]);
} else if (args[i] instanceof Boolean) {
stmt.setBoolean(i + 1, (Boolean) args[i]);
} else {
stmt.setObject(i + 1, args[i]);
}
}
stmt.execute();
rs = stmt.getGeneratedKeys();
if (rs.next()) {
return rs.getLong(1);
}
} catch (SQLException e) {
LOG.error("Error in executing query", e);
throw new RuntimeException("Error in executing query", e);
} finally {
closeResultSetWithStatement(rs);
}
return -1L;
}
/**
* Create new table.
*
@ -357,7 +432,7 @@ public void dropTable(String tableName) {
*/
public void dropSchema(String schemaName) {
StringBuilder sb = new StringBuilder("DROP SCHEMA ");
sb.append(escapeTableName(schemaName));
sb.append(escapeSchemaName(schemaName));
sb.append(" CASCADE");
try {
@ -370,12 +445,13 @@ public void dropSchema(String schemaName) {
/**
* Return number of rows from given table.
*
* @param schemaName Schema name
* @param tableName Table name
* @return Number of rows
*/
public long rowCount(String tableName) {
public long rowCount(String schemaName, String tableName) {
StringBuilder sb = new StringBuilder("SELECT COUNT(*) FROM ");
sb.append(escapeTableName(tableName));
sb.append(getTableName(schemaName, tableName, true));
ResultSet rs = null;
try {
@ -393,6 +469,16 @@ public long rowCount(String tableName) {
}
}
/**
* Return number of rows from a given table.
*
* @param tableName
* @return Number of rows
*/
public long rowCount(String tableName) {
return rowCount(null, tableName);
}
/**
* Close given result set (if not null) and associated statement.
*

View File

@ -127,6 +127,11 @@ public String escapeTableName(String tableName) {
return escape(tableName);
}
@Override
public String escapeSchemaName(String schemaName) {
return escape(schemaName);
}
@Override
public String escapeValueString(String value) {
return "'" + value + "'";

View File

@ -67,6 +67,11 @@ public String escapeTableName(String tableName) {
return escapeObjectName(tableName);
}
@Override
public String escapeSchemaName(String schemaName) {
return schemaName;
}
public String escapeObjectName(String name) {
return '"' + name + '"';
}

View File

@ -67,6 +67,11 @@ public String escapeTableName(String tableName) {
return escape(tableName);
}
@Override
public String escapeSchemaName(String schemaName) {
return escape(schemaName);
}
@Override
public String escapeValueString(String value) {
return "'" + value + "'";

View File

@ -67,6 +67,11 @@ public String escapeTableName(String tableName) {
return escape(tableName);
}
@Override
public String escapeSchemaName(String schemaName) {
return escape(schemaName);
}
@Override
public String escapeValueString(String value) {
return "'" + value + "'";

View File

@ -67,6 +67,11 @@ public String escapeTableName(String tableName) {
return escape(tableName);
}
@Override
public String escapeSchemaName(String schemaName) {
return escape(schemaName);
}
@Override
public String escapeValueString(String value) {
return "'" + value + "'";

View File

@ -67,6 +67,11 @@ public String escapeTableName(String tableName) {
return tableName;
}
@Override
public String escapeSchemaName(String schemaName) {
return schemaName;
}
@Override
public String escapeValueString(String value) {
return "'" + value + "'";

View File

@ -70,7 +70,69 @@ limitations under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludedGroups>postgresql</excludedGroups>
<excludes>
<exclude>**/integration/**</exclude>
</excludes>
<systemPropertyVariables>
<sqoop.integration.tmpdir>${project.build.directory}</sqoop.integration.tmpdir>
</systemPropertyVariables>
</configuration>
<executions>
<execution>
<id>integration-test</id>
<goals>
<goal>test</goal>
</goals>
<phase>integration-test</phase>
<configuration>
<excludes>
<exclude>none</exclude>
</excludes>
<includes>
<include>**/integration/**</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>postgresql</id>
<activation>
<property>
<name>postgresql</name>
</property>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<groups>postgresql</groups>
<excludedGroups>none</excludedGroups>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -23,7 +23,7 @@ public class PostgresqlRepoConstants {
* Expected version of the repository structures.
*
* History:
* 1 - Version 1.99.4
* 1 - Version 1.99.5
*/
public static final int LATEST_POSTGRESQL_REPOSITORY_VERSION = 1;

View File

@ -17,9 +17,6 @@
*/
package org.apache.sqoop.repository.postgresql;
import static org.apache.sqoop.repository.postgresql.PostgresqlSchemaQuery.*;
import static org.apache.sqoop.repository.postgresql.PostgresqlSchemaCreateQuery.*;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
@ -85,7 +82,7 @@ public int detectRepositoryVersion(Connection conn) {
PostgresqlSchemaConstants.TABLE_SQ_SYSTEM_NAME.toLowerCase(), null);
if (metadataResultSet.next()) {
stmt = conn.prepareStatement(STMT_SELECT_SYSTEM);
stmt = conn.prepareStatement(PostgresqlSchemaQuery.STMT_SELECT_SYSTEM);
stmt.setString(1, CommonRepoConstants.SYSKEY_VERSION);
rs = stmt.executeQuery();
@ -118,22 +115,22 @@ public void createOrUpgradeRepository(Connection conn) {
}
if (version == 0) {
runQuery(QUERY_CREATE_SCHEMA_SQOOP, conn);
runQuery(QUERY_CREATE_TABLE_SQ_CONFIGURABLE, conn);
runQuery(QUERY_CREATE_TABLE_SQ_CONFIG, conn);
runQuery(QUERY_CREATE_TABLE_SQ_INPUT, conn);
runQuery(QUERY_CREATE_TABLE_SQ_LINK, conn);
runQuery(QUERY_CREATE_TABLE_SQ_JOB, conn);
runQuery(QUERY_CREATE_TABLE_SQ_LINK_INPUT, conn);
runQuery(QUERY_CREATE_TABLE_SQ_JOB_INPUT, conn);
runQuery(QUERY_CREATE_TABLE_SQ_SUBMISSION, conn);
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP, conn);
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER, conn);
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION, conn);
runQuery(QUERY_CREATE_TABLE_SQ_SYSTEM, conn);
runQuery(QUERY_CREATE_TABLE_SQ_DIRECTION, conn);
runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS, conn);
runQuery(QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS, conn);
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_SCHEMA_SQOOP, conn);
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONFIGURABLE, conn);
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONFIG, conn);
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_INPUT, conn);
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_LINK, conn);
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_JOB, conn);
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_LINK_INPUT, conn);
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_JOB_INPUT, conn);
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_SUBMISSION, conn);
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_COUNTER_GROUP, conn);
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_COUNTER, conn);
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION, conn);
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_SYSTEM, conn);
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_DIRECTION, conn);
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS, conn);
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS, conn);
// Insert FROM and TO directions.
insertDirections(conn);
@ -145,13 +142,13 @@ public void createOrUpgradeRepository(Connection conn) {
ResultSet rs = null;
PreparedStatement stmt = null;
try {
stmt = conn.prepareStatement(STMT_DELETE_SYSTEM);
stmt = conn.prepareStatement(PostgresqlSchemaQuery.STMT_DELETE_SYSTEM);
stmt.setString(1, CommonRepoConstants.SYSKEY_VERSION);
stmt.executeUpdate();
closeStatements(stmt);
stmt = conn.prepareStatement(STMT_INSERT_SYSTEM);
stmt = conn.prepareStatement(PostgresqlSchemaQuery.STMT_INSERT_SYSTEM);
stmt.setString(1, CommonRepoConstants.SYSKEY_VERSION);
stmt.setString(2, Integer.toString(PostgresqlRepoConstants.LATEST_POSTGRESQL_REPOSITORY_VERSION));
stmt.executeUpdate();
@ -175,7 +172,7 @@ protected Map<Direction, Long> insertDirections(Connection conn) {
try {
// Insert directions and get IDs.
for (Direction direction : Direction.values()) {
insertDirectionStmt = conn.prepareStatement(STMT_INSERT_DIRECTION, Statement.RETURN_GENERATED_KEYS);
insertDirectionStmt = conn.prepareStatement(PostgresqlSchemaQuery.STMT_INSERT_DIRECTION, Statement.RETURN_GENERATED_KEYS);
insertDirectionStmt.setString(1, direction.toString());
if (insertDirectionStmt.executeUpdate() != 1) {
throw new SqoopException(PostgresqlRepoError.POSTGRESQLREPO_0003, "Could not add directions FROM and TO.");

View File

@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.integration.repository.postgresql;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.test.db.DatabaseProvider;
import org.apache.sqoop.common.test.db.PostgreSQLProvider;
import org.apache.sqoop.json.DriverBean;
import org.apache.sqoop.model.*;
import org.apache.sqoop.repository.postgresql.PostgresqlRepositoryHandler;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
/**
* Abstract class with convenience methods for testing postgresql repository.
*/
abstract public class PostgresqlTestCase {
public static DatabaseProvider provider;
public static PostgresqlTestUtils utils;
public PostgresqlRepositoryHandler handler;
@BeforeClass(alwaysRun = true)
public void setUpClass() {
provider = new PostgreSQLProvider();
utils = new PostgresqlTestUtils(provider);
}
@BeforeMethod(alwaysRun = true)
public void setUp() throws Exception {
provider.start();
handler = new PostgresqlRepositoryHandler();
handler.createOrUpgradeRepository(provider.getConnection());
}
@AfterMethod(alwaysRun = true)
public void tearDown() throws Exception {
provider.dropSchema("SQOOP");
provider.stop();
}
protected MConnector getConnector(String name, String className, String version, boolean from, boolean to) {
return new MConnector(name, className, version, getLinkConfig(),
from ? getFromConfig() : null, to ? getToConfig() : null);
}
protected MDriver getDriver() {
return new MDriver(getDriverConfig(), DriverBean.CURRENT_DRIVER_VERSION);
}
protected MLink getLink(String name, MConnector connector) {
MLink link = new MLink(connector.getPersistenceId(), connector.getLinkConfig());
link.setName(name);
fillLink(link);
return link;
}
protected MJob getJob(String name, MConnector connectorA, MConnector connectorB, MLink linkA, MLink linkB) {
MDriver driver = handler.findDriver(MDriver.DRIVER_NAME, provider.getConnection());
MJob job = new MJob(
connectorA.getPersistenceId(),
connectorB.getPersistenceId(),
linkA.getPersistenceId(),
linkB.getPersistenceId(),
connectorA.getFromConfig(),
connectorB.getToConfig(),
driver.getDriverConfig());
job.setName(name);
fillJob(job);
return job;
}
protected MSubmission getSubmission(MJob job, SubmissionStatus submissionStatus) {
MSubmission submission = new MSubmission(job.getPersistenceId(), new Date(), submissionStatus);
fillSubmission(submission);
return submission;
}
protected void fillLink(MLink link) {
List<MConfig> configs = link.getConnectorLinkConfig().getConfigs();
((MStringInput) configs.get(0).getInputs().get(0)).setValue("Value1");
((MStringInput) configs.get(1).getInputs().get(0)).setValue("Value2");
}
protected void fillJob(MJob job) {
List<MConfig> configs = job.getJobConfig(Direction.FROM).getConfigs();
((MStringInput) configs.get(0).getInputs().get(0)).setValue("Value1");
((MStringInput) configs.get(1).getInputs().get(0)).setValue("Value2");
configs = job.getJobConfig(Direction.TO).getConfigs();
((MStringInput) configs.get(0).getInputs().get(0)).setValue("Value1");
((MStringInput) configs.get(1).getInputs().get(0)).setValue("Value2");
configs = job.getDriverConfig().getConfigs();
((MStringInput) configs.get(0).getInputs().get(0)).setValue("Value1");
((MStringInput) configs.get(1).getInputs().get(0)).setValue("Value2");
}
protected void fillSubmission(MSubmission submission) {
Counters counters = new Counters();
counters.addCounterGroup(new CounterGroup("test-1"));
counters.addCounterGroup(new CounterGroup("test-2"));
submission.setCounters(counters);
}
protected MLinkConfig getLinkConfig() {
return new MLinkConfig(getConfigs("l1", "l2"));
}
protected MFromConfig getFromConfig() {
return new MFromConfig(getConfigs("from1", "from2"));
}
protected MToConfig getToConfig() {
return new MToConfig(getConfigs("to1", "to2"));
}
protected MDriverConfig getDriverConfig() {
return new MDriverConfig(getConfigs("d1", "d2"));
}
protected List<MConfig> getConfigs(String configName1, String configName2) {
List<MConfig> configs = new LinkedList<MConfig>();
List<MInput<?>> inputs = new LinkedList<MInput<?>>();
MInput input = new MStringInput("I1", false, (short) 30);
inputs.add(input);
input = new MMapInput("I2", false);
inputs.add(input);
configs.add(new MConfig(configName1, inputs));
inputs = new LinkedList<MInput<?>>();
input = new MStringInput("I3", false, (short) 30);
inputs.add(input);
input = new MMapInput("I4", false);
inputs.add(input);
configs.add(new MConfig(configName2, inputs));
return configs;
}
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.repository.postgresql;
package org.apache.sqoop.integration.repository.postgresql;
import org.apache.sqoop.common.test.db.DatabaseProvider;
import org.apache.sqoop.repository.common.CommonRepoUtils;

View File

@ -0,0 +1,152 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.integration.repository.postgresql;
import org.apache.sqoop.model.MConnector;
import org.testng.annotations.Test;
import java.util.List;
import static org.testng.Assert.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertNull;
/**
* Test connector methods on Derby repository.
*/
@Test(groups = "postgresql")
public class TestConnectorHandling extends PostgresqlTestCase {
@Test
public void testFindConnector() throws Exception {
// On empty repository, no connectors should be there
assertNull(handler.findConnector("A", provider.getConnection()));
// Register a single connector
handler.registerConnector(
getConnector("A", "org.apache.sqoop.test.A", "1.0-test", true, true),
provider.getConnection());
// Retrieve it and compare with original
MConnector connector = handler.findConnector("A", provider.getConnection());
assertNotNull(connector);
assertEquals(
getConnector("A", "org.apache.sqoop.test.A", "1.0-test", true, true),
connector);
}
@Test
public void testFindAllConnectors() throws Exception {
// No connectors in an empty repository, we expect an empty list
assertEquals(handler.findConnectors(provider.getConnection()).size(), 0);
// Register connectors
handler.registerConnector(
getConnector("A", "org.apache.sqoop.test.A", "1.0-test", true, true),
provider.getConnection());
handler.registerConnector(
getConnector("B", "org.apache.sqoop.test.B", "1.0-test", true, true),
provider.getConnection());
// loadConfigurables();
// Retrieve connectors
List<MConnector> connectors = handler.findConnectors(provider.getConnection());
assertNotNull(connectors);
assertEquals(connectors.size(), 2);
assertEquals(connectors.get(0).getUniqueName(), "A");
assertEquals(connectors.get(1).getUniqueName(), "B");
}
@Test
public void testRegisterConnector() throws Exception {
MConnector connector = getConnector("A", "org.apache.sqoop.test.A", "1.0-test", true, true);
handler.registerConnector(connector, provider.getConnection());
// Connector should get persistence ID
assertEquals(1, connector.getPersistenceId());
// Now check content in corresponding tables
assertEquals(provider.rowCount("SQOOP", "SQ_CONFIGURABLE"), 1);
assertEquals(provider.rowCount("SQOOP", "SQ_CONFIG"), 6);
assertEquals(provider.rowCount("SQOOP", "SQ_INPUT"), 12);
// Registered connector should be easily recovered back
MConnector retrieved = handler.findConnector("A", provider.getConnection());
assertNotNull(retrieved);
assertEquals(connector, retrieved);
}
@Test
public void testFromDirection() throws Exception {
MConnector connector = getConnector("A", "org.apache.sqoop.test.A", "1.0-test", true, false);
handler.registerConnector(connector, provider.getConnection());
// Connector should get persistence ID
assertEquals(1, connector.getPersistenceId());
// Now check content in corresponding tables
assertEquals(provider.rowCount("SQOOP", "SQ_CONFIGURABLE"), 1);
assertEquals(provider.rowCount("SQOOP", "SQ_CONFIG"), 4);
assertEquals(provider.rowCount("SQOOP", "SQ_INPUT"), 8);
// Registered connector should be easily recovered back
MConnector retrieved = handler.findConnector("A", provider.getConnection());
assertNotNull(retrieved);
assertEquals(connector, retrieved);
}
@Test
public void testToDirection() throws Exception {
MConnector connector = getConnector("A", "org.apache.sqoop.test.A", "1.0-test", false, true);
handler.registerConnector(connector, provider.getConnection());
// Connector should get persistence ID
assertEquals(1, connector.getPersistenceId());
// Now check content in corresponding tables
assertEquals(provider.rowCount("SQOOP", "SQ_CONFIGURABLE"), 1);
assertEquals(provider.rowCount("SQOOP", "SQ_CONFIG"), 4);
assertEquals(provider.rowCount("SQOOP", "SQ_INPUT"), 8);
// Registered connector should be easily recovered back
MConnector retrieved = handler.findConnector("A", provider.getConnection());
assertNotNull(retrieved);
assertEquals(connector, retrieved);
}
@Test
public void testNeitherDirection() throws Exception {
MConnector connector = getConnector("A", "org.apache.sqoop.test.A", "1.0-test", false, false);
handler.registerConnector(connector, provider.getConnection());
// Connector should get persistence ID
assertEquals(1, connector.getPersistenceId());
// Now check content in corresponding tables
assertEquals(provider.rowCount("SQOOP", "SQ_CONFIGURABLE"), 1);
assertEquals(provider.rowCount("SQOOP", "SQ_CONFIG"), 2);
assertEquals(provider.rowCount("SQOOP", "SQ_INPUT"), 4);
// Registered connector should be easily recovered back
MConnector retrieved = handler.findConnector("A", provider.getConnection());
assertNotNull(retrieved);
assertEquals(connector, retrieved);
}
}

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.integration.repository.postgresql;
import org.apache.sqoop.model.MDriver;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.AssertJUnit.assertNotNull;
/**
* Test driver methods on Derby repository.
*/
@Test(groups = "postgresql")
public class TestDriverHandling extends PostgresqlTestCase {
private static final Object CURRENT_DRIVER_VERSION = "1";
@Test
public void testFindDriver() throws Exception {
// On empty repository, no driverConfig should be there
assertNull(handler.findDriver(MDriver.DRIVER_NAME, provider.getConnection()));
// Register driver
handler.registerDriver(getDriver(), provider.getConnection());
// Retrieve it
MDriver driver = handler.findDriver(MDriver.DRIVER_NAME, provider.getConnection());
assertNotNull(driver);
assertNotNull(driver.getDriverConfig());
assertEquals("1", driver.getVersion());
assertEquals("1", driver.getVersion());
// Compare with original
assertEquals(getDriver().getDriverConfig(), driver.getDriverConfig());
}
@Test
public void testRegisterDriver() throws Exception {
MDriver driver = getDriver();
handler.registerDriver(driver, provider.getConnection());
// Connector should get persistence ID
assertEquals(1, driver.getPersistenceId());
// Now check content in corresponding tables
assertEquals(provider.rowCount("SQOOP", "SQ_CONFIGURABLE"), 1);
assertEquals(provider.rowCount("SQOOP", "SQ_CONFIG"), 2);
assertEquals(provider.rowCount("SQOOP", "SQ_INPUT"), 4);
// Registered driver and config should be easily recovered back
MDriver retrieved = handler.findDriver(MDriver.DRIVER_NAME, provider.getConnection());
assertNotNull(retrieved);
assertEquals(driver, retrieved);
assertEquals(driver.getVersion(), retrieved.getVersion());
}
@Test
public void testDriverVersionUpgrade() throws Exception {
MDriver driver = getDriver();
handler.registerDriver(driver, provider.getConnection());
String registeredDriverVersion = handler.findDriver(MDriver.DRIVER_NAME, provider.getConnection()).getVersion();
assertEquals(CURRENT_DRIVER_VERSION, registeredDriverVersion);
driver.setVersion("2");
handler.upgradeDriverAndConfigs(driver, provider.getConnection());
assertEquals("2", driver.getVersion());
}
}

View File

@ -0,0 +1,289 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.integration.repository.postgresql;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.model.MConfig;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MMapInput;
import org.apache.sqoop.model.MStringInput;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.submission.SubmissionStatus;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static junit.framework.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;
/**
* Test job methods on Derby repository.
*/
@Test(groups = "postgresql")
public class TestJobHandling extends PostgresqlTestCase {
public static final String CONNECTOR_A_NAME = "A";
public static final String CONNECTOR_A_CLASSNAME = "org.apache.sqoop.test.A";
public static final String CONNECTOR_A_VERSION = "1.0-test";
public static final String CONNECTOR_B_NAME = "B";
public static final String CONNECTOR_B_CLASSNAME = "org.apache.sqoop.test.B";
public static final String CONNECTOR_B_VERSION = "1.0-test";
public static final String LINK_A_NAME = "Link-A";
public static final String LINK_B_NAME = "Link-B";
public static final String JOB_A_NAME = "Job-A";
public static final String JOB_B_NAME = "Job-B";
@BeforeMethod(alwaysRun = true)
public void setUp() throws Exception {
super.setUp();
handler.registerDriver(getDriver(), provider.getConnection());
MConnector connectorA = getConnector(CONNECTOR_A_NAME, CONNECTOR_A_CLASSNAME, CONNECTOR_A_VERSION, true, true);
MConnector connectorB = getConnector(CONNECTOR_B_NAME, CONNECTOR_B_CLASSNAME, CONNECTOR_B_VERSION, true, true);
handler.registerConnector(connectorA, provider.getConnection());
handler.registerConnector(connectorB, provider.getConnection());
MLink linkA = getLink(LINK_A_NAME, connectorA);
MLink linkB = getLink(LINK_B_NAME, connectorB);
handler.createLink(linkA, provider.getConnection());
handler.createLink(linkB, provider.getConnection());
handler.createJob(getJob(JOB_A_NAME, connectorA, connectorB, linkA, linkB), provider.getConnection());
handler.createJob(getJob(JOB_B_NAME, connectorB, connectorA, linkB, linkA), provider.getConnection());
}
@Test(expectedExceptions = SqoopException.class)
public void testFindJobFail() throws Exception {
for (MJob job : handler.findJobs(provider.getConnection())) {
handler.deleteJob(job.getPersistenceId(), provider.getConnection());
}
// Let's try to find non existing job
handler.findJob(1, provider.getConnection());
}
@Test
public void testFindJobSuccess() throws Exception {
MJob firstJob = handler.findJob(1, provider.getConnection());
assertNotNull(firstJob);
assertEquals(1, firstJob.getPersistenceId());
assertEquals(JOB_A_NAME, firstJob.getName());
List<MConfig> configs;
configs = firstJob.getJobConfig(Direction.FROM).getConfigs();
assertEquals(2, configs.size());
assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
assertNull(configs.get(0).getInputs().get(1).getValue());
assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
assertNull(configs.get(1).getInputs().get(1).getValue());
configs = firstJob.getJobConfig(Direction.TO).getConfigs();
assertEquals(2, configs.size());
assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
assertNull(configs.get(0).getInputs().get(1).getValue());
assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
assertNull(configs.get(1).getInputs().get(1).getValue());
configs = firstJob.getDriverConfig().getConfigs();
assertEquals(2, configs.size());
assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
assertNull(configs.get(0).getInputs().get(1).getValue());
assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
assertNull(configs.get(1).getInputs().get(1).getValue());
}
@Test
public void testFindJobs() throws Exception {
List<MJob> list;
list = handler.findJobs(provider.getConnection());
assertEquals(2, list.size());
assertEquals(JOB_A_NAME, list.get(0).getName());
assertEquals(JOB_B_NAME, list.get(1).getName());
// Delete jobs
for (MJob job : handler.findJobs(provider.getConnection())) {
handler.deleteJob(job.getPersistenceId(), provider.getConnection());
}
// Load all two links on loaded repository
list = handler.findJobs(provider.getConnection());
assertEquals(0, list.size());
}
@Test
public void testFindJobsByConnector() throws Exception {
List<MJob> list = handler.findJobsForConnector(
handler.findConnector("A", provider.getConnection()).getPersistenceId(),
provider.getConnection());
assertEquals(2, list.size());
assertEquals(JOB_A_NAME, list.get(0).getName());
assertEquals(JOB_B_NAME, list.get(1).getName());
}
@Test
public void testFindJobsForNonExistingConnector() throws Exception {
List<MJob> list = handler.findJobsForConnector(11, provider.getConnection());
assertEquals(0, list.size());
}
@Test
public void testExistsJob() throws Exception {
assertTrue(handler.existsJob(1, provider.getConnection()));
assertTrue(handler.existsJob(2, provider.getConnection()));
assertFalse(handler.existsJob(3, provider.getConnection()));
// Delete jobs
for (MJob job : handler.findJobs(provider.getConnection())) {
handler.deleteJob(job.getPersistenceId(), provider.getConnection());
}
// There shouldn't be anything on empty repository
assertFalse(handler.existsJob(1, provider.getConnection()));
assertFalse(handler.existsJob(2, provider.getConnection()));
assertFalse(handler.existsJob(3, provider.getConnection()));
}
@Test
public void testInUseJob() throws Exception {
MSubmission submission = getSubmission(handler.findJob(1, provider.getConnection()), SubmissionStatus.RUNNING);
handler.createSubmission(submission, provider.getConnection());
assertTrue(handler.inUseJob(1, provider.getConnection()));
assertFalse(handler.inUseJob(2, provider.getConnection()));
assertFalse(handler.inUseJob(3, provider.getConnection()));
}
@Test
public void testCreateJob() throws Exception {
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_JOB"), 2);
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_JOB_INPUT"), 12);
MJob retrieved = handler.findJob(1, provider.getConnection());
assertEquals(1, retrieved.getPersistenceId());
List<MConfig> configs;
configs = retrieved.getJobConfig(Direction.FROM).getConfigs();
assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
assertNull(configs.get(0).getInputs().get(1).getValue());
configs = retrieved.getJobConfig(Direction.TO).getConfigs();
assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
assertNull(configs.get(0).getInputs().get(1).getValue());
configs = retrieved.getDriverConfig().getConfigs();
assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
assertNull(configs.get(0).getInputs().get(1).getValue());
assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
assertNull(configs.get(1).getInputs().get(1).getValue());
}
@Test(expectedExceptions = SqoopException.class)
public void testCreateDuplicateJob() throws Exception {
// Duplicate jobs
MJob job = handler.findJob(JOB_A_NAME, provider.getConnection());
job.setPersistenceId(MJob.PERSISTANCE_ID_DEFAULT);
handler.createJob(job, provider.getConnection());
}
@Test
public void testUpdateJob() throws Exception {
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_JOB"), 2);
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_JOB_INPUT"), 12);
MJob job = handler.findJob(1, provider.getConnection());
List<MConfig> configs;
configs = job.getJobConfig(Direction.FROM).getConfigs();
((MStringInput)configs.get(0).getInputs().get(0)).setValue("Updated");
((MMapInput)configs.get(0).getInputs().get(1)).setValue(null);
configs = job.getJobConfig(Direction.TO).getConfigs();
((MStringInput)configs.get(0).getInputs().get(0)).setValue("Updated");
((MMapInput)configs.get(0).getInputs().get(1)).setValue(null);
configs = job.getDriverConfig().getConfigs();
((MStringInput)configs.get(0).getInputs().get(0)).setValue("Updated");
((MMapInput)configs.get(0).getInputs().get(1)).setValue(new HashMap<String, String>()); // inject new map value
((MStringInput)configs.get(1).getInputs().get(0)).setValue("Updated");
((MMapInput)configs.get(1).getInputs().get(1)).setValue(new HashMap<String, String>()); // inject new map value
job.setName("name");
handler.updateJob(job, provider.getConnection());
assertEquals(1, job.getPersistenceId());
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_JOB"), 2);
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_JOB_INPUT"), 14);
MJob retrieved = handler.findJob(1, provider.getConnection());
assertEquals("name", retrieved.getName());
configs = job.getJobConfig(Direction.FROM).getConfigs();
assertEquals(2, configs.size());
assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
assertNull(configs.get(0).getInputs().get(1).getValue());
configs = job.getJobConfig(Direction.TO).getConfigs();
assertEquals(2, configs.size());
assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
assertNull(configs.get(0).getInputs().get(1).getValue());
configs = retrieved.getDriverConfig().getConfigs();
assertEquals(2, configs.size());
assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
assertNotNull(configs.get(0).getInputs().get(1).getValue());
assertEquals(((Map)configs.get(0).getInputs().get(1).getValue()).size(), 0);
}
@Test
public void testEnableAndDisableJob() throws Exception {
// disable job 1
handler.enableJob(1, false, provider.getConnection());
MJob retrieved = handler.findJob(1, provider.getConnection());
assertNotNull(retrieved);
assertEquals(false, retrieved.getEnabled());
// enable job 1
handler.enableJob(1, true, provider.getConnection());
retrieved = handler.findJob(1, provider.getConnection());
assertNotNull(retrieved);
assertEquals(true, retrieved.getEnabled());
}
@Test
public void testDeleteJob() throws Exception {
handler.deleteJob(1, provider.getConnection());
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_JOB"), 1);
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_JOB_INPUT"), 6);
handler.deleteJob(2, provider.getConnection());
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_JOB"), 0);
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_JOB_INPUT"), 0);
}
}

View File

@ -0,0 +1,284 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.integration.repository.postgresql;
import java.util.List;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.model.MConfig;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MMapInput;
import org.apache.sqoop.model.MStringInput;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.submission.SubmissionStatus;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static junit.framework.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;
/**
* Test link methods on Derby repository.
*/
@Test(groups = "postgresql")
public class TestLinkHandling extends PostgresqlTestCase {
public static final String CONNECTOR_A_NAME = "A";
public static final String CONNECTOR_A_CLASSNAME = "org.apache.sqoop.test.A";
public static final String CONNECTOR_A_VERSION = "1.0-test";
public static final String CONNECTOR_B_NAME = "B";
public static final String CONNECTOR_B_CLASSNAME = "org.apache.sqoop.test.B";
public static final String CONNECTOR_B_VERSION = "1.0-test";
public static final String LINK_A_NAME = "Link-A";
public static final String LINK_B_NAME = "Link-B";
@BeforeMethod(alwaysRun = true)
public void setUp() throws Exception {
super.setUp();
handler.registerDriver(getDriver(), provider.getConnection());
MConnector connectorA = getConnector(CONNECTOR_A_NAME, CONNECTOR_A_CLASSNAME, CONNECTOR_A_VERSION, true, true);
MConnector connectorB = getConnector(CONNECTOR_B_NAME, CONNECTOR_B_CLASSNAME, CONNECTOR_B_VERSION, true, true);
handler.registerConnector(connectorA, provider.getConnection());
handler.registerConnector(connectorB, provider.getConnection());
MLink linkA = getLink(LINK_A_NAME, connectorA);
MLink linkB = getLink(LINK_B_NAME, connectorB);
handler.createLink(linkA, provider.getConnection());
handler.createLink(linkB, provider.getConnection());
}
@Test(expectedExceptions = SqoopException.class)
public void testFindLinkFail() {
// Delete links
for (MLink link : handler.findLinks(provider.getConnection())) {
handler.deleteLink(link.getPersistenceId(), provider.getConnection());
}
handler.findLink(1, provider.getConnection());
}
@Test
public void testFindLinkSuccess() throws Exception {
MLink linkA = handler.findLink(1, provider.getConnection());
assertNotNull(linkA);
assertEquals(1, linkA.getPersistenceId());
assertEquals(LINK_A_NAME, linkA.getName());
// Check connector link config
List<MConfig> configs = linkA.getConnectorLinkConfig().getConfigs();
assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
assertNull(configs.get(0).getInputs().get(1).getValue());
assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
assertNull(configs.get(1).getInputs().get(1).getValue());
}
@Test
public void testFindLinkByName() throws Exception {
// Load non-existing
assertNull(handler.findLink("non-existing", provider.getConnection()));
MLink linkA = handler.findLink(LINK_A_NAME, provider.getConnection());
assertNotNull(linkA);
assertEquals(1, linkA.getPersistenceId());
assertEquals(LINK_A_NAME, linkA.getName());
// Check connector link config
List<MConfig> configs = linkA.getConnectorLinkConfig().getConfigs();
assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
assertNull(configs.get(0).getInputs().get(1).getValue());
assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
assertNull(configs.get(1).getInputs().get(1).getValue());
}
@Test
public void testFindLinks() throws Exception {
List<MLink> list;
// Load all two links on loaded repository
list = handler.findLinks(provider.getConnection());
assertEquals(2, list.size());
assertEquals(LINK_A_NAME, list.get(0).getName());
assertEquals(LINK_B_NAME, list.get(1).getName());
// Delete links
for (MLink link : handler.findLinks(provider.getConnection())) {
handler.deleteLink(link.getPersistenceId(), provider.getConnection());
}
// Load empty list on empty repository
list = handler.findLinks(provider.getConnection());
assertEquals(0, list.size());
}
@Test
public void testFindLinksByConnector() throws Exception {
List<MLink> list;
Long connectorId = handler.findConnector("A", provider.getConnection()).getPersistenceId();
// Load all two links on loaded repository
list = handler.findLinksForConnector(connectorId, provider.getConnection());
assertEquals(1, list.size());
assertEquals(LINK_A_NAME, list.get(0).getName());
// Delete links
for (MLink link : handler.findLinks(provider.getConnection())) {
handler.deleteLink(link.getPersistenceId(), provider.getConnection());
}
// Load empty list on empty repository
list = handler.findLinksForConnector(connectorId, provider.getConnection());
assertEquals(0, list.size());
}
@Test
public void testFindLinksByNonExistingConnector() throws Exception {
List<MLink> list = handler.findLinksForConnector(11, provider.getConnection());
assertEquals(0, list.size());
}
@Test
public void testExistsLink() throws Exception {
assertTrue(handler.existsLink(1, provider.getConnection()));
assertTrue(handler.existsLink(2, provider.getConnection()));
assertFalse(handler.existsLink(3, provider.getConnection()));
// Delete links
for (MLink link : handler.findLinks(provider.getConnection())) {
handler.deleteLink(link.getPersistenceId(), provider.getConnection());
}
assertFalse(handler.existsLink(1, provider.getConnection()));
assertFalse(handler.existsLink(2, provider.getConnection()));
assertFalse(handler.existsLink(3, provider.getConnection()));
}
@Test
public void testCreateLink() throws Exception {
List<MConfig> configs;
MLink retrieved = handler.findLink(1, provider.getConnection());
assertEquals(1, retrieved.getPersistenceId());
configs = retrieved.getConnectorLinkConfig().getConfigs();
assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
assertNull(configs.get(0).getInputs().get(1).getValue());
assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
assertNull(configs.get(1).getInputs().get(1).getValue());
retrieved = handler.findLink(2, provider.getConnection());
assertEquals(2, retrieved.getPersistenceId());
configs = retrieved.getConnectorLinkConfig().getConfigs();
assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
assertNull(configs.get(0).getInputs().get(1).getValue());
assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
assertNull(configs.get(1).getInputs().get(1).getValue());
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_LINK"), 2);
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_LINK_INPUT"), 4);
}
@Test(expectedExceptions = SqoopException.class)
public void testCreateDuplicateLink() throws Exception {
MLink link = handler.findLink(LINK_A_NAME, provider.getConnection());
link.setPersistenceId(MLink.PERSISTANCE_ID_DEFAULT);
handler.createLink(link, provider.getConnection());
}
@Test
public void testInUseLink() throws Exception {
assertFalse(handler.inUseLink(1, provider.getConnection()));
// Create job and submission and make that job in use to make sure link is in use.
MLink linkA = handler.findLink(LINK_A_NAME, provider.getConnection());
MJob job = getJob("Job-A",
handler.findConnector("A", provider.getConnection()),
handler.findConnector("B", provider.getConnection()),
linkA,
handler.findLink(LINK_B_NAME, provider.getConnection()));
handler.createJob(job, provider.getConnection());
MSubmission submission = getSubmission(job, SubmissionStatus.RUNNING);
handler.createSubmission(submission, provider.getConnection());
assertTrue(handler.inUseLink(linkA.getPersistenceId(), provider.getConnection()));
}
@Test
public void testUpdateLink() throws Exception {
MLink link = handler.findLink(1, provider.getConnection());
List<MConfig> configs;
configs = link.getConnectorLinkConfig().getConfigs();
((MStringInput) configs.get(0).getInputs().get(0)).setValue("Updated");
((MMapInput) configs.get(0).getInputs().get(1)).setValue(null);
((MStringInput) configs.get(1).getInputs().get(0)).setValue("Updated");
((MMapInput) configs.get(1).getInputs().get(1)).setValue(null);
link.setName("name");
handler.updateLink(link, provider.getConnection());
assertEquals(1, link.getPersistenceId());
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_LINK"), 2);
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_LINK_INPUT"), 4);
MLink retrieved = handler.findLink(1, provider.getConnection());
assertEquals("name", link.getName());
configs = retrieved.getConnectorLinkConfig().getConfigs();
assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
assertNull(configs.get(0).getInputs().get(1).getValue());
assertEquals("Updated", configs.get(1).getInputs().get(0).getValue());
assertNull(configs.get(1).getInputs().get(1).getValue());
}
@Test
public void testEnableAndDisableLink() throws Exception {
// disable link 1
handler.enableLink(1, false, provider.getConnection());
MLink retrieved = handler.findLink(1, provider.getConnection());
assertNotNull(retrieved);
assertEquals(false, retrieved.getEnabled());
// enable link 1
handler.enableLink(1, true, provider.getConnection());
retrieved = handler.findLink(1, provider.getConnection());
assertNotNull(retrieved);
assertEquals(true, retrieved.getEnabled());
}
@Test
public void testDeleteLink() throws Exception {
handler.deleteLink(1, provider.getConnection());
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_LINK"), 1);
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_LINK_INPUT"), 2);
handler.deleteLink(2, provider.getConnection());
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_LINK"), 0);
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_LINK_INPUT"), 0);
}
}

View File

@ -15,13 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.repository.postgresql;
package org.apache.sqoop.integration.repository.postgresql;
import org.testng.annotations.Test;
/**
* Test connector methods on PostgreSQL repository.
*/
@Test(groups = "postgresql")
public class TestStructure extends PostgresqlTestCase {
@Test

View File

@ -0,0 +1,353 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.integration.repository.postgresql;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import static junit.framework.Assert.assertEquals;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;
/**
*
*/
@Test(groups = "postgresql")
public class TestSubmissionHandling extends PostgresqlTestCase {
public static final String CONNECTOR_A_NAME = "A";
public static final String CONNECTOR_A_CLASSNAME = "org.apache.sqoop.test.A";
public static final String CONNECTOR_A_VERSION = "1.0-test";
public static final String CONNECTOR_B_NAME = "B";
public static final String CONNECTOR_B_CLASSNAME = "org.apache.sqoop.test.B";
public static final String CONNECTOR_B_VERSION = "1.0-test";
public static final String LINK_A_NAME = "Link-A";
public static final String LINK_B_NAME = "Link-B";
public static final String JOB_A_NAME = "Job-A";
public static final String JOB_B_NAME = "Job-B";
@BeforeMethod(alwaysRun = true)
public void setUp() throws Exception {
super.setUp();
handler.registerDriver(getDriver(), provider.getConnection());
MConnector connectorA = getConnector(CONNECTOR_A_NAME, CONNECTOR_A_CLASSNAME, CONNECTOR_A_VERSION, true, true);
MConnector connectorB = getConnector(CONNECTOR_B_NAME, CONNECTOR_B_CLASSNAME, CONNECTOR_B_VERSION, true, true);
handler.registerConnector(connectorA, provider.getConnection());
handler.registerConnector(connectorB, provider.getConnection());
MLink linkA = getLink(LINK_A_NAME, connectorA);
MLink linkB = getLink(LINK_B_NAME, connectorB);
handler.createLink(linkA, provider.getConnection());
handler.createLink(linkB, provider.getConnection());
MJob jobA = getJob(JOB_A_NAME, connectorA, connectorB, linkA, linkB);
MJob jobB = getJob(JOB_B_NAME, connectorB, connectorA, linkB, linkA);
handler.createJob(jobA, provider.getConnection());
handler.createJob(jobB, provider.getConnection());
}
private void loadSubmissions() throws Exception {
MJob jobA = handler.findJob(JOB_A_NAME, provider.getConnection());
MJob jobB = handler.findJob(JOB_B_NAME, provider.getConnection());
MSubmission submissionA = getSubmission(jobA, SubmissionStatus.RUNNING);
submissionA.getCounters().getCounterGroup("test-1").addCounter(new Counter("counter-1"));
submissionA.getCounters().getCounterGroup("test-1").addCounter(new Counter("counter-2"));
submissionA.getCounters().getCounterGroup("test-1").getCounter("counter-1").setValue(300);
MSubmission submissionB = getSubmission(jobA, SubmissionStatus.SUCCEEDED);
MSubmission submissionC = getSubmission(jobB, SubmissionStatus.FAILED);
MSubmission submissionD = getSubmission(jobB, SubmissionStatus.UNKNOWN);
handler.createSubmission(submissionA, provider.getConnection());
handler.createSubmission(submissionB, provider.getConnection());
handler.createSubmission(submissionC, provider.getConnection());
handler.createSubmission(submissionD, provider.getConnection());
}
@Test
public void testFindSubmissionsUnfinished() throws Exception {
List<MSubmission> submissions;
submissions = handler.findUnfinishedSubmissions(provider.getConnection());
assertNotNull(submissions);
assertEquals(0, submissions.size());
loadSubmissions();
submissions = handler.findUnfinishedSubmissions(provider.getConnection());
assertNotNull(submissions);
assertEquals(1, submissions.size());
}
@Test
public void testExistsSubmission() throws Exception {
// There shouldn't be anything on empty repository
assertFalse(handler.existsSubmission(1, provider.getConnection()));
assertFalse(handler.existsSubmission(2, provider.getConnection()));
assertFalse(handler.existsSubmission(3, provider.getConnection()));
assertFalse(handler.existsSubmission(4, provider.getConnection()));
assertFalse(handler.existsSubmission(5, provider.getConnection()));
loadSubmissions();
assertTrue(handler.existsSubmission(1, provider.getConnection()));
assertTrue(handler.existsSubmission(2, provider.getConnection()));
assertTrue(handler.existsSubmission(3, provider.getConnection()));
assertTrue(handler.existsSubmission(4, provider.getConnection()));
assertFalse(handler.existsSubmission(5, provider.getConnection()));
}
@Test
public void testCreateSubmission() throws Exception {
Date creationDate = new Date();
Date updateDate = new Date();
CounterGroup firstGroup = new CounterGroup("ga");
CounterGroup secondGroup = new CounterGroup("gb");
firstGroup.addCounter(new Counter("ca", 100));
firstGroup.addCounter(new Counter("cb", 200));
secondGroup.addCounter(new Counter("ca", 300));
secondGroup.addCounter(new Counter("cd", 400));
Counters counters = new Counters();
counters.addCounterGroup(firstGroup);
counters.addCounterGroup(secondGroup);
MSubmission submission = new MSubmission();
submission.setJobId(1);
submission.setStatus(SubmissionStatus.RUNNING);
submission.setCreationDate(creationDate);
submission.setLastUpdateDate(updateDate);
submission.setExternalJobId("job-x");
submission.setExternalLink("http://somewhere");
submission.getError().setErrorSummary("RuntimeException");
submission.getError().setErrorDetails("Yeah it happens");
submission.setCounters(counters);
handler.createSubmission(submission, provider.getConnection());
assertEquals(1, submission.getPersistenceId());
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_SUBMISSION"), 1);
List<MSubmission> submissions = handler.findUnfinishedSubmissions(provider.getConnection());
assertNotNull(submissions);
assertEquals(1, submissions.size());
submission = submissions.get(0);
assertEquals(1, submission.getJobId());
assertEquals(SubmissionStatus.RUNNING, submission.getStatus());
assertEquals(creationDate, submission.getCreationDate());
assertEquals(updateDate, submission.getLastUpdateDate());
assertEquals("job-x", submission.getExternalJobId());
assertEquals("http://somewhere", submission.getExternalLink());
assertEquals("RuntimeException", submission.getError().getErrorSummary());
assertEquals("Yeah it happens", submission.getError().getErrorDetails());
CounterGroup group;
Counter counter;
Counters retrievedCounters = submission.getCounters();
assertNotNull(retrievedCounters);
group = counters.getCounterGroup("ga");
assertNotNull(group);
counter = group.getCounter("ca");
assertNotNull(counter);
assertEquals(100, counter.getValue());
counter = group.getCounter("cb");
assertNotNull(counter);
assertEquals(200, counter.getValue());
group = counters.getCounterGroup("gb");
assertNotNull(group);
counter = group.getCounter("ca");
assertNotNull(counter);
assertEquals(300, counter.getValue());
counter = group.getCounter("cd");
assertNotNull(counter);
assertEquals(400, counter.getValue());
// Let's create second (simpler) connection
submission = new MSubmission(1, new Date(), SubmissionStatus.SUCCEEDED, "job-x");
handler.createSubmission(submission, provider.getConnection());
assertEquals(2, submission.getPersistenceId());
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_SUBMISSION"), 2);
}
@Test
public void testUpdateSubmission() throws Exception {
loadSubmissions();
List<MSubmission> submissions = handler.findUnfinishedSubmissions(provider.getConnection());
assertNotNull(submissions);
assertEquals(1, submissions.size());
MSubmission submission = submissions.get(0);
submission.setStatus(SubmissionStatus.SUCCEEDED);
handler.updateSubmission(submission, provider.getConnection());
submissions = handler.findUnfinishedSubmissions(provider.getConnection());
assertNotNull(submissions);
assertEquals(0, submissions.size());
}
@Test
public void testCreateSubmissionExceptionDetailsMoreThanMaxLimit() throws Exception {
String externalLink = "http://somewheresomewheresomewheresomewheresomewheresomewheresomewheresomewheresomewheresomewheresomewheresom"
+ "ewheresomewheresomewheresomewheresomewher";
String errorSummary = "RuntimeExceptionRuntimeExceptionRuntimeExceptionRuntimeExceptionRuntimeExceptionRuntimeExceptions"
+ "RuntimeExceptionRuntimeExceptionRuntimeExceptiontests";
String errorDetail = "Yeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah"
+ " it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it hap"
+ "pensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYea"
+ "h it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it ha"
+ "ppensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah"
+ " it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happe"
+ "nsYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happens";
MSubmission submission = new MSubmission();
submission.setJobId(1);
submission.setStatus(SubmissionStatus.RUNNING);
submission.setCreationDate(new Date());
submission.setLastUpdateDate(new Date());
submission.setExternalJobId("job-x");
submission.setExternalLink(externalLink + "more than 150");
submission.getError().setErrorSummary("RuntimeException");
submission.getError().setErrorDetails(errorDetail + "morethan750");
submission.getError().setErrorSummary(errorSummary + "morethan150");
handler.createSubmission(submission, provider.getConnection());
List<MSubmission> submissions = handler.findSubmissionsForJob(1, provider.getConnection());
assertNotNull(submissions);
assertEquals(errorDetail, submissions.get(0).getError().getErrorDetails());
assertEquals(errorSummary, submissions.get(0).getError().getErrorSummary());
assertEquals(externalLink, submissions.get(0).getExternalLink());
}
@Test
public void testUpdateSubmissionExceptionDetailsMoreThanMaxLimit() throws Exception {
loadSubmissions();
List<MSubmission> submissions = handler.findUnfinishedSubmissions(provider.getConnection());
assertNotNull(submissions);
assertEquals(1, submissions.size());
String errorSummary = "RuntimeExceptionRuntimeExceptionRuntimeExceptionRuntimeExceptionRuntimeExceptionRuntimeExceptions"
+ "RuntimeExceptionRuntimeExceptionRuntimeExceptiontests";
String errorDetail = "Yeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah"
+ " it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it hap"
+ "pensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYea"
+ "h it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it ha"
+ "ppensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah"
+ " it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happe"
+ "nsYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happens";
MSubmission submission = submissions.get(0);
String externalLink = submission.getExternalLink();
submission.getError().setErrorDetails(errorDetail + "morethan750");
submission.getError().setErrorSummary(errorSummary + "morethan150");
submission.setExternalLink("cantupdate");
handler.updateSubmission(submission, provider.getConnection());
submissions = handler.findUnfinishedSubmissions(provider.getConnection());
assertNotNull(submissions);
assertEquals(errorDetail, submissions.get(0).getError().getErrorDetails());
assertEquals(errorSummary, submissions.get(0).getError().getErrorSummary());
// note we dont allow external link update
assertEquals(externalLink, submissions.get(0).getExternalLink());
}
@Test
public void testPurgeSubmissions() throws Exception {
loadSubmissions();
List<MSubmission> submissions;
submissions = handler.findUnfinishedSubmissions(provider.getConnection());
assertNotNull(submissions);
assertEquals(1, submissions.size());
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_SUBMISSION"), 4);
Calendar calendar = Calendar.getInstance();
// 2012-01-03 05:05:05
calendar.set(2012, Calendar.JANUARY, 3, 5, 5, 5);
handler.purgeSubmissions(calendar.getTime(), provider.getConnection());
submissions = handler.findUnfinishedSubmissions(provider.getConnection());
assertNotNull(submissions);
assertEquals(1, submissions.size());
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_SUBMISSION"), 4);
handler.purgeSubmissions(new Date(), provider.getConnection());
submissions = handler.findUnfinishedSubmissions(provider.getConnection());
assertNotNull(submissions);
assertEquals(0, submissions.size());
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_SUBMISSION"), 0);
handler.purgeSubmissions(new Date(), provider.getConnection());
submissions = handler.findUnfinishedSubmissions(provider.getConnection());
assertNotNull(submissions);
assertEquals(0, submissions.size());
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_SUBMISSION"), 0);
}
/**
* Test that by directly removing jobs we will also remove associated
* submissions and counters.
*
* @throws Exception
*/
@Test
public void testDeleteJobs() throws Exception {
MJob jobA = handler.findJob(JOB_A_NAME, provider.getConnection());
MJob jobB = handler.findJob(JOB_B_NAME, provider.getConnection());
loadSubmissions();
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_SUBMISSION"), 4);
handler.deleteJob(jobA.getPersistenceId(), provider.getConnection());
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_SUBMISSION"), 2);
handler.deleteJob(jobB.getPersistenceId(), provider.getConnection());
Assert.assertEquals(provider.rowCount("SQOOP", "SQ_SUBMISSION"), 0);
}
}

View File

@ -1,59 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.repository.postgresql;
import org.apache.sqoop.common.test.db.DatabaseProvider;
import org.apache.sqoop.common.test.db.PostgreSQLProvider;
import org.testng.SkipException;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
/**
* Abstract class with convenience methods for testing postgresql repository.
*/
abstract public class PostgresqlTestCase {
public static DatabaseProvider provider;
public static PostgresqlTestUtils utils;
public PostgresqlRepositoryHandler handler;
@BeforeClass(alwaysRun = true)
public void setUpClass() {
provider = new PostgreSQLProvider();
utils = new PostgresqlTestUtils(provider);
}
@BeforeMethod(alwaysRun = true)
public void setUp() throws Exception {
try {
provider.start();
} catch (RuntimeException e) {
throw new SkipException("Cannot connect to provider.", e);
}
handler = new PostgresqlRepositoryHandler();
handler.createOrUpgradeRepository(provider.getConnection());
}
@AfterMethod(alwaysRun = true)
public void tearDown() throws Exception {
provider.dropSchema("SQOOP");
provider.stop();
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.sqoop.test.testcases;
import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.Assert.assertNotSame;
@ -43,6 +44,9 @@
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
* Base test case suitable for connector testing.
*
@ -117,10 +121,6 @@ protected void insertRow(Object ...values) {
provider.insertRow(getTableName(), values);
}
protected long rowCount() {
return provider.rowCount(getTableName());
}
/**
* Fill link config based on currently active provider.
*
@ -199,7 +199,7 @@ protected void createAndLoadTableUbuntuReleases() {
* @param conditions Conditions in config that are expected by the database provider
* @param values Values that are expected in the table (with corresponding types)
*/
protected void assertRow(Object []conditions, Object ...values) {
protected void assertRow(Object[] conditions, Object ...values) {
ProviderAsserts.assertRow(provider, getTableName(), conditions, values);
}

View File

@ -68,7 +68,7 @@ public void testBasic() throws Exception {
executeJob(job);
assertEquals(4L, rowCount());
assertEquals(4L, provider.rowCount(null, getTableName()));
assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale");
assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno");

View File

@ -74,8 +74,8 @@ public void testStagedTransfer() throws Exception {
executeJob(job);
assertEquals(0L, provider.rowCount(stageTableName));
assertEquals(4L, rowCount());
assertEquals(0L, provider.rowCount(null, stageTableName));
assertEquals(4L, provider.rowCount(null, getTableName()));
assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale");
assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno");