-
Notifications
You must be signed in to change notification settings - Fork 34
Provide Struct data type support for oracle plugin #659
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
231c6ee
fe03c7b
5ab3cd8
3bfd8ba
219b85e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,9 +22,16 @@ | |
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.sql.Connection; | ||
| import java.sql.PreparedStatement; | ||
| import java.sql.ResultSet; | ||
| import java.sql.ResultSetMetaData; | ||
| import java.sql.SQLException; | ||
| import java.sql.Types; | ||
| import java.util.ArrayList; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import javax.annotation.Nullable; | ||
|
|
||
|
|
@@ -45,6 +52,47 @@ public class OracleSourceSchemaReader extends CommonSchemaReader { | |
| public static final int LONG = -1; | ||
| public static final int LONG_RAW = -4; | ||
|
|
||
| /** | ||
| * Maps Oracle string data type inside UDT to their corresponding java.sql.Types integer constants | ||
| */ | ||
| private static final Map<String, Integer> DATA_TYPE_MAP = new HashMap<>(); | ||
| static { | ||
| DATA_TYPE_MAP.put("TIMESTAMP WITH LOCAL TZ", TIMESTAMP_LTZ); | ||
| DATA_TYPE_MAP.put("TIMESTAMP WITH TZ", TIMESTAMP_TZ); | ||
| DATA_TYPE_MAP.put("TIMESTAMP", Types.TIMESTAMP); | ||
| DATA_TYPE_MAP.put("DATE", Types.DATE); | ||
| DATA_TYPE_MAP.put("TIME", Types.TIME); | ||
| DATA_TYPE_MAP.put("FLOAT", Types.FLOAT); | ||
| DATA_TYPE_MAP.put("BINARY_FLOAT", BINARY_FLOAT); | ||
| DATA_TYPE_MAP.put("REAL", Types.REAL); | ||
| DATA_TYPE_MAP.put("BINARY_DOUBLE", BINARY_DOUBLE); | ||
| DATA_TYPE_MAP.put("DOUBLE", Types.DOUBLE); | ||
| DATA_TYPE_MAP.put("BFILE", BFILE); | ||
| DATA_TYPE_MAP.put("RAW", LONG_RAW); | ||
| DATA_TYPE_MAP.put("LONG RAW", LONG_RAW); | ||
| DATA_TYPE_MAP.put("LONG", LONG); | ||
| DATA_TYPE_MAP.put("INTERVAL DAY TO SECOND", INTERVAL_DS); | ||
| DATA_TYPE_MAP.put("INTERVAL YEAR TO MONTH", INTERVAL_YM); | ||
| DATA_TYPE_MAP.put("XMLTYPE", Types.SQLXML); | ||
| DATA_TYPE_MAP.put("ARRAY", Types.ARRAY); | ||
| DATA_TYPE_MAP.put("ANYDATA", Types.JAVA_OBJECT); | ||
| DATA_TYPE_MAP.put("OTHER", Types.OTHER); | ||
| DATA_TYPE_MAP.put("NUMBER", Types.NUMERIC); | ||
| DATA_TYPE_MAP.put("DECIMAL", Types.DECIMAL); | ||
| DATA_TYPE_MAP.put("INTEGER", Types.INTEGER); | ||
| DATA_TYPE_MAP.put("ROWID", Types.ROWID); | ||
| DATA_TYPE_MAP.put("UROWID", Types.ROWID); | ||
| DATA_TYPE_MAP.put("BLOB", Types.BLOB); | ||
| DATA_TYPE_MAP.put("CLOB", Types.CLOB); | ||
| DATA_TYPE_MAP.put("NCLOB", Types.NCLOB); | ||
| DATA_TYPE_MAP.put("VARCHAR2", Types.VARCHAR); | ||
| DATA_TYPE_MAP.put("VARCHAR", Types.VARCHAR); | ||
| DATA_TYPE_MAP.put("CHAR", Types.CHAR); | ||
| DATA_TYPE_MAP.put("CHAR2", Types.CHAR); | ||
| DATA_TYPE_MAP.put("NCHAR", Types.NCHAR); | ||
| DATA_TYPE_MAP.put("NVARCHAR2", Types.NVARCHAR); | ||
| } | ||
|
|
||
| /** | ||
| * Logger instance for Oracle Schema reader. | ||
| */ | ||
|
|
@@ -71,6 +119,7 @@ public class OracleSourceSchemaReader extends CommonSchemaReader { | |
| private final Boolean isPrecisionlessNumAsDecimal; | ||
| private final Boolean isTimestampLtzFieldTimestamp; | ||
| private final Boolean isXmlTypeEnabled; | ||
| private Connection connection; | ||
|
|
||
| public OracleSourceSchemaReader() { | ||
| this(null, false, false, false, false); | ||
|
|
@@ -88,14 +137,27 @@ public OracleSourceSchemaReader(@Nullable String sessionID, boolean isTimestampO | |
| @Override | ||
| public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException { | ||
| int sqlType = metadata.getColumnType(index); | ||
| String owner = (metadata.getColumnTypeName(index) != null | ||
| && metadata.getColumnTypeName(index).contains(".")) ? metadata.getColumnTypeName(index) | ||
| .substring(0, metadata.getColumnTypeName(index).lastIndexOf('.')) : null; | ||
|
|
||
| return getSchemaMapping(sqlType, metadata.getColumnClassName(index), metadata.getPrecision(index), | ||
| metadata.getScale(index), metadata.getColumnName(index), metadata.getColumnTypeName(index), | ||
| metadata.isSigned(index), owner, 0); | ||
| } | ||
|
|
||
| public Schema getSchemaMapping(int sqlType, String columnClassName, int columnPrecision, | ||
| int columnScale, String columnName, String columnTypeName, | ||
| boolean isSigned, String owner, int nestingLevel) throws SQLException { | ||
|
|
||
| switch (sqlType) { | ||
| case TIMESTAMP_TZ: | ||
| return isTimestampOldBehavior ? Schema.of(Schema.Type.STRING) : Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); | ||
| case TIMESTAMP_LTZ: | ||
| return getTimestampLtzSchema(); | ||
| case Types.TIMESTAMP: | ||
| return isTimestampOldBehavior ? super.getSchema(metadata, index) : Schema.of(Schema.LogicalType.DATETIME); | ||
| return isTimestampOldBehavior ? super.getSchema(columnTypeName, sqlType, | ||
| columnPrecision, columnScale, columnName, isSigned) : Schema.of(Schema.LogicalType.DATETIME); | ||
| case BINARY_FLOAT: | ||
| return Schema.of(Schema.Type.FLOAT); | ||
| case BINARY_DOUBLE: | ||
|
|
@@ -109,15 +171,16 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti | |
| return Schema.of(Schema.Type.STRING); | ||
| case Types.SQLXML: | ||
| // Enabling XML type support for DTS connectors only as it is not in working state in CDAP plugin. | ||
| return isXmlTypeEnabled ? Schema.of(Schema.Type.STRING) : super.getSchema(metadata, index); | ||
| return isXmlTypeEnabled ? Schema.of(Schema.Type.STRING) : super.getSchema(columnTypeName, | ||
| sqlType, columnPrecision, columnScale, columnName, isSigned); | ||
| case Types.NUMERIC: | ||
| case Types.DECIMAL: | ||
| // FLOAT and REAL are returned as java.sql.Types.NUMERIC but with value that is a java.lang.Double | ||
| if (Double.class.getTypeName().equals(metadata.getColumnClassName(index))) { | ||
| if (Double.class.getTypeName().equals(columnClassName)) { | ||
| return Schema.of(Schema.Type.DOUBLE); | ||
| } else { | ||
| int precision = metadata.getPrecision(index); // total number of digits | ||
| int scale = metadata.getScale(index); // digits after the decimal point | ||
| int precision = columnPrecision; // total number of digits | ||
| int scale = columnScale; // digits after the decimal point | ||
| // For a Number type without specified precision and scale, precision will be 0 and scale will be -127 | ||
| if (precision == 0) { | ||
| // reference : https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT1832 | ||
|
|
@@ -128,23 +191,91 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti | |
| + "there may be a precision loss while running the pipeline. " | ||
| + "Please define an output precision and scale for field '%s' to avoid " | ||
| + "precision loss.", | ||
| metadata.getColumnTypeName(index), | ||
| metadata.getColumnName(index))); | ||
| columnTypeName, columnName)); | ||
| return Schema.decimalOf(precision, scale); | ||
| } else { | ||
| LOG.warn(String.format("Field '%s' is a %s type without precision and scale, " | ||
| + "converting into STRING type to avoid any precision loss.", | ||
| metadata.getColumnName(index), | ||
| metadata.getColumnTypeName(index), | ||
| metadata.getColumnName(index))); | ||
| columnName, columnTypeName, columnName)); | ||
| return Schema.of(Schema.Type.STRING); | ||
| } | ||
| } | ||
| return Schema.decimalOf(precision, scale); | ||
| } | ||
| case Types.STRUCT: | ||
| if (connection == null) { | ||
| throw new SQLException("Cannot resolve STRUCT schema without a database connection. " | ||
| + "Use getSchemaFields(ResultSet) to enable STRUCT type resolution."); | ||
| } | ||
| if (nestingLevel >= 4) { | ||
| throw new IllegalArgumentException(String.format("Cannot resolve STRUCT schema for attribute %s with " + | ||
| "nested structure depth more than 4.", columnName)); | ||
| } | ||
| return getStructSchema(connection, columnTypeName, owner, nestingLevel); | ||
| default: | ||
| return super.getSchema(metadata, index); | ||
| return super.getSchema(columnTypeName, sqlType, columnPrecision, columnScale, columnName, isSigned); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLException { | ||
| this.connection = resultSet.getStatement().getConnection(); | ||
| return super.getSchemaFields(resultSet); | ||
| } | ||
|
|
||
| /** | ||
| * Builds a CDAP RECORD schema for an Oracle STRUCT type by querying the | ||
| * database metadata | ||
| * for the type's attributes. | ||
| * | ||
| * @param connection the database connection | ||
| * @param typeName the Oracle type name (e.g., "ADDRESS_TYPE") | ||
| * @param owner the Owner of the user-defined data type | ||
| * @param level the level of nesting of the user-defined data type | ||
| * @return a CDAP RECORD schema with fields corresponding to the STRUCT's | ||
| * attributes | ||
| */ | ||
| private Schema getStructSchema(Connection connection, String typeName, String owner, int level) throws SQLException { | ||
| List<Schema.Field> fields = new ArrayList<>(); | ||
| String sql = "SELECT * FROM ALL_TYPE_ATTRS WHERE TYPE_NAME = ? AND OWNER = ? ORDER BY ATTR_NO"; | ||
|
|
||
| try (PreparedStatement stmt = connection.prepareStatement(sql)) { | ||
| stmt.setString(1, typeName.substring(typeName.lastIndexOf('.') + 1)); | ||
| stmt.setString(2, owner); | ||
|
|
||
| try (ResultSet attrRs = stmt.executeQuery()) { | ||
| while (attrRs.next()) { | ||
| String attrName = attrRs.getString("ATTR_NAME"); | ||
| String attrTypeName = attrRs.getString("ATTR_TYPE_NAME"); | ||
| int attrSize = attrRs.getInt("PRECISION"); | ||
| int attrScale = attrRs.getInt("SCALE"); | ||
| Integer sqlType = DATA_TYPE_MAP.getOrDefault(attrTypeName, null); | ||
|
|
||
| int nextLevel = level; | ||
| if (sqlType == null) { | ||
| owner = attrRs.getString("ATTR_TYPE_OWNER"); | ||
| if (owner == null || owner.isEmpty()) { | ||
| throw new SQLException(String.format("Attribute '%s' is not a primitive type, but it lacks a type " + | ||
| "owner. Therefore, it cannot be resolved as a STRUCT type. ", attrName)); | ||
| } | ||
| sqlType = Types.STRUCT; | ||
| nextLevel = level + 1; | ||
| } | ||
| Schema attrSchema = getSchemaMapping(sqlType, null, attrSize, | ||
| attrScale, attrName, attrTypeName, true, owner, nextLevel); | ||
| fields.add(Schema.Field.of(attrName, attrSchema)); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if (fields.isEmpty()) { | ||
| throw new SQLException(String.format( | ||
| "No attributes found for Oracle STRUCT type '%s'. " | ||
| + "Ensure the type exists and is accessible.", | ||
| typeName)); | ||
| } | ||
|
|
||
| return Schema.recordOf(typeName, fields); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. CDAP String recordName = typeName.contains(".") ? typeName.substring(typeName.lastIndexOf('.') + 1) : typeName;
return Schema.recordOf(recordName, fields); |
||
| } | ||
|
|
||
| private Schema getTimestampLtzSchema() { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this static?