Question

I'm trying to create a packet sniffer using pcapy and impacket. I'm stuck with data extraction phase. Unfortunately impacket is not properly documented. At least i could n't find one. Could anyone tel me where to find the documentation or what functions i could use to extract data from captured packet?

edit

my current code

import datetime
import pcapy
import sys
from impacket.ImpactPacket import *
from impacket.ImpactDecoder import *

def main(argv):

    dev='ppp0'
    print "Sniffing device " + dev

    cap = pcapy.open_live(dev , 65536 , 1 , 0)

    while(1) :
        try:
            (header, packet) = cap.next()            
            eth= LinuxSLLDecoder().decode(packet)
            ip=eth.child()  #internet layer
            trans=ip.child()#transport layer

            try:                
                print 'protocol=',
                if ip.get_ip_p() == UDP.protocol:
                    print 'UDP'
                if ip.get_ip_p() == TCP.protocol:
                    print 'TCP','port=',trans.get_th_dport()
                    print trans.child()
                if ip.get_ip_p() == ICMP.protocol:
                    print 'ICMP'

                print 'src=',ip.get_ip_src(),'dest=',ip.get_ip_dst()

                print ''

            except:
                pass


        except pcapy.PcapError:
            continue             


if __name__ == "__main__":
  main(sys.argv)

Sample Output

src= xxx.xxx.xxx.xx dest= xx.xxx.xx.xx

protocol= TCP port= 443

1703 0300 2400 0000 0000 0000 07e2 a2a5    ....$...........
09fe 5b15 3cf1 803d 0c83 8ada 082e 8269    ..[.<..=.......i
0007 8b33 7d6b 5c1a 01                     ...3}k\..

What i want to do is extract more data, For example extract the url (if there is a url in packet)

Was it helpful?

Solution 2

I ran into similar problem. I guess when there is no documentation, the best documentation is the source code! And with python we are lucky to have source code most of the time. Anyway, I would suggest looking into ImpactDecoder.py and ImpactPacket.py. First one give some insights as far as how packets get decoded and second gives information on actual packets as a class and their methods. For instance, ImpactPacket.py and class PacketBuffer has following methods that you were probably looking for::

def set_bytes_from_string(self, data):
    "Sets the value of the packet buffer from the string 'data'"
    self.__bytes = array.array('B', data)

def get_buffer_as_string(self):
    "Returns the packet buffer as a string object"
    return self.__bytes.tostring()

def get_bytes(self):
    "Returns the packet buffer as an array"
    return self.__bytes

def set_bytes(self, bytes):
    "Set the packet buffer from an array"
    # Make a copy to be safe
    self.__bytes = array.array('B', bytes.tolist())

def set_byte(self, index, value):
    "Set byte at 'index' to 'value'"
    index = self.__validate_index(index, 1)
    self.__bytes[index] = value

def get_byte(self, index):
    "Return byte at 'index'"
    index = self.__validate_index(index, 1)
    return self.__bytes[index]

def set_word(self, index, value, order = '!'):
    "Set 2-byte word at 'index' to 'value'. See struct module's documentation to understand the meaning of 'order'."
    index = self.__validate_index(index, 2)
    ary = array.array("B", struct.pack(order + 'H', value))
    if -2 == index:
        self.__bytes[index:] = ary
    else:
        self.__bytes[index:index+2] = ary

def get_word(self, index, order = '!'):
    "Return 2-byte word at 'index'. See struct module's documentation to understand the meaning of 'order'."
    index = self.__validate_index(index, 2)
    if -2 == index:
        bytes = self.__bytes[index:]
    else:
        bytes = self.__bytes[index:index+2]
    (value,) = struct.unpack(order + 'H', bytes.tostring())
    return value

def set_long(self, index, value, order = '!'):
    "Set 4-byte 'value' at 'index'. See struct module's documentation to understand the meaning of 'order'."
    index = self.__validate_index(index, 4)
    ary = array.array("B", struct.pack(order + 'L', value))
    if -4 == index:
        self.__bytes[index:] = ary
    else:
        self.__bytes[index:index+4] = ary

