import os import struct from itertools import imap, ifilter import logging from collections import namedtuple, defaultdict import hashlib logger = logging.getLogger('pcaputil') TCP_PROTOCOL = 6 TCP_SYN = 2 TCP_FIN = 1 def is_tcp(pkt): return (len(pkt) > 47 and pkt[12:14] == '\x08\x00' # Ethernet.Type == IP and ord(pkt[23]) == TCP_PROTOCOL) def is_tcp_data(pkt): return (is_tcp(pkt) and (not ord(pkt[47]) & (TCP_SYN | TCP_FIN))) def is_tcp_syn(pkt): return (is_tcp(pkt) and (ord(pkt[47]) & TCP_SYN)) TCP = namedtuple('TCP', ['src', 'dst', 'flags', 'seq', 'ack', 'window', 'data']) def tcp_parse(pkt): tcp_offset = 14 + (ord(pkt[14]) & 0xf) * 4 src, dst, seq, ack, data_offset16, flags, window = struct.unpack( '>HHIIBBH', pkt[tcp_offset:tcp_offset+16]) data_offset = tcp_offset + (data_offset16 >> 2) if data_offset != 66: logger.debug("WHOOHOO") return TCP(src, dst, flags, seq, ack, window, pkt[data_offset:]) def myparsepcap(filename): with open(filename, 'r') as fd: file_size = os.stat(filename).st_size i = 0 i = 24 hdr_len = 8 + 4 + 4 i_pkt = 0 preamble = fd.read(24) while i + hdr_len < file_size: ts, l1, l2 = struct.unpack('QII', fd.read(hdr_len)) if l1 < l2: logger.debug("short recorded packet: #%s,%s: %s < %s" % (i_pkt, i, l1, l2)) pkt = fd.read(l1) if is_tcp_data(pkt): src, dst, tcp_payload = tcp_parse(pkt) yield ts, src, dst, tcp_payload i_pkt += 1 i += hdr_len + l1 def get_conversations(file): convs = {} port_counts = {} for ts, src, dst, data in myparsepcap(file): key = (src,dst) if key not in convs: convs[key] = [ts] convs[key].append(data) ports = [src for src,dst in convs.keys()] server_port = sorted(ports)[0] times = {} for src, dst in convs.keys(): if src != server_port: src, dst = dst, src server = convs[(src, dst)] client = convs[(dst, src)] assert(client[0] < server[0]) # client sends first msg times[client[0]] = (client, server) return [map(lambda x: ''.join(x[1:]), times[t]) for t in sorted(times.keys())] class Conversation(object): def __init__(self, npkt, iter): self.npkt = npkt self.iter = iter def conversations_iter(packet_iter, **keyed_iters): convs = {} keys = {(False, True): 'server', (False, False): 'client', (True, True): 'server_start', (True, False): 'client_start'} filter_result = ((lambda x: x is not None and keyed_iters['filter_result'](x)) if 'filter_result' in keyed_iters else (lambda x: x is not None)) for i_pkt, (src, dst, payload) in enumerate(packet_iter): logger.debug("%3d: conversation_iter %s->%s #(%s)" % (i_pkt, src, dst, len(payload))) key = (src, dst) if key not in convs: iter_gen = keyed_iters[keys[(True, src < dst)]] convs[key] = Conversation(npkt=1, iter=iter_gen(*key)) convs[key].iter.next() else: conv = convs[key] conv.npkt += 1 result = convs[key].iter.send((src, dst, payload)) if filter_result(result): yield result class TCPConversation(object): def __init__(self, npkt, min_next_seq, iter): self.npkt = npkt self.min_next_seq = min_next_seq self.iter = iter def tcp_conversations_iter(packet_iter, **keyed_iters): """ conversation_start_iter is used when we catch the SYN packet, otherwise we use conversation_iter. the later is expected to be used by the former. """ convs = {} prints = set() keys = {(False, True): 'server', (False, False): 'client', (True, True): 'server_start', (True, False): 'client_start'} filter_result = ((lambda x: x is not None and keyed_iters['filter_result'](x)) if 'filter_result' in keyed_iters else (lambda x: x is not None)) for i_pkt, tcp in enumerate(non_reordering_tcp_iter(packet_iter)): src, dst, flags, payload = tcp.src, tcp.dst, tcp.flags, tcp.data logger.debug("%3d: conversation_iter %s->%s #(%s)" % (i_pkt, src, dst, len(payload))) key = (src, dst) fingerprint = hashlib.md5(payload).digest() if key not in convs: iter_gen = keyed_iters[keys[(flags & TCP_SYN > 0, src < dst)]] convs[key] = TCPConversation(npkt=1, iter=iter_gen(*key), min_next_seq = tcp.seq + len(payload) + (1 if flags & TCP_SYN else 0)) convs[key].iter.next() else: conv = convs[key] conv.npkt += 1 seq_change = tcp.seq - conv.min_next_seq next_min_seq = max(conv.min_next_seq, tcp.seq + len(payload)) logger.debug("%4d: seq changed by %s, expect +%s" % ( conv.npkt, seq_change, next_min_seq - conv.min_next_seq)) if seq_change < 0 and fingerprint not in prints: print "busted!" conv.min_next_seq = next_min_seq if seq_change < 0: continue prints.add(fingerprint) result = convs[key].iter.send((src, dst, payload)) if filter_result(result): yield result def tcp_conversations_iter_with_sequence(packet_iter, **keyed_iters): """ conversation_start_iter is used when we catch the SYN packet, otherwise we use conversation_iter. the later is expected to be used by the former. """ conversations = {} keys = {(False, True): 'server', (False, False): 'client', (True, True): 'server_start', (True, False): 'client_start'} filter_result = ((lambda x: x is not None and keyed_iters['filter_result'](x)) if 'filter_result' in keyed_iters else (lambda x: x is not None)) outstanding = defaultdict(lambda: []) seqs = {} seqs_start = {} acks = {} def show_seq(key): return '%s (R%s)' % (seqs[key], sum(seqs[key]) - seqs_start[key]) for i_pkt, tcp in enumerate(non_reordering_tcp_iter(packet_iter)): src, dst, flags, seq, payload = tcp.src, tcp.dst, tcp.flags, tcp.seq, tcp.data logger.debug("%3d: conversation_iter %s->%s #(%s)" % (i_pkt, src, dst, len(payload))) key = (src, dst) rev_key = (dst, src) if key not in conversations: iter_gen = keyed_iters[keys[(flags & TCP_SYN > 0, src < dst)]] conversations[key] = iter_gen(*key) conversations[key].next() seqs[key] = (seq , 0) # sequence# of first byte in payload, payload length - next packet should have seq of a+b seqs_start[key] = seq acks[key] = tcp.ack logger.debug("_ %s setting seq to %s" % ( key, show_seq(key))) outstanding[key].append(tcp) removed = [] for (the_seq, i, ot) in sorted( (ot.seq, i, ot) for i, ot in enumerate(outstanding[key])): expected = sum(seqs[key]) if expected == the_seq: logger.debug("A %s (%s,%s) %s" % (str(key), len(payload), flags & TCP_SYN > 0, show_seq(key))) removed.append(i) result = conversations[key].send((ot.src, ot.dst, ot.data)) if filter_result(result): yield result seqs[key] = (the_seq, max(len(ot.data), (ot.flags & TCP_SYN) / 2)) else: logger.debug("C %s +%s == %s != %s, ack = %s" % ( key, seqs[key], show_seq(key), the_seq, acks.get(rev_key, None))) for i in reversed(removed): del outstanding[key][i] if len(removed) == 0: logger.debug("B %s: seq %s" % (key, show_seq(key))) #print "B %s: %s" % (key, [(x.seq-seqs[key], len(x.data)) for x in outstanding[key]]) #import pdb; pdb.set_trace() def consume_packets(packets, needed_len): """ >>> pcaputil.consume_packets(['01234','5678','90ab','cdef'], 7) ('0123456', ['78', '90ab', 'cdef']) """ pkt = None ret_packets = packets total_len = sum(map(len, packets)) if total_len >= needed_len: pkt = ''.join(packets) if total_len > needed_len: part_len = 0 for i, p in enumerate(packets): if part_len + len(p) > needed_len: ret_packets = [p[needed_len - part_len:]] + packets[i+1:] break part_len += len(p) pkt = pkt[:needed_len] assert(sum(map(len, ret_packets)) + len(pkt) == total_len) else: ret_packets = [] assert(pkt is None or len(pkt) == needed_len) return pkt, ret_packets def ident(x, **kw): return x CollectorResult = namedtuple('CollectorResult', ('src', 'dst', 'msg')) CollectorPacket = namedtuple('CollectorPacket', ('header', 'data')) def collect_packets(header_iter_gen): """ Example: >>> list(pcaputil.send_multiple(pcaputil.collect_packets(lambda src, dst: iter([(10, ident, lambda h: 20, ident)]*2))(17, 42), [(42, 17, ' '*30)])) [(' ', ' ')] """ def collector(start_src, start_dst): packets = [] history = [] output = [] src, dst = start_src, start_dst header_iter = header_iter_gen(src, dst) hdr_size, hdr_ctor, size_from_hdr, pkt_ctor = header_iter.next() msg = None header = pkt = None searched_size = hdr_size while True: collector_result = CollectorResult(src, dst, msg) output.append(collector_result) src, dst, payload = yield collector_result packets.append(payload) history.append(payload) logger.debug("collect_packets: (%s->%s) searching for %s (%s)" % (src, dst, searched_size, sum(map(len, packets)))) data, packets = consume_packets(packets, searched_size) while data != None: logger.debug("collect_packets: (%s->%s) %s" % (src, dst, len(data))) if header is None: header = hdr_ctor(data) if header is None: # invalid header break logger.debug("collect_packets: (%s->%s) header of length %s, expect %s" % ( src, dst, len(data), size_from_hdr(header))) searched_size = size_from_hdr(header) else: logger.debug("collect_packets: (%s->%s) packet of length %s" % (src, dst, len(data))) pkt = pkt_ctor(data, src=src, dst=dst, header=header) msg = CollectorPacket(header, pkt) hdr_size, hdr_ctor, size_from_hdr, pkt_ctor = header_iter.next() header = None searched_size = hdr_size data, packets = consume_packets(packets, searched_size) return collector def send_multiple(gen, sent_iter): sent_iter = iter(sent_iter) gen.next() try: while True: next = sent_iter.next() yield gen.send(next) except StopIteration: return def header_conversation_iter(packet_iter, **kw): opts = dict([(k, collect_packets(v)) for k,v in kw.items() if k != 'filter_result']) if 'filter_result' in kw: opts['filter_result'] = kw['filter_result'] return conversations_iter(packet_iter, **opts) def tcp_data_iter(packet_iter): return ifilter(lambda src, dst, flags, data: len(data) > 0, imap(tcp_parse, ifilter(is_tcp_data, imap(lambda x: ''.join(x[1]), packet_iter)))) def non_reordering_tcp_iter(packet_iter): return ifilter(lambda t: t.flags & (TCP_SYN | TCP_FIN) or len(t.data) > 0, imap(tcp_parse, ifilter(is_tcp, imap(lambda x: ''.join(x[1]), packet_iter)))) def iter_conv(n_tcp, src, dst): for t in n_tcp: if t.src == src and t.dst == dst: yield t def packet_iter(dev): import mypcap as pcap def filter_none(): for t in pcap.pcap(dev): if t is None: print "another None from pcap??" else: yield t return filter_none() def test(): logging.basicConfig(level=logging.DEBUG,filename="testout.log") p = packet_iter('lo') class Handler(object): def __init__(self, src, dst): self.src = src self.dst = dst self.data = [] def __iter__(self): data = '' src, dst = self.src, self.dst while True: src, dst, data = yield None print ('%s->%s %s' % (src, dst, len(data))) self.data.append(data) def handler(src, dst): return iter(Handler(src, dst)) server = client = server_start = client_start = handler for b in conversations_iter(p, server=server,client=client, server_start=server_start,client_start=client_start): print b if __name__ == '__main__': test()