For large-scale data collection, the only alternative to multithreading is an asynchronous approach.

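In my previous article I wrote about a multithreaded implementation. As for the asynchronous version, my Python is not yet deep enough to write one myself, but I happened to come across an article on asynchronous fetching with Twisted, so I am reposting it here to share with everyone.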

Async Batching with Twisted: A Walkthrough

Example 1: Just a DeferredList

from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet.defer import DeferredList
 
def listCallback(results):
  print(results)
 
def finish(ign):
  reactor.stop()
 
def test():
  d1 = getPage('http://www.google.com')
  d2 = getPage('http://yahoo.com')
  dl = DeferredList([d1, d2])
  dl.addCallback(listCallback)
  dl.addCallback(finish)
 
test()
reactor.run()

This is one of the simplest examples you’ll ever see for a deferred list in action. Get two deferreds (the getPage function returns a deferred) and use them to create a deferred list. Add callbacks to the list, garnish with a lemon.

Example 2: Simple Result Manipulation

from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet.defer import DeferredList
 
def listCallback(results):
  for isSuccess, content in results:
    print("Successful? %s" % isSuccess)
    if isSuccess:
      # On failure, content is a Failure object, which has no length.
      print("Content Length: %s" % len(content))
 
def finish(ign):
  reactor.stop()
 
def test():
  d1 = getPage('http://www.google.com')
  d2 = getPage('http://yahoo.com')
  dl = DeferredList([d1, d2])
  dl.addCallback(listCallback)
  dl.addCallback(finish)
 
test()
reactor.run()

We make things a little more interesting in this example by doing some processing on the results. For this to make sense, just remember that a callback gets passed the result when the deferred action completes. If we look up the API documentation for DeferredList, we see that it returns a list of (success, result) tuples, where success is a Boolean and result is the result of a deferred that was put in the list (remember, we’ve got two layers of deferreds here!).
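To make the shape of that list concrete, here is a small sketch that runs without Twisted or a network connection. The list `sample` is hand-written to imitate what a DeferredList passes to its callback, the helper name `split_results` is made up for this illustration, and a plain exception stands in for the Failure object that Twisted would supply for an unsuccessful fetch.

```python
def split_results(results):
    """Separate a DeferredList-style list into successes and failures."""
    successes, failures = [], []
    for is_success, value in results:
        if is_success:
            successes.append(value)
        else:
            failures.append(value)
    return successes, failures


# Hand-written stand-in for real results: one page body and one error.
sample = [
    (True, "<html>hello</html>"),
    (False, ValueError("connection refused")),
]

pages, errors = split_results(sample)
print([len(page) for page in pages])
print([str(err) for err in errors])
```

Checking the success flag before touching the value is the important habit here: only the successful entries can safely be treated as page content.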

Example 3: Page Callbacks Too

from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet.defer import DeferredList
 
def pageCallback(result):
  return len(result)
 
def listCallback(result):
  print(result)
 
def finish(ign):
  reactor.stop()
 
def test():
  d1 = getPage('http://www.google.com')
  d1.addCallback(pageCallback)
  d2 = getPage('http://yahoo.com')
  d2.addCallback(pageCallback)
  dl = DeferredList([d1, d2])
  dl.addCallback(listCallback)
  dl.addCallback(finish)
 
test()
reactor.run()

Here, we mix things up a little bit. Instead of doing processing on all the results at once (in the deferred list callback), we’re processing them when the page callbacks fire. Our processing here is just a simple example of getting the length of the getPage deferred result: the HTML content of the page at the given URL.

Example 4: Results with More Structure

from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet.defer import DeferredList
 
def pageCallback(result):
  data = {
    'length': len(result),
    'content': result[:10],
    }
  return data
 
def listCallback(result):
  for isSuccess, data in result:
    if isSuccess:
      print("Call to server succeeded with data %s" % str(data))
 
def finish(ign):
  reactor.stop()
 
def test():
  d1 = getPage('http://www.google.com')
  d1.addCallback(pageCallback)
  d2 = getPage('http://yahoo.com')
  d2.addCallback(pageCallback)
  dl = DeferredList([d1, d2])
  dl.addCallback(listCallback)
  dl.addCallback(finish)
 
test()
reactor.run()

This is a follow-up to the last example: here we put the data we are interested in into a dictionary. We don’t end up pulling any of the data out of the dictionary; we just stringify it and print it to stdout.

Example 5: Passing Values to Callbacks

from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet.defer import DeferredList
 
def pageCallback(result, url):
  data = {
    'length': len(result),
    'content': result[:10],
    'url': url,
    }
  return data
 
def getPageData(url):
  d = getPage(url)
  d.addCallback(pageCallback, url)
  return d
 
def listCallback(result):
  for isSuccess, data in result:
    if isSuccess:
      print("Call to %s succeeded with data %s" % (data['url'], str(data)))
 
def finish(ign):
  reactor.stop()
 
def test():
  d1 = getPageData('http://www.google.com')
  d2 = getPageData('http://yahoo.com')
  dl = DeferredList([d1, d2])
  dl.addCallback(listCallback)
  dl.addCallback(finish)
 
test()
reactor.run()

After all this playing, we start asking ourselves more serious questions, like: “I want to decide which values show up in my callbacks” or “Some information that is available here, isn’t available there. How do I get it there?” This is how :-) Just pass the parameters you want to your callback. They’ll be tacked on after the result (as you can see from the function signatures).

In this example, we needed to create our own deferred-returning function, one that wraps the getPage function so that we can also pass the URL on to the callback.
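The way extra arguments get tacked on after the result can be modeled in a few lines. The class below is a toy written for this illustration (the name `TinyDeferred` is made up). It is not Twisted's implementation and it ignores errbacks entirely, but it mimics the two behaviors relied on above: each callback receives the current result followed by the extra arguments, and whatever a callback returns becomes the result for the next one. The URL and page body are placeholders; nothing is fetched.

```python
class TinyDeferred(object):
    """Toy stand-in for a Deferred: callbacks only, no error handling."""

    def __init__(self):
        self.callbacks = []
        self.result = None

    def addCallback(self, func, *args, **kwargs):
        # Remember the function together with its extra arguments.
        self.callbacks.append((func, args, kwargs))
        return self

    def callback(self, result):
        # Run the chain: each return value feeds the next callback.
        for func, args, kwargs in self.callbacks:
            result = func(result, *args, **kwargs)
        self.result = result


def pageCallback(result, url):
    # Same signature as in Example 5: the result first, then the extras.
    return {'length': len(result), 'content': result[:10], 'url': url}


url = 'http://example.com'  # placeholder URL
d = TinyDeferred()
d.addCallback(pageCallback, url)
d.callback('<html><body>hi</body></html>')  # pretend the page arrived
data = d.result
print(data)
```

A real Deferred does considerably more (error handling, nested deferreds, callbacks added after firing), so treat this only as a picture of where the extra arguments end up.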

Example 6: Adding Some Error Checking

from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet.defer import DeferredList
 
urls = [
  'http://yahoo.com',
  'http://www.google.com',
  'http://www.google.com/MicrosoftRules.html',
  'http://bogusdomain.com',
  ]
 
def pageCallback(result, url):
  data = {
    'length': len(result),
    'content': result[:10],
    'url': url,
    }
  return data
 
def pageErrback(error, url):
  return {
    'msg': error.getErrorMessage(),
    'err': error,
    'url': url,
    }
 
def getPageData(url):
  d = getPage(url, timeout=5)
  d.addCallback(pageCallback, url)
  d.addErrback(pageErrback, url)
  return d
 
def listCallback(result):
  for ignore, data in result:
    if 'err' in data:
      print("Call to %s failed with data %s" % (data['url'], str(data)))
    else:
      print("Call to %s succeeded with data %s" % (data['url'], str(data)))
 
def finish(ign):
  reactor.stop()
 
def test():
  deferreds = []
  for url in urls:
    d = getPageData(url)
    deferreds.append(d)
  dl = DeferredList(deferreds, consumeErrors=1)
  dl.addCallback(listCallback)
  dl.addCallback(finish)
 
test()
reactor.run()

As we get closer to building real applications, we start getting concerned about things like catching/anticipating errors. We haven’t added any errbacks to the deferred list, but we have added one to our page callback. We’ve added more URLs and put them in a list to ease the pains of duplicate code. As you can see, two of the URLs should return errors: one a 404, and the other should be a domain not resolving (we’ll see this as a timeout).
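The pattern of turning every outcome, good or bad, into a dictionary with the same keys can be tried without Twisted or network access. The sketch below is a standard-library analog written with asyncio for Python 3.7 or later, not Twisted code: `fake_fetch` is a made-up stand-in for getPage that fails for any URL containing "bogus", and the try/except plays the role of the callback and errback pair.

```python
import asyncio


async def fake_fetch(url):
    """Hypothetical stand-in for getPage; no network traffic happens."""
    await asyncio.sleep(0)
    if 'bogus' in url:
        raise IOError('could not resolve %s' % url)
    return '<html>%s</html>' % url


async def get_page_data(url):
    try:
        body = await fake_fetch(url)
    except Exception as error:
        # Errback role: convert the failure into an ordinary result.
        return {'msg': str(error), 'err': error, 'url': url}
    # Callback role: summarize the page that came back.
    return {'length': len(body), 'content': body[:10], 'url': url}


async def main(urls):
    return await asyncio.gather(*(get_page_data(u) for u in urls))


results = asyncio.run(main(['http://example.com', 'http://bogus.invalid']))
for data in results:
    status = 'failed' if 'err' in data else 'succeeded'
    print('Call to %s %s' % (data['url'], status))
```

Because every entry is a dictionary carrying its own URL, the code that walks the combined results only has to look for the 'err' key, just as listCallback does above.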

Example 7: Batching with DeferredSemaphore

from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet import defer
 
maxRun = 1
 
urls = [
  'http://twistedmatrix.com',
  'http://twistedsoftwarefoundation.org',
  'http://yahoo.com',
  'http://www.google.com',
  ]
 
def listCallback(results):
  for isSuccess, result in results:
    if isSuccess:
      # A failed fetch yields a Failure, which has no length.
      print(len(result))
 
def finish(ign):
  reactor.stop()
 
def test():
  deferreds = []
  sem = defer.DeferredSemaphore(maxRun)
  for url in urls:
    d = sem.run(getPage, url)
    deferreds.append(d)
  dl = defer.DeferredList(deferreds)
  dl.addCallback(listCallback)
  dl.addCallback(finish)
 
test()
reactor.run()

These last two examples are for more advanced use cases. As soon as the reactor starts, deferreds that are ready start “firing”: their “jobs” start running. What if we’ve got 500 deferreds in a list? Well, they all start processing. As you can imagine, this is an easy way to run an accidental DoS against a friendly service. Not cool.

For situations like this, what we want is a way to run only so many deferreds at a time. This is a great use for the deferred semaphore. When I repeated runs of the example above, the content lengths of the four pages were returned after about 2.5 seconds. With the example rewritten to use just the deferred list (no deferred semaphore), the content lengths were returned after about 1.2 seconds. The extra time is due to the fact that I (for the sake of the example) forced only one deferred to run at a time, obviously not what you’re going to want to do for a highly concurrent task ;-)

Note that without changing the code other than setting maxRun to 4, the timings for getting the content lengths are about the same, averaging 1.3 seconds for me (there’s a little more overhead involved when using the deferred semaphore).

One last subtle note (in anticipation of the next example): the for loop creates all the deferreds at once; the deferred semaphore simply limits how many get run at a time.
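That distinction, everything created up front but only a few allowed to run, can be observed without Twisted or a network. The sketch below is a standard-library analog using asyncio.Semaphore on Python 3.7 or later, not DeferredSemaphore itself; `fake_fetch`, the placeholder URL names, and the `peak` counter are all made up for this illustration.

```python
import asyncio

max_run = 2
urls = ['url-%d' % i for i in range(6)]  # placeholder names, nothing is fetched

running = 0  # jobs currently inside fake_fetch
peak = 0     # highest value that running ever reached


async def fake_fetch(url):
    global running, peak
    running += 1
    peak = max(peak, running)
    await asyncio.sleep(0.01)  # simulated network delay
    running -= 1
    return url


async def limited(sem, url):
    # Wait for a free slot before fetching, release it afterwards.
    async with sem:
        return await fake_fetch(url)


async def main():
    sem = asyncio.Semaphore(max_run)
    # All six jobs are created here at once...
    jobs = [limited(sem, url) for url in urls]
    # ...but the semaphore lets only max_run of them fetch at a time.
    return await asyncio.gather(*jobs)


results = asyncio.run(main())
print(results, peak)
```

Raising max_run to the number of URLs removes the throttling, which matches the observation above that the timings then approach those of the plain deferred list.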

Example 8: Throttling with Cooperator

from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet import defer, task
 
maxRun = 2
 
urls = [
  'http://twistedmatrix.com',
  'http://twistedsoftwarefoundation.org',
  'http://yahoo.com',
  'http://www.google.com',
  ]
 
def pageCallback(result):
  print(len(result))
  return result
 
def doWork():
  for url in urls:
    d = getPage(url)
    d.addCallback(pageCallback)
    yield d
 
def finish(ign):
  reactor.stop()
 
def test():
  deferreds = []
  coop = task.Cooperator()
  work = doWork()
  for i in range(maxRun):
    d = coop.coiterate(work)
    deferreds.append(d)
  dl = defer.DeferredList(deferreds)
  dl.addCallback(finish)
 
test()
reactor.run()
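In contrast to Example 7, the generator here produces work only when something asks for the next item, and the same generator object is handed to coiterate maxRun times. My understanding is that each of those tasks waits for the deferred it was given before pulling the next one, so at most maxRun fetches are in flight. The sketch below imitates that arrangement with the standard library only (asyncio, Python 3.7 or later). It is an analog rather than Cooperator itself, and `fake_fetch`, `worker`, the placeholder URL names, and the counters are made up for this illustration.

```python
import asyncio

max_run = 2
urls = ['url-%d' % i for i in range(5)]  # placeholder names, nothing is fetched

started = []   # order in which the generator handed out work
in_flight = 0  # jobs currently running
peak = 0       # highest value that in_flight ever reached


def do_work():
    # A URL is handed out only when a worker asks for the next item.
    for url in urls:
        started.append(url)
        yield url


async def fake_fetch(url):
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)  # simulated network delay
    in_flight -= 1
    return len(url)


async def worker(work, lengths):
    # Every worker pulls from the same generator until it is exhausted.
    for url in work:
        lengths.append(await fake_fetch(url))


async def main():
    work = do_work()
    lengths = []
    await asyncio.gather(*(worker(work, lengths) for _ in range(max_run)))
    return lengths


lengths = asyncio.run(main())
print(lengths, peak)
```

Since work is pulled lazily, a very long list of URLs never turns into a very long list of pending requests; the number of workers is the only knob.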

Original article: http://oubiwann.blogspot.com/2008/06/async-batching-with-twisted-walkthrough.html

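Many people now say that Twisted lives up to its name: the code you write with it is so twisted that no normal person can stomach it, even though this simple example looks fine. They say every Twisted program leaves them feeling twisted and exhausted, that the documentation is as good as nonexistent, and that you have to read the source to figure out how to do anything. Still, I have recently wanted to learn this framework myself; it helps a great deal with network programming and saves a lot of trouble!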