Archive for the ‘EP-CEP’ Category.

Coral8 – Randomly Sampling a Stream

The example below randomly samples an input stream and inserts samples into a named window for later processing. A variable named Dice is "rolled" each time a new row enters the input stream. A row is copied into the window if Dice = 1.

-- Variable for random number
CREATE VARIABLE INTEGER Dice;
CREATE VARIABLE INTEGER DiceSides = 6;
 
-- Input stream
CREATE INPUT STREAM StreamIn
SCHEMA (
    Val INTEGER
);
 
-- Each time a row enters the input stream,
-- roll the Dice
ON StreamIn
SET Dice =
    TO_INTEGER(RANDOM() * TO_FLOAT(DiceSides)) + 1;
 
-- Some data entering the input stream
ATTACH INPUT ADAPTER RMG
    TYPE RandomTuplesGeneratorAdapterType
TO STREAM StreamIn
PROPERTIES
    RATE     = "20"
    ,ROWCOUNT = "600"
;
 
-- Named window for samples
CREATE WINDOW WR
SCHEMA(
    Val INTEGER
    ,Dice INTEGER
)
KEEP 150 ROWS
;
 
-- Randomly sample the input stream
INSERT
    WHEN Dice = 1 THEN WR
SELECT Val, Dice
    FROM StreamIn
;


Here is a result: out of 600 messages that entered the input stream, 96 ended up in the window. With a six-sided die about 100 were expected, so not bad.
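To check the expected sampling ratio outside Coral8, here is a minimal Python sketch of the same idea. It assumes TO_INTEGER truncates toward zero (so the formula yields 1 to 6); the function name sample_stream and the seed are illustrative only.

```python
import random

def sample_stream(rows, sides=6, seed=None):
    """Keep a row whenever a simulated die roll equals 1."""
    rng = random.Random(seed)
    kept = []
    for row in rows:
        # Same shape as the CCL formula: integer part of RANDOM() * sides, plus 1
        dice = int(rng.random() * sides) + 1
        if dice == 1:
            kept.append((row, dice))
    return kept

samples = sample_stream(range(600), sides=6, seed=42)
print(len(samples), "of 600 rows sampled; about", 600 // 6, "expected")
```

On average one row in six is kept, so a count near 100 out of 600 is what the die should give.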

 

Read from DB – Coral8 [8]

As previously discussed, the Coral8 Read from Database Adapter is best suited for replay of historical records, or for testing algorithms against existing data.

For this article, I will create a program to monitor a measurement stream and raise alarms based on a statistical index. The adapter will be used to read existing data from one of my databases and test the program. I have some real-life measurements generated during assembly of car parts. Among other columns, the table of measurements includes:

  • Record Time — time-stamp;
  • Serial Number — unique part id;
  • Piston Press Force — a measurement.

Here is a SQL snippet:

CREATE TABLE dbo.PartData(
    RecordTime DATETIME
    ,SerialNumber VARCHAR(13)
    ,PistonPressForce DECIMAL(8,3)
    -- more columns here
);

This will do for the example. Now the to-do list:

  1. Create an input stream.
  2. Attach the read-database adapter to the stream.
  3. Test the connection to the DB.
  4. Create a stream for statistical data (average, sigma, Cpk).
  5. Create a query to calculate statistics over a window of data rows.
  6. Test the statistics.
  7. Create an output stream for alarm messages.
  8. Create a query to raise alarm messages.
  9. Test alarms.

 

1. The Input Stream

The schema of the input stream matches the database columns. Note the column-type mappings between SQL Server and Coral8. Stream fields have the same names as the database columns; this is necessary for this adapter type.

-- Input stream
CREATE INPUT STREAM StreamIn
SCHEMA (
    RecordTime TIMESTAMP
    ,SerialNumber STRING
    ,PistonPressForce FLOAT
);

 

2. The Adapter

As in the previous post, I have used point-and-click to insert the adapter; check out the adapter form.

-- Read from DB adapter
ATTACH INPUT ADAPTER ReadFromDB
    TYPE ReadFromDatabaseAdapterType
TO STREAM StreamIn
PROPERTIES
    DBNAME = "RCI4122_1"
    ,TABLE = "PartData"
    ,TIMESTAMPCOLUMN = "RecordTime"
    ,RATE = "1"
    ,WHERECLAUSE =
        "PistonPressForce > 0"
    ,TIMESTAMPCOLUMNINITVALUE =
        "2008-01-01"
;


The adapter parameters:

Field | Comment
DB Connection Name | Name of the database connection as specified in the coral8-services.xml file. For more details see the previous post.
Table or View | Name of the database table.
Where Clause | The expression in the WHERE clause of the query.
Loop Count | How many times to loop through the data.
Rate | Number of reads per second.
Timestamp column | Name of the time-stamp column in the database.
Timestamp column initial value | Initial value for the time-stamp. In the example above, the adapter reads all records since New Year 2008.


The adapter example is equivalent to running this SQL query:

USE RCI4122_1;
SELECT RecordTime, SerialNumber, PistonPressForce
  FROM PartData
  WHERE RecordTime > '2008-01-01'
    AND PistonPressForce > 0
ORDER BY RecordTime;

 

3. Test the Connection to the DB

Looks OK.

 

4. Stream for Statistical Data

I would like to calculate the following statistics for the measurement:

  • average,
  • sigma (standard deviation),
  • Cpk — process capability index — is a distance from the average to the nearest specification limit measured in three-sigma units.

The stream definition:

-- Local stream for stats
CREATE LOCAL STREAM StreamStats
SCHEMA (
    RecordTime TIMESTAMP
    ,p_Avg FLOAT
    ,p_Std FLOAT
    ,p_CpkUp FLOAT
    ,p_CpkLo FLOAT
);

The Cpk is broken into the p_CpkUp and the p_CpkLo. The p_CpkUp is the distance from the average to the upper limit, and the p_CpkLo is the distance to the lower limit; distances are in three-sigma units.

 

5. Query to Calculate Statistics

Process limits could be read from a database or a file, but using pre-defined variables is simpler.

-- Lower and Upper limits
CREATE VARIABLE FLOAT p_LSL = 2.485;
CREATE VARIABLE FLOAT p_USL = 2.515;

The query creates a sliding window of the last 35 rows and inserts calculated statistics into StreamStats. A sample size of 35 is frequently used as a minimum for this type of statistics to make sense. The query updates after each new row of data (new part) and calculates statistics over the last 35 parts.

-- Calculate stats
INSERT INTO StreamStats
SELECT
  MAX(StreamIn.RecordTime)
  ,AVG(StreamIn.PistonPressForce)
  ,STDDEVIATION(StreamIn.PistonPressForce)
  ,(p_USL - AVG(StreamIn.PistonPressForce))/
     (3 * STDDEVIATION(StreamIn.PistonPressForce))
  ,(AVG(StreamIn.PistonPressForce) - p_LSL)/
     (3 * STDDEVIATION(StreamIn.PistonPressForce))
FROM StreamIn
    KEEP 35 ROWS
;
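The same calculation can be sketched in Python for checking numbers by hand. This is only an illustration: it assumes STDDEVIATION is the sample standard deviation, and the function name rolling_cpk and the readings are made up.

```python
from collections import deque
from statistics import mean, stdev

LSL, USL = 2.485, 2.515   # limits from the post
WINDOW = 35               # same size as KEEP 35 ROWS

def rolling_cpk(values, lsl=LSL, usl=USL, size=WINDOW):
    """Yield (avg, sigma, cpk_up, cpk_lo) over a sliding window of readings."""
    window = deque(maxlen=size)   # old readings fall out automatically
    for v in values:
        window.append(v)
        if len(window) < 2:
            continue              # sample standard deviation needs two points
        avg = mean(window)
        sigma = stdev(window)
        if sigma == 0:
            continue              # avoid dividing by zero on constant data
        yield avg, sigma, (usl - avg) / (3 * sigma), (avg - lsl) / (3 * sigma)

readings = [2.500, 2.502, 2.498, 2.501, 2.499]   # illustrative values only
for avg, sigma, cpk_up, cpk_lo in rolling_cpk(readings):
    print(round(avg, 4), round(sigma, 4), round(cpk_up, 2), round(cpk_lo, 2))
```

Unlike the CCL query, the sketch skips the very first reading, because a standard deviation of a single value is not defined.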

 

6. Test the Statistics

This works nicely. Now I will raise an alarm when the Cpk drops below 1.33, that is, when the distance between the average and a limit is less than four sigma.

 

7. Output Stream for Alarm Messages

The output stream has: an alarm message, a time-stamp and the value which raised the alarm.

-- Output stream for Cpk alarm
CREATE OUTPUT STREAM StreamOut
SCHEMA (
    RecordTime TIMESTAMP
    ,p_Cpk FLOAT
    ,AlmMsg STRING
);

 

8. Query to Raise Alarms

The query selects all rows from StreamStats with a Cpk below 1.33 and inserts the time-stamp, the smaller Cpk value and the alarm message "Low Cpk" into StreamOut.

-- Monitor Cpk and raise alarm
INSERT INTO StreamOut
    SELECT RecordTime, MIN(p_CpkUp, p_CpkLo), 'Low Cpk'
FROM StreamStats
    WHERE p_CpkUp < 1.33
       OR p_CpkLo < 1.33
;
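For comparison, the alarm rule can be written in a few lines of Python. This is a sketch with made-up statistics rows; low_cpk_alarms is a hypothetical helper, not part of Coral8.

```python
def low_cpk_alarms(stats_rows, threshold=1.33):
    """Yield (record_time, worst_cpk, message) for rows below the threshold."""
    for record_time, cpk_up, cpk_lo in stats_rows:
        worst = min(cpk_up, cpk_lo)   # the smaller of the two one-sided values
        if worst < threshold:         # same as cpk_up < t OR cpk_lo < t
            yield record_time, worst, "Low Cpk"

# Illustrative rows: (time-stamp, p_CpkUp, p_CpkLo)
rows = [
    ("2008-01-02 08:00:00", 1.60, 1.45),
    ("2008-01-02 08:00:01", 1.50, 1.20),
    ("2008-01-02 08:00:02", 1.10, 1.70),
]
alarms = list(low_cpk_alarms(rows))
for alarm in alarms:
    print(alarm)
```

Testing the smaller of the two values against the threshold is equivalent to the OR condition in the WHERE clause.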

 

9. Test Alarms

The final test shows alarm messages in the output stream, generated when the Cpk drops below 1.33.

 

The next step would be to attach an output adapter to this stream to log alarms and send notifications. The code still needs some optimization; once that is done the input adapter can be replaced by a live-measurement feed to generate alarms in real-time.

 

Poll from DB – Coral8 [7]

I read somewhere that, when it comes to programming, being lazy is good. Hence, I will reuse as much code as possible from previous posts. The objective for today is to use the Coral8 Poll from Database Input Adapter.

First, let us look at the database (the same table as in the GPS Event [15] post):

CREATE TABLE dbo.GpsData(
    Entity VARCHAR(25) NOT NULL
    ,Latitude FLOAT
    ,Longitude FLOAT
    ,Course FLOAT
    ,Speed FLOAT
    ,TimeUTC DATETIME NOT NULL
)

A caveat: the SQL Server DATETIME type is not time-zone aware, while the Coral8 TIMESTAMP is. If the time zone is not specified, Coral8 assumes UTC. In general, this is not a problem; however, stream viewers always display time in local format, so for me (Toronto, EST) all database time-stamps are shifted by five hours. One way to remedy the problem is to use the Timezone parameter in the service specification, like this:

<!-- DB Time Zone -->
<param name="Timezone">EST</param>

The code has to be added to the coral8-services.xml file; here is the whole section for the database used:

<!-- DamirTest on DAMIR5 -->
<service name="DamirTest" type="DATABASE">
  <description>Connection to DamirTest</description>
  <param name="DBDriverType">DBDriverODBC</param>
  <!-- DBDriverConnectString = DSN name -->
  <param name="DBDriverConnectString">DamirTest</param>
  <param name="DBReadNoCommit">True</param>
  <!-- Username and Password -->
  <param name="Username">username_here</param>
  <param name="Password">password_here</param>
  <!-- DB Time Zone -->
  <param name="Timezone">EST</param>
</service>
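The five-hour shift mentioned in the caveat above is easy to reproduce in Python. This is only an illustration of the arithmetic, not of Coral8 internals; the sample value and the fixed EST offset are assumptions.

```python
from datetime import datetime, timedelta, timezone

EST = timezone(timedelta(hours=-5), "EST")   # fixed offset, no daylight saving

# A naive value, as it would come out of a DATETIME column (hypothetical)
db_value = datetime(2009, 1, 23, 14, 41, 9)

# Treated as UTC (the default when no time zone is given),
# then displayed in local Toronto time
as_utc = db_value.replace(tzinfo=timezone.utc)
shown_local = as_utc.astimezone(EST)

# Treated as EST, which is what a Timezone setting of EST asks for
as_est = db_value.replace(tzinfo=EST)

print(shown_local.strftime("%H:%M"))   # 09:41
print(as_est.strftime("%H:%M"))        # 14:41
```

The same wall-clock value ends up five hours apart depending on which zone it is assumed to be in.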

Now I need a stream to read the records into. For the example, I will use only: time-stamp, entity, latitude, and longitude.

-- Input stream
CREATE INPUT STREAM StreamIn
SCHEMA (
    DbTime TIMESTAMP
    ,Entity STRING
    ,Latitude FLOAT
    ,Longitude FLOAT
);

You may have noticed that I have chosen different names for time-stamps: DbTime vs. TimeUTC. This was done on purpose, to clarify the adapter definition. Here is the adapter:

-- Poll from DB adapter
ATTACH INPUT ADAPTER PollDB
    TYPE ReadFromDBAdapterType
TO STREAM StreamIn
PROPERTIES
    DBNAME = "DamirTest",
    QUERY  = [[
SELECT TimeUTC, Entity, Latitude, Longitude
FROM GpsData
WHERE TimeUTC> ?C8_TIMESTAMP_FIELD
ORDER BY TimeUTC;
]],
TIMESTAMPFIELD = "DbTime"
,TIMESTAMPFIELDINITVALUE =
    "2009-01-01 00:00:00"
,POLLINTERVAL = "2000000"
;

It looks complicated; however, the code was auto-generated using a point-and-click procedure and by filling in the adapter form.


The adapter TYPE ReadFromDBAdapterType may sound a bit confusing, but this is the Poll Adapter.

Field | Comment
DBConnectionName | Name of the database connection as specified in the coral8-services.xml file.
Poll Interval | Interval at which the adapter polls the database. Note that the form allows time units, while the generated code uses microseconds.
Timestamp field | Refers to the name of the time-stamp field in the stream schema.
Timestamp field initial value | Initial value for the time-stamp. In the example above, upon start-up, the adapter reads all records since New Year 2009.
Query | The query to run against the database. Think of everything from the WHERE clause on as a boiler-plate example. TimeUTC is the timestamp column name in the database.


As described in the previous post, the adapter can use a time-stamp field or a counter (ID) field to remember the last record read. I am using the time-stamp, hence counter fields are left empty. The last row of the query, ORDER BY TimeUTC, must be present, because the adapter remembers the last value read, which is not necessarily the latest one.

When the server is started, the adapter reads all records between the TIMESTAMPFIELDINITVALUE and NOW().

 

Next, I manually insert a row into the database using:

INSERT INTO dbo.GpsData
    (TimeUTC, Entity, Latitude, Longitude)
VALUES
    (getUTCdate(), 'ABC', 1.0, 2.0)
;

The new row appears in the stream viewer too.

From this point on, only new database rows are added to the stream. All looks fine, but one problem still remains. Suppose we run this for several months and then re-start the server. The adapter would (upon start-up) read all these records again.

To avoid this scenario, the module persistence option should be enabled for modules containing the Poll DB Adapter. Persistence can be enabled in Coral8 Studio on the module property tab. With persistence enabled, Coral8 periodically stores all module variables and rows to a disk drive, and re-loads them upon server start-up. This way the adapter will remember the timestamp of the latest row and will continue from that point.
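The polling idea itself (read only rows newer than the last value seen, and carry that value forward) can be sketched in Python with an in-memory SQLite table standing in for SQL Server. The helper poll_new_rows and the sample rows are hypothetical; storing last_seen somewhere durable would play the role of module persistence.

```python
import sqlite3

def poll_new_rows(conn, last_seen):
    """Return rows newer than last_seen, plus the updated high-water mark."""
    rows = conn.execute(
        "SELECT TimeUTC, Entity, Latitude, Longitude "
        "FROM GpsData WHERE TimeUTC > ? ORDER BY TimeUTC",
        (last_seen,),
    ).fetchall()
    if rows:
        last_seen = rows[-1][0]   # query is ordered, so the last row is the latest
    return rows, last_seen

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE GpsData (Entity TEXT, Latitude REAL, Longitude REAL, TimeUTC TEXT)"
)
conn.execute("INSERT INTO GpsData VALUES ('ABC', 1.0, 2.0, '2009-01-05 10:00:00')")

last_seen = "2009-01-01 00:00:00"   # plays the role of TIMESTAMPFIELDINITVALUE
first, last_seen = poll_new_rows(conn, last_seen)    # picks up the existing row
again, last_seen = poll_new_rows(conn, last_seen)    # nothing new this time
conn.execute("INSERT INTO GpsData VALUES ('ABC', 1.5, 2.5, '2009-01-05 10:00:05')")
third, last_seen = poll_new_rows(conn, last_seen)    # only the new row
print(len(first), len(again), len(third), last_seen)
```

Without the ORDER BY, the last row fetched would not necessarily carry the latest time-stamp, and rows could be skipped or re-read on the next poll.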

 

DB as an Event Source – Coral8 [6]

These days, almost every action in an organization is reflected in some kind of a database. Invoices, purchase orders, shipping/receiving logs, production reports, etc. — all rely on an underlying database. By tapping into databases, it is possible to bring together events (data) from independent systems and benefit from the application of EP/CEP even within a simple IT infrastructure (no ESB, no enterprise architecture, systems added in an ad hoc manner).

There seem to be three basic ways for Coral8 to read from a database:

  1. Read from the Database input adapter;
  2. Poll from the Database input adapter;
  3. Read directly from a CCL query.

The first one is best suited for replay of historical data, or testing of algorithms against existing data. The adapter reads one row at a time, at a pre-set rate (rows per second).

The second adapter polls the database at a specified interval, and reads only rows that were added since the last read. The database must have a time-stamp (or an ID) column that is unique to each row and increases in value. My preference here is to use a UTC time-stamp assigned by a client application (the one that inserted the data in the database) to preserve the order of events entering a Coral8 stream. Using the ID column is convenient, but should be considered only when there is a single source of data and there is no way to swap records entering the database; or if the order of events is irrelevant.

The third method provides a specific syntax to create database sub-queries within a standard CCL query.

In the next few posts I will go through examples of each method.

 

GPS Event [17] – Coral8 [5]

Last time we detected when a device stops streaming data; however, the message remained within Coral8. Today, I would like to attach an output adapter to send the message out. Coral8 comes with several output adapters, including XML over HTTP. This one seems appropriate for any kind of server-based application. Just for fun, I decided to transfer data to an application running on Google App Engine.

Note:
Starting with Google App Engine is fairly easy; detailed instructions are available, so I will not describe it here.

The to-do list:

  1. Add an XML over HTTP output adapter to the Coral8 example;
  2. Write an application on Google App Engine to receive, store and display Coral8 messages;
  3. Test.

1. The Output Adapter

--Adapter for GoogleAppEngine
ATTACH OUTPUT ADAPTER WriteXMLOverHTTP
    TYPE WriteXmlOverHttpAdapterType
TO STREAM sAlarm
PROPERTIES
 URL = "http://192.168.xxx.76:8080",
 XSLTEMPLATE =
[[<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
<xsl:output method="text" />
<xsl:template match="/">
<xsl:text>{</xsl:text>
<xsl:for-each select="*">
<xsl:for-each select="*">
<xsl:text>'</xsl:text>
<xsl:value-of select="@Name" />
<xsl:text>'
:'</xsl:text>
<xsl:value-of select="." />
<xsl:text>'
,</xsl:text>
</xsl:for-each>
</xsl:for-each>
<xsl:text>}</xsl:text>
</xsl:template>
</xsl:stylesheet>
]]
;

 

The adapter has two properties:

  • URL of the server, including the port number;
  • XSLTEMPLATE for an optional XSL transformation.

If the XSLTEMPLATE is not specified, Coral8 sends a raw XML string like this one:

<tuple xmlns="http://www.coral8.com/cpx/2004/03/" timestamp="1232757671788000">
    <field name="Entity">DS_1</field>
    <field name="Msg">Stream stopped</field>
    <field name="AlmTime">2009-01-23 19:41:09.788000</field>
</tuple>

 

The template in the adapter transforms the XML to a dictionary-like string:
{'Entity':'DS_1','Msg':'Stream stopped','AlmTime':'2009-01-23 19:41:09.788000',}.
Using the XSL I have simply avoided parsing the XML on the server side.
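As a side note, a dictionary-like string of this shape can be turned into a Python dictionary without executing it as code. The sketch below uses ast.literal_eval from the standard library (Python 2.6 or later), which accepts only literals; it is an alternative to eval, not what the application below does.

```python
import ast

payload = (
    "{'Entity':'DS_1','Msg':'Stream stopped',"
    "'AlmTime':'2009-01-23 19:41:09.788000',}"
)

# literal_eval parses literals only (dicts, strings, numbers, ...);
# anything else, such as a function call, raises an error instead of running
record = ast.literal_eval(payload)
print(record["Entity"], "-", record["Msg"])

try:
    ast.literal_eval("__import__('os').getcwd()")
    rejected = False
except (ValueError, SyntaxError):
    rejected = True
print("code rejected:", rejected)
```

The trailing comma left by the template is harmless, since Python allows it in a dictionary literal.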

2. The Application

"""
Capture Coral8 messages to DB.
Display last 10 messages.
"""
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util \
     import run_wsgi_app
from google.appengine.ext import db
 
class Dta(db.Model):
  content = db.StringProperty(multiline=True)
  date = db.DateTimeProperty(auto_now_add=True)
 
class MainPage(webapp.RequestHandler):
  def get(self):
    self.response.headers \
      ['Content-Type'] = 'text/plain'
    dt = db.GqlQuery \
      ("SELECT * FROM Dta ORDER BY date DESC LIMIT 10")
    for d in dt:
      try:
        dic = eval(d.content)
        self.response.out.write('\n%s' % ('-' * 40))
        for k in dic.keys():
          self.response.out.write \
            ('\n%s = %s' % (k, dic[k]))
      except:
        pass
 
  def post(self):
    rec = Dta()
    doc = str(self.request.body)
    rec.content = ' '.join(doc.split())
    rec.put()
     
application = webapp.WSGIApplication \
              ([('/', MainPage)], debug=True)
 
def main():
  run_wsgi_app(application)
 
if __name__ == "__main__":
  main()

It may seem complicated, but here are the basics:

class Dta(db.Model) defines a "database table" with two columns:

  • content – a string column for Coral8 data;
  • date – a timestamp column;

post(self) method is activated when Coral8 sends data to the application's web page;

get(self) method is activated when a browser visits the web page.

3. Test

For testing, I have used the same example as in the previous post: send several messages to Coral8 and then stop. Here is the result in a browser window:

GPS Event [16] – Coral8 [4]

Time for an experiment. Let's look at a way to detect that a sensor stopped streaming data. For the example I will use the same code as in the last test.

The idea is to monitor StreamIn and when it stops, insert a message into a new output stream. The output stream can then be connected to an output adapter and send the message "out there". So, here is the output stream named sAlarm, with three fields:

  • Entity – name of the device which stopped streaming data;
  • Msg – a message, like "Stream stopped";
  • AlmTime – time of detection.
-- Alarm stream
CREATE OUTPUT STREAM sAlarm
SCHEMA (
    Entity STRING,
    Msg STRING,
    AlmTime STRING
);

The AlmTime is formatted as a STRING, because I intend to send this message later via an email or similar adapter.

The detection technique is straight from the Coral8 CCL Cookbook; let's see how it works.

-- Detect no data from StreamIn
INSERT INTO sAlarm
SELECT E1.Entity
    ,'Stream stopped'
    ,TO_STRING(GETTIMESTAMP(E1))
FROM StreamIn AS E1, StreamIn AS E2
    MATCHING [2 SECONDS: E1, !E2]
    ON E1.Entity = E2.Entity
;

The MATCHING clause needs at least two streams (think tables), hence StreamIn is used twice with different aliases: E1 and E2. It waits for an event in E1, followed by a non-event from E2, within a time-window of two seconds. The GETTIMESTAMP(E1) function returns the timestamp of the last row from the stream, so the AlmTime is actually the time of the last record received.

As in the previous example, I have sent ten numbers (0-9) and then the stream stopped. Notice that AlmTime matches the time-stamp of the last StreamIn entry.
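Outside Coral8, the same "no data for two seconds" rule can be approximated by remembering the last time-stamp per entity and checking it against the clock. The Python sketch below is a simpler polling-style check, not the MATCHING mechanism; silent_entities and the sample events are hypothetical.

```python
def silent_entities(last_seen, now, timeout=2.0):
    """Return (entity, message, last_timestamp) for entities quiet too long."""
    return [
        (entity, "Stream stopped", ts)
        for entity, ts in sorted(last_seen.items())
        if now - ts > timeout
    ]

# Track the most recent time-stamp (in seconds) seen for each entity
last_seen = {}
for entity, ts in [("DS_1", 0.0), ("DS_2", 0.5), ("DS_1", 1.0), ("DS_2", 3.5)]:
    last_seen[entity] = ts

alarms = silent_entities(last_seen, now=4.0)
print(alarms)   # DS_1 has been quiet for 3 seconds, DS_2 only for 0.5
```

As in the CCL query, the reported time is that of the last record received, not the moment the silence was noticed.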

To be continued.

GPS Event [15] – Coral8 [3]

In the previous post I managed to send GPS data to Coral8. This time I would like to use Coral8 as a database-loading front end, with minimum changes to the already existing program. I already have MS SQL Server running on the same machine as Coral8, so this should not be too complicated.

Here is the to-do list:

  1. Create a database and a table for GPS data;
  2. Create an ODBC connection to the database;
  3. Register the database as a service with Coral8, using the ODBC connection;
  4. Restart Coral8 Server;
  5. Attach a write-to-database output adapter to the output stream;
  6. Test.

Step 1 – Create a database

Using SQL Server Management Studio I have created a new database named DamirTest and a table GpsData.

USE DamirTest
CREATE TABLE dbo.GpsData(
    Entity VARCHAR(25) NOT NULL
    ,Latitude FLOAT
    ,Longitude FLOAT
    ,Course FLOAT
    ,Speed FLOAT
    ,TimeUTC DATETIME NOT NULL
)

As you can see, the idea is to match the record of the output stream (see the previous post).

Step 2 – Create an ODBC connection

Coral8 communicates with MS SQL Server using an ODBC connection, so one has to be added for the database. The dialog can be accessed via Start >>> Administrative Tools >>> Data Sources (ODBC). For SQL Server 2005 and later, specify SQL Native Client for the driver.



Step 3 – Register the database as a service with Coral8

Coral8 remote services are listed in the coral8-services.xml file. The file can be found in the /Coral8/Server/conf/ directory. I had to add the following segment to the file:

<service name="DamirTest" type="DATABASE">
    <description>Connection to DamirTest</description>
    <param name="DBDriverType">DBDriverODBC</param>
    <!-- DBDriverConnectString = DSN name -->
    <param name="DBDriverConnectString">DamirTest</param>
    <param name="DBReadNoCommit">True</param>
    <!-- Username and Password -->
    <param name="Username">my_username_here</param>
    <param name="Password">my_password_here</param>
</service>

Step 4 – Restart Coral8 Server

Coral8 reads configuration files on start-up, so the server has to be restarted. There is no need to re-boot the machine; just stop and start the program.

Step 5 – Attach a write-to-database output adapter

No need to remember this syntax. I have simply used point and click to attach the adapter to the output stream — I did have to enter the query in the adapter form.

ATTACH OUTPUT ADAPTER ToDB
    TYPE WriteToDBAdapterType
    TO STREAM StreamOut
PROPERTIES
    DBNAME = "DamirTest",
    QUERY  = [[
INSERT INTO [DamirTest].[dbo].[GpsData]
    (Entity ,Latitude ,Longitude
    ,Course ,Speed ,TimeUTC)
VALUES
    (?Entity ,?Latitude ,?Longitude
    ,?Course ,?Speed ,?TimeUTC);
 ]],
    COMMITFREQUENCY = "0"
;

Note how all stream field names start with a question mark. They are positionally mapped to the list of database fields, so it is not really necessary to use the same names for database and stream fields.
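Positional mapping works the same way in most database APIs. As a small illustration (not Coral8 code), here is a Python sketch using the standard sqlite3 module, with stream-side names that deliberately differ from the column names; the table and values are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE GpsData (Entity TEXT, Latitude REAL, Longitude REAL)")

# Stream-side names differ from the column names on purpose
stream_row = {"Device": "ABC", "Lat": 43.65, "Lon": -79.38}

# Each placeholder is matched to a column by position, not by name
conn.execute(
    "INSERT INTO GpsData (Entity, Latitude, Longitude) VALUES (?, ?, ?)",
    (stream_row["Device"], stream_row["Lat"], stream_row["Lon"]),
)

stored = conn.execute(
    "SELECT Entity, Latitude, Longitude FROM GpsData"
).fetchone()
print(stored)
```

Only the order of the values has to line up with the column list; the names on the stream side are free.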

Step 6 – Test

This is all it takes; the rest of the code from the previous post remains unchanged. As in the last example, I have sent ten GPS readings to Coral8. Here they are in the output stream,

and in the database.

To be continued.

GPS Event [14] – Coral8 [2]

Ever heard about that hotel that didn’t have the 13th floor? Empty superstition, I don’t believe in things like that. Now where was I? Yes, GPS data to Coral8 — here we go.

Having done my homework in the previous post, this should not be too complicated. For the template I have chosen: entity, latitude, longitude, course, speed and the time stamp. The first thing I noticed is that my time format does not match the format used by Coral8. After a few trial-and-error attempts, I decided to transfer the time stamp to the server as a string, and later convert it to the time format. For this I needed an additional local stream.

-- Input from GPS
CREATE INPUT STREAM GpsIn
SCHEMA (
    Entity    STRING
    ,Latitude  FLOAT
    ,Longitude FLOAT
    ,Course FLOAT
    ,Speed FLOAT
    ,TimeUTC STRING
);
 
-- Local formatted stream with
-- TimeUTC as TIMESTAMP
CREATE LOCAL STREAM GpsFmtd
SCHEMA (
    Entity    STRING
    ,Latitude  FLOAT
    ,Longitude FLOAT
    ,Course FLOAT
    ,Speed FLOAT
    ,TimeUTC TIMESTAMP
);
 
-- Output stream
CREATE OUTPUT STREAM StreamOut
SCHEMA (
    Entity    STRING
    ,Latitude  FLOAT
    ,Longitude FLOAT
    ,Course FLOAT
    ,Speed FLOAT
    ,TimeUTC TIMESTAMP
);

The only purpose of the local GpsFmtd stream is to allow conversion of the TimeUTC field from a string to the proper time format. With streams in place, I can add queries to connect them.

-- From Input to Local
INSERT INTO GpsFmtd
SELECT
    Entity ,Latitude ,Longitude
    ,Course ,Speed
    ,TO_TIMESTAMP(TimeUTC)
FROM GpsIn;
 
-- From Local to Output
INSERT INTO StreamOut
SELECT * FROM GpsFmtd;

Here is the flow view.

That is all for the Coral8 side; now the Python code. Since everything runs on my local network, there is no need for an authorization module. I will need only the template and service modules:

As in all previous examples, the service has to be activated in the main gps_event.py module:

# select service: none, aws, rc, tweet, email, gtalk, coral8, wp, blog
use_service = 'coral8'
 
# define sampling delay in seconds
sample_delay = 5

And here is the result. You may notice a different display of the TimeUTC field. The top one is a string, as sent to the server, while the bottom one is in the Coral8 TIMESTAMP format.

In Coral8 terminology this would be called a GPS out-of-process adapter.

To be continued.