Recipe 13.11. Detecting Inactive Computers

Credit: Nicola Larosa

Problem

You need to monitor the working state of a number of computers connected to a TCP/IP network.

Solution

The key idea in this recipe is to have every computer periodically send a heartbeat UDP packet to a computer acting as the server for this heartbeat-monitoring service. The server keeps track of how much time has passed since each computer last sent a heartbeat and reports on computers that have been silent for too long.

Here is the "client" program, HeartbeatClient.py, which must run on every computer we need to monitor:

""" Heartbeat client, sends out a UDP packet periodically """
import socket, time
SERVER_IP = '192.168.0.15'; SERVER_PORT = 43278; BEAT_PERIOD = 5
print 'Sending heartbeat to IP %s , port %d' % (SERVER_IP, SERVER_PORT)
print 'press Ctrl-C to stop'
while True:
    hbSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    hbSocket.sendto('PyHB', (SERVER_IP, SERVER_PORT))
    if __debug__:
        print 'Time: %s' % time.ctime()
    time.sleep(BEAT_PERIOD)

The server program, which receives and keeps track of these "heartbeats", must run on the machine whose address is given as SERVER_IP in the "client" program. The server must support concurrency, since many heartbeats from different computers might arrive simultaneously. A server program has essentially two ways to support concurrency: multithreading, or asynchronous operation. Here is a multithreaded ThreadedBeatServer.py, using only modules from the Python Standard Library:

""" Threaded heartbeat server """
import socket, threading, time
UDP_PORT = 43278; CHECK_PERIOD = 20; CHECK_TIMEOUT = 15
class Heartbeats(dict):
    """ Manage shared heartbeats dictionary with thread locking """
    def __init__(self):
        super(Heartbeats, self).__init__()
        self._lock = threading.Lock()
    def __setitem__(self, key, value):
        """ Create or update the dictionary entry for a client """
        self._lock.acquire()
        try:
            super(Heartbeats, self).__setitem__(key, value)
        finally:
            self._lock.release()
    def getSilent(self):
        """ Return a list of clients with heartbeat older than CHECK_TIMEOUT """
        limit = time.time() - CHECK_TIMEOUT
        self._lock.acquire()
        try:
            silent = [ip for (ip, ipTime) in self.items() if ipTime < limit]
        finally:
            self._lock.release()
        return silent
class Receiver(threading.Thread):
    """ Receive UDP packets and log them in the heartbeats dictionary """
    def __init__(self, goOnEvent, heartbeats):
        super(Receiver, self).__init__()
        self.goOnEvent = goOnEvent
        self.heartbeats = heartbeats
        self.recSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.recSocket.settimeout(CHECK_TIMEOUT)
        self.recSocket.bind(('', UDP_PORT))
    def run(self):
        while self.goOnEvent.isSet():
            try:
                data, addr = self.recSocket.recvfrom(5)
                if data == 'PyHB':
                    self.heartbeats[addr[0]] = time.time()
            except socket.timeout:
                pass
def main(num_receivers=3):
    receiverEvent = threading.Event()
    receiverEvent.set()
    heartbeats = Heartbeats()
    receivers = []
    for i in range(num_receivers):
        receiver = Receiver(goOnEvent=receiverEvent, heartbeats=heartbeats)
        receiver.start()
        receivers.append(receiver)
    print 'Threaded heartbeat server listening on port %d' % UDP_PORT
    print 'press Ctrl-C to stop'
    try:
        while True:
            silent = heartbeats.getSilent()
            print 'Silent clients: %s' % silent
            time.sleep(CHECK_PERIOD)
    except KeyboardInterrupt:
        print 'Exiting, please wait...'
        receiverEvent.clear()
        for receiver in receivers:
            receiver.join()
        print 'Finished.'
if __name__ == '__main__':
    main()

As an alternative, here is an asynchronous AsyncBeatServer.py program based on the powerful Twisted framework:

import time
from twisted.application import internet, service
from twisted.internet import protocol
from twisted.python import log
UDP_PORT = 43278; CHECK_PERIOD = 20; CHECK_TIMEOUT = 15
class Receiver(protocol.DatagramProtocol):
    """ Receive UDP packets and log them in the "client"s dictionary """
    def datagramReceived(self, data, (ip, port)):
        if data == 'PyHB':
            self.callback(ip)
class DetectorService(internet.TimerService):
    """ Detect clients not sending heartbeats for too long """
    def __init__(self):
        internet.TimerService.__init__(self, CHECK_PERIOD, self.detect)
        self.beats = {}
    def update(self, ip):
        self.beats[ip] = time.time()
    def detect(self):
        """ Log a list of clients with heartbeat older than CHECK_TIMEOUT """
        limit = time.time() - CHECK_TIMEOUT
        silent = [ip for (ip, ipTime) in self.beats.items() if ipTime < limit]
        log.msg('Silent clients: %s' % silent)
application = service.Application('Heartbeat')
# define and link the silent clients' detector service
detectorSvc = DetectorService()
detectorSvc.setServiceParent(application)
# create an instance of the Receiver protocol, and give it the callback
receiver = Receiver()
receiver.callback = detectorSvc.update
# define and link the UDP server service, passing the receiver in
udpServer = internet.UDPServer(UDP_PORT, receiver)
udpServer.setServiceParent(application)
# each service is started automatically by Twisted at launch time
log.msg('Asynchronous heartbeat server listening on port %d\n'
    'press Ctrl-C to stop\n' % UDP_PORT)

Discussion

When a number of computers are connected by a TCP/IP network, we are often interested in monitoring their working state. The client and server programs presented in this recipe help you detect when a computer stops working, while having minimal impact on network traffic and requiring very little setup. Note that this recipe does not monitor the working state of single, specific services running on a machine, just that of the TCP/IP stack and the underlying operating system and hardware components.

This PyHeartBeat approach is made up of two programs. The client program, HeartbeatClient.py, runs on any number of computers and periodically sends UDP packets to the server. The server program runs on a central computer, listening for these packets and detecting inactive clients; it comes in two versions, ThreadedBeatServer.py (a multithreaded approach using only modules from the Python Standard Library) and AsyncBeatServer.py (an asynchronous approach based on the Twisted framework). In either version, the server dynamically builds a dictionary that maps the IP address of each "client" computer to the timestamp of the last packet received from it. At the same time, the server periodically scans the dictionary for timestamps older than a defined timeout, to identify clients that have been silent too long.
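The bookkeeping both servers share can be isolated from any networking code. The following is only a minimal sketch of that logic, written for Python 3 (unlike the recipe's Python 2 listings); the function names record_beat and silent_clients and the optional now parameter are assumptions introduced here so that the timeline can be simulated, and they do not appear in the recipe:

```python
import time

CHECK_TIMEOUT = 15  # seconds, the same value the recipe's servers use


def record_beat(beats, ip, now=None):
    """Store the arrival time of the latest heartbeat from ip."""
    beats[ip] = time.time() if now is None else now


def silent_clients(beats, timeout=CHECK_TIMEOUT, now=None):
    """Return, sorted, the IPs whose last heartbeat is older than timeout."""
    if now is None:
        now = time.time()
    limit = now - timeout
    return sorted(ip for ip, last_seen in beats.items() if last_seen < limit)


# Simulated timeline: two clients beat at t=100, only one beats again at t=110.
beats = {}
record_beat(beats, '192.168.0.21', now=100.0)
record_beat(beats, '192.168.0.22', now=100.0)
record_beat(beats, '192.168.0.21', now=110.0)
print(silent_clients(beats, now=120.0))  # ['192.168.0.22']
```

Passing the clock value in explicitly, rather than always calling time.time(), is a design choice made here purely so the logic can be exercised without waiting for real time to pass.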

In this kind of application, there is no need to use reliable TCP connections since the loss of a packet now and then does not produce false alarms, as long as the server-checking timeout is kept suitably larger than the "client"-sending period. Since we may have hundreds of computers to monitor, it is best to keep the bandwidth used and the load on the server at a minimum: we do this by periodically sending a small UDP packet, instead of setting up a relatively expensive TCP connection per client.

Each client sends a packet every 5 seconds, while the server checks the dictionary every 20 seconds, with a timeout that defaults to 15 seconds. These parameters, along with the server IP address and port, can be adapted to one's needs.
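To see what these defaults imply, the arithmetic can be sketched as follows. This is an idealized estimate that ignores network jitter and scheduling delays; the helper names tolerated_losses and worst_case_report_delay are hypothetical, introduced only for this illustration (Python 3 syntax):

```python
BEAT_PERIOD = 5     # seconds between heartbeats (client side)
CHECK_PERIOD = 20   # seconds between dictionary checks (server side)
CHECK_TIMEOUT = 15  # seconds of silence before a client is reported


def tolerated_losses(beat_period, timeout):
    """Consecutive lost heartbeats that still keep the gap within timeout."""
    # After k losses in a row, the gap between two received beats is
    # (k + 1) * beat_period; the client stays "alive" while gap <= timeout.
    return max(timeout // beat_period - 1, 0)


def worst_case_report_delay(check_period, timeout):
    """Longest wait between a client's last heartbeat and its being reported."""
    # The client becomes "silent" timeout seconds after its last beat, and
    # the next periodic check may be almost a full check_period away.
    return timeout + check_period


print(tolerated_losses(BEAT_PERIOD, CHECK_TIMEOUT))          # 2
print(worst_case_report_delay(CHECK_PERIOD, CHECK_TIMEOUT))  # 35
```

Under these assumptions, the defaults tolerate two lost packets in a row, and a dead client is reported at most about 35 seconds after its last heartbeat.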

Threaded server

In the threaded server, a small number of worker threads listen to the UDP packets coming from the "client"s, while the main thread periodically checks the recorded heartbeats. The shared data structure, a dictionary, must be locked and released at each access, both while writing and reading, to avoid data corruption on concurrent access. Such data corruption would typically manifest itself as intermittent, time-dependent bugs that are difficult to reproduce, investigate, and correct.

A very sound alternative to such meticulous use of locking around access to a resource is to dedicate a specialized thread to be the only one interacting with the resource (in this case, the dictionary), while all other threads send work requests to the specialized thread with a Queue.Queue instance. A Queue-based approach is more scalable when per-resource locking gets too complicated to manage easily: Queue is less bug-prone and, in particular, avoids worries about deadlocks. See Recipe 9.3, Recipe 9.4, Recipe 9.5, and Recipe 11.9 for more information about Queue and examples of using Queue to structure the architecture of a multithreaded program.
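As a rough illustration of this alternative, here is a minimal sketch in which one thread owns the dictionary and every other thread talks to it through a queue. It is written for Python 3, where the module is named queue rather than Queue; the class name BeatKeeper, its methods, and the request tuples are all assumptions made for this example and are not part of the recipe:

```python
import queue
import threading
import time

CHECK_TIMEOUT = 15  # seconds, as in the recipe


class BeatKeeper(threading.Thread):
    """The only thread that ever touches the heartbeat dictionary."""

    def __init__(self):
        super().__init__(daemon=True)
        self.requests = queue.Queue()
        self._beats = {}  # private to this thread, so no lock is needed

    def run(self):
        while True:
            kind, payload, reply = self.requests.get()
            if kind == 'beat':
                ip, when = payload
                self._beats[ip] = when
            elif kind == 'silent':
                limit = payload - CHECK_TIMEOUT
                reply.put(sorted(ip for ip, t in self._beats.items()
                                 if t < limit))
            elif kind == 'stop':
                break

    # The methods below run in the callers' threads and only use the queue.
    def beat(self, ip, when=None):
        when = time.time() if when is None else when
        self.requests.put(('beat', (ip, when), None))

    def get_silent(self, now=None):
        now = time.time() if now is None else now
        reply = queue.Queue(maxsize=1)
        self.requests.put(('silent', now, reply))
        return reply.get()  # block until the keeper thread answers

    def stop(self):
        self.requests.put(('stop', None, None))


keeper = BeatKeeper()
keeper.start()
keeper.beat('192.168.0.21', when=100.0)
keeper.beat('192.168.0.22', when=110.0)
print(keeper.get_silent(now=120.0))  # ['192.168.0.21']
keeper.stop()
keeper.join()
```

In such a design, receiver threads would call beat() and the main thread would call get_silent(); because requests are handled one at a time in arrival order, no explicit lock is required around the dictionary.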

Asynchronous server

The Twisted server employs an asynchronous, event-driven model based on the Twisted framework (http://www.twistedmatrix.com/). The framework is built around a central "reactor" that dispatches events from a queue in a single thread, and monitors network and host resources. The user program is composed of short code fragments invoked by the reactor when dispatching the matching events. Such a working model guarantees that only one user code fragment is executing at any given time, eliminating at the root all problems of concurrent access to shared data structures. Asynchronous servers can provide excellent performance and scalability under very heavy loads, by avoiding the threading and locking overheads of multithreaded servers.
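The same single-threaded, event-driven idea can also be illustrated with the asyncio module from the Python 3 standard library. To be clear, this swaps asyncio in for Twisted and is not the recipe's method: it is only a sketch, and the names BeatProtocol, find_silent, and serve, as well as the injectable clock parameter, are assumptions made for this example:

```python
import asyncio
import time

UDP_PORT = 43278
CHECK_PERIOD = 20
CHECK_TIMEOUT = 15


class BeatProtocol(asyncio.DatagramProtocol):
    """Record the arrival time of each valid heartbeat datagram."""

    def __init__(self, beats, clock=time.time):
        self.beats = beats
        self.clock = clock

    def datagram_received(self, data, addr):
        # Called by the event loop, one datagram at a time.
        if data == b'PyHB':
            self.beats[addr[0]] = self.clock()


def find_silent(beats, now, timeout=CHECK_TIMEOUT):
    """Return, sorted, the IPs whose last heartbeat is older than timeout."""
    limit = now - timeout
    return sorted(ip for ip, last_seen in beats.items() if last_seen < limit)


async def serve(port=UDP_PORT):
    """Run receiver and detector on one event loop, in one thread."""
    beats = {}  # touched only by event-loop callbacks, so no lock is needed
    loop = asyncio.get_running_loop()
    transport, _ = await loop.create_datagram_endpoint(
        lambda: BeatProtocol(beats), local_addr=('0.0.0.0', port))
    try:
        while True:
            print('Silent clients: %s' % find_silent(beats, time.time()))
            await asyncio.sleep(CHECK_PERIOD)
    finally:
        transport.close()
```

With these definitions, calling asyncio.run(serve()) would start the server and keep it running until interrupted. As with the reactor, the event loop invokes datagram_received and resumes serve one at a time, so the shared dictionary is never accessed concurrently.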

The asynchronous server program presented in this recipe is composed of one application and two services, the UDPServer and the DetectorService. It is invoked from any command shell by means of the twistd command, with the following options:

$ twistd -ony AsyncBeatServer.py

The twistd command controls the reactor, and many other delicate facets of a server's operation, leaving the script it loads the sole responsibility of defining a global variable named application, implementing the needed services, and connecting the service objects to the application object.

Normally, twistd runs as a daemon and logs to a file (or to other logging facilities, depending on configuration options), but in this case, with the -ony flags, we're specifically asking twistd to run in the foreground and with logging to standard output, so we can better see what's going on. Note that the most popular file extension for scripts to be loaded by twistd is .tac, although in this recipe I have used the more generally familiar extension .py. The choice of file extension is just a convention, in this case: twistd can work with Python source files with any file extension, since you pass the full filename, extension included, as an explicit command-line argument anyway.

See Also

Documentation for the standard library modules socket, threading, Queue, and time in the Library Reference and Python in a Nutshell; Twisted is at http://www.twistedmatrix.com; Jeff Bauer has a related program, known as Mr. Creosote (http://starship.python.net/crew/jbauer/creosote/), using UDP for logging information; UDP is described in depth in W. Richard Stevens, UNIX Network Programming, Volume 1: Networking APIs-Sockets and XTI, 2nd ed. (Prentice-Hall); for the truly curious, the UDP protocol is defined in the two-page RFC 768 (http://www.ietf.org/rfc/rfc768.txt), which, when compared with current RFCs, shows how much the Internet infrastructure has evolved in 20 years.



Python Cookbook
ISBN: 0596007973
Year: 2004
Pages: 420
