Heroku Connect is a service offered by Heroku which performs 2-way data synchronization between Salesforce and a Heroku Postgres database.
When we first built Heroku Connect, we decided to use polling to determine when data had changed on either side. Polling isn't pretty, but it's simple and reliable, and those are "top line" features for Heroku Connect. But polling incurs two significant costs: high latency and wasted resources. The more frequently you poll, the more API calls and database queries you waste checking when nothing has changed. But if you lengthen the polling interval, you increase the latency of the data synchronization.
Events all around
The solution is conceptually simple: register an event handler with each data source so that it can send you an event when data has changed. Of course, most people who have done event-driven programming know that it is often harder in practice than it sounds in theory.
In this post we will show how to use the force.com Streaming API to subscribe to event notifications from force.com. Then we will show how to do something similar to listen for events from Postgres.
force.com events
The force.com Streaming API is devoted to sending real-time events of data changes inside force.com.
The Streaming API supports a publish/subscribe model. In order to create a subscription, we first need to construct a PushTopic. This creates a named query in the force.com system that we can subscribe to. Whenever a record changes in a way that satisfies the query, the Streaming API publishes those changes as events.
We construct the PushTopic automatically based on the force.com table that you are synchronizing with Heroku Connect. This is the Python code which accesses the force.com SOAP API (using the salesforce-python-toolkit):
def create_push_topic(self, object_name):
    topic = self.h.generateObject('PushTopic')
    topic.ApiVersion = 30.0
    topic.Name = "hconnect_{0}".format(object_name)
    topic.NotifyForFields = "All"
    topic.NotifyForOperationCreate = True
    topic.NotifyForOperationUpdate = True
    topic.NotifyForOperationDelete = True
    topic.Query = "select Id from {0}".format(object_name)
    res = self.h.create(topic)
    if hasattr(res, 'errors'):
        raise InvalidConfigurationException(
            "PushTopic creation failed: {0}".format(res.errors[0]))
Now we need to set up a subscriber to this topic. We are using Node.js for most of our event-driven needs because it handles that architecture so well. The best force.com client library for Node.js is called nforce. nforce supports the Streaming API and makes it incredibly easy to subscribe to a PushTopic:
var nforce = require("nforce");
var org = nforce.createConnection(...);
org.authenticate(...);

var pt_name = "hconnect_" + object_name;

// Create a connection to the Streaming API
var str = org.stream({ topic: pt_name });

str.on('connect', function() {
    console.log('connected to pushtopic');
});

str.on('error', function(error) {
    console.log('error: ' + error);
});

str.on('data', function(data) {
    // Data will contain details of the streaming notification
    console.log(data);
});
For the purposes of Heroku Connect, whenever we receive a streaming event we fire off a task through Redis which tells a worker to query force.com for the latest data and sync it to Postgres. Although it's possible to receive record data directly over the Streaming API, there are various limitations, and if our subscriber is down for some reason we may miss events. For these reasons we use streaming events only as a notifier that some data has changed, and we query for those changes using the force.com SOAP API.
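As a sketch of that dispatch step, here's roughly what queueing such a task might look like in Python. The queue name hconnect:tasks and the task format are hypothetical illustrations, not Heroku Connect's actual internals:

```python
import json

def make_sync_task(object_name):
    """Serialize a task telling a worker which force.com object to re-query."""
    return json.dumps({"action": "sync", "object": object_name})

def enqueue_sync_task(redis_conn, object_name):
    """Push the task onto a Redis list; redis_conn would be a client such as
    redis.StrictRedis() from the redis-py library, and a worker would pop
    tasks off the same list with BRPOP."""
    redis_conn.lpush("hconnect:tasks", make_sync_task(object_name))
```

The streaming 'data' handler shown above would call something like enqueue_sync_task with the name of the changed object.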
Listen / Notify
Next up is how we listen for change events on the Postgres side.
Postgres supports a very cool publish/subscribe system using the built-in Listen and Notify commands.
Listen is the "subscribe" part: you issue this command through your database client to create a subscription to a named channel. Inside the database, you use the Notify command to publish events to the channel.
Sounds simple enough, right?
The publisher
One easy way to call Notify within the database is to create a function which performs the call, plus a trigger which invokes that function when data is modified. Here's the PL/pgSQL code for our trigger function:
CREATE FUNCTION table1_notify_trigger() RETURNS trigger AS $$
BEGIN
    -- pg_notify takes the channel name and a (possibly empty) payload
    PERFORM pg_notify('channel1', '');
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER table1_insert_trigger AFTER INSERT ON table1
    FOR EACH ROW EXECUTE PROCEDURE table1_notify_trigger();
This code defines a new function called table1_notify_trigger. When this function is executed it calls pg_notify with our channel name, channel1. Finally, we create an AFTER INSERT trigger on table1 that executes our function whenever a row is inserted into the table.
If we want to track other DML events we can create additional AFTER UPDATE or AFTER DELETE triggers.
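If you're tracking several tables and operations, it can help to generate those CREATE TRIGGER statements programmatically so they stay consistent. A minimal Python sketch (the helper name is ours, not part of Heroku Connect):

```python
def make_dml_triggers(table, func_name, ops=("INSERT", "UPDATE", "DELETE")):
    """Build one CREATE TRIGGER statement per DML operation, all of them
    executing the same notify function."""
    template = (
        "CREATE TRIGGER {table}_{op_lower}_trigger AFTER {op} ON {table}\n"
        "    FOR EACH ROW EXECUTE PROCEDURE {func}();"
    )
    return [
        template.format(table=table, op=op, op_lower=op.lower(), func=func_name)
        for op in ops
    ]
```

Running the generated statements through your database client (for example, psycopg2's cursor.execute) installs all three triggers.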
The subscriber
Again we've chosen Node.js to listen for events from Postgres, because the LISTEN command is well supported by the node-postgres module.
The code is very simple:
pg.connect(db_url, function(err, client) {
    if (err) {
        console.log("Error connecting to database: " + err);
    } else {
        client.on('notification', function(msg) {
            console.log("DATABASE NOTIFY: ", msg.payload);
            // Move some data...
        });
        client.query("LISTEN channel1");
    }
});
After connecting to the database, we just call client.on to register a handler for notifications, and then execute the SQL command LISTEN channel1 to set up the subscription. You can pass data to your subscriber and access it through the msg.payload attribute.
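For example, a variant of the trigger function (a sketch, not the exact function Heroku Connect uses) could pass the new row's id as pg_notify's second argument, and the subscriber would then see it in msg.payload:

```python
# A hypothetical variant of the trigger function that includes the new
# row's id in the notification payload (pg_notify's second argument).
NOTIFY_WITH_ID = """
CREATE OR REPLACE FUNCTION table1_notify_trigger() RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('channel1', NEW.id::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""
```

With this in place, the Node.js notification handler shown above would log the inserted row's id instead of an empty string.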
Yay for events
The notification system in Postgres works very well. It could be even better if Postgres supported a "default event set" that avoided the need to create the trigger by hand. Still, having these primitives gives you lots of control.
You can easily envision some really cool things you could do, like streaming change events out over a websocket to the browser. Give it a try and let us know what cool things you come up with.