Streams Plugin – Messages, Subscriptions and Notifications

The Streams plugin doesn’t just standardize and handle storing and retrieving data. It also standardizes and handles the complex machinery around making changes to that data. This is based around the concept of messages posted to streams.

Various users can post messages on streams, and they are propagated to others who are observing the stream. There are multiple ways for users to become notified of messages posted to a stream:

  • Pulling Data: Clients, such as browsers, can retain/release/refresh streams, which results in HTTP requests handled by Qbix Servers (using PHP).
  • Pushing Data: Clients can observe/neglect or join/leave streams, in order to tell Qbix Nodes (Node.js) to send updates via web sockets.
  • Notifications: Users can subscribe/unsubscribe to streams, in order to manage delivery of notifications to their primary emailAddress, mobileNumber, native notifications on various devices, and webhooks on various third-party API clients.

Qbix Platform users can choose to handle all this privately using servers of their choice, to prevent third party platforms from snooping on traffic at scale.

Retaining and Releasing Streams

Client-side Javascript can call Q.Streams.get(publisherId, streamName) with various options in order to fetch a stream and its data in its latest state. Options to this method allow fetching the latest N messages, as well as participants in the stream, etc. (subject to access control checks).

Typically, this method is called by Qbix tools which want to display something about the stream. If the tool developer is interested in refresh()ing the interface of a tool whenever the stream changes, they would want to retain() the stream when calling the method.

This can be done by a simple call to Q.Streams.retainWith(tool).get(publisherId, streamName) which will retain a stream as long as the tool is active. Internally this calls retain/release methods, which release the stream when the last tool to retain it has been removed. These methods, in turn, call observe/neglect methods, to open a Q.Socket to the correct Node corresponding to the stream (obtained with Q.nodeUrl()) and start receiving real-time pushes of information.
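The reference counting behind retain/release can be pictured with a small model. This is only a sketch of the idea, not the Qbix implementation: the RetainRegistry class, its method names and the onObserve/onNeglect callbacks are hypothetical stand-ins for what Q.Streams does internally.

```javascript
// Hypothetical model of reference-counted retain/release (not the Qbix source).
class RetainRegistry {
  constructor(onObserve, onNeglect) {
    this.retainers = new Map(); // "publisherId\tstreamName" -> Set of retainer keys
    this.onObserve = onObserve; // called when a stream gains its first retainer
    this.onNeglect = onNeglect; // called when a stream loses its last retainer
  }

  static streamKey(publisherId, streamName) {
    return publisherId + "\t" + streamName;
  }

  // Retain a stream on behalf of a tool (or any other key).
  retain(publisherId, streamName, retainerKey) {
    const key = RetainRegistry.streamKey(publisherId, streamName);
    let set = this.retainers.get(key);
    if (!set) {
      set = new Set();
      this.retainers.set(key, set);
      this.onObserve(publisherId, streamName); // first retainer: start observing
    }
    set.add(retainerKey);
  }

  // Release every stream held by this retainer, e.g. when a tool is removed.
  release(retainerKey) {
    for (const [key, set] of this.retainers) {
      if (set.delete(retainerKey) && set.size === 0) {
        this.retainers.delete(key);
        const [publisherId, streamName] = key.split("\t");
        this.onNeglect(publisherId, streamName); // last retainer gone: stop observing
      }
    }
  }
}
```

In this model, two tools retaining the same stream cause a single observe, and the neglect happens only after both have been released.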

Refreshing Streams

Multiple tools on a page can call stream.refresh() on various streams they are interested in. Q.Streams.refresh() can also be called to trigger the onRefresh() event on every retained stream, which in turn triggers every tool subscribed to the stream to refresh its interface to reflect the latest changes to the stream.

In any event, the Qbix Platform uses Javascript patterns like Q.getter() and Q.batcher() to cache, throttle and batch these requests together, and handle their responses in the order they were issued.
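To illustrate the batching idea only, here is a minimal sketch. It is not Q.batcher() itself, which has its own signature and timing: createBatcher, sendBatch and the explicit flush() are hypothetical names chosen for this example.

```javascript
// Hypothetical sketch of batching: many get() calls, one combined request.
function createBatcher(sendBatch) {
  let pending = []; // queued { args, callback } entries, in the order issued

  return {
    // Queue a request instead of sending it right away.
    get(args, callback) {
      pending.push({ args, callback });
    },

    // A real client would likely run this on a short timer; here it is explicit.
    flush() {
      const batch = pending;
      pending = [];
      if (batch.length === 0) return;
      // One round trip for the whole batch; sendBatch returns one result per entry.
      const results = sendBatch(batch.map((entry) => entry.args));
      // Responses are handed back in the same order the calls were issued.
      batch.forEach((entry, i) => entry.callback(results[i]));
    },
  };
}
```

Several tools asking for different streams in quick succession would then produce a single call to sendBatch, with each callback receiving its own result.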

Tools that deal with streams should start out by using the retain/release/refresh functionality. However, periodically refreshing a tool’s entire interface might be pretty disorienting for a user, who might be witnessing the accumulation of a lot of changes to the stream. This is why, for incremental changes, there is additional functionality.

Incremental Interface Updates

Messages are designed to record any and all incremental changes to a stream, such as edits to a document or messages posted to a chat. The Streams_Message table essentially stores logs that can be replayed by anyone to reconstruct changes to a stream from any checkpoint.

The ordering of messages is kept consistent on the Qbix server via Streams_Message.ordinal, while the total number of messages in a stream is kept as Streams_Stream.messageCount.
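Replaying from a checkpoint can be sketched as follows. The replay() function, the applyMessage reducer and the shape of the message objects are assumptions made for illustration; only the ordinal field corresponds to something named above.

```javascript
// Hypothetical replay of a message log, ordered by ordinal.
// state:        the stream state as of the checkpoint
// messages:     objects that each carry a numeric `ordinal`
// fromOrdinal:  the ordinal of the last message already reflected in `state`
// applyMessage: (state, message) => new state
function replay(state, messages, fromOrdinal, applyMessage) {
  return messages
    .filter((m) => m.ordinal > fromOrdinal) // skip what the checkpoint already covers
    .sort((a, b) => a.ordinal - b.ordinal)  // apply strictly in ordinal order
    .reduce((acc, m) => applyMessage(acc, m), state);
}

// Example: rebuild a chat transcript from a checkpoint taken after ordinal 1.
const transcript = replay(
  ["hello"],
  [
    { ordinal: 3, content: "bye" },
    { ordinal: 1, content: "hello" },
    { ordinal: 2, content: "how are you?" },
  ],
  1,
  (lines, m) => lines.concat(m.content)
);
// transcript is ["hello", "how are you?", "bye"]
```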

To take advantage of incremental changes, and even animate them (e.g. moves in a Chess/game based on messages of type Chess/move), a tool developer would add an event listener to tool.onStateChanged(fieldNames) which returns a [Q.Event()](https://qbix.com/platform/guide/eventsClient). Actually, there is an all-in-one way to render a tool and subscribe to its incremental changes, [which you can read more about here](https://qbix.com/platform/guide/tools#designing).

Unseen Message Totals and Badges

Message totals, broken down by message type, are kept in Streams_Total, so that browsers and other clients can internally mark a message total as “seen” for a specific type of message such as Streams/chat/message, enabling them to ask the server how many new messages of that type were posted. This is handled automatically by the Q.Streams.Message.Total Javascript class, with methods like latest(), seen() and unseen(), an onSeen() event, and setUpElement(), which automatically turns an element into a badge showing the number of unseen messages of a certain type (e.g. unread chats in a conversation).

This is typically used in “preview” tools with names like Streams/chat/preview and Media/episode/preview.


When you open a chat, the Streams/chat tool requests all the latest messages and calls Q.Streams.Message.Total.seen(messageTotal), which marks the new message total as seen, thereby updating or clearing the badges on all previews.

When a new message comes into the chat, an event from the Q.Streams.onMessage() event factory is triggered, and one or more tools that have been already activated on the page may handle this event and call Message.Total.seen() with the new total, thus preventing this message from showing up in badges of “unseen messages”.
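The bookkeeping behind these badges can be modeled in a few lines. This is a hypothetical in-memory model, not the Q.Streams.Message.Total class: the MessageTotals name, the setLatest() method and the argument lists are assumptions made for this sketch.

```javascript
// Hypothetical in-memory model of per-type message totals.
class MessageTotals {
  constructor() {
    this.latestByType = new Map(); // messageType -> newest total known from the server
    this.seenByType = new Map();   // messageType -> total the user has already seen
  }

  // Record the newest total reported for a message type.
  setLatest(messageType, total) {
    this.latestByType.set(messageType, total);
  }

  // Mark everything up to `total` as seen; the seen total never moves backwards.
  seen(messageType, total) {
    const previous = this.seenByType.get(messageType) || 0;
    this.seenByType.set(messageType, Math.max(previous, total));
  }

  // The number a badge would display.
  unseen(messageType) {
    const latest = this.latestByType.get(messageType) || 0;
    const seen = this.seenByType.get(messageType) || 0;
    return Math.max(0, latest - seen);
  }
}
```

Opening a chat corresponds to calling seen() with the latest total, which brings unseen() back to zero. A message that arrives while the chat is open raises the latest total, and the tool handling it calls seen() again so the badge stays clear.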

Participants

Users with testWriteLevel("join") are able to join and leave a stream, resulting in Streams/joined and Streams/left messages being posted to it. Tools like Streams/chat may show this with messages like “so-and-so has joined / left the chat”.

Tools like Streams/participants, which are often placed below a stream player, listen for these messages in real time, and are thus able to display the latest list of participants as people join and leave, at least to users who have testReadLevel("participants").

When a user joins a stream as a participant, the Streams plugin on the Qbix server automatically makes a note to start sending them all messages posted to the stream, until the user leaves. The result is that the user gets messages from chats they are participating in, their “unseen message” badges are updated, etc.

Of course, to participate in a stream, a user must be logged in and have high enough writeLevel access to do it. Meanwhile, observe/neglect can be used even by users who aren’t logged in, as long as the stream has a public readLevel at least equal to “messages”. In order to allow users who aren’t even logged in to open a Q.Socket on certain pages, you need to add an entry under the Q/capability/permissions and Users/capability/public configs.

Subscriptions and Notifications

A participant who has joined a stream may go further and subscribe to be notified whenever messages of a certain type are posted to it. Each subscription has a filter, which can contain regular expressions on stream types and message types, and other criteria checked by Qbix’s Node.js service before sending out the notifications.
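As a rough picture of how such a filter might be evaluated, consider the sketch below. The filter shape (streamTypes and messageTypes arrays of regular expression strings) and the matchesFilter() function are hypothetical; the actual filter format used by the Node.js service may differ.

```javascript
// Hypothetical filter shape:
//   { streamTypes: ["^Streams/chat$"], messageTypes: ["^Streams/chat/.*$"] }
// A missing or empty list is treated here as "match everything".
function matchesFilter(filter, streamType, messageType) {
  const matchesAny = (patterns, value) =>
    !patterns ||
    patterns.length === 0 ||
    patterns.some((pattern) => new RegExp(pattern).test(value));

  return (
    matchesAny(filter.streamTypes, streamType) &&
    matchesAny(filter.messageTypes, messageType)
  );
}
```

With a filter whose messageTypes is ["^Streams/chat/.*$"], a Streams/chat/message message would pass this check while a Streams/joined message would not.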

Just like with stream access, the default settings for subscription filters on certain stream types can come from static templates and mutable templates for streams. As a fallback, they can come from Streams/types/$streamType/defaults and Streams/types/*/defaults config.
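The config fallback at the end of that chain can be sketched like this. The subscriptionDefaults() function and the plain-object config are hypothetical, and whether Qbix replaces or merges the two levels is not stated here, so this sketch simply takes the first one that is defined.

```javascript
// Hypothetical lookup: Streams/types/$streamType/defaults first,
// then Streams/types/*/defaults, otherwise null.
function subscriptionDefaults(config, streamType) {
  const types = (config.Streams && config.Streams.types) || {};
  const specific = types[streamType] && types[streamType].defaults;
  if (specific !== undefined) {
    return specific; // defaults declared for this exact stream type
  }
  const wildcard = types["*"] && types["*"].defaults;
  return wildcard !== undefined ? wildcard : null; // generic fallback, if any
}
```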

Each notification can result in an SMS message or an email being sent to the user’s primary mobile number or email address. It can also result in a native notification sent to one of their apps, or even a webhook to other servers, including Qbix servers, for further processing (such as private analytics, etc.).

You can also set up periodic cron jobs to call a Qbix back-end script to summarize the latest updates across a user’s subscriptions and notifications, and send them out once a day. Here is an example of what that could look like:

Decentralized Ecosystem

As more and more Qbix servers are deployed on the Internet, they can start to use subscriptions, notifications and web hooks to wire up various operations across multiple domains and services. For example:

  • Search Engines could not just crawl Qbix websites, but subscribe to be notified when content changes on a page, or even on an individual stream that is tied to multiple pages. This makes them a lot more up-to-date and efficient, not having to constantly pull information, but rather subscribe to voluntary pushes.

  • Aggregators such as events or real estate listings in a given area can subscribe to be automatically updated when a certain event is posted, or a certain house is sold. This can lead to private dating sites, sites for booking local services, etc.

  • Membership updates can also be propagated to others, letting them know when a payment has failed or a membership has lapsed, and reaching out to the user with an offer to renew it.

Eventually, they can even start to use AI in the loop, in order to achieve distributed workflows like the following:


See more at https://engageusers.ai/ecosystem.pdf