1723 lines
60 KiB
Plaintext
Executable File
1723 lines
60 KiB
Plaintext
Executable File
#! @PYTHON@
|
|
#
|
|
# Copyright (c) 2013 Nicira, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at:
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
#
|
|
# The approximate_size code was copied from
|
|
# http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
|
|
# which is licensed under # "Dive Into Python 3," Copyright 2011 Mark Pilgrim,
|
|
# used under a Creative Commons Attribution-Share-Alike license:
|
|
# http://creativecommons.org/licenses/by-sa/3.0/
|
|
#
|
|
#
|
|
|
|
"""Top like behavior for ovs-dpctl dump-flows output.
|
|
|
|
This program summarizes ovs-dpctl flow content by aggregating the number
|
|
of packets, total bytes and occurrence of the following fields:
|
|
|
|
- Datapath in_port
|
|
|
|
- Ethernet type
|
|
|
|
- Source and destination MAC addresses
|
|
|
|
- IP protocol
|
|
|
|
- Source and destination IPv4 addresses
|
|
|
|
- Source and destination IPv6 addresses
|
|
|
|
- UDP and TCP destination port
|
|
|
|
- Tunnel source and destination addresses
|
|
|
|
|
|
Output shows five values:
|
|
- FIELDS: the flow fields for example in_port(1).
|
|
|
|
- PACKETS: the total number of packets containing the flow field.
|
|
|
|
- BYTES: the total number of bytes containing the flow field. If units are
|
|
not present then values are in bytes.
|
|
|
|
- AVERAGE: the average packets size (BYTES/PACKET).
|
|
|
|
- COUNT: the number of lines in the dump-flow output that contain the flow field.
|
|
|
|
Top Behavior
|
|
|
|
While in top mode, the default behavior, the following single character
|
|
commands are supported:
|
|
|
|
a - toggles top in accumulate and live mode. Accumulate mode is described
|
|
below.
|
|
|
|
s - toggles which column is used to sort content in decreasing order. A
|
|
DESC title is placed over the column.
|
|
|
|
_ - a space indicating to collect dump-flow content again
|
|
|
|
h - halt output. Any character will restart sampling
|
|
|
|
f - cycle through flow fields. The initial field is in_port
|
|
|
|
q - q for quit.
|
|
|
|
Accumulate Mode
|
|
|
|
There are two supported modes: live and accumulate. The default is live.
|
|
The parameter --accumulate or the 'a' character in top mode enables the
|
|
latter. In live mode, recent dump-flow content is presented.
|
|
Whereas accumulate mode keeps track of the prior historical
|
|
information until the flow is reset not when the flow is purged. Reset
|
|
flows are determined when the packet count for a flow has decreased from
|
|
its previous sample. There is one caveat, eventually the system will
|
|
run out of memory unless stale flows are removed, so after the accumulate-decay period any flows that
|
|
have not been refreshed are purged. The goal here is to free memory
|
|
of flows that are not active. Statistics are not decremented. Their purpose
|
|
is to reflect the overall history of the flow fields.
|
|
|
|
|
|
Debugging Errors
|
|
|
|
Parsing errors are counted and displayed in the status line at the beginning
|
|
of the output. Use the --verbose option with --script to see what output
|
|
was not parsed, like this:
|
|
$ ovs-dpctl dump-flows | ovs-dpctl-top --script --verbose
|
|
|
|
Error messages will identify content that failed to parse.
|
|
|
|
|
|
Access Remote Hosts
|
|
|
|
The --host must follow the format user@hostname. This script simply calls
|
|
'ssh user@Hostname' without checking for login credentials therefore public
|
|
keys should be installed on the system identified by hostname, such as:
|
|
|
|
$ ssh-copy-id user@hostname
|
|
|
|
Consult ssh-copy-id man pages for more details.
|
|
|
|
|
|
Expected usage
|
|
|
|
$ ovs-dpctl-top
|
|
|
|
or to run as a script:
|
|
$ ovs-dpctl dump-flows > dump-flows.log
|
|
$ ovs-dpctl-top --script --flow-file dump-flows.log
|
|
|
|
"""
|
|
|
|
# pylint: disable-msg=C0103
|
|
# pylint: disable-msg=C0302
|
|
# pylint: disable-msg=R0902
|
|
# pylint: disable-msg=R0903
|
|
# pylint: disable-msg=R0904
|
|
# pylint: disable-msg=R0912
|
|
# pylint: disable-msg=R0913
|
|
# pylint: disable-msg=R0914
|
|
|
|
import sys
|
|
import os
|
|
try:
|
|
##
|
|
# Arg parse is not installed on older Python distributions.
|
|
# ovs ships with a version in the directory mentioned below.
|
|
import argparse
|
|
except ImportError:
|
|
sys.path.append(os.path.join("@pkgdatadir@", "python"))
|
|
import argparse
|
|
import logging
|
|
import re
|
|
import unittest
|
|
import copy
|
|
import curses
|
|
import operator
|
|
import subprocess
|
|
import fcntl
|
|
import struct
|
|
import termios
|
|
import datetime
|
|
import threading
|
|
import time
|
|
import socket
|
|
|
|
|
|
##
|
|
# The following two definitions provide the necessary netaddr functionality.
|
|
# Python netaddr module is not part of the core installation. Packaging
|
|
# netaddr was involved and seems inappropriate given that only two
|
|
# methods were used.
|
|
def ipv4_to_network(ip_str):
    """ Calculate the network address of an "ip/mask" IPv4 string.

    ip_str is either a plain dotted-quad address or "address/netmask" with
    the netmask also written as a dotted quad.  Anything that does not split
    into exactly two parts around "/" is returned unchanged.
    """
    pieces = ip_str.split("/")
    if len(pieces) != 2:
        # just an ip address no mask.
        return ip_str
    (address, netmask) = pieces

    # An IPv4 address is handled as two network-order 16 bit words.
    layout = '!HH'
    address_words = struct.unpack(layout,
                                  socket.inet_pton(socket.AF_INET, address))
    netmask_words = struct.unpack(layout,
                                  socket.inet_pton(socket.AF_INET, netmask))
    masked = [word & mask for (word, mask) in zip(address_words,
                                                  netmask_words)]

    return socket.inet_ntop(socket.AF_INET, struct.pack(layout, *masked))
|
|
|
|
|
|
def ipv6_to_network(ip_str):
    """ Calculate the network address of an "ip/mask" IPv6 string.

    ip_str is either a plain IPv6 address or "address/netmask" with the
    netmask also written as an IPv6 address.  Anything that does not split
    into exactly two parts around "/" is returned unchanged.
    """
    pieces = ip_str.split("/")
    if len(pieces) != 2:
        # just an ip address no mask.
        return ip_str
    (address, netmask) = pieces

    # An IPv6 address is handled as eight network-order 16 bit words.
    layout = '!HHHHHHHH'
    address_words = struct.unpack(layout,
                                  socket.inet_pton(socket.AF_INET6, address))
    netmask_words = struct.unpack(layout,
                                  socket.inet_pton(socket.AF_INET6, netmask))
    masked = [word & mask for (word, mask) in zip(address_words,
                                                  netmask_words)]

    return socket.inet_ntop(socket.AF_INET6, struct.pack(layout, *masked))
