aiohttp

import aiohttp
import asyncio
async def fetch(session, url):
   async with session.get(url) as response:
       return await response.text(), response.status
async def main():
   async with aiohttp.ClientSession() as session:
       html, status = await fetch(session, 'https://cuiqingcai.com')
       print(f'html: {html[:100]}...')
       print(f'status: {status}')
if __name__ == '__main__':
   loop = asyncio.get_event_loop()
   loop.run_until_complete(main())

import aiohttp
import asyncio
async def main():
   params = {'name': 'germey', 'age': 25}
   async with aiohttp.ClientSession() as session:
       async with session.get('https://httpbin.org/get', params=params) as response:
           print(await response.text())
if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

session.post('http://httpbin.org/post', data=b'data')
session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

import aiohttp
import asyncio
async def main():
   data = {'name': 'germey', 'age': 25}
   async with aiohttp.ClientSession() as session:
       async with session.post('https://httpbin.org/post', data=data) as response:
           print(await response.text())
if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())
async def main():
   data = {'name': 'germey', 'age': 25}
   async with aiohttp.ClientSession() as session:
       async with session.post('https://httpbin.org/post', json=data) as response:
           print(await response.text())

import aiohttp
import asyncio
async def main():
   data = {'name': 'germey', 'age': 25}
   async with aiohttp.ClientSession() as session:
       async with session.post('https://httpbin.org/post', data=data) as response:
           print('status:', response.status)
           print('headers:', response.headers)
           print('body:', await response.text())
           print('bytes:', await response.read())
           print('json:', await response.json())
if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

设置超时

import aiohttp
import asyncio
async def main():
   timeout = aiohttp.ClientTimeout(total=1)
   async with aiohttp.ClientSession(timeout=timeout) as session:
       async with session.get('https://httpbin.org/get') as response:
           print('status:', response.status)
if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

控制并发量

import asyncio
import aiohttp
CONCURRENCY = 5
URL = 'https://www.baidu.com'
semaphore = asyncio.Semaphore(CONCURRENCY)
session = None
async def scrape_api():
   async with semaphore:
       print('scraping', URL)
       async with session.get(URL) as response:
           await asyncio.sleep(1)
           return await response.text()
async def main():
   global session
   session = aiohttp.ClientSession()
   scrape_index_tasks = [asyncio.ensure_future(scrape_api()) for _ in range(10000)]
   await asyncio.gather(*scrape_index_tasks)
if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())
原文地址:https://www.cnblogs.com/angdh/p/14774827.html