5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-04 02:21:10 +08:00

SQOOP-1073: Sqoop2: Introduce schema for transferred data

(Jarek Jarcec Cecho via Venkat Ranganathan)
This commit is contained in:
Venkat Ranganathan 2013-06-30 21:05:50 -07:00
parent 344c6309c8
commit aa8e1e7794
40 changed files with 2478 additions and 2 deletions

View File

@ -405,6 +405,10 @@ public class Constants {
"submission.executed_success";
public static final String RES_SUBMISSION_SERVER_URL =
"submission.server_url";
public static final String RES_CONNECTOR_SCHEMA =
"submission.connector_schema";
public static final String RES_HIO_SCHEMA =
"submission.hio_schema";
private Constants() {
// Instantiation is prohibited

View File

@ -61,6 +61,16 @@ public static void displayHeader(MSubmission submission) {
println("\t" + externalLink);
}
}
if(isVerbose() && submission.getConnectorSchema() != null) {
print(resourceString(Constants.RES_CONNECTOR_SCHEMA)+": ");
println(submission.getConnectorSchema());
}
if(isVerbose() && submission.getHioSchema() != null) {
print(resourceString(Constants.RES_HIO_SCHEMA)+": ");
println(submission.getHioSchema());
}
}
/**

View File

@ -171,7 +171,6 @@ sqoop.prompt_shell_loadedrc = Resource file loaded.
submission.usage = Usage: submission {0}
submission.prompt_synchronous = Wait for submission to finish
submission.prompt_poll_timeout = How often the client should communicate with the server in milliseconds (Default: 10000)
# Various Table headers
table.header.id = Id
@ -211,4 +210,6 @@ submission.external_id = External ID
submission.progress_not_available = Progress is not available
submission.counters = Counters
submission.executed_success = Job executed successfully
submission.server_url = Server URL
submission.server_url = Server URL
submission.connector_schema = Connector schema
submission.hio_schema = Input/Output schema

View File

@ -41,6 +41,10 @@ limitations under the License.
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.json;
import org.apache.sqoop.json.util.SchemaSerialization;
import org.apache.sqoop.schema.Schema;
import org.json.simple.JSONObject;
/**
*
*/
public class SchemaBean implements JsonBean {
private Schema schema;
// For "extract"
public SchemaBean(Schema schema) {
this.schema = schema;
}
// For "restore"
public SchemaBean() {
}
public Schema getSchema() {
return schema;
}
@Override
public JSONObject extract(boolean skipSensitive) {
return SchemaSerialization.extractSchema(schema);
}
@Override
public void restore(JSONObject jsonObject) {
schema = SchemaSerialization.restoreSchemna(jsonObject);
}
}

View File

