5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-08 06:01:55 +08:00

SQOOP-2281: Set overwrite on kite dataset

(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2015-04-03 09:45:55 -07:00
parent 64c5fc3755
commit 032bc54c86
3 changed files with 13 additions and 6 deletions

View File

@ -103,9 +103,8 @@ protected void configureMapper(Job job, String tableName,
// Parquet data records. The import will fail, if schema is invalid. // Parquet data records. The import will fail, if schema is invalid.
Schema schema = generateAvroSchema(tableName); Schema schema = generateAvroSchema(tableName);
String uri = getKiteUri(conf, tableName); String uri = getKiteUri(conf, tableName);
boolean reuseExistingDataset = options.isAppendMode() || ParquetJob.configureImportJob(conf, schema, uri, options.isAppendMode(),
(options.doHiveImport() && options.doOverwriteHiveTable()); options.doHiveImport() && options.doOverwriteHiveTable());
ParquetJob.configureImportJob(conf, schema, uri, reuseExistingDataset);
} }
job.setMapperClass(getMapperClass()); job.setMapperClass(getMapperClass());

View File

@ -71,9 +71,9 @@ public static CompressionType getCompressionType(Configuration conf) {
* {@link org.apache.avro.generic.GenericRecord}. * {@link org.apache.avro.generic.GenericRecord}.
*/ */
public static void configureImportJob(Configuration conf, Schema schema, public static void configureImportJob(Configuration conf, Schema schema,
String uri, boolean reuseExistingDataset) throws IOException { String uri, boolean reuseExistingDataset, boolean overwrite) throws IOException {
Dataset dataset; Dataset dataset;
if (reuseExistingDataset) { if (reuseExistingDataset || overwrite) {
try { try {
dataset = Datasets.load(uri); dataset = Datasets.load(uri);
} catch (DatasetNotFoundException ex) { } catch (DatasetNotFoundException ex) {
@ -89,7 +89,12 @@ public static void configureImportJob(Configuration conf, Schema schema,
dataset = createDataset(schema, getCompressionType(conf), uri); dataset = createDataset(schema, getCompressionType(conf), uri);
} }
conf.set(CONF_AVRO_SCHEMA, schema.toString()); conf.set(CONF_AVRO_SCHEMA, schema.toString());
DatasetKeyOutputFormat.configure(conf).writeTo(dataset);
if (overwrite) {
DatasetKeyOutputFormat.configure(conf).overwrite(dataset);
} else {
DatasetKeyOutputFormat.configure(conf).writeTo(dataset);
}
} }
private static Dataset createDataset(Schema schema, private static Dataset createDataset(Schema schema,

View File

@ -307,6 +307,9 @@ public void testCreateOverwriteHiveImport() throws IOException {
runImportTest(TABLE_NAME, types, vals, runImportTest(TABLE_NAME, types, vals,
"createOverwriteImport.q", getCreateHiveTableArgs(extraArgs), "createOverwriteImport.q", getCreateHiveTableArgs(extraArgs),
new CreateHiveTableTool()); new CreateHiveTableTool());
runImportTest(TABLE_NAME, types, vals,
"createOverwriteImport.q", getCreateHiveTableArgs(extraArgs),
new CreateHiveTableTool());
} }
/** Test that dates are coerced properly to strings. */ /** Test that dates are coerced properly to strings. */