|
|
|
|
|
|
##
|
|
# columns displayed
|
|
##
|
|
class Columns:
    """ Names and widths shared by every displayed column.

    Titles need to be less than 8 characters.
    """
    # Width reserved for each numeric column.
    VALUE_WIDTH = 9
    FIELDS = "fields"
    PACKETS = "packets"
    COUNT = "count"
    BYTES = "bytes"
    AVERAGE = "average"

    def __init__(self):
        pass

    @staticmethod
    def assoc_list(obj):
        """ Return obj as a list of (column name, value) pairs.

        The fields column holds repr(obj); the remaining columns are read
        from the packets, bytes, count and average attributes of obj.
        """
        pairs = [(Columns.FIELDS, repr(obj))]
        pairs.append((Columns.PACKETS, obj.packets))
        pairs.append((Columns.BYTES, obj.bytes))
        pairs.append((Columns.COUNT, obj.count))
        pairs.append((Columns.AVERAGE, obj.average))
        return pairs
|
|
|
|
|
|
def element_eth_get(field_type, element, stats_dict):
    """ Build a SumData for the Ethernet src/dst pair of a dump-flow element.

    The same "type(src=...,dst=...)" text is used both as the displayed
    field and as the aggregation key.
    """
    label = "%s(src=%s,dst=%s)" % (field_type, element["src"], element["dst"])
    return SumData(field_type, label, stats_dict["packets"],
                   stats_dict["bytes"], label)
|
|
|
|
|
|
def element_ipv4_get(field_type, element, stats_dict):
    """ Build a SumData for the IPv4 src/dst pair of a dump-flow element.

    The displayed field keeps the addresses exactly as given, while the
    aggregation key uses the network portion of each address.
    """
    template = "%s(src=%s,dst=%s)"
    shown = template % (field_type, element["src"], element["dst"])
    src_network = ipv4_to_network(element["src"])
    dst_network = ipv4_to_network(element["dst"])
    key = template % (field_type, src_network, dst_network)

    return SumData(field_type, shown, stats_dict["packets"],
                   stats_dict["bytes"], key)
|
|
|
|
|
|
def element_tunnel_get(field_type, element, stats_dict):
    """ Build a SumData for a tunnel element.

    Tunnel src and dst are handled exactly like IPv4 src and dst, so this
    simply delegates to element_ipv4_get.
    """
    summary = element_ipv4_get(field_type, element, stats_dict)
    return summary
|
|
|
|
|
|
def element_ipv6_get(field_type, element, stats_dict):
    """ Build a SumData for the IPv6 src/dst pair of a dump-flow element.

    The displayed field keeps the addresses exactly as given, while the
    aggregation key uses the network portion of each address.
    """
    template = "%s(src=%s,dst=%s)"
    shown = template % (field_type, element["src"], element["dst"])
    src_network = ipv6_to_network(element["src"])
    dst_network = ipv6_to_network(element["dst"])
    key = template % (field_type, src_network, dst_network)

    return SumData(field_type, shown, stats_dict["packets"],
                   stats_dict["bytes"], key)
|
|
|
|
|
|
def element_dst_port_get(field_type, element, stats_dict):
    """ Build a SumData keyed only on the dst value of a dump-flow element.

    The "type(dst=...)" text serves as both displayed field and key.
    """
    label = "%s(dst=%s)" % (field_type, element["dst"])
    packets = stats_dict["packets"]
    flow_bytes = stats_dict["bytes"]
    return SumData(field_type, label, packets, flow_bytes, label)
|
|
|
|
|
|
def element_passthrough_get(field_type, element, stats_dict):
    """ Build a SumData from an element used verbatim.

    The "type(element)" text serves as both displayed field and key.
    """
    label = "%s(%s)" % (field_type, element)
    packets = stats_dict["packets"]
    flow_bytes = stats_dict["bytes"]
    return SumData(field_type, label, packets, flow_bytes, label)
|
|
|
|
|
|
# pylint: disable-msg=R0903
|
|
class OutputFormat:
    """ Pairs a flow field type with the function that extracts its value.

    field_type is the name looked up in a parsed flow; generator is called
    as generator(field_type, element, stats_dict).
    """
    def __init__(self, field_type, generator):
        (self.field_type, self.generator) = (field_type, generator)
|
|
|
|
##
|
|
# The order below is important. The initial flow field depends on whether
|
|
# --script or top mode is used. In top mode, the expected behavior, in_port
|
|
# flow fields are shown first. A future feature will allow users to
|
|
# filter output by selecting a row. Filtering by in_port is a natural
|
|
# filtering starting point.
|
|
#
|
|
# In script mode, all fields are shown. The expectation is that users could
|
|
# filter output by piping through grep.
|
|
#
|
|
# In top mode, the default flow field is in_port. In --script mode,
|
|
# the default flow field is all.
|
|
#
|
|
# All is added to the end of the OUTPUT_FORMAT list.
|
|
##
|
|
# One entry per flow field type that is aggregated, in display order.
OUTPUT_FORMAT = [
    OutputFormat("in_port", element_passthrough_get),
    OutputFormat("eth", element_eth_get),
    OutputFormat("eth_type", element_passthrough_get),
    OutputFormat("ipv4", element_ipv4_get),
    OutputFormat("ipv6", element_ipv6_get),
    OutputFormat("udp", element_dst_port_get),
    OutputFormat("tcp", element_dst_port_get),
    OutputFormat("tunnel", element_tunnel_get),
]
|
|
##
|
|
|
|
|
|
# Maps a protocol field type to its dotted destination-port key.
ELEMENT_KEY = dict(udp="udp.dst", tcp="tcp.dst")
|
|
|
|
|
|
def top_input_get(args):
    """ Start "ovs-dpctl dump-flows" and return the child's stdout pipe.

    When args.host is set the command is run through ssh on that host.
    The child's stderr is merged into the returned stream.
    """
    command = ["ovs-dpctl", "dump-flows"]
    if args.host:
        command = ["ssh", args.host] + command

    child = subprocess.Popen(command, stderr=subprocess.STDOUT,
                             stdout=subprocess.PIPE)
    return child.stdout
|
|
|
|
|
|
def args_get():
    """ Parse the command line and configure logging.

    Returns the argparse namespace with these attributes:
      flowFiles       list of flow file names, or None to read from stdin
      verbose         logging level (logging.CRITICAL unless --verbose)
      top             False when --script is given, otherwise True
      host            user@host passed to ssh, or None
      accumulate      True when --accumulate is given
      accumulateDecay decay period in seconds as a float
      delay           sample delay in milliseconds

    As a side effect logging.basicConfig is called with the chosen level.
    """

    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__)
    parser.add_argument("-v", "--version", version="@VERSION@",
                        action="version", help="show version")
    ##
    # None is a special value indicating to read flows from stdin.
    # This handles the case
    #   ovs-dpctl dump-flows | ovs-dpctl-flows.py
    parser.add_argument("-f", "--flow-file", dest="flowFiles", default=None,
                        action="append",
                        help="file containing flows from ovs-dpctl dump-flow")
    parser.add_argument("-V", "--verbose", dest="verbose",
                        default=logging.CRITICAL,
                        action="store_const", const=logging.DEBUG,
                        help="enable debug level verbosity")
    parser.add_argument("-s", "--script", dest="top", action="store_false",
                        help="Run from a script (no user interface)")
    # The two literals are concatenated, so the first must end in a space.
    parser.add_argument("--host", dest="host",
                        help="Specify a user@host for retrieving flows; see "
                        "the Access Remote Hosts section for more "
                        "information")

    parser.add_argument("-a", "--accumulate", dest="accumulate",
                        action="store_true", default=False,
                        help="Accumulate dump-flow content")
    parser.add_argument("--accumulate-decay", dest="accumulateDecay",
                        default=5.0 * 60, type=float,
                        help="Decay old accumulated flows. "
                        "The default is 5 minutes. "
                        "A value of 0 disables decay.")
    parser.add_argument("-d", "--delay", dest="delay", type=int,
                        default=1000,
                        help="Delay in milliseconds to collect dump-flow "
                        "content (sample rate).")

    args = parser.parse_args()

    logging.basicConfig(level=args.verbose)

    return args
