HP SQL/MX Publish/Subscribe Functionality

What Is Publish/Subscribe?

Traditionally, relational database systems are passive data stores. Applications can
only retrieve and update data. Applications cannot request notification when new data
becomes available or when existing data is updated. However, immediate notification
of additions and updates to the database is essential for many applications.
Publish/subscribe is an event-driven mechanism in which a process known as a
subscriber receives notification of a published event from a process known as a
publisher. Queuing differs from publish/subscribe in that one, and only one, subscriber
consumes the published event or message. Queuing and publish/subscribe are often
used in applications in which a process needs to know of the existence of a timely
event, such as the availability of a stock at a certain price or the arrival of a parcel at a
particular location.
Queuing and publish/subscribe services allow applications to receive notification of
changes to a database as soon as they occur. Applications might update and retrieve
data, but they might also need to be notified when data is updated or new data is added.
Publish/subscribe makes it possible to be notified in real time, and at less cost to the
system than polling.

Why Use Publish/Subscribe?

Suppose that you have a system to process new invoices. The invoices are stored in a
table, and a column called PROCESSED indicates unprocessed invoices. An
application processes these new invoices and updates the PROCESSED column to
show they have been processed. After all the invoices have been processed, the
application tries to fetch another unprocessed invoice and receives an end-of-file
condition.
Without publish/subscribe services, this process is more costly. To retrieve new
invoices that are inserted into the table after the end-of-file condition, the application
must close its cursor and reopen it, perhaps after waiting an interval, to see if new
invoices have arrived. This polling requires the closing and reopening of the cursor,
which has significant performance costs. It also requires application code to handle the
looping and end-of-file conditions.
You would probably code this application with an updatable cursor using a selection
predicate to include rows whose PROCESSED column is set to FALSE. You might also
use a positioned UPDATE statement to change the PROCESSED column to TRUE.
The application would need both of these statements to retrieve the information it
needs to process the invoice, and to make sure the invoice is processed only once. As
a result, every invoice would need two calls to the SQL executor, again at the cost of
extra overhead and application complexity.
 
 
If you simply started many instances of the application in an attempt to make this
application scalable, they would conflict with each other. As one instance held a lock
on the next invoice that needed processing, the others would have to wait. To avoid
conflict, you would have to devise a logical partitioning scheme so each instance
processed a subset of the invoices, adding complexity to the application and the
database schema.
Publish/subscribe services provide solutions for each of these problems:
- Rather than polling for changes to the invoice table, the application can use stream
  access mode. Stream access mode changes the behavior of the table: an attempt
  to fetch a row blocks, if necessary, until more rows are available. The application
  code becomes simpler and performs better.
- Instead of using a positioned UPDATE to mark the row as processed, the
  application can use a special type of statement, an embedded UPDATE, to select
  and update a row in a single call. Again, the application code is simpler and
  performs better.
- Publish/subscribe can use a special access mode, skip conflict access. As a result,
  the various instances of the invoice processing application skip rows that are
  locked and select and update other available rows. The application achieves better
  performance without having to use a partitioning scheme.
 
Without Pub/Sub
A. retrieve record
B. process ... ...
C. update status
 
With Pub/Sub
A. retrieve record (embedded UPDATE)
B. process ... ...
 
Publish/subscribe supports only audited tables.
 

Publish/Subscribe Terminology

Publishing, Inserting or updating rows of data in a queue or channel by using an
INSERT or UPDATE statement.
Subscribing, Subscribing to entries in a queue or channel by using a SELECT
statement that specifies stream access, so as to be notified when changes
occur.
Stream access, An access mode that changes the behavior of a table in a SELECT or
embedded UPDATE or DELETE statement. When no more rows satisfy the
selection predicate, the statement waits for more rows.
Queue, A database table read and updated in stream access mode, using an
UPDATE or DELETE statement embedded in the FROM clause of a
SELECT statement. Only a single subscriber receives each new
entry.
Channel, A database table used by applications specifying stream access to
subscribe to newly published entries. The entries remain in the table.
All subscribers receive new entries.

Queuing terms:
Enqueueing, Inserting entries into a queue by using an INSERT or UPDATE
statement.
Dequeueing, Reading and deleting entries with a single operation by using a
SELECT statement with an embedded DELETE or UPDATE. This
dequeue operation is sometimes referred to as a destructive
SELECT.
Semi-queuing, Reading a queue by skipping rows that are locked by another
transaction so that transactions do not block each other.
 
 
Major Features
Stream access mode
Embedded DELETE and UPDATE statements
Skip conflict access mode
Set on rollback
Holdable cursors
Rowset integration
Horizontally partitioned tables
Ordered streams
 

1. Stream Access

Stream access allows an application to access SQL/MP or SQL/MX database tables as continuous data streams.
The stream access mode first causes a regular scan of the table and, after all available
rows have been returned, causes fetch operations to wait (block) instead of returning
the end-of-data condition. The fetch operation resumes when new rows become
available.
 
  CREATE TABLE sos.ecui.quotes
  (symbol CHAR(5), price INT);

  SET NAMETYPE ANSI;
  SET SCHEMA sos.ecui;
  SELECT * FROM STREAM(quotes);
  -- This session is now in a wait state until rows are inserted into the table.

  SYMBOL  PRICE
  ------  -----------

  IBM              10
  APP              11

  -- The two rows above appear as they are inserted from another session:
  >>INSERT INTO quotes VALUES('IBM',10);

  --- 1 row(s) inserted.
  >>INSERT INTO quotes VALUES('APP',11);

  --- 1 row(s) inserted.

  -- The stream keeps waiting until you break it:

  *** WARNING[15033] Break was received.

  *** WARNING[15018] Break was received.  The last statement may be lost.
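The blocking behavior can be pictured with a small Python analogy. This is only a toy in-memory model, not SQL/MX code: the `queue.Queue` object stands in for `STREAM(quotes)`, and the names `subscriber` and `received` are made up for the illustration.

```python
import queue
import threading

# Toy stand-in for STREAM(quotes): a thread-safe FIFO of published rows.
quotes = queue.Queue()
received = []

def subscriber(expected_rows):
    # get() blocks while the queue is empty instead of reporting end-of-data,
    # which is the essence of stream access mode.
    for _ in range(expected_rows):
        received.append(quotes.get())

worker = threading.Thread(target=subscriber, args=(2,))
worker.start()

# The "publisher" inserts rows; each insert wakes the waiting subscriber.
quotes.put(("IBM", 10))
quotes.put(("APP", 11))

worker.join(timeout=5)
print(received)
```

The subscriber never polls: it simply sleeps inside `get()` until a row arrives.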
 
Another way to stop the stream is to set a timeout for it. The default value of STREAM_TIMEOUT is -1, which means no timeout.
In the example below, STREAM_TIMEOUT is set to 300, in hundredths of a second, which is
equivalent to 3 seconds. When the stream times out, control is returned to the application.
If the application is MXCI, the user cannot use the cursor any further, even though it is still open. This is
because MXCI does not allow the user to control (open, fetch, and close) cursors
directly. However, a user-written application can use the cursor to fetch again.
 
 
  >>CONTROL QUERY DEFAULT stream_timeout '300';

  --- SQL operation complete.
  >>SELECT * FROM STREAM(quotes);

  SYMBOL  PRICE
  ------  -----------

  IBM              10
  APP              11

  *** ERROR[8006] The stream timed out, but the cursor is still open.

  --- 2 row(s) selected.
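As a rough sketch of the timeout arithmetic and behavior, the Python analogy below converts a STREAM_TIMEOUT value to seconds and gives up on a fetch when nothing arrives in time. It is a toy model, not SQL/MX code; `StreamTimeout`, `timeout_seconds`, and `fetch` are hypothetical names, and treating -1 as "wait forever" follows the default described above.

```python
import queue

class StreamTimeout(Exception):
    """Raised by this sketch when no row arrives in time (hypothetical name)."""

def timeout_seconds(stream_timeout):
    # STREAM_TIMEOUT is expressed in hundredths of a second; -1 means no timeout.
    return None if stream_timeout == -1 else stream_timeout / 100.0

def fetch(stream, stream_timeout):
    # Wait for the next row, but only as long as the timeout allows.
    try:
        return stream.get(timeout=timeout_seconds(stream_timeout))
    except queue.Empty:
        raise StreamTimeout("the stream timed out") from None

quotes = queue.Queue()
quotes.put(("IBM", 10))

print(timeout_seconds(300))      # 3.0 seconds
print(fetch(quotes, 20))         # a row is available, so this returns at once
try:
    fetch(quotes, 20)            # nothing left: gives up after 0.2 seconds
except StreamTimeout as exc:
    print("timed out:", exc)
```

After the timeout the caller is free to call `fetch` again, which mirrors a user-written application fetching again from the still-open cursor.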
 

2. Embedded DELETE

Embedded DELETE statements allow applications to read and delete rows with a
single operation. Dequeue operations are implemented by using an embedded
DELETE together with stream access.
Such statements are also called destructive SELECTs. Embedded DELETE supports both
SQL/MP and SQL/MX tables.
 
  CREATE TABLE ecui.invoices
  (contractnbr INT, amount INT, priority INT);

  INSERT INTO invoices VALUES(100,10500,1);
  INSERT INTO invoices VALUES(200,20390,2);
  INSERT INTO invoices VALUES(300,30800,3);

  >>SELECT * FROM
  +>(DELETE FROM STREAM(invoices)) AS invoices;

  CONTRACTNBR  AMOUNT       PRIORITY
  -----------  -----------  -----------

          100        10500            1
          200        20390            2
          300        30800            3

  *** ERROR[8006] The stream timed out, but the cursor is still open.

  --- 3 row(s) selected.
  >>SELECT * FROM invoices;

  --- 0 row(s) selected.
The available rows are retrieved and deleted. The application then stays in a wait state until more rows are inserted into the table or the stream times out. You can use ROLLBACK to cancel the deletion. Because this test deletes records, we control the transaction manually, which requires the BEGIN WORK statement.

The BEGIN WORK statement enables you to start a transaction explicitly—where the transaction consists of the set of operations defined by the sequence of SQL statements that begins immediately after BEGIN WORK and ends with the next COMMIT WORK or ROLLBACK WORK statement. The BEGIN WORK statement has no effect on nonaudited tables.

The structure is as follows:

  -- This statement initiates a transaction.
  BEGIN WORK;
  --- SQL operation complete.
  -- ... other SQL operations ...
  -- This statement ends a transaction.
  COMMIT WORK;   -- or ROLLBACK WORK;
  --- SQL operation complete.

For example, the following session rolls back an embedded DELETE:

  CONTROL QUERY DEFAULT stream_timeout '300';
  BEGIN WORK;
  SELECT * FROM
  (DELETE FROM STREAM(invoices)) AS invoices;

  CONTRACTNBR  AMOUNT       PRIORITY
  -----------  -----------  -----------

          100        10500            1
          200        20390            2
          300        30800            3

  *** ERROR[8006] The stream timed out, but the cursor is still open.

  ROLLBACK WORK;
  SELECT * FROM invoices;

  CONTRACTNBR  AMOUNT       PRIORITY
  -----------  -----------  -----------

          100        10500            1
          200        20390            2
          300        30800            3

  --- 3 row(s) selected.

How does this differ from a plain DELETE FROM table_name WHERE ... statement?
A plain DELETE only removes the record; the application cannot get the values in the record and process them.
With publish/subscribe, the application receives the record as it is deleted and can then process it.
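The interaction between a destructive read and ROLLBACK WORK can be sketched with a toy Python model. This is an in-memory analogy under simplifying assumptions (a single transaction, a snapshot taken at BEGIN WORK), not how SQL/MX implements transactions; `ToyQueueTable` and its method names are invented for the illustration.

```python
class ToyQueueTable:
    """In-memory stand-in for an audited table used as a queue."""

    def __init__(self, rows):
        self.rows = list(rows)
        self._saved = None              # snapshot taken at begin_work()

    def begin_work(self):
        self._saved = list(self.rows)

    def dequeue_all(self):
        # Read and delete in one step, like an embedded DELETE on a stream.
        taken, self.rows = self.rows, []
        return taken

    def commit_work(self):
        self._saved = None              # the deletes become permanent

    def rollback_work(self):
        # Undo the deletes made since begin_work().
        if self._saved is not None:
            self.rows, self._saved = self._saved, None

invoices = ToyQueueTable([(100, 10500, 1), (200, 20390, 2), (300, 30800, 3)])
invoices.begin_work()
taken = invoices.dequeue_all()
print(taken)            # the three rows, handed to the application
print(invoices.rows)    # [] while the transaction is open
invoices.rollback_work()
print(invoices.rows)    # the three rows are back
```

Unlike a plain delete, `dequeue_all` hands the deleted rows back to the caller, so they can be processed before the transaction is committed or rolled back.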

3. Embedded UPDATE

Embedded UPDATE statements enable applications to read and update rows with a
single operation.
 
  CREATE TABLE ecui.orders
  (order_nbr INT,
  amount INT,
  status CHAR(1) DEFAULT 'N');

  INSERT INTO ORDERS(ORDER_NBR,AMOUNT) VALUES(1,500);
  INSERT INTO ORDERS(ORDER_NBR,AMOUNT) VALUES(2,4500);
  INSERT INTO ORDERS(ORDER_NBR,AMOUNT) VALUES(3,2100);

  >>SELECT * FROM ORDERS;

  ORDER_NBR    AMOUNT       STATUS
  -----------  -----------  ------

            1          500  N
            2         4500  N
            3         2100  N

  --- 3 row(s) selected.

  >>SELECT * FROM
  +>(UPDATE STREAM(orders) SET status ='Y' WHERE status ='N')
  +>AS orders;

  ORDER_NBR    AMOUNT       STATUS
  -----------  -----------  ------

            1          500  Y
            2         4500  Y
            3         2100  Y

  *** ERROR[8006] The stream timed out, but the cursor is still open.

  --- 3 row(s) selected.
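The difference from a plain UPDATE table_name SET ... WHERE ... statement is the same as for DELETE: a plain UPDATE only changes the rows, whereas the embedded UPDATE also returns the updated rows to the application so that it can process them.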
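To make the saving concrete, the toy Python model below counts the "calls" needed per order with a separate fetch and positioned update, compared with a single read-and-update step. It is only a counting analogy, not SQL/MX code; the function names and the idea of a call counter are assumptions of this sketch.

```python
def make_orders():
    return [
        {"order_nbr": 1, "amount": 500, "status": "N"},
        {"order_nbr": 2, "amount": 4500, "status": "N"},
        {"order_nbr": 3, "amount": 2100, "status": "N"},
    ]

def process_two_step(orders):
    """Fetch each pending row, then mark it with a separate update."""
    calls, processed = 0, []
    for order in orders:
        if order["status"] != "N":
            continue
        calls += 1                  # call 1: fetch the row
        order["status"] = "Y"
        calls += 1                  # call 2: positioned update
        processed.append(dict(order))
    return processed, calls

def process_embedded(orders):
    """Update each pending row and get it back in the same step."""
    calls, processed = 0, []
    for order in orders:
        if order["status"] != "N":
            continue
        order["status"] = "Y"
        calls += 1                  # single call: update and return the row
        processed.append(dict(order))
    return processed, calls

rows_a, calls_a = process_two_step(make_orders())
rows_b, calls_b = process_embedded(make_orders())
print(calls_a, calls_b)             # 6 3
```

Both variants end with the same rows marked 'Y'; the embedded form simply needs half as many calls in this model.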

4. Skip Conflict Access

The skip conflict access method is important in implementing an efficient transactional
queuing feature. It prevents concurrent transactions from blocking each other: instead of
waiting for the release of locks on rows that are currently being inserted or updated, a
transaction moves on to other rows that are available and not locked by another transaction.
 
  INSERT INTO invoices VALUES(100,10500,1);
  INSERT INTO invoices VALUES(200,20390,2);
  INSERT INTO invoices VALUES(300,30800,3);

  CONTROL QUERY DEFAULT stream_timeout '-1';
  SET NAMETYPE ANSI;
  SET SCHEMA sos.ecui;

  >>SELECT * FROM
  +>(DELETE FROM STREAM(invoices)
  +>FOR SKIP CONFLICT ACCESS) AS invoices;

  CONTRACTNBR  AMOUNT       PRIORITY
  -----------  -----------  -----------

          100        10500            1
          200        20390            2
          300        30800            3
 
In a second session:

  CONTROL QUERY DEFAULT stream_timeout '-1';
  SET SCHEMA sos.ecui;
  SELECT * FROM
  (DELETE FROM STREAM(invoices)
  FOR SKIP CONFLICT ACCESS) AS invoices;

In a third session:

  CONTROL QUERY DEFAULT stream_timeout '-1';
  SET NAMETYPE ANSI;
  SET SCHEMA sos.ecui;
  INSERT INTO invoices VALUES(600,54800,6);

In this test, the first session then receives the new record:

  CONTRACTNBR  AMOUNT       PRIORITY
  -----------  -----------  -----------

          100        10500            1
          200        20390            2
          300        30800            3
          600        54800            6
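The skipping behavior itself can be sketched in a few lines of Python. This is a toy, single-threaded model of row locks, not SQL/MX code; `dequeue_skip_conflict`, the `locked` dictionary, and the session labels are hypothetical.

```python
def dequeue_skip_conflict(rows, locked, worker):
    """Claim the first row not locked by another worker; None if all are locked."""
    for row in rows:
        key = row[0]                 # contract number identifies the row
        if key not in locked:
            locked[key] = worker     # take the row lock
            return row
    return None

invoices = [(100, 10500, 1), (200, 20390, 2), (300, 30800, 3)]
locked = {}

first = dequeue_skip_conflict(invoices, locked, "session-1")
second = dequeue_skip_conflict(invoices, locked, "session-2")
print(first)     # (100, 10500, 1)
print(second)    # (200, 20390, 2): row 100 is locked, so it is skipped
```

Without the skip, the second session would have to wait for the lock on row 100 even though rows 200 and 300 are free.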
 
5. AFTER LAST ROW
The stream skips all existing rows and returns rows published after the stream's cursor
was opened.
  >>SELECT * FROM INVOICES;

  CONTRACTNBR  AMOUNT       PRIORITY
  -----------  -----------  -----------

          100        10500            1
          200        20390            2
          300        30800            3

  --- 3 row(s) selected.

  CONTROL QUERY DEFAULT stream_timeout '-1';
  SELECT * FROM STREAM(invoices) AFTER LAST ROW;
 
In another session, insert two records:

  >>INSERT INTO invoices VALUES(400,54800,4);

  --- 1 row(s) inserted.
  >>INSERT INTO invoices VALUES(400,54800,5);

  --- 1 row(s) inserted.

Meanwhile, the first session retrieves those two records.

Note that in the case where a publisher's transaction was in progress
when the subscriber’s statement began, not all the transaction's rows would be
returned to the subscriber. Instead only the rows inserted or updated after the
statement began executing would be returned.
  >>SELECT * FROM STREAM(invoices) AFTER LAST ROW;

  CONTRACTNBR  AMOUNT       PRIORITY
  -----------  -----------  -----------

          400        54800            4
          400        54800            5
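One way to picture AFTER LAST ROW is as a cursor whose starting position is the current end of the table. The Python sketch below is a toy in-memory analogy, not SQL/MX code; `ToyStream` and its methods are invented names, and it ignores transactions that are in progress when the cursor is opened.

```python
class ToyStream:
    """In-memory stand-in for a table read in stream access mode."""

    def __init__(self, rows):
        self.rows = list(rows)

    def publish(self, row):
        self.rows.append(row)

    def open_cursor(self, after_last_row=False):
        # The cursor is just the index of the next row to return.
        return len(self.rows) if after_last_row else 0

    def fetch_available(self, position):
        # Return the rows from position onward and the new position.
        return self.rows[position:], len(self.rows)

invoices = ToyStream([(100, 10500, 1), (200, 20390, 2), (300, 30800, 3)])
cursor = invoices.open_cursor(after_last_row=True)   # skip the existing rows

invoices.publish((400, 54800, 4))
invoices.publish((400, 54800, 5))

new_rows, cursor = invoices.fetch_available(cursor)
print(new_rows)   # only the two rows published after the cursor was opened
```

A cursor opened without `after_last_row` would start at position 0 and return all five rows.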
 

6. Set Column Values On Rollback

SET ON ROLLBACK allows an application to update columns when
a transaction is aborted during an embedded
DELETE or UPDATE operation. The column that is set should be declared NOT NULL.
 
 
  CREATE TABLE TEST_COL
  (
  ID INT DEFAULT NULL,
  NAME CHAR(20) DEFAULT NULL,
  COUNTER INT NOT NULL
  );

  INSERT INTO TEST_COL VALUES(1,'ENDER',2);
  INSERT INTO TEST_COL VALUES(2,'ROCKY',2);
  CONTROL QUERY DEFAULT stream_timeout '300';
  -- COMMIT WORK or ROLLBACK WORK
  >>select * from TEST_COL;

  ID           NAME                  COUNTER
  -----------  --------------------  -----------

            1  ENDER                           2
            2  ROCKY                           2

  --- 2 row(s) selected.
  >>BEGIN WORK;
  --- SQL operation complete.
  >>SELECT * FROM(DELETE FROM STREAM(TEST_COL)
  +>SET ON ROLLBACK COUNTER = COUNTER +1
  +>FOR SKIP CONFLICT ACCESS) AS TEST_COL;

  ID           NAME                  COUNTER
  -----------  --------------------  -----------

            1  ENDER                           2
            2  ROCKY                           2

  *** ERROR[8006] The stream timed out, but the cursor is still open.

  --- 2 row(s) selected.
  >>select * from TEST_COL;

  --- 0 row(s) selected.
  >>ROLLBACK WORK;

  --- SQL operation complete.
  >>select * from TEST_COL;

  ID           NAME                  COUNTER
  -----------  --------------------  -----------

            1  ENDER                           3
            2  ROCKY                           3

  --- 2 row(s) selected.
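The effect shown in the session above can be mimicked with a toy Python model: rows removed by a dequeue come back on rollback, but with a column changed. This is an in-memory analogy only, not SQL/MX code; `ToyTable`, `dequeue_all`, and `bump_counter` are hypothetical names.

```python
import copy

class ToyTable:
    """In-memory stand-in for a table dequeued with a rollback action."""

    def __init__(self, rows):
        self.rows = [dict(r) for r in rows]
        self._pending = None

    def dequeue_all(self, set_on_rollback):
        # Delete every row, remembering what to restore if the transaction aborts.
        self._pending = (self.rows, set_on_rollback)
        self.rows = []
        # The caller sees the rows as they were when they were deleted.
        return copy.deepcopy(self._pending[0])

    def rollback_work(self):
        if self._pending is None:
            return
        rows, action = self._pending
        for row in rows:
            action(row)              # apply the SET ON ROLLBACK assignment
        self.rows, self._pending = rows, None

def bump_counter(row):
    row["COUNTER"] += 1              # models COUNTER = COUNTER + 1

test_col = ToyTable([
    {"ID": 1, "NAME": "ENDER", "COUNTER": 2},
    {"ID": 2, "NAME": "ROCKY", "COUNTER": 2},
])
seen = test_col.dequeue_all(bump_counter)
test_col.rollback_work()
print([r["COUNTER"] for r in seen])            # [2, 2]
print([r["COUNTER"] for r in test_col.rows])   # [3, 3]
```

A counter like this lets an application notice rows whose processing has been rolled back repeatedly.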
 
7. Holdable Cursors
A holdable cursor enables an application to retain an open cursor across transactions.
This feature is supported only for cursors using the stream access mode or for cursors
defined with a table reference that uses an embedded UPDATE or DELETE.
The WITH HOLD clause can also be used with dynamic cursors, extended dynamic
cursors, and allocated cursors.

The following DECLARE CURSOR statement for get_invoices specifies that the cursor
is to remain open and maintain its position in the result
set even after a user-specified transaction has terminated.
  EXEC SQL DECLARE get_invoices CURSOR WITH HOLD FOR
  SELECT amount,contractnbr
  FROM
  (DELETE FROM STREAM(invoices)
  FOR SKIP CONFLICT ACCESS) AS invoices;
 
Because the cursor was declared WITH HOLD, it is not closed by the execution of the
COMMIT WORK statement. Only an error condition or a concurrent DDL or utility
operation can cause a cursor to close. For example, an error such as a system or disk
process failure that would cause any cursor to close, causes a holdable cursor to
close. Before using a cursor closed by an error condition, the application must reopen
the cursor.
Applications that delete or update many rows, using embedded DELETEs and
UPDATEs, experience a significant performance boost if they use a holdable cursor
and are coded to avoid closing and reopening the cursor with each COMMIT WORK
statement.
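A back-of-the-envelope Python model shows where the saving comes from. It only counts cursor opens under the assumption that a cursor without WITH HOLD is closed by each COMMIT WORK; it is not SQL/MX code, and `cursor_opens` is a made-up helper.

```python
def cursor_opens(num_transactions, with_hold):
    """Count cursor opens when every transaction ends with COMMIT WORK."""
    opens = 0
    cursor_open = False
    for _ in range(num_transactions):
        if not cursor_open:
            opens += 1               # (re)open the cursor before fetching
            cursor_open = True
        # ... fetch and process rows here, then COMMIT WORK ...
        if not with_hold:
            cursor_open = False      # the commit closes a non-holdable cursor
    return opens

print(cursor_opens(1000, with_hold=False))   # 1000
print(cursor_opens(1000, with_hold=True))    # 1
```

With a holdable cursor the open cost is paid once, no matter how many transactions the application commits.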

8. Rowset Integration

The integration of rowsets into the queuing and publish/subscribe features of SQL/MX
enables applications to enqueue or dequeue multiple rows at one time instead of
having to enqueue or dequeue rows one at a time.
Note: rowsets are not supported in Java applications.
The cursor fetch into a rowset from a stream waits until the rowset is filled.
 
For example, with a rowset size of ten, rows are retrieved ten (or fewer than ten) at a time
until the application must wait for new arrivals.
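The batching arithmetic can be illustrated with a short Python sketch. It only models how the currently available rows are grouped into rowsets of at most ten; it does not model waiting for new arrivals and is not SQL/MX code. `fetch_rowsets` is a hypothetical helper.

```python
def fetch_rowsets(available_rows, rowset_size=10):
    """Yield the available rows in batches of at most rowset_size."""
    for start in range(0, len(available_rows), rowset_size):
        yield available_rows[start:start + rowset_size]

rows = list(range(1, 24))            # 23 rows waiting in the queue
sizes = [len(batch) for batch in fetch_rowsets(rows)]
print(sizes)                         # [10, 10, 3]
```

Here 23 available rows are delivered as two full rowsets and one partial rowset, after which a real subscriber would wait for more rows.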
 

9. Partitioned Queues

10. Ordered Streams

11. Joins

12. Run-Time Limits on Streams

13. Restarting a Subscriber

14. Embedded SQL

 
 





Original article: https://www.cnblogs.com/ECNB/p/4611230.html