16 Managing Staging and Propagation

The following topics describe managing ANYDATA queues and propagations:

Each task described in this chapter should be completed by an Oracle Streams administrator that has been granted the appropriate privileges, unless specified otherwise.

Managing Queues

An ANYDATA queue stages messages whose payloads are of ANYDATA type. Therefore, an ANYDATA queue can stage a message with a payload of nearly any type, if the payload is wrapped in an ANYDATA wrapper. Each Oracle Streams capture process, apply process, and messaging client is associated with one ANYDATA queue, and each Oracle Streams propagation is associated with one ANYDATA source queue and one ANYDATA destination queue.

This section contains instructions for completing the following tasks related to queues:

Enabling a User to Perform Operations on a Secure Queue

For a user to perform queue operations, such as enqueue and dequeue, on a secure queue, the user must be configured as a secure queue user of the queue. If you use the SET_UP_QUEUE procedure in the DBMS_STREAMS_ADM package to create the secure queue, then the queue owner and the user specified by the queue_user parameter are configured as secure users of the queue automatically. If you want to enable other users to perform operations on the queue, then you can configure these users in one of the following ways:

  • Run SET_UP_QUEUE and specify a queue_user. Queue creation is skipped if the queue already exists, but a new queue user is configured if one is specified.

  • Associate the user with an Oracle Streams Advanced Queuing (AQ) agent manually.

The following example illustrates associating a user with an Oracle Streams AQ agent manually. Suppose you want to enable the oe user to perform queue operations on a queue named streams_queue. The following steps configure the oe user as a secure queue user of streams_queue:

  1. In SQL*Plus, connect as an administrative user who can create Oracle Streams AQ agents and alter users.

    See Oracle Database Administrator's Guide for information about connecting to a database in SQL*Plus.

  2. Create an agent:

    EXEC DBMS_AQADM.CREATE_AQ_AGENT(agent_name => 'streams_queue_agent');
    
  3. If the user must be able to dequeue messages from queue, then make the agent a subscriber of the secure queue:

    DECLARE
      subscriber SYS.AQ$_AGENT;
    BEGIN
      subscriber :=  SYS.AQ$_AGENT('streams_queue_agent', NULL, NULL);  
      DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name          =>  'strmadmin.streams_queue',
        subscriber          =>  subscriber,
        rule                =>  NULL,
        transformation      =>  NULL);
    END;
    /
    
  4. Associate the user with the agent:

    BEGIN
      DBMS_AQADM.ENABLE_DB_ACCESS(
        agent_name  => 'streams_queue_agent',
        db_username => 'oe');
    END;
    /
    
  5. Grant the user EXECUTE privilege on the DBMS_STREAMS_MESSAGING package or the DBMS_AQ package, if the user is not already granted these privileges:

    GRANT EXECUTE ON DBMS_STREAMS_MESSAGING TO oe;
    
    GRANT EXECUTE ON DBMS_AQ TO oe;
    

When these steps are complete, the oe user is a secure user of the streams_queue queue and can perform operations on the queue. You still must grant the user specific privileges to perform queue operations, such as enqueue and dequeue privileges.

See Also:

Disabling a User from Performing Operations on a Secure Queue

You might want to disable a user from performing queue operations on a secure queue for the following reasons:

  • You dropped a capture process or a synchronous capture, but you did not drop the queue that was used by the capture process or synchronous capture, and you do not want the user who was the capture user to be able to perform operations on the remaining secure queue.

  • You dropped an apply process, but you did not drop the queue that was used by the apply process, and you do not want the user who was the apply user to be able to perform operations on the remaining secure queue.

  • You used the ALTER_APPLY procedure in the DBMS_APPLY_ADM package to change the apply_user for an apply process, and you do not want the old apply_user to be able to perform operations on the apply process's queue.

  • You enabled a user to perform operations on a secure queue by completing the steps described in Enabling a User to Perform Operations on a Secure Queue, but you no longer want this user to be able to perform operations on the secure queue.

To disable a secure queue user, you can revoke ENQUEUE and DEQUEUE privilege on the queue from the user, or you can run the DISABLE_DB_ACCESS procedure in the DBMS_AQADM package. For example, suppose you want to disable the oe user from performing queue operations on a queue named streams_queue.

