"""Selenium/Firefox automation that attempts to solve a slider captcha."""

import base64
import io
import logging
import os
import random
import shutil
import signal
import subprocess
import tempfile
import time
from contextlib import contextmanager

import cv2
import numpy as np
from PIL import Image
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class TimeoutException(Exception):
    """Raised by ``time_limit`` when the guarded block exceeds its budget."""


@contextmanager
def time_limit(seconds):
    """Abort the enclosed block with ``TimeoutException`` after *seconds*.

    Implemented with ``SIGALRM``, so it only works on Unix and only when
    entered from the main thread. The handler that was installed before
    entering the block is restored on exit.

    Args:
        seconds: Whole number of seconds before the alarm fires.

    Raises:
        TimeoutException: If the block is still running when the alarm fires.
    """
    def signal_handler(signum, frame):
        raise TimeoutException("Timed out!")

    previous_handler = signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)
        # signal.signal() returns None when the old handler was not installed
        # from Python; None cannot be passed back in, so skip it in that case.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)


class GeetestCracker:
    """Drive a headless Firefox session through a slider-captcha page.

    Typical use is ``GeetestCracker().run()``, which returns ``True`` when the
    page reports success and ``False`` otherwise.
    """

    # Page that ``navigate_to_page`` opens.
    TARGET_URL = 'https://open.yuewen.com/'
    # Substrings used to find leftover helper processes.
    _PROCESS_NAMES = ('firefox', 'geckodriver', 'Xvfb')

    def __init__(self):
        self.driver = None
        self.wait = None
        # Scratch directory holding geckodriver.log for the current session.
        self._temp_dir = None
        # Popen handle of the Xvfb server started by this instance, if any.
        self._xvfb = None
        logger.info("初始化 GeetestCracker")

    # ------------------------------------------------------------------
    # Process / resource management
    # ------------------------------------------------------------------
    def cleanup_processes(self):
        """Kill leftover firefox / geckodriver / Xvfb processes.

        NOTE(review): this is machine-wide. ``pkill -f`` and the ``ps aux``
        sweep match on the whole command line, so any process whose command
        line contains one of the names (including a user's own browser) is
        killed. The sweep skips the calling process itself.
        """
        try:
            # Ask pkill first, then killall; a missing tool is not an error.
            for tool in (['pkill', '-f'], ['killall']):
                for name in self._PROCESS_NAMES:
                    try:
                        subprocess.run(
                            tool + [name],
                            stdout=subprocess.DEVNULL,
                            stderr=subprocess.DEVNULL
                        )
                    except OSError:
                        continue

            # Force-kill whatever is still listed by ps.
            try:
                ps_output = subprocess.check_output(['ps', 'aux']).decode(
                    errors='replace'
                )
            except (OSError, subprocess.CalledProcessError):
                ps_output = ''

            own_pid = os.getpid()
            for line in ps_output.splitlines():
                if not any(name in line for name in self._PROCESS_NAMES):
                    continue
                try:
                    pid = int(line.split()[1])
                except (IndexError, ValueError):
                    continue
                if pid == own_pid:
                    # Never SIGKILL ourselves, e.g. when the script path
                    # happens to contain one of the names.
                    continue
                try:
                    os.kill(pid, signal.SIGKILL)
                except OSError:
                    continue

            time.sleep(2)
            logger.info("清理残留进程完成")
        except Exception as e:
            logger.error(f"清理进程时出错: {e}")

    def _start_virtual_display(self):
        """Start Xvfb on display :99 unless our instance is still running.

        Uses ``Popen`` so the call returns immediately; Xvfb is a server and
        does not exit on its own.
        """
        running = getattr(self, '_xvfb', None)
        if running is not None and running.poll() is None:
            return
        try:
            self._xvfb = subprocess.Popen(
                ['Xvfb', ':99', '-screen', '0', '1280x800x24'],
                start_new_session=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL
            )
        except OSError:
            self._xvfb = None
            logger.warning("Xvfb 启动失败,继续尝试")

    def _stop_virtual_display(self):
        """Terminate the Xvfb server started by this instance, if any."""
        proc = getattr(self, '_xvfb', None)
        self._xvfb = None
        if proc is None or proc.poll() is not None:
            return
        try:
            proc.terminate()
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
        except OSError:
            pass

    def _quit_driver(self):
        """Quit the browser session (best effort) and forget the handle."""
        driver = getattr(self, 'driver', None)
        self.driver = None
        if driver is None:
            return
        try:
            driver.quit()
        except Exception as e:
            # The session may already be dead; nothing more can be done.
            logger.debug(f"关闭浏览器时出错: {e}")

    def cleanup_driver(self):
        """Release the WebDriver, the scratch directory and the Xvfb server."""
        self._quit_driver()

        temp_dir = getattr(self, '_temp_dir', None)
        self._temp_dir = None
        if temp_dir:
            shutil.rmtree(temp_dir, ignore_errors=True)

        self._stop_virtual_display()

    # ------------------------------------------------------------------
    # Browser setup
    # ------------------------------------------------------------------
    def _build_firefox_options(self):
        """Return the Firefox ``Options`` used for every launch attempt."""
        options = Options()

        preferences = (
            # Core settings
            ('marionette', True),
            ('marionette.port', 2828),  # fixed port
            ('network.http.connection-timeout', 10000),
            ('network.http.response.timeout', 10000),
            # Disable the JavaScript JITs
            ('javascript.options.ion', False),
            ('javascript.options.baselinejit', False),
            # Disable hardware acceleration
            ('layers.acceleration.disabled', True),
            # Disable features that are not needed
            ('browser.cache.disk.enable', False),
            ('browser.cache.memory.enable', False),
            ('browser.cache.offline.enable', False),
            ('network.http.use-cache', False),
            ('browser.tabs.remote.autostart', False),
            ('browser.tabs.remote.autostart.2', False),
            ('dom.ipc.processCount', 1),
            ('browser.sessionstore.resume_from_crash', False),
        )
        for name, value in preferences:
            options.set_preference(name, value)

        # NOTE(review): apart from --headless these look like Chrome switches;
        # presumably Firefox ignores them - confirm before relying on any.
        for argument in (
            '--headless',
            '--no-sandbox',
            '--disable-dev-shm-usage',
            '--disable-gpu',
            '--disable-extensions',
            '--disable-infobars',
            '--disable-notifications',
            '--window-size=1280,800',
        ):
            options.add_argument(argument)

        return options

    def setup_driver(self):
        """Configure and start the Firefox WebDriver.

        Makes up to three launch attempts, each limited to 20 seconds.

        Returns:
            True when a browser session is ready, False otherwise. On failure
            every resource acquired here has been released again.
        """
        ready = False
        try:
            logger.info("开始设置浏览器驱动")

            # Drop any session of our own, then sweep stray processes.
            self.cleanup_driver()
            self.cleanup_processes()

            geckodriver_path = shutil.which('geckodriver')
            if not geckodriver_path:
                logger.error("未找到 geckodriver")
                return False
            logger.info(f"找到 geckodriver: {geckodriver_path}")

            options = self._build_firefox_options()

            logger.info("创建 WebDriver 实例")
            # The scratch directory is kept until cleanup_driver() because
            # geckodriver writes its log there for the whole session.
            self._temp_dir = tempfile.mkdtemp()
            # NOTE(review): this sets a preference literally named 'profile';
            # it probably does not select the profile directory - confirm.
            options.set_preference('profile', self._temp_dir)

            service = Service(
                geckodriver_path,
                log_output=os.path.join(self._temp_dir, 'geckodriver.log'),
                service_args=[
                    '--log', 'trace',
                    '--marionette-port', '2828'
                ]
            )

            os.environ['MOZ_HEADLESS'] = '1'
            os.environ['DISPLAY'] = ':99'

            max_attempts = 3
            for attempt in range(max_attempts):
                logger.info(
                    f"尝试创建 WebDriver 实例 (尝试 {attempt + 1}/{max_attempts})"
                )
                # cleanup_processes() below kills Xvfb too, so (re)start it
                # at the beginning of every attempt.
                self._start_virtual_display()

                try:
                    with time_limit(20):
                        self.driver = webdriver.Firefox(
                            service=service,
                            options=options
                        )

                    self.driver.set_page_load_timeout(10)
                    self.driver.set_script_timeout(10)
                    self.wait = WebDriverWait(self.driver, 10)

                    # Smoke-test the connection.
                    logger.info("测试浏览器连接")
                    self.driver.get('about:blank')
                except TimeoutException:
                    logger.error(f"第 {attempt + 1} 次尝试超时")
                except Exception as e:
                    logger.error(f"第 {attempt + 1} 次尝试失败: {e}")
                else:
                    logger.info("浏览器驱动初始化成功")
                    ready = True
                    return True

                # Failed attempt: release the half-started browser.
                self._quit_driver()
                self.cleanup_processes()

                if attempt < max_attempts - 1:
                    logger.info("等待后重试...")
                    time.sleep(5)

            logger.error("浏览器驱动初始化失败: 在多次尝试后仍然失败")
            return False
        except Exception as e:
            logger.error(f"浏览器驱动初始化失败: {e}")
            return False
        finally:
            if not ready:
                self.cleanup_driver()

    # ------------------------------------------------------------------
    # Navigation
    # ------------------------------------------------------------------
    def navigate_to_page(self):
        """Open ``TARGET_URL`` and wait for the slider button to exist.

        Makes up to three attempts with a five second pause in between.

        Returns:
            True once the element with class ``geetest_slider_button`` is
            present, False if every attempt failed.
        """
        target_url = self.TARGET_URL
        max_attempts = 3
        try:
            logger.info("正在访问目标网页")

            for attempt in range(max_attempts):
                try:
                    logger.info(
                        f"尝试访问页面 (尝试 {attempt + 1}/{max_attempts})"
                    )

                    # Allow more time for this page than setup_driver does.
                    self.driver.set_page_load_timeout(30)

                    logger.info(f"访问目标URL: {target_url}")
                    self.driver.get(target_url)

                    logger.info("等待页面加载")
                    WebDriverWait(self.driver, 30).until(
                        lambda driver: driver.execute_script(
                            "return document.readyState"
                        ) == "complete"
                    )

                    # The page only counts as loaded once the captcha's
                    # slider button exists; until() raises on timeout.
                    try:
                        WebDriverWait(self.driver, 10).until(
                            EC.presence_of_element_located(
                                (By.CLASS_NAME, "geetest_slider_button")
                            )
                        )
                    except Exception as e:
                        logger.error(f"验证码元素检查失败: {e}")
                        raise

                    logger.info("验证码页面加载成功")
                    return True
                except Exception as e:
                    logger.error(f"第 {attempt + 1} 次尝试失败: {e}")

                if attempt >= max_attempts - 1:
                    break

                logger.info("等待后重试...")
                # Reset the browser state before the next attempt.
                try:
                    self.driver.execute_script("""
                        window.stop();
                        window.location.href = 'about:blank';
                    """)
                except Exception as e:
                    logger.debug(f"重置浏览器状态时出错: {e}")
                time.sleep(5)

            logger.error("页面访问失败: 页面访问在多次尝试后仍然失败")
            return False
        except Exception as e:
            logger.error(f"页面访问失败: {e}")
            return False
        finally:
            # Leave the page-load timeout at 30 seconds for later calls.
            try:
                self.driver.set_page_load_timeout(30)
            except Exception as e:
                logger.debug(f"重置页面加载超时时出错: {e}")

    def setup_browser_config(self):
        """Apply network-related browser settings.

        NOTE(review): ``execute_cdp_cmd`` is a Chromium-driver API; with the
        Firefox driver created by ``setup_driver`` these calls are expected to
        raise, in which case this method logs the error and returns False -
        confirm. Nothing in this file calls this method.

        Returns:
            True when every command succeeded, False otherwise.
        """
        try:
            # Advertise a fast connection to page scripts.
            self.driver.execute_script("""
                navigator.connection = {
                    effectiveType: '4g',
                    rtt: 50,
                    downlink: 10,
                    saveData: false
                };
            """)

            # Override the user agent.
            self.driver.execute_cdp_cmd('Network.setUserAgentOverride', {
                "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
            })

            # Enable network monitoring.
            self.driver.execute_cdp_cmd('Network.enable', {})

            # Emulated network conditions.
            self.driver.execute_cdp_cmd('Network.emulateNetworkConditions', {
                'offline': False,
                'latency': 20,  # milliseconds
                'downloadThroughput': 780 * 1024 / 8,  # bytes per second
                'uploadThroughput': 330 * 1024 / 8,  # bytes per second
                'connectionType': 'wifi'
            })

            return True
        except Exception as e:
            logger.error(f"配置浏览器网络设置失败: {e}")
            return False

    # ------------------------------------------------------------------
    # Captcha element access
    # ------------------------------------------------------------------
    def get_slider(self):
        """Locate the draggable slider knob.

        Tries several locators in order, waiting up to 10 seconds for each.

        Returns:
            The first matching WebElement, or None when none matched.
        """
        try:
            logger.info("寻找滑块元素")

            selectors = [
                (By.CLASS_NAME, "gt_slider_knob"),
                (By.CLASS_NAME, "geetest_slider_button"),
                (By.CLASS_NAME, "gt_slider_knob_new"),
                (By.CSS_SELECTOR, ".gt_slider_knob"),
                (By.CSS_SELECTOR, ".geetest_slider_button"),
                (By.XPATH, "//div[contains(@class, 'slider')]//div[contains(@class, 'knob')]")
            ]

            for selector in selectors:
                logger.info(f"尝试使用选择器: {selector}")
                try:
                    element = WebDriverWait(self.driver, 10).until(
                        EC.presence_of_element_located(selector)
                    )
                except Exception:
                    # Not found with this locator; try the next one.
                    continue
                if element:
                    logger.info(f"成功找到滑块元素: {selector}")
                    return element

            logger.error("获取滑块元素失败: 未能找到滑块元素")
            return None
        except Exception as e:
            logger.error(f"获取滑块元素失败: {e}")
            return None

    def _capture_element_png(self, class_name):
        """Return the first element of *class_name* rendered as a PIL image.

        Waits up to 10 seconds for the element, then calls ``toDataURL`` on it
        in the page and decodes the base64 payload of the returned data URL.

        NOTE(review): ``toDataURL`` exists on canvas elements; presumably the
        target elements are canvases - confirm against the page markup.
        """
        WebDriverWait(self.driver, 10).until(
            EC.presence_of_element_located((By.CLASS_NAME, class_name))
        )
        data_url = self.driver.execute_script(
            "return document.getElementsByClassName(arguments[0])[0]"
            ".toDataURL('image/png')",
            class_name
        )
        encoded = data_url.split(',')[1]
        return Image.open(io.BytesIO(base64.b64decode(encoded)))

    def get_slider_background(self):
        """Return the ``gt_box`` element as a PIL image, or None on failure.

        NOTE(review): ``navigate_to_page`` waits for a ``geetest_*`` class
        while this uses a ``gt_*`` class - confirm both exist on the page.
        """
        try:
            return self._capture_element_png("gt_box")
        except Exception as e:
            logger.error(f"获取背景图片失败: {e}")
            return None

    def get_slider_image(self):
        """Return the ``gt_slice`` element as a PIL image, or None on failure."""
        try:
            return self._capture_element_png("gt_slice")
        except Exception as e:
            logger.error(f"获取滑块图片失败: {e}")
            return None

    # ------------------------------------------------------------------
    # Gap detection and movement
    # ------------------------------------------------------------------
    def get_gap(self, bg_image, slider_image):
        """Estimate the horizontal offset of the gap.

        Takes the absolute difference of the two images, thresholds it at 127
        and returns the left edge of the largest external contour.

        Args:
            bg_image: Background image convertible with ``np.array``.
            slider_image: Slider image; ``cv2.absdiff`` needs it to have the
                same dimensions as *bg_image*.

        Returns:
            The x coordinate in pixels, or None when no contour was found or
            the computation failed.
        """
        try:
            bg = cv2.cvtColor(np.array(bg_image), cv2.COLOR_RGB2BGR)
            slider = cv2.cvtColor(np.array(slider_image), cv2.COLOR_RGB2BGR)

            diff = cv2.absdiff(bg, slider)
            gray = cv2.cvtColor(diff, cv2.COLOR_BGR2GRAY)
            _, mask = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY)

            contours, _ = cv2.findContours(
                mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
            )
            if not contours:
                return None

            largest = max(contours, key=cv2.contourArea)
            x, _y, _w, _h = cv2.boundingRect(largest)
            return x
        except Exception as e:
            logger.error(f"计算缺口位置失败: {e}")
            return None

    def generate_track(self, distance):
        """Build the list of per-step horizontal offsets for the drag.

        The steps accelerate (a = 2) over the first three quarters of
        *distance* and decelerate (a = -3) afterwards, using a time step of
        0.2. Rounded steps are then adjusted so they sum to *distance*, and a
        fixed back-off of ``[-1, -1, -2, -2, -1, -1]`` is appended.

        NOTE(review): because of the appended back-off the offsets sum to
        ``distance - 8``, not ``distance`` - confirm this is intended.

        Args:
            distance: Target offset in pixels.

        Returns:
            List of integer offsets. For ``distance <= 0`` only the back-off
            steps are returned.
        """
        tracks = []
        current = 0
        mid = distance * 3 / 4
        t = 0.2
        v = 0

        while current < distance:
            a = 2 if current < mid else -3
            v0 = v
            v = v0 + a * t
            move = v0 * t + 1 / 2 * a * t * t
            current += move
            tracks.append(round(move))

        # Rounding drift: trim or pad until the steps sum to the distance.
        total = sum(tracks)
        while tracks and total > distance:
            tracks[-1] -= 1
            total -= 1
        while total < distance:
            tracks.append(1)
            total += 1

        # Back-off steps
        tracks.extend([-1, -1, -2, -2, -1, -1])
        return tracks

    def move_slider(self, slider, tracks):
        """Drag *slider* along *tracks* and release it.

        Each step moves by the given x offset with a random y jitter of
        -1..1 pixels and a 10-20 ms pause.

        Returns:
            True when every action was performed, False on any error.
        """
        try:
            ActionChains(self.driver).click_and_hold(slider).perform()

            for track in tracks:
                ActionChains(self.driver).move_by_offset(
                    track, random.randint(-1, 1)
                ).perform()
                time.sleep(random.uniform(0.01, 0.02))

            time.sleep(0.5)
            ActionChains(self.driver).release().perform()
            return True
        except Exception as e:
            logger.error(f"移动滑块失败: {e}")
            # Do not leave the mouse button held down after a failure.
            try:
                ActionChains(self.driver).release().perform()
            except Exception:
                pass
            return False

    # ------------------------------------------------------------------
    # Main flow
    # ------------------------------------------------------------------
    def crack_captcha(self):
        """Run one solve attempt.

        Returns:
            True when an element with class ``gt_success`` appears within five
            seconds of releasing the slider, False in every other case.
        """
        try:
            logger.info("开始破解验证码")

            slider = self.get_slider()
            if not slider:
                return False

            bg_image = self.get_slider_background()
            slider_image = self.get_slider_image()
            if bg_image is None or slider_image is None:
                return False

            gap = self.get_gap(bg_image, slider_image)
            # None means detection failed; 0 is rejected as well.
            if not gap:
                return False

            logger.info(f"缺口位置: {gap}")

            tracks = self.generate_track(gap)
            logger.info(f"生成轨迹: {len(tracks)}个点")

            if not self.move_slider(slider, tracks):
                return False

            # Give the page time to evaluate the drag.
            time.sleep(2)

            try:
                success = WebDriverWait(self.driver, 5).until(
                    EC.presence_of_element_located(
                        (By.CLASS_NAME, "gt_success")
                    )
                )
            except Exception:
                logger.error("验证失败")
                return False

            if success:
                logger.info("验证成功")
                return True
            return False
        except Exception as e:
            logger.error(f"验证码破解失败: {e}")
            return False

    def run(self):
        """Run the whole flow: start the browser, open the page, solve.

        Returns:
            True when one of up to three solve attempts succeeded, False
            otherwise. The browser is always shut down before returning.
        """
        try:
            logger.info("开始破解验证码流程")

            if not self.setup_driver():
                return False

            if not self.navigate_to_page():
                return False

            max_attempts = 3
            for attempt in range(max_attempts):
                logger.info(f"第 {attempt + 1} 次尝试破解验证码")
                if self.crack_captcha():
                    return True
                time.sleep(2)

            return False
        except Exception as e:
            logger.error(f"程序执行失败: {e}")
            return False
        finally:
            logger.info("程序执行结束")
            self.cleanup_driver()

    def __del__(self):
        """Best-effort cleanup when the instance is garbage-collected."""
        # Modules may already be torn down at interpreter shutdown, and a
        # finalizer must never raise.
        try:
            self.cleanup_driver()
            self.cleanup_processes()
        except Exception:
            pass


def main():
    """Run the cracker once and print the outcome."""
    cracker = GeetestCracker()
    result = cracker.run()
    print("破解结果:", "成功" if result else "失败")


if __name__ == "__main__":
    main()