|
@@ -1,591 +1,242 @@
|
|
-import os
|
|
|
|
-import time
|
|
|
|
-import logging
|
|
|
|
-import signal
|
|
|
|
-import subprocess
|
|
|
|
-import cv2
|
|
|
|
-import numpy as np
|
|
|
|
|
|
+import ddddocr
|
|
|
|
+
|
|
|
|
+
|
|
from PIL import Image
|
|
from PIL import Image
|
|
import io
|
|
import io
|
|
|
|
+import numpy as np
|
|
|
|
+import time
|
|
|
|
+import json
|
|
import base64
|
|
import base64
|
|
import random
|
|
import random
|
|
-from selenium import webdriver
|
|
|
|
-from selenium.webdriver.firefox.options import Options
|
|
|
|
-from selenium.webdriver.firefox.service import Service
|
|
|
|
-from selenium.webdriver.support.ui import WebDriverWait
|
|
|
|
-from selenium.webdriver.support import expected_conditions as EC
|
|
|
|
-from selenium.webdriver.common.by import By
|
|
|
|
-from selenium.webdriver.common.action_chains import ActionChains
|
|
|
|
-from contextlib import contextmanager
|
|
|
|
|
|
+import math
|
|
|
|
+import requests
|
|
|
|
+import urllib.parse
|
|
|
|
|
|
-# 配置日志
|
|
|
|
-logging.basicConfig(
|
|
|
|
- level=logging.INFO,
|
|
|
|
- format='%(asctime)s - %(levelname)s - %(message)s'
|
|
|
|
-)
|
|
|
|
-logger = logging.getLogger(__name__)
|
|
|
|
-
|
|
|
|
-class TimeoutException(Exception):
|
|
|
|
- pass
|
|
|
|
-
|
|
|
|
-@contextmanager
|
|
|
|
-def time_limit(seconds):
|
|
|
|
- def signal_handler(signum, frame):
|
|
|
|
- raise TimeoutException("Timed out!")
|
|
|
|
- signal.signal(signal.SIGALRM, signal_handler)
|
|
|
|
- signal.alarm(seconds)
|
|
|
|
- try:
|
|
|
|
- yield
|
|
|
|
- finally:
|
|
|
|
- signal.alarm(0)
|
|
|
|
-
|
|
|
|
-class GeetestCracker:
|
|
|
|
|
|
+class TDCSimulator:
|
|
def __init__(self):
|
|
def __init__(self):
|
|
- self.driver = None
|
|
|
|
- self.wait = None
|
|
|
|
- logger.info("初始化 GeetestCracker")
|
|
|
|
-
|
|
|
|
- def cleanup_processes(self):
|
|
|
|
- """清理残留进程"""
|
|
|
|
- try:
|
|
|
|
- subprocess.run(['pkill', '-f', 'firefox'], stderr=subprocess.DEVNULL)
|
|
|
|
- subprocess.run(['pkill', '-f', 'geckodriver'], stderr=subprocess.DEVNULL)
|
|
|
|
- time.sleep(2)
|
|
|
|
- logger.info("清理残留进程完成")
|
|
|
|
- except Exception as e:
|
|
|
|
- logger.error(f"清理进程时出错: {str(e)}")
|
|
|
|
-
|
|
|
|
- def setup_driver(self):
|
|
|
|
- """配置并初始化 WebDriver"""
|
|
|
|
- try:
|
|
|
|
- logger.info("开始设置浏览器驱动")
|
|
|
|
-
|
|
|
|
- # 首先清理可能的残留进程
|
|
|
|
- self.cleanup_processes()
|
|
|
|
-
|
|
|
|
- # 检查 geckodriver
|
|
|
|
- from shutil import which
|
|
|
|
- geckodriver_path = which('geckodriver')
|
|
|
|
-
|
|
|
|
- if not geckodriver_path:
|
|
|
|
- logger.error("未找到 geckodriver")
|
|
|
|
- return False
|
|
|
|
-
|
|
|
|
- logger.info(f"找到 geckodriver: {geckodriver_path}")
|
|
|
|
-
|
|
|
|
- # Firefox 配置 - 修改配置以提高稳定性
|
|
|
|
- options = Options()
|
|
|
|
-
|
|
|
|
- # 核心配置
|
|
|
|
- options.set_preference('marionette', True)
|
|
|
|
- options.set_preference('marionette.port', 2828) # 固定端口
|
|
|
|
- options.set_preference('network.http.connection-timeout', 10000)
|
|
|
|
- options.set_preference('network.http.response.timeout', 10000)
|
|
|
|
-
|
|
|
|
- # 禁用 JavaScript JIT
|
|
|
|
- options.set_preference('javascript.options.ion', False)
|
|
|
|
- options.set_preference('javascript.options.baselinejit', False)
|
|
|
|
-
|
|
|
|
- # 禁用硬件加速
|
|
|
|
- options.set_preference('layers.acceleration.disabled', True)
|
|
|
|
-
|
|
|
|
- # 禁用不必要的功能
|
|
|
|
- options.set_preference('browser.cache.disk.enable', False)
|
|
|
|
- options.set_preference('browser.cache.memory.enable', False)
|
|
|
|
- options.set_preference('browser.cache.offline.enable', False)
|
|
|
|
- options.set_preference('network.http.use-cache', False)
|
|
|
|
- options.set_preference('browser.tabs.remote.autostart', False)
|
|
|
|
- options.set_preference('browser.tabs.remote.autostart.2', False)
|
|
|
|
- options.set_preference('dom.ipc.processCount', 1)
|
|
|
|
- options.set_preference('browser.sessionstore.resume_from_crash', False)
|
|
|
|
-
|
|
|
|
- # 添加必要的参数
|
|
|
|
- options.add_argument('--headless')
|
|
|
|
- options.add_argument('--no-sandbox')
|
|
|
|
- options.add_argument('--disable-dev-shm-usage')
|
|
|
|
- options.add_argument('--disable-gpu')
|
|
|
|
- options.add_argument('--disable-extensions')
|
|
|
|
- options.add_argument('--disable-infobars')
|
|
|
|
- options.add_argument('--disable-notifications')
|
|
|
|
- options.add_argument('--window-size=1280,800')
|
|
|
|
-
|
|
|
|
- logger.info("创建 WebDriver 实例")
|
|
|
|
-
|
|
|
|
- # 使用临时目录
|
|
|
|
- import tempfile
|
|
|
|
- temp_dir = tempfile.mkdtemp()
|
|
|
|
-
|
|
|
|
- # Firefox 临时配置目录
|
|
|
|
- options.set_preference('profile', temp_dir)
|
|
|
|
-
|
|
|
|
- # 创建 Service 对象,添加更多日志选项
|
|
|
|
- service = Service(
|
|
|
|
- geckodriver_path,
|
|
|
|
- log_output=os.path.join(temp_dir, 'geckodriver.log'),
|
|
|
|
- service_args=[
|
|
|
|
- '--log', 'trace',
|
|
|
|
- '--marionette-port', '2828'
|
|
|
|
- ]
|
|
|
|
- )
|
|
|
|
-
|
|
|
|
- # 设置超时限制并重试
|
|
|
|
- max_attempts = 3
|
|
|
|
- for attempt in range(max_attempts):
|
|
|
|
- try:
|
|
|
|
- logger.info(f"尝试创建 WebDriver 实例 (尝试 {attempt + 1}/{max_attempts})")
|
|
|
|
-
|
|
|
|
- # 设置环境变量
|
|
|
|
- os.environ['MOZ_HEADLESS'] = '1'
|
|
|
|
- os.environ['DISPLAY'] = ':99'
|
|
|
|
-
|
|
|
|
- # 创建虚拟显示
|
|
|
|
- try:
|
|
|
|
- subprocess.run(['Xvfb', ':99', '-screen', '0', '1280x800x24'],
|
|
|
|
- start_new_session=True,
|
|
|
|
- stdout=subprocess.DEVNULL,
|
|
|
|
- stderr=subprocess.DEVNULL)
|
|
|
|
- except:
|
|
|
|
- logger.warning("Xvfb 启动失败,继续尝试")
|
|
|
|
-
|
|
|
|
- # 设置严格的超时
|
|
|
|
- with time_limit(20): # 减少超时时间
|
|
|
|
- self.driver = webdriver.Firefox(
|
|
|
|
- service=service,
|
|
|
|
- options=options
|
|
|
|
- )
|
|
|
|
-
|
|
|
|
- # 设置页面加载超时
|
|
|
|
- self.driver.set_page_load_timeout(10)
|
|
|
|
- self.driver.set_script_timeout(10)
|
|
|
|
- self.wait = WebDriverWait(self.driver, 10)
|
|
|
|
-
|
|
|
|
- # 测试连接
|
|
|
|
- logger.info("测试浏览器连接")
|
|
|
|
- self.driver.get('about:blank')
|
|
|
|
-
|
|
|
|
- logger.info("浏览器驱动初始化成功")
|
|
|
|
- return True
|
|
|
|
-
|
|
|
|
- except TimeoutException:
|
|
|
|
- logger.error(f"第 {attempt + 1} 次尝试超时")
|
|
|
|
- except Exception as e:
|
|
|
|
- logger.error(f"第 {attempt + 1} 次尝试失败: {str(e)}")
|
|
|
|
-
|
|
|
|
- # 清理资源
|
|
|
|
- self.cleanup_driver()
|
|
|
|
- self.cleanup_processes()
|
|
|
|
-
|
|
|
|
- if attempt < max_attempts - 1:
|
|
|
|
- logger.info("等待后重试...")
|
|
|
|
- time.sleep(5)
|
|
|
|
- else:
|
|
|
|
- raise Exception("在多次尝试后仍然失败")
|
|
|
|
|
|
+ self.start_time = int(time.time() * 1000)
|
|
|
|
+ self.data = {
|
|
|
|
+ "tdf": 1,
|
|
|
|
+ "ts": [],
|
|
|
|
+ "points": [],
|
|
|
|
+ "events": [],
|
|
|
|
+ "ua": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
|
|
|
+ "tm": {
|
|
|
|
+ "st": self.start_time,
|
|
|
|
+ "ed": 0
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ def add_point(self, x, y, t):
|
|
|
|
+ self.data["points"].append([x, y])
|
|
|
|
+ self.data["ts"].append(t - self.start_time)
|
|
|
|
+
|
|
|
|
+ def add_event(self, type_, x, y):
|
|
|
|
+ t = int(time.time() * 1000) - self.start_time
|
|
|
|
+ self.data["events"].append({
|
|
|
|
+ "type": type_,
|
|
|
|
+ "t": t,
|
|
|
|
+ "x": x,
|
|
|
|
+ "y": y
|
|
|
|
+ })
|
|
|
|
+
|
|
|
|
+ def get_data(self):
|
|
|
|
+ self.data["tm"]["ed"] = int(time.time() * 1000)
|
|
|
|
+ return base64.b64encode(json.dumps(self.data).encode()).decode()
|
|
|
|
+
|
|
|
|
+def generate_tracks(start_pos, target_pos):
|
|
|
|
+ tracks = []
|
|
|
|
+ distance = target_pos[0] - start_pos[0]
|
|
|
|
+ current = 0
|
|
|
|
+ t = int(time.time() * 1000)
|
|
|
|
+ v = 0 # 初始速度
|
|
|
|
+
|
|
|
|
+ x = start_pos[0]
|
|
|
|
+ y = start_pos[1]
|
|
|
|
+
|
|
|
|
+ while current < distance:
|
|
|
|
+ delta_t = 10 # 10ms
|
|
|
|
+ t += delta_t
|
|
|
|
|
|
- except Exception as e:
|
|
|
|
- logger.error(f"浏览器驱动初始化失败: {str(e)}")
|
|
|
|
- self.cleanup_driver()
|
|
|
|
- return False
|
|
|
|
-
|
|
|
|
- finally:
|
|
|
|
- # 清理临时目录
|
|
|
|
- try:
|
|
|
|
- import shutil
|
|
|
|
- shutil.rmtree(temp_dir, ignore_errors=True)
|
|
|
|
- except:
|
|
|
|
- pass
|
|
|
|
-
|
|
|
|
- # 停止虚拟显示
|
|
|
|
- try:
|
|
|
|
- subprocess.run(['pkill', 'Xvfb'],
|
|
|
|
- stdout=subprocess.DEVNULL,
|
|
|
|
- stderr=subprocess.DEVNULL)
|
|
|
|
- except:
|
|
|
|
- pass
|
|
|
|
-
|
|
|
|
- def cleanup_processes(self):
|
|
|
|
- """更彻底地清理残留进程"""
|
|
|
|
- try:
|
|
|
|
- # 使用 pkill 清理进程
|
|
|
|
- commands = [
|
|
|
|
- ['pkill', '-f', 'firefox'],
|
|
|
|
- ['pkill', '-f', 'geckodriver'],
|
|
|
|
- ['pkill', '-f', 'Xvfb'],
|
|
|
|
- ['killall', 'firefox'],
|
|
|
|
- ['killall', 'geckodriver'],
|
|
|
|
- ['killall', 'Xvfb']
|
|
|
|
- ]
|
|
|
|
-
|
|
|
|
- for cmd in commands:
|
|
|
|
- try:
|
|
|
|
- subprocess.run(cmd,
|
|
|
|
- stdout=subprocess.DEVNULL,
|
|
|
|
- stderr=subprocess.DEVNULL)
|
|
|
|
- except:
|
|
|
|
- continue
|
|
|
|
-
|
|
|
|
- # 使用 ps 查找并强制终止进程
|
|
|
|
- try:
|
|
|
|
- ps_output = subprocess.check_output(['ps', 'aux']).decode()
|
|
|
|
- for line in ps_output.split('\n'):
|
|
|
|
- if 'firefox' in line or 'geckodriver' in line or 'Xvfb' in line:
|
|
|
|
- try:
|
|
|
|
- pid = int(line.split()[1])
|
|
|
|
- os.kill(pid, signal.SIGKILL)
|
|
|
|
- except:
|
|
|
|
- continue
|
|
|
|
- except:
|
|
|
|
- pass
|
|
|
|
-
|
|
|
|
- time.sleep(2)
|
|
|
|
- logger.info("清理残留进程完成")
|
|
|
|
- except Exception as e:
|
|
|
|
- logger.error(f"清理进程时出错: {str(e)}")
|
|
|
|
-
|
|
|
|
- def navigate_to_page(self):
|
|
|
|
- """导航到目标页面"""
|
|
|
|
- try:
|
|
|
|
- logger.info("正在访问目标网页")
|
|
|
|
-
|
|
|
|
- max_attempts = 3
|
|
|
|
- for attempt in range(max_attempts):
|
|
|
|
- try:
|
|
|
|
- logger.info(f"尝试访问页面 (尝试 {attempt + 1}/{max_attempts})")
|
|
|
|
-
|
|
|
|
- # 增加页面加载超时时间
|
|
|
|
- self.driver.set_page_load_timeout(30)
|
|
|
|
-
|
|
|
|
- # 直接访问目标验证码页面
|
|
|
|
- target_url = 'https://open.yuewen.com/'
|
|
|
|
- logger.info(f"访问目标URL: {target_url}")
|
|
|
|
- self.driver.get(target_url)
|
|
|
|
-
|
|
|
|
- # 等待页面加载完成
|
|
|
|
- logger.info("等待页面加载")
|
|
|
|
- WebDriverWait(self.driver, 30).until(
|
|
|
|
- lambda driver: driver.execute_script("return document.readyState") == "complete"
|
|
|
|
- )
|
|
|
|
-
|
|
|
|
- # 检查页面是否正确加载
|
|
|
|
- try:
|
|
|
|
- # 验证是否存在验证码相关元素
|
|
|
|
- slide_button = WebDriverWait(self.driver, 10).until(
|
|
|
|
- EC.presence_of_element_located((By.CLASS_NAME, "geetest_slider_button"))
|
|
|
|
- )
|
|
|
|
-
|
|
|
|
- if not slide_button:
|
|
|
|
- raise Exception("未找到滑块验证码元素")
|
|
|
|
-
|
|
|
|
- logger.info("验证码页面加载成功")
|
|
|
|
- return True
|
|
|
|
-
|
|
|
|
- except Exception as e:
|
|
|
|
- logger.error(f"验证码元素检查失败: {str(e)}")
|
|
|
|
- raise
|
|
|
|
-
|
|
|
|
- except Exception as e:
|
|
|
|
- logger.error(f"第 {attempt + 1} 次尝试失败: {str(e)}")
|
|
|
|
- if attempt < max_attempts - 1:
|
|
|
|
- logger.info("等待后重试...")
|
|
|
|
-
|
|
|
|
- # 重置浏览器状态
|
|
|
|
- try:
|
|
|
|
- self.driver.execute_script("""
|
|
|
|
- window.stop();
|
|
|
|
- window.location.href = 'about:blank';
|
|
|
|
- """)
|
|
|
|
- except:
|
|
|
|
- pass
|
|
|
|
-
|
|
|
|
- time.sleep(5)
|
|
|
|
- else:
|
|
|
|
- raise Exception("页面访问在多次尝试后仍然失败")
|
|
|
|
-
|
|
|
|
- except Exception as e:
|
|
|
|
- logger.error(f"页面访问失败: {str(e)}")
|
|
|
|
- return False
|
|
|
|
|
|
+ # 加速阶段
|
|
|
|
+ if current < distance * 0.6:
|
|
|
|
+ v += (random.random() * 2 + 2)
|
|
|
|
+ # 减速阶段
|
|
|
|
+ else:
|
|
|
|
+ v *= 0.98
|
|
|
|
|
|
- finally:
|
|
|
|
- # 重置页面加载超时为默认值
|
|
|
|
- try:
|
|
|
|
- self.driver.set_page_load_timeout(30)
|
|
|
|
- except:
|
|
|
|
- pass
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- def setup_browser_config(self):
|
|
|
|
- """配置浏览器网络设置"""
|
|
|
|
- try:
|
|
|
|
- # 配置网络设置
|
|
|
|
- self.driver.execute_script("""
|
|
|
|
- navigator.connection = {
|
|
|
|
- effectiveType: '4g',
|
|
|
|
- rtt: 50,
|
|
|
|
- downlink: 10,
|
|
|
|
- saveData: false
|
|
|
|
- };
|
|
|
|
- """)
|
|
|
|
-
|
|
|
|
- # 设置自定义请求头
|
|
|
|
- self.driver.execute_cdp_cmd('Network.setUserAgentOverride', {
|
|
|
|
- "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
|
|
|
|
- })
|
|
|
|
-
|
|
|
|
- # 启用网络监控
|
|
|
|
- self.driver.execute_cdp_cmd('Network.enable', {})
|
|
|
|
-
|
|
|
|
- # 设置网络条件
|
|
|
|
- self.driver.execute_cdp_cmd('Network.emulateNetworkConditions', {
|
|
|
|
- 'offline': False,
|
|
|
|
- 'latency': 20, # 延迟时间(毫秒)
|
|
|
|
- 'downloadThroughput': 780 * 1024 / 8, # 下载速度(字节/秒)
|
|
|
|
- 'uploadThroughput': 330 * 1024 / 8, # 上传速度(字节/秒)
|
|
|
|
- 'connectionType': 'wifi'
|
|
|
|
- })
|
|
|
|
-
|
|
|
|
- return True
|
|
|
|
- except Exception as e:
|
|
|
|
- logger.error(f"配置浏览器网络设置失败: {str(e)}")
|
|
|
|
- return False
|
|
|
|
- def get_slider(self):
|
|
|
|
- """获取滑块元素"""
|
|
|
|
- try:
|
|
|
|
- logger.info("寻找滑块元素")
|
|
|
|
-
|
|
|
|
- # 尝试多种定位方式
|
|
|
|
- selectors = [
|
|
|
|
- (By.CLASS_NAME, "gt_slider_knob"),
|
|
|
|
- (By.CLASS_NAME, "geetest_slider_button"),
|
|
|
|
- (By.CLASS_NAME, "gt_slider_knob_new"),
|
|
|
|
- (By.CSS_SELECTOR, ".gt_slider_knob"),
|
|
|
|
- (By.CSS_SELECTOR, ".geetest_slider_button"),
|
|
|
|
- (By.XPATH, "//div[contains(@class, 'slider')]//div[contains(@class, 'knob')]")
|
|
|
|
- ]
|
|
|
|
-
|
|
|
|
- # 等待任意一个元素出现
|
|
|
|
- for selector in selectors:
|
|
|
|
- try:
|
|
|
|
- logger.info(f"尝试使用选择器: {selector}")
|
|
|
|
- element = WebDriverWait(self.driver, 10).until(
|
|
|
|
- EC.presence_of_element_located(selector)
|
|
|
|
- )
|
|
|
|
- if element:
|
|
|
|
- logger.info(f"成功找到滑块元素: {selector}")
|
|
|
|
- return element
|
|
|
|
- except:
|
|
|
|
- continue
|
|
|
|
-
|
|
|
|
- raise Exception("未能找到滑块元素")
|
|
|
|
-
|
|
|
|
- except Exception as e:
|
|
|
|
- logger.error(f"获取滑块元素失败: {str(e)}")
|
|
|
|
- return None
|
|
|
|
-
|
|
|
|
- def get_slider_background(self):
|
|
|
|
- """获取背景图片"""
|
|
|
|
- try:
|
|
|
|
- # 等待背景图片加载
|
|
|
|
- background = WebDriverWait(self.driver, 10).until(
|
|
|
|
- EC.presence_of_element_located((By.CLASS_NAME, "gt_box"))
|
|
|
|
- )
|
|
|
|
- # 获取背景图片的base64数据
|
|
|
|
- canvas = self.driver.execute_script(
|
|
|
|
- "return document.getElementsByClassName('gt_box')[0].toDataURL('image/png')"
|
|
|
|
- )
|
|
|
|
- # 转换base64为图片
|
|
|
|
- canvas = canvas.split(',')[1]
|
|
|
|
- image_data = base64.b64decode(canvas)
|
|
|
|
- image = Image.open(io.BytesIO(image_data))
|
|
|
|
- return image
|
|
|
|
- except Exception as e:
|
|
|
|
- logger.error(f"获取背景图片失败: {str(e)}")
|
|
|
|
- return None
|
|
|
|
-
|
|
|
|
- def get_slider_image(self):
|
|
|
|
- """获取滑块图片"""
|
|
|
|
- try:
|
|
|
|
- # 等待滑块图片加载
|
|
|
|
- slider = WebDriverWait(self.driver, 10).until(
|
|
|
|
- EC.presence_of_element_located((By.CLASS_NAME, "gt_slice"))
|
|
|
|
- )
|
|
|
|
- # 获取滑块图片的base64数据
|
|
|
|
- canvas = self.driver.execute_script(
|
|
|
|
- "return document.getElementsByClassName('gt_slice')[0].toDataURL('image/png')"
|
|
|
|
- )
|
|
|
|
- # 转换base64为图片
|
|
|
|
- canvas = canvas.split(',')[1]
|
|
|
|
- image_data = base64.b64decode(canvas)
|
|
|
|
- image = Image.open(io.BytesIO(image_data))
|
|
|
|
- return image
|
|
|
|
- except Exception as e:
|
|
|
|
- logger.error(f"获取滑块图片失败: {str(e)}")
|
|
|
|
- return None
|
|
|
|
-
|
|
|
|
- def get_gap(self, bg_image, slider_image):
|
|
|
|
- """计算滑块缺口位置"""
|
|
|
|
- try:
|
|
|
|
- # 转换图片格式
|
|
|
|
- bg = cv2.cvtColor(np.array(bg_image), cv2.COLOR_RGB2BGR)
|
|
|
|
- slider = cv2.cvtColor(np.array(slider_image), cv2.COLOR_RGB2BGR)
|
|
|
|
-
|
|
|
|
- # 计算差异
|
|
|
|
- diff = cv2.absdiff(bg, slider)
|
|
|
|
- mask = cv2.cvtColor(diff, cv2.COLOR_BGR2GRAY)
|
|
|
|
- ret, mask = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY)
|
|
|
|
-
|
|
|
|
- # 查找轮廓
|
|
|
|
- contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
|
|
|
-
|
|
|
|
- if contours:
|
|
|
|
- # 获取最大轮廓
|
|
|
|
- max_contour = max(contours, key=cv2.contourArea)
|
|
|
|
- x, y, w, h = cv2.boundingRect(max_contour)
|
|
|
|
- return x
|
|
|
|
- return None
|
|
|
|
- except Exception as e:
|
|
|
|
- logger.error(f"计算缺口位置失败: {str(e)}")
|
|
|
|
- return None
|
|
|
|
-
|
|
|
|
- def generate_track(self, distance):
|
|
|
|
- """生成移动轨迹"""
|
|
|
|
- tracks = []
|
|
|
|
- current = 0
|
|
|
|
- mid = distance * 3 / 4
|
|
|
|
- t = 0.2
|
|
|
|
- v = 0
|
|
|
|
|
|
+ # 位移
|
|
|
|
+ delta_x = v * (delta_t / 1000)
|
|
|
|
+ current += delta_x
|
|
|
|
+ x += delta_x
|
|
|
|
|
|
- while current < distance:
|
|
|
|
- if current < mid:
|
|
|
|
- a = 2
|
|
|
|
- else:
|
|
|
|
- a = -3
|
|
|
|
- v0 = v
|
|
|
|
- v = v0 + a * t
|
|
|
|
- move = v0 * t + 1 / 2 * a * t * t
|
|
|
|
- current += move
|
|
|
|
- tracks.append(round(move))
|
|
|
|
|
|
+ # y轴抖动
|
|
|
|
+ y = start_pos[1] + (random.random() * 2 - 1)
|
|
|
|
|
|
- # 微调
|
|
|
|
- while sum(tracks) > distance:
|
|
|
|
- tracks[-1] -= 1
|
|
|
|
- while sum(tracks) < distance:
|
|
|
|
- tracks.append(1)
|
|
|
|
-
|
|
|
|
- # 添加回退
|
|
|
|
- tracks.extend([-1, -1, -2, -2, -1, -1])
|
|
|
|
- return tracks
|
|
|
|
-
|
|
|
|
- def move_slider(self, slider, tracks):
|
|
|
|
- """移动滑块"""
|
|
|
|
- try:
|
|
|
|
- ActionChains(self.driver).click_and_hold(slider).perform()
|
|
|
|
- for track in tracks:
|
|
|
|
- ActionChains(self.driver).move_by_offset(track, random.randint(-1, 1)).perform()
|
|
|
|
- time.sleep(random.uniform(0.01, 0.02))
|
|
|
|
- time.sleep(0.5)
|
|
|
|
- ActionChains(self.driver).release().perform()
|
|
|
|
- return True
|
|
|
|
- except Exception as e:
|
|
|
|
- logger.error(f"移动滑块失败: {str(e)}")
|
|
|
|
- return False
|
|
|
|
-
|
|
|
|
- def crack_captcha(self):
|
|
|
|
- """破解验证码主流程"""
|
|
|
|
- try:
|
|
|
|
- logger.info("开始破解验证码")
|
|
|
|
-
|
|
|
|
- # 获取滑块元素
|
|
|
|
- slider = self.get_slider()
|
|
|
|
- if not slider:
|
|
|
|
- return False
|
|
|
|
-
|
|
|
|
- # 获取图片
|
|
|
|
- bg_image = self.get_slider_background()
|
|
|
|
- slider_image = self.get_slider_image()
|
|
|
|
- if not bg_image or not slider_image:
|
|
|
|
- return False
|
|
|
|
-
|
|
|
|
- # 计算缺口位置
|
|
|
|
- gap = self.get_gap(bg_image, slider_image)
|
|
|
|
- if not gap:
|
|
|
|
- return False
|
|
|
|
- logger.info(f"缺口位置: {gap}")
|
|
|
|
-
|
|
|
|
- # 生成轨迹
|
|
|
|
- tracks = self.generate_track(gap)
|
|
|
|
- logger.info(f"生成轨迹: {len(tracks)}个点")
|
|
|
|
-
|
|
|
|
- # 移动滑块
|
|
|
|
- result = self.move_slider(slider, tracks)
|
|
|
|
- if not result:
|
|
|
|
- return False
|
|
|
|
-
|
|
|
|
- # 等待验证结果
|
|
|
|
- time.sleep(2)
|
|
|
|
-
|
|
|
|
- # 检查是否验证成功
|
|
|
|
- try:
|
|
|
|
- success = WebDriverWait(self.driver, 5).until(
|
|
|
|
- EC.presence_of_element_located((By.CLASS_NAME, "gt_success"))
|
|
|
|
- )
|
|
|
|
- if success:
|
|
|
|
- logger.info("验证成功")
|
|
|
|
- return True
|
|
|
|
- except:
|
|
|
|
- logger.error("验证失败")
|
|
|
|
- return False
|
|
|
|
-
|
|
|
|
- except Exception as e:
|
|
|
|
- logger.error(f"验证码破解失败: {str(e)}")
|
|
|
|
- return False
|
|
|
|
-
|
|
|
|
- def cleanup_driver(self):
|
|
|
|
- """清理 WebDriver 资源"""
|
|
|
|
- if hasattr(self, 'driver') and self.driver:
|
|
|
|
- try:
|
|
|
|
- self.driver.quit()
|
|
|
|
- except:
|
|
|
|
- pass
|
|
|
|
- finally:
|
|
|
|
- self.driver = None
|
|
|
|
-
|
|
|
|
- def run(self):
|
|
|
|
- """运行主程序"""
|
|
|
|
- try:
|
|
|
|
- logger.info("开始破解验证码流程")
|
|
|
|
-
|
|
|
|
- # 初始化浏览器
|
|
|
|
- if not self.setup_driver():
|
|
|
|
- return False
|
|
|
|
-
|
|
|
|
- # 访问目标页面
|
|
|
|
- if not self.navigate_to_page():
|
|
|
|
- return False
|
|
|
|
-
|
|
|
|
- # 破解验证码
|
|
|
|
- max_attempts = 3
|
|
|
|
- for attempt in range(max_attempts):
|
|
|
|
- logger.info(f"第 {attempt + 1} 次尝试破解验证码")
|
|
|
|
- if self.crack_captcha():
|
|
|
|
- return True
|
|
|
|
- time.sleep(2)
|
|
|
|
|
|
+ tracks.append({
|
|
|
|
+ "x": round(x),
|
|
|
|
+ "y": round(y),
|
|
|
|
+ "t": t
|
|
|
|
+ })
|
|
|
|
+
|
|
|
|
+ # 确保最后到达目标位置
|
|
|
|
+ tracks.append({
|
|
|
|
+ "x": target_pos[0],
|
|
|
|
+ "y": target_pos[1],
|
|
|
|
+ "t": t + 10
|
|
|
|
+ })
|
|
|
|
+
|
|
|
|
+ return tracks
|
|
|
|
+
|
|
|
|
+def get_collect(start_pos, target_pos):
|
|
|
|
+ tdc = TDCSimulator()
|
|
|
|
+
|
|
|
|
+ # 开始事件
|
|
|
|
+ tdc.add_event("start", start_pos[0], start_pos[1])
|
|
|
|
+
|
|
|
|
+ # 按下事件
|
|
|
|
+ tdc.add_event("down", start_pos[0], start_pos[1])
|
|
|
|
+
|
|
|
|
+ # 移动轨迹
|
|
|
|
+ tracks = generate_tracks(start_pos, target_pos)
|
|
|
|
+ for track in tracks:
|
|
|
|
+ tdc.add_point(track["x"], track["y"], track["t"])
|
|
|
|
+
|
|
|
|
+ # 松开事件
|
|
|
|
+ tdc.add_event("up", target_pos[0], target_pos[1])
|
|
|
|
+
|
|
|
|
+ # 结束事件
|
|
|
|
+ tdc.add_event("end", target_pos[0], target_pos[1])
|
|
|
|
+
|
|
|
|
+ return tdc.get_data()
|
|
|
|
+
|
|
|
|
+def get_gap(target_img:bytes, background_img:bytes):
|
|
|
|
+ det = ddddocr.DdddOcr(det=False, ocr=False, show_ad=False)
|
|
|
|
+ res = det.slide_match(target_img, background_img,simple_target=True)
|
|
|
|
+ print(res)
|
|
|
|
+ return res["target"][0]
|
|
|
|
+def parse_positions(data):
|
|
|
|
+ """
|
|
|
|
+ 解析验证码位置信息
|
|
|
|
+
|
|
|
|
+ 参数:
|
|
|
|
+ data: _aq_88765 返回的数据
|
|
|
|
+
|
|
|
|
+ 返回:
|
|
|
|
+ {
|
|
|
|
+ "start_pos": [x, y],
|
|
|
|
+ "move_limit": [min_x, max_x],
|
|
|
|
+ "sess": session_id,
|
|
|
|
+ "image_urls": {
|
|
|
|
+ "bg": background_image_url,
|
|
|
|
+ "sprite": sprite_image_url
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ """
|
|
|
|
+ try:
|
|
|
|
+ # 获取滑块信息
|
|
|
|
+ fg_elem_list = data["data"]["dyn_show_info"]["fg_elem_list"]
|
|
|
|
+ slider_info = None
|
|
|
|
+ for elem in fg_elem_list:
|
|
|
|
+ if elem["id"] == 1: # id为1的元素是滑块
|
|
|
|
+ slider_info = elem
|
|
|
|
+ break
|
|
|
|
|
|
- return False
|
|
|
|
-
|
|
|
|
- except Exception as e:
|
|
|
|
- logger.error(f"程序执行失败: {str(e)}")
|
|
|
|
- return False
|
|
|
|
-
|
|
|
|
- finally:
|
|
|
|
- logger.info("程序执行结束")
|
|
|
|
- self.cleanup_driver()
|
|
|
|
|
|
+ if not slider_info:
|
|
|
|
+ raise ValueError("未找到滑块信息")
|
|
|
|
+
|
|
|
|
+ # 解析移动限制
|
|
|
|
+ track_limit = slider_info["move_cfg"]["track_limit"] # "x>=50&&x<=552"
|
|
|
|
+ limits = track_limit.replace("x>=", "").replace("&&x<=", ",").split(",")
|
|
|
|
+ min_x = int(limits[0])
|
|
|
|
+ max_x = int(limits[1])
|
|
|
|
+
|
|
|
|
+ # 获取图片URL
|
|
|
|
+ bg_img_url = data["data"]["dyn_show_info"]["bg_elem_cfg"]["img_url"]
|
|
|
|
+ sprite_url = data["data"]["dyn_show_info"]["sprite_url"]
|
|
|
|
+
|
|
|
|
+ # 获取session
|
|
|
|
+ sess = data["sess"]
|
|
|
|
+
|
|
|
|
+ return {
|
|
|
|
+ "start_pos": slider_info["init_pos"], # [50, 226]
|
|
|
|
+ "move_limit": [min_x, max_x], # [50, 552]
|
|
|
|
+ "sess": sess,
|
|
|
|
+ "image_urls": {
|
|
|
|
+ "bg": bg_img_url,
|
|
|
|
+ "sprite": sprite_url
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ except Exception as e:
|
|
|
|
+ print(f"解析位置信息失败: {str(e)}")
|
|
|
|
+ return None
|
|
|
|
+
|
|
|
|
+def get_captcha_prehandle():
|
|
|
|
+ # 基础URL
|
|
|
|
+ base_url = "https://turing.captcha.qcloud.com/cap_union_prehandle"
|
|
|
|
+
|
|
|
|
+ # 构造请求参数
|
|
|
|
+ params = {
|
|
|
|
+ "aid": "2026859470",
|
|
|
|
+ "protocol": "https",
|
|
|
|
+ "accver": "1",
|
|
|
|
+ "showtype": "popup",
|
|
|
|
+ "ua": "TW96aWxsYS81LjAgKFdpbmRvd3MgTlQgMTAuMDsgV2luNjQ7IHg2NCkgQXBwbGVXZWJLaXQvNTM3LjM2IChLSFRNTCwgbGlrZSBHZWNrbykgQ2hyb21lLzEzMi4wLjAuMCBTYWZhcmkvNTM3",
|
|
|
|
+ "noheader": "1",
|
|
|
|
+ "fb": "1",
|
|
|
|
+ "aged": "0",
|
|
|
|
+ "enableAged": "0",
|
|
|
|
+ "enableDarkMode": "0",
|
|
|
|
+ "grayscale": "1",
|
|
|
|
+ "clientype": "2",
|
|
|
|
+ "cap_cd": "",
|
|
|
|
+ "uid": "",
|
|
|
|
+ "lang": "zh-cn",
|
|
|
|
+ "entry_url": "https://open.yuewen.com/",
|
|
|
|
+ "elder_captcha": "0",
|
|
|
|
+ "js": "/tcaptcha-frame.c055d939.js",
|
|
|
|
+ "login_appid": "",
|
|
|
|
+ "wb": "1",
|
|
|
|
+ "subsid": "5",
|
|
|
|
+ "callback": "_aq_88765",
|
|
|
|
+ "sess": ""
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ # 设置请求头
|
|
|
|
+ headers = {
|
|
|
|
+ "accept": "*/*",
|
|
|
|
+ "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
|
|
|
|
+ "sec-ch-ua": '"Not A(Brand";v="8", "Chromium";v="132", "Microsoft Edge";v="132"',
|
|
|
|
+ "sec-ch-ua-mobile": "?0",
|
|
|
|
+ "sec-ch-ua-platform": '"Windows"',
|
|
|
|
+ "sec-fetch-dest": "script",
|
|
|
|
+ "sec-fetch-mode": "no-cors",
|
|
|
|
+ "sec-fetch-site": "cross-site",
|
|
|
|
+ "Referer": "https://open.yuewen.com/",
|
|
|
|
+ "Referrer-Policy": "strict-origin-when-cross-origin"
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ try:
|
|
|
|
+ # 发送GET请求
|
|
|
|
+ response = requests.get(
|
|
|
|
+ base_url,
|
|
|
|
+ params=params,
|
|
|
|
+ headers=headers
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ # 检查响应状态
|
|
|
|
+ response.raise_for_status()
|
|
|
|
+
|
|
|
|
+ # 获取响应文本
|
|
|
|
+ return response.text
|
|
|
|
+
|
|
|
|
+ except requests.exceptions.RequestException as e:
|
|
|
|
+ print(f"请求失败: {str(e)}")
|
|
|
|
+ return None
|
|
|
|
|
|
- def __del__(self):
|
|
|
|
- """析构函数,确保资源被清理"""
|
|
|
|
- self.cleanup_driver()
|
|
|
|
- self.cleanup_processes()
|
|
|
|
|
|
+# start_pos = [50, 226] # 滑块初始位置
|
|
|
|
+# target_pos = [342, 226] # 目标位置
|
|
|
|
|
|
-if __name__ == "__main__":
|
|
|
|
- cracker = GeetestCracker()
|
|
|
|
- result = cracker.run()
|
|
|
|
- print("破解结果:", "成功" if result else "失败")
|
|
|
|
|
|
+# collect = get_collect(start_pos, target_pos)
|
|
|
|
+# print(collect)
|