Understanding the redis queue for flow-inspector

Grepping the source of flow-inspector for the string “import redis” (excluding “vendor”) reveals two scripts:

./preprocess/preprocess.py
./preprocess/import_db_to_redis.py

Real time redis queue population:
`preprocess.py` contains a loop that reads flows from the redis queue as they arrive.
This is done using the Python redis module’s blpop(), which blocks until an item is available and then pops it from the head of the list.
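
As a rough illustration of that blocking-pop pattern, here is a minimal Python 3 sketch. It is not the code from `preprocess.py`: the queue name `flow:queue`, the `consume()` helper and the `FakeQueue` stand-in are all made up for this example so it can run without a redis server. With a real server, a `redis.Redis()` client could be passed in instead, since redis-py's `blpop()` returns a `(key, value)` pair in the same way.

```python
QUEUE_KEY = "flow:queue"  # hypothetical queue name, not taken from flow-inspector


def consume(client, queue_key=QUEUE_KEY):
    """Yield raw queue items until the "END" sentinel is popped."""
    while True:
        # blpop() blocks until an item is available and returns (key, value).
        # With timeout=0 a real redis client waits indefinitely.
        item = client.blpop(queue_key, timeout=0)
        if item is None:
            continue  # only happens when a non-zero timeout expires
        _key, raw = item
        if isinstance(raw, bytes):
            raw = raw.decode("utf-8")  # redis-py returns bytes by default
        if raw == "END":
            break
        yield raw


class FakeQueue:
    """In-memory stand-in for a redis client (assumption, for illustration)."""

    def __init__(self, items):
        self.items = list(items)

    def blpop(self, key, timeout=0):
        return (key, self.items.pop(0)) if self.items else None
```

For example, `list(consume(FakeQueue(["a", "b", "END"])))` walks the fake queue and stops at the sentinel.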

Block 1:

if obj == "END":
    print "%s: Reached END. Terminating..." % (datetime.datetime.now())
    print "%s: Flusing caches. Do not terminate this process or you will have data loss!"
    break

1) If the string “END” is popped from the queue, the loop exits so the program can flush its caches and terminate.
2) If the string is JSON (that is, json.loads() can parse it), then the following occurs:

Block 2:

try:
    obj = json.loads(obj)
    obj[common.COL_FIRST_SWITCHED] = int(obj[common.COL_FIRST_SWITCHED])
    obj[common.COL_LAST_SWITCHED] = int(obj[common.COL_LAST_SWITCHED])
    for s in config.flow_aggr_sums:
        obj[s] = int(obj[s])
except ValueError, e:
    print >> sys.stderr, "Could not decode JSON object in queue: ", e
    continue

1) flowStartSeconds (common.COL_FIRST_SWITCHED) is read from the object and converted to an integer.
2) flowEndSeconds (common.COL_LAST_SWITCHED) is read from the object and converted to an integer.
3) The field names listed in the config file’s (./config/config.py) flow_aggr_sums setting are read, and each matching value in the JSON object is converted to an integer. By default these are: packetDeltaCount and octetDeltaCount.
4) except …
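
The steps above can be sketched as a small standalone Python 3 function. This is an approximation under stated assumptions, not the original code: the constants mirror the default field names mentioned above, and `normalize_flow()` is a name invented for this example. Unlike Block 2, it also catches KeyError and TypeError so that a message with a missing field is skipped rather than crashing the loop.

```python
import json

# Assumed defaults, based on the field names discussed above.
COL_FIRST_SWITCHED = "flowStartSeconds"
COL_LAST_SWITCHED = "flowEndSeconds"
FLOW_AGGR_SUMS = ["packetDeltaCount", "octetDeltaCount"]


def normalize_flow(raw):
    """Return the decoded flow with numeric fields as ints, or None if invalid."""
    try:
        obj = json.loads(raw)
        for field in [COL_FIRST_SWITCHED, COL_LAST_SWITCHED] + FLOW_AGGR_SUMS:
            obj[field] = int(obj[field])
    except (ValueError, KeyError, TypeError):
        # ValueError: not JSON, or a value that int() cannot convert.
        # KeyError/TypeError: a missing field or a non-object JSON value
        # (extra precaution; the quoted block only catches ValueError).
        return None
    return obj
```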

Block 3:

# only import flow if it is newer than config.max_flow_time
if config.max_flow_age != 0 and obj[common.COL_FIRST_SWITCHED] < (time.mktime(datetime.datetime.utcfromtimestamp(time.time()).timetuple()) - config.max_flow_age):
    print "Flow is too old to be imported into mongodb. Skipping flow ..."
    continue

Skip the flow if: the value from the config file setting max_flow_age is not equal to 0 and the flowStartSeconds provided within the JSON data is less than the current time minus the max_flow_age (that is, the flow started more than max_flow_age seconds ago). By default, the max_flow_age value is equal to 0, so no flow is skipped.
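
To make the comparison concrete, here is a simplified Python 3 sketch of the same age check. The function name `is_too_old()` is made up for this example, and it assumes both timestamps are plain Unix epoch seconds; it uses `time.time()` directly rather than the mktime/utcfromtimestamp round trip in Block 3.

```python
import time


def is_too_old(first_switched, max_flow_age, now=None):
    """True if the flow started more than max_flow_age seconds before now."""
    if max_flow_age == 0:
        return False  # 0 disables the age check entirely
    if now is None:
        now = time.time()
    return first_switched < now - max_flow_age
```

With `now=1000` and `max_flow_age=60`, a flow that started at 100 is skipped, while one that started at 990 is kept.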

Block 4:

for handler in handlers:
    handler.handleFlow(obj)

common.update_node_index(obj, node_index_collection, config.flow_aggr_sums)
common.update_port_index(obj, port_index_collection, config.flow_aggr_sums, known_ports)

output_flows += 1

This is where most of the work is done. handlers[] was set much earlier and contains instances of FlowHandler() objects. The FlowHandlers are configured from flow_bucket_sizes and several other values in the config file, and each one holds the destination database table and column information.

Outcome:
I’ll stop before I analyze the FlowHandler() class, as I’m simply trying to understand what the redis queue objects should look like.

It appears that flowStartSeconds, flowEndSeconds, and the contents of the flow_aggr_values and flow_aggr_sums should be sent to the redis queue. By default the list would be as follows:

  • flowStartSeconds
  • flowEndSeconds
  • sourceIPv4Address
  • destinationIPv4Address
  • sourceTransportPort
  • destinationTransportPort
  • protocolIdentifier
  • packetDeltaCount
  • octetDeltaCount

All of these are IPFIX information elements and can be configured to be created and passed by Vermont’s packetAggregator module to the redis queue as JSON, as follows:
key = the information element name
value = string value

Not overwhelmingly hard.
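
As an illustration of what one such queue entry might look like, the Python 3 sketch below builds a JSON string from the nine default fields, with every value sent as a string. The helper name `make_flow_message()` and the sample values are invented for this example; only the field names come from the list above.

```python
import json


def make_flow_message(start, end, src_ip, dst_ip, src_port, dst_port,
                      protocol, packets, octets):
    """Build one JSON queue entry; all values are serialized as strings."""
    fields = {
        "flowStartSeconds": start,
        "flowEndSeconds": end,
        "sourceIPv4Address": src_ip,
        "destinationIPv4Address": dst_ip,
        "sourceTransportPort": src_port,
        "destinationTransportPort": dst_port,
        "protocolIdentifier": protocol,
        "packetDeltaCount": packets,
        "octetDeltaCount": octets,
    }
    return json.dumps({name: str(value) for name, value in fields.items()})


# Sample values are made up for illustration.
message = make_flow_message(1350000000, 1350000060, "10.0.0.1", "10.0.0.2",
                            12345, 80, 6, 10, 1500)
```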

 

One time import to redis queue:
`import_db_to_redis.py`, as its name suggests, contains a method to take flows that already sit in a database and push them onto the redis queue.
This is done using the Python redis module’s rpush(), which appends items to the tail of the list.
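
A minimal Python 3 sketch of the producer side might look like the following. It is not taken from `import_db_to_redis.py`: the queue name, the `push_flows()` helper and the `FakeRedis` stand-in are assumptions made so the example runs without a server, and whether the original script appends an “END” sentinel is not shown in this post, so that step is optional here. A real `redis.Redis()` client exposes `rpush(name, *values)` with the same shape.

```python
import json

QUEUE_KEY = "flow:queue"  # hypothetical queue name, not taken from flow-inspector


def push_flows(client, flows, queue_key=QUEUE_KEY, send_end=False):
    """Append each flow to the tail of the list as JSON; return the count."""
    pushed = 0
    for flow in flows:
        client.rpush(queue_key, json.dumps(flow))
        pushed += 1
    if send_end:
        # Optional sentinel that tells the consumer to stop (assumption).
        client.rpush(queue_key, "END")
    return pushed


class FakeRedis:
    """In-memory stand-in for a redis client (assumption, for illustration)."""

    def __init__(self):
        self.lists = {}

    def rpush(self, key, *values):
        self.lists.setdefault(key, []).extend(values)
        return len(self.lists[key])
```

Because rpush() adds to the tail and blpop() takes from the head, flows are consumed in the order they were pushed.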
