Because a scheduled event executes in the context of the event-scheduler thread, a user can create a scheduled event that runs outside of that user’s own context.
However, a user must have been GRANTed the EVENT privilege in order to create or modify a scheduled event.
So, review your GRANTs.
Start the event scheduler thread:
Scheduled events are a supported feature of MySQL Server starting in version 5.1.6.
You can configure the event-scheduler thread to run (see `show processlist`) by modifying the `/etc/my.cnf` file as follows:
service mysqld restart
This is an example of CREATE EVENT syntax that is related to flow-inspector.
This event runs once a day, starting today, at midnight, deleting some records that are older than one month.
use flowinspector;
DELIMITER |
CREATE EVENT `flowinspector_dump`
ON SCHEDULE EVERY 1 DAY STARTS date_format(now(), '%Y-%m-%d 00:00:00')
ON COMPLETION NOT PRESERVE
ENABLE
DO BEGIN
  set @a=unix_timestamp(date_sub(now(),interval 1 month));
  delete from flowinspector.flows_600 where bucket < @a;
  delete from flowinspector.flows_aggr_600 where bucket < @a;
  delete from flowinspector.index_nodes_600 where bucket < @a;
  delete from flowinspector.index_ports_600 where bucket < @a;
END; |
DELIMITER ;
It appears that I missed the partial desublimation of ntopng, a newer version of ntop. I’ve yet to build it, but it looks great. Maybe it will replace the unstable flow-inspector that I am currently using. Unfortunately, I cannot give it the time it deserves, or contribute code, at this point.
I will be keeping an eye on the ntop user’s mailing list that I am subscribed to via feedburner.
As of May 1st, 2013, Lothar Braun has yet to merge ipfixFlowInspectorExported, the module that exports from VERMONT to the flow-inspector redis queue, into the mainline git repo for VERMONT. It is, however, available in a branch called merge-features.
I will be continuing from the first reference to this page from the page on Configuring Vermont.
Clone the VERMONT repo and check out the merge-features branch:
git clone http://github.com/constcast/vermont.git
cd ./vermont
git branch merge-features origin/merge-features
git checkout merge-features
cmake -DSUPPORT_SCTP=OFF -DSUPPORT_REDIS=ON -DWITH_TOOLS=OFF .
make
# find and remove the following section from ./CMakeLists.txt
#INSTALL(FILES ipfix-config-schema.xsd
# DESTINATION share/vermont
#)
make install
You may continue back to section “Download and Build the VERMONT Manager web UI” (if you wish) on Configure VERMONT (VERsatile MONitoring Toolkit) on CentOS6.
Start dumping flows to the redis queue:
The binary is located at: /usr/local/bin/vermont
The flow-inspector config is located at: ./configs/flowinspector_exporter.xml
cp /usr/local/share/vermont/configs/flowinspector_exporter.xml /usr/local/share/vermont/configs/flowinspector_exporter_original.xml
Refer to Understanding a Vermont config file and modify `flowinspector_exporter.xml` as necessary.
Start the VERMONT probe:
/usr/local/bin/vermont -f /usr/local/share/vermont/configs/flowinspector_exporter.xml
As Lothar describes in detail in his writeup, waiting for data involves the following:
1) VERMONT has a timeout on active flows of 10 minutes and on inactive flows of 5 minutes. This means it doesn’t push a flow to the redis queue until the relevant timeout expires. You will see “Processed 0 flows” in the output of preprocess.py when there are no flows being pushed.
2) flow-inspector also caches data in the redis queue for five minutes before flushing it to the backend DB to be presented over the web UI. You will see “Live import. Flushing caches …” in the output of preprocess.py when this process occurs.
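To put numbers on the waiting, here is a small sketch of the arithmetic. It assumes the two delays simply add up, which is my simplification and not something Lothar's writeup states; the constant and function names are mine.

```python
# Timeouts quoted in the text, in seconds.
ACTIVE_TIMEOUT = 10 * 60    # VERMONT: long-lasting (active) flows
INACTIVE_TIMEOUT = 5 * 60   # VERMONT: idle (inactive) flows
CACHE_FLUSH = 5 * 60        # flow-inspector: cache flush interval


def worst_case_delay(export_timeout, cache_flush=CACHE_FLUSH):
    """Upper bound on the wait, assuming the two delays simply add up."""
    return export_timeout + cache_flush


print(worst_case_delay(ACTIVE_TIMEOUT) // 60)    # minutes, active flow
print(worst_case_delay(INACTIVE_TIMEOUT) // 60)  # minutes, inactive flow
```

Under that assumption, an empty web UI during the first quarter of an hour is not necessarily a sign that something is broken.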
Grepping the source of flow-inspector for the string “import redis” (excluding “vendor”) reveals two scripts:
Real time redis queue population:
`preprocess.py` contains a loop that consumes entries from the redis queue as data arrives.
This is done using the python module redis’s blpop(), which blocks until an entry is available on the queue and then pops it.
if obj == "END":
    print "%s: Reached END. Terminating..." % (datetime.datetime.now())
    print "%s: Flusing caches. Do not terminate this process or you will have data loss!"
    break
1) If a string “END” pops into the queue, then the program is closed.
2) If a string that is JSON comes in (determined if it can be parsed with json.loads()), then the following occurs:
try:
    obj = json.loads(obj)
    obj[common.COL_FIRST_SWITCHED] = int(obj[common.COL_FIRST_SWITCHED])
    obj[common.COL_LAST_SWITCHED] = int(obj[common.COL_LAST_SWITCHED])
    for s in config.flow_aggr_sums:
        obj[s] = int(obj[s])
except ValueError, e:
    print >> sys.stderr, "Could not decode JSON object in queue: ", e
    continue
1) A check for the presence of a definition for flowStartSeconds, and an attempt to convert it to an integer are made.
2) A check for the presence of a definition for flowEndSeconds, and an attempt to convert it to an integer are made.
3) The values from the config file’s (./config/config.py) setting for flow_aggr_sums are read and then used to pull data from the JSON object, converting each value to an integer. By default these are: packetDeltaCount and octetDeltaCount.
4) except …
# only import flow if it is newer than config.max_flow_time
if config.max_flow_age != 0 and obj[common.COL_FIRST_SWITCHED] < (time.mktime(datetime.datetime.utcfromtimestamp(time.time()).timetuple()) - config.max_flow_age):
    print "Flow is too old to be imported into mongodb. Skipping flow ..."
    continue
Skip the flow if the config file setting max_flow_age is not equal to 0 and the flowStartSeconds provided within the JSON data is less than the current time minus max_flow_age (that is, the flow started more than max_flow_age seconds ago). By default, the max_flow_age value is equal to 0, so no flows are skipped.
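The age test is easy to get backwards, so here is a minimal sketch of it as a standalone function, following the comparison in the excerpt (start time older than now minus max_flow_age). The function name and the `now` parameter are mine, and I use plain epoch seconds rather than the mktime/utcfromtimestamp conversion the original performs.

```python
import time


def is_too_old(first_switched, max_flow_age, now=None):
    """Return True when a flow should be skipped as too old.

    A max_flow_age of 0 disables the check, as in the excerpt.
    """
    if max_flow_age == 0:
        return False
    if now is None:
        now = time.time()  # simplification: plain epoch seconds
    return first_switched < now - max_flow_age


# A flow that started 900 seconds ago, with a one-hour and a ten-minute limit.
started = time.time() - 900
print(is_too_old(started, 3600))
print(is_too_old(started, 600))
```

Passing `now` explicitly keeps the function easy to test without touching the clock.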
for handler in handlers:
    handler.handleFlow(obj)
common.update_node_index(obj, node_index_collection, config.flow_aggr_sums)
common.update_port_index(obj, port_index_collection, config.flow_aggr_sums, known_ports)
output_flows += 1
This is where most of the work is done. handlers was set much earlier and contains instances of FlowHandler() objects. The FlowHandlers are defined by flow_bucket_sizes and a number of other values from the config file, and each contains the destination database table and column information.
I’ll stop before I analyze the FlowHandler() class, as I’m simply trying to understand what the redis queue objects should look like.
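Putting the pieces above together, the shape of the consuming loop can be sketched in modern Python as follows. This is my own condensed illustration, not the code from preprocess.py: the function name, the `handle_flow` callback and the hard-coded field tuples are assumptions, and unlike the excerpt I also catch a missing key instead of only a JSON decode error. The `client` is expected to behave like a redis-py client, whose blpop() returns a (queue name, value) pair.

```python
import json

QUEUE = "entry:queue"
TIME_FIELDS = ("flowStartSeconds", "flowEndSeconds")
SUM_FIELDS = ("packetDeltaCount", "octetDeltaCount")  # default flow_aggr_sums


def consume(client, handle_flow, queue=QUEUE):
    """Pop entries until "END" arrives; return the number of flows handled."""
    handled = 0
    while True:
        _name, raw = client.blpop(queue)  # blocks until an entry is available
        if isinstance(raw, bytes):
            raw = raw.decode("utf-8")
        if raw == "END":
            break
        try:
            flow = json.loads(raw)
            for field in TIME_FIELDS + SUM_FIELDS:
                flow[field] = int(flow[field])
        except (ValueError, KeyError, TypeError) as exc:
            print("Could not decode queue entry:", exc)
            continue
        handle_flow(flow)
        handled += 1
    return handled
```

With redis-py installed and a local redis server running, something like `consume(redis.Redis(), print)` would echo each decoded flow until "END" is pushed.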
It appears that flowStartSeconds, flowEndSeconds, and the contents of the flow_aggr_values and flow_aggr_sums should be sent to the redis queue. By default the list would be as follows:
All of these objects are IPFIX information elements and can be configured to be created and passed by Vermont’s packetAggregator module.
… to the redis queue as JSON as follows:
key = the information element name
value = string value
Not overwhelmingly hard.
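A minimal Python sketch of that encoding, assuming a client object that exposes rpush() the way redis-py does. The function names and the sample field values are mine; only the queue name and the key/value shape come from the notes above.

```python
import json

QUEUE = "entry:queue"


def encode_flow(fields):
    """Serialize a flow as JSON, with every value sent as a string."""
    return json.dumps({name: str(value) for name, value in fields.items()})


def push_flow(client, fields, queue=QUEUE):
    """Append one encoded flow to the tail of the queue."""
    return client.rpush(queue, encode_flow(fields))


# Hypothetical flow, using the element names discussed above.
example = {
    "flowStartSeconds": 1367366400,
    "flowEndSeconds": 1367366460,
    "packetDeltaCount": 12,
    "octetDeltaCount": 3400,
}
print(encode_flow(example))
```

Pushing with rpush() and popping with blpop() gives first-in, first-out ordering, which is what you want for a live feed.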
One time import to redis queue:
`import_db_to_redis.py` contains a method to take data at rest (going by the script’s name, flows already stored in a database) and import it into the redis queue.
This is done using the python module redis’s rpush().
I will be summarizing the documentation available within the project’s github wiki to give a simple understanding of a fairly complex configuration process.
For our example we are concerned with setting up Vermont to be an IPFIX generator, receiver, and transmitter to a local redis queue “entry:queue.”
Note that as of April 30th, 2013, there is no redis queue output module for vermont. I have reached out to Lothar to find out more about this possibility. Until then, since our goal is to configure vermont to be a probe to populate flow-inspector’s backend DB, I will be using Vermont to receive PCAP and output to a database. However, I will cover outputting IPFIX packets using the ipfixExporter module. See the bottom of this page for more output options, including IpfixDbWriter which I will be using for my flow-inspector instance.
Working with the modular design:
Vermont config files operate like our friends’ nsclient++, nxlog, and rsyslog config files, in a modular design.
The data flow is decided upon by the “id” of each module (declared in the module root xml node), and the “next” child node of that module root node.
Multiple “next” tags can be included, but there are special stipulations for this. Translating docs from developer speak to admin speak:
1) Each receiving module will see the same packets, not a copy.
2) So… if a module modifies the data, then it is modified. This can be confusing for the other modules.
If multiple receivers are used, then a queue can be used. This allows the modules to do work in a synchronous manner.
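To make the “id” and “next” wiring concrete, here is a small Python sketch that reads a config and prints which module feeds which. The sample XML is a stripped-down illustration I made up for this purpose (the root node name and the ids are arbitrary), not a working Vermont config.

```python
import xml.etree.ElementTree as ET

# Illustrative only: real configs carry many more child nodes per module.
SAMPLE = """
<ipfixConfig>
  <observer id="1"><next>2</next></observer>
  <packetQueue id="2"><next>3</next></packetQueue>
  <packetAggregator id="3"><next>4</next><next>5</next></packetAggregator>
  <ipfixQueue id="4"/>
  <ipfixPrinter id="5"/>
</ipfixConfig>
"""


def module_graph(xml_text):
    """Map each module id to (module name, list of ids it forwards to)."""
    graph = {}
    for module in ET.fromstring(xml_text):
        module_id = module.get("id")
        if module_id is None:
            continue  # nodes without an id are not part of the data flow
        targets = [n.text.strip() for n in module.findall("next") if n.text]
        graph[module_id] = (module.tag, targets)
    return graph


for module_id, (name, targets) in module_graph(SAMPLE).items():
    print(module_id, name, "->", ", ".join(targets) or "(end)")
```

Running this against a real config is a quick way to spot a “next” that points at an id that does not exist.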
Configuring an IPFIX generator:
We’ll be working off the file ipfix-export.xml, and I will describe only the non-obvious portions.
A lot of these are defined in code: ./src/modules/ipfix/*.h
- The root node is the name of the config.
- “sensorManager”: use the “checkInterval” setting to control how frequently (in seconds) sensors are polled.
- “observer” (Input type: none, Output type: Packet): takes input from pcap interface.
- “packetQueue” (Input type: Packet, Output type: Packet): holds packets in queue, up to child node “maxSize”, until the “next” module is ready. If full, pauses previous module. The larger the maxSize, the more RAM is utilized.
- “packetAggregator” (Input type: Packet, Output type: IpfixRecord): takes incoming packets and makes IPFIX records out of them, using the provided settings:
- “rule”: defines scope
- “templateId”: IPFIX Template ID.
- “flowKey”: Parent node of ieName… that includes in aggregation? **I don’t understand the wiki’s definition: “Flow key information element – flows are aggregated according to those keys.”
- “nonFlowKey”: Parent node of ieName… that excludes from aggregation? **I don’t understand the wiki’s definition: “Non-flow key information element – those IEs are aggregated.”
- “ieName”: IPFIX information elements
- “match”: matches the ieName
- protocolIdentifier: “TCP”, “UDP”, “ICMP”, or IANA number (for IPv4 RFC791, for IPv6 RFC2460).
- (sourceIPv4Address, destinationIPv4Address, ipNextHopIPv4Address, bgpNextHopIPv4Address, sourceIPv4Prefix, destinationIPv4Prefix, mplsTopLabelIPv4Address, exporterIPv4Address, collectorIPv4Address, postNATSourceIPv4Address, postNATDestinationIPv4Address, staIPv4Address), (sourceIPv6Address, destinationIPv6Address, ipNextHopIPv6Address, bgpNextHopIPv6Address, exporterIPv6Address, mplsTopLabelIPv6Address, destinationIPv6Prefix, sourceIPv6Prefix, collectorIPv6Address, postNATSourceIPv6Address, postNATDestinationIPv6Address) (not clear if supports ipv6): for IPv4 use CIDR notation, for IPv6 see RFC5952.
- udpSourcePort, udpDestinationPort, tcpSourcePort, tcpDestinationPort, sourceTransportPort, destinationTransportPort, collectorTransportPort, exporterTransportPort, postNAPTSourceTransportPort, postNAPTDestinationTransportPort: an unsigned16 that represents a port (for UDP see RFC768, for TCP see RFC793, for SCTP see RFC2960, for NAPT see RFC3022). Note that a port range can be defined as [start port]:[end port].
- tcpControlBits: “URG”, “ACK”, “PSH”, “RST”, “SYN”, “FIN”. Combine in a comma-separated list.
- “expiration”: defines scope
- “inactiveTimeout”: timeout for inactive flows.
- “unit”: “sec”, “msec”, “usec”.
- “activeTimeout”: timeout for long-lasting active flows.
- “inactiveTimeout”: timeout for inactive flows.
- “pollInterval”: interval when flow should be passed to next module.
- “rule”: defines scope
- “ipfixQueue” (Input type: IpfixRecord; Output type: IpfixRecord): holds IPFIX records in queue until the next module is ready to process them.
- “ipfixExporter” (Input type: IpfixRecord, Output type: none): module that sends IPFIX outbound to the network.
- “observationDomainId”: IPFIX spec for observation domain as defined in RFC5153
- “transportProtocol”: aka “exportTransportProtocol” in the IPFIX spec. Accepted values in the code: “17” and “UDP”, “132” and “SCTP”, “DTLS_OVER_UDP”, “DTLS_OVER_SCTP”, “TCP”
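As a sanity check for the port “match” values described in the list above, here is a small Python sketch that parses either a single port or a [start port]:[end port] range and enforces the unsigned16 limit. The function is my own helper for checking a config by hand and is not part of Vermont.

```python
def parse_port_match(value):
    """Parse "80" or "1024:2048" into an inclusive (start, end) tuple."""
    start_text, sep, end_text = value.partition(":")
    start = int(start_text)
    end = int(end_text) if sep else start
    for port in (start, end):
        if not 0 <= port <= 0xFFFF:
            raise ValueError(f"port out of unsigned16 range: {port}")
    if start > end:
        raise ValueError(f"start port is above end port: {value}")
    return start, end


print(parse_port_match("80"))
print(parse_port_match("1024:2048"))
```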
Configuring an IPFIX to MySQL DB receiver:
You should now understand the concept of the modular design, and by referring to the available module list and their input and output specs, should understand how to implement modules effectively.
Here is what we generally want:
packets -> NIC of Vermont probe -> take certain fields -> DB
Knowing the list of available modules, we already know how to chain them so that Vermont receives packets and writes the resulting IPFIX records to a MySQL DB:
observer -> packetqueue -> packetaggregator -> ipfixqueue -> ipfixdbwriter
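One way to convince yourself that a chain like this is valid is to check that each module’s output type matches the next module’s input type. The sketch below does that in Python using the types listed earlier on this page. The entry for ipfixDbWriter is my assumption (it is not in that list), and the checker itself is just an illustration, not something Vermont ships.

```python
# (input type, output type) per module; None means no input or no output.
MODULE_TYPES = {
    "observer": (None, "Packet"),
    "packetQueue": ("Packet", "Packet"),
    "packetAggregator": ("Packet", "IpfixRecord"),
    "ipfixQueue": ("IpfixRecord", "IpfixRecord"),
    "ipfixExporter": ("IpfixRecord", None),
    "ipfixDbWriter": ("IpfixRecord", None),  # assumption: not listed above
}


def check_chain(chain):
    """Return a list of type mismatches between consecutive modules."""
    problems = []
    for upstream, downstream in zip(chain, chain[1:]):
        out_type = MODULE_TYPES[upstream][1]
        in_type = MODULE_TYPES[downstream][0]
        if out_type is None or out_type != in_type:
            problems.append(
                f"{upstream} outputs {out_type}, {downstream} expects {in_type}"
            )
    return problems


chain = ["observer", "packetQueue", "packetAggregator", "ipfixQueue", "ipfixDbWriter"]
print(check_chain(chain) or "chain is consistent")
```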
Outputting to stdout:
With the modules and some cli-fu:
observer -> packetqueue -> packetaggregator -> ipfixprinter -> pipe -> [data processing script] -> redis API or `redis-cli -x`
The challenge here is that the output of ipfixprinter to stdout is not engineered to be input to `redis-cli -x`, so a script or program to process the data is necessary.
Tuning for performance:
As evidenced in the above example, queuing becomes an important, if somewhat arbitrary, part of your probe config design. The queues are memory mapped, so RAM is a consideration: low RAM means smaller queues, while a saturated CPU means you need a larger queue. Some monitoring and tweaking would need to be done in this area.
Four measurements should be gathered: incoming packet flow, process RAM utilization, process CPU utilization, and outgoing packet flow. This will give you very basic information on whether you can tune your queues better. Remember, an unpredictably large packet flow might prove difficult to engineer for.
I have published a new page called Implement flow-inspector on CentOS6.
I’m going to circle back to Vermont, as it natively supports pushing flow data to the redis queue used by flow-inspector.
flow-inspector: open source web UI and backend processor and storage to visualize network flows with d3.js!
Update December 7th, 2012: I’ve started an email thread with Lothar Braun and Mario Volke; Lothar got back to me right away with some questions about usage and to explain the inner workings of flow-inspector some more.
I’ve sent him some info on argus, and if I understand his implication correctly, the process to push data from anything (including argus) to flow-inspector is very simple:
1) define fields in flow_aggr_values and flow_aggr_sums (you wish to pivot against/process data for).
2) push JSON (field:data) to a redis queue named ‘entry:queue’.
3) preprocess.py connects to the redis queue, processes the data and places it into the configured DB (waiting for an entry simply called “END” to stop listening).
Wow! How did I miss this?!
flow-inspector is EXACTLY what I wanted to build for use with argus. And I kid you not, EXACTLY what I was looking for.
Thank you guys so much for releasing this as open source, and for releasing it at all! Sometimes (too many times) I come across research papers and the research source is never released. This is exactly what is needed by the community.
Mario Volke is the original developer.
Now to work argus into it.