|
|
|
|
###
|
|
# Code to parse a single line in dump-flow
|
|
###
|
|
# Raw strings keep the backslashes for the regex engine; in a normal string
# literal "\w", "\(" and "\." are invalid escape sequences.
# key(values)
FIELDS_CMPND = re.compile(r"([\w]+)\((.+)\)")
# key=value
FIELDS_CMPND_ELEMENT = re.compile(r"([\w:]+)=([/\.\w:]+)")
# key:value
FIELDS_ELEMENT = re.compile(r"([\w]+):([-\.\w]+)")
|
|
|
|
|
|
def flow_line_iter(line):
    """ Split a flow dump line into its top level elements.

    Returns a list of strings.  The line is split at every "," that is not
    inside parentheses and all space characters are dropped.  A trailing
    empty element is not returned.

    Raises ValueError(line) when the parentheses do not balance.
    """
    # splits by , except for when in a ().  Actions element was not
    # split properly but we don't need it.
    pieces = []
    pending = []
    depth = 0

    for char in line:
        if char == '(':
            depth += 1
        elif char == ')':
            depth -= 1

        if char == ' ':
            # ignore white space.
            continue
        if char == ',' and depth == 0:
            pieces.append("".join(pending))
            pending = []
            continue
        pending.append(char)

    if depth != 0:
        raise ValueError(line)

    if pending:
        pieces.append("".join(pending))
    return pieces
|
|
|
|
|
|
def flow_line_compound_parse(compound):
    """ Parse the inside of a compound element into a dictionary.

    For example
       src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03
    which is the content of
       eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)

    Each key=value piece becomes an entry; a nested key(...) piece is parsed
    recursively and stored under its key.  When nothing matches, compound
    itself is returned unchanged.
    """
    parsed = {}
    for item in flow_line_iter(compound):
        pair = FIELDS_CMPND_ELEMENT.search(item)
        if pair:
            parsed[pair.group(1)] = pair.group(2)

        # Checked for every item, even one already stored as key=value.
        nested = FIELDS_CMPND.search(item)
        if nested:
            parsed[nested.group(1)] = flow_line_compound_parse(nested.group(2))

    if not parsed:
        return compound
    return parsed
|
|
|
|
|
|
def flow_line_split(line):
    """ Convert a flow dump line into ([fields], [stats], actions) tuple.

    Where fields and stats are lists.
    This function relies on the following ovs-dpctl dump-flow
    output characteristics:
    1. The dump flow line consists of a list of frame fields, a list of
       stats and an action.
    2. The list of frame fields, each stat and the action field are
       delimited by ', '.
    3. No other part of the line contains ', '.
    """
    chunks = re.split(', ', line)

    # First chunk holds the frame fields, last chunk the actions and
    # everything in between is a stat.
    frame_text = chunks[0]
    stats = chunks[1:-1]
    action = chunks[-1]

    return (flow_line_iter(frame_text), stats, action)
|
|
|
|
|
|
def elements_to_dict(elements):
    """ Convert a list of elements to a hierarchy of dictionaries.

    A key(...) element is stored as key -> parsed content and a key:value
    element as key -> value.

    Raises ValueError for an element that matches neither form.
    """
    table = {}
    for item in elements:
        compound = FIELDS_CMPND.search(item)
        if compound:
            table[compound.group(1)] = \
                flow_line_compound_parse(compound.group(2))
            continue

        simple = FIELDS_ELEMENT.search(item)
        if not simple:
            raise ValueError("can't parse >%s<" % item)
        table[simple.group(1)] = simple.group(2)
    return table
|
|
|
|
|
|
# pylint: disable-msg=R0903
|
|
class SumData(object):
    """ Interface that all data going into SumDb must implement.

    Holds the flow field and its corresponding count, total packets,
    total bytes and calculates average.

    __repr__ is used as key into SumData singleton.
    __str__ is used as human readable output.
    """

    def __init__(self, field_type, field, packets, flow_bytes, key):
        """ packets and flow_bytes are converted with int(). """
        # Count is the number of lines in the dump-flow log.
        self.field_type = field_type
        self.field = field
        self.count = 1
        self.packets = int(packets)
        self.bytes = int(flow_bytes)
        self.key = key

    def decrement(self, decr_packets, decr_bytes, decr_count):
        """ Decrement content to calculate delta from previous flow sample."""
        self.packets -= decr_packets
        self.bytes -= decr_bytes
        self.count -= decr_count

    def __iadd__(self, other):
        """ Add the counters of other, which must have the same key.

        Raises ValueError when the keys differ.
        """
        if (self.key != other.key):
            raise ValueError("adding two unrelated types")

        self.count += other.count
        self.packets += other.packets
        self.bytes += other.bytes
        return self

    def __isub__(self, other):
        """ Subtract the counters of other, which must have the same key.

        Raises ValueError when the keys differ.
        """
        if (self.key != other.key):
            raise ValueError("subtracting two unrelated types")

        self.count -= other.count
        self.packets -= other.packets
        self.bytes -= other.bytes
        return self

    @property
    def average(self):
        """ Average packet size (bytes / packets); 0.0 when no packets. """
        if (self.packets == 0):
            return float(0.0)
        return float(self.bytes) / float(self.packets)

    def __str__(self):
        """ Used for debugging. """
        return "%s %s %s %s" % (self.field, self.count,
                                self.packets, self.bytes)

    def __repr__(self):
        """ Used as key in the FlowDB table. """
        return self.key
|
|
|
|
|
|
def flow_aggregate(fields_dict, stats_dict):
    """ Collect the summary objects for one dump-flow line.

    fields_dict is the parsed flow portion of the line and stats_dict the
    current stats consisting of packets, bytes, etc.  Every OUTPUT_FORMAT
    entry whose field type has a non-empty value in fields_dict contributes
    one object, in OUTPUT_FORMAT order.
    """
    summaries = []

    for fmt in OUTPUT_FORMAT:
        value = fields_dict.get(fmt.field_type, None)
        if not value:
            continue
        summaries.append(fmt.generator(fmt.field_type, value, stats_dict))

    return summaries
|
|
|
|
|
|
def flows_read(ihdl, flow_db):
    """ Read flow content from ihdl and insert it into flow_db.

    Lines are read with ihdl.readline() until it returns an empty value.
    Each line is passed to flow_db.flow_line_add(); a ValueError from that
    call is logged and reading continues with the next line.

    Returns flow_db.
    """
    while True:
        line = ihdl.readline()
        if not line:
            # end of input
            break

        try:
            flow_db.flow_line_add(line)
        except ValueError as arg:
            # "except X as name" is accepted by Python 2.6+ and Python 3.
            logging.error(arg)

    return flow_db
|
|
|
|
|
|
def get_terminal_size():
    """ Return the terminal size as the pair unpacked from TIOCGWINSZ.

    stdin, stdout and stderr are queried in that order and the first usable
    answer wins.  The pair is the first two shorts of the winsize structure,
    which tty_ioctl documents as rows then columns.

    Returns (25, 80) when no descriptor reports a size.
    """
    for fd_io in (0, 1, 2):
        try:
            result = struct.unpack('hh',
                                   fcntl.ioctl(fd_io, termios.TIOCGWINSZ,
                                               struct.pack('hh', 0, 0)))
        except IOError:
            # Not a terminal (e.g. a pipe or a file); try the next one.
            continue

        if result != (0, 0):
            # Stop at the first descriptor with a real size so that a later
            # redirected descriptor cannot discard it.
            return result

    # Maybe we can't get the width.  In that case assume (25, 80)
    return (25, 80)