Caution:

If an Oracle Streams AQ agent is used for multiple secure queues, then running DISABLE_DB_ACCESS for the agent prevents the user associated with the agent from performing operations on all of these queues.
  1. Run the following procedure to disable the oe user from performing queue operations on the secure queue streams_queue:

    BEGIN
      DBMS_AQADM.DISABLE_DB_ACCESS(
        agent_name  => 'streams_queue_agent',
        db_username => 'oe');
    END;
    /
    
  2. If the agent is no longer needed, you can drop the agent:

    BEGIN
      DBMS_AQADM.DROP_AQ_AGENT(
        agent_name  => 'streams_queue_agent');
    END;
    /
    
  3. Revoke privileges on the queue from the user, if the user no longer needs these privileges.

    BEGIN
      DBMS_AQADM.REVOKE_QUEUE_PRIVILEGE (
       privilege   => 'ALL',
       queue_name  => 'strmadmin.streams_queue',
       grantee     => 'oe');
    END;
    /
    

See Also:

Removing a Queue

You use the REMOVE_QUEUE procedure in the DBMS_STREAMS_ADM package to remove an existing ANYDATA queue. When you run the REMOVE_QUEUE procedure, it waits until any existing messages in the queue are consumed. Next, it stops the queue, which means that no further enqueues into the queue or dequeues from the queue are allowed. When the queue is stopped, it drops the queue.

You can also drop the queue table for the queue if it is empty and is not used by another queue. To do so, specify TRUE, the default, for the drop_unused_queue_table parameter.

In addition, you can drop any Oracle Streams clients that use the queue by setting the cascade parameter to TRUE. By default, the cascade parameter is set to FALSE.

For example, to remove an ANYDATA queue named streams_queue in the strmadmin schema and drop its empty queue table, run the following procedure:

BEGIN
  DBMS_STREAMS_ADM.REMOVE_QUEUE(
    queue_name              => 'strmadmin.streams_queue',
    cascade                 => FALSE,
    drop_unused_queue_table => TRUE);
END;
/

In this case, because the cascade parameter is set to FALSE, this procedure drops the streams_queue only if no Oracle Streams clients use the queue. If the cascade parameter is set to FALSE and any Oracle Streams client uses the queue, then an error is raised.

Managing Oracle Streams Propagations and Propagation Jobs

A propagation propagates messages from an Oracle Streams source queue to an Oracle Streams destination queue. This section provides instructions for completing the following tasks:

In addition, you can use the features of Oracle Streams Advanced Queuing (AQ) to manage Oracle Streams propagations.

See Also:

Starting a Propagation

You run the START_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package to start an existing propagation. For example, the following procedure starts a propagation named strm01_propagation:

BEGIN
  DBMS_PROPAGATION_ADM.START_PROPAGATION(
    propagation_name => 'strm01_propagation');
END;
/

Stopping a Propagation

You run the STOP_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package to stop an existing propagation. For example, the following procedure stops a propagation named strm01_propagation:

BEGIN
  DBMS_PROPAGATION_ADM.STOP_PROPAGATION(
    propagation_name => 'strm01_propagation',
    force            => FALSE);
END;
/

To clear the statistics for the propagation when it is stopped, set the force parameter to TRUE. If there is a problem with a propagation, then stopping the propagation with the force parameter set to TRUE and restarting the propagation might correct the problem. If the force parameter is set to FALSE, then the statistics for the propagation are not cleared.

Altering the Schedule of a Propagation Job

To alter the schedule of an existing propagation job, use the ALTER_PROPAGATION_SCHEDULE procedure in the DBMS_AQADM package. The following sections contain examples that alter the schedule of a propagation job for a queue-to-queue propagation and for a queue-to-dblink propagation. These examples set the propagation job to propagate messages every 15 minutes (900 seconds), with each propagation lasting 300 seconds, and a 25-second wait before new messages in a completely propagated queue are propagated.

This section contains these topics:

See Also:

Altering the Schedule of a Propagation Job for a Queue-to-Queue Propagation

To alter the schedule of a propagation job for a queue-to-queue propagation that propagates messages from the strmadmin.strm_a_queue source queue to the strmadmin.strm_b_queue destination queue using the dbs2.example.com database link, run the following procedure:

BEGIN
  DBMS_AQADM.ALTER_PROPAGATION_SCHEDULE(
   queue_name        => 'strmadmin.strm_a_queue',
   destination       => 'dbs2.example.com',
   duration          => 300,
   next_time         => 'SYSDATE + 900/86400',
   latency           => 25,
   destination_queue => 'strmadmin.strm_b_queue'); 
END;
/

Because each queue-to-queue propagation has its own propagation job, this procedure alters only the schedule of the propagation that propagates messages between the two queues specified. The destination_queue parameter must specify the name of the destination queue to alter the propagation schedule of a queue-to-queue propagation.

Altering the Schedule of a Propagation Job for a Queue-to-Dblink Propagation

To alter the schedule of a propagation job for a queue-to-dblink propagation that propagates messages from the strmadmin.streams_queue source queue using the dbs3.example.com database link, run the following procedure:

BEGIN
  DBMS_AQADM.ALTER_PROPAGATION_SCHEDULE(
   queue_name  => 'strmadmin.streams_queue',
   destination => 'dbs3.example.com',
   duration    => 300,
   next_time   => 'SYSDATE + 900/86400',
   latency     => 25); 
END;
/

Because the propagation is a queue-to-dblink propagation, the destination_queue parameter is not specified. Completing this task affects all queue-to-dblink propagations that propagate messages from the source queue to all destination queues that use the dbs3.example.com database link.

Specifying the Rule Set for a Propagation

You can specify one positive rule set and one negative rule set for a propagation. The propagation propagates a message if it evaluates to TRUE for at least one rule in the positive rule set and discards a change if it evaluates to TRUE for at least one rule in the negative rule set. The negative rule set is evaluated before the positive rule set.

This section contains these topics:

Specifying a Positive Rule Set for a Propagation

You specify an existing rule set as the positive rule set for an existing propagation using the rule_set_name parameter in the ALTER_PROPAGATION procedure. This procedure is in the DBMS_PROPAGATION_ADM package.

For example, the following procedure sets the positive rule set for a propagation named strm01_propagation to strm02_rule_set.

BEGIN
  DBMS_PROPAGATION_ADM.ALTER_PROPAGATION(
    propagation_name  => 'strm01_propagation',
    rule_set_name     => 'strmadmin.strm02_rule_set');
END;
/

Specifying a Negative Rule Set for a Propagation

You specify an existing rule set as the negative rule set for an existing propagation using the negative_rule_set_name parameter in the ALTER_PROPAGATION procedure. This procedure is in the DBMS_PROPAGATION_ADM package.

For example, the following procedure sets the negative rule set for a propagation named strm01_propagation to strm03_rule_set.

BEGIN
  DBMS_PROPAGATION_ADM.ALTER_PROPAGATION(
    propagation_name        => 'strm01_propagation',
    negative_rule_set_name  => 'strmadmin.strm03_rule_set');
END;
/

Adding Rules to the Rule Set for a Propagation

To add rules to the rule set of a propagation, you can run one of the following procedures:

Excluding the ADD_SUBSET_PROPAGATION_RULES procedure, these procedures can add rules to the positive rule set or negative rule set for a propagation. The ADD_SUBSET_PROPAGATION_RULES procedure can add rules only to the positive rule set for a propagation.

This section contains these topics:

Adding Rules to the Positive Rule Set for a Propagation

The following example runs the ADD_TABLE_PROPAGATION_RULES procedure in the DBMS_STREAMS_ADM package to add rules to the positive rule set of an existing propagation named strm01_propagation:

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
    table_name              => 'hr.locations',
    streams_name            => 'strm01_propagation',
    source_queue_name       => 'strmadmin.strm_a_queue',
    destination_queue_name  => 'strmadmin.strm_b_queue@dbs2.example.com',
    include_dml             => TRUE,
    include_ddl             => TRUE,
    source_database         => 'dbs1.example.com',
    inclusion_rule          => TRUE);
END;
/

