agent.lua 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. local skynet = require "skynet"
  2. local socket = require "skynet.socket"
  3. local cjson = require "cjson"
  4. local tools = require "tools"
  5. local websocket = require "http.websocket"
  6. local mysqldbx = require "mysqldbx"
  7. local closing = false
  8. local isRunTask = false --是否在工作
  9. local isActive = false --是否激活
  10. local handle = {}
  11. local M = {}
  12. local RECV_TASK = {}
  13. local AGENT_ID = 0
  14. function handle.connect(id)
  15. print("ws connect from: " .. tostring(id))
  16. end
  17. function handle.handshake(id, header, url)
  18. local addr = websocket.addrinfo(id)
  19. print("ws handshake from: " .. tostring(id), "url", url, "addr:", addr)
  20. print("----header-----")
  21. for k,v in pairs(header) do
  22. print(k,v)
  23. end
  24. print("--------------")
  25. end
  26. --工人已完成
  27. function M.finish_task(id, msg)
  28. skynet.error("工人已完成:",cjson.encode(msg['data']))
  29. isRunTask = false --等待老板发送新任务
  30. end
  31. --工人未完成
  32. function M.unfinish_task(id, msg)
  33. skynet.error("工人未完成:",msg['data'].error_info)
  34. isRunTask = false
  35. end
  36. --工人准备工作
  37. function M.status_task(id, msg)
  38. skynet.error("工人准备工作:",id,msg)
  39. AGENT_ID = id
  40. isActive = true
  41. skynet.fork(function()
  42. RECV_TASK.updateAppConfig()
  43. -- skynet.sleep(200)
  44. RECV_TASK.updateFilterConfig()
  45. -- skynet.sleep(200)
  46. RECV_TASK.updateMainConfig()
  47. -- skynet.sleep(200)
  48. RECV_TASK.updatePlatformConfig()
  49. end)
  50. end
  51. function handle.message(id, msg, type)
  52. if msg == "ping" then
  53. websocket.write(id,"pong")
  54. return
  55. end
  56. msg = cjson.decode(msg)
  57. if M[msg.cmd] then
  58. M[msg.cmd](id,msg)
  59. else
  60. skynet.error("no find:",msg)
  61. end
  62. end
  63. function handle.ping(id)
  64. print("ws ping from: " .. tostring(id) .. "\n")
  65. end
  66. function handle.pong(id)
  67. print("ws pong from: " .. tostring(id))
  68. end
  69. function handle.close(id, code, reason)
  70. print("ws close from: " .. tostring(id), code, reason)
  71. AGENT_ID = 0
  72. isRunTask = false
  73. isActive = false
  74. closing = true
  75. end
  76. function handle.error(id)
  77. print("ws error from: " .. tostring(id))
  78. end
  79. local connect = function(fd, addr)
  80. if closing then
  81. return
  82. end
  83. local protocol = "ws"
  84. local ok, err = websocket.accept(fd, handle, protocol, addr)
  85. if not ok then
  86. skynet.error(err)
  87. else
  88. print("websocket connect!")
  89. end
  90. end
  91. function RECV_TASK.isActive()
  92. return isActive
  93. end
  94. function RECV_TASK.isRunStak()
  95. return isRunTask
  96. end
  97. function RECV_TASK.startOneTask(msg_body)
  98. skynet.error(" 收到老板的任务:",msg_body,AGENT_ID)
  99. local body = cjson.decode(msg_body)
  100. if AGENT_ID~=0 then
  101. isRunTask = true --工人进入工作状态
  102. websocket.write(AGENT_ID, cjson.encode({cmd="find_book",dy_url=body.dy_url}))
  103. end
  104. end
  105. function RECV_TASK.updateAppConfig(msg_body)
  106. -- local body = cjson.decode(msg_body)
  107. if AGENT_ID~=0 then
  108. isRunTask = true --工人进入工作状态
  109. websocket.write(AGENT_ID, cjson.encode({cmd="updateAppConfig"}))
  110. end
  111. end
  112. function RECV_TASK.updatePlatformConfig(msg_body)
  113. if AGENT_ID~=0 then
  114. isRunTask = true --工人进入工作状态
  115. websocket.write(AGENT_ID, cjson.encode({cmd="updatePlatformConfig"}))
  116. end
  117. end
  118. function RECV_TASK.updateMainConfig(msg_body)
  119. if AGENT_ID~=0 then
  120. isRunTask = true --工人进入工作状态
  121. websocket.write(AGENT_ID, cjson.encode({cmd="updateMainConfig"}))
  122. end
  123. end
  124. function RECV_TASK.updateFilterConfig(msg_body)
  125. if AGENT_ID~=0 then
  126. isRunTask = true --工人进入工作状态
  127. websocket.write(AGENT_ID, cjson.encode({cmd="updateFilterConfig"}))
  128. end
  129. end
  130. function RECV_TASK.addMain(msg_body)
  131. if AGENT_ID~=0 then
  132. isRunTask = true --工人进入工作状态
  133. --根据主体匹配,筛选出需要创建推广连接的书
  134. websocket.write(AGENT_ID, cjson.encode({cmd="addMain",data=msg_body}))
  135. end
  136. end
  137. local dispatch = function(session, address,id, addr,cmd,...)
  138. if(cmd=="connect") then
  139. connect(id,addr)
  140. else
  141. local f = assert(RECV_TASK[cmd], cmd .. " not found")
  142. skynet.retpack(f(...))
  143. end
  144. end
  145. skynet.start(function()
  146. skynet.dispatch("lua", dispatch)
  147. end)