|
|
|
|
##
|
|
# Content derived from:
|
|
# http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
|
|
##
|
|
# Unit names indexed by the size of a "kilobyte".
SUFFIXES = {
    1000: ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'],
    1024: ['KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'],
}


def approximate_size(size, a_kilobyte_is_1024_bytes=True):
    """Convert a file size to human-readable form.

    Keyword arguments:
    size -- file size in bytes
    a_kilobyte_is_1024_bytes -- if True (default), use multiples of 1024
                                if False, use multiples of 1000

    Returns: string with one decimal place and a unit suffix.  The smallest
    unit is the kilobyte, so sizes below one kilobyte give a fraction.

    Raises ValueError for a negative size or one beyond the largest unit.
    """
    size = float(size)
    if size < 0:
        raise ValueError('number must be non-negative')

    multiple = 1024 if a_kilobyte_is_1024_bytes else 1000
    for suffix in SUFFIXES[multiple]:
        size /= multiple
        if size < multiple:
            return "%.1f %s" % (size, suffix)

    raise ValueError('number too large')
|
|
|
|
|
|
##
|
|
# End copied content
|
|
##
|
|
class ColMeta:
    """ Per column settings: whether it can be sorted on and its width. """
    def __init__(self, sortable, width):
        (self.sortable, self.width) = (sortable, width)
|
|
|
|
|
|
class RowMeta:
    """ Per cell rendering: a label and the function that formats it. """
    def __init__(self, label, fmt):
        (self.label, self.fmt) = (label, fmt)
|
|
|
|
|
|
def fmt_packet(obj, width):
    """ Return obj.packets as text right justified to width. """
    text = str(obj.packets)
    return text.rjust(width)
|
|
|
|
|
|
def fmt_count(obj, width):
    """ Return obj.count as text right justified to width. """
    text = str(obj.count)
    return text.rjust(width)
|
|
|
|
|
|
def fmt_avg(obj, width):
    """ Return obj.average truncated to an integer, right justified. """
    whole = int(obj.average)
    return str(whole).rjust(width)
|
|
|
|
|
|
def fmt_field(obj, width):
    """ Return obj.field left justified to width.

    A field longer than width is cut and ends in an ellipsis marker so that
    the truncation is visible.
    """
    marker = " ... "
    text = obj.field
    if len(text) > width:
        keep = width - len(marker)
        text = text[:keep] + marker
    return text.ljust(width)
|
|
|
|
|
|
def fmt_bytes(obj, width):
    """ Return obj.bytes as text right justified to width.

    The plain number is used when it fits in width, otherwise the value is
    shortened with approximate_size.
    """
    plain = str(obj.bytes)
    if len(plain) > width:
        text = approximate_size(obj.bytes)
    else:
        text = plain
    return text.rjust(width)
|
|
|
|
|
|
def title_center(value, width):
    """ Return value in upper case, centered within width. """
    heading = value.upper()
    return heading.center(width)
|
|
|
|
|
|
def title_rjust(value, width):
    """ Return value in upper case, right justified within width. """
    heading = value.upper()
    return heading.rjust(width)
|
|
|
|
|
|
def column_picker(order, obj):
    """ Return the value of the column of obj selected by order.

    order 1 is count, 2 is packets, 3 is bytes and 4 is average.

    Raises ValueError for any other order.
    """
    columns = ((1, "count"), (2, "packets"), (3, "bytes"), (4, "average"))
    for (position, attribute) in columns:
        if order == position:
            return getattr(obj, attribute)

    raise ValueError("order outside of range %s" % order)
|
|
|
|
|
|
class Render:
    """ Renders flow data.

    The two FIELD_SELECT variables should be set to the actual field minus
    1.  During construction, an internal method increments and initializes
    this object.
    """
    FLOW_FIELDS = [_field.field_type for _field in OUTPUT_FORMAT] + ["all"]

    FIELD_SELECT_SCRIPT = 7
    FIELD_SELECT_TOP = -1

    def __init__(self, console_width, field_select):
        """ Calculate column widths taking into account changes in format."""

        self._start_time = datetime.datetime.now()

        # Order in these arrays dictates the order of the columns.
        # The 0 width for the first entry is a place holder.  This is
        # dynamically calculated.  The first column is special.  We need a
        # way to indicate which fields are presented.
        self._cols = [ColMeta(False, 0)]
        self._cols += [ColMeta(True, Columns.VALUE_WIDTH) for _ in range(4)]
        self._console_width = console_width
        self.console_width_set(console_width)

        # Row above the titles; marks the column used for sorting.
        self._descs = [RowMeta("", title_rjust) for _ in range(5)]
        self._column_sort_select = 0
        self.column_select_event()

        self._titles = [RowMeta(Columns.FIELDS, title_center)]
        for name in (Columns.COUNT, Columns.PACKETS,
                     Columns.BYTES, Columns.AVERAGE):
            self._titles.append(RowMeta(name, title_rjust))

        self._datas = [RowMeta(None, formatter)
                       for formatter in (fmt_field, fmt_count, fmt_packet,
                                         fmt_bytes, fmt_avg)]

        ##
        # _field_types hold which fields are displayed in the field
        # column, with the keyword all implying all fields.
        ##
        self._field_types = Render.FLOW_FIELDS

        ##
        # field_select is one less than the wanted entry; the toggle below
        # advances to it and sets the first column title.
        ##
        self._field_type_select = field_select
        self.field_type_toggle()

    def _field_type_select_get(self):
        """ Return which field type to display. """
        return self._field_types[self._field_type_select]

    def field_type_toggle(self):
        """ Advance to the next field type, wrapping at the end. """
        selected = self._field_type_select + 1
        if selected >= len(self._field_types):
            selected = 0
        self._field_type_select = selected

        # The first column title shows which field type is selected.
        self._titles[0].label = \
            Columns.FIELDS + " (%s)" % self._field_type_select_get()

    def column_select_event(self):
        """ Move the sort marker to the next sortable column. """
        total = len(self._cols)
        selected = self._column_sort_select
        self._descs[selected].label = ""

        for _ in range(total):
            selected += 1
            if selected >= total:
                selected = 0

            # Now look for the next sortable column
            if self._cols[selected].sortable:
                break

        self._column_sort_select = selected
        self._descs[selected].label = "DESC"

    def console_width_set(self, console_width):
        """ Adjust the output given the new console_width. """
        self._console_width = console_width

        ##
        # Calculating column width can be tedious but important.  The
        # flow field value can be long.  The goal here is to dedicate
        # fixed column space for packets, bytes, average and counts.  Give
        # the remaining space to the flow column.  When numbers get large
        # transition output to output generated by approximate_size which
        # limits output to ###.# XiB in other words 9 characters.
        ##
        # At this point, we know the maximum length values.  We may
        # truncate the flow column to get everything to fit.
        separators = len(self._cols) - 1
        value_widths = sum(col.width for col in self._cols[1:])
        self._cols[0].width = console_width - (value_widths + separators)

    def format(self, flow_db):
        """ Return the screen content for flow_db as a list of lines.

        The lines are: title, statistics and status, two column title rows
        and then one row per flow field.
        """
        lines = ["Flow Summary".center(self._console_width)]

        ##
        # statistics and status
        ##
        status = " Total: %(flow_total)s errors: %(flow_errors)s " % \
            flow_db.flow_stats_get()
        if flow_db.accumulate_get():
            status += "Accumulate: on "
        else:
            status += "Accumulate: off "

        elapsed = datetime.datetime.now() - self._start_time
        status += "Duration: %s " % str(elapsed)
        lines.append(status.ljust(self._console_width))

        ##
        # 2 rows for columns.  The first indicates which column is in
        # descending order.
        ##
        for heading_row in (self._descs, self._titles):
            cells = [cell.fmt(cell.label, col.width)
                     for (cell, col) in zip(heading_row, self._cols)]
            lines.append(" ".join(cells))

        ##
        # Data.
        ##
        ordered = flow_db.field_values_in_order(self._field_type_select_get(),
                                                self._column_sort_select)
        for entry in ordered:
            cells = [cell.fmt(entry, col.width)
                     for (cell, col) in zip(self._datas, self._cols)]
            lines.append(" ".join(cells))

        return lines
