(1).我们需要安装几个python中的库,安装起来很简单,pip3 install “库名”。
以下是需要用到的库,其中包含系统自带库,大家可根据自身情况选择安装。(若缺少相应的模板,在脚本执行时会有报错提示的,可根据提示补充安装)
Redis,redis-dump
Pyquery
urllib
random
asyncio
aiohttp
botocore
multiprocessing
(2).安装Redis-x64-3.0.504.msi,若不安装则脚本运行时会报错:Error 10061 connecting to 127.0.0.1:6379. 由于目标计算机积极拒绝,无法连接
报错原因:Redis服务没有启动
安装教程:github下载,下载速度极慢,需要搭梯子。考虑到本篇文章不能涉及翻墙,所以我将我搭梯子下载好的Redis-x64-3.0.504.msi放到下方的附件中,供大家安装。
(1).模块创建
本次代理池全面升级,为了实现功能的多样性,以及保证脚本的稳定性,可读性。我将创建6个模块脚本,实现从代理池运行——》爬取代理——》存储——》检测——》接口——》调用等功能。
(2).实现代理维护
使用代理赋值法,将批量获取的免费代理统一赋初始值为10,并存入数据库中,通过检测模块向代理服务器发送请求,若首次请求成功,则将该代理初始值提升至100,若首次请求失败则将初始值减1,若代理值减为0,则将代理从代理池中移除。
(3.)代理调用
通过WEB API接口,拿到随机可用代理,根据我们为代理赋加的值,优先获取最高值代理(值越高越稳定),若无最高值代理,则根据值的大小进行排名,优先输出排名最靠前的代理,供我们使用。
(1).将爬取的代理存储到Redis数据库中,通过定义一个类来操作Redis的有序集合。
(2).我们前面说过要为代理赋初始值,并且根据赋值大小进行排序,所以我们不得不调用Redis有序集合来满足我们的要求。
什么是Redis有序集合:
Redis 有序集合和集合一样也是string类型元素的集合,且不允许重复的成员。不同的是每个元素都会关联一个double类型的分数。redis正是通过分数来为集合中的成员进行从小到大的排序。 这里的double类型分数就是我们为代理赋的值。
现在我们要定义一个类,一些方法 和一些常量来实现代理的存储。
python
# coding=gbk
#存储模块
python
MAX_SCORE = 100 #最大值
MIN_SCORE = 0 #最小值
INITIAL_SCORE = 10 #初始值
REDIS_HOST = 'localhost' #Redis连接IP
REDIS_PORT = 6379 #Redis连接端口
REDIS_PASSWORD = None #连接密码,大家根据自己需求选择
REDIS_KEY = 'proxies' #有序集合键名,获取代理存储使用的有序集合
python
import redis #实现Redis的连接及使用
from random import choice #返回一个列表,元组或字符串的随机项
class RedisClient(object):
def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
"""
初始化
:param host: Redis 地址
:param port: Redis 端口
:param password: Redis密码
"""
self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True)
这里有个小细节:self.db是定义一个实例,用来连接数据库的,里面有一个decode_responses参数值为True 这是因为当我们要求返回键值时,若不加此条件则会返回——b’Value’ ,返回结果会有一个跟屁虫b,b代表为byte数据类型,所以当添加decode_responses=True时,返回结果就不会有跟屁虫了——’Valie’
python
def add(self, proxy, score=INITIAL_SCORE):
"""
添加代理,设置分数为最高
:param proxy: 代理
:param score: 分数
:return: 添加结果
"""
if not self.db.zscore(REDIS_KEY, proxy):
return self.db.zadd(REDIS_KEY, score, proxy)
add()实现为批量获取的代理附加初始值10。
def random(self):
"""
随机获取有效代理,首先尝试获取最高分数代理,如果不存在,按照排名获取,否则异常
:return: 随机代理
"""
result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
if len(result):
return choice(result)
else:
result = self.db.zrevrange(REDIS_KEY, 0, 100)
if len(result):
return choice(result)
else:
raise PoolEmptyError
第一个result包含所有值为100的代理(最高效代理),第二个result就是矬子里拔将军,但也很不错的。
def decrease(self, proxy):
"""
代理值减一分,小于最小值则删除
:param proxy: 代理
:return: 修改后的代理分数
"""
score = self.db.zscore(REDIS_KEY, proxy)
if score and score > MIN_SCORE:
print('代理', proxy, '当前分数', score, '减1')
return self.db.zincrby(REDIS_KEY, proxy, -1)
else:
print('代理', proxy, '当前分数', score, '移除')
return self.db.zrem(REDIS_KEY, proxy)
Redis基础语法不讲了,实现无用代理移除功能
def exists(self, proxy):
"""
判断是否存在
:param proxy: 代理
:return: 是否存在
"""
return not self.db.zscore(REDIS_KEY, proxy) == None
#简略写法,当代理池无代理时 返回not
def max(self, proxy):
"""
将代理设置为MAX_SCORE
:param proxy: 代理
:return: 设置结果
"""
print('代理', proxy, '可用,设置为', MAX_SCORE)
return self.db.zadd(REDIS_KEY, MAX_SCORE, proxy)
def count(self):
"""
获取数量
:return: 数量
"""
return self.db.zcard(REDIS_KEY)
def all(self):
"""
获取全部代理
:return: 全部代理列表
"""
return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)
从各大网站批量爬去代理,比较简单 不做过多说明 以前文章有涉及过爬虫的原理。
from pyquery import PyQuery as pq #解析
import urllib.request #请求
class ProxyMetaclass(type):
def __new__(cls, name, bases, attrs):
count = 0
attrs['__CrawlFunc__'] = []
for k, v in attrs.items():
if 'crawl_' in k:
attrs['__CrawlFunc__'].append(k)
count += 1
attrs['__CrawlFuncCount__'] = count
return type.__new__(cls, name, bases, attrs)
class Crawler(object, metaclass=ProxyMetaclass):
def get_proxies(self, callback): #callback = crawl_daili66 就是下面定义的获取代理的方法名称
proxies = []
for proxy in eval("self.{}()".format(callback)):
print('成功获取到代理', proxy)
proxies.append(proxy)
return proxies
其实这里借助于元类来实现【kk……对于元类我了解的也不深】。
这是attrs字典形式 K为键 V为值。当中包含我们定义的方法名称 如图:
定义了一个 ProxyMetaclass,Crawl 类将它设置为元类,元类中实现了 new() 方法,这个方法有固定的几个参数,其中第四个参数 attrs 中包含了类的一些属性,这其中就包含了类中方法的一些信息,我们可以遍历 attrs 这个变量即可获取类的所有方法信息。所以在这里我们在 new() 方法中遍历了 attrs 的这个属性,就像遍历一个字典一样,键名对应的就是方法的名称,接下来判断其开头是否是 crawl_,如果是,则将其加入到 CrawlFunc 属性中,这样我们就成功将所有以 crawl 开头的方法定义成了一个属性,就成功动态地获取到所有以 crawl 开头的方法列表了。
def crawl_daili66(self,page_count=4):
start_url = 'http://www.66ip.cn/{}.html'
urls = [start_url.format (page) for page in range(1, page_count + 1)]
for url in urls:
print('Crawling', url)
req = urllib.request.Request(url=url)
res = urllib.request.urlopen(req)
html = res.read()
if html:
doc = pq(html)
trs = doc('.containerbox table tr:gt(0)').items()
for tr in trs:
ip = tr.find('td:nth-child(1)').text()
port = tr.find('td:nth-child(2)').text()
yield ':' .join([ip,port])
def crawl_proxyXH(self):
start_url = "http://www.89ip.cn/index_{}.html"
urls = [start_url.format(page) for page in range(1,10)]
headers = {'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20100101 Firefox/23.0'}
for url in urls:
req = urllib.request.Request(url=url,headers=headers)
res = urllib.request.urlopen(req)
html = res.read()
if html:
doc = pq(html)
trs = doc('.layui-row table tr:gt(0)').items()
for tr in trs:
ip = tr.find('td:nth-child(1)').text() #tr节点下第一个td子节点的文本内容
port = tr.find('td:nth-child(2)').text()
yield ':' .join([ip,port])
唯一要点一下的一处是,这里的tr:gt(0)容易理解错,这里使用gt(0),并不是说包含了从第一个tr节点到最后一个tr节点的所有节点,而是包含首个tr节点的下一个兄弟节点及其后的所有tr节点。换句话说,就是除了第一个tr节点外 其他的tr节点都包含其中
我找了两个比较好用且访问速度比较快的网站,之所以抛弃”小幻代理”是因为容易出现访问延迟的现象,影响脚本的稳定性。
第二个代理网站需要设置请求头的,要不然会禁止访问。
爬虫方法是如何运行的见下图,以66ip为例:
from db import RedisClient
from crawler import Crawler
限制代理池的最大容存量为10000
POOL_UPPER_THRESHOLD = 10000
定义Getter类,创建实例,为了调用其中类的函数
class Getter():
def __init__(self):
self.redis = RedisClient()
self.crawler = Crawler()
def is_over_threshold(self):
"""
调用RedisClient中count()函数
判断是否达到了代理池限制
"""
if self.redis.count() >= POOL_UPPER_THRESHOLD:
return True
else:
return False
def run(self):
print('获取器开始执行')
if not self.is_over_threshold():
for callback_label in range(self.crawler.__CrawlFuncCount__): #从列表获取所有包含crawl_的方法
callback = self.crawler.__CrawlFunc__[callback_label]
proxies = self.crawler.get_proxies(callback) #承接获取模块中的函数,获取代理
for proxy in proxies:
self.redis.add(proxy) #存储到redis数据库中
定义了 is_over_threshold() 方法判断代理池是否已经达到了容量阈值,它就是调用了 RedisClient 的 count() 方法获取代理的数量,然后加以判断,如果数量达到阈值则返回 True,否则 False。如果不想加这个限制可以将此方法永久返回 True。
接下来定义了 run() 方法,首先判断了代理池是否达到阈值,然后在这里就调用了 Crawler 类的 CrawlFunc 属性,获取到所有以 crawl 开头的方法列表,依次通过 get_proxies() 方法调用,得到各个方法抓取到的代理,然后再利用 RedisClient 的 add() 方法加入数据库
在上述db.py crawler.py getter.py三个模块中,我们已经能够成功获取代理并且将其放入数据库中。然后就需要一个检测模块来对所有的代理进行一轮轮的检测,检测可用就设置为 100,不可用就分数减 1,这样就可以实时改变每个代理的可用情况,在获取有效代理的时候只需要获取分数高的代理即可。
由于代理的数量非常多,为了提高代理的检测效率,我们在这里使用异步请求库 Aiohttp 来进行检测。
为什么要用Aiohttp呢,来回想一下,我们在请求单个网址的时候通常习惯使用requests来请求,而Requests 作为一个同步请求库,我们在发出一个请求之后需要等待网页加载完成之后才能继续执行程序。也就是这个过程会阻塞在等待响应这个过程,如果服务器响应非常慢,比如一个请求等待十几秒,那么我们使用 Requests 完成一个请求就会需要十几秒的时间,中间其实就是一个等待响应的过程,程序也不会继续往下执行。
而Aiohttp异步请求库便完美的解决了这个问题。在请求发出之后,程序可以继续接下去执行去做其他的事情,当响应到达时,会通知程序再去处理这个响应,这样程序就没有被阻塞,充分把时间和资源利用起来,大大提高效率。
VALID_STATUS_CODES = [200]
TEST_URL = 'http://www.baidu.com'
BATCH_TEST_SIZE = 100
设置好状态码200为目标服务器已经处理了请求。
BATCH_TEST_SIZE设置好一次检测的最大代理量,这里一次最多检测100个代理
TEST_URL:使用该网站进行检测,可以设置为一个不会封 IP 的网站。百度就很不错哦~
from db import RedisClient
import asyncio #用来编写 并发 代码的库
import aiohttp
import time
init() 方法中建立了一个 RedisClient 对象,供类中其他方法使用
class Tester(object):
def __init__(self):
self.redis = RedisClient()
接下来定义了一个 test_single_proxy() 方法,用来检测单个代理的可用情况,其参数就是被检测的代理。
async def test_single_proxy(self, proxy):
conn = aiohttp.TCPConnector(verify_ssl=False) #用于使用TCP处理HTTP和HTTPS的连接器
async with aiohttp.ClientSession(connector=conn) as session:
try:
if isinstance(proxy, bytes):
proxy = proxy.decode('utf-8')
real_proxy = 'http://' + proxy
print('正在测试', proxy)
async with session.get(TEST_URL, proxy=real_proxy, timeout=15) as response: #调用get()请求代理
if response.status in VALID_STATUS_CODES:
self.redis.max(proxy)
print('代理可用', proxy)
else:
self.redis.decrease(proxy)
print('请求响应码不合法', proxy)
except :
self.redis.decrease(proxy)
print('代理请求失败', proxy)
注意这个方法前面加了 async 关键词,代表这个方法是异步的,方法内部首先创建了 Aiohttp 的 ClientSession 对象,此对象类似于 Requests 的 Session 对象,可以直接调用该对象的 get() 方法来访问页面。
在这里代理的设置方式是通过 proxy 参数传递给 get() 方法,请求方法前面也需要加上 async 关键词标明是异步请求,这也是 Aiohttp 使用时的常见写法。
def run(self):
print('测试器开始运行')
try:
proxies = self.redis.all() # 所有的代理
loop = asyncio.get_event_loop() #事件循环的获取
# 批量测试
for i in range(0, len(proxies), BATCH_TEST_SIZE):
test_proxies = proxies[i:i + BATCH_TEST_SIZE]
tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
loop.run_until_complete(asyncio.wait(tasks))
time.sleep(5)
except Exception as e:
print('测试器发生错误', e.args)
(1).数据库密码泄露风险
(2).为了远程连接代理池
(3).便于同步更新
这样获取代理只需要请求一下接口即可,以上的几个缺点弊端可以解决。
from flask import Flask, g #调用flask库
from db import RedisClient #调用类
这是个小知识点all是个变量列表,我们看到all等于[‘app’]意思就是说 ,在本模块中,若不引用该模块,则只允许执行app函数-> Flask(name)
__all__ = ['app']
app = Flask(__name__)
初始化:所有的Flask都必须创建程序实例,
web服务器使用wsgi协议,把客户端所有的请求都转发给这个程序实例
程序实例是Flask的对象,一般情况下用如下方法实例化
Flask类只有一个必须指定的参数,即程序主模块或者包的名字,name是系统变量,该变量指的是本py文件的文件名
def get_conn():
if not hasattr(g, 'redis'):
g.redis = RedisClient()
return g.redis
@app.route('/')
def index():
return '<h2>Welcome to Proxy Pool System</h2>'
@app.route('/random')
def get_proxy():
conn = get_conn()
return conn.random()
@app.route('/count')
def get_counts():
conn = get_conn()
return str(conn.count())
客户端发送url给web服务器,web服务器将url转发给flask程序实例,程序实例
需要知道对于每一个url请求启动那一部分代码,所以保存了一个url和python函数的映射关系。
处理url和函数之间关系的程序,称为路由
在flask中,定义路由最简便的方式,是使用程序实例的app.route装饰器,把装饰的函数注册为路由
if __name__ == '__main__':
app.run()
TESTER_CYCLE = 20
GETTER_CYCLE = 20
TESTER_ENABLED = True
GETTER_ENABLED = True
API_ENABLED = True
在这里还有三个常量,TESTER_ENABLED、GETTER_ENABLED、API_ENABLED 都是布尔类型,True 或者 False。标明了测试模块、获取模块、接口模块的开关,如果为 True,则代表模块开启。
import time
from multiprocessing import Process
from api import app #调用接口模块
from getter import Getter #调用获取模块(2) crawler.py与getter皆为获取模块
from tester import Tester #调用检测模块
三个调度方法结构也非常清晰,比如 schedule_tester() 方法,这是用来调度测试模块的方法,首先声明一个 Tester 对象,然后进入死循环不断循环调用其 run() 方法,执行完一轮之后就休眠一段时间,休眠结束之后重新再执行。在这里休眠时间也定义为一个常量,如 20 秒,这样就会每隔 20 秒进行一次代理检测。
class Scheduler():
def schedule_tester(self, cycle=TESTER_CYCLE):
tester = Tester()
while True:
print('测试器开始运行')
tester.run()
time.sleep(cycle)
#每隔20秒从数据库获取一次代理
def schedule_getter(self, cycle=GETTER_CYCLE):
getter = Getter()
while True:
print('开始抓取代理')
getter.run()
time.sleep(cycle)
def schedule_api(self):
app.run('127.0.0.1','5000') #这里要看分配,我这儿分配的是5000端口也就是AIP_PORT,这个端口怎么看,大家可以直接执行aip.py模块
启动入口是 run() 方法,其分别判断了三个模块的开关,如果开启的话,就新建一个 Process 进程,设置好启动目标,然后调用 start() 方法运行,这样三个进程就可以并行执行,互不干扰。
def run(self):
print('代理池开始运行')
if TESTER_ENABLED:
tester_process = Process(target=self.schedule_tester)
tester_process.start()
if GETTER_ENABLED:
getter_process = Process(target=self.schedule_getter)
getter_process.start()
if API_ENABLED:
api_process = Process(target=self.schedule_api)
api_process.start()
if __name__=='__main__':
Scheduler().run()
(1).首先要保证Redis服务开启,若没开启的话请下载并安装Redis-x64-3.0.504.msi
安装包已添加到附件中,点开直接无脑‘下一步‘,安装成功后,找到.exe文件所在目录,双击执行即可。 Port:6379别动就行,脚本里面已经提前写好了。
(2).回到run.py脚本 直接执行(保证所有模块均在同一目录下)
(3).执行效果:如图
方法:sqlmap -u 目标网址 –porxy = http://127.0.0.1:5000/random 即可实现每扫描一次就,变动一次我们扫描器的IP地址。是不是很牛掰的样子。
用户名 | 金币 | 积分 | 时间 | 理由 |
---|---|---|---|---|
1659720135 | 5.00 | 0 | 2021-12-09 18:06:05 | 一个受益终生的帖子~~ |
nifuda | 1.00 | 0 | 2021-05-29 21:09:05 | 一个受益终生的帖子~~ |
1761012518 | 10.00 | 0 | 2020-08-07 13:01:08 | 一个受益终生的帖子~~ |
1761012518 | 5.00 | 0 | 2020-08-07 13:01:07 | 一个受益终生的帖子~~ |
奖励系统 | 100.00 | 0 | 2020-08-06 22:10:00 | 投稿满 10 赞奖励 |
奖励系统 | 50.00 | 0 | 2020-08-06 21:09:58 | 投稿满 5 赞奖励 |
Track-聂风 | 130.00 | 0 | 2020-08-06 19:07:36 | 加油~支持同学 |
打赏我,让我更有动力~
The attachment is hidden and you need to reply to the topic before it becomes visible.
© 2016 - 2024 掌控者 All Rights Reserved.
18487620995
发表于 2023-5-21
打卡记录
评论列表
加载数据中...
yufei
发表于 2023-11-30
1
评论列表
加载数据中...
落尘
发表于 10个月前
顶
评论列表
加载数据中...
a1b3lt
发表于 9个月前
马上搭建一下试试
评论列表
加载数据中...
exploit2020
发表于 8个月前
感谢大佬
评论列表
加载数据中...
liqiang
发表于 7个月前
顶顶顶
评论列表
加载数据中...
skyblue
发表于 4个月前
跟随大佬的步伐
评论列表
加载数据中...
林辞
发表于 4个月前
顶
评论列表
加载数据中...