Understanding the redis queue for flow-inspector
Grepping the source of flow-inspector for the string “import redis” (excluding “vendor”) reveals two scripts:
Real-time redis queue population:
`preprocess.py` contains a method to populate the redis queue as data is presented.
This is done using the Python redis module’s blpop(), which blocks while listening on the queue.
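To make the blpop() loop concrete, here is a minimal sketch of a consumer in that style. The queue name `"flowqueue"` and the `StubClient` are assumptions for illustration only; preprocess.py defines the real queue key and uses an actual redis-py client.

```python
import json

def consume(client, queue="flowqueue"):
    """Drain the queue until the "END" sentinel arrives.

    `client` is any object with a redis-py style blpop(key) method.
    The queue name "flowqueue" is an assumption, not taken from
    preprocess.py.  (A real redis-py client returns bytes; the stub
    below returns str for simplicity.)
    """
    flows = []
    while True:
        # blpop blocks until an element is available and returns
        # a (key, value) pair, mirroring the redis-py API.
        _key, raw = client.blpop(queue)
        if raw == "END":
            break
        try:
            flows.append(json.loads(raw))
        except ValueError:
            # preprocess.py skips entries it cannot decode
            continue
    return flows

class StubClient:
    """Hypothetical in-memory stand-in for a redis client."""
    def __init__(self, items):
        self.items = list(items)
    def blpop(self, key):
        return (key, self.items.pop(0))

flows = consume(StubClient(['{"octetDeltaCount": "42"}', "END"]))
```

With the stub, `flows` ends up holding the single decoded flow object, and the `"END"` string terminates the loop just as described below.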
```python
if obj == "END":
    print "%s: Reached END. Terminating..." % (datetime.datetime.now())
    print "%s: Flushing caches. Do not terminate this process or you will have data loss!"
    break
```
1) If a string “END” pops into the queue, then the program is closed.
2) If a string that is JSON comes in (determined if it can be parsed with json.loads()), then the following occurs:
```python
try:
    obj = json.loads(obj)
    obj[common.COL_FIRST_SWITCHED] = int(obj[common.COL_FIRST_SWITCHED])
    obj[common.COL_LAST_SWITCHED] = int(obj[common.COL_LAST_SWITCHED])
    for s in config.flow_aggr_sums:
        obj[s] = int(obj[s])
except ValueError, e:
    print >> sys.stderr, "Could not decode JSON object in queue: ", e
    continue
```
1) The flowStartSeconds field is checked for and converted to an integer.
2) The flowEndSeconds field is checked for and converted to an integer.
3) The field names listed in the config file’s (./config/config.py) flow_aggr_sums setting are read and used to pull data from the JSON object. By default these are packetDeltaCount and octetDeltaCount.
4) Any ValueError (unparsable JSON or a non-numeric value) is reported to stderr and the flow is skipped.
```python
# only import flow if it is newer than config.max_flow_time
if config.max_flow_age != 0 and obj[common.COL_FIRST_SWITCHED] < (time.mktime(datetime.datetime.utcfromtimestamp(time.time()).timetuple()) - config.max_flow_age):
    print "Flow is too old to be imported into mongodb. Skipping flow ..."
    continue
```
Skip the flow if the config file setting max_flow_age is not equal to 0 and the flowStartSeconds value in the JSON data is less than the current time minus max_flow_age, i.e. the flow started more than max_flow_age seconds ago. By default, max_flow_age is 0, so no flows are skipped.
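The check can be sketched as a small helper. This simplifies the original’s mktime/utcfromtimestamp dance down to a plain `time.time()` comparison; the function name is mine, not from the source.

```python
import time

def flow_too_old(first_switched, max_flow_age, now=None):
    """Mirror of the check above: a flow is skipped when max_flow_age
    is non-zero and the flow started more than max_flow_age seconds
    before the current time."""
    if now is None:
        now = time.time()
    return max_flow_age != 0 and first_switched < now - max_flow_age

# With max_flow_age = 0 (the default) nothing is ever skipped:
flow_too_old(0, 0, now=1_000_000)            # -> False
# With a 1-hour window, a flow started ~2 hours ago is skipped:
flow_too_old(992_800, 3600, now=1_000_000)   # -> True
```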
```python
for handler in handlers:
    handler.handleFlow(obj)
common.update_node_index(obj, node_index_collection, config.flow_aggr_sums)
common.update_port_index(obj, port_index_collection, config.flow_aggr_sums, known_ports)
output_flows += 1
```
This is where most of the work is done. handlers was populated much earlier with FlowHandler() instances, one per entry in flow_bucket_sizes, each configured with values from the config file, including the destination database table and column information.
I’ll stop before I analyze the FlowHandler() class, as I’m simply trying to understand what the redis queue objects should look like.
It appears that flowStartSeconds, flowEndSeconds, and the fields named in the flow_aggr_values and flow_aggr_sums settings should be sent to the redis queue. By default the list would be as follows:
All of these objects are IPFIX information export entities and can be configured to be created and passed by Vermont’s packetAggregator module.
… to the redis queue as JSON as follows:
- key = the information entity
- value = the value as a string
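As a concrete illustration, a queue entry might look like the following. The field names come from the defaults discussed above; the values are made up, and any flow_aggr_values fields (addresses, ports, etc.) depend on the config and are omitted here.

```python
import json

# All values are sent as strings; the consumer side converts them
# to int.  The timestamps and counts below are invented examples.
entry = {
    "flowStartSeconds": "1300000000",
    "flowEndSeconds": "1300000010",
    "packetDeltaCount": "17",
    "octetDeltaCount": "2048",
}
payload = json.dumps(entry)

# Pushing it would then be (the queue key name is an assumption):
#   redis.Redis().rpush("flowqueue", payload)
```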
Not overwhelmingly hard.
One-time import to the redis queue:
`import_db_to_redis.py` contains a method to take existing flow records from the database and import them into the redis queue.
This is done using the Python redis module’s rpush().
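The bulk import can be sketched like this. The function name, queue key, and `StubQueue` are mine for illustration; whether import_db_to_redis.py appends the “END” sentinel itself is an assumption based on the consumer loop above, so check the script before relying on it.

```python
import json

def export_rows(client, rows, queue="flowqueue"):
    """One-shot import sketch: push every stored flow onto the queue
    with rpush, then append the "END" sentinel so the consumer knows
    the bulk load is complete.  `client` is any redis-py style object;
    the queue name "flowqueue" is an assumption."""
    for row in rows:
        client.rpush(queue, json.dumps(row))
    client.rpush(queue, "END")

class StubQueue:
    """Hypothetical in-memory stand-in for a redis client."""
    def __init__(self):
        self.items = []
    def rpush(self, key, value):
        self.items.append(value)

q = StubQueue()
export_rows(q, [{"octetDeltaCount": "1"}, {"octetDeltaCount": "2"}])
# q.items now holds two JSON strings followed by "END"
```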