|
|
|
|
|
|
def curses_screen_begin():
    """ Begin curses screen control and return the curses window.

    The terminal is put in cbreak mode with echo off and keypad enabled.
    """
    window = curses.initscr()
    curses.cbreak()
    curses.noecho()
    window.keypad(1)
    return window
|
|
|
|
|
|
def curses_screen_end(stdscr):
    """ End curses screen control.

    Undoes the settings made by curses_screen_begin on stdscr and then
    ends the curses session.
    """
    curses.nocbreak()
    stdscr.keypad(0)
    curses.echo()
    curses.endwin()
|
|
|
|
|
|
class FlowDB:
|
|
""" Implements live vs accumulate mode.
|
|
|
|
Flows are stored as key value pairs. The key consists of the content
|
|
prior to stat fields. The value portion consists of stats in a dictionary
|
|
form.
|
|
|
|
@ \todo future add filtering here.
|
|
"""
|
|
def __init__(self, accumulate):
|
|
self._accumulate = accumulate
|
|
self._error_count = 0
|
|
# Values are (stats, last update time.)
|
|
# The last update time is used for aging.
|
|
self._flow_lock = threading.Lock()
|
|
# This dictionary holds individual flows.
|
|
self._flows = {}
|
|
# This dictionary holds aggregate of flow fields.
|
|
self._fields = {}
|
|
|
|
def accumulate_get(self):
|
|
""" Return the current accumulate state. """
|
|
return self._accumulate
|
|
|
|
def accumulate_toggle(self):
|
|
""" toggle accumulate flow behavior. """
|
|
self._accumulate = not self._accumulate
|
|
|
|
def begin(self):
|
|
""" Indicate the beginning of processing flow content.
|
|
if accumulate is false clear current set of flows. """
|
|
|
|
if (not self._accumulate):
|
|
self._flow_lock.acquire()
|
|
try:
|
|
self._flows.clear()
|
|
finally:
|
|
self._flow_lock.release()
|
|
self._fields.clear()
|
|
|
|
def flow_line_add(self, line):
|
|
""" Split a line from a ovs-dpctl dump-flow into key and stats.
|
|
The order of the content in the flow should be:
|
|
- flow content
|
|
- stats for the flow
|
|
- actions
|
|
|
|
This method also assumes that the dump flow output does not
|
|
change order of fields of the same flow.
|
|
"""
|
|
|
|
line = line.rstrip("\n")
|
|
(fields, stats, _) = flow_line_split(line)
|
|
|
|
try:
|
|
fields_dict = elements_to_dict(fields)
|
|
|
|
if (len(fields_dict) == 0):
|
|
raise ValueError("flow fields are missing %s", line)
|
|
|
|
stats_dict = elements_to_dict(stats)
|
|
if (len(stats_dict) == 0):
|
|
raise ValueError("statistics are missing %s.", line)
|
|
|
|
##
|
|
# In accumulate mode, the Flow database can reach 10,000's of
|
|
# persistent flows. The interaction of the script with this many
|
|
# flows is too slow. Instead, delta are sent to the flow_db
|
|
# database allow incremental changes to be done in O(m) time
|
|
# where m is the current flow list, instead of iterating over
|
|
# all flows in O(n) time where n is the entire history of flows.
|
|
key = ",".join(fields)
|
|
|
|
self._flow_lock.acquire()
|
|
try:
|
|
(stats_old_dict, _) = self._flows.get(key, (None, None))
|
|
finally:
|
|
self._flow_lock.release()
|
|
|
|
self.flow_event(fields_dict, stats_old_dict, stats_dict)
|
|
|
|
except ValueError, arg:
|
|
logging.error(arg)
|
|
self._error_count += 1
|
|
raise
|
|
|
|
self._flow_lock.acquire()
|
|
try:
|
|
self._flows[key] = (stats_dict, datetime.datetime.now())
|
|
finally:
|
|
self._flow_lock.release()
|
|
|
|
def decay(self, decayTimeInSeconds):
|
|
""" Decay content. """
|
|
now = datetime.datetime.now()
|
|
for (key, value) in self._flows.items():
|
|
(stats_dict, updateTime) = value
|
|
delta = now - updateTime
|
|
|
|
if (delta.seconds > decayTimeInSeconds):
|
|
self._flow_lock.acquire()
|
|
try:
|
|
del self._flows[key]
|
|
|
|
fields_dict = elements_to_dict(flow_line_iter(key))
|
|
matches = flow_aggregate(fields_dict, stats_dict)
|
|
for match in matches:
|
|
self.field_dec(match)
|
|
|
|
finally:
|
|
self._flow_lock.release()
|
|
|
|
def flow_stats_get(self):
|
|
""" Return statistics in a form of a dictionary. """
|
|
rc = None
|
|
self._flow_lock.acquire()
|
|
try:
|
|
rc = {"flow_total": len(self._flows),
|
|
"flow_errors": self._error_count}
|
|
finally:
|
|
self._flow_lock.release()
|
|
return rc
|
|
|
|
def field_types_get(self):
|
|
""" Return the set of types stored in the singleton. """
|
|
types = set((ii.field_type for ii in self._fields.values()))
|
|
return types
|
|
|
|
def field_add(self, data):
|
|
""" Collect dump-flow data to sum number of times item appears. """
|
|
current = self._fields.get(repr(data), None)
|
|
if (current is None):
|
|
current = copy.copy(data)
|
|
else:
|
|
current += data
|
|
self._fields[repr(current)] = current
|
|
|
|
def field_dec(self, data):
|
|
""" Collect dump-flow data to sum number of times item appears. """
|
|
current = self._fields.get(repr(data), None)
|
|
if (current is None):
|
|
raise ValueError("decrementing field missing %s" % repr(data))
|
|
|
|
current -= data
|
|
self._fields[repr(current)] = current
|
|
if (current.count == 0):
|
|
del self._fields[repr(current)]
|
|
|
|
def field_values_in_order(self, field_type_select, column_order):
|
|
""" Return a list of items in order maximum first. """
|
|
values = self._fields.values()
|
|
if (field_type_select != "all"):
|
|
# If a field type other than "all" then reduce the list.
|
|
values = [ii for ii in values
|
|
if (ii.field_type == field_type_select)]
|
|
values = [(column_picker(column_order, ii), ii) for ii in values]
|
|
values.sort(key=operator.itemgetter(0))
|
|
values.reverse()
|
|
values = [ii[1] for ii in values]
|
|
return values
|
|
|
|
def flow_event(self, fields_dict, stats_old_dict, stats_new_dict):
    """ Fold a freshly sampled flow into the per-field totals.

    fields_dict holds the parsed flow fields.  stats_old_dict holds the
    statistics recorded for this flow at the previous sample, or None
    when the flow has not been seen before.  stats_new_dict holds the
    statistics of the current sample.
    """
    if (stats_old_dict is None):
        # First sighting of this flow: account for it in full.
        for match in flow_aggregate(fields_dict, stats_new_dict):
            self.field_add(match)
        return

    # To avoid reprocessing every flow at every sample period, only a
    # change in the packet counter triggers an update.
    old_packets = int(stats_old_dict.get("packets", 0))
    new_packets = int(stats_new_dict.get("packets", 0))
    if (old_packets == new_packets):
        # Same data as last time; nothing to do.
        return

    old_bytes = stats_old_dict.get("bytes", 0)
    # Each match carries the new statistics; the previously recorded
    # packets and bytes are taken off it before it is added, so only the
    # difference reaches the totals.  When old_packets > new_packets the
    # totals therefore go down.
    # NOTE(review): the trailing 1 passed to decrement() presumably takes
    # off the flow's count so the flow is not counted twice - confirm
    # against the decrement() implementation.
    for match in flow_aggregate(fields_dict, stats_new_dict):
        match.decrement(old_packets, int(old_bytes), 1)
        self.field_add(match)