def get_long(self, index, order = '!'):
    "Return 4-byte value at 'index'. See struct module's documentation to understand the meaning of 'order'."
    index = self.__validate_index(index, 4)
    if -4 == index:
        bytes = self.__bytes[index:]
    else:
        bytes = self.__bytes[index:index+4]
    (value,) = struct.unpack(order + 'L', bytes.tostring())
    return value

def set_long_long(self, index, value, order = '!'):
    "Set 8-byte 'value' at 'index'. See struct module's documentation to understand the meaning of 'order'."
    index = self.__validate_index(index, 8)
    ary = array.array("B", struct.pack(order + 'Q', value))
    if -8 == index:
        self.__bytes[index:] = ary
    else:
        self.__bytes[index:index+8] = ary

def get_long_long(self, index, order = '!'):
    "Return 8-byte value at 'index'. See struct module's documentation to understand the meaning of 'order'."
    index = self.__validate_index(index, 8)
    if -8 == index:
        bytes = self.__bytes[index:]
    else:
        bytes = self.__bytes[index:index+8]
    (value,) = struct.unpack(order + 'Q', bytes.tostring())
    return value


def get_ip_address(self, index):
    "Return 4-byte value at 'index' as an IP string"
    index = self.__validate_index(index, 4)
    if -4 == index:
        bytes = self.__bytes[index:]
    else:
        bytes = self.__bytes[index:index+4]
    return socket.inet_ntoa(bytes.tostring())

def set_ip_address(self, index, ip_string):
    "Set 4-byte value at 'index' from 'ip_string'"
    index = self.__validate_index(index, 4)
    raw = socket.inet_aton(ip_string)
    (b1,b2,b3,b4) = struct.unpack("BBBB", raw)
    self.set_byte(index, b1)
    self.set_byte(index + 1, b2)
    self.set_byte(index + 2, b3)
    self.set_byte(index + 3, b4)

The other super useful class from ImpactPacket.py is ProtocolLayer, that gives us following methods::

def child(self):
    "Return the child of this protocol layer"
    return self.__child

def parent(self):
    "Return the parent of this protocol layer"
    return self.__parent

So, basically impacket uses matreshka doll approach, and you can go to any layer you want using child and parent methods and use any methods of the PacketBuffer class on any layer. Pretty cool, huh? Furthermore, particular layers (or packets) have their specific methods but you would have to go dig ImpactPacket.py and ImpactDecoder.py if you want to find more about those.

Good luck and cheers mate!

OTHER TIPS

Here is an example for a syn-port scanner with pcap and python and impacket. Maybe you can tak the important parts out of it.

'''
synscan.py ...
see scan.py for parameters

this works extremely well an a windows that likes to communicate
scanning hosts in same ethernet is possible
scanning host not within the same ethernet may success but does not need to

many algorithms were tried

-   raw socket support needs higher previleges
    and is impossible because windows does not allow to sniff with them
    or to submit sniffable packets
->  not implemented here

    "Why do you need special libraries for TCP-SYN scans?"
    thats why.

using pcap the program is devided into phases
usually it succeeds in phase 1.
phase 0:
    add targets and phase 1
phase 1+: (parallel)
    send arp request to resolve target
    bombard it with the right packets
    sniff
phase 2:
    send out udp to resolve mac address by sniffing
    send out raw socket tcp syn requests (need higher previleges) optional
phase 3:
    if not yet succeeded in phase 1: = mac not found
    bombard all macs with packets
phase 4:
    bombard broadcasting [mac ff:ff:ff:ff:ff:ff] with packets
phase 5:
    clean up - no use

use DEBUG_PHASE to show phases

currently only ipv4 is supported

'''


import sys
import time
import thread
import pcap # pcapy
import impacket
import random

import impacket.ImpactDecoder as ImpactDecoder
import impacket.ImpactPacket as ImpactPacket

import array

import scan
from scan import *

DEFAULT_SOCKET_TIMEOUT = 20
NOTIFY_TIMEOUT = 2

# argument incdeces for socket.socket(...)
SOCK_INIT_FAMILY = 0
SOCK_INIT_TYPE = 1
SOCK_INIT_PROTO = 2

STATE_STATE = 1
STATE_TIME = 0

PCAP_ARGS = ()
PCAP_KW = dict(promisc = True, timeout_ms = 0)

DEBUG = False
DEBUG_IFACE =   False and DEBUG # put out which devices are set up
DEBUG_IP =      False and DEBUG # print ip debug output for ip packets v4
DEBUG_ARP =     False and DEBUG # send arp communication debug out
DEBUG_SYN =     False and DEBUG # print out the syn requests sent
DEBUG_PACKET =  False and DEBUG # packet inspection as seen by scanner
DEBUG_PHASE =   True and DEBUG # scanner phases - 5
DEBUG_STATE =   False and DEBUG # debug output about the state
DEBUG_PHASE2 =  False and DEBUG # debug output about what is sent in phase 2
                                 # you need higher previleges for some of these operations

ETHER_BROADCAST = (0xff,) * 6 # mac ff:ff:ff:ff:ff:ff


# --- Conversions --------------------------------------------------------------

def ip_tuple(ip):
    '''Decode an IP address [0.0.0.0] to a tuple of bytes'''
    return tuple(map(int, ip.split('.')))

def tuple_ip(ip):
    '''Encode a a tuple of bytes to an IP address [0.0.0.0]'''
    return '.'.join(map(str, (ip[0], ip[1], ip[2], ip[3])))

# --- Packet Creation --------------------------------------------------------------

def generate_empty_arp_request():
    # build ethernet frame
    eth = ImpactPacket.Ethernet()
    eth.set_ether_type(0x0806)          # this is an ARP packet
    eth.set_ether_dhost(ETHER_BROADCAST)# destination host (broadcast)

    # build ARP packet
    arp = ImpactPacket.ARP()
    arp.set_ar_hrd(1)
    arp.set_ar_hln(6)                   # ethernet address length = 6
    arp.set_ar_pln(4)                   # ip address length = 4
    arp.set_ar_pro(0x800)               # protocol: ip
    arp.set_ar_op(1)                    # opcode: request
    arp.set_ar_tha(ETHER_BROADCAST)     # target hardware address (broadcast)
    eth.contains(arp)
    return eth, arp

def generate_empty_ip_packet():
    eth = ImpactPacket.Ethernet()
    #### values to be set:
    # type, shost, dhost
    eth.set_ether_type(0x800)

    ip = ImpactPacket.IP()

    #### values to be set:
    # version, IHL, TOS, total_length, ID, Flags, Fragment offset, 
    # TTL, Protocol, Checksum, source_addr, destination_addr, options
    ip.set_ip_v(4)  
    ip.set_ip_hl(5)   # 5 * 32 bit
    ip.set_ip_tos(0)  # usal packet -> type of service = 0
    # total_length
    ip.set_ip_id(random.randint(1, 0xffff)) 
    ip.set_ip_df(0)   # flags redundant
    ip.set_ip_off(0)
    ip.set_ip_ttl(250)
    ip.set_ip_p(6)    # tcp = 6
    eth.contains(ip)
    return eth, ip

# --- Scanner --------------------------------------------------------------


def start_scan(timeout):
    '''return a scanner object


'''
    # mac addresses are used to send ethernet packages
    mac_addresses = {} # ip : set([mac])
    # threadsave access to the targets
    targets_lock = thread.allocate_lock()
    targets = [] # (family, (ip, port, ...))
    # list of target names
    target_hosts = set() # host ips

    def is_target(host):
        return host in target_hosts

    def add_target(family, address):
        target_hosts.add(address[IP])
        mac_addresses.setdefault(address[IP], set())
        with targets_lock:
            targets.append((family, address))

    def store_ip_mac_resolution_for(host):

        for family, socktype, proto, canonname, address in \
                                    socket.getaddrinfo(host, 0):
            mac_addresses.setdefault(address[IP], set())

    def associate_ip_mac(ip, mac):
        if ip in mac_addresses or is_target(ip):
            if type(mac) is list:
                hashable_array_constructor = ('B', ''.join(map(chr, mac)))
            else:
                hashable_array_constructor = (mac.typecode, mac.tostring())
            mac_addresses[ip].add(hashable_array_constructor)

    def get_macs(host):
        macs = set()
        empty_set = set()
        for family, socktype, proto, canonname, (ip, port) in \
                                    socket.getaddrinfo(host, 0):
            macs.update(mac_addresses.get(ip, empty_set))
        return [array.array(*mac) for mac in macs]

    def get_local_macs():
        macs = set()
        for ip in get_host_ips():
            for mac in get_macs(ip):
                macs.add((ip, tuple(mac.tolist())))
        return macs

    def ip_known(ip):
        return bool(mac_addresses.get(ip, False))

    def save_ip_mac_resolution(ether, ip_header):
        source_ip = ip_header.get_ip_src()
        source_mac = ether.get_ether_shost()
        associate_ip_mac(source_ip, source_mac)

        destination_ip = ip_header.get_ip_dst()
        destination_mac = ether.get_ether_dhost()
        associate_ip_mac(destination_ip, destination_mac)

    ## parse data directly from pcap

    def find_connection_response(data):
        # Parse the Ethernet packet
        decoder = ImpactDecoder.EthDecoder()
        find_connection_response_ethernet(decoder.decode(data))

    def find_connection_response_ethernet(ether):
        eth_type = ether.get_ether_type()
        if eth_type == 0x800: 
            # Received an IP-packet (2048)
            # Parse the IP packet inside the Ethernet packet
            find_connection_response_ip(ether, ether.child())
        elif eth_type == 0x0806:
            store_mac_of_target(ether)

    ## arp response handling

    def store_mac_of_target(ether):
        arp = ether.child()
        if arp.get_ar_op() in (2, ):
            if DEBUG_ARP:print 'response'
            # Received ARP Response
            source_mac_addr         = arp.get_ar_sha()
            source_ip_addr          = tuple_ip(arp.get_ar_spa())
            destination_mac_addr    = arp.get_ar_tha()
            destination_ip_addr     = tuple_ip(arp.get_ar_tpa())
            if DEBUG_ARP:print source_mac_addr, source_ip_addr, destination_mac_addr, destination_ip_addr
            if is_target(destination_ip_addr):
                if DEBUG_ARP:print 'intersting:', destination_ip_addr, destination_mac_addr
                associate_ip_mac(destination_ip_addr, destination_mac_addr)
            if is_target(source_ip_addr):
                if DEBUG_ARP:print 'intersting:', source_ip_addr, source_mac_addr
                associate_ip_mac(source_ip_addr, source_mac_addr)

    ## tcp syn-ack response handling

    def find_connection_response_ip(ether, ip_header):
        save_ip_mac_resolution(ether, ip_header)
        if ip_header.get_ip_p() == 0x6:
            # Received a TCP-packet
            # Parse the TCP packet inside the IP packet
            if DEBUG_IP > 2:
                print 'received ip packet: %s to %s' % (ip_header.get_ip_src(), \
                                                      ip_header.get_ip_dst())
            source_ip = ip_header.get_ip_src()
            destination_ip = ip_header.get_ip_dst()
            if not is_target(source_ip):
                return
            if DEBUG_IP > 1:print 'found interest in: %s' % ip_header.get_ip_src()
            find_connection_response_tcp(ip_header, ip_header.child())

    def find_connection_response_tcp(ip_header, tcp_header):
        # Only process SYN-ACK packets
        source_ip = ip_header.get_ip_src()
        source_port = tcp_header.get_th_sport()
        destination_ip = ip_header.get_ip_dst()
        destination_port = tcp_header.get_th_sport()
        print targets
        if tcp_header.get_SYN() and tcp_header.get_ACK():

            # Get the source and destination IP addresses

            # Print the results
            if DEBUG_IP: print("Connection attempt %s:(%s) <- %s:%s" % \
                              (source_ip, source_port, \
                               destination_ip, destination_port))
            if source_ip in target_hosts:
                put_port(source_port)
        elif tcp_header.get_SYN() and not tcp_header.get_ACK() and source_ip in get_host_ips():
            # someone sent a syn request along
            # asuming the acknoledge will come here, too
            target = (socket.AF_INET, (destination_ip, destination_port))
            if DEBUG_IP: print("Connection attempt %s:(%s) --> %s:%s" % \
                              (source_ip, source_port, \
                               destination_ip, destination_port))
            with targets_lock:
                try:
                    targets.remove(target)
                except ValueError:
                    pass

    def put_port(port):
        sys.stdout.write(str(port) + '\n')

    ## syn packet sending

    def send_syn(family, addr):
        if family == socket.AF_INET:
            send_syn_ipv4(addr)
        elif family == socket.AF_INET6:
            pass
        else:
            sys.stderr.write('Warning: in send_syn: family %s not supported\n' \
                             % family)

    def send_syn_ipv4(address):
        for packet in iter_syn_packets(address):
            if DEBUG_PACKET:
                print 'packet', id(packet)
            send_packet(packet)

    def iter_syn_packets(address):
        for tcp in iter_tcp_packets(address):
            for eth, ip in iter_eth_packets(address):
                ip.contains(tcp)
                packet = eth.get_packet()
                yield packet

    def get_host_ips():
        return socket.gethostbyname_ex(socket.gethostname())[2]

    def iter_eth_packets((target_ip, port)):
        eth, ip = generate_empty_ip_packet()
        for source_ip in get_host_ips():
            ip.set_ip_src(source_ip)
            ip.set_ip_dst(target_ip)
            for source_mac in get_macs(source_ip):
                eth.set_ether_shost(source_mac)
                for target_mac in get_macs(target_ip):
                    eth.set_ether_dhost(target_mac)
                    yield eth, ip

    def get_devices():
        return scanning.values()

    def iter_tcp_packets((_, target_port)):
        tcp = ImpactPacket.TCP()
        #### values to set:
        # source port, destination port, sequence number, window, flags
        source_port = random.randint(2048, 0xffff)
        tcp.set_th_sport(source_port)
        tcp.set_th_dport(target_port)
        tcp.set_th_seq(random.randint(1, 0x7fffffff))
        tcp.set_th_win(32768) # window -> discovered this as default
        tcp.set_SYN()
        yield tcp

    # waiting and scanner interaction

    keep_running = [1] # True
    def wait():
        if keep_running:
            keep_running.pop() # keep_running = False
        while scanning:
            time.sleep(0.01)
##        raw_input()

    def add_scan((socketargs, addr)):
        ip = addr[IP]
        port = addr[PORT]
        family = socketargs[SOCK_INIT_FAMILY]
        if ip_known(ip):
            send_syn(family, addr)
        else:
            add_target(family, addr)
            notify(family, addr)

    notified = {}
    def notify(family, addr):
        now = time.time()
        if family == socket.AF_INET:
            ip = addr[IP]
            if notified.get(ip, 0) < now - NOTIFY_TIMEOUT:
                notified[ip] = now
                send_who_is_ipv4(ip)
        elif family == socket.AF_INET6:
            pass
        else:
            raise ValueError('unknown protocol family type %i' % family)

    scanning_lock = thread.allocate_lock()
    scanning = {} # device_name : device

    def send_who_is_ipv4(target_ip):
        eth, arp = generate_empty_arp_request()
        arp.set_ar_tpa(ip_tuple(target_ip)) # target protocol address

        for ip, mac in get_local_macs():
            arp.set_ar_spa(ip_tuple(ip))    # source protocol address
            arp.set_ar_sha(mac)             # source hardware address
            eth.set_ether_shost(mac)        # source hardware address
            if DEBUG_ARP: print 'send_who_is_ipv4: %s%s -> %s' % (ip, mac, target_ip)
            send_packet(eth.get_packet())

    def send_packet(packet):
        t = -time.time()
        for device in get_devices():
            if DEBUG_PACKET:print device, repr(packet)
            device.sendpacket(packet)
        t -= time.time() - 0.002
        if t > 0:
            time.sleep(t)

    def scan(device_name, device):
        if DEBUG_IFACE: print 'dev up: %s' % device_name
        with scanning_lock:
            if device_name in scanning:
                return
            scanning[device_name] = device
        try:
            while device_name in scanning:
                time, data = next(device)
                find_connection_response(str(data))

        finally:
            with scanning_lock:
                scanning.pop(device_name, None )
            if DEBUG_IFACE: print 'dev down: %s' % device_name


    def start_scans():
        for device_name in pcap.findalldevs():
            start_scan(device_name)
        start_scan(pcap.lookupdev())

    def start_scan(device_name):
        device = pcap.pcap(device_name, *PCAP_ARGS, **PCAP_KW)
        thread.start_new(scan, (device_name, device))

    def notify_loop():
        targets_lock.acquire()
        while targets or phase:
            targets_lock.release()
            try:
                do_notify()
            except:
                traceback.print_exc()
            # iterate over scanner phases
            try:
                phases[0]()
            except:
                traceback.print_exc()
            targets_lock.acquire()
        targets_lock.release()

    def get_state():
        return len(targets)

    last_state = [time.time(), get_state()]

    def state_has_not_changed_for(timeout):
        now = time.time()
        state = get_state()
        if state != last_state[STATE_STATE]:
            last_state[STATE_TIME] = now
            last_state[STATE_STATE] = state
        if DEBUG_STATE: print 'state old:', last_state[STATE_TIME] + timeout < now
        return last_state[STATE_TIME] + timeout < now

    def reset_state():
        now = time.time()
        state = get_state()
        last_state[STATE_TIME] = now
        last_state[STATE_STATE] = state

    target_save = [] # needed between phase 3 and 4

    phases = []
    phase = phases.append
    @phase
    def do_scanner_phase():
        # wait for wait()
        if keep_running: return 
        if DEBUG_PHASE: print 'initiated phase 1 = waiting'
        reset_state()
        phases.pop(0)
        if not targets:
            give_up()
    @phase
    def do_scanner_phase():
        # wait to timeout without exiting wait
        # send ip packets to the host to enable 
        if not state_has_not_changed_for(timeout): return
        if DEBUG_PHASE: print 'initiated phase 2 = send packets'
        send_packets_to_addresses_to_sniff_mac()
        reset_state()
        phases.pop(0)
        if not targets:
            give_up()
    @phase
    def do_scanner_phase():
        # wait to timeout without exiting wait
        # set all ip hosts to have all mac addresses
        if not state_has_not_changed_for(timeout): return
        if DEBUG_PHASE: print 'initiated phase 3 = send to all'
        target_save.extend(targets[:])
        associate_all_ip_with_all_mac_addresses()
        reset_state()
        phases.pop(0)
        if not targets:
            give_up()
    @phase
    def do_scanner_phase():
        # wait to timeout without exiting wait
        # start broadcasting instead of using real mac address
        if not state_has_not_changed_for(timeout): return
        if DEBUG_PHASE: print 'initiated phase 4 = broadcast'
        if add_broadcast_to_all_mac_addresses():
            with targets_lock:
                targets.extend(target_save)
            reset_state()
        give_up()
    @phase
    def do_scanner_phase():
        # wait to timeout without exiting wait
        # give up
        if not state_has_not_changed_for(timeout): return
        if DEBUG_PHASE: print 'initiated phase 5 = give up'
        for device_name in scanning.keys():
            scanning.pop(device_name)
        reset_state()
        phases.insert(0, phases.pop(-1))
    @phase
    def do_scanner_phase():
        pass

    def give_up():
        phases.insert(0, phases.pop(-2))

    def send_packets_to_addresses_to_sniff_mac():
        udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        for host in target_hosts:
            send_udp(udp_sock, host)
        try:
            raw_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW)
        except:
            sys.stderr.write('higher previleges needed to perform raw socket packet send\n')
            return
        for target in targets:
            send_raw(raw_sock, target)

    def send_raw(raw_sock, (family, addr)):
        if family == socket.AF_INET:
            send_raw_ipv4(raw_sock, addr)
        elif family == socket.AF_INET6:
            pass # todo: ipv6
        else:
            raise ValueError('invalid family %s' % (family,))

    def send_raw_ipv4(raw_sock, addr):
        for tcp in iter_tcp_packets(addr):
            if DEBUG_PHASE2: print 'sending tcp raw', repr(tcp.get_packet()), addr
            try:
                raw_sock.sendto(tcp.get_packet(), addr)
            except ():
                pass


    def send_udp(s, host):
        # send an udp packet to sniff mac address

        try:
            s.sendto(':)', (host, random.randint(0, 0xffff)))
        except socket_error as e:
            if DEBUG_PHASE2: print 'failed: send to %r %s' % (host, e)
        else:
            if DEBUG_PHASE2: print 'succeded: send to %r' % (host,)
        s.close()


    def associate_all_ip_with_all_mac_addresses():
        macs = set()
        for mac in mac_addresses.values():
            macs.update(mac)
        for mac in mac_addresses.values():
            mac.update(macs)
        if DEBUG_PHASE: print 'macs:', [mac for mac in macs]

    def add_broadcast_to_all_mac_addresses():
        updated_mac = False
        BC = ('B', ETHER_BROADCAST)
        for mac in mac_addresses.values():
            updated_mac = updated_mac or not BC in mac
            mac.add(('B', ETHER_BROADCAST))
        return updated_mac


    def do_notify():
        t = time.time()
        notified = set()
        for target in targets[:]:
            ip = target[1][IP]
            if ip in notified:
                continue
            if DEBUG_SYN:
                print 'nofifying %s' % ip,
            if ip_known(ip):
                if DEBUG_SYN:print 'send_syn', target[PORT]
                send_syn(*target)
                targets.remove(target)
            else:
                if DEBUG_SYN:print 'notify'
                notify(*target)
                notified.add(ip)
        t -= time.time() - NOTIFY_TIMEOUT
        if t > 0:
            time.sleep(t)

    def start_notify_loop():
        thread.start_new(notify_loop, ())

    store_ip_mac_resolution_for(socket.gethostname())
    start_scans()
    start_notify_loop()

    return obj(wait = wait, add_scan = add_scan)


def main():
    host, ports, timeout = parseArgs(DEFAULT_SOCKET_TIMEOUT)
    scanner = start_scan(timeout)
    for connection in connections(host, ports):
        scanner.add_scan(connection)
    scanner.wait()

if __name__ == '__main__':
    main()

Here is a sample code written in Python with working pcapy. This might be of help for many.

'''
Packet sniffer in python using the pcapy python library

Project website
http://oss.coresecurity.com/projects/pcapy.html
'''

import socket
from struct import *
import datetime
import pcapy
import sys
import socket

def main(argv):
    #list all devices
    devices = pcapy.findalldevs()
    print devices

    errbuf = ""

    #ask user to enter device name to sniff
    print "Available devices are :"
    for d in devices :
        print d

    dev = raw_input("Enter device name to sniff : ")

    print "Sniffing device " + dev

    '''
    open device
    # Arguments here are:
    #   device
    #   snaplen (maximum number of bytes to capture _per_packet_)
    #   promiscious mode (1 for true)
    #   timeout (in milliseconds)
    '''
    socket.setdefaulttimeout(2)
    s = socket.socket();
    #s.settimeout(100);    

    #dev = 'eth0' 
    cap = pcapy.open_live(dev , 65536 , 1 , 1000)

   #start sniffing packets
    while(1) :
        (header, packet) = cap.next()
        #print ('%s: captured %d bytes, truncated to %d bytes' %(datetime.datetime.now(), header.getlen(), header.getcaplen()))
        parse_packet(packet)

    #start sniffing packets
    #while(1) :

    #print ('%s: captured %d bytes, truncated to %d bytes' %(datetime.datetime.now(), header.getlen(), header.getcaplen()))


