123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- # -*- coding: utf-8 -*-
- import base64
- from PIL import Image
- import ddddocr
- import json
- from http.server import BaseHTTPRequestHandler, HTTPServer
- import requests
- import ssl
- from urllib.parse import urlencode
# Replaces the ssl module's default HTTPS context factory with the unverified
# one, so stdlib code that builds its context through this hook will not check
# server certificates.
# NOTE(review): this is a process-wide security downgrade; it is flagged here
# rather than removed -- confirm whether anything still depends on it.
ssl._create_default_https_context = ssl._create_unverified_context
class MyRequestHandler(BaseHTTPRequestHandler):
    """HTTP handler that dispatches JSON POST bodies on their ``cmd`` field.

    Supported commands are ``ocr``, ``create_link`` and ``get_fq_book``; any
    other value is answered with the plain body ``not func``.
    """

    def do_GET(self):
        """Answer every GET request with a fixed greeting."""
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write(b"Hello, this is a GET response!")

    def _send_json(self, payload, status=200):
        """Serialize ``payload`` as JSON and send it as the full response."""
        body = json.dumps(payload).encode('utf-8')
        self.send_response(status)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(body)

    def do_POST(self):
        """Read the JSON request body and run the command named by ``cmd``.

        Replies with 400 for a bad ``Content-Length``, malformed JSON, a
        non-object payload or invalid image data, and with 500 for any other
        failure raised while executing the command.
        """
        # A missing header is treated as an empty body, which is then rejected
        # as invalid JSON instead of crashing before any response is sent.
        try:
            content_length = int(self.headers.get('Content-Length') or 0)
        except ValueError:
            self.send_error(400, "Invalid Content-Length header")
            return
        post_data = self.rfile.read(content_length)
        try:
            data = json.loads(post_data.decode('utf-8'))
            if not isinstance(data, dict):
                self.send_error(400, "JSON payload must be an object")
                return
            cmd = data.get('cmd')
            if cmd == "ocr":
                # The key name must match what the client sends.
                base64_image_data = data.get('base64_image_data')
                if not base64_image_data:
                    raise ValueError("Missing 'base64_image_data' in JSON payload")
                print("This is OCR command.")
                ocr(base64_image_data, self)
            elif cmd == "create_link":
                create_link(
                    data.get('name'),
                    data.get('bookId'),
                    data.get('topUpTemplateId'),
                    data.get('passBackTemplateId'),
                    data.get('paidPoint'),
                    data.get('token'),
                    self,
                )
                print("This is create_link command.")
            elif cmd == "get_fq_book":
                self._send_json({
                    "message": "Data received successfully",
                    "received_data": getFqBook(data.get('key'), None),
                })
            else:
                self.send_response(200)
                self.send_header('Content-type', 'application/json')
                self.end_headers()
                # wfile only accepts bytes; writing a str here used to raise
                # TypeError after the 200 headers had already been sent.
                self.wfile.write(b"not func")
                print("Unknown command.")
        except json.JSONDecodeError:
            self.send_error(400, "Invalid JSON data")
        except (ValueError, base64.binascii.Error, IOError) as e:
            self.send_error(400, f"Invalid base64 image data: {str(e)}")
        except Exception as e:
            # Top-level boundary: report anything unexpected to the client.
            self.send_error(500, f"Internal server error: {str(e)}")
def ocr(base64_image_data, server):
    """Run ddddocr on a base64-encoded image and send the result as JSON.

    Args:
        base64_image_data: The image bytes, base64-encoded.
        server: The request handler whose response methods and ``wfile`` are
            used to send the reply.

    Raises:
        binascii.Error / OSError: Propagated from base64 decoding or from
            ``Image.open`` when the data cannot be decoded or opened.
    """
    # Imported here because the module only imports ``io`` under its
    # ``__main__`` guard; without this the function raises NameError whenever
    # the file is imported instead of executed.
    import io

    image_data = base64.b64decode(base64_image_data)
    image = Image.open(io.BytesIO(image_data))  # handled entirely in memory
    engine = ddddocr.DdddOcr()
    result = engine.classification(image)
    response = {
        "message": "Data received successfully",
        "received_data": result,
    }
    response_str = json.dumps(response)
    server.send_response(200)
    server.send_header('Content-type', 'application/json')
    server.end_headers()
    server.wfile.write(response_str.encode('utf-8'))
