diff --git a/database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java b/database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java index 28c56db8c..b2348ee5e 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java @@ -57,6 +57,11 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti metadata.isSigned(index), true); } + public Schema getSchema(String typeName, int sqlType, int precision, int scale, String columnName, + boolean isSigned) throws SQLException { + return DBUtils.getSchema(typeName, sqlType, precision, scale, columnName, isSigned, true); + } + @Override public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException { return false; diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java index 44131a01b..010c8964a 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java @@ -30,11 +30,14 @@ import java.lang.reflect.InvocationTargetException; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.sql.Blob; +import java.sql.Clob; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Struct; import java.sql.Timestamp; import java.sql.Types; import java.time.LocalDateTime; @@ -106,7 +109,7 @@ record = recordBuilder.build(); @Override protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field, int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException { - if (OracleSourceSchemaReader.ORACLE_TYPES.contains(sqlType) || sqlType == Types.NCLOB) { + if (OracleSourceSchemaReader.ORACLE_TYPES.contains(sqlType) || sqlType == Types.NCLOB || sqlType == Types.STRUCT) { handleOracleSpecificType(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale); } else { setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale); @@ -232,11 +235,15 @@ private Object createOracleTimestamp(Connection connection, String timestampStri */ private byte[] getBfileBytes(ResultSet resultSet, String columnName) throws SQLException { Object bfile = resultSet.getObject(columnName); + return getBfileBytes(bfile, columnName); + } + + public static byte[] getBfileBytes(Object bfile, String columnName) { if (bfile == null) { return null; } try { - ClassLoader classLoader = resultSet.getClass().getClassLoader(); + ClassLoader classLoader = bfile.getClass().getClassLoader(); Class oracleBfileClass = classLoader.loadClass("oracle.jdbc.OracleBfile"); boolean isFileExist = (boolean) oracleBfileClass.getMethod("fileExists").invoke(bfile); if (!isFileExist) { @@ -341,6 +348,12 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil case OracleSourceSchemaReader.LONG_RAW: recordBuilder.set(field.getName(), resultSet.getBytes(columnIndex)); break; + case Types.STRUCT: + Struct structValue = (Struct) resultSet.getObject(columnIndex); + if (structValue != null) { + recordBuilder.set(field.getName(), convertStructToRecord(structValue, nonNullSchema, resultSet)); + } + break; case Types.DECIMAL: case Types.NUMERIC: // This is the only way to differentiate FLOAT/REAL columns from other numeric columns, that based on NUMBER. @@ -371,6 +384,35 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil } } + private StructuredRecord convertStructToRecord(Struct struct, Schema schema, ResultSet resultSet) + throws SQLException { + Object[] attributes = struct.getAttributes(); + List fields = schema.getFields(); + StructuredRecord.Builder builder = StructuredRecord.builder(schema); + + for (int index = 0; index < attributes.length; index++) { + Schema.Field field = fields.get(index); + Object attrValue = attributes[index]; + + if (attrValue == null) { + builder.set(field.getName(), null); + continue; + } + // If it is an internal nested STRUCT, recurse down + if (attrValue instanceof Struct) { + Schema fieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema(); + builder.set(field.getName(), convertStructToRecord((Struct) attrValue, fieldSchema, resultSet)); + continue; + } + + String attrClassName = attrValue.getClass().getName(); + Schema fieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema(); + + OracleStructAttributeConverters.convertValue(builder, field, fieldSchema, attrValue, attrClassName); + } + return builder.build(); + } + /** * Get the scale set in Non-nullable schema associated with the schema * */ diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java index b23dfa031..f4f308689 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java @@ -22,9 +22,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Types; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Set; import javax.annotation.Nullable; @@ -45,6 +52,47 @@ public class OracleSourceSchemaReader extends CommonSchemaReader { public static final int LONG = -1; public static final int LONG_RAW = -4; + /** + * Maps Oracle string data type inside UDT to their corresponding java.sql.Types integer constants + */ + private static final Map DATA_TYPE_MAP = new HashMap<>(); + static { + DATA_TYPE_MAP.put("TIMESTAMP WITH LOCAL TZ", TIMESTAMP_LTZ); + DATA_TYPE_MAP.put("TIMESTAMP WITH TZ", TIMESTAMP_TZ); + DATA_TYPE_MAP.put("TIMESTAMP", Types.TIMESTAMP); + DATA_TYPE_MAP.put("DATE", Types.DATE); + DATA_TYPE_MAP.put("TIME", Types.TIME); + DATA_TYPE_MAP.put("FLOAT", Types.FLOAT); + DATA_TYPE_MAP.put("BINARY_FLOAT", BINARY_FLOAT); + DATA_TYPE_MAP.put("REAL", Types.REAL); + DATA_TYPE_MAP.put("BINARY_DOUBLE", BINARY_DOUBLE); + DATA_TYPE_MAP.put("DOUBLE", Types.DOUBLE); + DATA_TYPE_MAP.put("BFILE", BFILE); + DATA_TYPE_MAP.put("RAW", LONG_RAW); + DATA_TYPE_MAP.put("LONG RAW", LONG_RAW); + DATA_TYPE_MAP.put("LONG", LONG); + DATA_TYPE_MAP.put("INTERVAL DAY TO SECOND", INTERVAL_DS); + DATA_TYPE_MAP.put("INTERVAL YEAR TO MONTH", INTERVAL_YM); + DATA_TYPE_MAP.put("XMLTYPE", Types.SQLXML); + DATA_TYPE_MAP.put("ARRAY", Types.ARRAY); + DATA_TYPE_MAP.put("ANYDATA", Types.JAVA_OBJECT); + DATA_TYPE_MAP.put("OTHER", Types.OTHER); + DATA_TYPE_MAP.put("NUMBER", Types.NUMERIC); + DATA_TYPE_MAP.put("DECIMAL", Types.DECIMAL); + DATA_TYPE_MAP.put("INTEGER", Types.INTEGER); + DATA_TYPE_MAP.put("ROWID", Types.ROWID); + DATA_TYPE_MAP.put("UROWID", Types.ROWID); + DATA_TYPE_MAP.put("BLOB", Types.BLOB); + DATA_TYPE_MAP.put("CLOB", Types.CLOB); + DATA_TYPE_MAP.put("NCLOB", Types.NCLOB); + DATA_TYPE_MAP.put("VARCHAR2", Types.VARCHAR); + DATA_TYPE_MAP.put("VARCHAR", Types.VARCHAR); + DATA_TYPE_MAP.put("CHAR", Types.CHAR); + DATA_TYPE_MAP.put("CHAR2", Types.CHAR); + DATA_TYPE_MAP.put("NCHAR", Types.NCHAR); + DATA_TYPE_MAP.put("NVARCHAR2", Types.NVARCHAR); + } + /** * Logger instance for Oracle Schema reader. */ @@ -71,6 +119,7 @@ public class OracleSourceSchemaReader extends CommonSchemaReader { private final Boolean isPrecisionlessNumAsDecimal; private final Boolean isTimestampLtzFieldTimestamp; private final Boolean isXmlTypeEnabled; + private Connection connection; public OracleSourceSchemaReader() { this(null, false, false, false, false); @@ -88,6 +137,18 @@ public OracleSourceSchemaReader(@Nullable String sessionID, boolean isTimestampO @Override public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException { int sqlType = metadata.getColumnType(index); + String owner = (metadata.getColumnTypeName(index) != null + && metadata.getColumnTypeName(index).contains(".")) ? metadata.getColumnTypeName(index) + .substring(0, metadata.getColumnTypeName(index).lastIndexOf('.')) : null; + + return getSchemaMapping(sqlType, metadata.getColumnClassName(index), metadata.getPrecision(index), + metadata.getScale(index), metadata.getColumnName(index), metadata.getColumnTypeName(index), + metadata.isSigned(index), owner, 0); + } + + public Schema getSchemaMapping(int sqlType, String columnClassName, int columnPrecision, + int columnScale, String columnName, String columnTypeName, + boolean isSigned, String owner, int nestingLevel) throws SQLException { switch (sqlType) { case TIMESTAMP_TZ: @@ -95,7 +156,8 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti case TIMESTAMP_LTZ: return getTimestampLtzSchema(); case Types.TIMESTAMP: - return isTimestampOldBehavior ? super.getSchema(metadata, index) : Schema.of(Schema.LogicalType.DATETIME); + return isTimestampOldBehavior ? super.getSchema(columnTypeName, sqlType, + columnPrecision, columnScale, columnName, isSigned) : Schema.of(Schema.LogicalType.DATETIME); case BINARY_FLOAT: return Schema.of(Schema.Type.FLOAT); case BINARY_DOUBLE: @@ -109,15 +171,16 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti return Schema.of(Schema.Type.STRING); case Types.SQLXML: // Enabling XML type support for DTS connectors only as it is not in working state in CDAP plugin. - return isXmlTypeEnabled ? Schema.of(Schema.Type.STRING) : super.getSchema(metadata, index); + return isXmlTypeEnabled ? Schema.of(Schema.Type.STRING) : super.getSchema(columnTypeName, + sqlType, columnPrecision, columnScale, columnName, isSigned); case Types.NUMERIC: case Types.DECIMAL: // FLOAT and REAL are returned as java.sql.Types.NUMERIC but with value that is a java.lang.Double - if (Double.class.getTypeName().equals(metadata.getColumnClassName(index))) { + if (Double.class.getTypeName().equals(columnClassName)) { return Schema.of(Schema.Type.DOUBLE); } else { - int precision = metadata.getPrecision(index); // total number of digits - int scale = metadata.getScale(index); // digits after the decimal point + int precision = columnPrecision; // total number of digits + int scale = columnScale; // digits after the decimal point // For a Number type without specified precision and scale, precision will be 0 and scale will be -127 if (precision == 0) { // reference : https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT1832 @@ -128,23 +191,91 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti + "there may be a precision loss while running the pipeline. " + "Please define an output precision and scale for field '%s' to avoid " + "precision loss.", - metadata.getColumnTypeName(index), - metadata.getColumnName(index))); + columnTypeName, columnName)); return Schema.decimalOf(precision, scale); } else { LOG.warn(String.format("Field '%s' is a %s type without precision and scale, " + "converting into STRING type to avoid any precision loss.", - metadata.getColumnName(index), - metadata.getColumnTypeName(index), - metadata.getColumnName(index))); + columnName, columnTypeName, columnName)); return Schema.of(Schema.Type.STRING); } } return Schema.decimalOf(precision, scale); } + case Types.STRUCT: + if (connection == null) { + throw new SQLException("Cannot resolve STRUCT schema without a database connection. " + + "Use getSchemaFields(ResultSet) to enable STRUCT type resolution."); + } + if (nestingLevel >= 4) { + throw new IllegalArgumentException(String.format("Cannot resolve STRUCT schema for attribute %s with " + + "nested structure depth more than 4.", columnName)); + } + return getStructSchema(connection, columnTypeName, owner, nestingLevel); default: - return super.getSchema(metadata, index); + return super.getSchema(columnTypeName, sqlType, columnPrecision, columnScale, columnName, isSigned); + } + } + + @Override + public List getSchemaFields(ResultSet resultSet) throws SQLException { + this.connection = resultSet.getStatement().getConnection(); + return super.getSchemaFields(resultSet); + } + + /** + * Builds a CDAP RECORD schema for an Oracle STRUCT type by querying the + * database metadata + * for the type's attributes. + * + * @param connection the database connection + * @param typeName the Oracle type name (e.g., "ADDRESS_TYPE") + * @param owner the Owner of the user-defined data type + * @param level the level of nesting of the user-defined data type + * @return a CDAP RECORD schema with fields corresponding to the STRUCT's + * attributes + */ + private Schema getStructSchema(Connection connection, String typeName, String owner, int level) throws SQLException { + List fields = new ArrayList<>(); + String sql = "SELECT * FROM ALL_TYPE_ATTRS WHERE TYPE_NAME = ? AND OWNER = ? ORDER BY ATTR_NO"; + + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setString(1, typeName.substring(typeName.lastIndexOf('.') + 1)); + stmt.setString(2, owner); + + try (ResultSet attrRs = stmt.executeQuery()) { + while (attrRs.next()) { + String attrName = attrRs.getString("ATTR_NAME"); + String attrTypeName = attrRs.getString("ATTR_TYPE_NAME"); + int attrSize = attrRs.getInt("PRECISION"); + int attrScale = attrRs.getInt("SCALE"); + Integer sqlType = DATA_TYPE_MAP.getOrDefault(attrTypeName, null); + + int nextLevel = level; + if (sqlType == null) { + owner = attrRs.getString("ATTR_TYPE_OWNER"); + if (owner == null || owner.isEmpty()) { + throw new SQLException(String.format("Attribute '%s' is not a primitive type, but it lacks a type " + + "owner. Therefore, it cannot be resolved as a STRUCT type. ", attrName)); + } + sqlType = Types.STRUCT; + nextLevel = level + 1; + } + Schema attrSchema = getSchemaMapping(sqlType, null, attrSize, + attrScale, attrName, attrTypeName, true, owner, nextLevel); + fields.add(Schema.Field.of(attrName, attrSchema)); + } + } } + + if (fields.isEmpty()) { + throw new SQLException(String.format( + "No attributes found for Oracle STRUCT type '%s'. " + + "Ensure the type exists and is accessible.", + typeName)); + } + + return Schema.recordOf(typeName, fields); } private Schema getTimestampLtzSchema() { diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleStructAttributeConverters.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleStructAttributeConverters.java new file mode 100644 index 000000000..f684a5fb3 --- /dev/null +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleStructAttributeConverters.java @@ -0,0 +1,281 @@ +/* + * Copyright © 2026 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.oracle; + +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; + +import java.math.BigDecimal; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.SQLException; +import java.sql.SQLXML; +import java.sql.Timestamp; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Arrays; +import java.util.List; + +/** + * Standalone registry of AttributeConverter strategies for converting Oracle STRUCT attributes. + */ +public final class OracleStructAttributeConverters { + + /** + * Strategy interface for translating structured attributes to CDAP records. + */ + public interface AttributeConverter { + boolean canConvert(Object attrValue, String attrClassName); + void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue) throws SQLException; + } + + private static class FloatConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof Float; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue) throws SQLException { + Float floatVal = (Float) attrValue; + builder.set(field.getName(), floatVal); + } + } + + private static class DoubleConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof Double; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue) throws SQLException { + Double doubleVal = (Double) attrValue; + builder.set(field.getName(), doubleVal); + } + } + + private static class BigDecimalConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof BigDecimal; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue) throws SQLException { + BigDecimal bigDecimal = (BigDecimal) attrValue; + if (Schema.LogicalType.DECIMAL.equals(fieldSchema.getLogicalType())) { + builder.setDecimal(field.getName(), bigDecimal.setScale(fieldSchema.getScale(), + java.math.RoundingMode.HALF_UP)); + } else if (Schema.Type.DOUBLE.equals(fieldSchema.getType())) { + builder.set(field.getName(), bigDecimal.doubleValue()); + } else if (Schema.Type.FLOAT.equals(fieldSchema.getType())) { + builder.set(field.getName(), bigDecimal.floatValue()); + } else if (Schema.Type.INT.equals(fieldSchema.getType())) { + builder.set(field.getName(), bigDecimal.intValue()); + } else { + builder.set(field.getName(), bigDecimal.toString()); + } + } + } + + private static class TimestampConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof Timestamp; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue) throws SQLException { + Timestamp timestamp = (Timestamp) attrValue; + if (Schema.LogicalType.DATETIME.equals(fieldSchema.getLogicalType())) { + builder.setDateTime(field.getName(), timestamp.toLocalDateTime()); + } else if (Schema.LogicalType.DATE.equals(fieldSchema.getLogicalType())) { + builder.setDate(field.getName(), timestamp.toLocalDateTime().toLocalDate()); + } else { + builder.set(field.getName(), attrValue.toString()); + } + } + } + + private static class ZonedDateTimeConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof OffsetDateTime || attrValue instanceof ZonedDateTime; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue) throws SQLException { + ZonedDateTime zonedDateTime = (attrValue instanceof OffsetDateTime) + ? ((OffsetDateTime) attrValue).atZoneSameInstant(ZoneId.of("UTC")) + : ((ZonedDateTime) attrValue).withZoneSameInstant(ZoneId.of("UTC")); + if (fieldSchema.getLogicalType() != null && + (Schema.LogicalType.TIMESTAMP_MICROS.equals(fieldSchema.getLogicalType()) || + Schema.LogicalType.TIMESTAMP_MILLIS.equals(fieldSchema.getLogicalType()))) { + builder.setTimestamp(field.getName(), zonedDateTime); + } else if (Schema.Type.LONG.equals(fieldSchema.getType())) { + builder.set(field.getName(), zonedDateTime.toInstant().toEpochMilli()); + } else { + builder.set(field.getName(), zonedDateTime.toString()); + } + } + } + + private static class ClobConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof Clob; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue) throws SQLException { + Clob clob = (Clob) attrValue; + builder.set(field.getName(), clob.getSubString(1, (int) clob.length())); + } + } + + private static class BlobConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof Blob; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue) throws SQLException { + Blob blob = (Blob) attrValue; + builder.set(field.getName(), blob.getBytes(1, (int) blob.length())); + } + } + + private static class OracleBfileConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + try { + ClassLoader oracleLoader = attrValue.getClass().getClassLoader(); + Class bfileInterface = oracleLoader.loadClass("oracle.jdbc.OracleBfile"); + return bfileInterface.isInstance(attrValue); + } catch (Exception e) { + return false; + } + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue) throws SQLException { + builder.set(field.getName(), OracleSourceDBRecord.getBfileBytes(attrValue, field.getName())); + } + } + + private static class ByteArrayConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof byte[]; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue) throws SQLException { + builder.set(field.getName(), (byte[]) attrValue); + } + } + + private static class OracleIntervalConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return "oracle.sql.INTERVALDS".equals(attrClassName) || "oracle.sql.INTERVALYM".equals(attrClassName); + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue) throws SQLException { + builder.set(field.getName(), attrValue.toString()); + } + } + + private static class SqlXmlConverter implements AttributeConverter { + + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof SQLXML; + } + + @Override + public void convert( + StructuredRecord.Builder builder, + Schema.Field field, + Schema fieldSchema, + Object attrValue) throws SQLException { + + SQLXML xml = (SQLXML) attrValue; + builder.set(field.getName(), xml.getString()); + } + } + + private static class DefaultConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return true; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue) throws SQLException { + builder.set(field.getName(), attrValue); + } + } + + private static final List CONVERTERS = Arrays.asList( + new BigDecimalConverter(), + new TimestampConverter(), + new ZonedDateTimeConverter(), + new ClobConverter(), + new BlobConverter(), + new OracleBfileConverter(), + new ByteArrayConverter(), + new OracleIntervalConverter(), + new SqlXmlConverter(), + new FloatConverter(), + new DoubleConverter(), + new DefaultConverter() + ); + + private OracleStructAttributeConverters() { + // Private constructor to prevent instantiation. + } + + /** + * Translates an Oracle STRUCT attribute to a CDAP structured record field. + */ + public static void convertValue(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, String attrClassName) throws SQLException { + for (AttributeConverter converter : CONVERTERS) { + if (converter.canConvert(attrValue, attrClassName)) { + converter.convert(builder, field, fieldSchema, attrValue); + break; + } + } + } +} diff --git a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java index 586ca1141..48f07c48b 100644 --- a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java +++ b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java @@ -25,9 +25,12 @@ import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Statement; import java.sql.Types; import java.util.List; @@ -39,6 +42,12 @@ public void getSchema_timestampLTZFieldTrue_returnTimestamp() throws SQLExceptio ResultSet resultSet = Mockito.mock(ResultSet.class); ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + Statement statement = Mockito.mock(Statement.class); + Connection connection = Mockito.mock(Connection.class); + + Mockito.when(resultSet.getMetaData()).thenReturn(metadata); + Mockito.when(resultSet.getStatement()).thenReturn(statement); + Mockito.when(statement.getConnection()).thenReturn(connection); Mockito.when(resultSet.getMetaData()).thenReturn(metadata); @@ -70,9 +79,12 @@ public void getSchema_timestampLTZFieldFalse_returnDatetime() throws SQLExceptio ResultSet resultSet = Mockito.mock(ResultSet.class); ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + Statement statement = Mockito.mock(Statement.class); + Connection connection = Mockito.mock(Connection.class); Mockito.when(resultSet.getMetaData()).thenReturn(metadata); - + Mockito.when(resultSet.getStatement()).thenReturn(statement); + Mockito.when(statement.getConnection()).thenReturn(connection); Mockito.when(metadata.getColumnCount()).thenReturn(2); // -101 is for TIMESTAMP_TZ Mockito.when(metadata.getColumnType(1)).thenReturn(-101); @@ -94,15 +106,59 @@ public void getSchema_timestampLTZFieldFalse_returnDatetime() throws SQLExceptio Assert.assertEquals(expectedSchemaFields.get(1).getSchema(), actualSchemaFields.get(1).getSchema()); } + @Test + public void getSchemaFields_structType_returnRecord() throws SQLException { + OracleSourceSchemaReader schemaReader = new OracleSourceSchemaReader(); + ResultSet resultSet = Mockito.mock(ResultSet.class); + ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + Statement statement = Mockito.mock(Statement.class); + Connection connection = Mockito.mock(Connection.class); + PreparedStatement stmt = Mockito.mock(PreparedStatement.class); + ResultSet attrRs = Mockito.mock(ResultSet.class); + Mockito.when(resultSet.getMetaData()).thenReturn(metadata); + Mockito.when(resultSet.getStatement()).thenReturn(statement); + Mockito.when(statement.getConnection()).thenReturn(connection); + Mockito.when(connection.prepareStatement(Mockito.anyString())).thenReturn(stmt); + Mockito.when(stmt.executeQuery()).thenReturn(attrRs); + Mockito.when(metadata.getColumnCount()).thenReturn(1); + Mockito.when(metadata.getColumnType(1)).thenReturn(Types.STRUCT); + Mockito.when(metadata.getColumnName(1)).thenReturn("address"); + Mockito.when(metadata.getColumnTypeName(1)).thenReturn("CS_ITN.ADDRESS_TYPE"); + Mockito.when(metadata.getSchemaName(1)).thenReturn("TEST_SCHEMA"); + Mockito.when(attrRs.next()).thenReturn(true, true, false); + Mockito.when(attrRs.getString("ATTR_NAME")).thenReturn("STREET", "CITY"); + Mockito.when(attrRs.getString("ATTR_TYPE_NAME")).thenReturn("VARCHAR2", "VARCHAR2"); + Mockito.when(attrRs.getInt("PRECISION")).thenReturn(0, 0); + Mockito.when(attrRs.getInt("SCALE")).thenReturn(0, 0); + + List actualFields = schemaReader.getSchemaFields(resultSet); + + Schema.Field addressField = actualFields.get(0); + Schema addressSchema = addressField.getSchema().isNullable() + ? addressField.getSchema().getNonNullable() : addressField.getSchema(); + List structFields = addressSchema.getFields(); + Assert.assertEquals(1, actualFields.size()); + Assert.assertEquals("address", addressField.getName()); + Assert.assertEquals(Schema.Type.RECORD, addressSchema.getType()); + Assert.assertEquals("CS_ITN.ADDRESS_TYPE", addressSchema.getRecordName()); + Assert.assertEquals(2, structFields.size()); + Assert.assertEquals("STREET", structFields.get(0).getName()); + Assert.assertEquals("CITY", structFields.get(1).getName()); + } + @Test public void getSchema_xmlField_returnString() throws SQLException { OracleSourceSchemaReader schemaReader = new OracleSourceSchemaReader(null, false, false, false, true); ResultSet resultSet = Mockito.mock(ResultSet.class); ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + Connection connection = Mockito.mock(Connection.class); + Statement statement = Mockito.mock(Statement.class); Mockito.when(resultSet.getMetaData()).thenReturn(metadata); Mockito.when(metadata.getColumnCount()).thenReturn(1); Mockito.when(metadata.getColumnType(1)).thenReturn(Types.SQLXML); Mockito.when(metadata.getColumnName(1)).thenReturn("xmlData"); + Mockito.when(resultSet.getStatement()).thenReturn(statement); + Mockito.when(statement.getConnection()).thenReturn(connection); List actualSchemaFields = schemaReader.getSchemaFields(resultSet); @@ -118,10 +174,43 @@ public void getSchema_xmlFieldDisabled_throwsProgramFailureException() throws SQ false, false, false, false); ResultSet resultSet = Mockito.mock(ResultSet.class); ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + Connection connection = Mockito.mock(Connection.class); + Statement statement = Mockito.mock(Statement.class); Mockito.when(resultSet.getMetaData()).thenReturn(metadata); Mockito.when(metadata.getColumnCount()).thenReturn(1); Mockito.when(metadata.getColumnType(1)).thenReturn(Types.SQLXML); Mockito.when(metadata.getColumnName(1)).thenReturn("xmlData"); + Mockito.when(resultSet.getStatement()).thenReturn(statement); + Mockito.when(statement.getConnection()).thenReturn(connection); + + Assert.assertThrows(ProgramFailureException.class, () -> schemaReader.getSchemaFields(resultSet)); + + } + + @Test + public void getSchemaFields_structWithUnsupportedAttributeType_throwsException() throws SQLException { + OracleSourceSchemaReader schemaReader = new OracleSourceSchemaReader(); + ResultSet resultSet = Mockito.mock(ResultSet.class); + ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + Statement statement = Mockito.mock(Statement.class); + Connection connection = Mockito.mock(Connection.class); + PreparedStatement stmt = Mockito.mock(PreparedStatement.class); + ResultSet attrRs = Mockito.mock(ResultSet.class); + Mockito.when(resultSet.getMetaData()).thenReturn(metadata); + Mockito.when(resultSet.getStatement()).thenReturn(statement); + Mockito.when(statement.getConnection()).thenReturn(connection); + Mockito.when(connection.prepareStatement(Mockito.anyString())).thenReturn(stmt); + Mockito.when(stmt.executeQuery()).thenReturn(attrRs); + Mockito.when(metadata.getColumnCount()).thenReturn(1); + Mockito.when(metadata.getColumnType(1)).thenReturn(Types.STRUCT); + Mockito.when(metadata.getColumnName(1)).thenReturn("complex_payload"); + Mockito.when(metadata.getColumnTypeName(1)).thenReturn("CS_ITN.ANYDATA_TYPE"); + Mockito.when(metadata.getSchemaName(1)).thenReturn("TEST_SCHEMA"); + Mockito.when(attrRs.next()).thenReturn(true, true, false); + Mockito.when(attrRs.getString("ATTR_NAME")).thenReturn("VALID_ID", "UNSUPPORTED_DATA"); + Mockito.when(attrRs.getString("ATTR_TYPE_NAME")).thenReturn("NUMBER", "ANYDATA"); + Mockito.when(attrRs.getInt("PRECISION")).thenReturn(10, 0); + Mockito.when(attrRs.getInt("SCALE")).thenReturn(0, 0); Assert.assertThrows(ProgramFailureException.class, () -> schemaReader.getSchemaFields(resultSet));