SQOOP-1627: Fix Hadoop100 and Hadoop20 profile
(Venkat Ranganathan via Abraham Elmahrek)
commit e1c6e4a731
parent 52604b1661
@@ -125,9 +125,9 @@
   <if>
     <equals arg1="${hadoopversion}" arg2="20" />
     <then>
-      <property name="hadoop.version" value="0.20.2-cdh3u1" />
-      <property name="hbase94.version" value="0.90.3-cdh3u1" />
-      <property name="zookeeper.version" value="3.3.3-cdh3u1" />
+      <property name="hadoop.version" value="0.20.2-cdh3u5" />
+      <property name="hbase94.version" value="0.90.6-cdh3u5" />
+      <property name="zookeeper.version" value="3.3.3-cdh3u5" />
       <property name="hadoop.version.full" value="0.20" />
       <property name="hcatalog.version" value="0.13.0" />
       <property name="hbasecompatprofile" value="1" />
@@ -150,7 +150,7 @@
     <elseif>
       <equals arg1="${hadoopversion}" arg2="100" />
       <then>
-        <property name="hadoop.version" value="1.0.0" />
+        <property name="hadoop.version" value="1.0.4" />
         <property name="hbase94.version" value="0.92.0" />
         <property name="zookeeper.version" value="3.4.2" />
         <property name="hadoop.version.full" value="1.0.0" />
@@ -155,10 +155,25 @@ protected void jobSetup(Job job) throws IOException, ImportException {
       throw new ImportException(
           "Import to HBase error: Column family not specified");
     }
 
-    // Add HBase configuration files to this conf object.
-    Configuration newConf = HBaseConfiguration.create(conf);
-    HBaseConfiguration.merge(conf, newConf);
+    Method m = null;
+    try {
+      m = HBaseConfiguration.class.getMethod("merge",
+        Configuration.class, Configuration.class);
+    } catch (NoSuchMethodException nsme) {
+    }
+
+    if (m != null) {
+      // Add HBase configuration files to this conf object.
+      Configuration newConf = HBaseConfiguration.create(conf);
+      try {
+        m.invoke(null, conf, newConf);
+      } catch (Exception e) {
+        throw new ImportException(e);
+      }
+    } else {
+      HBaseConfiguration.addHbaseResources(conf);
+    }
 
     HBaseAdmin admin = new HBaseAdmin(conf);
 
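Note: the hunk above removes the hard dependency on HBaseConfiguration.merge(Configuration, Configuration), which is not available in the older HBase releases pulled in by the Hadoop20/Hadoop100 profiles. The method is looked up reflectively and HBaseConfiguration.addHbaseResources() is used as the fallback. A minimal, self-contained sketch of the same detect-then-invoke pattern; the helper class and method names below are hypothetical, while the HBase calls are the ones used in the hunk:

import java.lang.reflect.Method;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

/** Hypothetical helper illustrating the reflective fallback used above. */
public final class HBaseConfMergeCompat {

  private HBaseConfMergeCompat() { }

  /**
   * Pulls the HBase resource files into {@code conf}: via merge() when the
   * HBase version on the classpath provides it, via addHbaseResources()
   * otherwise.
   */
  public static void mergeHBaseResources(Configuration conf) throws Exception {
    Method merge = null;
    try {
      merge = HBaseConfiguration.class.getMethod("merge",
          Configuration.class, Configuration.class);
    } catch (NoSuchMethodException nsme) {
      // Older HBase: merge() does not exist yet.
    }

    if (merge != null) {
      // newConf has hbase-default.xml/hbase-site.xml loaded; merge it into conf.
      Configuration newConf = HBaseConfiguration.create(conf);
      merge.invoke(null, conf, newConf);   // static method, so the target is null
    } else {
      HBaseConfiguration.addHbaseResources(conf);
    }
  }
}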
@@ -201,7 +201,7 @@ public void setUp() throws IOException {
     conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, DummySqoopRecord.class,
         DBWritable.class);
 
-    Job job = Job.getInstance(conf);
+    Job job = new Job(conf);
     mfDIS = new MainframeDatasetInputSplit();
     mfDIS.addDataset("test1");
     mfDIS.addDataset("test2");
@@ -96,7 +96,7 @@ public void testRetrieveDatasets() throws IOException {
 
     String dsName = "dsName1";
     conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME, dsName);
-    Job job = Job.getInstance(conf);
+    Job job = new Job(conf);
     format.getSplits(job);
 
     List<InputSplit> splits = new ArrayList<InputSplit>();
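Note: both test hunks above replace Job.getInstance(conf) with the Job(Configuration) constructor. The static factory is not part of the older mapreduce API targeted by the Hadoop20/Hadoop100 profiles, while the constructor (deprecated on Hadoop 2 but still present) compiles against both lines. A minimal sketch, assuming a hypothetical helper name:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

/** Hypothetical helper showing the portable construction used in the tests. */
public final class Jobs {

  private Jobs() { }

  public static Job newJob(Configuration conf) throws IOException {
    // Compiles against Hadoop 0.20/1.x and 2.x alike; Job.getInstance(conf)
    // would only compile against the newer API.
    return new Job(conf);
  }
}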
@@ -1,13 +1,15 @@
 package org.apache.sqoop.mapreduce.sqlserver;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.lang.reflect.Constructor;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.lib.db.DBConfiguration;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.sqoop.manager.SQLServerManager;
 import org.apache.sqoop.mapreduce.ExportJobBase;
 import org.apache.sqoop.mapreduce.sqlserver.SqlServerUpsertOutputFormat.SqlServerUpsertRecordWriter;
@@ -19,7 +21,8 @@ public class SqlServerUpsertOutputFormatTest {
   @Test
   public void Merge_statement_is_parameterized_correctly() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(DBConfiguration.DRIVER_CLASS_PROPERTY, org.hsqldb.jdbcDriver.class.getName());
+    conf.set(DBConfiguration.DRIVER_CLASS_PROPERTY,
+      org.hsqldb.jdbcDriver.class.getName());
     conf.set(DBConfiguration.URL_PROPERTY, "jdbc:hsqldb:.");
     conf.set(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY, "");
     conf.set(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, "");
@@ -27,18 +30,49 @@ public void Merge_statement_is_parameterized_correctly() throws Exception {
     String[] columnNames = { "FirstColumn", "SecondColumn", "ThirdColumn" };
     String[] updateKeyColumns = { "FirstColumn" };
     conf.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
-    conf.set(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, StringUtils.join(columnNames, ','));
-    conf.set(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY, StringUtils.join(updateKeyColumns, ','));
+    conf.set(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY,
+      StringUtils.join(columnNames, ','));
+    conf.set(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY,
+      StringUtils.join(updateKeyColumns, ','));
     conf.set(SQLServerManager.TABLE_HINTS_PROP, "NOLOCK");
     conf.set(SQLServerManager.IDENTITY_INSERT_PROP, "true");
-    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
-    SqlServerUpsertOutputFormat outputFormat = new SqlServerUpsertOutputFormat();
-    SqlServerUpsertRecordWriter recordWriter = outputFormat.new SqlServerUpsertRecordWriter(context);
-    assertEquals("SET IDENTITY_INSERT #myTable ON " +
-      "MERGE INTO #myTable AS _target USING ( VALUES ( ?, ?, ? ) ) AS _source ( FirstColumn, SecondColumn, ThirdColumn ) ON _source.FirstColumn = _target.FirstColumn" +
-      " WHEN MATCHED THEN UPDATE SET _target.SecondColumn = _source.SecondColumn, _target.ThirdColumn = _source.ThirdColumn" +
-      " WHEN NOT MATCHED THEN INSERT ( FirstColumn, SecondColumn, ThirdColumn ) VALUES " +
-      "( _source.FirstColumn, _source.SecondColumn, _source.ThirdColumn ) " +
-      "OPTION (NOLOCK);", recordWriter.getUpdateStatement());
+    TaskAttemptContext context = null;
+    Class cls = null;
+    try {
+      cls =
+        Class
+          .forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
+    }
+    catch(ClassNotFoundException cnfe) {
+      // Not hadoop 2.0
+    }
+    if (cls == null) {
+      try {
+        cls =
+          Class
+            .forName("org.apache.hadoop.mapreduce.task.TaskAttemptContext");
+      }
+      catch(ClassNotFoundException cnfe) {
+        // Something wrong
+      }
+    }
+    assertNotNull(cls);
+    Constructor c = cls.getConstructor(Configuration.class,
+      TaskAttemptID.class);
+    context = (TaskAttemptContext)c.newInstance(conf, new TaskAttemptID());
+    SqlServerUpsertOutputFormat outputFormat =
+      new SqlServerUpsertOutputFormat();
+    SqlServerUpsertRecordWriter recordWriter =
+      outputFormat.new SqlServerUpsertRecordWriter(context);
+    assertEquals("SET IDENTITY_INSERT #myTable ON "
+      + "MERGE INTO #myTable AS _target USING ( VALUES ( ?, ?, ? ) )"
+      + " AS _source ( FirstColumn, SecondColumn, ThirdColumn ) ON "
+      + "_source.FirstColumn = _target.FirstColumn"
+      + " WHEN MATCHED THEN UPDATE SET _target.SecondColumn = "
+      + "_source.SecondColumn, _target.ThirdColumn = _source.ThirdColumn"
+      + " WHEN NOT MATCHED THEN INSERT ( FirstColumn, SecondColumn, "
+      + " ThirdColumn ) VALUES "
+      + "( _source.FirstColumn, _source.SecondColumn, _source.ThirdColumn ) "
+      + "OPTION (NOLOCK);", recordWriter.getUpdateStatement());
   }
 }
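Note: the last hunk replaces the direct new TaskAttemptContextImpl(...) call, which only exists on Hadoop 2 (on Hadoop 1 TaskAttemptContext is itself a concrete class and there is no org.apache.hadoop.mapreduce.task package), with a reflective class lookup plus constructor invocation, so the test compiles and runs under all build profiles. A condensed sketch of the same pattern; the helper name is hypothetical, and the pre-2.x fallback is assumed to be org.apache.hadoop.mapreduce.TaskAttemptContext:

import java.lang.reflect.Constructor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;

/** Hypothetical helper illustrating the reflective construction used above. */
public final class TaskAttemptContextCompat {

  private TaskAttemptContextCompat() { }

  public static TaskAttemptContext create(Configuration conf) throws Exception {
    Class<?> cls;
    try {
      // Hadoop 2.x: TaskAttemptContext is an interface, this is the implementation.
      cls = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
    } catch (ClassNotFoundException cnfe) {
      // Hadoop 1.x / 0.20: TaskAttemptContext is itself a concrete class.
      cls = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext");
    }
    // Both classes expose a (Configuration, TaskAttemptID) constructor.
    Constructor<?> c = cls.getConstructor(Configuration.class, TaskAttemptID.class);
    return (TaskAttemptContext) c.newInstance(conf, new TaskAttemptID());
  }
}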