Category Archives: Code

Code samples

Bulk change post categories in WordPress

The following code moves all WordPress posts from one category to another. Both categories must already exist in the database. The from_category is not deleted from the database; only posts are moved. Needless to say, do back up your database before trying this.
Make sure to specify category slugs (not names) in the first two lines of the code. Comments tell the story; you may also want to look at the database model.

-- Move all posts from one category to another; 
-- use category SLUG, not name.
-- Define categories here.
SET	@from_cat_slug = _utf8'from_category_slug_here';
SET	@to_cat_slug = _utf8'to_category_slug_here';
-- Fetch term_taxonomy_id for both categories.
SELECT @from_cat_tax_id := tax.`term_taxonomy_id`
	FROM `wp_term_taxonomy` AS tax JOIN
		 `wp_terms` AS trm ON tax.`term_id` = trm.`term_id`
	WHERE trm.`slug` = @from_cat_slug
		AND tax.taxonomy = 'category';
SELECT @to_cat_tax_id := tax.`term_taxonomy_id`
	FROM `wp_term_taxonomy` AS tax JOIN
		 `wp_terms` AS trm ON tax.`term_id` = trm.`term_id`
	WHERE trm.`slug` = @to_cat_slug
		AND tax.taxonomy = 'category';	
-- Create table to hold post ids.
CREATE TEMPORARY TABLE ds_cat_mrg (`id` BIGINT UNSIGNED);
-- List all posts linked to the from_category.
INSERT INTO ds_cat_mrg (`id`)
	SELECT `object_id`
	FROM `wp_term_relationships`
	WHERE `term_taxonomy_id` = @from_cat_tax_id;
-- Delete links between these posts and both categories.
DELETE FROM `wp_term_relationships` 
	WHERE `object_id` IN (SELECT `id` FROM `ds_cat_mrg`)
	AND `term_taxonomy_id` 
               IN (@to_cat_tax_id, @from_cat_tax_id);
-- Insert links to the new category.
INSERT INTO `wp_term_relationships` 
        (`object_id`, `term_taxonomy_id`)
	SELECT `id`, @to_cat_tax_id FROM `ds_cat_mrg`;
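The same three steps (collect the post ids, unlink both categories, relink to the target) can be sketched in plain Python against an in-memory list of (post_id, term_taxonomy_id) pairs mirroring wp_term_relationships rows; the ids below are invented for illustration.

```python
def move_posts(relationships, from_tax_id, to_tax_id):
    """Move posts from one category to another.

    relationships: list of (post_id, term_taxonomy_id) tuples,
    mirroring wp_term_relationships rows."""
    # List all posts linked to the from_category.
    post_ids = {p for p, t in relationships if t == from_tax_id}
    # Delete links between these posts and both categories.
    kept = [(p, t) for p, t in relationships
            if not (p in post_ids and t in (from_tax_id, to_tax_id))]
    # Insert links to the new category.
    kept.extend((p, to_tax_id) for p in sorted(post_ids))
    return kept

rels = [(1, 10), (2, 10), (2, 20), (3, 30)]
print(move_posts(rels, 10, 20))  # → [(3, 30), (1, 20), (2, 20)]
```

Note that post 2, which was linked to both categories, ends up with a single link to the target; removing links to both categories first is what prevents duplicate rows.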


Coral8 – Randomly Sampling a Stream

The example below randomly samples an input stream and inserts samples into a named window for later processing. A variable named Dice is "rolled" each time a new row enters the input stream. A row is copied into the window if Dice = 1.

-- Variable for random number
CREATE VARIABLE INTEGER DiceSides = 6;
CREATE VARIABLE INTEGER Dice;
-- Input stream
CREATE INPUT STREAM StreamIn SCHEMA (Val INTEGER);
-- Each time a row enters the input stream,
-- roll the Dice
ON StreamIn
SET Dice =
    TO_INTEGER(RANDOM() * TO_FLOAT(DiceSides)) + 1;
-- Some data entering the input stream
ATTACH INPUT ADAPTER RandomGen
    TYPE RandomTuplesGeneratorAdapterType
    TO STREAM StreamIn
    PROPERTIES
    RATE     = "20"
    ,ROWCOUNT = "600";
-- Named window for samples
CREATE WINDOW WR SCHEMA (
    Val INTEGER
    ,Dice INTEGER)
KEEP 600 ROWS;
-- Randomly sample the input stream
INSERT WHEN Dice = 1 THEN WR
SELECT Val, Dice
    FROM StreamIn;

Here is a result: out of the 600 messages that entered the input stream, 96 ended up in the window. With a six-sided die, roughly 100 are expected; not bad.
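The spread around the expected count is easy to reproduce with a quick Python simulation (the function name and seed below are my own, chosen for repeatability):

```python
import random

def sample_count(messages, sides, seed):
    """Roll a die once per message; count how many land on 1."""
    rng = random.Random(seed)
    return sum(1 for _ in range(messages)
               if rng.randint(1, sides) == 1)

# 600 messages, six-sided die: the count lands near the expected 100
print(sample_count(600, 6, 42))
```

Different seeds give counts scattered around 100 with a standard deviation of about 9, so 96 of 600 is well within the normal range.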


Bloated WordPress Database

It all started when the Google crawler reported a few long-to-load pages on the site. After some poking around, I finally looked at the database. The wp_options table was three times bigger than the wp_posts table; oops. It seems that wp_options is also used for the RSS cache, so it eventually bloats.

As a quick fix, you may simply run this query to get rid of the RSS cache entries.

DELETE
  FROM `wp_options`
  WHERE `option_name` LIKE 'rss_%'
  AND `autoload` = 'no'
  AND char_length(`option_name`) > 25;

After that, optimize the table using:

OPTIMIZE TABLE `wp_options`;

In my case, the wp_options table went from 950 kb down to 47 kb; almost 20 times smaller. For finer control of what to delete, and if you do not like SQL, try the WP-Options-Manager plug-in. The manager also allows cleanup of junk data left behind by inactive plug-ins.
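The WHERE clause in the delete above is deliberately narrow: only long, non-autoloaded option names with the RSS-cache prefix match. A small Python sketch of the same kind of condition (the prefix and option names here are my own illustrations):

```python
def is_rss_cache(option_name, autoload):
    """Mirror the SQL condition: prefix match, autoload flag,
    and a minimum name length."""
    return (option_name.startswith('rss_')
            and autoload == 'no'
            and len(option_name) > 25)

# cache entries carry a long hash suffix, so they pass the length check
print(is_rss_cache('rss_0123456789abcdef0123456789', 'no'))  # → True
# regular options fail one test or another
print(is_rss_cache('siteurl', 'yes'))                        # → False
```

The length check is the safety net: it keeps any short, hand-named option starting with the same prefix out of the deletion.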

Somewhere around version 2.6, WordPress introduced auto-save and revisions — you may or may not like these. The following query lists all revisions in the wp_posts table.

SELECT `ID`, `post_title`, `post_modified`
  FROM `wp_posts`
  WHERE `post_type` = 'revision';

The auto-save and revisions can be disabled using this plug-in. And a final warning — before you start SQL scripts — make sure you have the latest database backup somewhere safe; bad things do happen.


The database backup dropped from 209 kb to 86 kb — by about 60%.


Read from DB – Coral8 [8]

As previously discussed, the Coral8 Read from Database Adapter is best suited for replay of historical records, or for testing algorithms against existing data.

For this article, I will create a program to monitor a measurement stream and raise alarms based on a statistical index. The adapter will be used to read existing data from one of my databases and test the program. I have some real-life measurements generated during assembly of car parts. Among other columns, the table of measurements includes:

  • Record Time — time-stamp;
  • Serial Number — unique part id;
  • Piston Press Force — a measurement.

Here is a SQL snippet:

CREATE TABLE dbo.PartData(
    RecordTime datetime
    ,SerialNumber varchar(13)
    ,PistonPressForce decimal(8,3)
    -- more columns here
);

This will do for the example. Now, the to-do list:

  1. Create an input stream.
  2. Attach the read-database adapter to the stream.
  3. Test the connection to the DB.
  4. Create a stream for statistical data (average, sigma, Cpk).
  5. Create a query to calculate statistics over a window of data rows.
  6. Test the statistics.
  7. Create an output stream for alarm messages.
  8. Create a query to raise alarm messages.
  9. Test alarms.


1. The Input Stream

The schema of the input stream matches the database columns. Note the column-type mappings between SQL Server and Coral8. Stream fields have the same names as the database columns; this is necessary for this adapter type.

-- Input stream
CREATE INPUT STREAM StreamIn SCHEMA (
    RecordTime TIMESTAMP
    ,SerialNumber STRING
    ,PistonPressForce FLOAT);


2. The Adapter

As in the previous post, I have used point-and-click to insert the adapter; check out the adapter form.

-- Read from DB adapter
ATTACH INPUT ADAPTER ReadDB
    TYPE ReadFromDatabaseAdapterType
    TO STREAM StreamIn
    PROPERTIES
    DBNAME = "RCI4122_1"
    ,TABLE = "PartData"
    ,TIMESTAMPCOLUMN = "RecordTime"
    ,TIMESTAMPCOLUMNINITVALUE = "2008-01-01 00:00:00"
    ,RATE = "1"
    ,WHERECLAUSE =
        "PistonPressForce  > 0";

The adapter parameters:

  • DB Connection Name – name of the database connection, as specified in the coral8-services.xml file; for more details see the previous post.
  • Table or View – name of the database table.
  • Where Clause – the expression in the WHERE clause of the query.
  • Loop Count – how many times to loop through the data.
  • Rate – number of reads per second.
  • Timestamp column – name of the time-stamp column in the database.
  • Timestamp column initial value – initial value for the time-stamp; in the example above, the adapter reads all records since New Year 2008.

The adapter example is equivalent to running this SQL query:

USE RCI4122_1;
SELECT RecordTime, SerialNumber, PistonPressForce
  FROM PartData 
  WHERE RecordTime  > '2008-01-01'
    AND PistonPressForce  > 0
ORDER BY RecordTime;


3. Test the Connection to the DB

Looks OK.


4. Stream for Statistical Data

I would like to calculate the following statistics for the measurement:

  • average;
  • sigma (standard deviation);
  • Cpk (process capability index) – the distance from the average to the nearest specification limit, measured in three-sigma units.

The stream definition:

-- Local stream for stats
CREATE LOCAL STREAM StreamStats SCHEMA (
    RecordTime TIMESTAMP
    ,p_Avg FLOAT
    ,p_Std FLOAT
    ,p_CpkUp FLOAT
    ,p_CpkLo FLOAT);

The Cpk is broken into the p_CpkUp and the p_CpkLo. The p_CpkUp is the distance from the average to the upper limit, and the p_CpkLo is the distance to the lower limit; distances are in three-sigma units.
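For reference, the two halves of the Cpk can be computed from a sample like this; the function name, measurement values, and limits below are my own, invented for illustration:

```python
import statistics

def cpk_parts(values, lsl, usl):
    """Return (CpkUp, CpkLo): distance from the sample average to each
    specification limit, in three-sigma units."""
    avg = statistics.mean(values)
    sigma = statistics.stdev(values)
    return (usl - avg) / (3 * sigma), (avg - lsl) / (3 * sigma)

up, lo = cpk_parts([12.0, 11.0, 13.0, 12.0, 12.0], lsl=6.0, usl=18.0)
# the reported Cpk is the smaller of the two halves
print(round(min(up, lo), 2))  # → 2.83
```

A process centered between its limits, as here, gives equal halves; as the average drifts toward one limit, that half shrinks and drags the Cpk down.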


5. Query to Calculate Statistics

Process limits could be read from a database or a file, but using pre-defined variables is simpler.

-- Lower and Upper limits
-- (example values; use your own process limits)
CREATE VARIABLE FLOAT p_LSL = 6.0;
CREATE VARIABLE FLOAT p_USL = 18.0;

The query creates a sliding window of the last 35 rows and inserts the calculated statistics into StreamStats. Thirty-five is frequently used as a minimum sample size for this type of statistics to make sense. The query updates after each new row of data (new part) and calculates statistics over the last 35 parts.

-- Calculate stats
INSERT INTO StreamStats
SELECT StreamIn.RecordTime
  ,avg(StreamIn.PistonPressForce)
  ,STDDEVIATION(StreamIn.PistonPressForce)
  ,(p_USL - avg(StreamIn.PistonPressForce))/
     (3 * STDDEVIATION(StreamIn.PistonPressForce))
  ,(avg(StreamIn.PistonPressForce) - p_LSL)/
     (3 * STDDEVIATION(StreamIn.PistonPressForce))
FROM StreamIn
    KEEP 35 ROWS;
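The KEEP 35 ROWS clause behaves like a fixed-size sliding window: each new row pushes the oldest one out, and the statistics are recomputed over whatever the window currently holds. A minimal Python analogue using a deque (names and sample values are my own):

```python
from collections import deque
import statistics

window = deque(maxlen=35)  # like KEEP 35 ROWS

def on_new_row(force):
    """Add a measurement; return (avg, sigma) once the window is full."""
    window.append(force)
    if len(window) < window.maxlen:
        return None  # fewer than 35 parts: stats not meaningful yet
    return statistics.mean(window), statistics.stdev(window)

stats = None
for i in range(40):
    stats = on_new_row(10.0 + (i % 5))  # fake measurements, cycling 10..14
print(len(window))  # → 35
```

After 40 rows the window still holds exactly 35 values; the first five have been silently evicted, which is precisely the sliding behavior the CCL query relies on.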


6. Test the Statistics

This works nicely. Now I will raise an alarm when the Cpk drops below 1.33, that is, when the distance between the average and a limit is less than four sigma.


7. Output Stream for Alarm Messages

The output stream carries an alarm message, a time-stamp, and the value which raised the alarm.

-- Output stream for Cpk alarm
CREATE OUTPUT STREAM StreamOut SCHEMA (
    RecordTime TIMESTAMP
    ,p_Cpk FLOAT
    ,AlmMsg STRING);


8. Query to Raise Alarms

The query selects all rows from StreamStats with a Cpk below 1.33 and inserts the time-stamp, the smaller of the two Cpk values, and the alarm message "Low Cpk" into StreamOut.

-- Monitor Cpk and raise alarm
INSERT INTO StreamOut
SELECT RecordTime, min(p_CpkUp, p_CpkLo), 'Low Cpk'
FROM StreamStats
    WHERE p_CpkUp < 1.33
       OR p_CpkLo < 1.33;


9. Test Alarms

The final test shows alarm messages in the output stream, generated when the Cpk drops below 1.33.


The next step would be to attach an output adapter to this stream to log alarms and send notifications. The code still needs some optimization; once that is done the input adapter can be replaced by a live-measurement feed to generate alarms in real-time.


Poll from DB – Coral8 [7]

I read somewhere that — when it comes to programming — being lazy is good. Hence, I will reuse as much code as possible from previous posts. The objective for today is to use Coral8 Poll from Database Input Adapter.

First, let us look at the database (same as here):

CREATE TABLE dbo.GpsData(
	Entity varchar(25) NOT NULL
	,Latitude float
	,Longitude float
	,Course float
	,Speed float
	,TimeUTC datetime NOT NULL);

A caveat: the SQL Server DATETIME type is not time-zone aware, while the Coral8 TIMESTAMP is. If the time zone is not specified, Coral8 assumes UTC. In general, this is not a problem; however, stream viewers always display time in the local format, so for me (Toronto, EST) all database time-stamps are shifted by five hours. One way to remedy the problem is to use the Timezone parameter in the service specification, like this:

<!-- DB Time Zone -->
<param name="Timezone">EST</param>

The code has to be added to the coral8-services.xml file; here is the whole section for the database used:

<!-- DamirTest on DAMIR5 -->
<service name="DamirTest" type="DATABASE">
  <description>Connection to DamirTest</description>
  <param name="DBDriverType">DBDriverODBC</param>
  <!-- DBDriverConnectString = DSN name -->
  <param name="DBDriverConnectString" >DamirTest</param>
  <param name="DBReadNoCommit">True</param>
  <!-- Username and Password -->
  <param name="Username">username_here</param>
  <param name="Password">password_here</param>
  <!-- DB Time Zone -->
  <param name="Timezone">EST</param>
</service>
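The five-hour shift described above is easy to reproduce with Python's datetime module; here UTC midnight maps to 7 p.m. the previous day in EST (UTC-5). The fixed offset below deliberately ignores daylight-saving rules, which a real deployment would need to handle:

```python
from datetime import datetime, timedelta, timezone

est = timezone(timedelta(hours=-5), 'EST')  # fixed offset; no DST here

stamp = datetime(2009, 1, 1, 0, 0, tzinfo=timezone.utc)  # stored as UTC
local = stamp.astimezone(est)                            # what a viewer shows
print(local.strftime('%Y-%m-%d %H:%M'))  # → 2008-12-31 19:00
```

This is exactly the discrepancy a stream viewer shows when the database time-stamps are naive DATETIME values interpreted as UTC.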

Now I need a stream to read the records into. For the example, I will use only: time-stamp, entity, latitude, and longitude.

-- Input stream
CREATE INPUT STREAM StreamPoll SCHEMA (
    DbTime TIMESTAMP
    ,Entity STRING
    ,Latitude FLOAT
    ,Longitude FLOAT);

You may have noticed that I have chosen different names for time-stamps: DbTime vs. TimeUTC. This was done on purpose, to clarify adapter definition. Here is the adapter:

-- Poll from Db adapter
ATTACH INPUT ADAPTER PollDB
    TYPE ReadFromDBAdapterType
    TO STREAM StreamPoll
    PROPERTIES
    DBNAME = "DamirTest",
    QUERY  = [[
SELECT TimeUTC, Entity, Latitude, Longitude
FROM GpsData
WHERE TimeUTC > ?
ORDER BY TimeUTC
    ]]
    ,TIMESTAMPFIELD = "DbTime"
    ,TIMESTAMPFIELDINITVALUE =
        "2009-01-01 00:00:00"
    ,POLLINTERVAL = "2000000";

It looks complicated; however, the code was auto-generated by a point-and-click procedure and filling in the adapter form.

The adapter TYPE ReadFromDBAdapterType may sound a bit confusing, but this is the Poll Adapter.

  • DBConnectionName – name of the database connection, as specified in the coral8-services.xml file.
  • Poll Interval – interval at which the adapter polls the database. Note that the form allows time units, while the generated code uses microseconds.
  • Timestamp field – the name of the time-stamp field in the stream schema.
  • Timestamp field initial value – initial value for the time-stamp; in the example above, upon start-up, the adapter reads all records since New Year 2009.
  • Query – the query to run against the database. Think of everything from the WHERE clause on as a boiler-plate example. The TimeUTC is the time-stamp column name in the database.

As described in the previous post, the adapter can use a time-stamp field or a counter (ID) field to remember the last record read. I am using the time-stamp, hence the counter fields are left empty. The last row of the query, ORDER BY TimeUTC, must be present, because the adapter remembers the last value read, not necessarily the latest one.

When the server is started, the adapter reads all records between the TIMESTAMPFIELDINITVALUE and NOW().
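The poll-and-remember logic can be sketched in a few lines of Python; rows are (timestamp, payload) tuples and the names are my own. The ORDER BY requirement shows up as the sort: without it, the "last value read" could be a middle row, and later polls would silently skip records.

```python
def poll(rows, last_seen):
    """Return rows newer than last_seen, oldest first,
    plus the new high-water mark for the next poll."""
    fresh = sorted(r for r in rows if r[0] > last_seen)
    new_mark = fresh[-1][0] if fresh else last_seen
    return fresh, new_mark

# rows arrive from the database in arbitrary order
rows = [(3, 'c'), (1, 'a'), (2, 'b')]
fresh, mark = poll(rows, last_seen=1)
print(fresh, mark)  # → [(2, 'b'), (3, 'c')] 3
```

Because the result is sorted, the high-water mark is guaranteed to be the latest time-stamp delivered, so the next poll starts exactly where this one left off.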


When I manually insert a row into the database using:

INSERT INTO GpsData
	(TimeUTC, Entity, Latitude, Longitude)
	VALUES
	(getUTCdate(), 'ABC', 1.0, 2.0);

The new row appears in the stream viewer too.

From this point on, only new database rows are added to the stream. All looks fine, but one problem still remains. Suppose we run this for several months and then re-start the server. The adapter would (upon start-up) read all these records again.

To avoid this scenario, the module persistence option should be enabled for modules containing the Poll DB Adapter. Persistence can be enabled in Coral8 Studio on the module property tab. With persistence enabled, Coral8 periodically stores all module variables and rows to a disk drive, and re-loads them upon server start-up. This way the adapter will remember the time-stamp of the latest row and will continue from that point.


DB as an Event Source – Coral8 [6]

These days, almost every action in an organization is reflected in some kind of a database. Invoices, purchase orders, shipping/receiving logs, production reports, etc. — all rely on an underlying database. By tapping into databases, it is possible to bring together events (data) from independent systems and benefit from the application of EP/CEP even within a simple IT infrastructure (no ESB, no enterprise architecture, systems added in an ad hoc manner).

There seem to be three basic ways for Coral8 to read from a database:

  1. Read from the Database input adapter;
  2. Poll from the Database input adapter;
  3. Read directly from a CCL query.

The first one is best suited for replay of historical data, or testing of algorithms against existing data. The adapter reads one row at a time, at a pre-set rate (rows per second).

The second adapter polls the database at a specified interval, and reads only rows that were added since the last read. The database must have a time-stamp (or an ID) column that is unique to each row and increases in value. My preference here is to use a UTC time-stamp assigned by a client application (the one that inserted the data into the database) to preserve the order of events entering a Coral8 stream. Using the ID column is convenient, but should be considered only when there is a single source of data and there is no way to swap records entering the database, or when the order of events is irrelevant.
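To see why a client-assigned UTC time-stamp is preferred over an auto-assigned ID, consider two sources writing to the same table: IDs reflect insert order, not event order. The rows below are invented for illustration:

```python
# (id, utc_timestamp, source) - insert order differs from event order
rows = [
    (1, '2009-01-01 10:00:05', 'plant_A'),  # inserted first, happened later
    (2, '2009-01-01 10:00:01', 'plant_B'),  # inserted second, happened first
]

by_id = [r[2] for r in sorted(rows, key=lambda r: r[0])]
by_time = [r[2] for r in sorted(rows, key=lambda r: r[1])]
print(by_id)    # → ['plant_A', 'plant_B']
print(by_time)  # → ['plant_B', 'plant_A']
```

Feeding the stream in ID order would replay plant_A's event before plant_B's even though it happened later; ordering by the client-assigned time-stamp restores the true sequence.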

The third method provides a specific syntax to create database sub-queries within a standard CCL query.

In the next few posts I will go through examples of each method.


GPS Event [17] – Coral8 [5]

Last time we detected when a device stops streaming data; however, the message remained within Coral8. Today, I would like to attach an output adapter to send the message out. Coral8 comes with several output adapters, including XML over HTTP. This one seems appropriate for any kind of server-based application. Just for fun, I decided to transfer data to an application running on Google App Engine.

Starting with Google App Engine is fairly easy; detailed instructions are available, so I will not describe it here.

The to-do list:

  1. Add an XML over HTTP output adapter to the Coral8 example;
  2. Write an application on Google App Engine to receive, store and display Coral8 messages;
  3. Test.

1. The Output Adapter

--Adapter for GoogleAppEngine
ATTACH OUTPUT ADAPTER GoogleApp
    TYPE WriteXmlOverHttpAdapterType
    TO STREAM sAlarm
    PROPERTIES
    URL = "",  -- server URL and port go here
    XSLTEMPLATE =
[[<xsl:stylesheet
  xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
  version="1.0">
<xsl:output method="text" />
<xsl:template match="/">{<xsl:for-each select="*">
<xsl:for-each select="*">'<xsl:value-of select="@name" />':'<xsl:value-of select="." />',</xsl:for-each>
</xsl:for-each>}</xsl:template>
</xsl:stylesheet>]];


The adapter has two properties:

  • URL of the server, including the port number;
  • XSLTEMPLATE for an optional XSL transformation.

If the XSLTEMPLATE is not specified, Coral8 sends a raw XML string like this one:

<tuple xmlns="" timestamp="1232757671788000">
	<field name="Entity">DS_1</field>
	<field name="Msg">Stream stopped</field>
	<field name="AlmTime">2009-01-23 19:41:09.788000</field>
</tuple>


The template in the adapter transforms the XML to a dictionary-like string:
{'Entity':'DS_1','Msg':'Stream stopped','AlmTime':'2009-01-23 19:41:09.788000',}.
Using the XSL I have simply avoided parsing the XML on the server side.
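One note on the server side: the dictionary-like string is valid Python literal syntax (trailing comma included), so it can also be parsed with `ast.literal_eval`, which, unlike a bare `eval`, refuses to execute arbitrary code:

```python
import ast

msg = ("{'Entity':'DS_1','Msg':'Stream stopped',"
       "'AlmTime':'2009-01-23 19:41:09.788000',}")
data = ast.literal_eval(msg)  # safe: literals only, no code execution
print(data['Entity'], data['Msg'])  # → DS_1 Stream stopped
```

This is worth considering for any endpoint that accepts such strings from the network, since `eval` on an untrusted request body is a code-injection risk.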

2. The Application

"""Capture Coral8 messages to DB.
Display last 10 messages."""
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util \
     import run_wsgi_app
from google.appengine.ext import db

class Dta(db.Model):
  content = db.StringProperty(multiline=True)
  date = db.DateTimeProperty(auto_now_add=True)

class MainPage(webapp.RequestHandler):
  def get(self):
    self.response.headers \
      ['Content-Type'] = 'text/plain'
    dt = db.GqlQuery \
      ("SELECT * FROM Dta ORDER BY date DESC LIMIT 10")
    for d in dt:
        dic = eval(d.content)
        self.response.out.write('\n%s' % ('-' * 40))
        for k in dic.keys():
          self.response.out.write \
            ('\n%s = %s' % (k, dic[k]))

  def post(self):
    rec = Dta()
    doc = str(self.request.body)
    rec.content = ' '.join(doc.split())
    rec.put()  # store the record

application = webapp.WSGIApplication \
              ([('/', MainPage)], debug=True)

def main():
  run_wsgi_app(application)

if __name__ == "__main__":
  main()

It may seem complicated, but here are the basics:

class Dta(db.Model) defines a "database table" with two columns:

  • content – a string column for Coral8 data;
  • date – a timestamp column;

The post(self) method is activated when Coral8 sends data to the application's web page;

The get(self) method is activated when a browser visits the web page.

3. Test

For testing, I have used the same example as in the previous post: send several messages to Coral8 and then stop. Here is the result in a browser window:

GPS Event [16] – Coral8 [4]

Time for an experiment. Let's look at a way to detect that a sensor stopped streaming data. For the example I will use the same code as in the last test.

The idea is to monitor StreamIn and when it stops, insert a message into a new output stream. The output stream can then be connected to an output adapter and send the message "out there". So, here is the output stream named sAlarm, with three fields:

  • Entity – name of the device which stopped streaming data;
  • Msg – a message, like "Stream stopped";
  • AlmTime – time of detection.

-- Alarm stream
CREATE OUTPUT STREAM sAlarm SCHEMA (
    Entity STRING,
    Msg STRING,
    AlmTime STRING);

The AlmTime is formatted as a STRING, because I intend to send this message later via an email or similar adapter.

The detection technique is straight from the Coral8 CCL Cookbook; let's see how it works.

-- Detect no data from StreamIn
INSERT INTO sAlarm
SELECT E1.Entity
	,'Stream stopped'
	,TO_STRING(GETTIMESTAMP(E1))
FROM StreamIn AS E1, StreamIn AS E2
MATCHING [2 SECONDS: E1, !E2]
	ON E1.Entity = E2.Entity;

The MATCHING clause needs at least two streams (think tables), hence StreamIn is used twice with different aliases: E1 and E2. The pattern waits for an event in E1, followed by a non-event from E2, within a time window of two seconds. The GETTIMESTAMP(E1) function returns the timestamp of the last row from the stream, so the AlmTime is actually the time of the last record received.
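The same detection can be sketched outside CCL: keep the time of the last row per entity, and flag any entity whose last row is older than the two-second window. The function name and the plain-float timestamps below are my own, for illustration:

```python
def find_stalled(last_seen, now, timeout=2.0):
    """Return entities with no data within `timeout` seconds.

    last_seen: dict mapping entity name to time of its last row."""
    return sorted(e for e, t in last_seen.items() if now - t > timeout)

# DS_1 last reported 4 s ago, DS_2 half a second ago
last_seen = {'DS_1': 10.0, 'DS_2': 13.5}
print(find_stalled(last_seen, now=14.0))  # → ['DS_1']
```

This mirrors the MATCHING pattern: an event (the stored last-row time) followed by a non-event (nothing newer within the timeout) raises the alarm, and the reported time is that of the last record received.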

As in the previous example, I have sent ten numbers (0-9) and then the stream stopped. Notice that AlmTime matches the time-stamp of the last StreamIn entry.

To be continued.