0
点赞
收藏
分享

微信扫一扫

用 Twisted 发送请求


# coding: utf-8
# @Time : 2022-05-18 13:01
# @Author : AngDH
from twisted.internet import reactor
from twisted.web.client import Agent
from twisted.web.http_headers import Headers

# Plays the client role: the Agent issues HTTP requests on the reactor.
agent = Agent(reactor)

# Send a GET request with a custom User-Agent header and no request body.
# The callbacks registered further down are attached to ``defered``.
defered = agent.request(
    method=b'GET',
    uri=b'http://www.baidu.com/s?wd=python',
    headers=Headers({"User-Agent": ["python"]}),
    bodyProducer=None,
)


def successCallback(response):
    """Print the status code of the response that was received.

    Args:
        response: The value delivered when the request succeeds; only its
            ``code`` attribute is read.
    """
    print('Response received:', response.code)


# Success path: hand the response to successCallback.
defered.addCallback(successCallback)


def errorCallback(error):
    """Print the string form of a failure.

    Args:
        error: The failure passed to the errback; it is rendered with
            ``str()``.
    """
    print("error:", str(error))


# Failure path: report the error via errorCallback.
defered.addErrback(errorCallback)


def callbackShutdown(ignored):
    """Stop the reactor.

    Args:
        ignored: Whatever the preceding callback or errback produced; unused.
    """
    reactor.stop()


# Register callbackShutdown for both the success and the failure path, so the
# reactor is stopped either way.
defered.addBoth(callbackShutdown)

# Start the reactor's event loop; callbackShutdown calls reactor.stop().
reactor.run()
# coding: utf-8
# @Time : 2022-05-18 13:01
# @Author : AngDH
from twisted.internet import reactor
from twisted.internet.defer import succeed
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer
from zope.interface import implementer


@implementer(IBodyProducer)
class BytesProducer(object):
    """Body producer that sends a single in-memory bytes object.

    Attributes:
        body: The bytes written to the consumer.
        length: ``len(body)``, computed once in ``__init__``.
    """

    def __init__(self, body):
        self.body = body
        self.length = len(body)

    def startProducing(self, consumer):
        """Write the whole body to *consumer* in one call.

        Returns:
            The result of ``succeed(None)``, since nothing remains to be
            produced after the single write.
        """
        consumer.write(self.body)
        return succeed(None)

    def pauseProducing(self):
        """Do nothing: the body is written in one call, nothing to pause."""

    def resumeProducing(self):
        """Do nothing: counterpart of ``pauseProducing``."""

    def stopProducing(self):
        """Do nothing: no resources are held while producing."""

    # Backward-compatible alias for the original, misspelled method name.
    pauseProducting = pauseProducing


# Plays the client role: the Agent issues HTTP requests on the reactor.
agent = Agent(reactor)

# Send a POST request whose body is supplied by a BytesProducer.
# The callbacks registered further down are attached to ``defered``.
defered = agent.request(
    method=b'POST',
    uri=b'http://www.baidu.com/s?wd=python',
    headers=Headers({"User-Agent": ["python"]}),
    bodyProducer=BytesProducer('请求体'.encode()),
)


def successCallback(response):
    """Print the status code of the response that was received.

    Args:
        response: The value delivered when the request succeeds; only its
            ``code`` attribute is read.
    """
    print('Response received:', response.code)


# Success path: hand the response to successCallback.
defered.addCallback(successCallback)


def errorCallback(error):
    """Print the string form of a failure.

    Args:
        error: The failure passed to the errback; it is rendered with
            ``str()``.
    """
    print("error:", str(error))


# Failure path: report the error via errorCallback.
defered.addErrback(errorCallback)


def callbackShutdown(ignored):
    """Stop the reactor.

    Args:
        ignored: Whatever the preceding callback or errback produced; unused.
    """
    reactor.stop()


# Register callbackShutdown for both the success and the failure path, so the
# reactor is stopped either way.
defered.addBoth(callbackShutdown)

# Start the reactor's event loop; callbackShutdown calls reactor.stop().
reactor.run()
# coding: utf-8
# @Time : 2022-05-18 13:01
# @Author : AngDH
from twisted.internet import reactor
from twisted.internet.defer import DeferredList
from twisted.web.client import Agent
from twisted.web.http_headers import Headers

# Plays the client role: the Agent issues HTTP requests on the reactor.
agent = Agent(reactor)


def successCallback(response):
    """Print the status code of the response that was received.

    Args:
        response: The value delivered when a request succeeds; only its
            ``code`` attribute is read.
    """
    print('Response received:', response.code)


def errorCallback(error):
    """Print the string form of a failure.

    Args:
        error: The failure passed to the errback; it is rendered with
            ``str()``.
    """
    print("error:", str(error))


def callbackShutdown(ignored):
    """Stop the reactor.

    Args:
        ignored: Whatever the DeferredList delivered; unused.
    """
    reactor.stop()


# Deferreds of all issued requests, collected for the DeferredList below.
pending = []

for url in [b"http://www.baidu.com",
            b"http://www.taobao.com",
            b"http://www.jd.com",
            b"http://www.tmall.com",
            b"http://www.tencent.com",
            b"http://www.douban.com",
            ]:
    d = agent.request(
        b'GET',
        url,
        Headers({"User-Agent": ["python"]}),
        None)
    # Each request reports its own success or failure independently.
    d.addCallback(successCallback)
    d.addErrback(errorCallback)
    pending.append(d)

# Manage the individual Deferreds as one unit: callbackShutdown is attached
# to the DeferredList rather than to any single request.
dl = DeferredList(pending)
dl.addBoth(callbackShutdown)

# Start the reactor's event loop; callbackShutdown calls reactor.stop().
reactor.run()





举报

相关推荐

0 条评论