17/07/2011

Implementing P2P Functionality in Python [Part 1]


The P2P Framework Implementation (Python version)


This page walks through a Python implementation of the P2P framework library itself. I will assume that you are familiar with Python. I will also assume you are familiar with the general concepts of socket programming, though you may not necessarily have used the Python networking or threading libraries. The complete source code may be downloaded here: btpeer.py.

Initializing a peer

Let us first examine how a peer node is initialized. As discussed above, the overall operation of a node is managed by the Peer class. The constructor of the class stores the canonical identifier (name) of the peer, the port on which it listens for connections, and the maximum size of the list of known peers that the node will maintain (this can be set to 0 to allow an unlimited number of peers in the list). The constructor also initializes several fields whose use is described in the following subsections: shutdown, handlers, and router. The following definition illustrates the Python code for accomplishing these tasks:
    def __init__( self, maxpeers, serverport, myid=None, serverhost = None ):
        self.debug = 0

        self.maxpeers = int(maxpeers)
        self.serverport = int(serverport)

        # If not supplied, the host name/IP address will be determined
        # by attempting to connect to an Internet host like Google.
        if serverhost: self.serverhost = serverhost
        else: self.__initserverhost()

        # If not supplied, the peer id will be composed of the host address
        # and port number
        if myid: self.myid = myid
        else: self.myid = '%s:%d' % (self.serverhost, self.serverport)

        # list (dictionary/hash table) of known peers
        self.peers = {}

        # used to stop the main loop
        self.shutdown = False

        self.handlers = {}
        self.router = None
    # end constructor
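
The constructor's comments mention determining the host address by connecting to an Internet host. The following is a minimal sketch of how that could work, not the code of __initserverhost itself: the function names guess_local_address and default_peer_id, the probe host, and the loopback fallback are all assumptions made for illustration.

```python
import socket

def guess_local_address(probe_host="www.google.com", probe_port=80, timeout=3.0):
    """Return the local IP address used to reach probe_host.

    Hypothetical helper: falls back to 127.0.0.1 if the probe fails.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(timeout)
    try:
        s.connect((probe_host, probe_port))
        # The local end of the connection tells us which address we used.
        return s.getsockname()[0]
    except OSError:
        # No route, DNS failure, or timeout.
        return "127.0.0.1"
    finally:
        s.close()

def default_peer_id(host, port):
    """Compose a peer id from host and port, as the constructor does."""
    return "%s:%d" % (host, int(port))
```

Calling default_peer_id(guess_local_address(), 4000) would then yield an identifier of the form host:port, which is what the constructor uses when no myid is supplied.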
Every peer node performs operations common to traditional network client and server applications. First, let us walk through the server-related operations, as described in the previous section.

The main loop

The main loop of a peer begins by setting up a socket that listens for incoming connections from other peers. To do this, we must (1) create the socket object, (2) set its options, (3) bind it to a port on which to listen for connections, and (4) actually begin listening for connections. Here is a Python method that accomplishes this, returning the initialized socket:

    def makeserversocket( self, port, backlog=5 ):
        s = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
        s.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 )
        s.bind( ( '', port ) )
        s.listen( backlog )
        return s

The first statement creates a socket that will communicate using the IPv4 (AF_INET) protocol with TCP (SOCK_STREAM). By setting the SO_REUSEADDR option, the port number of the socket will be immediately reusable after the socket is closed (otherwise the operating system may prevent the port from being reused after the server exits, until a certain amount of time has passed). Then the socket is bound to the specified port and is set up to receive connections. The backlog parameter indicates how many incoming connections should be queued up. A value of 5 is conventionally used, although with a multithreaded server (as we are building) the parameter's value is not very significant.
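
As a quick way to experiment with these four steps outside the class, here is a standalone sketch. The function name make_server_socket and its host parameter are assumptions for illustration. Passing port 0 asks the operating system to pick any free port, which is convenient for testing.

```python
import socket

def make_server_socket(port, backlog=5, host=""):
    """Standalone variant of the method above (hypothetical name)."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)      # (1) create
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)    # (2) options
    s.bind((host, port))                                       # (3) bind
    s.listen(backlog)                                          # (4) listen
    return s

# Port 0 lets the OS choose a free port; getsockname() reveals which one.
server = make_server_socket(0, host="127.0.0.1")
chosen_port = server.getsockname()[1]
reuse_flag = server.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
server.close()
```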
Having created a server socket, the main loop of a peer loops continuously, accepting connections. When an incoming connection is accepted, the server will have a new socket object used to send and receive data on the connection. The main loop then calls a separate method to handle communication with this connection in a new thread. A simple form of the main loop would thus look like this:

  s = self.makeserversocket( self.serverport )
  
  while 1:
     clientsock, clientaddr = s.accept()
  
     t = threading.Thread( target = self.__handlepeer, args = [ clientsock ] )
     t.start()

In reality, we also need to handle any errors that may occur in the process of accepting a connection, and we need to provide a mechanism so that the loop may somehow be nicely terminated (for example, when the user indicates that the program should exit). To do this, we set up the server socket to time out every 2 seconds (an arbitrary choice) and make the loop termination condition dependent on a boolean (instance) variable, shutdown. Also, we set up an exception handler to allow the main loop to be stopped by the user pressing the "Ctrl"+"Break" (or "Ctrl"+"c") keys. Here, then, is the complete mainloop method.
    def mainloop( self ):
        s = self.makeserversocket( self.serverport )
        s.settimeout(2)
        self.__debug( 'Server started: %s (%s:%d)'
                      % ( self.myid, self.serverhost, self.serverport ) )

        while not self.shutdown:
            try:
                self.__debug( 'Listening for connections...' )
                clientsock, clientaddr = s.accept()
                clientsock.settimeout(None)

                t = threading.Thread( target = self.__handlepeer, args = [ clientsock ] )
                t.start()
            except KeyboardInterrupt:
                self.shutdown = True
                continue
            except:
                if self.debug:
                    traceback.print_exc()
                    continue
        # end while loop

        self.__debug( 'Main loop exiting' )
        s.close()
    # end mainloop method
The debug method will output various messages to an appropriate location - for example, the screen or a log file. The myid field is the identifier of the peer, and the serverhost field stores the peer's IP address (or host name). These values are initialized by the constructor for the peer object.
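
The timeout-plus-flag pattern used by mainloop can be tried in isolation. The sketch below is one possible variant under stated assumptions: it uses a threading.Event instead of a boolean field, catches socket.timeout explicitly rather than with a bare except, and the names serve, echo_upper, and demo are invented for this example.

```python
import socket
import threading

def serve(server_sock, handle, stop_event, poll_seconds=0.2):
    """Accept connections until stop_event is set, one thread per connection."""
    server_sock.settimeout(poll_seconds)
    while not stop_event.is_set():
        try:
            client_sock, _addr = server_sock.accept()
        except socket.timeout:
            continue  # nothing arrived in this interval; re-check the flag
        client_sock.settimeout(None)  # make sure the connection itself blocks
        threading.Thread(target=handle, args=(client_sock,), daemon=True).start()
    server_sock.close()

def echo_upper(client_sock):
    """Toy handler: reply with the upper-cased request, then close."""
    with client_sock:
        client_sock.sendall(client_sock.recv(1024).upper())

def demo():
    server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_sock.bind(("127.0.0.1", 0))
    server_sock.listen(5)
    port = server_sock.getsockname()[1]

    stop_event = threading.Event()
    loop = threading.Thread(target=serve, args=(server_sock, echo_upper, stop_event))
    loop.start()

    with socket.create_connection(("127.0.0.1", port), timeout=5) as c:
        c.sendall(b"ping")
        reply = c.recv(1024)

    stop_event.set()       # ask the loop to exit
    loop.join(timeout=5)   # it notices within one poll interval
    return reply, loop.is_alive()
```

Because accept() times out periodically, the loop gets a chance to re-check the stop flag even when no peer ever connects, which is the same reason mainloop sets a 2-second timeout.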

Handling a peer connection

The handlepeer method takes a newly formed peer connection, reads in a request from it, and dispatches the request to an appropriate handler (function or method) for processing. The particular handlers and types of requests will be specified by the programmer using this framework to implement a particular protocol. The handlepeer method simply looks for the appropriate handler for a message, if there is one registered with the peer object, and calls it.
handlepeer begins by encapsulating the socket connection in a PeerConnection object, to allow easy sending/receiving and encoding/decoding of P2P messages in the system.

	host, port = clientsock.getpeername()
	peerconn = BTPeerConnection( None, host, port, clientsock, debug=False )

Then, handlepeer attempts to receive some data from the connection and determine what to do with it:

        msgtype, msgdata = peerconn.recvdata()
        if msgtype: msgtype = msgtype.upper()
        if msgtype not in self.handlers:
            self.__debug( 'Not handled: %s: %s' % (msgtype, msgdata) )
        else:
            self.__debug( 'Handling peer msg: %s: %s' % (msgtype, msgdata) )
            self.handlers[ msgtype ]( peerconn, msgdata )

The handlers field is a dictionary (hash table) mapping message types (4-character strings) to handler functions. If the message type has a corresponding entry in the dictionary, the function is looked up and invoked, passing it the PeerConnection object and the actual data of the message. Before returning, handlepeer closes the connection. Here, then, is the complete definition of the method:
    def __handlepeer( self, clientsock ):
        self.__debug( 'Connected ' + str(clientsock.getpeername()) )

        host, port = clientsock.getpeername()
        peerconn = BTPeerConnection( None, host, port, clientsock, debug=False )

        try:
            msgtype, msgdata = peerconn.recvdata()
            if msgtype: msgtype = msgtype.upper()
            if msgtype not in self.handlers:
                self.__debug( 'Not handled: %s: %s' % (msgtype, msgdata) )
            else:
                self.__debug( 'Handling peer msg: %s: %s' % (msgtype, msgdata) )
                self.handlers[ msgtype ]( peerconn, msgdata )
        except KeyboardInterrupt:
            raise
        except:
            if self.debug:
                traceback.print_exc()

        self.__debug( 'Disconnecting ' + str(clientsock.getpeername()) )
        peerconn.close()

    # end handlepeer method
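
The dictionary-based dispatch at the heart of handlepeer can be illustrated without any networking. This is a simplified sketch: the dispatch function, the handle_ping handler, and the received list are hypothetical names, and the connection argument is left as None for the demonstration.

```python
def dispatch(handlers, msgtype, msgdata, conn=None):
    """Look up the handler for msgtype and call it; return True if handled."""
    if msgtype:
        msgtype = msgtype.upper()   # message types are matched in upper case
    handler = handlers.get(msgtype)
    if handler is None:
        return False                # no handler registered for this type
    handler(conn, msgdata)
    return True

received = []

def handle_ping(conn, msgdata):
    """Example handler with the (connection, data) signature used above."""
    received.append(("PING", msgdata))

handlers = {"PING": handle_ping}
```

With this table, dispatch(handlers, "ping", "hello") invokes handle_ping, while an unregistered type such as "QUIT" is reported as unhandled.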

Routing and sending messages

Using the addrouter method, the programmer may register a routing function (or method) with the Peer class to help decide how messages should be forwarded, given a destination peer id. The routing function should expect as a parameter the name of a peer (which may not necessarily be present in the list of known peers of the node), and decide which of the known peers the message should be routed to next in order to (hopefully) reach the desired peer. The routing function should return a tuple of three values: (next-peer-id, host, port), where host and port are the IP address and port number of the peer identified by next-peer-id. If the message cannot be routed, the next-peer-id should be None.
The sendtopeer method takes a message type and data, along with a destination peer id, and uses the routing function to decide where to send the message next. If no routing function has been registered by the programmer, or if the routing function fails for some reason, the method fails. If the routing function successfully returns the next host/port combination to which the message should be sent, sendtopeer calls the connectandsend method to actually make the connection to the peer, package up, and send the data. If the programmer desires to receive a response from the next peer before the communication socket is closed, it will be returned by these methods.
    def sendtopeer( self, peerid, msgtype, msgdata, waitreply=True ):
        if self.router:
            nextpid, host, port = self.router( peerid )
        if not self.router or not nextpid:
            self.__debug( 'Unable to route %s to %s' % (msgtype, peerid) )
            return None
        return self.connectandsend( host, port, msgtype, msgdata, pid=nextpid,
                                    waitreply=waitreply )

    # end sendtopeer method
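
For illustration, here is about the simplest routing function that satisfies this contract: it only routes to peers it knows directly. The name make_direct_router and the sample peer table are assumptions. Note that the function returns a 3-tuple even on failure, because sendtopeer unpacks three values from the result.

```python
def make_direct_router(known_peers):
    """Build a router over a dict mapping peer id -> (host, port)."""
    def router(peerid):
        if peerid in known_peers:
            host, port = known_peers[peerid]
            return (peerid, host, port)
        # Cannot route: next-peer-id is None, but still a 3-tuple.
        return (None, None, None)
    return router

# Hypothetical peer table for demonstration.
router = make_direct_router({"alice:4000": ("10.0.0.5", 4000)})
```

A protocol with multi-hop forwarding would replace the lookup with its own logic for choosing the next peer, while keeping the same return shape.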
The connectandsend method connects and sends a message to a peer at the specified IP address and port. The host's reply, if desired by the caller, will be returned as a list of pairs, where each pair contains (1) the type and (2) the actual data of the message.
    def connectandsend( self, host, port, msgtype, msgdata, pid=None, waitreply=True ):
        msgreply = []   # list of replies
        try:
            peerconn = BTPeerConnection( pid, host, port, debug=self.debug )
            peerconn.senddata( msgtype, msgdata )
            self.__debug( 'Sent %s: %s' % (pid, msgtype) )

            if waitreply:
                onereply = peerconn.recvdata()
                while (onereply != (None,None)):
                    msgreply.append( onereply )
                    self.__debug( 'Got reply %s: %s' % ( pid, str(msgreply) ) )
                    onereply = peerconn.recvdata()
            peerconn.close()
        except KeyboardInterrupt:
            raise
        except:
            if self.debug:
                traceback.print_exc()

        return msgreply

    # end connectandsend method

Additional methods

The Peer class provides methods supporting other fundamental functionalities of a peer node. Briefly, these include:
  • startstabilizer(stabilizer, delay): Runs the provided 'stabilizer' function in a separate thread, activating it repeatedly after every delay seconds, until the shutdown flag of the Peer object is set.
  • addhandler(msgtype, handler): Registers a handler function for the given message type with the Peer object. Only one handler function may be provided per message type. Message types do not have to be defined in advance of calling this method.
  • addrouter(router): Registers a routing function with this peer. Read the section on routing above for details.
  • addpeer(peerid, host, port): Adds a peer name and IP address/port mapping to the known list of peers.
  • getpeer(peerid): Returns the (host,port) pair for the given peer name.
  • removepeer(peerid): Removes the entry corresponding to the supplied peerid from the list of known peers.
  • addpeerat(loc, peerid, host, port), getpeerat(loc), removepeerat(loc): Analogous to the prior three methods, except that they access direct (numeric) positions within the list of peers (as opposed to using the peerid as a hash key). These functions should not be used concurrently with the prior three.
  • getpeerids(): Returns a list of all known peer ids.
  • numberofpeers(): Returns the number of known peers.
  • maxpeersreached(): Returns whether the list of known peers has reached the maxpeers limit.
  • checklivepeers(): Attempt to connect and send a 'PING' message to all currently known peers to ensure that they are still alive. For any connections that fail, the corresponding entry is removed from the list. This method can be used as a simple 'stabilizer' function for a P2P protocol.
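
To make the stabilizer idea concrete, here is a rough sketch of running a function periodically in a background thread until a stop flag is set. It is not the framework's startstabilizer: it takes a threading.Event where the Peer class uses its shutdown field, and the name start_stabilizer is an assumption.

```python
import threading

def start_stabilizer(stabilizer, delay, stop_event):
    """Call stabilizer() every `delay` seconds until stop_event is set."""
    def run():
        while not stop_event.is_set():
            stabilizer()
            # wait() sleeps for `delay` seconds but returns early
            # as soon as the stop flag is set.
            stop_event.wait(delay)
    t = threading.Thread(target=run, daemon=True)
    t.start()
    return t
```

A function like checklivepeers would be a natural argument for the stabilizer parameter, since it takes no arguments and prunes dead entries on each run.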

The PeerConnection class

The PeerConnection class encapsulates a socket connection to a peer node. An object of this class may be initialized with an already-connected socket, or with an IP address/port combination that will be used to open a new socket connection.
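
The two ways of initializing a connection can be sketched as follows. This is an illustrative stand-in, not the BTPeerConnection class: the class name SketchPeerConnection is invented, and the wire format (a 4-character type, a 4-byte length, then the data) is an assumption chosen for the example, since this section does not specify it. The (None, None) result on a closed connection mirrors what connectandsend checks for above.

```python
import socket
import struct

class SketchPeerConnection:
    """Hypothetical connection wrapper with an assumed message format."""

    def __init__(self, peerid, host, port, sock=None):
        self.id = peerid
        if sock is None:
            # No socket supplied: open a new connection to host:port.
            self.s = socket.create_connection((host, int(port)))
        else:
            # Reuse an already-connected socket (e.g. one from accept()).
            self.s = sock

    def senddata(self, msgtype, msgdata):
        payload = msgdata.encode("utf-8")
        # Assumed layout: 4-byte type, 4-byte big-endian length, payload.
        header = struct.pack("!4sL", msgtype.encode("ascii"), len(payload))
        self.s.sendall(header + payload)

    def _recv_exact(self, n):
        """Read exactly n bytes, or return None if the peer closed early."""
        buf = b""
        while len(buf) < n:
            chunk = self.s.recv(n - len(buf))
            if not chunk:
                return None
            buf += chunk
        return buf

    def recvdata(self):
        header = self._recv_exact(8)
        if header is None:
            return (None, None)
        rawtype, length = struct.unpack("!4sL", header)
        payload = self._recv_exact(length)
        if payload is None:
            return (None, None)
        return (rawtype.decode("ascii"), payload.decode("utf-8"))

    def close(self):
        self.s.close()
```

Wrapping both ends of a socket.socketpair() in this class is an easy way to try the send and receive paths without starting a server.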