Caesium: Real-time Notification System

A real-time notification system built using Node.js and Redis.

Posted by David Leonard on July 1, 2013

Caesium is a real-time notification system built using Node.js and Redis. The motivation for this project is to integrate it into Graphyte, an ongoing research project under my mentor at The City College of New York.

Graphyte - The Scientific Collaboration Tool

Before diving into Caesium, some context about Graphyte is needed. Graphyte is a platform built by G.L.A.S.S Lab at The City College of New York that allows the members of any scientific collaboration to share their source code, data and computational artifacts with one another within a unified environment. It currently consists of the following sub-systems:

  • Graphyte Core System (GCS)
  • Graphyte-CLI: Command Line Interface using Graphyte
  • Kalium: Simple data store
  • Aurum: GCS Client for creating unit-test based assessments for learning
  • Aurum-cli: Command line interface with the Aurum API
  • Aurum-Web: Web-based interface for packaging SCORM modules
  • Aurum-js: JavaScript client library for SCORM modules using Aurum
  • Repoman: Wrapper for working with different repo hosts (Mercurial, Git, SCM-Manager)
  • Canary: Wrapper for input/output data blobs to use (Kalium, EC2, OpenStack)
  • Caesium: A real time notification system used for debugging and notifying progress of a job being run

At a high level, Graphyte is currently being used for automated learning through a Learning Management System (Aurum) via the Blackboard client. Once SCORM modules have been created, a student can submit their code to the exercise on Blackboard, where it is sent through a distributed computing network using Condor. After the result is returned up through Condor, GCS and Aurum, respectively, the student receives a report showing which unit tests they passed and which they failed.

Inception of Caesium - The Reactive API

Where does the name Caesium come from? Like other systems in our lab, Caesium takes its name from a chemical element. Caesium is a very reactive element, and therefore it was the perfect name for a system which sends notifications in real time based on a triggered event.

Due to the complexity of Graphyte, it is tough to debug across several web applications working in unison with each other. Sometimes the student’s code does not get saved into the database, and sometimes internal errors occur which can’t always be tracked. To help solve this problem, Caesium can be used to send notifications to the student while their code is being run inside of the SCORM module on Blackboard. On top of this, a progress bar can be displayed which allows the user to track the progress of their job and to report exactly where in the stack trace an error occurs.

API Requirements and Roadmap

From my understanding of the task presented to me, I knew that I had to somehow propagate messages to the student. This would require having several connections open at the same time, with the server and client constantly checking whether there is any new data to be sent. Instead of using a Comet long-polling method (which could potentially cause problems when too many connections are open), I decided to use Node.js due to its evented I/O.

Using Node.js and socket.io in conjunction with each other, all that is needed is a middleman to send push notifications. After a little bit of research, I stumbled upon PubSubHubbub, a publish/subscribe design in which a hub holds a set of channels that can send and receive push notifications. Instead of implementing my own version of pubsub, I used Redis for its rich API, which also includes a pubsub implementation. A simple example of Redis’s pubsub is shown below:

    """
    redis-python.py demonstrates how to publish a message
    from an application such as Graphyte. To publish, simply
    choose a channel name and pass the message.

    publish(channel, message)
    -------------------------
        parameters:
        ----------
            channel: name of the channel the client is subscribed to (str)
            message: message to send to the user (str)

        returns:
        -------
            int: number of clients the message was delivered to
    """

    import redis

    # Graphyte library to publish notifications to redis
    # Create a redis client
    r = redis.Redis()
    # Publish one message per stage; each call returns the number of
    # subscribers that received the message
    z = r.publish('David123', 'this is a test message @stage1')
    y = r.publish('David123', 'this is a test message @stage2')
    x = r.publish('David123', 'this is a test message @stage3')
    # r.set('key', 'value')
    # x = r.lpush('key', 'test')

By using the Python Redis client combined with Node.js, socket.io and the Redis client for Node.js, we can generate a progress bar which fills up based on the notifications sent throughout the execution of the student’s job as it propagates through Graphyte.

The advantage of using this setup is that through the use of Node.js, whenever an event is triggered (in this case, the event is triggered whenever there is a notification to push) it will be published to the user based on the channel they automatically subscribe to. Using Node.js greatly reduces the overhead needed to sustain several connections by the client, and therefore these tools were right for the task.
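
To make the channel routing concrete, here is a minimal in-memory sketch of channel-based publish/subscribe in Python. It is not Redis and not part of Caesium: the `Hub` class and its method names are hypothetical and only model the behaviour described above, where a message published on a channel reaches only the callbacks subscribed to that channel and `publish` reports how many subscribers received it.

```python
from collections import defaultdict


class Hub:
    """Toy stand-in for a pub/sub broker (hypothetical, for illustration only)."""

    def __init__(self):
        # channel name -> list of subscriber callbacks
        self._subscribers = defaultdict(list)

    def subscribe(self, channel, callback):
        """Register a callback to be invoked for every message on `channel`."""
        self._subscribers[channel].append(callback)

    def publish(self, channel, message):
        """Deliver `message` to the channel's subscribers; return how many received it."""
        callbacks = self._subscribers.get(channel, [])
        for callback in callbacks:
            callback(channel, message)
        return len(callbacks)


hub = Hub()
received = []

# One student is listening on the channel "David123".
hub.subscribe("David123", lambda channel, message: received.append((channel, message)))

delivered = hub.publish("David123", "this is a test message @stage1")
ignored = hub.publish("Someone456", "nobody is subscribed to this channel")
```

In this sketch `delivered` counts the single subscriber on `David123`, while the message on the other channel reaches nobody. In Caesium the same roles are played by Redis (the hub) and the socket.io rooms named after channels.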

Node.js Server

After understanding the requirements, it was time to write the Node.js server. The code for server.js is shown below:

    /*
    server.js handles the following:

    1. Creates the HTTP server.
    2. Creates the redis client.
    3. Uses socket.io to open a connection between client and server.
    4. Handles PUSH operations to the proper clients.

    */


    // Instantiates the application
    var app = require('http').createServer(handler)
      , io = require('socket.io').listen(app)
      , fs = require('fs')
      , redis = require('redis');


    app.listen(81);

    // Our redis client, which subscribes to channels for updates
    var redisClient = redis.createClient();

    // Look for connection errors and log them
    redisClient.on("error", function (err) {
        console.log("error event - " + redisClient.host + ":" + redisClient.port + " - " + err);
    });

    // HTTP handler, currently just sends index.html on every request
    function handler (req, res) {
      fs.readFile(__dirname + '/../www/index.html',
      function (err, data) {
        if (err) {
          res.writeHead(500);
          return res.end('Error loading index.html' + __dirname);
        }

        res.writeHead(200);
        res.end(data);
      });
    }

    // socket.io server side, which listens for new websocket connections
    // and then handles their requests
    io.sockets.on('connection', function (socket) {
      // On a subscription request, join the specified room;
      // later messages are broadcast to the rooms
      socket.on('subscribe', function (data) {
        socket.join(data.channel);
        // Subscribe the redis client to the same channel, so that
        // messages published there are relayed to this room.
        redisClient.subscribe(data.channel);
      });
    });

    // Log once the redis client is ready
    redisClient.on('ready', function () {
        console.log('redis ready');
    });

    // This sends updates to users.
    // Waits for messages from the redis channels;
    // on a new message, sends the update to the room named
    // after the channel.
    redisClient.on("message", function (channel, message) {
        var resp = {'text': message, 'channel': channel};
        io.sockets.in(channel).emit('message', resp);
    });

Node.js Client

After writing and testing the server, I moved on to writing the client that receives these messages, index.html:

    <!-- Assumes the socket.io client script and jQuery are loaded earlier in the
         page, and that the #status, #messages and #progressbar elements exist. -->
    <script>
        var student_ID = "David";
        var exercise_ID = "123";
    </script>

    <script>
    // Current progress bar fill, in percent
    var value = 0;

    var channel_name = student_ID.concat(exercise_ID);
    console.log(channel_name);
    var last_stage = false;
    // socket.io client; server.js listens on port 81
    var socket = io.connect('http://localhost:81');

    // updates status in the status div
    function setStatus(msg) {
      $('#status').html('Connection Status : ' + msg);
    }

    // adds message to messages div
    function addMessage(msg) {
      var str = "<tr><td>" + msg + "</td></tr>";
      console.log(str);
      $('#messages').append(str);
    }

    // on connection, updates connection state and sends subscribe request
    socket.on('connect', function (data) {
      setStatus('connected');
      socket.emit('subscribe', {channel: channel_name});
    });

    // when reconnection is attempted, updates status
    socket.on('reconnecting', function (data) {
      setStatus('reconnecting');
    });

    // on new message, adds the message to the display
    socket.on('message', function (data) {
      var msg = "";
      if (data.channel) {
        msg += 'Channel:' + data.channel + ', ' + data.text;
      } else {
        msg = data.text;
      }
      addMessage(msg);

      // Increase progress bar on new message
      // Need parsing function to decide how much to raise it by

      // @stage3 is the last stage before the job is sent to Graphyte;
      // after this we just increase the progress bar by like 1% every ten seconds
      if (data.text.indexOf("@stage1") != -1) {
        value = 10;
      } else if (data.text.indexOf("@stage2") != -1) {
        value = 15;
      } else if (data.text.indexOf("@stage3") != -1) {
        value = 30;
        last_stage = true;
      } else {
        value += 5;
      }

      $("#progressbar").css('width', value + "%");
    });
    </script>

The goal here is to fire off events on connection, reconnection and incoming messages. For now, a channel name is created from two dummy variables. The client is automatically subscribed to this channel, and whenever our test Python script sends a message to that channel, the client’s progress bar increases by a set amount depending on the execution location of their job.
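
The client code above notes that a parsing function is still needed to decide how far to move the progress bar. As a rough sketch of what that could look like, here is the same stage-marker logic written as a small Python function. The names `STAGE_PROGRESS`, `DEFAULT_STEP` and `next_progress` are hypothetical; the percentages are the ones used in the client, and the cap at 100 is an assumption the original client does not make.

```python
# Stage markers and the percentage each one sets, taken from the client code.
STAGE_PROGRESS = {"@stage1": 10, "@stage2": 15, "@stage3": 30}
DEFAULT_STEP = 5  # increment for messages without a known stage marker


def next_progress(current, text):
    """Return the new progress percentage after receiving `text`."""
    for marker, value in STAGE_PROGRESS.items():
        if marker in text:
            return value
    # Capping at 100 is an assumption; the original client does not cap.
    return min(current + DEFAULT_STEP, 100)


progress = 0
history = []
for message in [
    "this is a test message @stage1",
    "this is a test message @stage2",
    "this is a test message @stage3",
    "job still running",
]:
    progress = next_progress(progress, message)
    history.append(progress)
```

Keeping the marker-to-percentage mapping in one table means a new stage can be added without touching the branching logic.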

Current Progress

Now that the code has been written, the process of folding Caesium into the production machine is underway. So far Caesium has only been integrated into Aurum, but fortunately it is easy to do pubsub across several sub-systems, as long as the channel name is kept consistent across the platform. More to come in the future.
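
One way to keep channel names in agreement across sub-systems is to build them in a single shared helper. The sketch below is only an illustration: `channel_name` is a hypothetical function, and it simply mirrors what the client does today, which is to concatenate the student ID and the exercise ID.

```python
def channel_name(student_id, exercise_id):
    """Build the channel name the way the client does: student ID followed by exercise ID."""
    return f"{student_id}{exercise_id}"


# The publisher (e.g. the Python script) and the subscriber (the browser client)
# both need to arrive at the same string.
publisher_channel = channel_name("David", "123")
subscriber_channel = channel_name("David", 123)  # non-string IDs are formatted too
```

Both calls produce the channel used in the examples, `David123`, so a publisher in one sub-system and a subscriber in another would meet on the same channel.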

Experience

Throughout my time working on this project over the last seven months, I have learned a wide array of skills with new and unfamiliar technologies. After being exposed to Redis and the power it has when coupled with Node.js and socket.io, I look forward to developing more applications using these technologies.

Aside from learning new technology, I’ve learned to adapt to a large codebase and figure out where and how I can contribute. The biggest learning curve for me was to understand how to structure my code in a way such that it can be folded into any sub-system seamlessly without the need for hard-coded dependencies, a skill that is often hard to develop in school due to a lack of projects with large codebases.