-- redisbackup.lua

  --[[ config
  root = "./"
  listen = "127.0.0.1:8786"
  redisaddr = "127.0.0.1:6379[1]"
  dbfile = root .. "backup.db"
  thread = 4
  logger = nil
  harbor = 1
  address = "127.0.0.1:8788"
  master = "127.0.0.1:8787"
  start = "redisbackup"
  standalone = "127.0.0.1:8787"
  luaservice = root.."service/?.lua"
  cpath = root.."service/?.so"
  ]]
  -- Framework modules used by every mode of this script.
  local skynet = require("skynet")
  local socket = require("socket")

  -- Service name passed to skynet.newservice when this script spawns
  -- further instances of itself.
  local NAME = "redisbackup"

  -- Launch arguments. `mode` selects the role of this instance:
  --   nil          -> listener (accepts client connections)
  --   "dispatcher" -> serves one client socket (`id`), forwarding to service `db`
  --   "dbserver"   -> owns the unqlite file and the redis connection
  local mode, id, db = ...

  -- redis socket fd
  local fd
  if mode == nil then
    -- Listener mode: started without arguments. Spawns the single "dbserver"
    -- instance, listens on the configured address, and spawns one
    -- "dispatcher" instance per accepted connection.
    local listen_addr = assert(skynet.getenv("listen"), "config 'listen' is not set")
    local addr, port = string.match(listen_addr, "([^:%s]+)%s*:%s*([^:%s]+)")
    -- Fail with a readable message instead of an obscure format/listen error
    -- when the setting is not of the form host:port.
    port = port and tonumber(port)
    assert(addr and port, "config 'listen' must look like host:port, got " .. listen_addr)
    print(string.format("Listen on %s:%d", addr, port))
    skynet.start(function()
      local db = skynet.newservice(NAME, "dbserver")
      local id = socket.listen(addr, port)
      socket.start(id, function(client_id, client_addr)
        -- you can also call skynet.newservice for this socket id
        skynet.newservice(NAME, "dispatcher", client_id, db)
      end)
    end)
  elseif mode == "dispatcher" then
    -- Dispatcher mode: serves one client socket. Launch arguments arrive as
    -- strings, so convert the socket id and the dbserver address to numbers.
    id = tonumber(id)
    db = tonumber(db)

    --- Read newline-terminated commands from the client until the socket
    -- yields no more lines, then close it and exit this service.
    -- "S", "L" and "C" are forwarded with skynet.send; "V" is a skynet.call
    -- whose non-nil result is written back to the client.
    local function mainloop()
      while true do
        local line = socket.readline(id, "\n")
        if not line then
          socket.close(id)
          skynet.exit()
          return
        end

        local cmd, key = string.match(line, "(%w+)%s*(.*)")
        if cmd == "V" then
          local ret = skynet.call(db, "lua", cmd, key)
          if ret then
            socket.write(id, tostring(ret))
          end
        elseif cmd == "S" or cmd == "L" or cmd == "C" then
          skynet.send(db, "lua", cmd, key)
        else
          print("Unknown command", cmd, key)
        end
      end
    end

    skynet.start(function()
      socket.start(id)
      skynet.fork(mainloop)
    end)
  elseif mode == "dbserver" then
    -- Database mode: opens the unqlite file named by the "dbfile" setting and
    -- parses the "redisaddr" setting, written as host:port[dbindex].
    local unqlite = require("unqlite")
    local dbfile = skynet.getenv("dbfile")
    local unqlite_db = unqlite.open(dbfile)
    print("Open db file : ", dbfile)

    -- mark in _G
    -- Deliberately global sentinel: its __gc handler closes the database
    -- when the sentinel is collected.
    unqlite_db_gc = setmetatable({}, {
      __gc = function()
        unqlite.close(unqlite_db)
      end,
    })

    local redis_addr = skynet.getenv("redisaddr")
    local addr, port, db = string.match(redis_addr, "([^:%s]+)%s*:%s*([^:%s%[]+)%s*%[%s*(%d+)%]")
    -- port is used as a number; db stays a string because connect_redis
    -- takes its length with `#db`.
    port, db = tonumber(port), tostring(db)
    print(string.format("Redis %s : %d select(%d)", addr, port, db))
  ----- redis response

  --- Read one "\r\n"-terminated line from the redis socket `fd`.
  -- Raises (via assert) when socket.readline returns a falsy value.
  local function readline()
    return assert(socket.readline(fd, "\r\n"))
  end
  --- Read exactly `nbytes` bytes from the redis socket `fd`.
  -- Raises (via assert) when socket.read returns a falsy value.
  local function readbytes(nbytes)
    return assert(socket.read(fd, nbytes))
  end
  --- Read the rest of one redis reply whose first line is already consumed.
  -- @param firstline the reply's first line, without the trailing "\r\n"
  -- @return for a '*' (multi-bulk) reply: the element count n and, when
  --   n > 0, the raw elements concatenated ("$len\r\n" followed by the payload
  --   and its "\r\n"; a negative length contributes only its length line);
  --   for a '$' (bulk) reply: the length text, "\r\n" and the payload with its
  --   "\r\n", or just the length text when the length is negative;
  --   for anything else: `firstline` unchanged.
  -- Raises when the socket helpers fail or a length field is not numeric.
  local function read_response(firstline)
    local firstchar = string.byte(firstline)
    local data = string.sub(firstline, 2)

    if firstchar == 42 then -- '*'
      local n = tonumber(data)
      if n <= 0 then
        return n
      end
      local bulk = {}
      for i = 1, n do
        local line = readline()
        bulk[i * 2 - 1] = line .. "\r\n"
        local bytes = tonumber(string.sub(line, 2))
        if bytes >= 0 then
          -- payload plus its trailing "\r\n"
          bulk[i * 2] = readbytes(bytes + 2)
        else
          -- negative length: no payload follows; store "" so that
          -- table.concat sees no hole in the sequence
          bulk[i * 2] = ""
        end
      end
      return n, table.concat(bulk)
    end

    if firstchar == 36 then -- '$'
      local bytes = tonumber(data)
      if bytes < 0 then
        return data
      end
      -- Use the same local helper as the multi-bulk branch (it asserts on a
      -- failed read) rather than skynet.readbytes.
      local payload = readbytes(bytes + 2)
      return data .. "\r\n" .. payload
    end

    return firstline
  end
  -----------------------
  -- Handlers for the "lua" messages sent by the dispatcher services,
  -- indexed by command letter.
  local command = {}

  -- FIFO of redis requests that have been sent but not yet answered.
  -- Entries live at indices head .. tail-1; head == tail means empty.
  local dbcmd = { head = 1, tail = 1 }

  --- Append a request to the pending queue.
  -- @param v the raw request string
  local function push(v)
    dbcmd[dbcmd.tail] = v
    dbcmd.tail = dbcmd.tail + 1
  end

  --- Remove and return the oldest pending request.
  -- @return the request, or nothing when the queue is empty
  local function pop()
    if dbcmd.head == dbcmd.tail then
      return
    end
    local v = dbcmd[dbcmd.head]
    dbcmd[dbcmd.head] = nil
    dbcmd.head = dbcmd.head + 1
    if dbcmd.head == dbcmd.tail then
      -- queue drained: rewind both indices so they do not grow without bound
      dbcmd.head = 1
      dbcmd.tail = 1
    end
    return v
  end
  --- "S" command: request all hash fields of `key` from redis.
  -- The HGETALL request is queued before it is written, so it is still in
  -- dbcmd if the write fails; connect_redis re-sends every queued request.
  -- The write error itself is deliberately ignored.
  function command.S(key)
    local request = string.format("*2\r\n$7\r\nHGETALL\r\n$%d\r\n%s\r\n", #key, key)
    push(request)
    pcall(socket.write, fd, request)
  end
  --- "L" command: replay the value stored for `key` into redis.
  -- The stored value is sent verbatim as a redis request; it is queued first
  -- and the write error is ignored. Does nothing when the key has no stored
  -- value.
  function command.L(key)
    local stored = unqlite.load(unqlite_db, key)
    if not stored then
      return
    end
    push(stored)
    pcall(socket.write, fd, stored)
  end
  --- "C" command: commit the unqlite database.
  -- A failing commit is reported on stdout and otherwise ignored.
  function command.C()
    local committed, reason = pcall(unqlite.commit, unqlite_db)
    if committed then
      return
    end
    print("Commit error:", reason)
  end
  --- "D" command: only logs the key; nothing is deleted here.
  function command.D(key)
    print("delete", key)
  end
  --- "V" command: reply to the calling service with the value stored for
  -- `key` in unqlite.
  function command.V(key)
    local stored = unqlite.load(unqlite_db, key)
    -- skynet.pack's results are passed straight through to skynet.ret
    skynet.ret(skynet.pack(stored))
  end
  -- Forward declaration: connect_redis forks `dispatcher`, which is defined
  -- later in this branch.
  local dispatcher

  --- Open the redis connection, select the database, re-send every request
  -- still waiting in dbcmd, and fork the reply dispatcher.
  -- @param addr redis host
  -- @param port redis port
  -- @param db database index as a string
  -- @return true on success; nothing when socket.open returns a falsy value
  -- Raises when the SELECT reply is not "+OK" or the reply cannot be read.
  local function connect_redis(addr, port, db)
    fd = socket.open(addr, port)
    if not fd then
      return
    end

    local select_cmd = string.format("*2\r\n$6\r\nSELECT\r\n$%d\r\n%d\r\n", #db, db)
    socket.write(fd, select_cmd)
    local reply = readline()
    assert(reply == "+OK", string.format("Select %d failed", db))

    -- replay the requests that were never answered on the previous connection
    for i = dbcmd.head, dbcmd.tail - 1 do
      socket.write(fd, dbcmd[i])
    end

    print("connect ok")
    skynet.fork(dispatcher)
    return true
  end
  --- Consume one redis reply and retire the oldest pending request.
  -- "+OK" simply pops the request. A multi-bulk reply with a positive element
  -- count is stored in unqlite under the key taken from the popped request,
  -- as an HMSET request that would recreate the hash. Any other reply is
  -- printed as an error and the request is dropped.
  -- Raises when the socket read fails.
  local function dispatch_one()
    local firstline = readline()
    if firstline == "+OK" then
      pop()
      return
    end

    local count, payload = read_response(firstline)
    if not (type(count) == "number" and count > 0) then
      print("error:", count, payload)
      pop()
      return
    end

    -- save key
    local request = pop()
    -- the key is the last argument of the queued request
    local key = string.match(request, "\r\n([^%s]+)\r\n$")
    -- count + 2 arguments: "HMSET", the key, and the elements of the reply
    local record = string.format("*%d\r\n$5\r\nHMSET\r\n$%d\r\n%s\r\n%s", count + 2, #key, key, payload)
    unqlite.save(unqlite_db, key, record)
  end
  -- local function (forward-declared above connect_redis)

  --- Reply loop for the redis connection.
  -- Handles replies one at a time until dispatch_one raises, which is treated
  -- as a lost connection. It then keeps calling connect_redis, pausing
  -- between attempts, until one succeeds. A successful connect_redis forks a
  -- new dispatcher for the new socket, so this instance returns.
  function dispatcher()
    while true do
      local ok, err = pcall(dispatch_one)
      if not ok then
        -- reconnect
        print("redis disconnected:", err)
        while true do
          local called, result = pcall(connect_redis, addr, port, db)
          if called and result then
            return
          end
          -- Either connect_redis raised (result is the error) or it returned
          -- nothing because the socket could not be opened. Clear fd and
          -- try again after the pause instead of giving up.
          fd = nil
          print("Connect redis error: " .. tostring(result or "socket.open failed"))
          skynet.sleep(1000)
        end
      end
    end
  end
  -- dbserver entry point: connect to redis once (aborting start-up on
  -- failure), then route each incoming "lua" message to the handler in
  -- `command` named by its first argument.
  skynet.start(function()
    assert(connect_redis(addr, port, db), "Connect failed")
    skynet.dispatch("lua", function(session, source, cmd, ...)
      local handler = command[cmd]
      handler(...)
    end)
  end)
  else
    -- any other mode string is a launch error
    error("Invalid mode " .. mode)
  end