Recipe14.12.Aggregating RSS Feeds


Recipe 14.12. Aggregating RSS Feeds

Credit: Valentino Volonghi, Peter Cogolo

Problem

You need to aggregate potentially very high numbers of RSS feeds, with top performance and scalability.

Solution

Parsing RSS feeds in Python is best done with Mark Pilgrim's Universal Feed Parser from http://www.feedparser.org, but aggregation requires a lot of network activity, in addition to parsing.

As for any network task demanding high performance, Twisted is a good starting point. Say that you have in out.py a module that binds a huge list of RSS feed names to a variable named rss_feed, each feed name represented as a tuple consisting of a URL and a description (e.g., you can download a module exactly like this from http://xoomer.virgilio.it/dialtone/out.py.). You can then build an aggregator server on top of that list, as follows:

#!/usr/bin/python from twisted.internet import reactor, protocol, defer from twisted.web import client import feedparser, time, sys, cStringIO from out import rss_feed as rss_feeds DEFERRED_GROUPS = 60       # Number of simultaneous connections INTER_QUERY_TIME = 300     # Max Age (in seconds) of each feed in the cache TIMEOUT = 30               # Timeout in seconds for the web request # dict cache's structure will be the following: { 'URL': (TIMESTAMP, value) } cache = {  } class FeederProtocol(object):     def _ _init_ _(self):         self.parsed = 0         self.error_list = [  ]     def isCached(self, site):         ''' do we have site's feed cached (from not too long ago)? '''         # how long since we last cached it (if never cached, since Jan 1 1970)         elapsed_time = time.time( ) - cache.get(site, (0, 0))[0]         return elapsed_time < INTER_QUERY_TIME     def gotError(self, traceback, extra_args):         ''' an error has occurred, print traceback info then go on '''         print traceback, extra_args         self.error_list.append(extra_args)     def getPageFromMemory(self, data, addr):         ''' callback for a cached page: ignore data, get feed from cache '''         return defer.succeed(cache[addr][1])     def parseFeed(self, feed):         ''' wrap feedparser.parse to parse a string '''         try: feed+''         except TypeError: feed = str(feed)         return feedparser.parse(cStringIO.StringIO(feed))     def memoize(self, feed, addr):         ''' cache result from feedparser.parse, and pass it on '''         cache[addr] = time.time( ), feed         return feed     def workOnPage(self, parsed_feed, addr):         ''' just provide some logged feedback on a channel feed '''         chan = parsed_feed.get('channel', None)         if chan:             print chan.get('title', '(no channel title?)')         return parsed_feed     def stopWorking(self, data=None):         ''' just for testing: we close after parsing a number of feeds.             Override depending on protocol/interface you use to communicate             with this RSS aggregator server.         '''         print "Closing connection number %d..." % self.parsed         print "=-"*20         self.parsed += 1         print 'Parsed', self.parsed, 'of', self.END_VALUE         if self.parsed >= self.END_VALUE:             print "Closing all..."             if self.error_list:                 print 'Observed', len(self.error_list), 'errors'                 for i in self.error_list:                     print i             reactor.stop( )     def getPage(self, data, args):         return client.getPage(args, timeout=TIMEOUT)     def printStatus(self, data=None):         print "Starting feed group..."     def start(self, data=None, standalone=True):         d = defer.succeed(self.printStatus( ))         for feed in data:             if self.isCached(feed):                 d.addCallback(self.getPageFromMemory, feed)                 d.addErrback(self.gotError, (feed, 'getting from memory'))             else:                 # not cached, go and get it from the web directly                 d.addCallback(self.getPage, feed)                 d.addErrback(self.gotError, (feed, 'getting'))                 # once gotten, parse the feed and diagnose possible errors                 d.addCallback(self.parseFeed)                 d.addErrback(self.gotError, (feed, 'parsing'))                 # put the parsed structure in the cache and pass it on                 d.addCallback(self.memoize, feed)                 d.addErrback(self.gotError, (feed, 'memoizing'))             # now one way or another we have the parsed structure, to             # use or display in whatever way is most appropriate             d.addCallback(self.workOnPage, feed)             d.addErrback(self.gotError, (feed, 'working on page'))             # for testing purposes only, stop working on each feed at once             if standalone:                 d.addCallback(self.stopWorking)                 d.addErrback(self.gotError, (feed, 'while stopping'))         if not standalone:             return d class FeederFactory(protocol.ClientFactory):     protocol = FeederProtocol( )     def _ _init_ _(self, standalone=False):         self.feeds = self.getFeeds( )         self.standalone = standalone         self.protocol.factory = self         self.protocol.END_VALUE = len(self.feeds) # this is just for testing         if standalone:             self.start(self.feeds)     def start(self, addresses):         # Divide into groups all the feeds to download         if len(addresses) > DEFERRED_GROUPS:             url_groups = [[  ] for x in xrange(DEFERRED_GROUPS)]             for i, addr in enumerate(addresses):                 url_groups[i%DEFERRED_GROUPS].append(addr[0])         else:             url_groups = [[addr[0]] for addr in addresses]         for group in url_groups:             if not self.standalone:                 return self.protocol.start(group, self.standalone)             else:                 self.protocol.start(group, self.standalone)     def getFeeds(self, where=None):         # used for a complete refresh of the feeds, or for testing purposes         if where is None:             return rss_feeds         return None if _ _name_ _=="_ _main_ _":     f = FeederFactory(standalone=True)     reactor.run( )

Discussion

RSS is a lightweight XML format designed for sharing headlines, news, blogs, and other web contents. Mark Pilgrim's Universal Feed Parser (http://www.feedparser.org) does a great job of parsing "feeds" that can be in various dialects of RSS format into a uniform memory representation based on Python dictionaries. This recipe builds on top of feedparser to provide a full-featured RSS aggregator.

This recipe is scalable to very high numbers of feeds and is usable in multiclient environments. Both characteristics depend essentially on this recipe being built with the powerful Twisted framework for asynchronous network programming. A simple web interface built with Nevow (from http://www.nevow.com) is also part of the latest complete package for this aggregator, which you can download from my blog at http://vvolonghi.blogspot.com/.

An important characteristic of this recipe's code is that you can easily set the following operating parameters to improve performance:

  • Number of parallel connections to use for feed downloading

  • Timeout for each feed request

  • Maximum age of a feed in the aggregator's cache

Being able to set these parameters helps you balance performance, network load, and load on the machine on which you're running the aggregator.

See Also

Universal Feed Parser is at http://www.feedparser.org; the latest version of this RSS aggregator is at http://vvolonghi.blogspot.com/; Twisted is at http://twistedmatrix.com/.



Python Cookbook
Python Cookbook
ISBN: 0596007973
EAN: 2147483647
Year: 2004
Pages: 420

flylib.com © 2008-2017.
If you may any questions please contact us: flylib@qtcs.net