This chapter describes the OCCI implementation of Oracle Streams Advanced Queuing (AQ) for messages.
This chapter contains these topics:
Oracle Streams is a new information sharing feature that provides replication, message queuing, data warehouse loading, and event notification. It is also the foundation behind Oracle Streams Advanced Queuing (AQ).
Advanced Queuing is the integrated message queuing feature that exposes message queuing capabilities of Oracle Streams. AQ enables applications to:
Perform message queuing operations similar to SQL operations from the Oracle database
Communicate asynchronously through messages in AQ queues
Integrate with database for unprecedented levels of operational simplicity, reliability, and security to message queuing
Audit and track messages
Supports both synchronous and asynchronous modes of communication
See Also: http://www.oracle.com/technology/products/dataint/ for more information about the Advanced Queuing feature |
The advantages of using AQ in OCCI applications include:
Create applications that communicate with each other in a consistent, reliable, secure, and autonomous manner
Store messages in database tables, bringing the reliability and recoverability of the database to your messaging infrastructure
Retain messages in the database automatically for auditing and business intelligence
Create applications that leverage messaging without having to deal with a different security, data type, or operational mode
Leverage transactional characteristics of the database
Since traditional messaging solutions have single subscriber queues, a queue must be created for each pair of applications that communicate with each other. The publish/subscribe protocol of the AQ makes it easy to add additional applications (subscribers) to a conversation between multiple applications.
OCCI AQ is a set of interfaces that allows messaging clients to access the Advanced Queuing feature of Oracle for enterprise messaging applications. Currently, OCCI AQ supports only the operational interfaces and not the administrative interface, but administrative operations can be accessed through embedded PL/SQL calls.
See Also: Package DBMS_AQADM in Oracle Database PL/SQL Packages and Types Reference for administrative operations in AQ support through PL/SQL |
The AQ feature can be used with other interfaces available through OCCI for sending, receiving, publishing, and subscribing in a message-enabled database. Synchronous and asynchronous message consumption is available based on a message selection rule.
Enqueuing refers to sending a message to a queue and dequeuing refers to receiving one. A client application can create a message, set the desired properties on it and enqueue it by storing the message in the queue, a table in the database. When dequeuing a message, an application can either dequeue it synchronously by calling receive methods on the queue, or asynchronously by waiting for a notification from the database.
The AQ feature is implemented through the abstractions Message, Agent, Producer, Consumer, Listener and Subscription.
A message is the basic unit of information being inserted into and retrieved from a queue. A message consists of control information and payload data. The control information represents message properties used by AQ to manage messages. The payload data is the information stored in the queue and is transparent to AQ.
An Agent
represents and identifies a user of the queue, either producer or consumer of the message, either an end-user or an application. An Agent
is identified by a name, an address and a protocol. The name can be either assigned by the application, or be the application itself. The address is determined in terms of the communication protocol. If the protocol is 0
(default), the address is of the form[schema.]queuename[@dblink]
, a database link.
Agent
s on the same queue must have a unique combination of name, address, and protocol. Example 10-1 demonstrates an instantiation of a new Agent
object in a client program.
A client uses a Producer
object to enqueue Message
s into a queue. It is also used to specify various enqueue options.
A client uses a Consumer
object to dequeue Message
s that have been delivered to a queue. It also specifies various dequeuing options.
Before a consumer can receive messages,
Example 10-2 Setting the Agent on the Consumer
Consumer cons(conn); ... cons.setAgent(ag); cons.receive();
A Listener
listens for Message
s for registered Agent
s at specified queues.
A Subscription
encapsulates the information and operations necessary for registering a subscriber for notifications.
As mentioned previously, a Message
is a basic unit of information that contains both the properties of the message and its content, or payload. Each message is enqueued by the Producer
and dequeued by the Consumer
objects.
OCCI supports three types of message payloads: RAW, AnyData, and User-defined.
RAW
payloads are mapped as objects of the Bytes Class in OCCI.
The AnyData
type models self-descriptive data encapsulation; it contains both the type information and the actual data value. Data values of most SQL types can be converted to AnyData
, and then be converted to the original data type. AnyData
also supports user-defined data types. The advantage of using AnyData
payloads is that it ensures both type preservation after an enqueue and dequeue process, and that it allows the user to use a single queue for all types used in the application. Example 10-3 demonstrates how to create an AnyData
message. Example 10-4 shows how to retrieve the original data type from the message.
OCCI supports enqueuing and dequeuing of user-defined types as payloads. Example 10-5 demonstrates how to create a payload with a user-defined Employee
object.
Aside from payloads, the user can specify several additional message properties, such as Correlation, Sender, Delay and Expiration, Recipients, and Priority and Ordering.
Applications can specify a correlation identifier of the message during the enqueuing process, as demonstrated in Example 10-6. This identifier can then be used by the dequeuing application.
Applications can specify the sender of the message, as demonstrated in Example 10-7. The sender identifier can then be used by the receiver of the message.
Time settings control the delay and expiration times of the message in seconds, as demonstrated in Example 10-8.
The agents for whom the message is intended can be specified during message encoding, as demonstrated in Example 10-9. This ensures that only the specified recipients can access the message.
By assigning a priority level to a message, the sender can control the order in which the messages are dequeued by the receiver. Example 10-10 demonstrates how to set the priority of a message.
Messages are enqueued by the Producer. The Producer Class is also used to specify enqueue options. A Producer
object can be created on a valid connection where enqueuing is performed, as illustrated in Example 10-11.
The transactional behavior of the enqueue operation can be defined based on application requirements. The application can make the effect of the enqueue operation visible externally either immediately after it is completed, as in Example 10-11, or only after the enclosing transaction has been committed.
To enqueue the message, use the send()
method, as demonstrated in Example 10-11. A client may retain the Message
object after it is sent, modify it, and send it again.
Example 10-11 Creating a Producer, Setting Visibility, and Enqueuing the Message
Producer prod(conn); ... prod.setVisibility(Producer::ENQ_IMMEDIATE); ... Message mes(env); ... mes.setBytes(obj); // obj represents the content of the message prod.send(mes, queueName); // queueName is the name of the queue
Messages delivered to a queue are dequeued by the Consumer
. The Consumer Class is also used to specify dequeue options. A Consumer
object can be created on a valid connection to the database where a queue exists, as demonstrated in Example 10-12.
In applications that support multiple consumers in the same queue, the name of the consumer has to be specified as a registered subscriber to the queue, as shown in Example 10-12.
To dequeue the message, use the receive()
method, as demonstrated in Example 10-12. The typeName
and schemaName
parameters of the receive()
method specify the type of payload and the schema of the payload type.
Example 10-12 Creating a Consumer, Naming the Consumer, and Receiving a Message
Consumer cons(conn); ... // Name must be registered with the queue through administrative interface cons.setConsumerName("BillApp"); cons.setQueueName(queueName); ... Message mes = cons.receive(Message::OBJECT, "BILL_TYPE", "BILL_PROCESSOR"); ... // obj is is assigned the content of the message obj = mes.getObject();
When the queue payload type is either RAW or AnyData, schemaName
and typeName
are optional, but you must specify these parameters explicitly when working with user-defined payloads. This is illustrated in Example 10-13.
Example 10-13 Receiving a Message
//receiving a RAW message Message mes = cons.receive(Message::RAW); ... //receiving an ANYDATA message Message mes = cons.receive(Message::ANYDATA); ...
The dequeuing application can specify several dequeuing options before it begins to receive messages. These include Correlation, Mode, and Navigation.
The message can be dequeued based on the value of its correlation identifier using the setCorrelationId()
method, as shown in Example 10-14.
Based on application requirements, the user can choose to only browse through messages in the queue, remove the messages from the queue, or lock messages using the setDequeueMode()
method, as shown in Example 10-14.
Messages enqueued in a single transaction can be viewed as a single group by implementing the setPositionOfMessage()
method, as shown in Example 10-14.
The Listener listens for messages on queues on behalf of its registered clients. The Listener Class implements the listen()
method, which is a blocking call that returns when a queue has a message for a registered agent, or throws an error when the time out period expires. Example 10-15 illustrates the listening protocol.
Example 10-15 Listening for messages
Listener listener(conn); vector<Agent> agtList; for( int i=0; i<num_agents; i++) agtList.push_back( Agent( name, address, protocol); listener.setAgentList(agtList); listener.setTimeOutForListen(10); Agent agt(env); try{ agt = listener.listen(); } catch{ cout<<e.getMessage()<<endl; }
The Subscription Class implements the publish-subscribe notification feature. It allows an OCCI AQ application to receive client notifications directly, register an e-mail address to which notifications can be sent, register an HTTP URL to which notifications can be posted, or register a PL/SQL procedure to be invoked on a notification. Registered clients are notified asynchronously when events are triggered or on an explicit AQ enqueue. Clients do not have to be connected to a database.
An OCCI application can do all of the following:
Register interest in notification in the AQ namespace, and be notified when an enqueue occurs.
Register interest in subscriptions to database events, and receive notifications when these events are triggered.
Manage registrations, such as disable registrations temporarily, or dropping registrations entirely.
Post (or send) notifications to registered clients.
Notifications can work in several ways. They can be:
received directly by the OCCI application
sent to a pre-specified e-mail address
sent to a pre-defined HTTP URL
invoke a pre-specified database PL/SQL procedure
Registered clients are notified asynchronously when events are triggered, or on an explicit AQ enqueue. Clients do not have to be connected to a database for notifications to work. Registration can be accomplished either as Direct Registration or Open Registration.
You can register directly with the database. This is relatively simple, and the registration takes effect immediately. Example 10-16 outlines the required steps to successfully register for direct event notification. It is assumed that the appropriate event trigger or queue is in existence, and that the initialization parameter COMPATIBLE
is set to 8.1
or higher.
Example 10-16 How to Register for Notifications; Direct Registration
Create the environment in Environment::EVENTS
mode.
Create the Subscription
object.
Set these Subscription
attributes.
The namespace
can be set to these options:
To receive notifications from AQ queues, namespace
must be set to Subscription::NS_AQ
. The subscription name is then either of the form SCHEMA.QUEUE
when registering on a single consumer queue, or SCHEMA.QUEUE:
CONSUMER_NAME
when registering on a multiconsumer queue.
To receive notifications from other applications that use conn->postToSubscription()
method, namespace
must be set to Subscription::NS_ANONYMOUS
The protocol
can be set to these options:
If an OCCI client must receive an event notification, this attribute should be set to Subscription::PROTO_CBK
. You also must set the notification callback and the subscription context before registering the Subscription
. The notification callback is called when the event occurs.
For an e-mail notification, set the protocol to Subscription::PROTO_MAIL
. You must set the recipient name before subscribing to avoid an application error.
For an HTTP URL notification, set the protocol to Subscription::HTTP
. You must set the recipient name before subscribing to avoid an application error.
To invoke PL/SQL procedures in the database on event notification, set protocol to Subscription::PROTO_SERVER
. You must set the recipient name before subscribing to avoid an application error.
Register the subscriptions using connection->registerSubscriptions()
.
You can also register through an intermediate LDAP that sends the registration request to the database. This is used when the client cannot have a direct database connection; for example, the client wants to register for an open event while the database is down. This approach is also used when a client wants to register for the same event(s) in multiple databases, concurrently.
Example 10-17 outlines the LDAP open registration using the Oracle Enterprise Security Manager (OESM). Open registration has these prerequisites:
The client must be an enterprise user
In each enterprise domain, create an enterprise role ENTERPRISE_AQ_USER_ROLE
For each database in the enterprise domain, add a global role GLOBAL_AQ_USER_ROLE
to enterprise the role ENTERPRISE_AQ_USER_ROLE
.
For each enterprise domain, add an enterprise role ENTERPRISE_AQ_USER_ROLE
to the privilege group cn=OracleDBAQUsers
under cn=oraclecontext
in the administrative context
For each enterprise user that is authorized to register for events in the database, grant enterprise the role ENTERPRISE_AQ_USER_ROLE
The compatibility of the database must be 9.0 or higher
LDAP_REGISTRATION_ENABLED
must be set to TRUE
(default is FALSE
):
ALTER SYSTEM SET LDAP_REGISTRATION_ENABLED=TRUE
LDAP_REG_SYNC_INTERVAL
must be set to the time_interval
(in seconds) to refresh registrations from LDAP (default is 0
, do not refresh):
ALTER SYSTEM SET LDAP_REG_SYNC_INTERVAL = time_interval
To force a database refresh of LDAP registration information immediately, issue this command:
ALTER SYSTEM REFRESH LDAP_REGISTRATION
Example 10-17 How to Use Open Registration with LDAP
Create the environment in Environment::EVENTS|Environment::USE_LDAP
mode.
Set the Environment
object for accessing LDAP:
The host and port on which the LDAP server is residing and listening
The authentication method; only simple username and password authentication is currently supported
The username (distinguished name) and password for authentication with the LDAP server
The administrative context for Oracle in the LDAP server
Create the Subscription
object.
Set the distinguished names of the databases in which the client wants to receive notifications on the Subscription
object.
Set these Subscription
attributes.
The namespace
can be set to these options:
To receive notifications from AQ queues, namespace
must be set to Subscription::NS_AQ
. The subscription name is then either of the form SCHEMA.QUEUE
when registering on a single consumer queue, or SCHEMA.QUEUE:
CONSUMER_NAME
when registering on a multiconsumer queue.
To receive notifications from other applications that use conn->postToSubscription()
method, namespace
must be set to Subscription::NS_ANONYMOUS
The protocol
can be set to these options:
If an OCCI client must receive an event notification, this attribute should be set to Subscription::PROTO_CBK
. You also must set the notification callback and the subscription context before registering the Subscription
. The notification callback is called when the event occurs.
For an e-mail notification, set the protocol to Subscription::PROTO_MAIL
. You must then set the recipient name to the e-mail address to which the notifications must be sent.
For an HTTP URL notification, set the protocol to Subscription::HTTP
. You must set the recipient name to the URL to which the notification must be posted.
To invoke PL/SQL procedures in the database on event notification, set protocol to Subscription::PROTO_SERVER
. You must set the recipient name to the database procedure invoked on notification.
Register the subscription: environment->registerSubscriptions()
.
Open registration takes effect when the database accesses LDAP to pick up new registrations. The frequency of pick-ups is determined by the value of REG_SYNC_INTERVAL
.
Clients can temporarily disable subscriptions, re-enable them, or permanently unregister from future notifications.
The client must register a notification callback. This callback is invoked only when there is some activity on the registered subscription. In the Streams AQ namespace, this happens when a message of interest is enqueued.
The callback must return 0
, and it must have the following specification:
typedef unsigned int (*callbackfn) (Subscription &sub, NotifyResult *nr);
where:
The sub
parameter is the Subscription
object which was used when the callback was registered.
The nr
parameter is the NotifyResult
object holding the notification info.
Ensure that the subscription object used to register for notifications is not destroyed until it explicitly unregisters the subscription.
The user can retrieve the payload, message, message id, queue name and consumer name from the NotifyResult
object, depending on the source of notification. These results are summarized in Table 10-1. Only a bytes payload is currently supported, and you must explicitly dequeue messages from persistent queues in the AQ namespace. If notifications come from non-persistent queues, messages are available to the callback directly; only RAW
payloads are supported. If notifications come from persistent queues, the message has to be explicitly dequeued; all payload types are supported.
Table 10-1 Notification Result Attributes; ANONYMOUS and AQ Namespace
Notification Result Attribute | ANONYMOUS Namespace | AQ Namespace, Persistent Queue | AQ Namespace, Non-Persistent Queue |
---|---|---|---|
payload |
valid |
invalid |
invalid |
message |
invalid |
invalid |
valid |
messageID |
invalid |
valid |
valid |
consumer name |
invalid |
valid |
valid |
queue name |
invalid |
valid |
valid |
Applications often use data in different formats, and this requires a type transformation. A transformation is implemented as a SQL function that takes the source data type as input and returns an object of the target data type.Transformations can be applied when message are enqueued, dequeued, or when they are propagated to a remote subscriber.
See Also:
The following chapters of the Oracle Streams Advanced Queuing User's Guide for information of format transformation: