I know this is an old question, but here is an example of kqueue socket polling from a multithreaded HTTP server I wrote. I worked it out by reading C source code and the man pages for kqueue.
#bsd socket polling
#I make all the relevant flags more C-like to match the kqueue man pages
from select import kevent, kqueue
from select import KQ_EV_ADD as EV_ADD, KQ_EV_ONESHOT as EV_ONESHOT
from select import KQ_EV_EOF as EV_EOF
from .common import Client_Thread #a parent class whose implementation is irrelevant to the question, lol

class BSD_Client(Client_Thread):
    def __init__(self, *args):
        Client_Thread.__init__(self, *args)
        #Make a kqueue object for the thread
        kq = kqueue()
        #Make a one-shot kev for this kqueue for when the kill socket is
        #connected to. The connection is only made once, so why not tell
        #that to our kqueue? The default filter is EVFILT_READ, so we don't
        #need to specify that. The default flag is just EV_ADD.
        kill_kev = kevent(self.kill_fd, flags=EV_ADD|EV_ONESHOT)
        #using defaults for the client socket.
        client_kev = kevent(self.client_sock)
        #we only need to keep track of the kqueue's control func.
        #This also makes things prettier in the run func.
        self.control = kq.control
        #now, we add the list of events we just made to our kqueue.
        #The first 0 means we want a list of at most 0 length in return.
        #The second 0 is the timeout: return immediately (i.e. do this
        #in a non-blocking way.)
        self.control([client_kev, kill_kev], 0, 0)

    def run(self):
        while True:
            #Here we poll the kqueue object.
            #The empty list means we are adding no new events to the kqueue.
            #The one means we want a list of at most 1 element. Then None
            #means we want to block until an event is triggered.
            events = self.control([], 1, None)
            #If we have an event, and the event is for the kill socket
            #(meaning somebody made a connection to it), then we break the
            #loop and die.
            if events and events[0].ident == self.kill_fd:
                self.die()
                break
            #If all that is left is an EOF in our socket, then we break
            #the loop and die. Otherwise the kqueue would keep returning
            #this kevent even though there is nothing left to read.
            if events and events[0].flags & EV_EOF:
                self.die()
                break
            #Finally, if we have an event that isn't for the kill socket and
            #does not have the EOF flag set, then there is work to do. If
            #the handle client function (defined in the parent class) returns
            #1, then we are done serving a page and we can die.
            if events and self.handle_client():
                self.die()
                break
        #close the client socket (the one registered as self.client_sock)
        self.client_sock.close()
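If you want to try the register-then-poll pattern from the class above without the rest of the server, something like this rough sketch should do. The function name `first_ready` and the use of `socket.socketpair()` as stand-ins for the client and kill sockets are my own assumptions for illustration; in the real server the kill socket becomes readable when somebody connects to it. On systems without kqueue (Linux, for example) it just returns None.

```python
import select
import socket

def first_ready(timeout=1.0):
    """Register two sockets with a kqueue, make one readable, and report
    which one comes back. Returns None where kqueue is unavailable."""
    if not hasattr(select, "kqueue"):
        return None  # kqueue only exists on BSD-derived systems
    # Hypothetical stand-ins for the client socket and the kill socket.
    client_a, client_b = socket.socketpair()
    kill_a, kill_b = socket.socketpair()
    kq = select.kqueue()
    try:
        # Register both sockets. max_events=0 and timeout=0 mean
        # "apply the changes and return right away".
        kq.control([
            select.kevent(client_a),
            select.kevent(kill_a,
                          flags=select.KQ_EV_ADD | select.KQ_EV_ONESHOT),
        ], 0, 0)
        # Make only the kill socket readable.
        kill_b.send(b"x")
        # Ask for at most one event, waiting up to `timeout` seconds.
        events = kq.control([], 1, timeout)
        if not events:
            return "timeout"
        return "kill" if events[0].ident == kill_a.fileno() else "client"
    finally:
        kq.close()
        for s in (client_a, client_b, kill_a, kill_b):
            s.close()

result = first_ready()
```

Comparing `events[0].ident` against the file descriptor is the same check the run loop makes against self.kill_fd.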
All self.die does is put the client's ip:port string onto a Queue used for messaging. A different thread gets that string from the queue, prints a message and joins the relevant thread object. Of course, I am not using any pipes for this, only sockets. I did find this on an online manpage for kqueue though:
Fifos, Pipes
    Returns when the there is data to read; data contains the number of
    bytes available.

    When the last writer disconnects, the filter will set EV_EOF in
    flags. This may be cleared by passing in EV_CLEAR, at which point the
    filter will resume waiting for data to become available before
    returning.
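You can watch the EV_EOF part of that excerpt happen with a plain pipe. This is only a small sketch, and `pipe_eof_flag` is a name I made up for it: it registers the read end of a pipe, closes the only writer, and checks whether the returned kevent has the EOF flag set. It returns None on platforms that have no kqueue.

```python
import os
import select

def pipe_eof_flag(timeout=1.0):
    """Return True if kqueue reports EV_EOF once the pipe's only writer
    has closed, False if it does not, or None if kqueue is unavailable."""
    if not hasattr(select, "kqueue"):
        return None  # kqueue only exists on BSD-derived systems
    read_fd, write_fd = os.pipe()
    kq = select.kqueue()
    try:
        # Defaults: filter is KQ_FILTER_READ, flags is KQ_EV_ADD.
        kq.control([select.kevent(read_fd)], 0, 0)
        # The last (and only) writer disconnects.
        os.close(write_fd)
        events = kq.control([], 1, timeout)
        return bool(events and events[0].flags & select.KQ_EV_EOF)
    finally:
        kq.close()
        os.close(read_fd)

eof_seen = pipe_eof_flag()
```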
So perhaps in your UDP server, where you loop through the revents list, you should do as the man page says? Actually, you don't even need to loop through a list that is at most 1 long. Perhaps your listen function should look something like this...
import select
import socket

def listen(self, ip, port):
    print("Starting!")
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((ip, port))
    kq = select.kqueue()
    kev0 = select.kevent(sock)
    kev1 = select.kevent(self.pipe)
    kq.control([kev0, kev1], 0, 0)
    while True: #this loop never breaks! so this whole function blocks forever like this
        revents = kq.control([], 1, None)
        if revents:
            event = revents[0]
            if event.flags & select.KQ_EV_EOF:
                #as the man page suggests, pass in EV_CLEAR for this ident
                new_event = select.kevent(event.ident, flags=select.KQ_EV_CLEAR)
                kq.control([new_event], 0, 0)
            else:
                print(event)
I really recommend importing the flags and functions the way I do though. It makes the code look more like the C-based manpages you will have to compare against, and I think it looks prettier. I also want to point out that my class is a bit different from what you have, because each new client gets its own instance of it, and each instance runs in its own thread.
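Since every client runs in its own thread and reports back through a Queue when it dies, here is roughly what that hand-off looks like. This is a simplified sketch, not my actual server code: `Worker`, `reap` and the addresses are made-up names, the main thread does the joining instead of a dedicated one, and it is written for Python 3, where the module is called `queue` rather than `Queue`.

```python
import queue
import threading

class Worker(threading.Thread):
    """Hypothetical stand-in for a per-client thread."""
    def __init__(self, client_name, done):
        threading.Thread.__init__(self)
        self.client_name = client_name  # an "ip:port" string
        self.done = done                # shared messaging queue

    def die(self):
        # Tell the reaper which client has finished.
        self.done.put(self.client_name)

    def run(self):
        # The kqueue polling loop would go here; this sketch just exits.
        self.die()

def reap(workers, done):
    """Join each worker as its ip:port string arrives on the queue."""
    finished = []
    for _ in range(len(workers)):
        name = done.get()        # blocks until some worker has died
        workers[name].join()
        finished.append(name)
    return finished

done = queue.Queue()
names = ["10.0.0.1:5000", "10.0.0.2:5001"]
workers = {name: Worker(name, done) for name in names}
for worker in workers.values():
    worker.start()
finished = reap(workers, done)
```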