通过 API 接口调用 Redis 进行数据存储,使用有序集合(sorted set)存储代理 IP,自动去重。
# -*- coding:utf-8 -*-
# 这里负责代理IP的采集工作
from proxy_redis import ProxyRedis
from multiprocessing import Process
from concurrent.futures import ThreadPoolExecutor
import requests
from lxml import etree
import time
# Shared request headers: a desktop Chrome User-Agent so the free-proxy
# sites serve their normal HTML listing pages instead of blocking the bot.
headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
}
def get_kuai_ip(red, max_pages=100, delay=20):
    """Scrape free proxies from kuaidaili.com and store them in Redis.

    Args:
        red: ProxyRedis instance; each "ip:port" string is stored via
            red.add_proxy_ip() (sorted-set add, so duplicates are merged).
        max_pages: exclusive upper bound of listing pages to crawl
            (default 100 preserves the original range(1, 100)).
        delay: seconds to sleep between pages, to avoid hammering the site.
    """
    for page in range(1, max_pages):
        url = f'https://free.kuaidaili.com/free/intr/{page}/'
        try:
            # timeout: the original call could hang forever on a stalled
            # connection and freeze this collector thread.
            resp = requests.get(url=url, headers=headers, timeout=10)
            resp.raise_for_status()
        except requests.RequestException as e:
            # Skip a bad page instead of letting one failure abort the
            # whole multi-page crawl.
            print('出现错误 >> ', e)
            time.sleep(delay)
            continue
        tree = etree.HTML(resp.text)
        for tr in tree.xpath('//table//tr'):
            ip = tr.xpath('./td[1]/text()')    # IP
            port = tr.xpath('./td[2]/text()')  # PORT
            # Header rows have no <td>; also guard port so a malformed row
            # with an IP but no port cannot raise IndexError.
            if not ip or not port:
                continue
            red.add_proxy_ip(ip[0] + ':' + port[0])  # 增加新的ip地址
        time.sleep(delay)
def get_buzhidao_ip(red):
    """Scrape free proxies from ip.jiangxianli.com (page 1) into Redis.

    Args:
        red: ProxyRedis instance; each "ip:port" is stored via
            red.add_proxy_ip() (sorted-set add, so duplicates are merged).
    """
    url = 'https://ip.jiangxianli.com/?page=1'
    try:
        # timeout + status check: the original could hang indefinitely or
        # parse an error page as if it were the listing.
        resp = requests.get(url=url, headers=headers, timeout=10)
        resp.raise_for_status()
    except requests.RequestException as e:
        print('出现错误 >> ', e)
        return
    tree = etree.HTML(resp.text)
    for tr in tree.xpath('//table//tr'):
        ip = tr.xpath('./td[1]/text()')
        port = tr.xpath('./td[2]/text()')
        # Guard both columns: header rows have neither, and a malformed row
        # with only an IP would otherwise raise IndexError on port[0].
        if not ip or not port:
            continue
        red.add_proxy_ip(ip[0] + ':' + port[0])  # 增加新的ip地址
def run():
    """Collector entry point: run both scrapers on a thread pool, forever.

    Re-submits both collection jobs once a minute. The pool is sized 2 so
    each source gets its own worker.
    """
    red = ProxyRedis()  # 创建好red存储 -- shared by both collectors
    pool = ThreadPoolExecutor(2)
    while True:
        try:
            pool.submit(get_kuai_ip, red)  # 采集快代理
            pool.submit(get_buzhidao_ip, red)
        except Exception as e:
            print('出现错误 >> ', e)
        # Sleep unconditionally: the original did `continue` in the except
        # branch, which skipped the sleep and busy-looped the process
        # whenever submission kept failing.
        time.sleep(60)  # 每分钟跑一次


if __name__ == '__main__':
    run()
