123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591 |
import base64
import io
import logging
import os
import random
import shutil
import signal
import subprocess
import tempfile
import time
from contextlib import contextmanager

import cv2
import numpy as np
from PIL import Image
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
# Configure root logging once at import time: INFO level, timestamped lines.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
# Module-level logger used by every function and method in this file.
logger = logging.getLogger(__name__)
class TimeoutException(Exception):
    """Raised by ``time_limit`` when the guarded block exceeds its deadline."""
@contextmanager
def time_limit(seconds):
    """Abort the enclosed ``with`` body with ``TimeoutException`` after *seconds*.

    The deadline is enforced with ``SIGALRM``, so this only works on Unix and
    only when entered from the main thread (``signal.signal`` refuses to
    install handlers anywhere else).

    Args:
        seconds: Whole seconds to allow; passed straight to ``signal.alarm``.

    Raises:
        TimeoutException: Inside the ``with`` body, when the alarm fires.
    """
    def signal_handler(signum, frame):
        raise TimeoutException("Timed out!")

    # Remember whatever handler was active so it can be put back afterwards;
    # otherwise our handler would stay installed for the rest of the process.
    previous_handler = signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        # Cancel the pending alarm first so it cannot fire into the old handler.
        signal.alarm(0)
        # ``signal.signal`` returns None for handlers not installed from
        # Python; those cannot be re-installed, so leave ours in that case.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
