Agent Pipeline
The goal of the Agent Pipeline is to automate aspects of our system in a way that end users can configure themselves.
The automation pipeline is based heavily on Huginn, a data pipeline tool built around "automated agents" that operate on your behalf. The best way to understand what the pipeline is trying to accomplish is to read and watch the tutorial provided by Huginn.
From the baseline understanding provided above, here are the BIG things that are different about how our system works versus Huginn.
Events are stored in Cassandra. The implications of this are significant. Huginn's approach to sorting and querying for which events to process relied on event IDs being sequential integers: it would only retrieve events past a given sequence number. Cassandra IDs cannot be used this way.
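To make the contrast concrete, here is a minimal plain-Ruby sketch (illustrative only, not our actual schema or models): with integer IDs an agent can keep a numeric cursor, while with Cassandra's UUIDs the cursor has to be built on something time-ordered instead.

```ruby
require "securerandom"

# Huginn-style: auto-incrementing integer IDs make "everything newer than the
# last event I processed" a simple numeric comparison.
events = [{ id: 1 }, { id: 2 }, { id: 3 }]
last_checked_event_id = 1
events.select { |e| e[:id] > last_checked_event_id }    # => events 2 and 3

# Cassandra-style: IDs are UUIDs, so "id > last id" is meaningless. A cursor
# has to lean on something time-ordered (e.g. a created_at clustering column).
events = [
  { id: SecureRandom.uuid, created_at: Time.now - 60 },
  { id: SecureRandom.uuid, created_at: Time.now }
]
last_checked_at = Time.now - 30
events.select { |e| e[:created_at] > last_checked_at }  # => only the newest event
```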
Our events/links have "groups" on them that indicate conditions of the event. For example, if you have an Email agent and it receives an email, it will pass an event saying that it received an email, like Huginn. But unlike Huginn, ours will also carry a group tag of "on_receive", which is used to filter the event so it is only handled by downstream agents that want an "on_receive" event from that upstream agent. This is a core difference, and fundamental to how our system runs.
Our system can "split" the stream to do split tests; Huginn could not. In Huginn, everything has to be passed to every downstream agent. Obviously, for use cases like a traditional marketing split test, this doesn't work. So we added the concept of "event groups" to Huginn. Event groups essentially let you split the stream: downstream agents can choose to take "all" of the events from their upstream agents, or they can listen to only a subset, which is handled with an event group. This also means we have had to implement things like "merge" to bring the branches back together again.
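As a rough illustration (made-up names, not the real models), a split boils down to each link either accepting everything or accepting only one event group:

```ruby
# Upstream split-test agent tags its events so the stream can be divided.
events = [
  { payload: { person_id: 1 }, groups: ["variant_a"] },
  { payload: { person_id: 2 }, groups: ["variant_b"] }
]

# Each downstream link listens to "all" (group nil) or to a single group.
links = [
  { receiver: "EmailAgentA", group: "variant_a" },
  { receiver: "EmailAgentB", group: "variant_b" },
  { receiver: "MergeAgent",  group: nil }          # "all": sees both branches
]

links.each do |link|
  matching = events.select { |e| link[:group].nil? || e[:groups].include?(link[:group]) }
  puts "#{link[:receiver]} receives #{matching.size} event(s)"
end
```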
Projects/Workflows. Huginn is all in one "space"; there is no concept of walled-off events or sections. In our automation system, you have a "Workflow" that contains an automation. This keeps workflows separate, and lets you measure metrics/performance for each workflow you're running. A single project can have multiple workflows, and a single person/team can have multiple projects.
GUI. Huginn does not have a graphical builder; instead, it relies on you to write JSON configuration hashes in its UI. We have a drag-and-drop graphical workflow builder.
The system is built from three core pieces:
Agents - the primary component of the system.
Links - the connections between agents (shown in the diagram above as a Connection, since that is what it's called in the V1 production version). In V2, they are called Links.
Events - the individual things that happen in the system and flow between agents.
An agent can source or receive events, process those events, and pass NEW events downstream. If an agent sources an event, it creates the event with its OWN id, and potentially with "group" tags that indicate a particular status of the event. For example, if we have an EmailChecker agent, it would pass an event when it receives a new message in the inbox. This event would have a group tag of "on_receive" to indicate that it was a message that was received.
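A small sketch of sourcing an event, assuming hypothetical helper and field names (the real agent base class will differ):

```ruby
require "securerandom"

# Illustrative EmailChecker-style agent: a sourced event carries the agent's
# OWN id plus a group tag describing the condition that produced it.
class EmailCheckerAgent
  attr_reader :id

  def initialize
    @id = SecureRandom.uuid
  end

  # Called when a new message shows up in the inbox.
  def handle_new_message(message)
    create_event(payload: { subject: message[:subject] }, groups: ["on_receive"])
  end

  private

  # Hypothetical helper: only downstream agents linked to this agent's
  # "on_receive" group will end up handling this event.
  def create_event(payload:, groups:)
    { agent_id: id, payload: payload, groups: groups }
  end
end

EmailCheckerAgent.new.handle_new_message({ subject: "Hello" })
# => { agent_id: "...", payload: { subject: "Hello" }, groups: ["on_receive"] }
```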
Agents can process events in two ways. The first is the #check method. This is what is called when an agent has a schedule: the scheduler calls #check to see if the agent needs to perform any action, and performing that action should create events. For example, if I have an agent set to run every 5 minutes, #check will be called on that agent every 5 minutes.
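A hedged sketch of the scheduled path, assuming a Rails environment with an Agent model and ActiveJob (the job name and schedule handling here are illustrative):

```ruby
# Illustrative job that a recurring scheduler (cron, sidekiq-scheduler, etc.)
# could enqueue for each agent whose schedule is due. #check is responsible
# for looking at the outside world and creating any resulting events.
class AgentCheckJob < ActiveJob::Base
  queue_as :default

  def perform(agent_id)
    agent = Agent.find(agent_id)
    agent.check  # an agent scheduled "every 5 minutes" gets this call every 5 minutes
  end
end
```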
The other is by "reacting" to the upstream agent. When an upstream agent creates an event, "notify!" is called, which creates a job that "routes" the event to the right agents for further processing. This job reads the event, the source agent, and its links, and finds the agents that are expecting to #receive events from this agent (with the appropriate groups). Then a job is created for each of those agents to handle the event. All of this is done with ActiveJob on top of Sidekiq, using pub/sub processing via Wisper.
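Sketching the reactive path under the same assumptions (a Wisper listener plus ActiveJob; the class and column names are illustrative, not the real ones):

```ruby
# Illustrative listener: when an agent calls notify! after creating an event,
# a Wisper broadcast such as broadcast(:event_created, event.id) can be picked
# up here and turned into a background job.
class EventRoutingListener
  def event_created(event_id)
    EventRoutingJob.perform_later(event_id)
  end
end
# Registered once at boot, e.g. Wisper.subscribe(EventRoutingListener.new)

# Illustrative routing job: read the event, the source agent, and its links,
# keep only the links whose group matches (or that accept everything), and
# enqueue one receive job per downstream agent.
class EventRoutingJob < ActiveJob::Base
  def perform(event_id)
    event  = Event.find(event_id)
    source = event.agent

    source.links.each do |link|
      next unless link.group_id.nil? || event.groups.include?(link.group_id)

      AgentReceiveJob.perform_later(link.receiver_id, event_id)
    end
  end
end
```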
Agents can label their events into groups, and other agents can subscribe (via a LINK) to a single event type. If you have watched the video of our V1 workflow, you have seen that when we drop an agent onto the canvas, there is a selection circle on the right-hand side with connection points spinning out of it. These are the "on_x" conditions this agent supports pushing events with. When an event matches one of these conditions, the agent pushes it with that group.
In V2 of the drag-and-drop interface, the UI is based on the Rappid.js framework. This framework speeds up delivery of the drag-and-drop interface and keeps us from reinventing the wheel. On top of it, there is significant customization to implement the configuration wheel and configuration panels of the system.
When the new workflow is created, the canvas is empty. The #options of the workflow look like this:
{}
On the left-hand side, there is a stencil. This stencil contains agents that are preconfigured with options for this project, so when a user drops one of them onto the canvas, it already has a pre-configured set of options. These are retrieved by the React application through the API, at /api/v1/projects/:project_id/workflows/agent_types. Because the endpoint is scoped to the project, all the project-based settings are available on the configuration panels.
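For illustration only, a response from that endpoint might look roughly like this (the field names are assumptions based on the description above, not a verified contract):

```ruby
# Hypothetical shape of GET /api/v1/projects/:project_id/workflows/agent_types:
# each entry is an agent type plus the project-scoped defaults the stencil
# pre-fills when that agent is dropped onto the canvas.
{
  agent_types: [
    { type: "MentionSegmentAgent", category: "trigger",
      default_options: { project_id: 42, segment_id: nil } },
    { type: "SendEmailAgent", category: "action",
      default_options: { project_id: 42, from_address: "team@example.com" } }
  ]
}
```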
There are three types of agents: trigger, action, and filter.
Trigger agents "start" a flow. They cannot receive events. Most commonly, our users will use a Mention segment or a Person segment, which triggers when any new mention or person matching that segment is added to the system. These produce an event that contains the ID of the matching person/company/mention.
We can also trigger on changes to a list. If you manually add a person to a list at a stage, this can also trigger a workflow.
Action agents perform actions. They can groom data, send messages, check for data, and so on. Lots of activities can happen with actions.
Filter agents check the incoming data and evaluate to true or false. If true, they pass the event on to downstream agents. If false, the event flow terminates.
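A minimal sketch of a filter (the class and condition below are made up; the real filter agents are ActiveRecord-backed like every other agent):

```ruby
# Illustrative filter agent: evaluate the incoming event against a condition.
# True passes the event downstream; false terminates the flow for this event.
class CountryFilterAgent
  def initialize(allowed_country:)
    @allowed_country = allowed_country
  end

  def receive(event)
    return nil unless event[:payload][:country] == @allowed_country # false: stop here

    event # true: pass the event on to downstream agents
  end
end

filter = CountryFilterAgent.new(allowed_country: "US")
filter.receive({ payload: { country: "US" } }) # => event continues downstream
filter.receive({ payload: { country: "DE" } }) # => nil, flow terminates
```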
When an agent is dropped onto the canvas, it creates a "cell" in the workflow #options, and the workflow automatically saves (PUT /api/v1/projects/:id/workflows/:id). Any time the workflow is saved, it is evaluated for validity using the AutomationBuilder. The return value from the API is :ok if it's able to save, and the meta shows the errors, the error count, and the agent count of the workflow.
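As an illustration of that response shape (field names approximated from the description above, not a verified contract):

```ruby
# Hypothetical response for PUT /api/v1/projects/:id/workflows/:id: the
# workflow saved, and the meta reports what the AutomationBuilder found.
{
  status: :ok,
  meta: {
    agent_count: 4,
    error_count: 1,
    errors: [
      { cell_id: "cell-123", message: "segment_id can't be blank" }
    ]
  }
}
```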
Each agent on the canvas corresponds to a "cell".
The other cell type is a link. When two agents are linked together, the link between them is also a cell. This cell can have an event group (group_id) on it, or not. Any time two agents are linked together, this causes the system to save as well.
A cell listing inside a workflow's #options typically looks like this:
The field configuration from the agent is in the "config" part of the cell. It contains a "segment_id" which corresponds to a segment in the project. This segment will emit events when a new mention matches it.
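The original listing isn't reproduced here; based on the description that follows, a cell looks roughly like the following (an illustrative reconstruction, especially the display attributes, which vary with the Rappid.js version):

```ruby
# Illustrative cell from a workflow's #options. Only "config" matters for the
# agent's behaviour; position/size and similar attributes only drive the canvas.
{
  "id"       => "cell-0f8b2a7c",                  # cell id, later stored on the agent
  "type"     => "MentionSegmentAgent",
  "isValid"  => true,                             # all required fields are filled in
  "config"   => {
    "segment_id" => 123                           # segment in the project that emits events
  },
  "position" => { "x" => 120, "y" => 80 },        # display-only
  "size"     => { "width" => 90, "height" => 90 } # display-only
}
```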
There is a lot of information on the cell that is related to the display of the cell on the canvas. For the purposes of the agent configuration, these should be saved, but ignored. We only care about the "config" part of the setup.
Note that isValid is not configuration; it indicates whether all of the cell's required fields have been filled out.
Every time the workflow changes, a new version is saved.
Every time a workflow is saved, the updated configuration is checked to determine whether it appears to be valid, and if so, it is flagged as such. But it's really only at build time that we know a workflow is actually ready to be launched.
Building a workflow means creating or updating the agents that exist in our system, configuring them, and then enabling them to generate/process events. This is done with the AutomationBuilder.
The AutomationBuilder class takes a workflow as its argument. It extracts the data from the options, loops through the cells, and figures out whether each already exists. We identify an agent as existing by saving the cell_id into the agent; this is how we match them up. If a user deletes an agent in the UI and then re-adds the same agent type, the new cell will have a different ID, so when the workflow is built, the existing agent is deleted and a new agent is created.
Each agent is an ActiveRecord model and uses validations to determine whether it's valid to run. If not, it adds its errors to the AutomationBuilder itself, listing its cell_id as the problem along with a description of the issue(s).
You can use the AutomationBuilder either to #check or to build. If you check, it won't create the agents (it will "new" them, but not save them). If you build, it will create the agents and activate them.
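A hedged sketch of driving the builder (only the workflow argument, #check, and #build come from the description above; the errors accessor and its shape are assumptions):

```ruby
builder = AutomationBuilder.new(workflow)

# "check": instantiate ("new") the agents without saving them, purely to
# collect validation errors keyed by cell_id.
builder.check
builder.errors # => hypothetically: [{ cell_id: "cell-123", messages: ["segment_id can't be blank"] }]

# "build": create/update the agents for real. Existing agents are matched by
# the cell_id stored on them; agents whose cell no longer exists are deleted,
# new cells get new agents, and the result is activated so it can start
# generating/processing events.
builder.build if builder.errors.empty?
```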
Once a workflow is created, it can be launched by toggling "start" on the view. This sets the workflow's state to active; when that happens, the system builds the workflow and launches it.