@ -31,6 +31,9 @@
import java.util.Map;
import java.util.Set;
import static org.apache.sqoop.json.util.SchemaSerialization.extractSchema;
import static org.apache.sqoop.json.util.SchemaSerialization.restoreSchemna;
/**
*
*/
@ -47,6 +50,8 @@ public class SubmissionBean implements JsonBean {
private static final String EXCEPTION_TRACE = "exception-trace";
private static final String PROGRESS = "progress";
private static final String COUNTERS = "counters";
private static final String CONNECTOR_SCHEMA = "schema-connector";
private static final String HIO_SCHEMA = "schema-hio";
private List<MSubmission> submissions;
@ -103,6 +108,12 @@ public JSONObject extract(boolean skipSensitive) {
if(submission.getCounters() != null) {
object.put(COUNTERS, extractCounters(submission.getCounters()));
}
if(submission.getConnectorSchema() != null) {
object.put(CONNECTOR_SCHEMA, extractSchema(submission.getConnectorSchema()));
}
if(submission.getHioSchema() != null) {
object.put(HIO_SCHEMA, extractSchema(submission.getHioSchema()));
}
array.add(object);
}
@ -163,6 +174,12 @@ public void restore(JSONObject json) {
if(object.containsKey(COUNTERS)) {
submission.setCounters(restoreCounters((JSONObject) object.get(COUNTERS)));
}
if(object.containsKey(CONNECTOR_SCHEMA)) {
submission.setConnectorSchema(restoreSchemna((JSONObject) object.get(CONNECTOR_SCHEMA)));
}
if(object.containsKey(HIO_SCHEMA)) {
submission.setHioSchema(restoreSchemna((JSONObject) object.get(HIO_SCHEMA)));
}
this.submissions.add(submission);
}

View File

@ -0,0 +1,230 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.json.util;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.AbstractComplexType;
import org.apache.sqoop.schema.type.AbstractString;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.Array;
import org.apache.sqoop.schema.type.Binary;
import org.apache.sqoop.schema.type.Bit;
import org.apache.sqoop.schema.type.Date;
import org.apache.sqoop.schema.type.DateTime;
import org.apache.sqoop.schema.type.Decimal;
import org.apache.sqoop.schema.type.Enum;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Map;
import org.apache.sqoop.schema.type.Set;
import org.apache.sqoop.schema.type.Text;
import org.apache.sqoop.schema.type.Time;
import org.apache.sqoop.schema.type.Type;
import org.apache.sqoop.schema.type.Unsupported;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
/**
*
*/
public class SchemaSerialization {
private static final String NAME = "name";
private static final String CREATION_DATE = "created";
private static final String NOTE = "note";
private static final String COLUMNS = "columns";
private static final String TYPE = "type";
private static final String NULLABLE = "nullable";
private static final String KEY = "key";
private static final String VALUE = "value";
private static final String SIZE = "size";
private static final String FRACTION = "fraction";
private static final String TIMEZONE = "timezone";
private static final String PRECISION = "precision";
private static final String SCALE = "scale";
private static final String UNSIGNED = "unsigned";
private static final String JDBC_TYPE = "jdbc-type";
public static JSONObject extractSchema(Schema schema) {
JSONObject object = new JSONObject();
object.put(NAME, schema.getName());
object.put(CREATION_DATE, schema.getCreationDate().getTime());
if(schema.getNote() != null) {
object.put(NOTE, schema.getNote());
}
JSONArray columnArray = new JSONArray();
for(Column column : schema.getColumns()) {
columnArray.add(extractColumn(column));
}
object.put(COLUMNS, columnArray);
return object;
}
public static Schema restoreSchemna(JSONObject jsonObject) {
String name = (String)jsonObject.get(NAME);
String note = (String)jsonObject.get(NOTE);
java.util.Date date = new java.util.Date((Long)jsonObject.get(CREATION_DATE));
Schema schema = new Schema(name)
.setNote(note)
.setCreationDate(date);
JSONArray columnsArray = (JSONArray)jsonObject.get(COLUMNS);
for (Object obj : columnsArray) {
schema.addColumn(restoreColumn((JSONObject)obj));
}
return schema;
}
private static JSONObject extractColumn(Column column) {
JSONObject ret = new JSONObject();
ret.put(NAME, column.getName());
ret.put(NULLABLE, column.getNullable());
ret.put(TYPE, column.getType().name());
switch (column.getType()) {
case MAP:
ret.put(VALUE, extractColumn(((Map)column).getValue()));
case ARRAY:
case ENUM:
case SET:
ret.put(KEY, extractColumn(((AbstractComplexType) column).getKey()));
break;
case BINARY:
case TEXT:
ret.put(SIZE, ((AbstractString)column).getSize());
break;
case DATE_TIME:
ret.put(FRACTION, ((DateTime)column).getFraction());
ret.put(TIMEZONE, ((DateTime)column).getTimezone());
break;
case DECIMAL:
ret.put(PRECISION, ((Decimal)column).getPrecision());
ret.put(SCALE, ((Decimal)column).getScale());
break;
case FIXED_POINT:
ret.put(SIZE, ((FixedPoint) column).getByteSize());
ret.put(UNSIGNED, ((FixedPoint)column).getUnsigned());
break;
case FLOATING_POINT:
ret.put(SIZE, ((FloatingPoint) column).getByteSize());
break;
case TIME:
ret.put(FRACTION, ((Time)column).getFraction());
break;
case UNSUPPORTED:
ret.put(JDBC_TYPE, ((Unsupported) column).getJdbcType());
break;
case DATE:
case BIT:
// Nothing to do extra
break;
default:
// TODO(jarcec): Throw an exception of unsupported type?
}
return ret;
}
private static Column restoreColumn(JSONObject obj) {
String name = (String) obj.get(NAME);
Boolean nullable = (Boolean) obj.get(NULLABLE);
Column key = null;
if(obj.containsKey(KEY)) {
key = restoreColumn((JSONObject) obj.get(KEY));
}
Column value = null;
if(obj.containsKey(VALUE)) {
value = restoreColumn((JSONObject) obj.get(VALUE));
}
Long size = (Long)obj.get(SIZE);
Boolean fraction = (Boolean)obj.get(FRACTION);
Boolean timezone = (Boolean)obj.get(TIMEZONE);
Long precision = (Long)obj.get(PRECISION);
Long scale = (Long)obj.get(SCALE);
Boolean unsigned = (Boolean)obj.get(UNSIGNED);
Long jdbcType = (Long)obj.get(JDBC_TYPE);
Type type = Type.valueOf((String) obj.get(TYPE));
Column output = null;
switch (type) {
case ARRAY:
output = new Array(key);
break;
case BINARY:
output = new Binary().setSize(size);
break;
case BIT:
output = new Bit();
break;
case DATE:
output = new Date();
break;
case DATE_TIME:
output = new DateTime().setFraction(fraction).setTimezone(timezone);
break;
case DECIMAL:
output = new Decimal().setPrecision(precision).setScale(scale);
break;
case ENUM:
output = new Enum(key);
break;
case FIXED_POINT:
output = new FixedPoint().setByteSize(size).setUnsigned(unsigned);
break;
case FLOATING_POINT:
output = new FloatingPoint().setByteSize(size);
break;
case MAP:
output = new Map(key, value);
break;
case SET:
output = new Set(key);
break;
case TEXT:
output = new Text().setSize(size);
break;
case TIME:
output = new Time().setFraction(fraction);
break;
case UNSUPPORTED:
output = new Unsupported().setJdbcType(jdbcType);
break;
default:
// TODO(Jarcec): Throw an exception of unsupported type?
}
output.setName(name);
output.setNullable(nullable);
return output;
}
private SchemaSerialization() {
// Serialization is prohibited
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.sqoop.model;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.submission.counter.Counters;
@ -91,6 +92,22 @@ public class MSubmission extends MAccountableEntity {
*/
String exceptionStackTrace;
/**
* Schema that was reported by the connector.
*
* This is optional property that is currently not serialized into metastore.
*/
Schema connectorSchema;
/**
* Optional schema that reported by the underlying I/O implementation. Please
* note that this property might be empty and in such case the connector
* schema will use also on Hadoop I/O side.
*
* This is optional property that is currently not serialized into metastore.
*/
Schema hioSchema;
public MSubmission() {
status = SubmissionStatus.UNKNOWN;
progress = -1;
@ -194,6 +211,22 @@ public void setException(Throwable e) {
this.setExceptionStackTrace(writer.toString());
}
public Schema getConnectorSchema() {
return connectorSchema;
}
public void setConnectorSchema(Schema connectorSchema) {
this.connectorSchema = connectorSchema;
}
public Schema getHioSchema() {
return hioSchema;
}
public void setHioSchema(Schema hioSchema) {
this.hioSchema = hioSchema;
}
@Override
public String toString() {
return "MSubmission{" +
@ -207,6 +240,8 @@ public String toString() {
", externalLink='" + externalLink + '\'' +
", exceptionInfo='" + exceptionInfo + '\'' +
", exceptionStackTrace='" + exceptionStackTrace + '\'' +
", connectorSchema='" + connectorSchema + '\'' +
", hioSchema='" + hioSchema + '\'' +
'}';
}

View File

@ -0,0 +1,153 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema;
import org.apache.commons.lang.StringUtils;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.type.Column;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
/**
* Schema represents data that are being transferred.
*/
public class Schema {
/**
* Name of the schema, usually a table name.
*/
private String name;
/**
* Optional note.
*/
private String note;
/**
* Generation date.
*/
private Date creationDate;
/**
* Columns associated with the schema.
*/
private List<Column> columns;
/**
* Helper set for quick column name lookups.
*/
private Set<String> columNames;
private Schema() {
creationDate = new Date();
columns = new LinkedList<Column>();
columNames = new HashSet<String>();
}
public Schema(String name) {
this();
assert name != null;
this.name = name;
}
/**
* Add column to the schema.
*
* Add new column to the schema at the end (e.g. after all previously added
* columns). The column names must be unique and thus adding column with the
* same name will lead to an exception being thrown.
*
* @param column Column that should be added to the schema at the end.
* @return
*/
public Schema addColumn(Column column) {
if(column.getName() == null) {
throw new SqoopException(SchemaError.SCHEMA_0001, "Column: " + column);
}
if(columNames.contains(column.getName())) {
throw new SqoopException(SchemaError.SCHEMA_0002, "Column: " + column);
}
columNames.add(column.getName());
columns.add(column);
return this;
}
public String getName() {
return name;
}
public Date getCreationDate() {
return creationDate;
}
public String getNote() {
return note;
}
public Schema setNote(String note) {
this.note = note;
return this;
}
public Schema setCreationDate(Date creationDate) {
this.creationDate = creationDate;
return this;
}
public List<Column> getColumns() {
return columns;
}
public String toString() {
return new StringBuilder("Schema{")
.append("name=").append(name).append("")
.append(",columns=[\n\t").append(StringUtils.join(columns, ",\n\t")).append("]")
.append("}")
.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Schema)) return false;
Schema schema = (Schema) o;
if (columns != null ? !columns.equals(schema.columns) : schema.columns != null)
return false;
if (name != null ? !name.equals(schema.name) : schema.name != null)
return false;
return true;
}
@Override
public int hashCode() {
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (columns != null ? columns.hashCode() : 0);
return result;
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema;
import org.apache.sqoop.common.ErrorCode;
/**
*
*/
public enum SchemaError implements ErrorCode {
SCHEMA_0000("Unknown error"),
SCHEMA_0001("Column without name"),
SCHEMA_0002("Duplicate column name"),
;
private final String message;
private SchemaError(String message) {
this.message = message;
}
public String getCode() {
return name();
}
public String getMessage() {
return message;
}
}

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema.type;
/**
* Complex types that are incorporating primitive types.
*/
public abstract class AbstractComplexType extends Column {
/**
* Incorporated type
*/
private Column key;
public AbstractComplexType(Column key) {
setKey(key);
}
public AbstractComplexType(String name, Column key) {
super(name);
setKey(key);
}
public AbstractComplexType(String name, Boolean nullable, Column key) {
super(name, nullable);
setKey(key);
}
public Column getKey() {
return key;
}
public void setKey(Column key) {
assert key != null;
this.key = key;
}
@Override
public String toString() {
return new StringBuilder(super.toString())
.append(",key=").append(key.toString())
.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof AbstractComplexType)) return false;
if (!super.equals(o)) return false;
AbstractComplexType that = (AbstractComplexType) o;
if (key != null ? !key.equals(that.key) : that.key != null) return false;
return true;
}
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + (key != null ? key.hashCode() : 0);
return result;
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema.type;
/**
* Any time or date related data type.
*/
public abstract class AbstractDateTime extends Column {
protected AbstractDateTime() {
}
protected AbstractDateTime(String name) {
super(name);
}
protected AbstractDateTime(String name, Boolean nullable) {
super(name, nullable);
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema.type;
/**
* Any type related to number.
*/
public abstract class AbstractNumber extends Column {
protected AbstractNumber() {
}
protected AbstractNumber(String name) {
super(name);
}
protected AbstractNumber(String name, Boolean nullable) {
super(name, nullable);
}
}

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema.type;
/**
* Any type that is encoding character (or byte) array.
*/
public abstract class AbstractString extends Column {
private Long size;
protected AbstractString() {
}
protected AbstractString(String name) {
super(name);
}
protected AbstractString(String name, Long size) {
super(name);
this.size = size;
}
protected AbstractString(String name, Boolean nullable) {
super(name, nullable);
}
protected AbstractString(String name, Boolean nullable, Long size) {
super(name, nullable);
this.size = size;
}
public Long getSize() {
return size;
}
public AbstractString setSize(Long size) {
this.size = size;
return this;
}
@Override
public String toString() {
return new StringBuilder(super.toString())
.append(",size=").append(size)
.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof AbstractString)) return false;
if (!super.equals(o)) return false;
AbstractString that = (AbstractString) o;
if (size != null ? !size.equals(that.size) : that.size != null)
return false;
return true;
}
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + (size != null ? size.hashCode() : 0);
return result;
}
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema.type;
/**
* Array contains multiple values of the same type.
*
* JDBC Types: array
*/
public class Array extends AbstractComplexType {
public Array(Column key) {
super(key);
}
public Array(String name, Column key) {
super(name, key);
}
public Array(String name, Boolean nullable, Column key) {
super(name, nullable, key);
}
@Override
public Type getType() {
return Type.ARRAY;
}
@Override
public String toString() {
return new StringBuilder("Array{")
.append(super.toString())
.append("}")
.toString();
}
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema.type;
/**
* Binary type can contain any binary value (images, text, ...).
*
* JDBC Types: blob, binary, varbinary
*/
public class Binary extends AbstractString {
public Binary() {
}
public Binary(String name) {
super(name);
}
public Binary(String name, Long size) {
super(name);
}
public Binary(String name, Boolean nullable) {
super(name, nullable);
}
public Binary(String name, Boolean nullable, Long size) {
super(name, nullable, size);
}
@Override
public Type getType() {
return Type.BINARY;
}
@Override
public String toString() {
return new StringBuilder("Binary{")
.append(super.toString())
.append("}")
.toString();
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema.type;
/**
* True/False value.
*
* JDBC Types: bit, boolean
*/
public class Bit extends Column {
public Bit() {
}
public Bit(String name) {
super(name);
}
public Bit(String name, Boolean nullable) {
super(name, nullable);
}
@Override
public Type getType() {
return Type.BIT;
}
@Override
public String toString() {
return new StringBuilder("Bit{")
.append(super.toString())
.append("}")
.toString();
}
}

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema.type;
/**
* Parent of all Sqoop types.
*/
public abstract class Column {
/**
* Name of the column.
*/
String name;
/**
* Whether NULL is allowed or not.
*/
Boolean nullable;
public Column() {
}
public Column(String name) {
setName(name);
}
public Column(String name, Boolean nullable) {
setName(name);
setNullable(nullable);
}
/**
* Return type of the Column.
*
* @return Type of the column
*/
public abstract Type getType();
public Column setName(String name) {
this.name = name;
return this;
}
public Column setNullable(Boolean nullable) {
this.nullable = nullable;
return this;
}
public Boolean getNullable() {
return nullable;
}
public String getName() {
return name;
}
public String toString() {
return new StringBuilder()
.append("name=").append(name).append(",")
.append("nullable=").append(nullable)
.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Column)) return false;
Column that = (Column) o;
if (name != null ? !name.equals(that.name) : that.name != null)
return false;
if (nullable != null ? !nullable.equals(that.nullable) : that.nullable != null)
return false;
return true;
}
@Override
public int hashCode() {
int result = 1;
result = 31 * result + (name != null ? name.hashCode() : 0);
result = 31 * result + (nullable != null ? nullable.hashCode() : 0);
return result;
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema.type;
/**
* Date (year, month, day).
*
* JDBC Types: date
*/
public class Date extends AbstractDateTime {
public Date() {
}
public Date(String name) {
super(name);
}
public Date(String name, Boolean nullable) {
super(name, nullable);
}
@Override
public Type getType() {
return Type.DATE;
}
@Override
public String toString() {
return new StringBuilder("Date{")
.append(super.toString())
.append("}")
.toString();
}
}

View File

@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema.type;
/**
* Date and time information together.
*
* JDBC Types: datetime, timestamp
*/
public class DateTime extends AbstractDateTime {
/**
* The column can contain fractions of seconds.
*/
private Boolean fraction;
/**
* The column do have encoded timezone.
*/
private Boolean timezone;
public DateTime() {
}
public DateTime(String name) {
super(name);
}
public DateTime(Boolean fraction, Boolean timezone) {
this.fraction = fraction;
this.timezone = timezone;
}
public DateTime(String name, Boolean fraction, Boolean timezone) {
super(name);
this.fraction = fraction;
this.timezone = timezone;
}
public DateTime(String name, Boolean nullable, Boolean fraction, Boolean timezone) {
super(name, nullable);
this.fraction = fraction;
this.timezone = timezone;
}
public Boolean getFraction() {
return fraction;
}
public DateTime setFraction(Boolean fraction) {
this.fraction = fraction;
return this;
}
public Boolean getTimezone() {
return timezone;
}
public DateTime setTimezone(Boolean timezone) {
this.timezone = timezone;
return this;
}
@Override
public Type getType() {
return Type.DATE_TIME;
}
@Override
public String toString() {
return new StringBuilder("Date{")
.append(super.toString())
.append(",fraction=").append(fraction)
.append(",timezone=").append(timezone)
.append("}")
.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof DateTime)) return false;
if (!super.equals(o)) return false;
DateTime dateTime = (DateTime) o;
if (fraction != null ? !fraction.equals(dateTime.fraction) : dateTime.fraction != null)
return false;
if (timezone != null ? !timezone.equals(dateTime.timezone) : dateTime.timezone != null)
return false;
return true;
}
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + (fraction != null ? fraction.hashCode() : 0);
result = 31 * result + (timezone != null ? timezone.hashCode() : 0);
return result;
}
}

View File

@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema.type;
/**
* Fixed point number with configurable precision and scale.
*
* JDBC Types: numeric, decimal
*/
public class Decimal extends AbstractNumber {
/**
* Number of valid numbers.
*/
private Long precision;
/**
* Number of decimal places.
*/
private Long scale;
public Decimal() {
}
public Decimal(String name) {
super(name);
}
public Decimal(Long precision, Long scale) {
this.precision = precision;
this.scale = scale;
}
public Decimal(String name, Long precision, Long scale) {
super(name);
this.precision = precision;
this.scale = scale;
}
public Decimal(String name, Boolean nullable, Long precision, Long scale) {
super(name, nullable);
this.precision = precision;
this.scale = scale;
}
public Long getPrecision() {
return precision;
}
public Decimal setPrecision(Long precision) {
this.precision = precision;
return this;
}
public Long getScale() {
return scale;
}
public Decimal setScale(Long scale) {
this.scale = scale;
return this;
}
@Override
public Type getType() {
return Type.DECIMAL;
}
@Override
public String toString() {
return new StringBuilder("Decimal{")
.append(super.toString())
.append(",precision=").append(precision)
.append(",scale=").append(scale)
.append("}")
.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Decimal)) return false;
if (!super.equals(o)) return false;
Decimal decimal = (Decimal) o;
if (precision != null ? !precision.equals(decimal.precision) : decimal.precision != null)
return false;
if (scale != null ? !scale.equals(decimal.scale) : decimal.scale != null)
return false;
return true;
}
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + (precision != null ? precision.hashCode() : 0);
result = 31 * result + (scale != null ? scale.hashCode() : 0);
return result;
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema.type;
/**
* Enum can contain one value from predefined list.
*
* JDBC Types: enum
*/
public class Enum extends AbstractComplexType {
public Enum(Column key) {
super(key);
}
public Enum(String name, Column key) {
super(name, key);
}
public Enum(String name, Boolean nullable, Column key) {
super(name, nullable, key);
}
@Override
public Type getType() {
return Type.ENUM;
}
@Override
public String toString() {
return new StringBuilder("Enum{")
.append(super.toString())
.append("}")
.toString();
}
}

View File

@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema.type;
/**
* Basic non-floating number.
*
* JDBC Types: int, long, bigint, smallint
*/
public class FixedPoint extends AbstractNumber {
private Long byteSize;
private Boolean unsigned;
public FixedPoint() {
}
public FixedPoint(String name) {
super(name);
}
public FixedPoint(Long byteSize, Boolean unsigned) {
this.byteSize = byteSize;
this.unsigned = unsigned;
}
public FixedPoint(String name, Long byteSize, Boolean unsigned) {
super(name);
this.byteSize = byteSize;
this.unsigned = unsigned;
}
public FixedPoint(String name, Boolean nullable, Long byteSize, Boolean unsigned) {
super(name, nullable);
this.byteSize = byteSize;
this.unsigned = unsigned;
}
public Long getByteSize() {
return byteSize;
}
public FixedPoint setByteSize(Long byteSize) {
this.byteSize = byteSize;
return this;
}
public Boolean getUnsigned() {
return unsigned;
}
public FixedPoint setUnsigned(Boolean unsigned) {
this.unsigned = unsigned;
return this;
}
@Override
public Type getType() {
return Type.FIXED_POINT;
}
@Override
public String toString() {
return new StringBuilder("FixedPoint{")
.append(super.toString())
.append(",byteSize=").append(byteSize)
.append(",unsigned=").append(unsigned)
.append("}")
.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof FixedPoint)) return false;
if (!super.equals(o)) return false;
FixedPoint that = (FixedPoint) o;
if (byteSize != null ? !byteSize.equals(that.byteSize) : that.byteSize != null)
return false;
if (unsigned != null ? !unsigned.equals(that.unsigned) : that.unsigned != null)
return false;
return true;
}
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + (byteSize != null ? byteSize.hashCode() : 0);
result = 31 * result + (unsigned != null ? unsigned.hashCode() : 0);
return result;
}
}

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema.type;
/**
* Floating point represented as IEEE norm.
*
* JDBC Types: double, float, real
*/
public class FloatingPoint extends AbstractNumber {
private Long byteSize;
public FloatingPoint() {
}
public FloatingPoint(String name) {
super(name);
}
public FloatingPoint(Long byteSize) {
this.byteSize = byteSize;
}
public FloatingPoint(String name, Long byteSize) {
super(name);
this.byteSize = byteSize;
}
public FloatingPoint(String name, Boolean nullable, Long byteSize) {
super(name, nullable);
this.byteSize = byteSize;
}
public Long getByteSize() {
return byteSize;
}
public FloatingPoint setByteSize(Long byteSize) {
this.byteSize = byteSize;
return this;
}
@Override
public Type getType() {
return Type.FLOATING_POINT;
}
@Override
public String toString() {
return new StringBuilder("FloatingPoint{")
.append(super.toString())
.append(",byteSize=").append(byteSize)
.append("}")
.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof FloatingPoint)) return false;
if (!super.equals(o)) return false;
FloatingPoint that = (FloatingPoint) o;
if (byteSize != null ? !byteSize.equals(that.byteSize) : that.byteSize != null)
return false;
return true;
}
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + (byteSize != null ? byteSize.hashCode() : 0);
return result;
}
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema.type;
/**
* Associative array.
*
* JDBC Types: map
*/
public class Map extends AbstractComplexType {
private Column value;
public Map(Column key, Column value) {
super(key);
this.value = value;
}
public Map(String name, Column key, Column value) {
super(name, key);
this.value = value;
}
public Map(String name, Boolean nullable, Column key, Column value) {
super(name, nullable, key);
this.value = value;
}
@Override
public Type getType() {
return Type.MAP;
}
public Column getValue() {
return value;
}
@Override
public String toString() {
return new StringBuilder("Map{")
.append(super.toString())
.append(",value=").append(value)
.append("}")
.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Map)) return false;
if (!super.equals(o)) return false;
Map map = (Map) o;
if (value != null ? !value.equals(map.value) : map.value != null)
return false;
return true;
}
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + (value != null ? value.hashCode() : 0);
return result;
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema.type;
/**
* Multiple values of the same type.
*
* JDBC Types: set
*/
public class Set extends AbstractComplexType {
public Set(Column key) {
super(key);
}
public Set(String name, Column key) {
super(name, key);
}
public Set(String name, Boolean nullable, Column key) {
super(name, nullable, key);
}
@Override
public Type getType() {
return Type.SET;
}
@Override
public String toString() {
return new StringBuilder("Set{")
.append(super.toString())
.append("}")
.toString();
}
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema.type;
/**
* String.
*
* JDBC Types: char, varchar, nchar, clob
*/
public class Text extends AbstractString {
public Text() {
}
public Text(String name) {
super(name);
}
public Text(String name, Long size) {
super(name, size);
}
public Text(String name, Boolean nullable) {
super(name, nullable);
}
public Text(String name, Boolean nullable, Long size) {
super(name, nullable, size);
}
@Override
public Type getType() {
return Type.TEXT;
}
@Override
public String toString() {
return new StringBuilder("Text{")
.append(super.toString())
.append("}")
.toString();
}
}

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema.type;
/**
* Time (hours, minutes, seconds).
*
* JDBC Types: time
*/
public class Time extends AbstractDateTime {
private Boolean fraction;
public Time() {
}
public Time(String name) {
super(name);
}
public Time(Boolean fraction) {
this.fraction = fraction;
}
public Time(String name, Boolean fraction) {
super(name);
this.fraction = fraction;
}
public Time(String name, Boolean nullable, Boolean fraction) {
super(name, nullable);
this.fraction = fraction;
}
public Boolean getFraction() {
return fraction;
}
public Time setFraction(Boolean fraction) {
this.fraction = fraction;
return this;
}
@Override
public Type getType() {
return Type.TIME;
}
@Override
public String toString() {
return new StringBuilder("Time{")
.append(super.toString())
.append(",fraction=").append(fraction)
.append("}")
.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Time)) return false;
if (!super.equals(o)) return false;
Time time = (Time) o;
if (fraction != null ? !fraction.equals(time.fraction) : time.fraction != null)
return false;
return true;
}
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + (fraction != null ? fraction.hashCode() : 0);
return result;
}
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema.type;
/**
* All data types supported by Sqoop.
*/
public enum Type {
ARRAY,
BINARY,
BIT,
DATE,
DATE_TIME,
DECIMAL,
ENUM,
FIXED_POINT,
FLOATING_POINT,
MAP,
SET,
TEXT,
TIME,
UNSUPPORTED,
;
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema.type;
/**
* Unsupported data type (internally encoded as binary).
*/
public class Unsupported extends Binary {
/**
* Optional JDBC type that is unknown.
*/
Long jdbcType;
@Override
public Type getType() {
return Type.UNSUPPORTED;
}
public Long getJdbcType() {
return jdbcType;
}
public Unsupported setJdbcType(Long jdbcType) {
this.jdbcType = jdbcType;
return this;
}
public Unsupported() {
}
public Unsupported(Long jdbcType) {
setJdbcType(jdbcType);
}
public Unsupported(String name) {
super(name);
}
public Unsupported(String name, Long jdbcType) {
super(name);
setJdbcType(jdbcType);
}
public Unsupported(String name, Boolean nullable) {
super(name, nullable);
}
public Unsupported(String name, Boolean nullable, Long jdbcType) {
super(name, nullable);
setJdbcType(jdbcType);
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.json;
import org.apache.sqoop.json.util.TestSchemaSerialization;
import org.apache.sqoop.schema.Schema;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
/**
* Run the same tests as TestSchemaSerialization, but using the SchamaBean
* as a means of transfer.
*/
public class TestSchemaBean extends TestSchemaSerialization {
/**
* Override the transfer method to use the SchemaBean.
*
* @param schema
* @return
*/
@Override
protected Schema transfer(Schema schema) {
SchemaBean extractBean = new SchemaBean(schema);
JSONObject extractJson = extractBean.extract(true);
String transferredString = extractJson.toJSONString();
JSONObject restoreJson = (JSONObject) JSONValue.parse(transferredString);
SchemaBean restoreBean = new SchemaBean();
restoreBean.restore(restoreJson);
return restoreBean.getSchema();
}}

View File

@ -19,6 +19,9 @@
import junit.framework.TestCase;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Decimal;
import org.apache.sqoop.schema.type.Text;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
@ -356,6 +359,31 @@ public void testTransferCounters() {
assertEquals(222222, counter.getValue());
}
public void testTransferConnectorSchema() {
MSubmission source = new MSubmission();
source.setConnectorSchema(getSchema());
Schema target = transfer(source).getConnectorSchema();
assertNotNull(target);
assertEquals(getSchema(), target);
}
public void testTransferHioSchema() {
MSubmission source = new MSubmission();
source.setHioSchema(getSchema());
Schema target = transfer(source).getHioSchema();
assertNotNull(target);
assertEquals(getSchema(), target);
}
private Schema getSchema() {
return new Schema("schema")
.addColumn(new Text("col1"))
.addColumn(new Decimal("col2"))
;
}
/**
* Simulate transfer of MSubmission structure using SubmissionBean
*

View File

@ -0,0 +1,177 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.json.util;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Array;
import org.apache.sqoop.schema.type.Binary;
import org.apache.sqoop.schema.type.Bit;
import org.apache.sqoop.schema.type.Date;
import org.apache.sqoop.schema.type.DateTime;
import org.apache.sqoop.schema.type.Decimal;
import org.apache.sqoop.schema.type.Enum;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Map;
import org.apache.sqoop.schema.type.Set;
import org.apache.sqoop.schema.type.Text;
import org.apache.sqoop.schema.type.Time;
import org.apache.sqoop.schema.type.Unsupported;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
/**
*
*/
public class TestSchemaSerialization {
@Test
public void testArray() {
Schema array = new Schema("array").addColumn(new Array("a", new Decimal()));
transferAndAssert(array);
}
@Test
public void testBinary() {
Schema binary = new Schema("b").addColumn(new Binary("A", 100L));
transferAndAssert(binary);
}
@Test
public void testBit() {
Schema bit = new Schema("b").addColumn(new Bit("B"));
transferAndAssert(bit);
}
@Test
public void testDate() {
Schema date = new Schema("d").addColumn(new Date("d"));
transferAndAssert(date);
}
@Test
public void testDateTime() {
Schema dateTime = new Schema("dt").addColumn(new DateTime("dt", Boolean.FALSE, Boolean.TRUE));
transferAndAssert(dateTime);
}
@Test
public void testDecimal() {
Schema decimal = new Schema("d").addColumn(new Decimal("d", 12L, 15L));
transferAndAssert(decimal);
}
@Test
public void testEnum() {
Schema e = new Schema("e").addColumn(new Enum("e", new Text()));
transferAndAssert(e);
}
@Test
public void testFixedPoint() {
Schema f = new Schema("f").addColumn(new FixedPoint("fp", 4L, Boolean.FALSE));
transferAndAssert(f);
}
@Test
public void testFloatingPoint() {
Schema fp = new Schema("fp").addColumn(new FloatingPoint("k", 4L));
transferAndAssert(fp);
}
@Test
public void testMap() {
Schema m = new Schema("m").addColumn(new Map("m", new Text(), new Decimal()));
transferAndAssert(m);
}
@Test
public void testSet() {
Schema s = new Schema("s").addColumn(new Set("b", new Binary()));
transferAndAssert(s);
}
@Test
public void testText() {
Schema t = new Schema("t").addColumn(new Text("x", 10L));
transferAndAssert(t);
}
@Test
public void testTime() {
Schema t = new Schema("t").addColumn(new Time("t", Boolean.FALSE));
transferAndAssert(t);
}
@Test
public void testUnsupported() {
Schema t = new Schema("t").addColumn(new Unsupported("u", 4L));
transferAndAssert(t);
}
@Test
public void testNullable() {
Schema nullable = new Schema("n").addColumn(new Text("x", Boolean.FALSE));
transferAndAssert(nullable);
}
@Test
public void testAllTypes() {
Schema allTypes = new Schema("all-types")
.addColumn(new Array("a", new Text()))
.addColumn(new Binary("b"))
.addColumn(new Bit("c"))
.addColumn(new Date("d"))
.addColumn(new DateTime("e"))
.addColumn(new Decimal("f"))
.addColumn(new Enum("g", new Text()))
.addColumn(new FixedPoint("h"))
.addColumn(new FloatingPoint("i"))
.addColumn(new Map("j", new Text(), new Text()))
.addColumn(new Set("k", new Text()))
.addColumn(new Text("l"))
.addColumn(new Time("m"))
.addColumn(new Unsupported("u"))
;
transferAndAssert(allTypes);
}
@Test
public void testComplex() {
Schema complex = new Schema("complex")
.addColumn(new Map(new Array(new Enum(new Text())), new Set(new Array(new Text()))).setName("a"))
;
transferAndAssert(complex);
}
private void transferAndAssert(Schema schema) {
Schema transferred = transfer(schema);
assertEquals(schema, transferred);
}
protected Schema transfer(Schema schema) {
JSONObject extractJson = SchemaSerialization.extractSchema(schema);
String transferredString = extractJson.toJSONString();
JSONObject restoreJson = (JSONObject) JSONValue.parse(transferredString);
return SchemaSerialization.restoreSchemna(restoreJson);
}
}

View File

@ -70,6 +70,9 @@ public enum GenericJdbcConnectorError implements ErrorCode {
/** Unsupported values in partition column */
GENERIC_JDBC_CONNECTOR_0015("Partition column contains unsupported values"),
/** Can't fetch schema */
GENERIC_JDBC_CONNECTOR_0016("Can't fetch schema"),
;
private final String message;

View File

@ -27,6 +27,7 @@
import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.utils.ClassUtils;
public class GenericJdbcExportInitializer extends Initializer<ConnectionConfiguration, ExportJobConfiguration> {
@ -52,6 +53,11 @@ public List<String> getJars(InitializerContext context, ConnectionConfiguration
return jars;
}
@Override
public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ExportJobConfiguration exportJobConfiguration) {
return null;
}
private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) {
String driver = connectionConfig.connection.jdbcDriver;
String url = connectionConfig.connection.connectionString;

View File

@ -29,9 +29,12 @@
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
import org.apache.sqoop.connector.jdbc.util.SqlTypesUtils;
import org.apache.sqoop.job.Constants;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.utils.ClassUtils;
public class GenericJdbcImportInitializer extends Initializer<ConnectionConfiguration, ImportJobConfiguration> {
@ -61,6 +64,55 @@ public List<String> getJars(InitializerContext context, ConnectionConfiguration
return jars;
}
@Override
public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ImportJobConfiguration importJobConfiguration) {
configureJdbcProperties(context.getContext(), connectionConfiguration, importJobConfiguration);
String schemaName = importJobConfiguration.table.tableName;
if(schemaName == null) {
schemaName = "Query";
}
Schema schema = new Schema(schemaName);
ResultSet rs = null;
ResultSetMetaData rsmt = null;
try {
rs = executor.executeQuery(
context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL)
.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0")
);
rsmt = rs.getMetaData();
for (int i = 1 ; i <= rsmt.getColumnCount(); i++) {
Column column = SqlTypesUtils.sqlTypeToAbstractType(rsmt.getColumnType(i));
String columnName = rsmt.getColumnName(i);
if (columnName == null || columnName.equals("")) {
columnName = rsmt.getColumnLabel(i);
if (null == columnName) {
columnName = "Column " + i;
}
}
column.setName(columnName);
schema.addColumn(column);
}
return schema;
} catch (SQLException e) {
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e);
} finally {
if(rs != null) {
try {
rs.close();
} catch (SQLException e) {
LOG.info("Ignoring exception while closing ResultSet", e);
}
}
}
}
private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
String driver = connectionConfig.connection.jdbcDriver;
String url = connectionConfig.connection.connectionString;

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc.util;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.Binary;
import org.apache.sqoop.schema.type.Bit;
import org.apache.sqoop.schema.type.Date;
import org.apache.sqoop.schema.type.DateTime;
import org.apache.sqoop.schema.type.Decimal;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Text;
import org.apache.sqoop.schema.type.Time;
import org.apache.sqoop.schema.type.Unsupported;
import java.sql.Types;
/**
* Utility class to work with SQL types.
*/
public class SqlTypesUtils {
/**
* Convert given java.sql.Types number into internal data type.
*
* @param sqlType java.sql.Types constant
* @return Concrete Column implementation
*/
public static Column sqlTypeToAbstractType(int sqlType) {
switch (sqlType) {
case Types.SMALLINT:
case Types.TINYINT:
case Types.INTEGER:
return new FixedPoint();
case Types.VARCHAR:
case Types.CHAR:
case Types.LONGVARCHAR:
case Types.NVARCHAR:
case Types.NCHAR:
case Types.LONGNVARCHAR:
return new Text();
case Types.DATE:
return new Date();
case Types.TIME:
return new Time();
case Types.TIMESTAMP:
return new DateTime();
case Types.CLOB:
case Types.FLOAT:
case Types.REAL:
case Types.DOUBLE:
return new FloatingPoint();
case Types.NUMERIC:
case Types.DECIMAL:
case Types.BIGINT:
return new Decimal();
case Types.BIT:
case Types.BOOLEAN:
return new Bit();
case Types.BINARY:
case Types.VARBINARY:
case Types.BLOB:
case Types.LONGVARBINARY:
return new Binary();
default:
return new Unsupported((long)sqlType);
}
}
private SqlTypesUtils() {
// Instantiation is prohibited
}
}

View File

@ -28,6 +28,10 @@
import org.apache.sqoop.job.Constants;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Text;
public class TestImportInitializer extends TestCase {
@ -87,6 +91,20 @@ public void setUp() {
}
}
/**
* Return Schema representation for the testing table.
*
* @param name Name that should be used for the generated schema.
* @return
*/
public Schema getSchema(String name) {
return new Schema(name)
.addColumn(new FixedPoint("ICOL"))
.addColumn(new FloatingPoint("DCOL"))
.addColumn(new Text("VCOL"))
;
}
@Override
public void tearDown() {
executor.close();
@ -290,6 +308,49 @@ public void testTableSqlWithSchema() throws Exception {
String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
}
@SuppressWarnings("unchecked")
public void testGetSchemaForTable() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ImportJobConfiguration jobConf = new ImportJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.table.schemaName = schemaName;
jobConf.table.tableName = tableName;
jobConf.table.partitionColumn = "DCOL";
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcImportInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
Schema schema = initializer.getSchema(initializerContext, connConf, jobConf);
assertEquals(getSchema(tableName), schema);
}
@SuppressWarnings("unchecked")
public void testGetSchemaForSql() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ImportJobConfiguration jobConf = new ImportJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.table.schemaName = schemaName;
jobConf.table.sql = tableSql;
jobConf.table.partitionColumn = "DCOL";
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcImportInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
Schema schema = initializer.getSchema(initializerContext, connConf, jobConf);
assertEquals(getSchema("Query"), schema);
}
@SuppressWarnings("unchecked")
public void testTableSqlWithTableColumnsWithSchema() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();

View File

@ -355,6 +355,13 @@ public MSubmission submit(long jobId) {
request.getConfigConnectorConnection(),
request.getConfigConnectorJob()));
// Retrieve and persist the schema
request.getSummary().setConnectorSchema(initializer.getSchema(
initializerContext,
request.getConfigConnectorConnection(),
request.getConfigConnectorJob()
));
// Bootstrap job from framework perspective
switch (job.getType()) {
case IMPORT:

View File

@ -17,6 +17,8 @@
*/
package org.apache.sqoop.job.etl;
import org.apache.sqoop.schema.Schema;
import java.util.LinkedList;
import java.util.List;
@ -52,4 +54,8 @@ public List<String> getJars(InitializerContext context,
return new LinkedList<String>();
}
public abstract Schema getSchema(InitializerContext context,
ConnectionConfiguration connectionConfiguration,
JobConfiguration jobConfiguration);
}