|
|
|
|
|
|
class DecayThread(threading.Thread):
    """ Periodically call flow database to see if any flows are old.

    The thread wakes up every self._min_interval seconds and calls
    flow_db.decay(self._interval) until stop() is called.
    """

    def __init__(self, flow_db, interval):
        """ Prepare the decay thread; the caller must call start().

        flow_db is the object whose decay() method is called.
        interval is the decay time in seconds handed to decay(); values
        below one second are raised to one second.
        """
        threading.Thread.__init__(self)

        self._interval = max(1, interval)
        # Poll ten times per decay interval but at most once a second.
        # Dividing by 10.0 forces float division: with an integer
        # interval below 10, Python 2 integer division would yield a
        # wait time of 0 and turn run() into a busy loop.
        self._min_interval = min(1, self._interval / 10.0)
        self._flow_db = flow_db
        self._event = threading.Event()
        self._running = True

        # Daemon thread: it never keeps the application alive.
        self.daemon = True

    def run(self):
        """ Worker thread which handles decaying accumulated flows. """
        while self._running:
            # Sleeps for the poll interval, or returns early when
            # stop() sets the event.
            self._event.wait(self._min_interval)
            if self._running:
                self._flow_db.decay(self._interval)

    def stop(self):
        """ Stop thread. """
        self._running = False
        self._event.set()
        ##
        # Give the worker thread time to terminate but not too long.
        # This thread is a daemon so the application will terminate if
        # we timeout during the join. This is just a cleaner way to
        # release resources.  join() raises RuntimeError on a thread
        # that was never started, hence the is_alive() check.
        if self.is_alive():
            self.join(2.0)
|
|
|
|
|
|
def flow_top_command(stdscr, render, flow_db):
    """ Read one key press in top mode and act on it.

    Keys handled here:
      h - halt output until another key is pressed; that key is then
          handled as the command.
      s - select the next column to sort by (descending order).
      a - toggle accumulate mode of the flow database.
      f - toggle the displayed field type.

    Any other key, the space bar included, simply triggers a resample.
    Returns the key code that was handled, as returned by
    stdscr.getch().
    """
    key = stdscr.getch()
    if (key == ord('h')):
        # Output is halted: keep polling while getch() reports -1 (no
        # key available).  Any key restarts sampling.
        key = -1
        while (key == -1):
            key = stdscr.getch()

    if (key == ord('s')):
        render.column_select_event()
    elif (key == ord('a')):
        flow_db.accumulate_toggle()
    elif (key == ord('f')):
        render.field_type_toggle()

    return key
|
|
|
|
|
|
def decay_timer_start(flow_db, accumulateDecay):
    """ Start a DecayThread when accumulateDecay is greater than zero.

    Returns the started DecayThread, or None when accumulateDecay is
    not greater than zero and no thread was created.
    """
    if not (accumulateDecay > 0):
        return None

    decay_timer = DecayThread(flow_db, accumulateDecay)
    decay_timer.start()
    return decay_timer
|
|
|
|
|
|
def flows_top(args):
    """ handles top like behavior when --script is not specified.

    Repeatedly reads the flows from top_input_get(args), renders them
    onto a curses screen and handles key presses until 'q' is pressed,
    reading the input fails with OSError, or the user interrupts with
    Ctrl-C.  After the curses screen has been closed, the last rendered
    screen is printed once more to stdout.
    """

    flow_db = FlowDB(args.accumulate)
    render = Render(0, Render.FIELD_SELECT_TOP)

    decay_timer = decay_timer_start(flow_db, args.accumulateDecay)
    lines = []

    try:
        stdscr = curses_screen_begin()
        try:
            ch = 'X'
            # getch() waits at most args.delay before giving up, which
            # paces the sampling loop.
            stdscr.timeout(args.delay)

            while (ch != ord('q')):
                flow_db.begin()

                try:
                    ihdl = top_input_get(args)
                    try:
                        flows_read(ihdl, flow_db)
                    finally:
                        ihdl.close()
                except OSError as arg:
                    logging.critical(arg)
                    break

                (console_height, console_width) = stdscr.getmaxyx()
                render.console_width_set(console_width)

                output_height = console_height - 1
                line_count = range(output_height)
                line_output = render.format(flow_db)
                # Materialize the pairs: they are iterated here and
                # again after curses has shut down, and zip() is a
                # one-shot iterator on Python 3.
                lines = list(zip(line_count, line_output[:output_height]))

                stdscr.erase()
                for (count, line) in lines:
                    stdscr.addstr(count, 0, line[:console_width])
                stdscr.refresh()

                ch = flow_top_command(stdscr, render, flow_db)

        finally:
            curses_screen_end(stdscr)
    except KeyboardInterrupt:
        pass
    finally:
        # Stop the decay thread on every exit path, including an
        # unexpected exception.
        if (decay_timer is not None):
            decay_timer.stop()

    # repeat output
    for (count, line) in lines:
        print(line)
|
|
|
|
|
|
def flows_script(args):
    """ handles --script option.

    Reads flows from stdin when args.flowFiles is None, otherwise from
    every file named in args.flowFiles, then prints the rendered
    summary to stdout.
    """

    flow_db = FlowDB(args.accumulate)
    flow_db.begin()

    if (args.flowFiles is None):
        logging.info("reading flows from stdin")
        # Default buffering: unbuffered text mode (buffering=0) is
        # rejected by Python 3.
        with os.fdopen(sys.stdin.fileno(), 'r') as ihdl:
            flow_db = flows_read(ihdl, flow_db)
    else:
        for flowFile in args.flowFiles:
            logging.info("reading flows from %s", flowFile)
            with open(flowFile, "r") as ihdl:
                flow_db = flows_read(ihdl, flow_db)

    (_, console_width) = get_terminal_size()
    render = Render(console_width, Render.FIELD_SELECT_SCRIPT)

    for line in render.format(flow_db):
        print(line)
