Build projectM on OSX
This is my second attempt at building projectM on OSX.
Since my last attempt, MacPorts has moved up a few versions and my confidence in *nix has increased (not sure if my skills have progressed with it).
Download prerequisites:
1) download and install the latest package from the macports site.
2) download and build pulseaudio (source):
cd
git clone git://anongit.freedesktop.org/pulseaudio/pulseaudio
git clone git://github.com/zonque/PulseAudioOSX.git
cd PulseAudioOSX/deploy
./bootstrap_machine.sh #this will take a very long time (as in seriously long time, 30+ minutes)
cd PulseAudioOSX/deploy
./deploy.sh #breaks repeatedly... no idea how to progress
cd PulseAudioOSX/deploy/output
ls -al
2b) instead of pulse, I’m trying jack.
Download latest from http://www.jackosx.com/
3) download projectM:
cd
git clone http://git.code.sf.net/p/projectm/code projectm-code
cd projectm-code/src
ccmake .
#hit C
#make some changes (as necessary)... our master plan is to be able to route system audio to projectM
#hit G
make && make install
Effectively parsing PCAPs with python’s querycsv
Here is a quick method I used today, before implementing an argus probe in a sustainable way, to create and parse PCAPs to determine the high-bandwidth offenders.
Create PCAPs:
ifconfig eth0 promisc
netstat -i | grep eth #check for the P flag
mkdir /pcaps/
nohup tcpdump -i 1 -w /pcaps/pcap -C 20 -W 250 'host 192.168.100.27' -Z root & #not really secure, look at -Z
Note that querycsv.py/sqlite3.py doesn’t like _underscores_, -dashes-, or file names that contain only numbers.
This will generate /pcaps/pcapNNN…
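Given the naming quirk noted above, a small check can be run on a file name before it is handed to querycsv. This is only a sketch: `is_querycsv_safe` is a hypothetical helper, and the rule it encodes (no underscores, no dashes, not digits only) comes from my note above rather than from querycsv's documentation.

```python
import os


def is_querycsv_safe(path):
    """Return True if the file's base name avoids the patterns noted above."""
    stem = os.path.splitext(os.path.basename(path))[0]
    if not stem:
        return False
    # Underscores and dashes are reported to upset querycsv/sqlite3.
    if "_" in stem or "-" in stem:
        return False
    # A name made only of digits is also reported to cause trouble.
    return not stem.isdigit()
```

Names such as pcap001.csv pass this check, which is why the capture files above are written as /pcaps/pcapNNN.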
Process PCAPs on a Windows box with tshark.exe:
echo echo ipsrc,ipdst,dstporttcp,srcporttcp,len ^> %1.csv > %temp%\tshark_len.bat
echo ".\Wireshark\tshark.EXE" -r %1 -T fields -e ip.src -e ip.dst -e tcp.dstport -e tcp.srcport -e frame.len -E separator=, -E quote=d ^>^> %1.csv >> %temp%\tshark_len.bat
for /r . %G in (pcap*) do %temp%\tshark_len.bat %G
`tcpdump` has relatively the same syntax.
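The same per-file conversion could also be scripted in Python instead of a generated batch file. The sketch below reuses only the tshark options from the batch file above; the function names and the assumption that `tshark` is on the PATH are mine.

```python
import subprocess
from pathlib import Path

HEADER = "ipsrc,ipdst,dstporttcp,srcporttcp,len"
FIELDS = ["ip.src", "ip.dst", "tcp.dstport", "tcp.srcport", "frame.len"]


def tshark_command(pcap_path, tshark="tshark"):
    """Build the tshark argument list used in the batch file above."""
    cmd = [tshark, "-r", str(pcap_path), "-T", "fields"]
    for field in FIELDS:
        cmd += ["-e", field]
    cmd += ["-E", "separator=,", "-E", "quote=d"]
    return cmd


def pcap_to_csv(pcap_path, tshark="tshark"):
    """Write <pcap>.csv: the header line followed by tshark's field output."""
    pcap_path = Path(pcap_path)
    csv_path = pcap_path.with_name(pcap_path.name + ".csv")
    result = subprocess.run(tshark_command(pcap_path, tshark),
                            capture_output=True, text=True, check=True)
    csv_path.write_text(HEADER + "\n" + result.stdout)
    return csv_path
```

Calling `pcap_to_csv(p)` for every path returned by `Path(".").rglob("pcap*")` that does not already end in `.csv` would mirror the `for /r` loop.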
Query CSVs with SQL statements:
Use a python module to quickly return calculations (http://pythonhosted.org/querycsv/):
pip install querycsv
Per-file operations
Find the total bytes incoming to host:
querycsv.py -i test.csv "select ipsrc, ipdst, dstporttcp, dstporttcp, srcporttcp, sum(len) as 'len_in_bytes' from test group by ipsrc"
Find the total bytes outgoing from host:
querycsv.py -i test.csv "select ipsrc, ipdst, dstporttcp, dstporttcp, srcporttcp, sum(len) as 'len_in_bytes' from test group by ipdst"
For all files:
echo ipsrc,ipdst,dstporttcp,srcporttcp,len > mergedpcap.csv
cat *.csv | grep -v "ipsrc,ipdst,dstporttcp,srcporttcp,len" >> mergedpcap.csv
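A Python version of the merge might look like the sketch below. `merge_csvs` is a hypothetical helper; unlike the `cat *.csv` approach it explicitly skips the output file, so mergedpcap.csv is never read back into itself.

```python
import glob
import os

HEADER = "ipsrc,ipdst,dstporttcp,srcporttcp,len"


def merge_csvs(pattern, out_path, header=HEADER):
    """Concatenate the CSVs matching pattern into out_path with one header."""
    out_abs = os.path.abspath(out_path)
    # Collect the inputs first and leave out the merged file itself.
    sources = [p for p in sorted(glob.glob(pattern))
               if os.path.abspath(p) != out_abs]
    with open(out_path, "w", newline="") as out:
        out.write(header + "\n")
        for path in sources:
            with open(path, newline="") as src:
                for line in src:
                    row = line.rstrip("\r\n")
                    # Drop blank lines and each file's own header line.
                    if row and row != header:
                        out.write(row + "\n")
    return len(sources)
```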
Find the total bytes incoming to host:
querycsv.py -i mergedpcap.csv "select distinct dstporttcp, ipsrc, ipdst, sum(len) as 'len_in_bytes' from mergedpcap group by ipsrc" > incoming.log
The record where ipsrc is the targeted host (in this case 192.168.100.27) will return the TOTAL length of all packets sent from the targeted host (all uploaded, yes UPloaded, bytes).
Find the total bytes outgoing from host:
querycsv.py -i mergedpcap.csv "select distinct dstporttcp, ipsrc, ipdst, sum(len) as 'len_in_bytes' from mergedpcap group by ipdst" > outgoing.log
The record where ipdst is the targeted host (in this case 192.168.100.27) will return the TOTAL length of all packets sent to the targeted host (all downloaded, yes DOWNloaded, bytes).
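If only the two totals for a single host are needed, they can also be computed without SQL. The sketch below assumes the CSV layout produced above (header `ipsrc,ipdst,dstporttcp,srcporttcp,len`); `host_byte_totals` is a hypothetical name.

```python
import csv


def host_byte_totals(csv_file, host):
    """Return (bytes_sent_by_host, bytes_sent_to_host) for a tshark CSV."""
    sent = received = 0
    for row in csv.DictReader(csv_file):
        length = int(row["len"])
        if row["ipsrc"] == host:
            sent += length      # uploaded by the host
        if row["ipdst"] == host:
            received += length  # downloaded by the host
    return sent, received
```

For example, `host_byte_totals(open("mergedpcap.csv", newline=""), "192.168.100.27")` would return both figures in one pass over the file.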
Other solutions:
You’ll notice that querycsv first imports the csv data to an in memory sqlite3 db. This makes offering a full set of sql queries and functions trivial.
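To make that concrete, here is a minimal sketch of the same idea using only the standard library: load a CSV into an in-memory sqlite3 table, then run any SQL against it. This is not querycsv's actual code, and `query_csv` is a hypothetical name.

```python
import csv
import sqlite3


def query_csv(csv_file, table, sql):
    """Load a CSV (with a header row) into an in-memory table and run sql."""
    reader = csv.reader(csv_file)
    columns = next(reader)  # the first row holds the column names
    conn = sqlite3.connect(":memory:")
    col_defs = ", ".join('"%s"' % c for c in columns)
    conn.execute('CREATE TABLE "%s" (%s)' % (table, col_defs))
    placeholders = ", ".join("?" for _ in columns)
    # Every remaining row must have as many values as there are columns.
    conn.executemany('INSERT INTO "%s" VALUES (%s)' % (table, placeholders),
                     reader)
    rows = conn.execute(sql).fetchall()
    conn.close()
    return rows
```

Once the data is in sqlite3, grouping, sum() and the rest of the SQL dialect come for free, which is the point made above.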
There are other options for this sort of situation:
1) PCAP to SQL platforms: pcap2sql and c5 sigma.
2) SQL querying PCAP directly: PacketQ (which lacks some SQL queries and functions, see here). Here is a neat example of displaying some results.
3) robust solutions like pcapr-local, with integration to mu studio.
Quick script to parse Windows DNS logs and produce a report
grep "[client address]" dns.log | grep "\ R\ " | grep -v "SERVFAIL]" | awk "{print $1,$2,$16}" | awk "{$1=$1}1" OFS=","
grep "[client address]" dns.log | grep "\ R\ " | grep -v "SERVFAIL]" | awk "{print $16}" | sed "s/([0-9])\|([0-9][0-9])/./g" | sed "s/^.//g" | sort | uniq
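The two sed steps in the second command turn the length-prefixed names in the log into dotted names. A Python sketch of just that step follows, assuming names are logged in the form `(3)www(7)example(3)com(0)`; the function names are hypothetical. Unlike `s/^.//`, it only strips the first character when that character is a dot.

```python
import re

# Matches a one- or two-digit label length in parentheses, e.g. "(3)" or "(12)".
LABEL_LENGTH = re.compile(r"\(\d{1,2}\)")


def dotted_name(logged_name):
    """Convert '(3)www(7)example(3)com(0)' into 'www.example.com.'."""
    dotted = LABEL_LENGTH.sub(".", logged_name)
    return dotted[1:] if dotted.startswith(".") else dotted


def unique_names(logged_names):
    """Equivalent of the trailing `sort | uniq`."""
    return sorted({dotted_name(name) for name in logged_names})
```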
Using ipfixFlowInspectorExporter with VERMONT merge-features branch
As of May 1st, 2013, Lothar Braun has yet to merge ipfixFlowInspectorExporter, the module that exports from VERMONT to the flow-inspector redis queue, into the mainline git repo for VERMONT. It is available in a branch called merge-features.
I will be continuing from the first reference to this page from the page on Configuring Vermont.
Clone the VERMONT repo and check out the merge-features branch:
git clone http://github.com/constcast/vermont.git
cd ./vermont
git branch merge-features origin/merge-features
git checkout merge-features
cmake -DSUPPORT_SCTP=OFF -DSUPPORT_REDIS=ON -DWITH_TOOLS=OFF .
make
#find and remove the following section from ./CMakeLists.txt
#INSTALL(FILES ipfix-config-schema.xsd
# DESTINATION share/vermont
#)
make install
You may continue back to section “Download and Build the VERMONT Manager web UI” (if you wish) on Configure VERMONT (VERsatile MONitoring Toolkit) on CentOS6.
or…
Start dumping flows to the redis queue:
The binary is located: /usr/local/bin/vermont
The flow-inspector config is located: ./configs/flowinspector_exporter.xml
cp /usr/local/share/vermont/configs/flowinspector_exporter.xml /usr/local/share/vermont/configs/flowinspector_exporter_original.xml
Refer to the Understanding a Vermont config file and modify `flowinspector_exporter.xml` as necessary.
Start the VERMONT probe:
/usr/local/bin/vermont -f /usr/local/share/vermont/configs/flowinspector_exporter.xml
Patience…
As Lothar describes in detail in his writeup, you have to wait for data because of the following:
1) VERMONT has a timeout on active flows of 10 minutes and inactive flows of 5 minutes. This means it doesn’t push a flow to the redis queue until one of those timeouts expires. You will see “Processed 0 flows” in the output of preprocess.py when there are no flows being pushed.
2) flow-inspector also caches data in the redis queue for five minutes before flushing it to the backend DB to be presented over the web UI. You will see “Live import. Flushing caches …” in the output of preprocess.py when this process occurs.
Understanding ipFixPrinter module for Vermont
Here is the output of the various stdout formats available from ipfixprinter:
table:
srcip dstip srcport dstport prot srcpkts dstpkts srcoct dstoct srcstart srcend dststart dstend srcplen dstplen forcedexp revart flowcnt tranoct revtranoct
8.8.8.8 192.168.100.27 443 2050 0 4631 0 6889619 0 1367430677347 1367430688342 0 0 0 0 0 0 13105524809039621472 13160276945998446592
192.168.100.27 8.8.8.8 2050 443 0 2149 0 85960 0 1367430677347 1367430688342 0 0 0 0 0 0 13105480347538162928 13160276945998446592
192.168.100.170 192.168.100.23 22 5199 0 13 0 1532 0 1367430677348 1367430680400 0 0 0 0 0 0 13105524809039621472 13160276945998446592
192.168.101.22 192.168.100.36 34773 5432 0 83 0 12106 0 1367430677349 1367430688237 0 0 0 0 0 0 13105480347538162928 13160276945998446592
tree:
-+--- Ipfix Data Data Record (id=997, preceding=0) from non-IPv4 address:0 (0)
 `- fixed data
 '   `- protocolIdentifier (id=4, length=1) : UDP
 `- variable data
 '   `- sourceIPv4Address (id=8, length=5) : 192.168.100.111/32
 '   `- destinationIPv4Address (id=12, length=5) : 192.168.100.251/32
 '   `- sourceTransportPort (id=7, length=2) : 62523
 '   `- destinationTransportPort (id=11, length=2) : 53
 '   `- flowStartMilliSeconds (id=152, length=8) : 1367431737884 (Wed May 1 14:08:57 2013)
 '   `- flowEndMilliSeconds (id=153, length=8) : 1367431737884 (Wed May 1 14:08:57 2013)
 '   `- octetDeltaCount (id=1, length=8) : 67
 '   `- packetDeltaCount (id=2, length=8) : 1
 `---
-+--- Ipfix Data Data Record (id=997, preceding=0) from non-IPv4 address:0 (0)
 `- fixed data
 '   `- protocolIdentifier (id=4, length=1) : UDP
 `- variable data
 '   `- sourceIPv4Address (id=8, length=5) : 192.168.100.251/32
 '   `- destinationIPv4Address (id=12, length=5) : 192.168.100.111/32
 '   `- sourceTransportPort (id=7, length=2) : 53
 '   `- destinationTransportPort (id=11, length=2) : 62523
 '   `- flowStartMilliSeconds (id=152, length=8) : 1367431737890 (Wed May 1 14:08:57 2013)
 '   `- flowEndMilliSeconds (id=153, length=8) : 1367431737890 (Wed May 1 14:08:57 2013)
 '   `- octetDeltaCount (id=1, length=8) : 230
 '   `- packetDeltaCount (id=2, length=8) : 1
 `---
line:
Flow recvd. Flow start Duratn Prot Src IP:Port Dst IP:Port Pckts Bytes
-----------------------------------------------------------------------------------------------------------------
2013-05-01 14:10:25.120 2013-05-01 14:10:24 2799331.0944 --- 192.168.100.21:49357 4.2.2.1:443 1 130
2013-05-01 14:10:25.120 2013-05-01 14:10:24 2799332.0944 --- 4.2.2.1:443 192.168.100.21:49321 1 40
It appears that the best way to be able to use the stdout is via the `table` option.
Here is a way to create output as a comma-separated list using awk:
/usr/local/bin/vermont -f /usr/local/share/vermont/configs/flow-inspector-stdout_table.xml | awk '{$1=$1}1' OFS=","
The field output is:
srcip,dstip,srcport,dstport,prot,srcpkts,dstpkts,srcoct,dstoct,srcstart,srcend,dststart,dstend,srcplen,dstplen,forcedexp,revstart,flowcnt,tranoct,revtranoct
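With the field list known, each line of `table` output can be turned into a dictionary keyed by field name. The sketch below accepts both the raw whitespace-separated output and the comma-separated awk output; `parse_table_line` is a hypothetical helper. Because `zip` stops at the shorter input, a row with fewer values than field names simply leaves the trailing fields out.

```python
import re

TABLE_FIELDS = ("srcip,dstip,srcport,dstport,prot,srcpkts,dstpkts,srcoct,"
                "dstoct,srcstart,srcend,dststart,dstend,srcplen,dstplen,"
                "forcedexp,revstart,flowcnt,tranoct,revtranoct").split(",")


def parse_table_line(line):
    """Map one line of table output to {field: value}; None for header/blank."""
    line = line.strip()
    if not line:
        return None
    values = re.split(r"[,\s]+", line)
    if values[0] == "srcip":  # the header row
        return None
    return dict(zip(TABLE_FIELDS, values))
```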
./src/modules/ipfix/IpfixPrinter.cpp contains:
void IpfixPrinter::printTableRecord(IpfixDataRecord* record)
{
	Connection c(record);
	fprintf(fh, "%s\t%s\t%hu\t%hu\t%hhu\t%llu\t%llu\t%llu\t%llu\t%llu\t%llu\t%llu\t%llu\t%u\t%u\t%hhu\t%hhu\t%u\t%llu\t%llu\n",
		IPToString(c.srcIP).c_str(), IPToString(c.dstIP).c_str(),
		ntohs(c.srcPort), ntohs(c.dstPort), c.protocol,
		(long long unsigned)ntohll(c.srcPackets), (long long unsigned)ntohll(c.dstPackets),
		(long long unsigned)ntohll(c.srcOctets), (long long unsigned)ntohll(c.dstOctets),
		(long long unsigned)c.srcTimeStart, (long long unsigned)c.srcTimeEnd,
		(long long unsigned)c.dstTimeStart, (long long unsigned)c.dstTimeEnd,
		c.srcPayloadLen, c.dstPayloadLen,
		c.dpaForcedExport, c.dpaReverseStart, c.dpaFlowCount,
		(long long unsigned)c.srcTransOctets, (long long unsigned)c.dstTransOctets);
}
An IpfixDataRecord is defined in ./src/modules/ipfix/IpfixRecord.hpp:
class IpfixDataRecord : public IpfixRecord, public ManagedInstance<IpfixDataRecord> {
	public:
		IpfixDataRecord(InstanceManager<IpfixDataRecord>* im) : ManagedInstance<IpfixDataRecord>(im) {}
		boost::shared_ptr<TemplateInfo> templateInfo;
		int dataLength;
		boost::shared_array<IpfixRecord::Data> message; /**< data block that contains @c data */
		IpfixRecord::Data* data; /**< pointer to start of field data in @c message. Undefined after @c message goes out of scope. */
		// redirector to reference remover of ManagedInstance
		virtual void removeReference() { ManagedInstance<IpfixDataRecord>::removeReference(); }
		virtual void addReference(int count = 1) { ManagedInstance<IpfixDataRecord>::addReference(count); }
};
Connection is defined in ./src/modules/ipfix/Connection.h.
Back to redis for flow-inspector:
So, we need the following IPFIX information elements for input:
sourceIPv4Address = Connection.srcIP = first column
destinationIPv4Address = Connection.dstIP = second column
sourceTransportPort = Connection.srcPort = third column
destinationTransportPort = Connection.dstPort = fourth column
protocolIdentifier = Connection.protocol = fifth column
packetDeltaCount = n/a
octetDeltaCount = n/a
Both of the latter are available in the “line” view of the ipFixPrinter module, according to the code in ./src/modules/ipfix/IpfixPrinter.cpp, as long as the fields aren’t NULL, so I guess they are NULL? Hmm…
Understanding the redis queue for flow-inspector
Grepping the source of flow-inspector for the phrase “import redis” (excluding “vendor”) reveals two scripts:
./preprocess/preprocess.py
./preprocess/import_db_to_redis.py
Real time redis queue population:
`preprocess.py` contains a method to populate the redis queue as data is presented.
This is done using the python module redis’s blpop(), which listens to the queue.
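The overall shape of such a blocking consumer can be sketched as follows. This is not the code from preprocess.py: `consume`, `handle` and the queue name are placeholders of mine, and the only client method relied on is `blpop(keys, timeout)`, which in redis-py returns a `(key, value)` tuple. The "END" sentinel is the one checked in Block 1 of the script.

```python
def consume(client, queue_name, handle):
    """Pop entries from queue_name until the string "END" arrives.

    client is any object with a redis-py style blpop(keys, timeout) method.
    Returns the number of entries passed to handle().
    """
    count = 0
    while True:
        item = client.blpop(queue_name, 0)  # timeout 0 blocks until data arrives
        if item is None:
            continue
        value = item[1]
        if isinstance(value, bytes):  # redis-py returns bytes by default
            value = value.decode("utf-8")
        if value == "END":
            break
        handle(value)
        count += 1
    return count
```

With redis-py installed, `consume(redis.Redis(), "your_queue_name", print)` would print each entry as it arrives; the real queue name has to be taken from the flow-inspector config.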
Block 1:
if obj == "END":
	print "%s: Reached END. Terminating..." % (datetime.datetime.now())
	print "%s: Flusing caches. Do not terminate this process or you will have data loss!"
	break
1) If a string “END” pops into the queue, then the program is closed.
2) If a string that is JSON comes in (determined if it can be parsed with json.loads()), then the following occurs:
Block 2:
try:
	obj = json.loads(obj)
	obj[common.COL_FIRST_SWITCHED] = int(obj[common.COL_FIRST_SWITCHED])
	obj[common.COL_LAST_SWITCHED] = int(obj[common.COL_LAST_SWITCHED])
	for s in config.flow_aggr_sums:
		obj[s] = int(obj[s])
except ValueError, e:
	print >> sys.stderr, "Could not decode JSON object in queue: ", e
	continue
1) The value of flowStartSeconds is looked up and converted to an integer.
2) The value of flowEndSeconds is looked up and converted to an integer.
3) The field names listed in the config file’s (./config/config.py) setting flow_aggr_sums are read, and the matching values in the JSON object are converted to integers. By default these are: packetDeltaCount and octetDeltaCount.
4) except …
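A Python 3 rendering of Block 2 as a standalone function might look like this. It is a sketch, not the project's code: `decode_flow` is a hypothetical name, and the literal keys flowStartSeconds and flowEndSeconds are my reading of what common.COL_FIRST_SWITCHED and common.COL_LAST_SWITCHED stand for.

```python
import json
import sys


def decode_flow(raw, sum_fields=("packetDeltaCount", "octetDeltaCount")):
    """Parse one queue entry; return the flow dict, or None if undecodable."""
    try:
        flow = json.loads(raw)
        flow["flowStartSeconds"] = int(flow["flowStartSeconds"])
        flow["flowEndSeconds"] = int(flow["flowEndSeconds"])
        for name in sum_fields:
            flow[name] = int(flow[name])
    except ValueError as err:  # bad JSON or a non-numeric value
        print("Could not decode JSON object in queue:", err, file=sys.stderr)
        return None
    return flow
```

As in the original, a missing key is not handled here: it raises KeyError rather than being skipped.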
Block 3:
# only import flow if it is newer than config.max_flow_time
if config.max_flow_age != 0 and obj[common.COL_FIRST_SWITCHED] < (time.mktime(datetime.datetime.utcfromtimestamp(time.time()).timetuple()) - config.max_flow_age):
	print "Flow is too old to be imported into mongodb. Skipping flow ..."
	continue
Skip the flow if the config file setting max_flow_age is not equal to 0 and the flowStartSeconds provided within the JSON data is less than (that is, older than) the current time minus max_flow_age. By default, max_flow_age is 0, so no flows are skipped.
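The age check can be isolated into a small function to make the comparison easier to see. This is a simplified sketch: `is_too_old` is a hypothetical name, and it compares against plain `time.time()` rather than reproducing the script's mktime/utcfromtimestamp conversion.

```python
import time


def is_too_old(flow_start, max_flow_age, now=None):
    """Return True if the flow started more than max_flow_age seconds ago.

    A max_flow_age of 0 disables the check, as in the config default.
    """
    if max_flow_age == 0:
        return False
    if now is None:
        now = time.time()
    return flow_start < now - max_flow_age
```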
Block 4:
for handler in handlers:
	handler.handleFlow(obj)
common.update_node_index(obj, node_index_collection, config.flow_aggr_sums)
common.update_port_index(obj, port_index_collection, config.flow_aggr_sums, known_ports)
output_flows += 1
This is where most of the work is done. handlers[] was set much earlier and contains instances of FlowHandler() objects. The FlowHandlers are defined by flow_bucket_sizes and a number of other values from the config file, and they hold the destination database table and column information.
Outcome:
I’ll stop before I analyze the FlowHandler() class, as I’m simply trying to understand what the redis queue objects should look like.
It appears that flowStartSeconds, flowEndSeconds, and the contents of the flow_aggr_values and flow_aggr_sums should be sent to the redis queue. By default the list would be as follows:
- flowStartSeconds
- flowEndSeconds
- sourceIPv4Address
- destinationIPv4Address
- sourceTransportPort
- destinationTransportPort
- protocolIdentifier
- packetDeltaCount
- octetDeltaCount
All of these are IPFIX information elements and can be configured to be created and passed by Vermont’s packetAggregator module.
… to the redis queue as JSON as follows:
key = the information element name
value = string value
Not overwhelmingly hard.
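As a sketch of what one queue entry could look like under that reading, the function below serialises the nine default fields with string values. `flow_to_json` is a hypothetical helper, and the sample values in the usage note are illustrative only.

```python
import json

FLOW_FIELDS = [
    "flowStartSeconds", "flowEndSeconds",
    "sourceIPv4Address", "destinationIPv4Address",
    "sourceTransportPort", "destinationTransportPort",
    "protocolIdentifier", "packetDeltaCount", "octetDeltaCount",
]


def flow_to_json(flow):
    """Serialise a flow dict: key = information element name, value = string."""
    missing = [name for name in FLOW_FIELDS if name not in flow]
    if missing:
        raise KeyError("missing fields: " + ", ".join(missing))
    return json.dumps({name: str(flow[name]) for name in FLOW_FIELDS})
```

For example, a DNS request like the one in the tree output could be passed in as `{"flowStartSeconds": 1367431737, "flowEndSeconds": 1367431737, "sourceIPv4Address": "192.168.100.111", "destinationIPv4Address": "192.168.100.251", "sourceTransportPort": 62523, "destinationTransportPort": 53, "protocolIdentifier": 17, "packetDeltaCount": 1, "octetDeltaCount": 67}`. How the exporter actually encodes the protocol and the addresses is something I have not verified.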
One time import to redis queue:
`import_db_to_redis.py` contains a method to take data at rest (judging by the script name, from a database) and import it into the redis queue.
This is done using the python module redis’s rpush().
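The pushing side can be sketched in a few lines. This is not the code from import_db_to_redis.py: `push_flows` and the queue name are placeholders, and the only client method relied on is redis-py's `rpush(name, *values)`. Sending "END" afterwards is optional and mirrors the sentinel that preprocess.py checks for.

```python
import json


def push_flows(client, queue_name, flows, send_end=False):
    """Append each flow to queue_name as a JSON string; return the count.

    client is any object with a redis-py style rpush(name, *values) method.
    """
    pushed = 0
    for flow in flows:
        client.rpush(queue_name, json.dumps(flow))
        pushed += 1
    if send_end:
        # Tells a preprocess.py style consumer to stop reading.
        client.rpush(queue_name, "END")
    return pushed
```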
