Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
metadata.isSigned(index), true);
}

public Schema getSchema(String typeName, int sqlType, int precision, int scale, String columnName,
boolean isSigned) throws SQLException {
return DBUtils.getSchema(typeName, sqlType, precision, scale, columnName, isSigned, true);
}

@Override
public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@
import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Struct;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -106,7 +109,7 @@ record = recordBuilder.build();
@Override
protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field,
int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException {
if (OracleSourceSchemaReader.ORACLE_TYPES.contains(sqlType) || sqlType == Types.NCLOB) {
if (OracleSourceSchemaReader.ORACLE_TYPES.contains(sqlType) || sqlType == Types.NCLOB || sqlType == Types.STRUCT) {
handleOracleSpecificType(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
} else {
setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
Expand Down Expand Up @@ -232,11 +235,15 @@ private Object createOracleTimestamp(Connection connection, String timestampStri
*/
private byte[] getBfileBytes(ResultSet resultSet, String columnName) throws SQLException {
Object bfile = resultSet.getObject(columnName);
return getBfileBytes(bfile, columnName);
}

public static byte[] getBfileBytes(Object bfile, String columnName) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this static?

if (bfile == null) {
return null;
}
try {
ClassLoader classLoader = resultSet.getClass().getClassLoader();
ClassLoader classLoader = bfile.getClass().getClassLoader();
Class<?> oracleBfileClass = classLoader.loadClass("oracle.jdbc.OracleBfile");
boolean isFileExist = (boolean) oracleBfileClass.getMethod("fileExists").invoke(bfile);
if (!isFileExist) {
Expand Down Expand Up @@ -341,6 +348,12 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil
case OracleSourceSchemaReader.LONG_RAW:
recordBuilder.set(field.getName(), resultSet.getBytes(columnIndex));
break;
case Types.STRUCT:
Struct structValue = (Struct) resultSet.getObject(columnIndex);
if (structValue != null) {
recordBuilder.set(field.getName(), convertStructToRecord(structValue, nonNullSchema, resultSet));
}
break;
case Types.DECIMAL:
case Types.NUMERIC:
// This is the only way to differentiate FLOAT/REAL columns from other numeric columns, that based on NUMBER.
Expand Down Expand Up @@ -371,6 +384,35 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil
}
}

private StructuredRecord convertStructToRecord(Struct struct, Schema schema, ResultSet resultSet)
throws SQLException {
Object[] attributes = struct.getAttributes();
List<Schema.Field> fields = schema.getFields();
StructuredRecord.Builder builder = StructuredRecord.builder(schema);

for (int index = 0; index < attributes.length; index++) {
Schema.Field field = fields.get(index);
Object attrValue = attributes[index];

if (attrValue == null) {
builder.set(field.getName(), null);
continue;
}
// If it is an internal nested STRUCT, recurse down
if (attrValue instanceof Struct) {
Schema fieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema();
builder.set(field.getName(), convertStructToRecord((Struct) attrValue, fieldSchema, resultSet));
continue;
}

String attrClassName = attrValue.getClass().getName();
Schema fieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema();

OracleStructAttributeConverters.convertValue(builder, field, fieldSchema, attrValue, attrClassName);
}
return builder.build();
}

/**
* Get the scale set in Non-nullable schema associated with the schema
* */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;

Expand All @@ -45,6 +52,47 @@ public class OracleSourceSchemaReader extends CommonSchemaReader {
public static final int LONG = -1;
public static final int LONG_RAW = -4;

/**
* Maps Oracle string data type inside UDT to their corresponding java.sql.Types integer constants
*/
private static final Map<String, Integer> DATA_TYPE_MAP = new HashMap<>();
static {
DATA_TYPE_MAP.put("TIMESTAMP WITH LOCAL TZ", TIMESTAMP_LTZ);
DATA_TYPE_MAP.put("TIMESTAMP WITH TZ", TIMESTAMP_TZ);
DATA_TYPE_MAP.put("TIMESTAMP", Types.TIMESTAMP);
DATA_TYPE_MAP.put("DATE", Types.DATE);
DATA_TYPE_MAP.put("TIME", Types.TIME);
DATA_TYPE_MAP.put("FLOAT", Types.FLOAT);
DATA_TYPE_MAP.put("BINARY_FLOAT", BINARY_FLOAT);
DATA_TYPE_MAP.put("REAL", Types.REAL);
DATA_TYPE_MAP.put("BINARY_DOUBLE", BINARY_DOUBLE);
DATA_TYPE_MAP.put("DOUBLE", Types.DOUBLE);
DATA_TYPE_MAP.put("BFILE", BFILE);
DATA_TYPE_MAP.put("RAW", LONG_RAW);
DATA_TYPE_MAP.put("LONG RAW", LONG_RAW);
DATA_TYPE_MAP.put("LONG", LONG);
DATA_TYPE_MAP.put("INTERVAL DAY TO SECOND", INTERVAL_DS);
DATA_TYPE_MAP.put("INTERVAL YEAR TO MONTH", INTERVAL_YM);
DATA_TYPE_MAP.put("XMLTYPE", Types.SQLXML);
DATA_TYPE_MAP.put("ARRAY", Types.ARRAY);
DATA_TYPE_MAP.put("ANYDATA", Types.JAVA_OBJECT);
DATA_TYPE_MAP.put("OTHER", Types.OTHER);
DATA_TYPE_MAP.put("NUMBER", Types.NUMERIC);
DATA_TYPE_MAP.put("DECIMAL", Types.DECIMAL);
DATA_TYPE_MAP.put("INTEGER", Types.INTEGER);
DATA_TYPE_MAP.put("ROWID", Types.ROWID);
DATA_TYPE_MAP.put("UROWID", Types.ROWID);
DATA_TYPE_MAP.put("BLOB", Types.BLOB);
DATA_TYPE_MAP.put("CLOB", Types.CLOB);
DATA_TYPE_MAP.put("NCLOB", Types.NCLOB);
DATA_TYPE_MAP.put("VARCHAR2", Types.VARCHAR);
DATA_TYPE_MAP.put("VARCHAR", Types.VARCHAR);
DATA_TYPE_MAP.put("CHAR", Types.CHAR);
DATA_TYPE_MAP.put("CHAR2", Types.CHAR);
DATA_TYPE_MAP.put("NCHAR", Types.NCHAR);
DATA_TYPE_MAP.put("NVARCHAR2", Types.NVARCHAR);
}

/**
* Logger instance for Oracle Schema reader.
*/
Expand All @@ -71,6 +119,7 @@ public class OracleSourceSchemaReader extends CommonSchemaReader {
private final Boolean isPrecisionlessNumAsDecimal;
private final Boolean isTimestampLtzFieldTimestamp;
private final Boolean isXmlTypeEnabled;
private Connection connection;

public OracleSourceSchemaReader() {
this(null, false, false, false, false);
Expand All @@ -88,14 +137,27 @@ public OracleSourceSchemaReader(@Nullable String sessionID, boolean isTimestampO
@Override
public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException {
int sqlType = metadata.getColumnType(index);
String owner = (metadata.getColumnTypeName(index) != null
&& metadata.getColumnTypeName(index).contains(".")) ? metadata.getColumnTypeName(index)
.substring(0, metadata.getColumnTypeName(index).lastIndexOf('.')) : null;

return getSchemaMapping(sqlType, metadata.getColumnClassName(index), metadata.getPrecision(index),
metadata.getScale(index), metadata.getColumnName(index), metadata.getColumnTypeName(index),
metadata.isSigned(index), owner, 0);
}

public Schema getSchemaMapping(int sqlType, String columnClassName, int columnPrecision,
int columnScale, String columnName, String columnTypeName,
boolean isSigned, String owner, int nestingLevel) throws SQLException {

switch (sqlType) {
case TIMESTAMP_TZ:
return isTimestampOldBehavior ? Schema.of(Schema.Type.STRING) : Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
case TIMESTAMP_LTZ:
return getTimestampLtzSchema();
case Types.TIMESTAMP:
return isTimestampOldBehavior ? super.getSchema(metadata, index) : Schema.of(Schema.LogicalType.DATETIME);
return isTimestampOldBehavior ? super.getSchema(columnTypeName, sqlType,
columnPrecision, columnScale, columnName, isSigned) : Schema.of(Schema.LogicalType.DATETIME);
case BINARY_FLOAT:
return Schema.of(Schema.Type.FLOAT);
case BINARY_DOUBLE:
Expand All @@ -109,15 +171,16 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
return Schema.of(Schema.Type.STRING);
case Types.SQLXML:
// Enabling XML type support for DTS connectors only as it is not in working state in CDAP plugin.
return isXmlTypeEnabled ? Schema.of(Schema.Type.STRING) : super.getSchema(metadata, index);
return isXmlTypeEnabled ? Schema.of(Schema.Type.STRING) : super.getSchema(columnTypeName,
sqlType, columnPrecision, columnScale, columnName, isSigned);
case Types.NUMERIC:
case Types.DECIMAL:
// FLOAT and REAL are returned as java.sql.Types.NUMERIC but with value that is a java.lang.Double
if (Double.class.getTypeName().equals(metadata.getColumnClassName(index))) {
if (Double.class.getTypeName().equals(columnClassName)) {
return Schema.of(Schema.Type.DOUBLE);
} else {
int precision = metadata.getPrecision(index); // total number of digits
int scale = metadata.getScale(index); // digits after the decimal point
int precision = columnPrecision; // total number of digits
int scale = columnScale; // digits after the decimal point
// For a Number type without specified precision and scale, precision will be 0 and scale will be -127
if (precision == 0) {
// reference : https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT1832
Expand All @@ -128,23 +191,91 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
+ "there may be a precision loss while running the pipeline. "
+ "Please define an output precision and scale for field '%s' to avoid "
+ "precision loss.",
metadata.getColumnTypeName(index),
metadata.getColumnName(index)));
columnTypeName, columnName));
return Schema.decimalOf(precision, scale);
} else {
LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
+ "converting into STRING type to avoid any precision loss.",
metadata.getColumnName(index),
metadata.getColumnTypeName(index),
metadata.getColumnName(index)));
columnName, columnTypeName, columnName));
return Schema.of(Schema.Type.STRING);
}
}
return Schema.decimalOf(precision, scale);
}
case Types.STRUCT:
if (connection == null) {
throw new SQLException("Cannot resolve STRUCT schema without a database connection. "
+ "Use getSchemaFields(ResultSet) to enable STRUCT type resolution.");
}
if (nestingLevel >= 4) {
throw new IllegalArgumentException(String.format("Cannot resolve STRUCT schema for attribute %s with " +
"nested structure depth more than 4.", columnName));
}
return getStructSchema(connection, columnTypeName, owner, nestingLevel);
default:
return super.getSchema(metadata, index);
return super.getSchema(columnTypeName, sqlType, columnPrecision, columnScale, columnName, isSigned);
}
}

@Override
public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLException {
this.connection = resultSet.getStatement().getConnection();
return super.getSchemaFields(resultSet);
}

/**
* Builds a CDAP RECORD schema for an Oracle STRUCT type by querying the
* database metadata
* for the type's attributes.
*
* @param connection the database connection
* @param typeName the Oracle type name (e.g., "ADDRESS_TYPE")
* @param owner the Owner of the user-defined data type
* @param level the level of nesting of the user-defined data type
* @return a CDAP RECORD schema with fields corresponding to the STRUCT's
* attributes
*/
private Schema getStructSchema(Connection connection, String typeName, String owner, int level) throws SQLException {
List<Schema.Field> fields = new ArrayList<>();
String sql = "SELECT * FROM ALL_TYPE_ATTRS WHERE TYPE_NAME = ? AND OWNER = ? ORDER BY ATTR_NO";

try (PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.setString(1, typeName.substring(typeName.lastIndexOf('.') + 1));
stmt.setString(2, owner);

try (ResultSet attrRs = stmt.executeQuery()) {
while (attrRs.next()) {
String attrName = attrRs.getString("ATTR_NAME");
String attrTypeName = attrRs.getString("ATTR_TYPE_NAME");
int attrSize = attrRs.getInt("PRECISION");
int attrScale = attrRs.getInt("SCALE");
Integer sqlType = DATA_TYPE_MAP.getOrDefault(attrTypeName, null);

int nextLevel = level;
if (sqlType == null) {
owner = attrRs.getString("ATTR_TYPE_OWNER");
if (owner == null || owner.isEmpty()) {
throw new SQLException(String.format("Attribute '%s' is not a primitive type, but it lacks a type " +
"owner. Therefore, it cannot be resolved as a STRUCT type. ", attrName));
}
sqlType = Types.STRUCT;
nextLevel = level + 1;
}
Schema attrSchema = getSchemaMapping(sqlType, null, attrSize,
attrScale, attrName, attrTypeName, true, owner, nextLevel);
fields.add(Schema.Field.of(attrName, attrSchema));
}
}
}

if (fields.isEmpty()) {
throw new SQLException(String.format(
"No attributes found for Oracle STRUCT type '%s'. "
+ "Ensure the type exists and is accessible.",
typeName));
}

return Schema.recordOf(typeName, fields);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

CDAP Schema.recordOf requires the record name to be a valid identifier (matching [A-Za-z_][A-Za-z0-9_]*). If typeName is fully qualified (e.g., MY_SCHEMA.MY_TYPE), this call will throw an IllegalArgumentException due to the dot. Consider using only the simple type name part for the record name.

    String recordName = typeName.contains(".") ? typeName.substring(typeName.lastIndexOf('.') + 1) : typeName;
    return Schema.recordOf(recordName, fields);

}

private Schema getTimestampLtzSchema() {
Expand Down
Loading
Loading