# -*- coding: utf-8 -*- import base64 from PIL import Image import ddddocr import json from http.server import BaseHTTPRequestHandler, HTTPServer import requests import ssl from urllib.parse import urlencode ssl._create_default_https_context = ssl._create_unverified_context class MyRequestHandler(BaseHTTPRequestHandler): def do_GET(self): self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write(b"Hello, this is a GET response!") def do_POST(self): content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length) try: data = json.loads(post_data.decode('utf-8')) print(data) cmd = data.get('cmd') print(cmd) if cmd == "ocr": base64_image_data = data.get('base64_image_data') # 确保键名与发送的数据匹配 if not base64_image_data: raise ValueError("Missing 'base64_image_data' in JSON payload") print("This is OCR command.") ocr(base64_image_data,self) elif cmd == "create_link": name = data.get('name') bookId = data.get('bookId') topUpTemplateId = data.get('topUpTemplateId') passBackTemplateId = data.get('passBackTemplateId') paidPoint = data.get('paidPoint') token = data.get('token') create_link(name,bookId,topUpTemplateId,passBackTemplateId,paidPoint,token,self) print("Command is empty.") elif cmd == "chapterSpread": if not all(k in data for k in ['cbid', 'OPENSESSID']): raise ValueError("Missing required parameters for chapterSpread") response = { "message": "Data received successfully", "received_data": temp_chapterSparead(data), } # chapterSpread(data) response_str = json.dumps(response) self.send_response(200) self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write(response_str.encode('utf-8')) elif cmd == "get_fq_book": key = data.get('key') response = { "message": "Data received successfully", "received_data": getFqBook(key,None), } response_str = json.dumps(response) self.send_response(200) self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write(response_str.encode('utf-8')) elif cmd == "get_gap": base64_image_data1 = data.get('base64_image_data1') base64_image_data0 = data.get('base64_image_data0') if not base64_image_data1: raise ValueError("Missing 'base64_image_data1' in JSON payload") if not base64_image_data0: raise ValueError("Missing 'base64_image_data0' in JSON payload") print("This is get_gap command.") response = { "message": "Data received successfully", "received_data": get_gap(key,None), } response_str = json.dumps(response) self.send_response(200) self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write(response_str.encode('utf-8')) else: self.send_response(200) self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write("not func") print("Unknown command.") except json.JSONDecodeError: self.send_error(400, "Invalid JSON data") except (ValueError, base64.binascii.Error, IOError) as e: self.send_error(400, f"Invalid base64 image data: {str(e)}") except Exception as e: self.send_error(500, f"Internal server error: {str(e)}") def ocr(base64_image_data,server): image_data = base64.b64decode(base64_image_data) image = Image.open(io.BytesIO(image_data)) # 直接在内存中处理图像 ocr = ddddocr.DdddOcr() result = ocr.classification(image) response = { "message": "Data received successfully", "received_data": result, } response_str = json.dumps(response) server.send_response(200) server.send_header('Content-type', 'application/json') server.end_headers() server.wfile.write(response_str.encode('utf-8')) def create_link(name,bookId,topUpTemplateId,passBackTemplateId,paidPoint,token,server): url = "https://ms.zhangwenpindu.cn/manage/distribution/createLink" data = { "name": name, "bookId": bookId, "passBackTemplateId": passBackTemplateId,#"5", 回传规则 "adPassbackTemplateId": "", "topUpTemplateId": topUpTemplateId, #充值 "mediaChannel": "1", #媒体渠道 "group": "35", #合作站点 "paidPoint": paidPoint,#"", #卡点收费 "site": "5" #推广平台 } headers['Authorization']=f'Bearer {token}' res = requests.post(url, headers=headers, data=data).json() # f'{"=" * 40}\n' # f'原始ID:{res.get("data").get("originalId")}\n' # f'小程序ID:{res.get("data").get("appId")}\n' # f'小程序启动页面:{res.get("data").get("launchPath")}\n' # f'小程序启动参数:{res.get("data").get("launchParam")}\n' # f'跳转路径参数:{res.get("data").get("fullPath")}\n' # f'微信站外链接:{res.get("data").get("wxMountLink")}\n' # f'{"=" * 40}\n', response = { "message": "create_link successfully", "received_data":{"originalId":res.get("data").get("originalId"), "appId":res.get("data").get("appId"), "launchPath":res.get("data").get("launchPath"), "fullPath":res.get("data").get("fullPath"), "wxMountLink":res.get("data").get("wxMountLink")} } response_str = json.dumps(response) server.send_response(200) server.send_header('Content-type', 'application/json') server.end_headers() server.wfile.write(response_str.encode('utf-8')) # print(res) print(data) def run(server_class=HTTPServer, handler_class=MyRequestHandler, port=8080): server_address = ('', port) httpd = server_class(server_address, handler_class) print(f"Serving on port {port}") httpd.serve_forever() headers_str = ''' Accept: application/json, text/plain, */* Accept-Language: zh-CN,zh;q=0.9 Cache-Control: no-cache Connection: keep-alive Content-Type: application/x-www-form-urlencoded Origin: https://manage.zhangwenpindu.cn Pragma: no-cache Referer: https://manage.zhangwenpindu.cn/ Sec-Fetch-Dest: empty Sec-Fetch-Mode: cors Sec-Fetch-Site: same-site User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 sec-ch-ua: "Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99" sec-ch-ua-mobile: ?0 sec-ch-ua-platform: "Windows" ''' headers = dict( [[y.strip() for y in x.strip().split(':', 1)] for x in headers_str.strip().split('\n') if x.strip()]) def getFqBook( key,post_data=None): return { 'header': '', 'body': '', 'http_code': 0 } def temp_chapterSparead(data): cookies = { 'Hm_lvt_990f9ab9737a266517417cc2949bb3f4': '1736394515', 'csrfToken': 'vN8OkvTZZfzerTHhodh7rc81', 'OPENSESSID': data['OPENSESSID'], 'yw_open_token': '6785d8c2ec046', 'is_read_notice': '6785d8c2ec046', 'sidebarStatus': '0', } print(cookies) headers = { 'authority': 'open.yuewen.com', 'accept': 'application/json, text/plain, */*', 'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6', 'priority': 'u=1, i', 'referer': 'https://open.yuewen.com/new/library', 'sec-ch-ua': '"Microsoft Edge";v="131", "Chromium";v="131", "Not_A Brand";v="24"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"Windows"', 'sec-fetch-dest': 'empty', 'sec-fetch-mode': 'cors', 'sec-fetch-site': 'same-origin', 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0' } url = 'https://open.yuewen.com/api/wechatspread/chapterSpread' params = { 'cbid': data['cbid'] } print(params) response = requests.get(url, params=params, cookies=cookies, headers=headers) return response.json() def get_gap(target_img:bytes, background_img:bytes): det = ddddocr.DdddOcr(det=False, ocr=False, show_ad=False) res = det.slide_match(target_img, background_img,simple_target=True) print(res) return res["target"][0] if __name__ == "__main__": import io # 确保导入了 io 模块以处理内存中的图像 run(port=8080)