feat: 代理IP功能 Done

This commit is contained in:
Relakkes
2023-12-08 00:10:04 +08:00
parent c530bd4219
commit 1cec23f73d
9 changed files with 103 additions and 92 deletions

View File

@@ -5,7 +5,7 @@
import json
import pathlib
import random
from typing import Dict, List
from typing import List
import httpx
from tenacity import retry, stop_after_attempt, wait_fixed
@@ -29,18 +29,21 @@ class ProxyIpPool:
"""
self.proxy_list = await IpProxy.get_proxies(self.ip_pool_count)
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
async def is_valid_proxy(self, proxy: IpInfoModel) -> bool:
"""
验证代理IP是否有效
:param proxy:
:return:
"""
utils.logger.info(f"[ProxyIpPool.is_valid_proxy] testing {proxy.ip} is it valid ")
try:
httpx_proxy = f"{proxy.protocol}{proxy.ip}:{proxy.port}"
proxy_auth = httpx.BasicAuth(proxy.user, proxy.password)
async with httpx.AsyncClient(proxies={proxy.protocol: httpx_proxy}, auth=proxy_auth) as client:
proxies = {
f"{proxy.protocol}{proxy.ip}": httpx_proxy
}
async with httpx.AsyncClient(proxies=proxies, auth=proxy_auth) as client:
response = await client.get(self.valid_ip_url)
if response.status_code == 200:
return True