#Convert a string of 6 characters of ethernet address into a dash separated hex string
def eth_addr (a) :
    b = "%.2x:%.2x:%.2x:%.2x:%.2x:%.2x" % (ord(a[0]) , ord(a[1]) , ord(a[2]), ord(a[3]), ord(a[4]) , ord(a[5]))
    return b

#function to parse a packet
def parse_packet(packet) :

    #parse ethernet header
    eth_length = 14

    eth_header = packet[:eth_length]
    eth = unpack('!6s6sH' , eth_header)
    eth_protocol = socket.ntohs(eth[2])
    print 'Destination MAC : ' + eth_addr(packet[0:6]) + ' Source MAC : ' + eth_addr(packet[6:12]) + ' Protocol : ' + str(eth_protocol)

    #Parse IP packets, IP Protocol number = 8
    if eth_protocol == 8 :
        #Parse IP header
        #take first 20 characters for the ip header
        ip_header = packet[eth_length:20+eth_length]

        #now unpack them :)
        iph = unpack('!BBHHHBBH4s4s' , ip_header)

        version_ihl = iph[0]
        version = version_ihl >> 4
        ihl = version_ihl & 0xF

        iph_length = ihl * 4

        ttl = iph[5]
        protocol = iph[6]
        s_addr = socket.inet_ntoa(iph[8]);
        d_addr = socket.inet_ntoa(iph[9]);

        print 'Version : ' + str(version) + ' IP Header Length : ' + str(ihl) + ' TTL : ' + str(ttl) + ' Protocol : ' + str(protocol) + ' Source Address : ' + str(s_addr) + ' Destination Address : ' + str(d_addr)

        #TCP protocol
        if protocol == 6 :
            t = iph_length + eth_length
            tcp_header = packet[t:t+20]

            #now unpack them :)
            tcph = unpack('!HHLLBBHHH' , tcp_header)

            source_port = tcph[0]
            dest_port = tcph[1]
            sequence = tcph[2]
            acknowledgement = tcph[3]
            doff_reserved = tcph[4]
            tcph_length = doff_reserved >> 4

            print 'Source Port : ' + str(source_port) + ' Dest Port : ' + str(dest_port) + ' Sequence Number : ' + str(sequence) + ' Acknowledgement : ' + str(acknowledgement) + ' TCP header length : ' + str(tcph_length)

            h_size = eth_length + iph_length + tcph_length * 4
            data_size = len(packet) - h_size

            #get data from the packet
            data = packet[h_size:]

            #print 'Data : ' + data

        #ICMP Packets
        elif protocol == 1 :
            u = iph_length + eth_length
            icmph_length = 4
            icmp_header = packet[u:u+4]

            #now unpack them :)
            icmph = unpack('!BBH' , icmp_header)

            icmp_type = icmph[0]
            code = icmph[1]
            checksum = icmph[2]

            print 'Type : ' + str(icmp_type) + ' Code : ' + str(code) + ' Checksum : ' + str(checksum)

            h_size = eth_length + iph_length + icmph_length
            data_size = len(packet) - h_size

            #get data from the packet
            data = packet[h_size:]

            #print 'Data : ' + data

        #UDP packets
        elif protocol == 17 :
            u = iph_length + eth_length
            udph_length = 8
            udp_header = packet[u:u+8]

            #now unpack them :)
            udph = unpack('!HHHH' , udp_header)

            source_port = udph[0]
            dest_port = udph[1]
            length = udph[2]
            checksum = udph[3]

            print 'Source Port : ' + str(source_port) + ' Dest Port : ' + str(dest_port) + ' Length : ' + str(length) + ' Checksum : ' + str(checksum)

            h_size = eth_length + iph_length + udph_length
            data_size = len(packet) - h_size

            #get data from the packet
            data = packet[h_size:]

            #print 'Data : ' + data

        #some other IP packet like IGMP
        else :
            print 'Protocol other than TCP/UDP/ICMP'

        print

if __name__ == "__main__":
  main(sys.argv)
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top