def create_link(name, bookId, topUpTemplateId, passBackTemplateId, paidPoint, token, server):
    """Call the remote ``createLink`` endpoint and relay selected fields as JSON.

    Args:
        name: Value sent as the ``name`` form field.
        bookId: Value sent as the ``bookId`` form field.
        topUpTemplateId: Value sent as the ``topUpTemplateId`` form field.
        passBackTemplateId: Value sent as the ``passBackTemplateId`` form field.
        paidPoint: Value sent as the ``paidPoint`` form field.
        token: Bearer token placed in the ``Authorization`` header.
        server: The request handler used to send the reply.

    Raises:
        RuntimeError: If the upstream reply carries no ``data`` object.
        requests.RequestException: On network failure or timeout.
    """
    url = "https://ms.zhangwenpindu.cn/manage/distribution/createLink"
    data = {
        "name": name,
        "bookId": bookId,
        "passBackTemplateId": passBackTemplateId,  # pass-back rule
        "adPassbackTemplateId": "",
        "topUpTemplateId": topUpTemplateId,  # top-up template
        "mediaChannel": "1",  # media channel
        "group": "35",  # partner site
        "paidPoint": paidPoint,  # point at which charging starts
        "site": "5",  # promotion platform
    }
    # Work on a copy so one caller's token is never left behind in the
    # module-level header template shared by every request.
    request_headers = dict(headers)
    request_headers['Authorization'] = f'Bearer {token}'
    res = requests.post(url, headers=request_headers, data=data, timeout=(10, 30)).json()

    link = res.get("data") if isinstance(res, dict) else None
    if not isinstance(link, dict):
        # Surface the upstream reply instead of failing on None.get(...).
        raise RuntimeError(f"createLink returned no data: {res}")

    response = {
        "message": "create_link successfully",
        "received_data": {
            "originalId": link.get("originalId"),
            "appId": link.get("appId"),
            "launchPath": link.get("launchPath"),
            "fullPath": link.get("fullPath"),
            "wxMountLink": link.get("wxMountLink"),
        },
    }
    response_str = json.dumps(response)
    server.send_response(200)
    server.send_header('Content-type', 'application/json')
    server.end_headers()
    server.wfile.write(response_str.encode('utf-8'))
    print(data)
def run(server_class=HTTPServer, handler_class=MyRequestHandler, port=8080):
    """Start the HTTP server on all interfaces and serve until interrupted.

    Args:
        server_class: Server type to instantiate.
        handler_class: Request handler type passed to the server.
        port: TCP port to listen on.
    """
    httpd = server_class(('', port), handler_class)
    print(f"Serving on port {port}")
    try:
        httpd.serve_forever()
    finally:
        # Release the listening socket however the loop ends (e.g. Ctrl-C);
        # the exception itself still propagates to the caller.
        httpd.server_close()