class GeetestCracker:
    """Drive a headless Firefox through a Geetest slider captcha.

    ``run()`` is the entry point: it starts the browser, opens the target
    page and makes up to three solve attempts, always quitting the browser
    afterwards.

    Attributes:
        driver: The active ``webdriver.Firefox`` instance, or ``None``.
        wait: A 10-second ``WebDriverWait`` bound to ``driver``, or ``None``.
    """

    def __init__(self):
        """Create an idle cracker; no browser is started yet."""
        self.driver = None
        self.wait = None
        # Resources created by setup_driver() and released by cleanup_driver().
        self._temp_dir = None
        self._xvfb = None
        logger.info("初始化 GeetestCracker")

    def setup_driver(self):
        """Start headless Firefox via geckodriver and store it on ``self.driver``.

        Leftover browser processes are killed first, then up to three start
        attempts are made, each limited to 20 seconds by ``time_limit``.

        Returns:
            bool: ``True`` once the browser has loaded ``about:blank``;
            ``False`` if geckodriver is not on ``PATH`` or every attempt
            failed. On failure all resources are released.
        """
        try:
            logger.info("开始设置浏览器驱动")

            # Start from a clean slate: kill leftovers of earlier runs.
            self.cleanup_processes()
            self._release_session_resources()

            geckodriver_path = shutil.which('geckodriver')
            if not geckodriver_path:
                logger.error("未找到 geckodriver")
                return False
            logger.info(f"找到 geckodriver: {geckodriver_path}")

            options = self._build_firefox_options()

            logger.info("创建 WebDriver 实例")

            # The temp dir holds the geckodriver log, so it must outlive this
            # method; cleanup_driver() removes it.
            self._temp_dir = tempfile.mkdtemp()

            # NOTE(review): 'profile' is set as a plain preference here; it is
            # unclear whether Firefox treats it as a profile path - confirm.
            options.set_preference('profile', self._temp_dir)

            service = Service(
                geckodriver_path,
                log_output=os.path.join(self._temp_dir, 'geckodriver.log'),
                service_args=[
                    '--log', 'trace',
                    '--marionette-port', '2828',
                ],
            )

            max_attempts = 3
            for attempt in range(max_attempts):
                try:
                    logger.info(f"尝试创建 WebDriver 实例 (尝试 {attempt + 1}/{max_attempts})")

                    os.environ['MOZ_HEADLESS'] = '1'
                    os.environ['DISPLAY'] = ':99'

                    # Virtual display; optional because Firefox runs headless.
                    self._start_xvfb()

                    with time_limit(20):
                        self.driver = webdriver.Firefox(
                            service=service,
                            options=options,
                        )

                    self.driver.set_page_load_timeout(10)
                    self.driver.set_script_timeout(10)
                    self.wait = WebDriverWait(self.driver, 10)

                    # Smoke-test the connection before reporting success.
                    logger.info("测试浏览器连接")
                    self.driver.get('about:blank')

                    logger.info("浏览器驱动初始化成功")
                    return True

                except TimeoutException:
                    logger.error(f"第 {attempt + 1} 次尝试超时")
                except Exception as e:
                    logger.error(f"第 {attempt + 1} 次尝试失败: {e}")

                # This attempt failed: tear down before retrying or giving up.
                self._quit_driver()
                self._stop_xvfb()
                self.cleanup_processes()

                if attempt < max_attempts - 1:
                    logger.info("等待后重试...")
                    time.sleep(5)

            raise RuntimeError("在多次尝试后仍然失败")

        except Exception as e:
            logger.error(f"浏览器驱动初始化失败: {e}")
            self.cleanup_driver()
            return False

    def _build_firefox_options(self):
        """Return the Firefox ``Options`` (preferences and arguments) used by setup_driver()."""
        options = Options()

        # Core settings; the Marionette port is fixed to match the service args.
        options.set_preference('marionette', True)
        options.set_preference('marionette.port', 2828)
        options.set_preference('network.http.connection-timeout', 10000)
        options.set_preference('network.http.response.timeout', 10000)

        # Disable the JavaScript JIT tiers.
        options.set_preference('javascript.options.ion', False)
        options.set_preference('javascript.options.baselinejit', False)

        # Disable hardware acceleration.
        options.set_preference('layers.acceleration.disabled', True)

        # Disable caches, multi-process tabs and crash-session restore.
        options.set_preference('browser.cache.disk.enable', False)
        options.set_preference('browser.cache.memory.enable', False)
        options.set_preference('browser.cache.offline.enable', False)
        options.set_preference('network.http.use-cache', False)
        options.set_preference('browser.tabs.remote.autostart', False)
        options.set_preference('browser.tabs.remote.autostart.2', False)
        options.set_preference('dom.ipc.processCount', 1)
        options.set_preference('browser.sessionstore.resume_from_crash', False)

        # NOTE(review): apart from --headless these look like Chrome-style
        # switches; whether Firefox honours them is not visible here - confirm.
        for argument in (
            '--headless',
            '--no-sandbox',
            '--disable-dev-shm-usage',
            '--disable-gpu',
            '--disable-extensions',
            '--disable-infobars',
            '--disable-notifications',
            '--window-size=1280,800',
        ):
            options.add_argument(argument)

        return options

    def _start_xvfb(self):
        """Launch Xvfb on display :99 in the background (best effort)."""
        self._stop_xvfb()
        try:
            # Popen, not run(): Xvfb is a server that never exits on its own,
            # so waiting for it would block driver setup forever.
            self._xvfb = subprocess.Popen(
                ['Xvfb', ':99', '-screen', '0', '1280x800x24'],
                start_new_session=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except OSError:
            logger.warning("Xvfb 启动失败,继续尝试")

    def _stop_xvfb(self):
        """Terminate the Xvfb process started by _start_xvfb(), if any."""
        proc = getattr(self, '_xvfb', None)
        self._xvfb = None
        if proc is None:
            return
        try:
            proc.terminate()
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
        except OSError:
            # The process is already gone; nothing left to stop.
            pass

    def _release_session_resources(self):
        """Stop Xvfb and delete the temporary directory created by setup_driver()."""
        self._stop_xvfb()
        temp_dir = getattr(self, '_temp_dir', None)
        self._temp_dir = None
        if temp_dir:
            shutil.rmtree(temp_dir, ignore_errors=True)

    def cleanup_processes(self):
        """Kill every Firefox, geckodriver and Xvfb process that can be found.

        Processes are matched by name with ``pkill``/``killall`` and then by
        scanning ``ps aux`` and sending SIGKILL, so this is not limited to
        processes started by this object. Missing tools and failed kills are
        ignored; the method then sleeps two seconds.
        """
        try:
            commands = [
                ['pkill', '-f', 'firefox'],
                ['pkill', '-f', 'geckodriver'],
                ['pkill', '-f', 'Xvfb'],
                ['killall', 'firefox'],
                ['killall', 'geckodriver'],
                ['killall', 'Xvfb'],
            ]
            for cmd in commands:
                try:
                    subprocess.run(cmd,
                                   stdout=subprocess.DEVNULL,
                                   stderr=subprocess.DEVNULL)
                except OSError:
                    # The tool is not installed; try the next one.
                    continue

            # Second pass: SIGKILL anything the polite tools left behind.
            try:
                ps_output = subprocess.check_output(['ps', 'aux']).decode(errors='replace')
            except (OSError, subprocess.SubprocessError):
                ps_output = ''

            own_pid = os.getpid()
            for line in ps_output.split('\n'):
                if not ('firefox' in line or 'geckodriver' in line or 'Xvfb' in line):
                    continue
                try:
                    pid = int(line.split()[1])
                except (IndexError, ValueError):
                    continue
                # Never kill ourselves, even if our own command line matches.
                if pid == own_pid:
                    continue
                try:
                    os.kill(pid, signal.SIGKILL)
                except OSError:
                    # Already exited, or not ours to kill.
                    continue

            time.sleep(2)
            logger.info("清理残留进程完成")
        except Exception as e:
            logger.error(f"清理进程时出错: {e}")

    def navigate_to_page(self):
        """Open the target page and wait for the Geetest slider button.

        Up to three attempts are made. Between attempts the page is stopped
        and pointed at ``about:blank``, followed by a five-second pause.

        Returns:
            bool: ``True`` when the ``geetest_slider_button`` element is
            present, ``False`` after three failed attempts.
        """
        try:
            logger.info("正在访问目标网页")

            max_attempts = 3
            for attempt in range(max_attempts):
                try:
                    logger.info(f"尝试访问页面 (尝试 {attempt + 1}/{max_attempts})")

                    # setup_driver() set 10 s; the real page gets more time.
                    self.driver.set_page_load_timeout(30)

                    target_url = 'https://open.yuewen.com/'
                    logger.info(f"访问目标URL: {target_url}")
                    self.driver.get(target_url)

                    logger.info("等待页面加载")
                    WebDriverWait(self.driver, 30).until(
                        lambda driver: driver.execute_script("return document.readyState") == "complete"
                    )

                    # The wait raises when the element never appears, so
                    # reaching the lines below means the captcha is present.
                    try:
                        WebDriverWait(self.driver, 10).until(
                            EC.presence_of_element_located((By.CLASS_NAME, "geetest_slider_button"))
                        )
                    except Exception as e:
                        logger.error(f"验证码元素检查失败: {e}")
                        raise

                    logger.info("验证码页面加载成功")
                    return True

                except Exception as e:
                    logger.error(f"第 {attempt + 1} 次尝试失败: {e}")
                    if attempt >= max_attempts - 1:
                        raise Exception("页面访问在多次尝试后仍然失败") from e

                    logger.info("等待后重试...")

                    # Reset the browser state before the next attempt.
                    try:
                        self.driver.execute_script("""
                            window.stop();
                            window.location.href = 'about:blank';
                        """)
                    except Exception:
                        # Best effort; the next driver.get() replaces the page anyway.
                        pass

                    time.sleep(5)

        except Exception as e:
            logger.error(f"页面访问失败: {e}")
            return False

        finally:
            # Leave the page-load timeout at 30 s regardless of the outcome.
            try:
                self.driver.set_page_load_timeout(30)
            except Exception:
                # The driver may be missing or already dead.
                pass

    def setup_browser_config(self):
        """Spoof ``navigator.connection`` and apply CDP network overrides.

        NOTE(review): ``execute_cdp_cmd`` looks like a Chromium-driver API;
        with the Firefox driver created by setup_driver() these calls
        presumably fail, in which case this logs the error and returns
        ``False`` - confirm before relying on it.

        Returns:
            bool: ``True`` if every command succeeded, ``False`` otherwise.
        """
        try:
            # Overwrite navigator.connection in the current page.
            self.driver.execute_script("""
                navigator.connection = {
                    effectiveType: '4g',
                    rtt: 50,
                    downlink: 10,
                    saveData: false
                };
            """)

            # Override the user-agent string.
            self.driver.execute_cdp_cmd('Network.setUserAgentOverride', {
                "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
            })

            # Enable network monitoring.
            self.driver.execute_cdp_cmd('Network.enable', {})

            # Emulate network conditions.
            self.driver.execute_cdp_cmd('Network.emulateNetworkConditions', {
                'offline': False,
                'latency': 20,  # milliseconds
                'downloadThroughput': 780 * 1024 / 8,  # bytes per second
                'uploadThroughput': 330 * 1024 / 8,  # bytes per second
                'connectionType': 'wifi'
            })

            return True
        except Exception as e:
            logger.error(f"配置浏览器网络设置失败: {e}")
            return False

    def get_slider(self):
        """Locate the draggable slider knob.

        Several known locators are tried in order, each with its own
        10-second wait, so the worst case takes about a minute.

        Returns:
            The first matching element, or ``None`` if no locator matched.
        """
        try:
            logger.info("寻找滑块元素")

            selectors = [
                (By.CLASS_NAME, "gt_slider_knob"),
                (By.CLASS_NAME, "geetest_slider_button"),
                (By.CLASS_NAME, "gt_slider_knob_new"),
                (By.CSS_SELECTOR, ".gt_slider_knob"),
                (By.CSS_SELECTOR, ".geetest_slider_button"),
                (By.XPATH, "//div[contains(@class, 'slider')]//div[contains(@class, 'knob')]"),
            ]

            for selector in selectors:
                logger.info(f"尝试使用选择器: {selector}")
                try:
                    element = WebDriverWait(self.driver, 10).until(
                        EC.presence_of_element_located(selector)
                    )
                except Exception:
                    # This locator did not match in time; try the next one.
                    continue
                if element:
                    logger.info(f"成功找到滑块元素: {selector}")
                    return element

            raise Exception("未能找到滑块元素")

        except Exception as e:
            logger.error(f"获取滑块元素失败: {e}")
            return None

    def _capture_canvas(self, class_name):
        """Return the first element with *class_name* exported as a PIL image.

        Waits up to 10 seconds for the element, then calls its ``toDataURL``
        in the page and decodes the base64 PNG. Any Selenium, base64 or PIL
        error propagates to the caller.
        """
        WebDriverWait(self.driver, 10).until(
            EC.presence_of_element_located((By.CLASS_NAME, class_name))
        )
        data_url = self.driver.execute_script(
            "return document.getElementsByClassName(arguments[0])[0].toDataURL('image/png')",
            class_name,
        )
        # A data URL looks like "data:image/png;base64,<payload>".
        encoded = data_url.split(',')[1]
        return Image.open(io.BytesIO(base64.b64decode(encoded)))

    def get_slider_background(self):
        """Return the ``gt_box`` element as a PIL image, or ``None`` on failure."""
        try:
            return self._capture_canvas("gt_box")
        except Exception as e:
            logger.error(f"获取背景图片失败: {e}")
            return None

    def get_slider_image(self):
        """Return the ``gt_slice`` element as a PIL image, or ``None`` on failure."""
        try:
            return self._capture_canvas("gt_slice")
        except Exception as e:
            logger.error(f"获取滑块图片失败: {e}")
            return None

    def get_gap(self, bg_image, slider_image):
        """Estimate the x offset of the gap by diffing the two images.

        The absolute difference is thresholded at 127 and the left edge of
        the largest external contour is returned.

        Args:
            bg_image: Background image (anything ``np.array`` accepts).
            slider_image: Slider image; presumably the same size as
                *bg_image*, which ``cv2.absdiff`` requires - confirm.

        Returns:
            The x coordinate of the largest differing region, or ``None``
            when the images do not differ or processing failed.
        """
        try:
            bg = cv2.cvtColor(np.array(bg_image), cv2.COLOR_RGB2BGR)
            slider = cv2.cvtColor(np.array(slider_image), cv2.COLOR_RGB2BGR)

            diff = cv2.absdiff(bg, slider)
            gray = cv2.cvtColor(diff, cv2.COLOR_BGR2GRAY)
            _, mask = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY)

            contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            if not contours:
                return None

            largest = max(contours, key=cv2.contourArea)
            x, _, _, _ = cv2.boundingRect(largest)
            return x
        except Exception as e:
            logger.error(f"计算缺口位置失败: {e}")
            return None

    def generate_track(self, distance):
        """Build a list of per-step x offsets whose sum equals *distance*.

        The forward phase accelerates (a=2) for the first three quarters of
        the way and decelerates (a=-3) afterwards, sampled every 0.2 time
        units. It deliberately overshoots by 8 px and the track then ends
        with the fixed back-off ``[-1, -1, -2, -2, -1, -1]``, so the net
        displacement is exactly *distance* (for integer input) rather than
        8 px short of it.

        Args:
            distance: Pixels to move to the right.

        Returns:
            list[int]: Step offsets; empty when *distance* is not positive.
        """
        backoff = [-1, -1, -2, -2, -1, -1]
        if distance <= 0:
            return []

        # Overshoot by exactly what the back-off takes away again.
        target = distance - sum(backoff)

        tracks = []
        current = 0
        mid = target * 3 / 4
        t = 0.2
        v = 0

        while current < target:
            a = 2 if current < mid else -3
            v0 = v
            v = v0 + a * t
            move = v0 * t + 1 / 2 * a * t * t
            current += move
            tracks.append(round(move))

        # Rounding each step loses precision; reconcile the total.
        total = sum(tracks)
        while total > target:
            tracks[-1] -= 1
            total -= 1
        while total < target:
            tracks.append(1)
            total += 1

        tracks.extend(backoff)
        return tracks

    def move_slider(self, slider, tracks):
        """Drag *slider* along *tracks* with small random vertical jitter.

        Args:
            slider: The slider element to press and hold.
            tracks: Horizontal offsets, one per step.

        Returns:
            bool: ``True`` if the drag and release completed, else ``False``.
        """
        held = False
        try:
            ActionChains(self.driver).click_and_hold(slider).perform()
            held = True
            for track in tracks:
                ActionChains(self.driver).move_by_offset(track, random.randint(-1, 1)).perform()
                time.sleep(random.uniform(0.01, 0.02))
            time.sleep(0.5)
            ActionChains(self.driver).release().perform()
            held = False
            return True
        except Exception as e:
            logger.error(f"移动滑块失败: {e}")
            if held:
                # Do not leave the mouse button pressed for the next attempt.
                try:
                    ActionChains(self.driver).release().perform()
                except Exception:
                    logger.debug("释放滑块失败", exc_info=True)
            return False

    def crack_captcha(self):
        """Run one solve attempt: locate, measure, drag, then check the result.

        Returns:
            bool: ``True`` if a ``gt_success`` element appears within five
            seconds of the drag, ``False`` on any failure.
        """
        try:
            logger.info("开始破解验证码")

            slider = self.get_slider()
            if not slider:
                return False

            bg_image = self.get_slider_background()
            slider_image = self.get_slider_image()
            if not bg_image or not slider_image:
                return False

            # A gap of None (not found) or 0 (nothing to drag) both abort.
            gap = self.get_gap(bg_image, slider_image)
            if not gap:
                return False
            logger.info(f"缺口位置: {gap}")

            tracks = self.generate_track(gap)
            logger.info(f"生成轨迹: {len(tracks)}个点")

            if not self.move_slider(slider, tracks):
                return False

            # Give the page time to evaluate the drag.
            time.sleep(2)

            try:
                success = WebDriverWait(self.driver, 5).until(
                    EC.presence_of_element_located((By.CLASS_NAME, "gt_success"))
                )
            except Exception:
                logger.error("验证失败")
                return False

            if success:
                logger.info("验证成功")
                return True
            return False

        except Exception as e:
            logger.error(f"验证码破解失败: {e}")
            return False

    def _quit_driver(self):
        """Quit the browser if one is running and clear ``self.driver``."""
        driver = getattr(self, 'driver', None)
        if not driver:
            return
        try:
            driver.quit()
        except Exception:
            # The browser may already be dead; nothing more can be done.
            pass
        finally:
            self.driver = None

    def cleanup_driver(self):
        """Quit the browser and release the Xvfb process and temp directory."""
        self._quit_driver()
        self._release_session_resources()

    def run(self):
        """Execute the whole flow: start browser, open page, solve captcha.

        Returns:
            bool: ``True`` if one of up to three solve attempts succeeded.
        """
        try:
            logger.info("开始破解验证码流程")

            if not self.setup_driver():
                return False

            if not self.navigate_to_page():
                return False

            max_attempts = 3
            for attempt in range(max_attempts):
                logger.info(f"第 {attempt + 1} 次尝试破解验证码")
                if self.crack_captcha():
                    return True
                time.sleep(2)

            return False

        except Exception as e:
            logger.error(f"程序执行失败: {e}")
            return False

        finally:
            logger.info("程序执行结束")
            self.cleanup_driver()

    def __del__(self):
        """Destructor: make a last attempt to release browser resources."""
        try:
            self.cleanup_driver()
            self.cleanup_processes()
        except Exception:
            # Module globals may already be torn down at interpreter exit;
            # a destructor must never raise.
            pass
# Script entry point: run one full captcha-solving session and report it.
if __name__ == "__main__":
    cracker = GeetestCracker()
    result = cracker.run()
    print("破解结果:", "成功" if result else "失败")
|