Wednesday, April 10, 2013

Oracle Streams Synchronous Capture


Article Objective


This article walks through the implementation of Oracle Streams Synchronous Capture, which was introduced in Oracle Database 11g. The steps in this article have been executed in practice and demonstrated to one of my clients.
 
This article applies to the following technology versions

Oracle Database Enterprise Edition 11.2.0.1 and above
Oracle Database Standard Edition 11.2.0.1 and above
Oracle Database Standard Edition One 11.2.0.1 and above


About Streams Synchronous Capture


Synchronous Capture replicates changes made on the Source Database to the Destination Database using an internal mechanism that does not rely on memory, redo logs, or archived logs. As the name suggests, only the enqueuing of changes is synchronous; everything else (propagation and apply) is asynchronous.

Advantages
  • Available with Standard Edition, so there is no longer a dependency on procuring Enterprise Edition
  • Well suited when the business objective is to capture changes to a small set of tables
  • The database does not need to be in ARCHIVELOG mode


Disadvantages
  • DDL changes cannot be replicated.
  • Changes made by direct-path loads cannot be replicated.
  • It cannot capture DML changes made to temporary tables, object tables, or tables compressed with hybrid columnar compression.
  • It does not support Oracle Label Security.
  • It can slow down performance under heavy DML load, and changes take time to replicate.


The following naming conventions apply to this document

SOURCE -> Source database name
TARGET -> Destination database name
SOURCE.com -> Database link to the SOURCE database and global name of the source database
TARGET.com -> Database link to the TARGET database and global name of the destination database
“.com” -> Domain name
SCOTT -> Schema used in replication
TEST_SALGRADE -> Table owned by the SCOTT schema, used for replication


Configuring Synchronous Capture

Pre-requisites

Following pre-requisites should be met before configuration

Step (1):
Set up the Streams admin user on both databases (SOURCE & TARGET)
connect / as sysdba

CREATE TABLESPACE tbs_sadmin DATAFILE '<location>/ts_sadmin_01.dbf' SIZE 250M AUTOEXTEND ON MAXSIZE 2048M;

CREATE USER sadmin IDENTIFIED BY <password> DEFAULT TABLESPACE tbs_sadmin QUOTA UNLIMITED ON tbs_sadmin;

grant dba to sadmin;

BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
     queue_table =>'sadmin.SOURCE_queue_table',
     queue_name  => 'sadmin.SOURCE_queue');
END;
/



Step (2):
Set the following parameters on both databases
ALTER SYSTEM SET aq_tm_processes=2 SCOPE=BOTH;
ALTER SYSTEM SET job_queue_processes=20 SCOPE=BOTH;
ALTER SYSTEM SET global_names=true SCOPE=BOTH;
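
To confirm that the settings took effect, a query along the following lines can be run on each database. This is a minimal check that is not part of the original procedure, and it assumes the session is allowed to read V$PARAMETER.

```sql
-- Show the current values of the three parameters set above
SELECT name, value
  FROM v$parameter
 WHERE name IN ('aq_tm_processes', 'job_queue_processes', 'global_names');
```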

Step (3)
Configure TNS entries so that the Source Database can reach the listener of the Destination Database, and the Destination Database can reach the listener of the Source Database
Assumption:  Listener of both databases running on port 1522

TNS to destination Database

TARGET =
  (DESCRIPTION =
    (ADDRESS = (PROTOCOL = TCP)(HOST = <Destination IP>)(PORT = 1522))
    (CONNECT_DATA =
      (SERVER = DEDICATED)
      (SERVICE_NAME = TARGET.com)
    )
  )

TNS to Source Database

SOURCE =
  (DESCRIPTION =
    (ADDRESS = (PROTOCOL = TCP)(HOST = <Source IP>)(PORT = 1522))
    (CONNECT_DATA =
      (SERVER = DEDICATED)
      (SERVICE_NAME = SOURCE.com)
    )
  )

Check the connectivity
$sqlplus sadmin/<password>@SOURCE -> to Source from Target
$sqlplus sadmin/<password>@TARGET -> to Target from Source

On a successful connection, a banner with version details is displayed

Step (4)
Create Database Links from Source to Destination, and back to Source from Destination
From Source to Destination

CREATE DATABASE LINK TARGET.com CONNECT TO sadmin IDENTIFIED BY <password> USING 'TARGET';

From Destination to Source

CREATE DATABASE LINK SOURCE.com CONNECT TO sadmin IDENTIFIED BY <password> USING 'SOURCE';
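
Before continuing, each link can be tested by reading the remote global name through it. This is a sketch, assuming the links were created as shown above while connected as sadmin. Because global_names is set to true, a link whose name does not match the remote global name is expected to fail with ORA-02085.

```sql
-- On the source database: returns the global name of the destination database
SELECT * FROM global_name@TARGET.com;

-- On the destination database: returns the global name of the source database
SELECT * FROM global_name@SOURCE.com;
```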



Step (4.1) (On Source Database)

For replication to work properly, each table for which changes are applied by an apply process should, if possible, have a primary key. When a primary key is not possible, Oracle recommends that each table have a set of columns that can be used as a unique identifier for each row of the table. If the tables that you plan to use in your Oracle Streams environment do not have a primary key or a set of unique columns, then consider altering these tables accordingly.

In the absence of substitute key columns, primary key constraints, and unique key constraints, an apply process uses all of the columns in the table as the key columns, excluding LOB, LONG, and LONG RAW columns. In this case, you must create an unconditional supplemental log group containing these columns at the source database. Using substitute key columns is preferable when there is no primary key constraint for a table because fewer columns are needed in the row LCR.
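
Where neither a primary key nor a unique constraint can be added, substitute key columns can be declared for the apply process with DBMS_APPLY_ADM.SET_KEY_COLUMNS. The sketch below is an illustration only: it assumes that TEST_SALGRADE has the same columns as the standard SCOTT.SALGRADE demo table and that GRADE identifies each row uniquely.

```sql
-- Run on the destination database as the Streams administrator.
-- GRADE is an assumed column; replace it with the columns that
-- uniquely identify a row in your table.
BEGIN
  DBMS_APPLY_ADM.SET_KEY_COLUMNS(
    object_name => 'SCOTT.TEST_SALGRADE',
    column_list => 'GRADE');
END;
/
```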

Enable unconditional supplemental logging on all columns if the table has no key columns, or on the key columns otherwise, so that the data in the table is replicated logically and stays consistent. Log in as the table owner.

For All columns  (Mandatory for tables with no key columns)
SQL>ALTER TABLE TEST_SALGRADE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
For Key Columns
SQL>ALTER TABLE TEST_SALGRADE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
Note: If duplication on the table is desired, set allow_duplicate_rows to true on apply process  (step (10.1))
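
To see which case applies to a table, its key constraints and any existing supplemental log groups can be listed. This is a minimal sketch, assuming the replicated table is SCOTT.TEST_SALGRADE and the session can read the DBA views.

```sql
-- Primary key (P) and unique (U) constraints defined on the table
SELECT constraint_name, constraint_type, status
  FROM dba_constraints
 WHERE owner = 'SCOTT'
   AND table_name = 'TEST_SALGRADE'
   AND constraint_type IN ('P', 'U');

-- Supplemental log groups on the table;
-- ALWAYS = 'ALWAYS' marks an unconditional log group
SELECT log_group_name, log_group_type, always
  FROM dba_log_groups
 WHERE owner = 'SCOTT'
   AND table_name = 'TEST_SALGRADE';
```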

Configuration

Step (5) (On Source Database)
Creating Queue for Source table enqueue process
BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table => 'sadmin.SOURCE_queue_table',
    queue_name  => 'sadmin.SOURCE_queue',
    queue_user  => 'sadmin');
END;
/
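
A quick way to check that the queue exists and is enabled is to query DBA_QUEUES. This is a sketch rather than part of the original procedure; it assumes the queue was created under the SADMIN schema as above, so the unquoted names are stored in uppercase.

```sql
-- Both ENQUEUE_ENABLED and DEQUEUE_ENABLED should show YES
SELECT name, queue_table, enqueue_enabled, dequeue_enabled
  FROM dba_queues
 WHERE owner = 'SADMIN'
   AND name  = 'SOURCE_QUEUE';
```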

Step (6) (On Source Database)
Adding the table to the synchronous capture rules
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name         => 'SCOTT.TEST_SALGRADE',
    streams_type       => 'SYNC_CAPTURE',
    streams_name       => 'SYNC_CAPTURE',
    queue_name         => 'sadmin.SOURCE_queue',
    include_dml        => true,
    include_ddl        => false,
    include_tagged_lcr => false,
    inclusion_rule     => true,
    source_database    => 'SOURCE');
END;
/

Step (7) (On Source Database)
Creating the propagation from the source queue to the destination queue
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
    table_name             => 'SCOTT.TEST_SALGRADE',
    streams_name           => 'SOURCE_propagation',
    source_queue_name      => 'sadmin.SOURCE_queue',
    destination_queue_name => 'sadmin.TARGET_queue@TARGET.com',
    include_dml            => TRUE,
    include_ddl            => FALSE,
    include_tagged_lcr     => FALSE,
    source_database        => 'SOURCE',
    inclusion_rule         => TRUE,
    queue_to_queue         => TRUE);
END;
/

Step (8) (On Destination Database)
Create the queue for the dequeue mechanism
BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table => 'sadmin.TARGET_queue_table',
    queue_name  => 'sadmin.TARGET_queue',
    queue_user  => 'sadmin');
END;
/

Step (9) (On Destination Database)
Create the apply process for synchronous capture
BEGIN
  DBMS_APPLY_ADM.CREATE_APPLY(
    queue_name     => 'sadmin.TARGET_queue',
    apply_name     => 'sync_apply',
    apply_captured => false);
END;
/

Step (10) (On Destination Database)
Add the replicated table to the apply process
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name         => 'SCOTT.TEST_SALGRADE',
    streams_type       => 'APPLY',
    streams_name       => 'SYNC_APPLY',
    queue_name         => 'sadmin.TARGET_queue',
    include_dml        => true,
    include_ddl        => false,
    include_tagged_lcr => false,
    source_database    => 'SOURCE');
END;
/

  
Step(10.1) (On Destination Database)
If duplicate rows are desired in a table that has no key columns and no unconditional supplemental logging, set allow_duplicate_rows to “Y” on the apply process.
BEGIN
dbms_apply_adm.set_parameter (
   apply_name => 'sync_apply',
   parameter => 'allow_duplicate_rows',
   value => 'Y');
END;
/


Step (11)
Instantiate the Destination Table
The table in the destination schema must be instantiated, and the instantiation SCN stored, so that the apply process knows exactly which changes it can apply. To instantiate the table, run the flashback query below.

Get the SCN on the Source Database, logged in as sadmin (Streams admin user)

$sqlplus sadmin/<password>
SQL>COLUMN apply_scn FORMAT 99999999999
SELECT DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER APPLY_SCN FROM dual;

Copy the SCN, and run the query below on the Destination Database as the sadmin user (Streams Administrator)

$sqlplus sadmin/<password>
SQL>INSERT INTO SCOTT.TEST_SALGRADE (SELECT * FROM SCOTT.TEST_SALGRADE@SOURCE.com AS OF SCN <SCN Number>);
COMMIT;



Step (12) (On Destination Database)
Store the instantiation SCN of the table for the apply process
BEGIN
  DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
    source_object_name   => 'SCOTT.TEST_SALGRADE',
    source_database_name => 'SOURCE',
    instantiation_scn    => <SCN Number>);
END;
/
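
To confirm that the instantiation SCN was recorded, the DBA_APPLY_INSTANTIATED_OBJECTS view can be queried on the destination database. This is a minimal sketch; the column formatting is illustrative only.

```sql
COLUMN source_object_owner FORMAT A15
COLUMN source_object_name  FORMAT A20
COLUMN instantiation_scn   FORMAT 99999999999

-- One row per instantiated source object, with the SCN stored for it
SELECT source_object_owner, source_object_name, instantiation_scn
  FROM dba_apply_instantiated_objects;
```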

Step (13) (On Destination Database)
Begin the Apply Process
EXEC DBMS_APPLY_ADM.START_APPLY('SYNC_APPLY');

Step (14)
Verify Replication
Perform DML operations on the source table; every operation should be reflected in the destination table
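
A simple test could look like the sketch below. It assumes that TEST_SALGRADE has the same columns as the standard SCOTT.SALGRADE demo table (GRADE, LOSAL, HISAL) and that grade 6 does not exist yet; adjust the statement to the actual table definition.

```sql
-- On the source database
INSERT INTO scott.test_salgrade (grade, losal, hisal)
VALUES (6, 10000, 19999);
COMMIT;

-- On the destination database, after allowing time for propagation and apply
SELECT grade, losal, hisal
  FROM scott.test_salgrade
 WHERE grade = 6;
```

If the row does not appear, the propagation and apply status queries in the monitoring section are the first place to look.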


Monitoring Streams (Capture Process/Synchronous Capture)

To display general information about each synchronous capture in a database, run the following query

COLUMN CAPTURE_NAME HEADING 'Synchronous|Capture Name' FORMAT A20
COLUMN QUEUE_NAME HEADING 'Synchronous|Capture Queue' FORMAT A20
COLUMN RULE_SET_NAME HEADING 'Positive Rule Set' FORMAT A20
COLUMN CAPTURE_USER HEADING 'Capture User' FORMAT A15

SELECT CAPTURE_NAME, QUEUE_NAME, RULE_SET_NAME, CAPTURE_USER
   FROM DBA_SYNC_CAPTURE;


Displaying Tables with Streams Configuration
set lines 120
col streams_name format a12
col streams_type format a12
col table_owner format a11
col table_name format a5
col rule_type format a8
col rule_name format a4

SELECT STREAMS_NAME,
      STREAMS_TYPE,
      TABLE_OWNER,
      TABLE_NAME,
      RULE_TYPE,
      RULE_NAME
 FROM DBA_STREAMS_TABLE_RULES;

Check the Tables For Which Synchronous Capture Captures Changes
COLUMN STREAMS_NAME HEADING 'Synchronous|Capture Name' FORMAT A15
COLUMN RULE_NAME HEADING 'Rule Name' FORMAT A15
COLUMN SUBSETTING_OPERATION HEADING 'Subsetting|Operation' FORMAT A10
COLUMN TABLE_OWNER HEADING 'Table|Owner' FORMAT A10
COLUMN TABLE_NAME HEADING 'Table Name' FORMAT A15
COLUMN ENABLED HEADING 'Enabled?' FORMAT A8

SELECT r.STREAMS_NAME,
       r.RULE_NAME,
       r.SUBSETTING_OPERATION,
       t.TABLE_OWNER,
       t.TABLE_NAME,
       t.ENABLED
   FROM DBA_STREAMS_TABLE_RULES r,
        DBA_SYNC_CAPTURE_TABLES t
   WHERE r.STREAMS_TYPE = 'SYNC_CAPTURE' AND
         r.TABLE_OWNER  = t.TABLE_OWNER AND
         r.TABLE_NAME   = t.TABLE_NAME;

Displaying the Queue, Rule Sets, and Status of Each Capture Process
COLUMN CAPTURE_NAME HEADING 'Capture|Process|Name' FORMAT A15
COLUMN QUEUE_NAME HEADING 'Capture|Process|Queue' FORMAT A15
COLUMN RULE_SET_NAME HEADING 'Positive|Rule Set' FORMAT A15
COLUMN NEGATIVE_RULE_SET_NAME HEADING 'Negative|Rule Set' FORMAT A15
COLUMN STATUS HEADING 'Capture|Process|Status' FORMAT A15

SELECT CAPTURE_NAME, QUEUE_NAME, RULE_SET_NAME, NEGATIVE_RULE_SET_NAME, STATUS
   FROM DBA_CAPTURE;

Displaying Change Capture Information About Each Capture Process
COLUMN CAPTURE_NAME HEADING 'Capture|Name' FORMAT A7
COLUMN PROCESS_NAME HEADING 'Capture|Process|Number' FORMAT A7
COLUMN SID HEADING 'Session|ID' FORMAT 9999
COLUMN SERIAL# HEADING 'Session|Serial|Number' FORMAT 9999
COLUMN STATE HEADING 'State' FORMAT A20
COLUMN TOTAL_MESSAGES_CAPTURED HEADING 'Redo|Entries|Evaluated|In Detail' FORMAT 9999999
COLUMN TOTAL_MESSAGES_ENQUEUED HEADING 'Total|LCRs|Enqueued' FORMAT 9999999999

SELECT c.CAPTURE_NAME,
       SUBSTR(s.PROGRAM,INSTR(s.PROGRAM,'(')+1,4) PROCESS_NAME,
       c.SID,
       c.SERIAL#,
       c.STATE,
       c.TOTAL_MESSAGES_CAPTURED,
       c.TOTAL_MESSAGES_ENQUEUED
  FROM V$STREAMS_CAPTURE c, V$SESSION s
  WHERE c.SID = s.SID AND
        c.SERIAL# = s.SERIAL#;

Displaying Tables with Apply Process
set lines 120
col streams_name format a12
col streams_type format a12
col table_owner format a11
col table_name format a5
col rule_type format a8
col rule_name format a5

SELECT STREAMS_NAME,
      STREAMS_TYPE,
      TABLE_OWNER,
      TABLE_NAME,
      RULE_TYPE,
      RULE_NAME
 FROM DBA_STREAMS_TABLE_RULES;



Check if Propagation is enabled, and reason if it is disabled
COLUMN DESTINATION_DBLINK HEADING 'Database|Link'      FORMAT A15
COLUMN STATUS             HEADING 'Status'             FORMAT A8
COLUMN ERROR_DATE         HEADING 'Error|Date'
COLUMN ERROR_MESSAGE      HEADING 'Error Message'      FORMAT A35

SELECT DESTINATION_DBLINK,
       STATUS,
       ERROR_DATE,
       ERROR_MESSAGE
  FROM DBA_PROPAGATION
  WHERE PROPAGATION_NAME = '<name of the propagation>';

Check if apply process is enabled
SELECT STATUS FROM DBA_APPLY WHERE APPLY_NAME = '<name of the apply process>';


Check the reason an apply process aborted
COLUMN APPLY_NAME HEADING 'APPLY|Process|Name' FORMAT A10
COLUMN STATUS_CHANGE_TIME HEADING 'Abort Time'
COLUMN ERROR_NUMBER HEADING 'Error Number' FORMAT 99999999
COLUMN ERROR_MESSAGE HEADING 'Error Message' FORMAT A40

SELECT APPLY_NAME, STATUS_CHANGE_TIME, ERROR_NUMBER, ERROR_MESSAGE
  FROM DBA_APPLY WHERE STATUS='ABORTED';


Check for apply errors
COLUMN APPLY_NAME HEADING 'Apply|Process|Name' FORMAT A10
COLUMN SOURCE_DATABASE HEADING 'Source|Database' FORMAT A10
COLUMN LOCAL_TRANSACTION_ID HEADING 'Local|Transaction|ID' FORMAT A11
COLUMN ERROR_NUMBER HEADING 'Error Number' FORMAT 99999999
COLUMN ERROR_MESSAGE HEADING 'Error Message' FORMAT A20
COLUMN MESSAGE_COUNT HEADING 'Messages in|Error|Transaction' FORMAT 99999999

SELECT APPLY_NAME,
       SOURCE_DATABASE,
       LOCAL_TRANSACTION_ID,
       ERROR_NUMBER,
       ERROR_MESSAGE,
       MESSAGE_COUNT
  FROM DBA_APPLY_ERROR;
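
Once the cause of an error has been corrected, the failed transactions can be retried or discarded with the DBMS_APPLY_ADM package. This is a sketch, assuming the apply process is named SYNC_APPLY as in this article; the transaction ID placeholder stands for a LOCAL_TRANSACTION_ID value returned by the query above.

```sql
-- Retry every error transaction recorded for the apply process
EXEC DBMS_APPLY_ADM.EXECUTE_ALL_ERRORS(apply_name => 'SYNC_APPLY');

-- Or discard a single error transaction by its LOCAL_TRANSACTION_ID
EXEC DBMS_APPLY_ADM.DELETE_ERROR(local_transaction_id => '<Local Transaction ID>');
```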

Check apply process information
COLUMN APPLY_NAME HEADING 'Apply Process|Name' FORMAT A15
COLUMN APPLY_CAPTURED HEADING 'Dequeues Captured|Messages?' FORMAT A17
COLUMN PROCESS_NAME HEADING 'Process|Name' FORMAT A7
COLUMN STATE HEADING 'State' FORMAT A17
COLUMN TOTAL_MESSAGES_DEQUEUED HEADING 'Total Messages|Dequeued' FORMAT 99999999

SELECT r.APPLY_NAME,
       ap.APPLY_CAPTURED,
       SUBSTR(s.PROGRAM,INSTR(s.PROGRAM,'(')+1,4) PROCESS_NAME,
       r.STATE,
       r.TOTAL_MESSAGES_DEQUEUED
       FROM V$STREAMS_APPLY_READER r, V$SESSION s, DBA_APPLY ap
       WHERE r.SID = s.SID AND
             r.SERIAL# = s.SERIAL# AND
             r.APPLY_NAME = ap.APPLY_NAME;



To display this general information about each apply process in a database, run the following query:

COLUMN APPLY_NAME HEADING 'Apply Process Name' FORMAT A20
COLUMN APPLY_CAPTURED HEADING 'Applies Captured LCRs?' FORMAT A30
COLUMN APPLY_USER HEADING 'Apply User' FORMAT A20

SELECT APPLY_NAME, APPLY_CAPTURED, APPLY_USER
  FROM DBA_APPLY;


Determining the Queue, Rule Sets, and Status for Each Apply Process
COLUMN APPLY_NAME HEADING 'Apply|Process|Name' FORMAT A15
COLUMN QUEUE_NAME HEADING 'Apply|Process|Queue' FORMAT A15
COLUMN RULE_SET_NAME HEADING 'Positive|Rule Set' FORMAT A15
COLUMN NEGATIVE_RULE_SET_NAME HEADING 'Negative|Rule Set' FORMAT A15
COLUMN STATUS HEADING 'Apply|Process|Status' FORMAT A15

SELECT APPLY_NAME,
       QUEUE_NAME,
       RULE_SET_NAME,
       NEGATIVE_RULE_SET_NAME,
       STATUS
  FROM DBA_APPLY;


Viewing the Extra Attributes Captured by Implicit Capture
COLUMN CAPTURE_NAME HEADING 'Capture Process or|Synchronous Capture' FORMAT A20
COLUMN ATTRIBUTE_NAME HEADING 'Attribute Name' FORMAT A15
COLUMN INCLUDE HEADING 'Include Attribute in LCRs?' FORMAT A30

SELECT CAPTURE_NAME, ATTRIBUTE_NAME, INCLUDE
  FROM DBA_CAPTURE_EXTRA_ATTRIBUTES
  ORDER BY CAPTURE_NAME;


Managing Streams

To stop propagation (On Source)
BEGIN
  DBMS_PROPAGATION_ADM.STOP_PROPAGATION(
    propagation_name => 'SOURCE_PROPAGATION',
    force            => FALSE);
END;
/

To start propagation  (On Source)
BEGIN
  DBMS_PROPAGATION_ADM.START_PROPAGATION(
    propagation_name => 'SOURCE_PROPAGATION');
END;
/

To Disable/Stop Apply (On Destination Database)
BEGIN
  DBMS_APPLY_ADM.STOP_APPLY(
    apply_name => 'SYNC_APPLY');
END;
/

To Enable/Start Apply (On Destination Database)
BEGIN
  DBMS_APPLY_ADM.START_APPLY(
    apply_name => 'SYNC_APPLY');
END;
/


To start Capture (Capture Process/Source Database)
BEGIN
  DBMS_CAPTURE_ADM.START_CAPTURE(
    capture_name => '<Capture Process Name>');
END;
/


To stop Capture (Capture Process/Source Database)
BEGIN
  DBMS_CAPTURE_ADM.STOP_CAPTURE(
    capture_name => '<Capture Process Name>');
END;
/

To drop Capture (Synchronous Capture/Capture Process, On Source )
BEGIN
  DBMS_CAPTURE_ADM.DROP_CAPTURE(
    capture_name          => 'SYNC_CAPTURE',
    drop_unused_rule_sets => TRUE);
END;
/

To remove a queue (On Source/Destination)
exec dbms_streams_adm.remove_queue('SOURCE_queue',true,true);
exec dbms_streams_adm.remove_queue('TARGET_queue',true,true);

To drop a propagation (On Source/Destination)        
BEGIN
  DBMS_PROPAGATION_ADM.DROP_PROPAGATION(
    propagation_name      => 'SOURCE_PROPAGATION',
    drop_unused_rule_sets => TRUE);
END;
/