使用 Async 异步爬虫提高 IP 验证速度。
# -*- coding:utf-8 -*-
# 负责IP代理的验证工作
import time
from proxy_redis import ProxyRedis
import asyncio
import aiohttp
async def verify_one(ip, sem, red):
    """Probe one proxy by fetching baidu through it and re-score it in Redis.

    A working proxy (HTTP 200/302) has its score reset to the maximum via
    red.set_max_score(); a failing or timed-out proxy is decremented so it
    eventually falls out of the usable pool.

    Args:
        ip: proxy address as "host:port".
        sem: asyncio.Semaphore bounding concurrent probes (see main()).
        red: ProxyRedis instance used for scoring.
    """
    timeout = aiohttp.ClientTimeout(total=10)  # 10s 没回来就报错
    async with sem:
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get("http://www.baidu.com", proxy="http://" + ip, timeout=timeout) as resp:
                    # Read the body to force the full round trip through
                    # the proxy, not just the response headers.
                    await resp.text()
                    if resp.status in [200, 302]:
                        print('[*]此IP可用 >> ', ip)
                        red.set_max_score(ip)
                    else:
                        # 有问题,扣分
                        print('[-]此IP不可用 >> ', ip)
                        # NOTE(review): the original called red.des_incrby()
                        # here but red.desc_incrby() in the except path --
                        # one of the two is an AttributeError at runtime.
                        # Unified on desc_incrby; confirm against ProxyRedis.
                        red.desc_incrby(ip)
        except Exception:
            print('[-]此IP不可用 >> ', ip)
            red.desc_incrby(ip)
async def main(red):
    """Pull every stored proxy from Redis and verify all of them concurrently.

    Concurrency is capped at 30 in-flight probes via a shared semaphore.
    """
    # 1.把ip全部取出
    all_proxies = red.get_all_proxy()
    # asyncio.wait() raises ValueError on an empty task set, which happens
    # whenever the collector has not stored anything yet -- bail out early.
    if not all_proxies:
        return
    sem = asyncio.Semaphore(30)
    tasks = [asyncio.create_task(verify_one(ip, sem, red)) for ip in all_proxies]
    await asyncio.wait(tasks)
def run():
    """Verifier entry point: re-score the whole proxy pool every ~100s."""
    red = ProxyRedis()
    time.sleep(10)  # give the collector a head start before the first pass
    while True:
        try:
            asyncio.run(main(red))
        except Exception as err:
            print('出现错误 >> ', err)
        time.sleep(100)


if __name__ == '__main__':
    run()
使用 sanic 库提供 API 接口。
# -*- coding:utf-8 -*-
# 负责提供代理IP的接口
from proxy_redis import ProxyRedis
from sanic import Sanic, json
from sanic_cors import CORS
from settings import *
red = ProxyRedis()  # module-level Redis accessor shared by every request
app = Sanic('IP')   # Sanic application serving the proxy API
CORS(app)           # allow cross-origin callers to hit the endpoint
@app.route('/')
def clhttp(req):
    """GET / -- return one usable proxy as JSON: {"ip": "<host:port>"}."""
    # NOTE(review): recent Sanic versions require `async def` handlers;
    # confirm the pinned Sanic release still accepts sync handlers.
    ip = red.get_keyong_proxy()  # presumably the highest-scored proxy -- verify in ProxyRedis
    return json({'ip': ip})
def run():
    """Start the HTTP API on the address/port configured in settings.py."""
    app.run(host=API_ADDRESS, port=API_PORT)


if __name__ == '__main__':
    run()
使用前先安装并开启 Redis 服务,在 settings.py 文件中设置必要参数,然后运行 runner.py 文件即可。

用户名 | 金币 | 积分 | 时间 | 理由 |
---|---|---|---|---|
Track-劲夫 | 100.00 | 0 | 2022-08-23 11:11:14 | 一个受益终生的帖子~~ |
打赏我,让我更有动力~
PyProxy.zip 文件大小:0.005M (下载次数:27)
© 2016 - 2024 掌控者 All Rights Reserved.
山屿云
发表于 2022-8-23
JF我蹲你
评论列表
加载数据中...
喜欢悠哉独自在
发表于 2022-8-25
赞
评论列表
加载数据中...
山屿云
发表于 2022-8-31
123
评论列表
加载数据中...
lilei029
发表于 2022-8-31
niu
评论列表
加载数据中...