
Understanding the redis queue for flow-inspector

Grepping the source of flow-inspector for “import redis” (excluding the “vendor” directory) reveals two scripts:


Real time redis queue population:
`preprocess.py` contains the logic that consumes the redis queue as data is presented.
This is done using the python redis module’s blpop(), which blocks on the queue until an entry is available.
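The consumption pattern can be sketched without a live Redis server by simulating the list with a deque; the real script calls `redis.Redis().blpop()` on its queue (the queue name and this stand-in are assumptions for illustration):

```python
import json
from collections import deque

# Stand-in for the Redis list; the real script's blpop() blocks until
# an element is available at the head of the queue.
queue = deque([json.dumps({"flowStartSeconds": "1"}), "END"])

def pop():
    # blpop() returns a (key, value) pair; here we return only the value.
    return queue.popleft()

while True:
    obj = pop()
    if obj == "END":        # sentinel string terminates the consumer
        break
    flow = json.loads(obj)  # everything else must be a JSON object
    print(flow)
```

The same loop structure (sentinel check first, then JSON decoding) is what the two code blocks below implement.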

Block 1:

if obj == "END":
    print "%s: Reached END. Terminating..." % (datetime.datetime.now())
    print "%s: Flushing caches. Do not terminate this process or you will have data loss!" % (datetime.datetime.now())

1) If the string “END” is popped from the queue, the program terminates (after flushing its caches).
2) If the string can be parsed as JSON (i.e. json.loads() succeeds), the following occurs:

Block 2:

try:
    obj = json.loads(obj)
    obj[common.COL_FIRST_SWITCHED] = int(obj[common.COL_FIRST_SWITCHED])
    obj[common.COL_LAST_SWITCHED] = int(obj[common.COL_LAST_SWITCHED])
    for s in config.flow_aggr_sums:
        obj[s] = int(obj[s])
except ValueError, e:
    print >> sys.stderr, "Could not decode JSON object in queue: ", e

1) flowStartSeconds (common.COL_FIRST_SWITCHED) is looked up and converted to an integer.
2) flowEndSeconds (common.COL_LAST_SWITCHED) is looked up and converted to an integer.
3) The config file’s (./config/config.py) flow_aggr_sums setting is read, and each listed field is pulled from the JSON object and converted to an integer. By default these are packetDeltaCount and octetDeltaCount.
4) Any ValueError (undecodable JSON or a non-numeric field) is caught, and an error is printed to stderr.
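Putting blocks 1 and 2 together, the parse-and-normalize step looks roughly like this sketch (the column names and default flow_aggr_sums are hardcoded here, where the real script reads them from common and config; the original catches only ValueError, and KeyError is added here so a missing field is also reported):

```python
import json
import sys

FLOW_AGGR_SUMS = ["packetDeltaCount", "octetDeltaCount"]  # default config.flow_aggr_sums

def parse_flow(raw):
    """Decode one queue entry; return the normalized dict, or None on error."""
    try:
        obj = json.loads(raw)
        obj["flowStartSeconds"] = int(obj["flowStartSeconds"])
        obj["flowEndSeconds"] = int(obj["flowEndSeconds"])
        for s in FLOW_AGGR_SUMS:
            obj[s] = int(obj[s])
        return obj
    except (ValueError, KeyError) as e:
        print("Could not decode JSON object in queue:", e, file=sys.stderr)
        return None

ok = parse_flow('{"flowStartSeconds": "10", "flowEndSeconds": "20", '
                '"packetDeltaCount": "3", "octetDeltaCount": "1500"}')
bad = parse_flow("not json")  # prints an error and yields None
```

Note that all numeric fields arrive as strings and are only converted on the consumer side.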

Block 3:

# only import flow if it is newer than config.max_flow_time
if config.max_flow_age != 0 and obj[common.COL_FIRST_SWITCHED] < (time.mktime(datetime.datetime.utcfromtimestamp(time.time()).timetuple()) - config.max_flow_age):
    print "Flow is too old to be imported into mongodb. Skipping flow ..."

Skip the flow if: the config file setting max_flow_age is not equal to 0 and the flowStartSeconds provided within the JSON data is less than the current time minus max_flow_age (i.e. the flow is older than max_flow_age seconds). By default, max_flow_age is 0, which disables the check.
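The age test can be checked with concrete numbers; this sketch mirrors the condition (a max_flow_age of 3600 seconds is an assumed example value, and the UTC round-trip from the original is simplified to a plain timestamp):

```python
import time

def flow_too_old(first_switched, max_flow_age, now=None):
    """Skip when the flow started more than max_flow_age seconds ago;
    a max_flow_age of 0 disables the check entirely."""
    now = time.time() if now is None else now
    return max_flow_age != 0 and first_switched < now - max_flow_age

NOW = 1_000_000
assert flow_too_old(NOW - 7200, 3600, now=NOW)       # 2h-old flow, 1h cutoff: skip
assert not flow_too_old(NOW - 60, 3600, now=NOW)     # 1-minute-old flow: keep
assert not flow_too_old(NOW - 7200, 0, now=NOW)      # default 0 disables the check
```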

Block 4:

for handler in handlers:
    pass  # each handler processes the flow here (body not shown)

common.update_node_index(obj, node_index_collection, config.flow_aggr_sums)
common.update_port_index(obj, port_index_collection, config.flow_aggr_sums, known_ports)

output_flows += 1

This is where most of the work is done. handlers[] was populated much earlier with FlowHandler() instances, one per entry in flow_bucket_sizes; each is constructed from various config file values and holds the destination database table and column information.

I’ll stop before I analyze the FlowHandler() class, as I’m simply trying to understand what the redis queue objects should look like.

It appears that flowStartSeconds, flowEndSeconds, and the fields listed in flow_aggr_values and flow_aggr_sums should be sent to the redis queue. With the default configuration the list is as follows:

  • flowStartSeconds
  • flowEndSeconds
  • sourceIPv4Address
  • destinationIPv4Address
  • sourceTransportPort
  • destinationTransportPort
  • protocolIdentifier
  • packetDeltaCount
  • octetDeltaCount

All of these are IPFIX information elements and can be configured to be created and exported by Vermont’s packetAggregator module.

Each record is sent to the redis queue as a JSON document whose keys are the information element names and whose values are strings.
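A minimal example of such a payload, using the default field list above (all values here are invented for illustration, and everything is a string because the consumer performs the int() conversions itself):

```python
import json

# Hypothetical flow record in the shape preprocess.py expects.
flow = {
    "flowStartSeconds": "1394723000",
    "flowEndSeconds": "1394723010",
    "sourceIPv4Address": "10.0.0.1",
    "destinationIPv4Address": "10.0.0.2",
    "sourceTransportPort": "54321",
    "destinationTransportPort": "80",
    "protocolIdentifier": "6",
    "packetDeltaCount": "12",
    "octetDeltaCount": "9001",
}
payload = json.dumps(flow)  # this string is what gets pushed onto the queue
```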

Not overwhelmingly hard.


One time import to redis queue:
`import_db_to_redis.py` contains a method to take flows already sitting in a database and import them into the redis queue.
This is done using the python redis module’s rpush(), which appends each entry to the tail of the queue.
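The producer side can be sketched the same way as the consumer, again simulating the Redis list with a deque (the real importer calls `redis.Redis().rpush(queue_name, payload)`; the sample flows are invented):

```python
import json
from collections import deque

# Stand-in for the Redis list; rpush() appends to the tail.
queue = deque()

def rpush(value):
    queue.append(value)

# Push each stored flow as a JSON string, then the END sentinel so the
# preprocess.py consumer knows to flush its caches and terminate.
for flow in [{"flowStartSeconds": "1"}, {"flowStartSeconds": "2"}]:
    rpush(json.dumps(flow))
rpush("END")
```

Because blpop() pops from the head and rpush() appends to the tail, the pair forms a simple FIFO queue, with “END” guaranteed to arrive after every flow pushed before it.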
