Files
SafeLine/sdk/lua-resty-t1k/lib/resty/t1k/request.lua
xiaobing.wang 1f3f7262e0 feat: add sdk
2024-07-04 17:19:44 +08:00

361 lines
9.9 KiB
Lua

local bit = require "bit"
local buffer = require "resty.t1k.buffer"
local consts = require "resty.t1k.constants"
local file = require "resty.t1k.file"
local log = require "resty.t1k.log"
local utils = require "resty.t1k.utils"
local uuid = require "resty.t1k.uuid"
local _M = {
_VERSION = '1.0.0',
}
local bor = bit.bor
local byte = string.byte
local char = string.char
local fmt = string.format
local sub = string.sub
local ngx = ngx
local nlog = ngx.log
local ngx_req = ngx.req
local ngx_socket = ngx.socket
local ngx_var = ngx.var
local warn_fmt = log.warn_fmt
local debug_fmt = log.debug_fmt
local KEY_EXTRA_UUID = "UUID"
local KEY_EXTRA_LOCAL_ADDR = "LocalAddr"
local KEY_EXTRA_LOCAL_PORT = "LocalPort"
local KEY_EXTRA_REMOTE_ADDR = "RemoteAddr"
local KEY_EXTRA_REMOTE_PORT = "RemotePort"
local KEY_EXTRA_SCHEME = "Scheme"
local KEY_EXTRA_SERVER_NAME = "ServerName"
local KEY_EXTRA_PROXY_NAME = "ProxyName"
local KEY_EXTRA_REQ_BEGIN_TIME = "ReqBeginTime"
local KEY_EXTRA_HAS_RSP_IF_OK = "HasRspIfOK"
local KEY_EXTRA_HAS_RSP_IF_BLOCK = "HasRspIfBlock"
local TAG_HEAD_WITH_MASK_FIRST = bor(consts.TAG_HEAD, consts.MASK_FIRST)
local TAG_EXTRA_WITH_MASK_LAST = bor(consts.TAG_EXTRA, consts.MASK_LAST)
local T1K_PROTO = "Proto:2\n"
local T1K_PROTO_DATA = fmt("%s%s%s", char(consts.TAG_VERSION), utils.int_to_char_length(#T1K_PROTO), T1K_PROTO)
local function read_request_body(opt_req_body_size)
local ok, err
local req_body, req_body_size
local content_length = tonumber(ngx_var.http_content_length) or 0
local transfer_encoding = ngx_var.http_transfer_encoding
if content_length == 0 and not transfer_encoding then
return true, nil, nil
end
ngx_req.read_body()
req_body = ngx_req.get_body_data()
if req_body then
req_body_size = #req_body
if req_body_size > opt_req_body_size then
nlog(debug_fmt("request body is too long: %d bytes, cut to %d bytes", req_body_size, opt_req_body_size))
req_body = sub(req_body, 1, opt_req_body_size)
end
return true, nil, req_body
end
local path = ngx_req.get_body_file()
if not path then
return true, nil, nil
end
ok, err, req_body = file.read(path, opt_req_body_size)
if not ok then
err = fmt("failed to read temporary file %s: %s", path, err)
return ok, err, nil
end
return true, nil, req_body
end
local function get_remote_addr(remote_addr_var, remote_addr_idx)
local addr
if remote_addr_var then
addr = utils.get_indexed_element(ngx_var[remote_addr_var], remote_addr_idx)
end
return addr or ngx_var.remote_addr
end
local function build_header()
local http_version = ngx_req.http_version()
if http_version < 2.0 then
return true, nil, ngx_req.raw_header()
end
local headers, err = ngx_req.get_headers(0, true)
if err then
err = fmt("failed to call ngx_req.get_headers: %s", err)
return nil, err, nil
end
local buf = buffer:new()
buf:add(ngx_req.get_method())
buf:add(" ")
buf:add(ngx_var.request_uri)
buf:add(fmt(" HTTP/%.1f\r\n", http_version))
for k, v in pairs(headers) do
buf:add(k .. ": " .. v .. "\r\n")
end
buf:add("\r\n")
return true, nil, buf
end
local function build_body(opts)
local ok, err
local body
local req_body_size = opts.req_body_size * 1024
ok, err, body = read_request_body(req_body_size)
if not ok then
return ok, err, nil
end
return true, nil, body
end
local function build_extra(opts)
local err
local src_ip = get_remote_addr(opts.remote_addr_var, opts.remote_addr_idx)
if not src_ip then
err = fmt("failed to get remote_addr, var: %s, idx %d", opts.remote_addr_var, opts.remote_addr_idx)
return nil, err
end
local src_port = ngx_var.remote_port
if not src_port then
err = "failed to get ngx_var.remote_port"
return nil, err, nil
end
local local_ip = ngx_var.server_addr
if not local_ip then
err = "failed to get ngx_var.server_addr"
return nil, err, nil
end
local local_port = ngx_var.server_port
if not local_port then
err = "failed to get ngx_var.server_port"
return nil, err, nil
end
local extra = buffer:new({
KEY_EXTRA_UUID, ":", uuid.generate_v4(), "\n",
KEY_EXTRA_REMOTE_ADDR, ":", src_ip, "\n",
KEY_EXTRA_REMOTE_PORT, ":", src_port, "\n",
KEY_EXTRA_LOCAL_ADDR, ":", local_ip, "\n",
KEY_EXTRA_LOCAL_PORT, ":", local_port, "\n",
KEY_EXTRA_SCHEME, ":", ngx_var.scheme, "\n",
KEY_EXTRA_SERVER_NAME, ":", ngx_var.server_name, "\n",
KEY_EXTRA_PROXY_NAME, ":", ngx_var.hostname, "\n",
KEY_EXTRA_REQ_BEGIN_TIME, ":", fmt("%.0f", ngx_req.start_time() * 1000000), "\n",
KEY_EXTRA_HAS_RSP_IF_OK, ":n\n",
KEY_EXTRA_HAS_RSP_IF_BLOCK, ":n\n"
})
return true, nil, extra
end
local function do_send(sock, data)
local ok, err = sock:send(data)
if not ok then
return ok, err
end
return true, nil
end
local function receive_data(s, srv)
local t = {}
local ft = true
local finished
repeat
local err
local tag, length, packet, rsp_body
packet, err = s:receive(consts.T1K_HEADER_SIZE)
if err then
err = fmt("failed to receive info packet from t1k server %s: %s", srv, err)
return nil, err, nil
end
if not packet then
err = fmt("empty packet from t1k server %s", srv)
return nil, err, nil
end
if ft then
if not utils.is_mask_first(byte(packet, 1, 1)) then
err = fmt("first packet is not MASK_FIRST from t1k server %s", srv)
return nil, err, nil
end
ft = false
end
finished, tag, length = utils.packet_parser(packet)
if length > 0 then
rsp_body, err = s:receive(length)
if not rsp_body or #rsp_body ~= length then
err = fmt("failed to receive payload from t1k server %s: %s", srv, err)
return nil, err, nil
end
t[tag] = rsp_body
end
until (finished)
return true, nil, t
end
local function get_socket(opts)
local ok, err
local count, sock, server
sock, err = ngx_socket.tcp()
if not sock then
err = fmt("failed to create socket: %s", err)
return nil, err, nil, nil
end
sock:settimeouts(opts.connect_timeout, opts.send_timeout, opts.read_timeout)
if opts.uds then
server = opts.host
ok, err = sock:connect(opts.host)
else
server = fmt("%s:%d", opts.host, opts.port)
ok, err = sock:connect(opts.host, opts.port)
end
if not ok then
sock:close()
err = fmt("failed to connect to t1k server %s: %s", server, err)
return ok, err, nil, nil
end
nlog(debug_fmt("successfully connected to t1k server %s", server))
count, err = sock:getreusedtimes()
if not count then
nlog(warn_fmt("failed to get reused times from t1k server %s: %s", server, err))
end
if count == 0 then
ok, err = sock:setoption("keepalive", true)
if not ok then
nlog(warn_fmt("failed to set keepalive for t1k server %s: %s", server, err))
end
ok, err = sock:setoption("reuseaddr", true)
if not ok then
nlog(warn_fmt("failed to set reuseaddr for t1k server %s: %s", server, err))
end
ok, err = sock:setoption("tcp-nodelay", true)
if not ok then
nlog(warn_fmt("failed to set tcp-nodelay for t1k server %s: %s", server, err))
end
end
return true, nil, sock, server
end
local function do_socket(opts, header, body, extra)
local ok, err
local t, sock, server
ok, err, sock, server = get_socket(opts)
if not ok then
err = fmt("failed to get socket: %s", err)
return ok, err, nil
end
ok, err = do_send(sock, { char(TAG_HEAD_WITH_MASK_FIRST), utils.int_to_char_length(header:len()), header })
if not ok then
sock:close()
err = fmt("failed to send header data to t1k server %s: %s", server, err)
return ok, err, nil
end
if body ~= nil then
ok, err = do_send(sock, { char(consts.TAG_BODY), utils.int_to_char_length(body:len()), body })
if not ok then
sock:close()
err = fmt("failed to send body data to t1k server %s: %s", server, err)
return ok, err, nil
end
end
ok, err = do_send(sock, { T1K_PROTO_DATA, char(TAG_EXTRA_WITH_MASK_LAST), utils.int_to_char_length(extra:len()), extra })
if not ok then
sock:close()
err = fmt("failed to send extra data to t1k server %s: %s", server, err)
return ok, err, nil
end
ok, err, t = receive_data(sock, server)
if not ok then
return ok, err, nil
end
ok, err = sock:setkeepalive(opts.keepalive_timeout, opts.keepalive_size)
if not ok then
nlog(warn_fmt("failed to set keepalive: %s", err))
sock:close()
end
return true, nil, t
end
function _M.do_request(opts)
local ok, err
local header, body, extra, t
ok, err, header = build_header(opts)
if not ok then
return ok, err, nil
end
ok, err, body = build_body(opts)
if not ok then
return ok, err, nil
end
ok, err, extra = build_extra(opts)
if not ok then
return ok, err, nil
end
ok, err, t = do_socket(opts, header, body, extra)
if not ok then
return ok, err, nil
end
if opts.mode == consts.MODE_BLOCK then
local extra_header = t[consts.TAG_EXTRA_HEADER]
if extra_header then
ngx.ctx.t1k_extra_header = extra_header
end
end
local result = {
action = t[consts.TAG_HEAD],
status = t[consts.TAG_BODY],
event_id = utils.get_event_id(t[consts.TAG_EXTRA_BODY]),
}
return true, nil, result
end
return _M