Oracle DBMS_KAFKA
Version 26ai

General Information
Purpose Oracle SQL Access to Kafka (OSAK) is a feature of Oracle Database that allows data in Kafka topics to be queried from Oracle SQL. OSAK supports three access patterns, each abstracted as an application type created with one of three PL/SQL calls: CREATE_LOAD_APP, CREATE_STREAMING_APP, or CREATE_SEEKABLE_APP.
AUTHID CURRENT_USER
Constants
Name Data Type Value
 Application Types
APPLICATION_TYPE_LOAD VARCHAR2(30) 'LOAD'
APPLICATION_TYPE_SEEKABLE VARCHAR2(30) 'SEEKABLE'
APPLICATION_TYPE_STREAMING VARCHAR2(30) 'STREAMING'
 AVRO Options
OPT_AVRODECIMALTYPE VARCHAR2(30) 'avrodecimaltype'
 DSV Options
OPT_BLANKASNULL VARCHAR2(30) 'blankasnull'
 Debugging/Logging Options
OPT_DEBUG VARCHAR2(30) 'debug'
OPT_LOGEXEC VARCHAR2(30) 'logexec'
OPT_LOGOPT VARCHAR2(30) 'logopt'
OPT_LOGQC VARCHAR2(30) 'logqc'
 General Options
OPT_BUFSIZE VARCHAR2(30) 'bufsize'
OPT_SEPARATOR VARCHAR2(30) 'separator'
OPT_STRICT VARCHAR2(30) 'strict'
OPT_TERMINATOR VARCHAR2(30) 'terminator'
 Kafka Value Options
OPT_AVROSCHEMA VARCHAR2(30) 'avroschema'
OPT_FMT VARCHAR2(30) 'fmt'
OPT_JSONDT VARCHAR2(30) 'jsondt'
OPT_REFTABLE VARCHAR2(30) 'reftable'
 Kafka Key Options
OPT_KEYFMT VARCHAR2(30) 'keyfmt'
 Not Yet Implemented Options: Character Sets
OPT_CONVERSIONERRS VARCHAR2(30) 'conversionerrs'
OPT_DATEFMT VARCHAR2(30) 'datefmt'
OPT_ESCAPEDBY VARCHAR2(30) 'escapedby'
OPT_NULLDEFINEDAS VARCHAR2(30) 'nulldefinedas'
 Not Yet Implemented Options: Columns
OPT_QUOTE VARCHAR2(30) 'quote'
OPT_REJECTLMT VARCHAR2(30) 'rejectlmt'
OPT_REMOVEQUOTES VARCHAR2(30) 'removequotes'
OPT_TSFMT VARCHAR2(30) 'tsfmt'
OPT_TSLTZFMT VARCHAR2(30) 'tsltzfmt'
OPT_TSTZFMT VARCHAR2(30) 'tstzfmt'
OPT_TRIMSPACES VARCHAR2(30) 'trimspaces'
OPT_TRUNCATECOL VARCHAR2(30) 'truncatecol'
 Not Yet Implemented Options: Key Columns
OPT_KEYAVROSCHEMA VARCHAR2(30) 'keyavroschema'
OPT_KEYREFTABLE VARCHAR2(30) 'keyreftable'
 Topic Formats
TOPIC_FORMAT_AVRO VARCHAR2(30) 'AVRO'
TOPIC_FORMAT_DSV VARCHAR2(30) 'DSV'
TOPIC_FORMAT_JSON VARCHAR2(30) 'JSON'
 Watermark
WATER_MARK_HIGH VARCHAR2(30) 'HIGH'
WATER_MARK_LOW VARCHAR2(30) 'LOW'
Dependencies
DBMS_ASSERT DBMS_STANDARD USER_KAFKA_OPS
DBMS_KAFKA_MD DBMS_SYS_ERROR USER_KAFKA_OPS_RESULTS
DBMS_KAFKA_UTL DUAL USER_KAFKA_PARTITIONS
DBMS_LOB JSON_OBJECT_T USER_VIEWS
DBMS_OUTPUT USER_KAFKA_APPLICATIONS  
Documented No
First Available 23ai
Pragmas SUPPLEMENTAL_LOG_DATA(default, NONE);
Security Model Owned by SYS with EXECUTE granted to PUBLIC
Source {ORACLE_HOME}/rdbms/admin/dbmskafka.sql
{ORACLE_HOME}/rdbms/admin/prvtkafka.plb
Subprograms
 
ADD_PARTITIONS (new 23ai)
Adds new Kafka partitions to an existing set of OSAK views dbms_kafka.add_partitions(
cluster_name     IN VARCHAR2,
application_name IN VARCHAR2);
TBD
 
CONVERT_MS_TO_TIMESTAMP (new 23ai)
Converts the input number of milliseconds since epoch to a TIMESTAMP WITH TIME ZONE and returns that value dbms_kafka.convert_ms_to_timestamp(
milliseconds IN INTEGER,
timezone     IN VARCHAR2 DEFAULT NULL)
RETURN TIMESTAMP WITH TIME ZONE;
TBD
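A quick illustration; the epoch value shown is arbitrary:

```sql
-- Convert an arbitrary epoch-millisecond value to a timestamp in UTC
SELECT dbms_kafka.convert_ms_to_timestamp(1700000000000, 'UTC') AS ts
FROM dual;
```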
 
CONVERT_MS_TO_TIMESTAMP_TZ (new 23ai)
Converts the input number of milliseconds since epoch to a TIMESTAMP WITH TIME ZONE and returns that value dbms_kafka.convert_ms_to_timestamp_tz(
milliseconds IN INTEGER,
timezone     IN VARCHAR2 DEFAULT NULL)
RETURN TIMESTAMP WITH TIME ZONE;
TBD
 
CONVERT_TIMESTAMP_TO_MS (new 23ai)
Calculates and returns the number of milliseconds since epoch from the input TIMESTAMP WITH TIME ZONE

Overload 1
dbms_kafka.convert_timestamp_to_ms(datetime_tz IN TIMESTAMP WITH TIME ZONE) RETURN INTEGER;
TBD
Overload 2 dbms_kafka.convert_timestamp_to_ms(
datetime IN TIMESTAMP,
timezone IN VARCHAR2 DEFAULT NULL)
RETURN INTEGER;
TBD
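A quick illustration using Overload 1:

```sql
-- Milliseconds since epoch for the current system timestamp
SELECT dbms_kafka.convert_timestamp_to_ms(SYSTIMESTAMP) AS ms
FROM dual;
```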
 
CREATE_LOAD_APP (new 23ai)
Creates an Oracle SQL Access to Kafka LOAD application that will retrieve data from all partitions in a Kafka topic for the purpose of loading Kafka data into an Oracle Database table dbms_kafka.create_load_app(
cluster_name     IN VARCHAR2,
application_name IN VARCHAR2,
topic_name       IN VARCHAR2,
options          IN CLOB);
TBD
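A minimal sketch of creating a LOAD application. The cluster, application, and topic names are hypothetical, and the options parameter is assumed to be a JSON document (the package depends on JSON_OBJECT_T) selecting, for example, the topic format:

```sql
-- Hypothetical names throughout; assumes the Kafka cluster "EXAMPLECLUS"
-- has already been registered and carries a JSON-formatted topic.
BEGIN
  dbms_kafka.create_load_app(
    cluster_name     => 'EXAMPLECLUS',
    application_name => 'LOADAPP',
    topic_name       => 'sensor_readings',
    options          => '{"fmt" : "JSON"}');
END;
/
```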
 
CREATE_SEEKABLE_APP (new 23ai)
Creates one OSAK view and an associated global temporary table to seek and load Kafka records within a user-specified window of time dbms_kafka.create_seekable_app(
cluster_name     IN VARCHAR2,
application_name IN VARCHAR2,
topic_name       IN VARCHAR2,
options          IN CLOB);
TBD
 
CREATE_STREAMING_APP (new 23ai)
Creates an OSAK streaming application which includes a set of dedicated OSAK global temporary tables and OSAK views used for retrieving new, unread records from partitions in a Kafka topic. dbms_kafka.create_streaming_app(
cluster_name     IN VARCHAR2,
application_name IN VARCHAR2,
topic_name       IN VARCHAR2,
options          IN CLOB,
view_count       IN INTEGER DEFAULT 1);
TBD
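A sketch with hypothetical names; view_count => 2 asks OSAK to create two views so that two concurrent application instances can each consume a share of the topic's partitions:

```sql
-- Hypothetical cluster, application, and topic names
BEGIN
  dbms_kafka.create_streaming_app(
    cluster_name     => 'EXAMPLECLUS',
    application_name => 'STREAMAPP',
    topic_name       => 'sensor_readings',
    options          => '{"fmt" : "JSON"}',
    view_count       => 2);
END;
/
```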
 
DROP_ALL_APPS (new 23ai)
Drops all applications for the Kafka cluster dbms_kafka.drop_all_apps(cluster_name IN VARCHAR2);
TBD
 
DROP_LOAD_APP (new 23ai)
Drops a LOAD application and removes the related metadata dbms_kafka.drop_load_app(
cluster_name     IN VARCHAR2,
application_name IN VARCHAR2);
TBD
 
DROP_SEEKABLE_APP (new 23ai)
Drops a seekable application and removes the related metadata dbms_kafka.drop_seekable_app(
cluster_name     IN VARCHAR2,
application_name IN VARCHAR2);
TBD
 
DROP_STREAMING_APP (new 23ai)
Drops a streaming application and removes the related metadata dbms_kafka.drop_streaming_app(
cluster_name     IN VARCHAR2,
application_name IN VARCHAR2);
TBD
 
ENABLE_VIEW_QUERY (new 23ai)
Sets session context to allow an OSAK application to query an OSAK view directly dbms_kafka.enable_view_query(view_name IN VARCHAR2);
TBD
 
EXECUTE_LOAD_APP (new 23ai)
Loads a user defined target table with content from a Kafka topic.

For subsequent calls, only new unread Kafka records are inserted into the target table.
dbms_kafka.execute_load_app(
cluster_name     IN  VARCHAR2,
application_name IN  VARCHAR2,
target_table     IN  VARCHAR2,
records_loaded   OUT INTEGER,
parallel_hint    IN  INTEGER DEFAULT 0);
TBD
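A sketch using hypothetical names; the target table must already exist with columns matching the shape of the Kafka records:

```sql
DECLARE
  v_loaded INTEGER;
BEGIN
  -- Hypothetical names: LOADAPP created earlier with CREATE_LOAD_APP,
  -- SENSOR_READINGS is the pre-created target table
  dbms_kafka.execute_load_app(
    cluster_name     => 'EXAMPLECLUS',
    application_name => 'LOADAPP',
    target_table     => 'SENSOR_READINGS',
    records_loaded   => v_loaded);
  dbms_output.put_line('Records loaded: ' || v_loaded);
END;
/
```

Each subsequent call picks up where the previous one left off, inserting only the records not yet read.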
 
GET_GRANULE_COUNT (new 23ai)
Returns the number of granules associated with an OSAK view dbms_kafka.get_granule_count(view_name IN VARCHAR2) RETURN INTEGER;
TBD
 
INIT_OFFSET (new 23ai)
Sets a starting offset, expressed as a delta number of records from either the high or low watermark of every partition dbms_kafka.init_offset(
view_name    IN VARCHAR2,
record_count IN INTEGER,
water_mark   IN VARCHAR2 DEFAULT WATER_MARK_HIGH);
TBD
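A sketch that rewinds a streaming application 100 records back from the high watermark of each partition; the view name is a placeholder for the view created by CREATE_STREAMING_APP:

```sql
BEGIN
  -- "OSAK_VIEW_NAME" is hypothetical; substitute your application's view
  dbms_kafka.init_offset('OSAK_VIEW_NAME', 100, dbms_kafka.WATER_MARK_HIGH);
END;
/
```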
 
INIT_OFFSET_TS (new 23ai)
Positions the processing of Kafka records to a point that is relatively current, potentially skipping unprocessed older records in the Kafka partitions

Overload 1
dbms_kafka.init_offset_ts(
view_name          IN VARCHAR2,
start_timestamp_ms IN INTEGER);
TBD
Overload 2 dbms_kafka.init_offset_ts(
view_name       IN VARCHAR2,
start_timestamp IN TIMESTAMP WITH TIME ZONE);
TBD
Overload 3 dbms_kafka.init_offset_ts(
view_name       IN VARCHAR2,
start_timestamp IN TIMESTAMP,
timezone        IN VARCHAR2 DEFAULT NULL);
TBD
 
LOAD_TEMP_TABLE (new 23ai)
Loads a global temporary table with SELECT * FROM <view_name>, where <view_name> is an OSAK view dbms_kafka.load_temp_table(
temp_table_name IN VARCHAR2,
parallel_hint   IN INTEGER DEFAULT 0);
TBD
 
SEEK_OFFSET_TS (new 23ai)
Positions an OSAK view to start reading Kafka records between two user-specified epoch timestamps

Overload 1
dbms_kafka.seek_offset_ts(
view_name          IN VARCHAR2,
start_timestamp_ms IN INTEGER,
end_timestamp_ms   IN INTEGER);
TBD
Overload 2 dbms_kafka.seek_offset_ts(
view_name       IN VARCHAR2,
start_timestamp IN TIMESTAMP WITH TIME ZONE,
end_timestamp   IN TIMESTAMP WITH TIME ZONE);
TBD
Overload 3 dbms_kafka.seek_offset_ts(
view_name       IN VARCHAR2,
start_timestamp IN TIMESTAMP,
end_timestamp   IN TIMESTAMP,
timezone        IN VARCHAR2 DEFAULT NULL);
TBD
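A sketch using Overload 2 to position a seekable application on the last hour of records; the view name is hypothetical:

```sql
BEGIN
  -- "OSAK_SEEK_VIEW" stands in for the view created by CREATE_SEEKABLE_APP
  dbms_kafka.seek_offset_ts(
    view_name       => 'OSAK_SEEK_VIEW',
    start_timestamp => SYSTIMESTAMP - INTERVAL '1' HOUR,
    end_timestamp   => SYSTIMESTAMP);
END;
/
```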
 
SET_TRACING (new 23ai)
Enables or disables debug level tracing for the external table driver code dbms_kafka.set_tracing(
cluster_name     IN VARCHAR2,
application_name IN VARCHAR2,
enable           IN BOOLEAN);
TBD
 
UPDATE_OFFSET (new 23ai)
Updates the last Kafka partition offsets read after a STREAMING application instance has finished querying a set of Kafka records captured in an OSAK temporary table dbms_kafka.update_offset(view_name IN VARCHAR2);
TBD
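One pass of a streaming consumer can be sketched as follows, with placeholder object names; each cycle loads new records into the application's temporary table, queries them, then advances and commits the saved partition offsets:

```sql
BEGIN
  -- Placeholders: OSAK_TEMP_TABLE and OSAK_VIEW_NAME are the temporary
  -- table and view created by CREATE_STREAMING_APP
  dbms_kafka.load_temp_table('OSAK_TEMP_TABLE');
  -- ... application SELECTs from OSAK_TEMP_TABLE here ...
  dbms_kafka.update_offset('OSAK_VIEW_NAME');
  COMMIT;  -- persist the advanced offsets
END;
/
```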

Related Topics
Built-in Functions
Built-in Packages
Database Security
DBMS_KAFKA_ADM
DBMS_KAFKA_INT
DBMS_KAFKA_MD
DBMS_KAFKA_UTL
What's New In 21c
What's New In 26ai

This site is maintained by Daniel Morgan. This site is protected by copyright and trademark laws under U.S. and International law. © 1998-2026 Daniel A. Morgan. All Rights Reserved.