Running this procedure performs the following actions:

  • Creates two rules. One rule evaluates to TRUE for row LCRs that contain the results of DML changes to the hr.locations table. The other rule evaluates to TRUE for DDL LCRs that contain DDL changes to the hr.locations table. The rule names are system generated.

  • Specifies that both rules evaluate to TRUE only for LCRs whose changes originated at the dbs1.example.com source database.

  • Adds the two rules to the positive rule set associated with the propagation because the inclusion_rule parameter is set to TRUE.

Adding Rules to the Negative Rule Set for a Propagation

The following example runs the ADD_TABLE_PROPAGATION_RULES procedure in the DBMS_STREAMS_ADM package to add rules to the negative rule set of an existing propagation named strm01_propagation:

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
    table_name              => 'hr.departments',
    streams_name            => 'strm01_propagation',
    source_queue_name       => 'strmadmin.strm_a_queue',
    destination_queue_name  => 'strmadmin.strm_b_queue@dbs2.example.com',
    include_dml             => TRUE,
    include_ddl             => TRUE,
    source_database         => 'dbs1.example.com',
    inclusion_rule          => FALSE);
END;
/

Running this procedure performs the following actions:

  • Creates two rules. One rule evaluates to TRUE for row LCRs that contain the results of DML changes to the hr.departments table, and the other rule evaluates to TRUE for DDL LCRs that contain DDL changes to the hr.departments table. The rule names are system generated.

  • Specifies that both rules evaluate to TRUE only for LCRs whose changes originated at the dbs1.example.com source database.

  • Adds the two rules to the negative rule set associated with the propagation because the inclusion_rule parameter is set to FALSE.

Removing a Rule from the Rule Set for a Propagation

You remove a rule from the rule set for an existing propagation by running the REMOVE_RULE procedure in the DBMS_STREAMS_ADM package. For example, the following procedure removes a rule named departments3 from the positive rule set of a propagation named strm01_propagation.

BEGIN
  DBMS_STREAMS_ADM.REMOVE_RULE(
    rule_name        => 'departments3',
    streams_type     => 'propagation',
    streams_name     => 'strm01_propagation',
    drop_unused_rule => TRUE,
    inclusion_rule   => TRUE);
END;
/

In this example, the drop_unused_rule parameter in the REMOVE_RULE procedure is set to TRUE, which is the default setting. Therefore, if the rule being removed is not in any other rule set, then it will be dropped from the database. If the drop_unused_rule parameter is set to FALSE, then the rule is removed from the rule set, but it is not dropped from the database even if it is not in any other rule set.

If the inclusion_rule parameter is set to FALSE, then the REMOVE_RULE procedure removes the rule from the negative rule set for the propagation, not the positive rule set.

To remove all of the rules in the rule set for the propagation, then specify NULL for the rule_name parameter when you run the REMOVE_RULE procedure.

Removing a Rule Set for a Propagation

You remove a rule set from a propagation using the ALTER_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package. This procedure can remove the positive rule set, negative rule set, or both. Specify TRUE for the remove_rule_set parameter to remove the positive rule set for the propagation. Specify TRUE for the remove_negative_rule_set parameter to remove the negative rule set for the propagation.

For example, the following procedure removes both the positive and the negative rule set from a propagation named strm01_propagation.

BEGIN
  DBMS_PROPAGATION_ADM.ALTER_PROPAGATION(
    propagation_name         => 'strm01_propagation',
    remove_rule_set          => TRUE,
    remove_negative_rule_set => TRUE);
END;
/

Note:

If a propagation does not have a positive or negative rule set, then the propagation propagates all messages in the source queue to the destination queue.

Dropping a Propagation

You run the DROP_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package to drop an existing propagation. For example, the following procedure drops a propagation named strm01_propagation:

BEGIN
  DBMS_PROPAGATION_ADM.DROP_PROPAGATION(
    propagation_name      => 'strm01_propagation',
    drop_unused_rule_sets => TRUE);
END;
/

Because the drop_unused_rule_sets parameter is set to TRUE, this procedure also drops any rule sets used by the propagation strm01_propagation, unless a rule set is used by another Oracle Streams client. If the drop_unused_rule_sets parameter is set to TRUE, then both the positive rule set and negative rule set for the propagation might be dropped. If this procedure drops a rule set, then it also drops any rules in the rule set that are not in another rule set.

Note:

When you drop a propagation, the propagation job used by the propagation is dropped automatically, if no other propagations are using the propagation job.