Coral8 – Randomly Sampling a Stream

The example below randomly samples an input stream and inserts samples into a named window for later processing. A variable named Dice is “rolled” each time a new row enters the input stream. A row is copied into the window if Dice = 1.

-- Variable for random number
CREATE VARIABLE INTEGER Dice;
CREATE VARIABLE INTEGER DiceSides = 6;
 
-- Input stream
CREATE INPUT STREAM StreamIn
SCHEMA (
    Val INTEGER
);
 
-- Each time a row enters the input stream,
-- roll the Dice
ON StreamIn
SET Dice =
    TO_INTEGER(RANDOM() * TO_FLOAT(DiceSides)) + 1;
 
-- Some data entering the input stream
ATTACH INPUT ADAPTER RMG
    TYPE RandomTuplesGeneratorAdapterType
TO STREAM StreamIn
PROPERTIES
    RATE     = "20"
    ,ROWCOUNT = "600"
;
 
-- Named window for samples
CREATE WINDOW WR
SCHEMA(
    Val INTEGER
    ,Dice INTEGER
)
KEEP 150 ROWS
;
 
-- Randomly sample the input stream
INSERT
    WHEN Dice = 1 THEN WR
SELECT Val, Dice
    FROM StreamIn
;


Here is a result: out of 600 messages that entered the input stream, 96 ended up in the window. I was using a six-sided die, so not bad.
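This small Python simulation of the same logic checks whether 96 out of 600 is a plausible outcome. It assumes that TO_INTEGER truncates toward zero, like Python's int(). Under that assumption, TO_INTEGER(RANDOM() * 6) + 1 yields the values 1 to 6 with equal probability; a rounding conversion would skew the odds. The function names are illustrative and are not Coral8 APIs.

```python
import math
import random


def roll_die(rng: random.Random, sides: int) -> int:
    """Mirror TO_INTEGER(RANDOM() * TO_FLOAT(DiceSides)) + 1, assuming truncation."""
    # rng.random() is in [0.0, 1.0), so the product is in [0, sides) and int() gives 0..sides-1.
    return int(rng.random() * float(sides)) + 1


def sample_stream(values, sides: int, rng: random.Random):
    """Roll once per incoming row and keep (Val, Dice) pairs where Dice == 1."""
    kept = []
    for val in values:
        dice = roll_die(rng, sides)
        if dice == 1:
            kept.append((val, dice))
    return kept


def expected_and_sd(n: int, sides: int) -> tuple[float, float]:
    """Binomial mean and standard deviation of the number of sampled rows."""
    p = 1.0 / sides
    return n * p, math.sqrt(n * p * (1.0 - p))


if __name__ == "__main__":
    samples = sample_stream(range(600), 6, random.Random(42))
    mean, sd = expected_and_sd(600, 6)
    print(f"{len(samples)} rows sampled; expected {mean:.0f} +/- {sd:.1f}")
```

With n = 600 rows and p = 1/6, the expected sample size is 100 with a binomial standard deviation of about 9.1, so 96 is within one standard deviation. The window keeps up to 150 rows, so none of the 96 samples were pushed out.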

 

Read from DB – Coral8 [8]

As previously discussed, the Coral8 Read from Database Adapter is best suited for replay of historical records, or for testing algorithms against existing data.

For this article, I will create a program to monitor a measurement stream and raise alarms based on a statistical index. I will use the adapter to read existing data from one of my databases and test the program against it. The data are real-life measurements generated during the assembly of car parts. Among other columns, the table of measurements includes:

  • Record Time — time-stamp;
  • Serial Number — unique part id;
  • Piston Press Force — a measurement.

Here is a SQL snippet:

CREATE TABLE dbo.PartData(
    RecordTime datetime
    ,SerialNumber varchar(13)
    ,PistonPressForce decimal(8,3)
    -- more columns here
);

This will do for the example. Now the to-do list:

  1. Create an input stream.
  2. Attach the read-database adapter to the stream.
  3. Test the connection to the DB.
  4. Create a stream for statistical data (average, sigma, Cpk).
  5. Create a query to calculate statistics over a window of data rows.
  6. Test the statistics.
  7. Create an output stream for alarm messages.
  8. Create a query to raise alarm messages.
  9. Test alarms.

 

1. The Input Stream

The schema of the input stream matches the database columns. Note the column-type mappings between SQL Server and Coral8: datetime becomes TIMESTAMP, varchar becomes STRING, and decimal becomes FLOAT. Stream fields have the same names as the database columns, which this adapter type requires.

-- Input stream
CREATE INPUT STREAM StreamIn
SCHEMA (
    RecordTime TIMESTAMP
    ,SerialNumber STRING
    ,PistonPressForce FLOAT
);

 

2. The Adapter

As in the previous post, I inserted the adapter by point-and-click and filled in the adapter form, which generated this code:

-- Read from DB adapter
ATTACH INPUT ADAPTER ReadFromDB
    TYPE ReadFromDatabaseAdapterType
TO STREAM StreamIn
PROPERTIES
    DBNAME = "RCI4122_1"
    ,TABLE = "PartData"
    ,TIMESTAMPCOLUMN = "RecordTime"
    ,RATE = "1"
    ,WHERECLAUSE = 
        "PistonPressForce  > 0"
    ,TIMESTAMPCOLUMNINITVALUE =
        "2008-01-01"
;


The adapter parameters:

  • DB Connection Name: name of the database connection as specified in the coral8-services.xml file. For more details, see the previous post.
  • Table or View: name of the database table.
  • Where Clause: the expression in the WHERE clause of the query.
  • Loop Count: how many times to loop through the data.
  • Rate: number of reads (rows) per second.
  • Timestamp Column: name of the time-stamp column in the database.
  • Timestamp Column Initial Value: initial value for the time-stamp. In the example above, the adapter reads all records since New Year 2008.


The adapter above reads the same rows, in the same order, as this SQL query:

USE RCI4122_1;
SELECT RecordTime, SerialNumber, PistonPressForce
  FROM PartData 
  WHERE RecordTime  > '2008-01-01'
    AND PistonPressForce  > 0
ORDER BY RecordTime;
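The Python sketch below mimics the adapter under one assumption: that it runs the query above once and then paces the rows out at RATE rows per second. Loop Count is ignored. sqlite3 stands in for SQL Server, and timestamps are stored as ISO-8601 strings, which compare correctly as text. fetch_rows and replay are hypothetical helpers, not part of Coral8.

```python
import sqlite3
import time


def fetch_rows(conn, init_ts):
    """Same filter and ordering as the equivalent SQL query above."""
    sql = (
        "SELECT RecordTime, SerialNumber, PistonPressForce "
        "FROM PartData "
        "WHERE RecordTime > ? AND PistonPressForce > 0 "
        "ORDER BY RecordTime"
    )
    return conn.execute(sql, (init_ts,)).fetchall()


def replay(rows, rate, emit, sleep=time.sleep):
    """Emit rows one at a time, waiting 1/rate seconds after each (RATE = "1" -> 1 row/s)."""
    interval = 1.0 / rate
    for row in rows:
        emit(row)
        sleep(interval)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE PartData (RecordTime TEXT, SerialNumber TEXT, PistonPressForce REAL)"
    )
    conn.executemany(
        "INSERT INTO PartData VALUES (?, ?, ?)",
        [
            ("2008-01-02 08:00:00", "A2", 2.501),
            ("2007-12-31 23:59:00", "A0", 2.499),  # before the initial value: skipped
            ("2008-01-01 09:30:00", "A1", 2.503),
            ("2008-01-03 10:00:00", "A3", 0.0),    # fails PistonPressForce > 0: skipped
        ],
    )
    # Skip real waiting in the demo; the adapter would pace these at one row per second.
    replay(fetch_rows(conn, "2008-01-01"), rate=1, emit=print, sleep=lambda _: None)
```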

 

3. Test the Connection to the DB

Looks OK.

 

4. Stream for Statistical Data

I would like to calculate the following statistics for the measurement:

  • average,
  • sigma (standard deviation),
  • Cpk (process capability index): the distance from the average to the nearest specification limit, measured in three-sigma units.

The stream definition:

-- Local stream for stats
CREATE LOCAL STREAM StreamStats
SCHEMA (
    RecordTime TIMESTAMP
    ,p_Avg FLOAT
    ,p_Std FLOAT
    ,p_CpkUp FLOAT
    ,p_CpkLo FLOAT
);

The Cpk is split into p_CpkUp and p_CpkLo. p_CpkUp is the distance from the average to the upper limit, and p_CpkLo is the distance from the average to the lower limit; both are measured in three-sigma units. The Cpk itself is the smaller of the two.
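Written out, the two halves and the combined index are as follows. Here x̄ is the window average, σ is the window standard deviation, and USL and LSL are the upper and lower specification limits (p_USL and p_LSL in the code):

```latex
\mathrm{Cpk}_{\mathrm{up}} = \frac{\mathrm{USL} - \bar{x}}{3\sigma}, \qquad
\mathrm{Cpk}_{\mathrm{lo}} = \frac{\bar{x} - \mathrm{LSL}}{3\sigma}, \qquad
\mathrm{Cpk} = \min\left(\mathrm{Cpk}_{\mathrm{up}},\ \mathrm{Cpk}_{\mathrm{lo}}\right)
```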

 

5. Query to Calculate Statistics

Process limits could be read from a database or a file, but using pre-defined variables is simpler.

-- Lower and Upper limits
CREATE VARIABLE FLOAT p_LSL = 2.485;
CREATE VARIABLE FLOAT p_USL = 2.515;

The query creates a sliding window of the last 35 rows and inserts the calculated statistics into StreamStats. Thirty-five is frequently used as the minimum sample size for this type of statistic to make sense. The query updates after each new row of data (new part), so the statistics always cover the last 35 parts.

-- Calculate stats over a sliding window of the last 35 rows
INSERT INTO StreamStats
SELECT
  max(StreamIn.RecordTime)                          -- latest time-stamp in the window
  ,avg(StreamIn.PistonPressForce)                   -- p_Avg
  ,STDDEVIATION(StreamIn.PistonPressForce)          -- p_Std
  ,(p_USL - avg(StreamIn.PistonPressForce))/
     (3 * STDDEVIATION(StreamIn.PistonPressForce))  -- p_CpkUp
  ,(avg(StreamIn.PistonPressForce) - p_LSL)/
     (3 * STDDEVIATION(StreamIn.PistonPressForce))  -- p_CpkLo
FROM StreamIn
    KEEP 35 ROWS
;
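Here is a rough Python equivalent of the query for checking the numbers outside Coral8. It assumes that STDDEVIATION returns the sample standard deviation; Coral8's exact definition is not given here. It also skips windows with fewer than two rows or zero spread, which would otherwise cause a division by zero. The function name and tuple layout are illustrative only.

```python
from collections import deque
from statistics import mean, stdev

P_LSL = 2.485  # lower specification limit, as in the CCL variable p_LSL
P_USL = 2.515  # upper specification limit, as in p_USL
WINDOW = 35    # KEEP 35 ROWS


def rolling_stats(rows, window=WINDOW, lsl=P_LSL, usl=P_USL):
    """Yield (RecordTime, avg, std, CpkUp, CpkLo) after each new (time, force) row.

    Statistics cover only the last `window` rows; the deque drops older rows.
    """
    buf = deque(maxlen=window)
    for record_time, force in rows:
        buf.append((record_time, force))
        values = [f for _, f in buf]
        if len(values) < 2:
            continue  # sample standard deviation needs at least two values
        avg = mean(values)
        std = stdev(values)
        if std == 0:
            continue  # avoid dividing by zero in the Cpk terms
        latest = max(t for t, _ in buf)  # mirrors max(StreamIn.RecordTime)
        yield (latest, avg, std, (usl - avg) / (3 * std), (avg - lsl) / (3 * std))
```

Because the limits are symmetric around 2.500, a window centred on 2.500 gives equal CpkUp and CpkLo. Any drift of the average shows up as one half shrinking while the other grows.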

 

6. Test the Statistics

This works nicely. Next, I will raise an alarm when the Cpk drops below 1.33, that is, when the distance between the average and the nearer limit is less than four sigma (1.33 × 3σ ≈ 4σ).

 

7. Output Stream for Alarm Messages

The output stream holds a time-stamp, the Cpk value that raised the alarm, and an alarm message.

-- Output stream for Cpk alarm
CREATE OUTPUT STREAM StreamOut
SCHEMA (
    RecordTime TIMESTAMP
    ,p_Cpk FLOAT
    ,AlmMsg STRING
);

 

8. Query to Raise Alarms

The query selects all rows from StreamStats where either Cpk half is below 1.33 and inserts the time-stamp, the smaller Cpk value, and the alarm message “Low Cpk” into StreamOut.

-- Monitor Cpk and raise alarm
INSERT INTO StreamOut
    SELECT RecordTime, min(p_CpkUp, p_CpkLo), 'Low Cpk'
FROM StreamStats
    WHERE p_CpkUp <  1.33
       OR p_CpkLo <  1.33
;
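The alarm rule is a simple filter. The Python sketch below assumes each stats row is a (RecordTime, CpkUp, CpkLo) tuple, a hypothetical layout rather than the full StreamStats schema. Checking whether either half is below the limit is the same as checking whether the smaller half is.

```python
CPK_LIMIT = 1.33  # alarm threshold; 1.33 * 3 sigma is roughly 4 sigma


def cpk_alarms(stats_rows, limit=CPK_LIMIT):
    """Emit (RecordTime, smaller Cpk, 'Low Cpk') when either Cpk half is below the limit."""
    alarms = []
    for record_time, cpk_up, cpk_lo in stats_rows:
        if cpk_up < limit or cpk_lo < limit:
            alarms.append((record_time, min(cpk_up, cpk_lo), "Low Cpk"))
    return alarms
```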

 

9. Test Alarms

The final test shows alarm messages in the output stream, generated when the Cpk drops below 1.33.

 

The next step would be to attach an output adapter to this stream to log alarms and send notifications. The code still needs some optimization; once that is done, the input adapter can be replaced by a live measurement feed to generate alarms in real time.

 

Poll from DB – Coral8 [7]

I read somewhere that, when it comes to programming, being lazy is good. Hence, I will reuse as much code as possible from previous posts. The objective for today is to use the Coral8 Poll from Database Input Adapter.

First, let us look at the database (same as here):

CREATE TABLE dbo.GpsData(
	Entity varchar(25) NOT NULL
	,Latitude float
	,Longitude float
	,Course float
	,Speed float
	,TimeUTC datetime NOT NULL
)

A caveat: the SQL Server DATETIME type is not time-zone aware, while the Coral8 TIMESTAMP is. If the time zone is not specified, Coral8 assumes UTC. In general this is not a problem; however, stream viewers always display time in local format, so for me (Toronto, EST) all database time-stamps are shifted by five hours. One way to remedy the problem is to use the Timezone parameter in the service specification, like this:

<!-- DB Time Zone -->
<param name="Timezone">EST</param>

The parameter goes into the coral8-services.xml file; here is the whole section for the database I used:

<!-- DamirTest on DAMIR5 -->
<service name="DamirTest" type="DATABASE">
  <description>Connection to DamirTest</description>
  <param name="DBDriverType">DBDriverODBC</param>
  <!-- DBDriverConnectString = DSN name -->
  <param name="DBDriverConnectString" >DamirTest</param>
  <param name="DBReadNoCommit">True</param>
  <!-- Username and Password -->
  <param name="Username">username_here</param>
  <param name="Password">password_here</param>
  <!-- DB Time Zone -->
  <param name="Timezone">EST</param>
</service>
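Python's datetime module can reproduce the five-hour shift described above. The sketch reads the same naive DATETIME value once as UTC (Coral8's default) and once as EST, then displays both in local time. The fixed UTC-5 offset is a simplification that ignores daylight saving time, and the helper names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Fixed-offset stand-in for Toronto winter time (EST, UTC-5); ignores daylight saving.
EST = timezone(timedelta(hours=-5), "EST")


def interpret(naive: datetime, assumed_tz: timezone) -> datetime:
    """Attach the time zone a reader assumes for a naive (zone-less) DATETIME value."""
    return naive.replace(tzinfo=assumed_tz)


def display_local(ts: datetime, local_tz: timezone = EST) -> str:
    """Render the instant the way a local stream viewer would."""
    return ts.astimezone(local_tz).strftime("%Y-%m-%d %H:%M")
```

For a stored value of 2009-01-01 12:00, reading it as UTC displays 07:00 in Toronto, while declaring the database zone as EST displays 12:00.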

Now I need a stream to read the records into. For the example, I will use only the time-stamp, entity, latitude, and longitude.

-- Input stream
CREATE INPUT STREAM StreamIn
SCHEMA (
    DbTime TIMESTAMP
    ,Entity STRING
    ,Latitude FLOAT
    ,Longitude FLOAT
);

You may have noticed that I chose different names for the time-stamps, DbTime vs. TimeUTC. This was done on purpose to make the adapter definition clearer: DbTime is the field in the stream schema, while TimeUTC is the column in the database. Here is the adapter:

-- Poll from DB adapter
ATTACH INPUT ADAPTER PollDB
    TYPE ReadFromDBAdapterType
TO STREAM StreamIn
PROPERTIES
    DBNAME = "DamirTest",
    QUERY  = [[
SELECT TimeUTC, Entity, Latitude, Longitude
FROM GpsData
WHERE TimeUTC > ?C8_TIMESTAMP_FIELD
ORDER BY TimeUTC;
]],
TIMESTAMPFIELD = "DbTime"
,TIMESTAMPFIELDINITVALUE =
    "2009-01-01 00:00:00"
,POLLINTERVAL = "2000000"
;

It looks complicated; however, the code was auto-generated by point-and-click and filling in the adapter form.


The adapter type ReadFromDBAdapterType may sound a bit confusing, but this is the Poll Adapter.

  • DBConnectionName: name of the database connection as specified in the coral8-services.xml file.
  • Poll Interval: interval at which the adapter polls the database. Note that the form accepts time units, while the generated code uses microseconds (POLLINTERVAL = "2000000" is two seconds).
  • Timestamp field: name of the time-stamp field in the stream schema (DbTime here).
  • Timestamp field initial value: initial value for the time-stamp. In the example above, upon start-up, the adapter reads all records since New Year 2009.
  • Query: the query to run against the database. Think of everything from the WHERE clause on as boilerplate; TimeUTC is the name of the time-stamp column in the database.


As described in the previous post, the adapter can use a time-stamp field or a counter (ID) field to remember the last record read. I am using the time-stamp, so the counter fields are left empty. The last line of the query, ORDER BY TimeUTC, must be present because the adapter remembers the value from the last row read, which is not necessarily the latest one unless the rows are sorted.
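Here is a minimal Python sketch of this polling logic. It makes several assumptions: sqlite3 stands in for SQL Server, the `?` parameter plays the role of ?C8_TIMESTAMP_FIELD, and timestamps are stored as ISO-8601 strings. TimestampPoller is a hypothetical class that illustrates the idea; it is not how Coral8 implements the adapter. A real poller would call poll() every POLLINTERVAL (two seconds here).

```python
import sqlite3


class TimestampPoller:
    """Remember the time-stamp of the last row read and fetch only newer rows."""

    QUERY = (
        "SELECT TimeUTC, Entity, Latitude, Longitude FROM GpsData "
        "WHERE TimeUTC > ? ORDER BY TimeUTC"
    )

    def __init__(self, conn: sqlite3.Connection, init_value: str):
        self.conn = conn
        self.last_value = init_value  # plays the role of TIMESTAMPFIELDINITVALUE

    def poll(self):
        rows = self.conn.execute(self.QUERY, (self.last_value,)).fetchall()
        if rows:
            # Keep the value from the LAST row returned; ORDER BY makes it the latest too.
            self.last_value = rows[-1][0]
        return rows
```

Because last_value comes from the last row returned, dropping the ORDER BY could leave it at an older time-stamp. The next poll would then deliver some rows a second time.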

When the server is started, the adapter reads all records between the TIMESTAMPFIELDINITVALUE and NOW().

 

When I manually insert a row into the database using:

INSERT INTO dbo.GpsData
	(TimeUTC, Entity, Latitude, Longitude)
VALUES 
	(getUTCdate(), 'ABC', 1.0, 2.0)
;

The new row appears in the stream viewer too.

From this point on, only new database rows are added to the stream. All looks fine, but one problem remains. Suppose we run this for several months and then restart the server: upon start-up, the adapter would read all of those records again.

To avoid this scenario, enable the module persistence option for modules containing the Poll DB Adapter; it is on the module property tab in Coral8 Studio. With persistence enabled, Coral8 periodically stores all module variables and rows to a disk drive and reloads them upon server start-up. This way, the adapter remembers the time-stamp of the latest row and continues from that point.
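Module persistence is a built-in Coral8 feature switched on in Studio, so there is nothing to code on the Coral8 side. The Python sketch below only illustrates the idea it relies on: checkpoint a poller's last time-stamp to disk and reload it on start-up. The file name and JSON layout are assumptions, and Coral8 persists the whole module state (variables and rows) rather than a single value.

```python
import json
import os
import tempfile


def save_checkpoint(path: str, last_value: str) -> None:
    """Write the last time-stamp atomically: write a temp file, then rename it over the old one."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        json.dump({"last_value": last_value}, f)
    os.replace(tmp, path)


def load_checkpoint(path: str, init_value: str) -> str:
    """Return the saved time-stamp, or the initial value on the very first start-up."""
    try:
        with open(path) as f:
            return json.load(f)["last_value"]
    except FileNotFoundError:
        return init_value
```

On restart, the poller would start from load_checkpoint(...) instead of TIMESTAMPFIELDINITVALUE, so months of old rows are not read again.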

 

DB as an Event Source – Coral8 [6]

These days, almost every action in an organization is reflected in some kind of a database. Invoices, purchase orders, shipping/receiving logs, production reports, etc. — all rely on an underlying database. By tapping into databases, it is possible to bring together events (data) from independent systems and benefit from the application of EP/CEP even within a simple IT infrastructure (no ESB, no enterprise architecture, systems added in an ad hoc manner).

There seem to be three basic ways for Coral8 to read from a database:

  1. Read from the Database input adapter;
  2. Poll from the Database input adapter;
  3. Read directly from a CCL query.

The first one is best suited for replay of historical data, or testing of algorithms against existing data. The adapter reads one row at a time, at a pre-set rate (rows per second).

The second adapter polls the database at a specified interval and reads only rows that were added since the last read. The database must have a time-stamp or ID column that is unique to each row and increases in value. My preference is to use a UTC time-stamp assigned by the client application (the one that inserted the data into the database), which preserves the order of events entering a Coral8 stream. Using the ID column is convenient, but it should be considered only when there is a single data source and records cannot enter the database out of order, or when the order of events is irrelevant.
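Here is a concrete case of the ordering argument. When two client applications write to the same table, a row's identity value reflects when the row reached the database, not when the event happened. The rows below are made up, and the Row type and event_order helper are hypothetical. Source B's event happened at 10:00:01, but its row arrived after A's 10:00:02 row.

```python
from typing import NamedTuple


class Row(NamedTuple):
    row_id: int    # identity value assigned by the database when the row arrives
    source: str    # client application that inserted the row
    time_utc: str  # UTC time-stamp assigned by the client when the event occurred


def event_order(rows, key: str):
    """Return event labels in the order a poller keyed on `key` would deliver them."""
    return [f"{r.source}@{r.time_utc}" for r in sorted(rows, key=lambda r: getattr(r, key))]


if __name__ == "__main__":
    rows = [
        Row(1, "A", "10:00:00"),
        Row(2, "A", "10:00:02"),
        Row(3, "B", "10:00:01"),  # delayed on the way to the database
    ]
    print("by ID:  ", event_order(rows, "row_id"))
    print("by time:", event_order(rows, "time_utc"))
```

Keyed on the ID, B's event reaches the stream after A's later event. Keyed on the client time-stamp, the stream sees the events in the order they occurred.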

The third method provides a specific syntax to create database sub-queries within a standard CCL query.

In the next few posts I will go through examples of each method.