|
|
|
|
|
|
def main():
    """ Return 0 on success or 1 on failure.

    Algorithm
    Processing ovs-dpctl dump-flow content takes four stages:
    1. Retrieve the current input.
    2. Store it in FlowDB and maintain history.
    3. Iterate over FlowDB, aggregating stats for each flow field.
    4. Present the data.

    Retrieving the current input is currently trivial: ovs-dpctl
    dump-flow is called.  Future versions will have more elaborate means
    of collecting dump-flow content.  FlowDB returns all data in the form
    of a hierarchical dictionary.  Input will vary.

    In accumulate mode, flows are not purged from the FlowDB manager;
    instead, at the very least, the latest statistics are kept.  For
    live output the FlowDB is purged prior to sampling data.

    Aggregating results requires identifying the flow fields to
    aggregate out of each flow and summing their stats.

    A KeyboardInterrupt is reported as failure (return value 1).
    """
    args = args_get()

    try:
        # Interactive "top" display or one-shot script output.
        handler = flows_top if args.top else flows_script
        handler(args)
    except KeyboardInterrupt:
        return 1
    return 0
|
|
|
|
if __name__ == '__main__':
    sys.exit(main())
elif __name__ == 'ovs-dpctl-top':
    # NOTE(review): this branch only runs when the file is imported under
    # the module name 'ovs-dpctl-top', presumably by the test harness -
    # confirm against the test runner.
    # pylint: disable-msg=R0915

    ##
    # Test case beyond this point.
    # pylint: disable-msg=R0904
    class TestsuiteFlowParse(unittest.TestCase):
        """
        parse flow into hierarchy of dictionaries.
        """

        def _assert_in_port(self, flow_db, port, packets, byte_count, count):
            """ Check the totals stored for exactly one in_port(N) field. """
            name = "in_port(%d)" % port
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == name)]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, packets)
            self.assertEqual(in_ports[0].bytes, byte_count)
            self.assertEqual(in_ports[0].count, count)

        def test_flow_parse(self):
            """ test_flow_parse. """
            line = ("in_port(4),eth(src=00:50:56:b4:4e:f8,"
                    "dst=33:33:00:01:00:03),eth_type(0x86dd),"
                    "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
                    "label=0,proto=17,tclass=0,hlimit=1,frag=no),"
                    "udp(src=61252,dst=5355), packets:1, bytes:92, "
                    "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"
                    "38,41,44,47,50,53,56,59,62,65")

            (fields, stats, _) = flow_line_split(line)
            flow_dict = elements_to_dict(fields + stats)
            self.assertEqual(flow_dict["eth"]["src"], "00:50:56:b4:4e:f8")
            self.assertEqual(flow_dict["eth"]["dst"], "33:33:00:01:00:03")
            self.assertEqual(flow_dict["ipv6"]["src"],
                             "fe80::55bf:fe42:bc96:2812")
            self.assertEqual(flow_dict["ipv6"]["dst"], "ff02::1:3")
            self.assertEqual(flow_dict["packets"], "1")
            self.assertEqual(flow_dict["bytes"], "92")

            # Same flow, but with a negative "used" time.
            line = ("in_port(4),eth(src=00:50:56:b4:4e:f8,"
                    "dst=33:33:00:01:00:03),eth_type(0x86dd),"
                    "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
                    "label=0,proto=17,tclass=0,hlimit=1,frag=no),"
                    "udp(src=61252,dst=5355), packets:1, bytes:92, "
                    "used:-0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"
                    "38,41,44,47,50,53,56,59,62,65")

            (fields, stats, _) = flow_line_split(line)
            flow_dict = elements_to_dict(fields + stats)
            self.assertEqual(flow_dict["used"], "-0.703s")
            self.assertEqual(flow_dict["packets"], "1")
            self.assertEqual(flow_dict["bytes"], "92")

        def test_flow_sum(self):
            """ test_flow_sum. """
            line = ("in_port(4),eth(src=00:50:56:b4:4e:f8,"
                    "dst=33:33:00:01:00:03),eth_type(0x86dd),"
                    "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
                    "label=0,proto=17,tclass=0,hlimit=1,frag=no),"
                    "udp(src=61252,dst=5355), packets:2, bytes:92, "
                    "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"
                    "38,41,44,47,50,53,56,59,62,65")

            (fields, stats, _) = flow_line_split(line)
            stats_dict = elements_to_dict(stats)
            fields_dict = elements_to_dict(fields)
            ##
            # Test simple case of one line.
            flow_db = FlowDB(False)
            matches = flow_aggregate(fields_dict, stats_dict)
            for match in matches:
                flow_db.field_add(match)

            flow_types = flow_db.field_types_get()
            expected_flow_types = ["eth", "eth_type", "udp", "in_port", "ipv6"]
            self.assertEqual(len(flow_types), len(expected_flow_types))
            for flow_type in flow_types:
                self.assertTrue(flow_type in expected_flow_types)

            sum_value = flow_db.field_values_in_order("all", 1)
            self.assertEqual(len(sum_value), 5)
            self.assertEqual(sum_value[0].packets, 2)
            self.assertEqual(sum_value[0].count, 1)
            self.assertEqual(sum_value[0].bytes, 92)

            ##
            # Add line again just to see counts go up.
            matches = flow_aggregate(fields_dict, stats_dict)
            for match in matches:
                flow_db.field_add(match)

            flow_types = flow_db.field_types_get()
            self.assertEqual(len(flow_types), len(expected_flow_types))
            for flow_type in flow_types:
                self.assertTrue(flow_type in expected_flow_types)

            sum_value = flow_db.field_values_in_order("all", 1)
            self.assertEqual(len(sum_value), 5)
            self.assertEqual(sum_value[0].packets, 4)
            self.assertEqual(sum_value[0].count, 2)
            self.assertEqual(sum_value[0].bytes, 2 * 92)

        def test_assoc_list(self):
            """ test_assoc_list. """
            line = ("in_port(4),eth(src=00:50:56:b4:4e:f8,"
                    "dst=33:33:00:01:00:03),eth_type(0x86dd),"
                    "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
                    "label=0,proto=17,tclass=0,hlimit=1,frag=no),"
                    "udp(src=61252,dst=5355), packets:2, bytes:92, "
                    "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"
                    "38,41,44,47,50,53,56,59,62,65")

            valid_flows = [
                'eth_type(0x86dd)',
                'udp(dst=5355)',
                'in_port(4)',
                'ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3)',
                'eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)'
            ]

            (fields, stats, _) = flow_line_split(line)
            stats_dict = elements_to_dict(stats)
            fields_dict = elements_to_dict(fields)

            ##
            # Test simple case of one line.
            flow_db = FlowDB(False)
            matches = flow_aggregate(fields_dict, stats_dict)
            for match in matches:
                flow_db.field_add(match)

            for sum_value in flow_db.field_values_in_order("all", 1):
                assoc_list = Columns.assoc_list(sum_value)
                for item in assoc_list:
                    if (item[0] == "fields"):
                        self.assertTrue(item[1] in valid_flows)
                    elif (item[0] == "packets"):
                        self.assertEqual(item[1], 2)
                    elif (item[0] == "count"):
                        self.assertEqual(item[1], 1)
                    elif (item[0] == "average"):
                        self.assertEqual(item[1], 46.0)
                    elif (item[0] == "bytes"):
                        self.assertEqual(item[1], 92)
                    else:
                        # Format the message; passing item[0] as a second
                        # argument would leave "%s" unexpanded.
                        raise ValueError("unknown %s" % item[0])

        def test_human_format(self):
            """ test_human_format. """

            self.assertEqual(approximate_size(0.0), "0.0 KiB")
            self.assertEqual(approximate_size(1024), "1.0 KiB")
            self.assertEqual(approximate_size(1024 * 1024), "1.0 MiB")
            self.assertEqual(approximate_size((1024 * 1024) + 100000),
                             "1.1 MiB")
            value = (1024 * 1024 * 1024) + 100000000
            self.assertEqual(approximate_size(value), "1.1 GiB")

        def test_flow_line_split(self):
            """ Splitting a flow line is not trivial.
            There is no clear delimiter. Comma is used liberally."""
            expected_fields = [
                "in_port(4)",
                "eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)",
                "eth_type(0x86dd)",
                "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
                "label=0,proto=17,tclass=0,hlimit=1,frag=no)",
                "udp(src=61252,dst=5355)"]
            expected_stats = ["packets:2", "bytes:92", "used:0.703s"]
            expected_actions = ("actions:3,8,11,14,17,20,23,26,29,32,35,"
                                "38,41,44,47,50,53,56,59,62,65")

            line = ("in_port(4),eth(src=00:50:56:b4:4e:f8,"
                    "dst=33:33:00:01:00:03),eth_type(0x86dd),"
                    "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
                    "label=0,proto=17,tclass=0,hlimit=1,frag=no),"
                    "udp(src=61252,dst=5355), packets:2, bytes:92, "
                    "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"
                    "38,41,44,47,50,53,56,59,62,65")

            (fields, stats, actions) = flow_line_split(line)

            self.assertEqual(fields, expected_fields)
            self.assertEqual(stats, expected_stats)
            self.assertEqual(actions, expected_actions)

        def test_accumulate_decay(self):
            """ test_accumulate_decay: test accumulated decay. """
            lines = [
                "in_port(1),eth(src=00:50:56:4f:dc:3b,"
                "dst=ff:ff:ff:ff:ff:ff),"
                "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
                "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
                "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
                "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
                "packets:1, bytes:120, used:0.004s, actions:1"]

            flow_db = FlowDB(True)
            flow_db.begin()
            flow_db.flow_line_add(lines[0])

            # Make sure we decay
            time.sleep(4)
            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
            flow_db.decay(1)
            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)

            flow_db.flow_line_add(lines[0])
            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
            flow_db.decay(30)
            # Should not be deleted.
            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)

            flow_db.flow_line_add(lines[0])
            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
            timer = decay_timer_start(flow_db, 2)
            time.sleep(10)
            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)
            timer.stop()

        def test_accumulate(self):
            """ test_accumulate test that FlowDB supports accumulate. """

            lines = [
                "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
                "in_port(2),eth_type(0x0806), packets:2, bytes:126, actions:1",
                "in_port(1),eth_type(0x0806), packets:2, bytes:240, actions:1",
                "in_port(1),eth_type(0x0800), packets:1, bytes:120, actions:1",
                "in_port(1),eth_type(0x0800), packets:2, bytes:240, actions:1",
                "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
            ]

            # Expected (packets, bytes, count) of in_port(1) after each
            # line above has been added as a sample of its own.
            expected_port1 = [
                (1, 120, 1),    # one flow exists.
                (1, 120, 1),    # a second, different flow was added.
                (2, 240, 1),    # first flow increments packets.
                (3, 360, 2),    # third flow shares in_port(1).
                (4, 480, 2),    # third flow has changes.
                (3, 360, 2),    # first flow reset.
            ]

            # Turn on accumulate.
            flow_db = FlowDB(True)

            for (index, line) in enumerate(lines):
                # simulate another sample
                flow_db.begin()
                flow_db.flow_line_add(line)

                (packets, byte_count, count) = expected_port1[index]
                self._assert_in_port(flow_db, 1, packets, byte_count, count)
                if (index > 0):
                    # in_port(2) appears with the second sample and never
                    # changes afterwards.
                    self._assert_in_port(flow_db, 2, 2, 126, 1)

        def test_parse_character_errors(self):
            """ test_parsing errors.
            The flow parses is purposely loose. Its not designed to validate
            input. Merely pull out what it can but there are situations
            that a parse error can be detected.
            """

            lines = ["complete garbage",
                     "in_port(2),eth(src=68:ef:bd:25:ef:c0,"
                     "dst=33:33:00:00:00:66),"
                     "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
                     "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
                     "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029),"
                     "packets:2,bytes:5026,actions:1"]

            flow_db = FlowDB(False)
            flow_db.begin()
            for line in lines:
                try:
                    flow_db.flow_line_add(line)
                except ValueError:
                    # We want an exception. That is how we know we have
                    # correctly found a simple parsing error. We are not
                    # looking to validate flow output just catch simple
                    # issues.
                    continue
                self.fail("no ValueError raised for %r" % line)

        def test_tunnel_parsing(self):
            """ test_tunnel_parsing test parse flows with tunnel. """
            lines = [
                "tunnel(tun_id=0x0,src=192.168.1.1,dst=192.168.1.10,"
                "tos=0x0,ttl=64,flags(key)),in_port(1),"
                "eth(src=9e:40:f5:ef:ec:ee,dst=01:23:20:00:00:30),"
                "eth_type(0x8902), packets:6, bytes:534, used:0.128s, "
                "actions:userspace(pid=4294962691,slow_path(cfm))"
            ]
            flow_db = FlowDB(False)
            flow_db.begin()
            flow_db.flow_line_add(lines[0])
            self._assert_in_port(flow_db, 1, 6, 534, 1)

        def test_flow_multiple_paren(self):
            """ test_flow_multiple_paren. """
            line = "tunnel(tun_id=0x0,src=192.168.1.1,flags(key)),in_port(2)"
            valid = ["tunnel(tun_id=0x0,src=192.168.1.1,flags(key))",
                     "in_port(2)"]
            rc = flow_line_iter(line)
            self.assertEqual(valid, rc)

        def test_to_network(self):
            """ test_to_network test ipv4_to_network and ipv6_to_network. """
            ipv4s = [
                ("192.168.0.1", "192.168.0.1"),
                ("192.168.0.1/255.255.255.255", "192.168.0.1"),
                ("192.168.0.1/255.255.255.0", "192.168.0.0"),
                ("192.168.0.1/255.255.0.0", "192.168.0.0"),
                ("192.168.0.1/255.0.0.0", "192.0.0.0"),
                ("192.168.0.1/0.0.0.0", "0.0.0.0"),
                ("10.24.106.230/255.255.255.255", "10.24.106.230"),
                ("10.24.106.230/255.255.255.0", "10.24.106.0"),
                ("10.24.106.0/255.255.255.0", "10.24.106.0"),
                ("10.24.106.0/255.255.252.0", "10.24.104.0")
            ]

            ipv6s = [
                ("1::192:168:0:1", "1::192:168:0:1"),
                ("1::192:168:0:1/1::ffff:ffff:ffff:ffff", "1::192:168:0:1"),
                ("1::192:168:0:1/1::ffff:ffff:ffff:0", "1::192:168:0:0"),
                ("1::192:168:0:1/1::ffff:ffff:0:0", "1::192:168:0:0"),
                ("1::192:168:0:1/1::ffff:0:0:0", "1::192:0:0:0"),
                ("1::192:168:0:1/1::0:0:0:0", "1::"),
                ("1::192:168:0:1/::", "::")
            ]

            for (ipv4_test, ipv4_check) in ipv4s:
                self.assertEqual(ipv4_to_network(ipv4_test), ipv4_check)

            for (ipv6_test, ipv6_check) in ipv6s:
                self.assertEqual(ipv6_to_network(ipv6_test), ipv6_check)

        def test_ui(self):
            """ test_ui: test expected ui behavior. """
            #pylint: disable=W0212
            top_render = Render(80, Render.FIELD_SELECT_TOP)
            script_render = Render(80, Render.FIELD_SELECT_SCRIPT)
            self.assertEqual(top_render._field_type_select_get(), "in_port")
            self.assertEqual(script_render._field_type_select_get(), "all")
            #pylint: enable=W0212
|