# Request-header template in raw "Name: value" form, one header per line.
headers_str = '''
Accept: application/json, text/plain, */*
Accept-Language: zh-CN,zh;q=0.9
Cache-Control: no-cache
Connection: keep-alive
Content-Type: application/x-www-form-urlencoded
Origin: https://manage.zhangwenpindu.cn
Pragma: no-cache
Referer: https://manage.zhangwenpindu.cn/
Sec-Fetch-Dest: empty
Sec-Fetch-Mode: cors
Sec-Fetch-Site: same-site
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36
sec-ch-ua: "Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"
sec-ch-ua-mobile: ?0
sec-ch-ua-platform: "Windows"
'''
# Split each line on its first colon only, so values that contain ':' (URLs)
# stay intact; blank lines and lines without a colon are skipped.
headers = {
    field.strip(): value.strip()
    for field, colon, value in (
        line.partition(':') for line in headers_str.strip().splitlines()
    )
    if colon
}
def getFqBook(key, post_data=None):
    """Fetch the reader-content endpoint for ``key`` and summarize the reply.

    Args:
        key: Value for the ``key`` query parameter. A falsy key short-circuits.
        post_data: Optional request body. When truthy the request is a POST
            (a dict is form-encoded first); otherwise it is a GET.

    Returns:
        ``None`` when ``key`` is falsy. Otherwise a dict with ``header``
        (stringified response headers), ``body`` (response text) and
        ``http_code``. On any failure ``body`` holds the error text and
        ``http_code`` is 0.
    """
    if not key:
        return None

    from urllib.parse import quote

    # Percent-encode the caller-supplied key so characters such as '&' or '#'
    # cannot alter the rest of the query string.
    encoded_key = quote(str(key), safe='')
    url = f"https://api.whbfxb.cn/api/novelsale/reader/get/content/v1/?aid=40013183&device_brand=realme&device_platform=android&device_type=RMX2020&mp_sdk_version=3.21.0&novelsale_app_scene=023001&version_code=230&key={encoded_key}&item_source=1&module_name=ad_link&click_id=__CLICKID__&clickid=__CLICKID__&creativetype=__CTYPE__&demand_id=0&item_id=&media_source=1&mid1=__MID1__&mid2=__MID2__&mid3=__MID3__&mid4=__MID4__&mid5=__MID5__&projectid=__PROJECT_ID__&promotionid=__PROMOTION_ID__&request_id=__REQUESTID__&book_id=&host_novelsale_app_id=40013183"
    # NOTE(review): the Cookie below is a hard-coded session credential kept in
    # source; consider loading it from configuration instead.
    n_headers = {
        'Connection': 'keep-alive',
        'Cookie': 'sid_tt=617946caef227c4a80e1dd9cb290515e;ssid_ucp_v1=1.0.0-KGY3OGU0YWM0OTMzMjI1YTg4OThiMTAyMGI3YmFhOTIxNjE2YzZiYzQKFQjkktCcvMz_ARCf9PC5Bhjv6CA4CBoCaGwiIDYxNzk0NmNhZWYyMjdjNGE4MGUxZGQ5Y2IyOTA1MTVl;is_staff_user=false;sid_ucp_v1=1.0.0-KGY3OGU0YWM0OTMzMjI1YTg4OThiMTAyMGI3YmFhOTIxNjE2YzZiYzQKFQjkktCcvMz_ARCf9PC5Bhjv6CA4CBoCaGwiIDYxNzk0NmNhZWYyMjdjNGE4MGUxZGQ5Y2IyOTA1MTVl;sessionid=617946caef227c4a80e1dd9cb290515e;ttwid=1%7CG17ZXFInuaAvJxxf46K8gU7KOBUO5aC89gBmDvnA_fo%7C1732000431%7C90ea8439ff8c370df5be3f31d7187a23ed8a2318b4a1aa7d345283ba395bfcb2;sessionid_ss=617946caef227c4a80e1dd9cb290515e;sid_guard=617946caef227c4a80e1dd9cb290515e%7C1732000287%7C5184000%7CSat%2C+18-Jan-2025+07%3A11%3A27+GMT;uid_tt=560968d5ca0089cc8ce610b12f7f2191;passport_csrf_token=8a66478b232599457f588305cb6317dd;uid_tt_ss=560968d5ca0089cc8ce610b12f7f2191;odin_tt=38b1a8591565cfabfde8e2ff37a2d033e74a58e85a42dbf7c48fbca909ebd43b2a0ef306193946a4c74c64f005d15aaef66f3f0b5fe61669cd56f37b355e1591;passport_csrf_token_default=8a66478b232599457f588305cb6317dd;store-region=cn-gs;store-region-src=uid;n_mh=9-mIeuD4wZnlYrrOvfzG3MuT6aQmCUtmr8FxV8Kl8xY',
        'User-Agent': 'Mozilla/5.0 (Linux; Android 10; RMX2020 Build/QP1A.190711.020; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/120.0.6099.193 Mobile Safari/537.36 aweme/29.4.0 ToutiaoMicroApp/3.21.0 PluginVersion/29409006',
        'Content-Type': 'application/json',
        'Referer': 'https://tmaservice.developer.toutiao.com/?appid=tt1b316d8c8401e42101&version=2.3.0'
    }
    try:
        # NOTE(review): verify=False turns off TLS certificate checking for
        # these calls; left unchanged here, confirm it is really required.
        if post_data:
            payload = urlencode(post_data) if isinstance(post_data, dict) else post_data
            response = requests.post(url, headers=n_headers, data=payload, allow_redirects=True, verify=False, timeout=(10, 30))
        else:
            response = requests.get(url, headers=n_headers, allow_redirects=True, verify=False, timeout=(10, 30))

        return {
            'header': str(response.headers),
            'body': response.text,
            'http_code': response.status_code
        }
    except Exception as e:
        # Deliberate best-effort: callers always get the same dict shape back,
        # with the failure described in 'body'.
        return {
            'header': '',
            'body': str(e),
            'http_code': 0
        }
# Script entry point: start the server on port 8080 when run directly.
if __name__ == "__main__":
    import io  # make sure io is imported so images can be handled in memory
    run(port=8080)
|