agent.lua 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. local skynet = require "skynet"
  2. local socket = require "skynet.socket"
  3. local cjson = require "cjson"
  4. local tools = require "tools"
  5. local websocket = require "http.websocket"
  6. local mysqldbx = require "mysqldbx"
  7. local closing = false
  8. local isRunTask = false --是否在工作
  9. local isActive = false --是否激活
  10. local handle = {}
  11. local M = {}
  12. local RECV_TASK = {}
  13. local AGENT_ID = 0
  14. function handle.connect(id)
  15. print("ws connect from: " .. tostring(id))
  16. end
  17. function handle.handshake(id, header, url)
  18. local addr = websocket.addrinfo(id)
  19. print("ws handshake from: " .. tostring(id), "url", url, "addr:", addr)
  20. print("----header-----")
  21. for k,v in pairs(header) do
  22. print(k,v)
  23. end
  24. print("--------------")
  25. end
  26. --工人已完成
  27. function M.finish_task(id, msg)
  28. skynet.error("工人已完成:",cjson.encode(msg['data']))
  29. isRunTask = false --等待老板发送新任务
  30. end
  31. --工人未完成
  32. function M.unfinish_task(id, msg)
  33. skynet.error("工人未完成:",msg['data'].error_info)
  34. isRunTask = false
  35. end
  36. --工人准备工作
  37. function M.status_task(id, msg)
  38. skynet.error("工人准备工作:",id,msg)
  39. AGENT_ID = id
  40. isActive = true
  41. skynet.fork(function()
  42. RECV_TASK.updateAppConfig()
  43. -- skynet.sleep(200)
  44. RECV_TASK.updateFilterConfig()
  45. -- skynet.sleep(200)
  46. RECV_TASK.updateMainConfig()
  47. -- skynet.sleep(200)
  48. RECV_TASK.updatePlatformConfig()
  49. RECV_TASK.updateFqKeyList()
  50. RECV_TASK.updateBlackBooks()
  51. RECV_TASK.getPullDataConfig()
  52. end)
  53. end
  54. function handle.message(id, msg, type)
  55. if msg == "ping" then
  56. websocket.write(id,"pong")
  57. return
  58. end
  59. msg = cjson.decode(msg)
  60. if M[msg.cmd] then
  61. M[msg.cmd](id,msg)
  62. else
  63. skynet.error("no find:",msg)
  64. end
  65. end
  66. function handle.ping(id)
  67. print("ws ping from: " .. tostring(id) .. "\n")
  68. end
  69. function handle.pong(id)
  70. print("ws pong from: " .. tostring(id))
  71. end
  72. function handle.close(id, code, reason)
  73. print("ws close from: " .. tostring(id), code, reason)
  74. AGENT_ID = 0
  75. isRunTask = false
  76. isActive = false
  77. closing = true
  78. end
  79. function handle.error(id)
  80. print("ws error from: " .. tostring(id))
  81. end
  82. local connect = function(fd, addr)
  83. if closing then
  84. return
  85. end
  86. local protocol = "ws"
  87. local ok, err = websocket.accept(fd, handle, protocol, addr)
  88. if not ok then
  89. skynet.error(err)
  90. else
  91. print("websocket connect!")
  92. end
  93. end
  94. function RECV_TASK.isActive()
  95. return isActive
  96. end
  97. function RECV_TASK.isRunStak()
  98. return isRunTask
  99. end
  100. function RECV_TASK.startOneTask(msg_body)
  101. skynet.error(" 收到老板的任务:",msg_body,AGENT_ID)
  102. local body = cjson.decode(msg_body)
  103. if AGENT_ID~=0 then
  104. isRunTask = true --工人进入工作状态
  105. websocket.write(AGENT_ID, cjson.encode({cmd="find_book",dy_url=body.dy_url}))
  106. end
  107. end
  108. function RECV_TASK.updatePullConig(msg_body)
  109. if AGENT_ID~=0 then
  110. isRunTask = true --工人进入工作状态
  111. websocket.write(AGENT_ID, cjson.encode({cmd="updatePullConig"}))
  112. end
  113. end
  114. function RECV_TASK.getPullDataConfig(msg_body)
  115. if AGENT_ID~=0 then
  116. isRunTask = true --工人进入工作状态
  117. websocket.write(AGENT_ID, cjson.encode({cmd="getPullDataConfig"}))
  118. end
  119. end
  120. function RECV_TASK.updateAppConfig(msg_body)
  121. -- local body = cjson.decode(msg_body)
  122. if AGENT_ID~=0 then
  123. isRunTask = true --工人进入工作状态
  124. websocket.write(AGENT_ID, cjson.encode({cmd="updateAppConfig"}))
  125. end
  126. end
  127. function RECV_TASK.updatePlatformConfig(msg_body)
  128. if AGENT_ID~=0 then
  129. isRunTask = true --工人进入工作状态
  130. websocket.write(AGENT_ID, cjson.encode({cmd="updatePlatformConfig"}))
  131. end
  132. end
  133. function RECV_TASK.updateMainConfig(msg_body)
  134. if AGENT_ID~=0 then
  135. isRunTask = true --工人进入工作状态
  136. websocket.write(AGENT_ID, cjson.encode({cmd="updateMainConfig"}))
  137. end
  138. end
  139. function RECV_TASK.updateFilterConfig(msg_body)
  140. if AGENT_ID~=0 then
  141. isRunTask = true --工人进入工作状态
  142. websocket.write(AGENT_ID, cjson.encode({cmd="updateFilterConfig"}))
  143. end
  144. end
  145. function RECV_TASK.addMain(msg_body)
  146. if AGENT_ID~=0 then
  147. isRunTask = true --工人进入工作状态
  148. --根据主体匹配,筛选出需要创建推广连接的书
  149. websocket.write(AGENT_ID, cjson.encode({cmd="addMain",data=msg_body}))
  150. end
  151. end
  152. function RECV_TASK.updateFqKeyList(msg_body)
  153. if AGENT_ID~=0 then
  154. isRunTask = true --工人进入工作状态
  155. --根据主体匹配,筛选出需要创建推广连接的书
  156. websocket.write(AGENT_ID, cjson.encode({cmd="updateFqKeyList",data=msg_body}))
  157. end
  158. end
  159. function RECV_TASK.updateBlackBooks(msg_body)
  160. if AGENT_ID~=0 then
  161. isRunTask = true --工人进入工作状态
  162. --根据主体匹配,筛选出需要创建推广连接的书
  163. websocket.write(AGENT_ID, cjson.encode({cmd="updateBlackBooks",data=msg_body}))
  164. end
  165. end
  166. local dispatch = function(session, address,id, addr,cmd,...)
  167. if(cmd=="connect") then
  168. connect(id,addr)
  169. else
  170. local f = assert(RECV_TASK[cmd], cmd .. " not found")
  171. skynet.retpack(f(...))
  172. end
  173. end
  174. skynet.start(function()
  175. skynet.dispatch("lua", dispatch)
  176. end)