From 6067ad854663a3e0e0bcc06ed51232e181721182 Mon Sep 17 00:00:00 2001 From: Vladislav Grubov Date: Mon, 26 May 2025 14:12:49 +0300 Subject: [PATCH 1/7] annotations --- connection.lua | 136 ++++++++++++++++++++++++++++++++++++------------- 1 file changed, 100 insertions(+), 36 deletions(-) diff --git a/connection.lua b/connection.lua index b822e97..c1a2d1b 100644 --- a/connection.lua +++ b/connection.lua @@ -30,15 +30,40 @@ ffi.cdef [[ local C = ffi.C local iovec = ffi.typeof('struct iovec') -- FIXME it is used once! -local IOVSZ = ffi.sizeof(iovec) +local IOVSZ = assert(ffi.sizeof(iovec)) local NOTCONNECTED = 0; local CONNECTING = 1; local CONNECTED = 2; local RECONNECTING = 3; +---@class connection +---@field host string remote host +---@field port number remote port +---@field timeout number connection and request timeout +---@field private __id number identifier of object +---@field private _auto boolean +---@field private _reconnect number|false? +---@field private _gen number generation of connection +---@field state S2S connection state +---@field maxbuf number read buffer size +---@field rbuf ffi.cdata* read buffer +---@field avail number actual data length in buffer +---@field wsize number write buffer size +---@field wbuf ffi.cdata* write buffer +---@field wcur number current write buffer position +---@field wstash table stash for write buffer +---@field _flush fiber.channel flush channel +---@field connwait fiber.channel connection wait channel +---@field private ww fiber? writer worker fiber +---@field private rw fiber? read worker local M = obj.class({},_NAME) +---@alias S2S +---| 0 NOTCONNECTED +---| 1 CONNECTING +---| 2 CONNECTED +---| 3 RECONNECTING local S2S = { [NOTCONNECTED] = 'NOTCONNECTED', [CONNECTING] = 'CONNECTING', @@ -54,6 +79,12 @@ local errno_is_transient = { [errno.EINTR] = true; } +---@class connection.options +---@field timeout number? connection and request timeout +---@field autoconnect boolean? should we connect on init? +---@field reconnect number|false? reconnect timeout, if false then no reconnect +---@field maxbuf number? read buffer size, default is 2*1024*1024 + --[[ options: timeout - @@ -78,13 +109,16 @@ internal fields: _gen ]] +---@param host string remote host +---@param port number remote port +---@param opt connection.options? function M:_init(host, port, opt) self.host = host; self.port = tonumber(port) opt = opt or {} - + self.timeout = tonumber(opt.timeout) or 1/3 - + if opt.reconnect ~= nil then if opt.reconnect then self._reconnect = tonumber(opt.reconnect) @@ -94,13 +128,13 @@ function M:_init(host, port, opt) else self._reconnect = 1/3 end - + if opt.autoconnect ~= nil then self._auto = opt.autoconnect else self._auto = true end - + self.state = NOTCONNECTED self.maxbuf = opt.maxbuf or 2*1024*1024 self.rbuf = ffi.cast('char *', ffi.C.calloc(1, self.maxbuf)) @@ -127,9 +161,9 @@ function M:_init(host, port, opt) self.wstash = {} self._flush = fiber.channel(0) - + self.connwait = fiber.channel(0) - + if self._auto then self:connect() end @@ -140,6 +174,9 @@ function M:_init(host, port, opt) ]] end +---Logs a message on the connection. +---@param l string log level +---@param msg string message to log function M:log(l,msg,...) msg = tostring(msg) -- FIXME it's a bad pattern. We should always use format @@ -163,22 +200,31 @@ function M:fdno() end end +---Returns a string representation of the connection. +---@return string function M:_stringify() return string.format("cnn(%s:%s : %s:%s : %s)",self:fdno(),self.state,self.host,self.port,self.__id) end +---Describes current connection. +---@return string function M:desc() return tostring(self.host) .. ':' .. tostring(self.port) .. '/' .. self:fdno() end +---Default callback for connection established. function M:on_connected() self:log("D", "called default on_connected") end +---Default callback for connection failed. +---@param e string|box.error error message function M:on_disconnect(e) self:log("D", "called default on_disconnect: %s", e) end +---Cleans up the connection. +---@param e integer errno code that triggered action function M:_cleanup(e) self.state = NOTCONNECTED if self.ww then if self.ww ~= fiber.self() then pcall(fiber.cancel,self.ww) end self.ww = nil end @@ -190,7 +236,7 @@ function M:_cleanup(e) self.avail = 0ULL self.lasterror = errno.strerror(e) - + while self.connwait:put(false, 0) do end end @@ -240,12 +286,14 @@ function M:on_connect_failed(e) end end +---Callback is called when connection is about to reset +---@param e integer errno code function M:on_connect_reset(e) self:log('W',"connection reset:",errno.strerror(e)) if self.state == CONNECTED then -- TODO: stop all fibers self:_cleanup(0) - + if self._reconnect then self.state = NOTCONNECTED -- was RECONNECTING fiber.create(function(self) fiber.name("net.cb") self:on_disconnect(errno.strerror(e)) end,self) @@ -258,11 +306,18 @@ function M:on_connect_reset(e) end end +---Callback is called when data is available for reading. +--- +---if method raises an error, then connection is reset. +---@param is_last boolean true if this is the last read, false if more data is expected function M:on_read(is_last) self:log('D',"on_read (last:",is_last,") ",ffi.string(self.rbuf,self.ravail)) self.avail = 0ULL end +---Callback is called when socket has been connected. +--- +---In this method we create two fibers: one for reading and one for writing. function M:on_connect_io() local err = self.s:getsockopt('SOL_SOCKET', 'SO_ERROR'); if err ~= 0 then @@ -271,15 +326,14 @@ function M:on_connect_io() return end self.state = CONNECTED; - + local weak = setmetatable({}, { __mode = "kv" }) weak.self = self --print('----', weak.self.s) - + self.ww = fiber.create(function (weak, gen) fiber.name(string.format("net.ww[%s:%s#%d]", weak.self.host, weak.self.port, gen)) local s = weak.self.s - local timeout = weak.self.timeout while weak.self and gen == weak.self._gen do if s:writable(1) then if not weak.self then break end @@ -290,7 +344,7 @@ function M:on_connect_io() end end end, weak, self._gen) - + self.rw = fiber.create(function (weak, gen) fiber.name(string.format("net.rw[%s:%s#%d]", weak.self.host, weak.self.port, gen)) local s = weak.self.s @@ -304,16 +358,16 @@ function M:on_connect_io() if rd >= 0 then self.avail = self.avail + rd; local avail = self.avail - + local status, err = pcall(self.on_read, self, rd == 0) if not status then self:log('E', 'on_read raised an error: ', err) self:on_connect_reset(errno.EINVAL) -- errno.EINVAL = 22 end - + local pkoft = avail - self.avail -- print("avail ",avail, " -> ", self.avail, " pkoft = ", pkoft) - + -- FIXME: Is it a good idea? if self.avail > 0 then if self.avail == self.maxbuf then @@ -339,7 +393,7 @@ function M:on_connect_io() end end end, weak, self._gen) - + while self.connwait:put(true, 0) do end fiber.create(function(self) fiber.name("net.cb") self:on_connected() end,self) end @@ -349,7 +403,7 @@ function M:wait_con(timeout) return true end -- FIXME move define default timeout in the start of the file - if slef.connwait:get(timeout or self.timeout or 10) then + if self.connwait:get(timeout or self.timeout or 10) then return else -- FIXME Should we use to kinds of error here? There are two cases: it @@ -358,48 +412,50 @@ function M:wait_con(timeout) end end +---Connects to the remote host and port. +---@return boolean true if connected, false if not function M:connect() assert(type(self) == 'table',"object required") - + if self.state ~= NOTCONNECTED then return (self.state == CONNECTED) end - + -- connect timeout assert(not self.s, "Already have socket") - + self.state = CONNECTING self._gen = self._gen + 1 - + local weak = setmetatable({}, { __mode = "kv" }) weak.self = self - + fiber.create(function(weak) -- We don't need to check self because fiber is runned without yielding local ai = socket.getaddrinfo( weak.self.host, weak.self.port, weak.self.timeout, { ['type'] = 'SOCK_STREAM', } ) - + -- But after getaddrinfo we do need to check the link if not weak.self then return end - + if ai and #ai > 0 then --print(dumper(ai)) else weak.self:on_connect_failed( errno() == 0 and errno.ENXIO or errno() ) return end - + local ainfo = ai[1] local s = socket( ainfo.family, ainfo.type, ainfo.protocol ) if not s then weak.self:on_connect_failed( errno() ) return end - + s:nonblock(true) s:linger(1,0) - + while true do -- FIXME sysconnect should be not yielding, but we have to dowble check -- for traps from tnt team @@ -414,17 +470,17 @@ function M:connect() or s:errno() == errno.EWOULDBLOCK then weak.self.s = s - + local wr = s:writable(weak.self.timeout) - + if not weak.self then s:close() return end - + if wr then weak.self:on_connect_io() else weak.self:on_connect_failed( errno.ETIMEDOUT ) end - + return elseif s:errno() == errno.EINTR then -- again @@ -435,7 +491,7 @@ function M:connect() end end end - + end, weak) end @@ -448,6 +504,11 @@ function M:_wbuf_realloc( ... ) C.memcpy(self.wbuf, old, self.wcur * ffi.sizeof(self.wbuf[0])) end +---Pushes data to the write buffer. +--- +---Does not write into socket, you must call :flush() to write data. +---@param buf string|ffi.cdata* data to write +---@param len number? length of data to write, if nil then uses #buf function M:push_write( buf, len ) if self.state ~= CONNECTED then error("Not connected") @@ -471,6 +532,8 @@ function M:push_write( buf, len ) self.wcur = self.wcur + 1 end +---Writes data to the socket +---@return boolean is_drained true if write buffer was left empty, false if not function M:_writev() if self.wcur == 0 then return true @@ -519,16 +582,17 @@ function M:_writev() end -- iowait ? return false - + end +---Flushes the write buffer to the socket. function M:flush() assert(type(self) == 'table', "object required") - + if self.state ~= CONNECTED then error("Not connected") end - + self._flush:put(true, 0) end From 08a68426a4141e6f50258c5f5e6160eb489dc93c Mon Sep 17 00:00:00 2001 From: Vladislav Grubov Date: Mon, 26 May 2025 15:43:51 +0300 Subject: [PATCH 2/7] annotations and simple tests --- .github/actions/setup-tarantool/action.yml | 27 +++ .github/actions/setup-test/action.yml | 24 +++ .github/workflows/lint.yml | 16 ++ .github/workflows/push-rockspec.yml | 58 +++++++ .github/workflows/tests.yml | 49 ++++++ connection.lua | 8 +- test/001_basic_test.lua | 183 +++++++++++++++++++++ 7 files changed, 362 insertions(+), 3 deletions(-) create mode 100644 .github/actions/setup-tarantool/action.yml create mode 100644 .github/actions/setup-test/action.yml create mode 100644 .github/workflows/lint.yml create mode 100644 .github/workflows/push-rockspec.yml create mode 100644 .github/workflows/tests.yml create mode 100644 test/001_basic_test.lua diff --git a/.github/actions/setup-tarantool/action.yml b/.github/actions/setup-tarantool/action.yml new file mode 100644 index 0000000..0dec81e --- /dev/null +++ b/.github/actions/setup-tarantool/action.yml @@ -0,0 +1,27 @@ +name: Setup Tarantool Test Environment +description: 'Sets up Tarantool and tt' + +inputs: + tarantool-version: + description: 'Tarantool version to install' + required: true + default: '2.11' + +runs: + using: "composite" + steps: + - name: setup tarantool + uses: tarantool/setup-tarantool@v3 + with: + tarantool-version: ${{ inputs.tarantool-version }} + - name: add tarantool/modules repo + shell: bash + run: | + os=$(. /etc/os-release && echo $ID) + dist=$(. /etc/os-release && echo $VERSION_CODENAME) + curl -L "https://download.tarantool.org/tarantool/modules/gpgkey" | sudo apt-key add - + apt_source_path="/etc/apt/sources.list.d/tarantool.list" + echo "deb https://download.tarantool.org/tarantool/modules/${os}/ ${dist} main" | sudo tee ${apt_source_path} + - name: install tt + shell: bash + run: sudo apt-get update && sudo apt-get install -y tt diff --git a/.github/actions/setup-test/action.yml b/.github/actions/setup-test/action.yml new file mode 100644 index 0000000..aadb8c1 --- /dev/null +++ b/.github/actions/setup-test/action.yml @@ -0,0 +1,24 @@ +name: Setup Tarantool Test Environment +description: 'Sets up Tarantool and dependencies for testing' + +inputs: + tarantool-version: + description: 'Tarantool version to install' + required: true + +runs: + using: "composite" + steps: + - name: setup tarantool + uses: ./.github/actions/setup-tarantool + with: + tarantool-version: ${{ inputs.tarantool-version }} + - name: install luacov-coveralls 0.2.3 + shell: bash + run: tt rocks install --server https://luarocks.org luacov-coveralls 0.2.3 + - name: install luacov-console 1.2.0 + shell: bash + run: tt rocks --server http://moonlibs.github.io/rocks install luacov-console 1.2.0 + - name: install luatest scm-1 + shell: bash + run: tt rocks install luatest 1.1.0 diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..7cd56fc --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,16 @@ +name: Linting with luacheck + +on: + - push + - pull_request + +jobs: + run-luacheck-linter: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: ./.github/actions/setup-tarantool + with: + tarantool-version: '2.11' + - name: install tarantool/luacheck and execute it + run: tt rocks install luacheck && .rocks/bin/luacheck . diff --git a/.github/workflows/push-rockspec.yml b/.github/workflows/push-rockspec.yml new file mode 100644 index 0000000..ac6cd05 --- /dev/null +++ b/.github/workflows/push-rockspec.yml @@ -0,0 +1,58 @@ +name: Create and push rockspec for moonlibs/connection + +on: + workflow_run: + workflows: + - "Linting with luacheck" + - "Testing with unit tests" + types: + - completed + push: + tags: + - '*' + +env: + ROCK_NAME: connection + +jobs: + pack-and-push-tagged-rockspec: + runs-on: ubuntu-latest + if: ${{ github.event.workflow_run.conclusion == 'success' }} && startsWith(github.ref, 'refs/tags/') }} + steps: + - uses: actions/checkout@v4 + - uses: ./.github/actions/setup-tarantool + + # https://stackoverflow.com/questions/58177786/get-the-current-pushed-tag-in-github-actions + - name: Set env + run: echo "TAG=${GITHUB_REF#refs/*/}" >> $GITHUB_ENV + + - run: tt rocks new_version --tag=${{ env.TAG }} ${{ env.ROCK_NAME }}-dev-1.rockspec ${{ env.TAG }} "git+https://github.com/${{ github.repository }}.git" + - run: tt rocks install ${{ env.ROCK_NAME }}-${{ env.TAG }}-1.rockspec + - run: tt rocks pack ${{ env.ROCK_NAME }}-${{ env.TAG }}-1.rockspec + + - uses: unfor19/install-aws-cli-action@v1.0.3 + - run: + | + mkdir .build + cp ${{ env.ROCK_NAME }}-dev-1.rockspec ${{ env.ROCK_NAME }}-${{ env.TAG }}-1.rockspec \ + .build/ + cp *.src.rock .build/ + - name: rebuild and publish s3 luarocks server + env: + AWS_ACCESS_KEY_ID: ${{ secrets.MOONLIBS_S3_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.MOONLIBS_S3_SECRET_KEY}} + AWS_EC2_METADATA_DISABLED: true + run: | + cd .build && \ + aws s3 sync s3://moonlibs/ ./ && \ + tt rocks admin make_manifest . && \ + aws s3 sync --acl public-read ./ s3://moonlibs/; + - uses: "marvinpinto/action-automatic-releases@latest" + with: + repo_token: "${{ secrets.GITHUB_TOKEN }}" + prerelease: false + files: | + README.md + ${{env.ROCK_NAME}}-dev-1.rockspec + ${{env.ROCK_NAME}}-${{env.TAG}}-1.rockspec + ${{env.ROCK_NAME}}-${{env.TAG}}-1.src.rock diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..0df105e --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,49 @@ +name: Testing with unit tests + +on: + - push + - pull_request + +jobs: + run-unit-tests: + runs-on: ubuntu-22.04 + strategy: + matrix: + version: ["1.10.15", "2.10.7", "2.11.2", "3.0.1"] + steps: + - uses: actions/checkout@v4 + - uses: ./.github/actions/setup-test + with: + tarantool-version: ${{matrix.version}} + - name: run tests + run: luatest --no-capture --coverage -v + - name: rename luacov.stats.out + run: mv luacov.stats.out luacov.stats.out-${{matrix.version}} + - uses: actions/upload-artifact@master + with: + name: luacov.stats.out-${{matrix.version}} + path: luacov.stats.out-${{matrix.version}} + + run-coverage-report: + runs-on: ubuntu-latest + needs: ["run-unit-tests"] + steps: + - uses: actions/checkout@v4 + - uses: ./.github/actions/setup-test + with: + tarantool-version: '2.11' + - name: Download run artifacts + uses: actions/download-artifact@v4 + with: + pattern: luacov.stats.out-* + merge-multiple: true + - name: debug + run: ls -la . + - name: merge luacov.stats.out + run: cat luacov.stats.out-* | >luacov.stats.out tarantool -e 'm={} for k in io.lines() do local vs=io.read():split(" ") vs[#vs]=nil local r = m[k] if r then for i, v in pairs(vs) do r[i]=r[i]+v end else m[k]=vs end end; for k, v in pairs(m) do print(k) print(table.concat(v, " ")) end' + - name: prepare coverage report + run: .rocks/bin/luacov-console . && .rocks/bin/luacov-console -s + - name: publish coveralls report + env: + COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }} + run: .rocks/bin/luacov-coveralls -v \ No newline at end of file diff --git a/connection.lua b/connection.lua index c1a2d1b..4e405ea 100644 --- a/connection.lua +++ b/connection.lua @@ -69,6 +69,10 @@ local S2S = { [CONNECTING] = 'CONNECTING', [CONNECTED] = 'CONNECTED', [RECONNECTING] = 'RECONNECTING', + NOTCONNECTED = NOTCONNECTED, + CONNECTED = CONNECTED, + CONNECTING = CONNECTING, + RECONNECTING = RECONNECTING, } M.S2S = S2S @@ -154,7 +158,6 @@ function M:_init(host, port, opt) self._gen = 0 self.wsize = 32 - local osz = self.wsize self.wbuf = ffi.new('struct iovec[?]', self.wsize) self.wcur = 0 @@ -495,11 +498,10 @@ function M:connect() end, weak) end -function M:_wbuf_realloc( ... ) +function M:_wbuf_realloc() local old = self.wbuf local osz = self.wsize self.wsize = osz * 2 - local nsz = self.wsize self.wbuf = ffi.new('struct iovec[?]', self.wsize) C.memcpy(self.wbuf, old, self.wcur * ffi.sizeof(self.wbuf[0])) end diff --git a/test/001_basic_test.lua b/test/001_basic_test.lua new file mode 100644 index 0000000..7d85130 --- /dev/null +++ b/test/001_basic_test.lua @@ -0,0 +1,183 @@ +local t = require('luatest') +local fiber = require('fiber') +local json = require('json') +local g = t.group('basic') + +local username = 'guest' +local password = '' + +---@type luatest.server +local server = t.Server:new({ + alias = 'server', + box_cfg = { + listen = '127.0.0.1:3301', + }, + net_box_port = 3301, + net_box_uri = '127.0.0.1:3301', +}) + +g.before_all(function() + server:start({wait_until_ready = true}) +end) + +g.after_all(function() + server:stop() +end) + +local connection = require 'connection' + +function g.test_connect() + ---@type connection + local cnn = connection:new(server.net_box.host, server.net_box.port) + cnn.connwait:get(1) + t.assert_equals(cnn.state, connection.S2S.CONNECTED, 'connection has been established') + cnn:close() + t.assert_equals(cnn.state, connection.S2S.NOTCONNECTED, 'connection has been closed') +end + +---@class connection.greeter: connection +local greeter = require 'obj'.class({}, 'connection.greeter', connection) + +function greeter:on_connect_io() + self:super(greeter, 'on_connect_io')() + self.stage = 'greeting' +end + +function greeter:on_greeting_read() + local avail = self.avail + local greeting_size = 128 + if avail < greeting_size then return end + + local ffi = require 'ffi' + local str = ffi.string(self.rbuf, greeting_size) + self.avail = avail - greeting_size -- consume the greeting + + local _, salt_b64 = unpack(string.split(str, '\n')) + local digest = require 'digest' + + local salt = digest.base64_decode(salt_b64):sub(1, 20) + local step1 = digest.sha1(password) + local step2 = digest.sha1(step1) + local step3 = digest.sha1(salt .. step2) + + local function xor(s1, s2, n) + local r = table.new(n, 0) + for i = 1, n do + r[i] = string.char(bit.bxor(s1:byte(i), s2:byte(i))) + end + return table.concat(r, '') + end + + local scramble = xor(step1, step3, #salt) + + -- now construct auth packet + local msgpack = require('msgpack') + local key = { + REQUEST_TYPE = 0x00, + SYNC = 0x01, + TUPLE = 0x21, + USER_NAME = 0x23, + } + local val = { + AUTH = 0x07, + } + local hdr = { + [key.REQUEST_TYPE] = val.AUTH, + [key.SYNC] = 0x01, + } + local bdy = { + [key.USER_NAME] = username, + [key.TUPLE] = {'chap-sha1', scramble}, + } + local buf = msgpack.encode(hdr) .. msgpack.encode(bdy) + local size = msgpack.encode(#buf) + local pkt = table.concat({ + string.char(0xce), + -- prepend \0-bytes to buffer + ("\x00\x00\x00\x00"):sub(1, 4 - #size) .. size, + buf + },'') + + self:push_write(pkt) + self.stage = 'fetching_schema' + self:flush() +end + +function greeter:on_fetching_schema_read(is_last) + local msgpack = require('msgpack') + + local ptr = self.rbuf + local avail = tonumber(self.avail) + local tail = ptr + avail + + local sz + sz, ptr = msgpack.decode(ptr, tonumber(tail-ptr)) + if sz == nil then + -- not enough data + return + end + self:log('D', 'size:%s', sz) + + if avail < sz then + -- not enough data + self:log('D', 'not enough data, need %s, have %s', sz, avail) + return + end + + local hdr + hdr, ptr = msgpack.decode(ptr, tonumber(tail-ptr)) + if hdr == nil then + -- not enough data + return + end + self:log('D', 'hdr:%s', json.encode(hdr)) + + local bdy + bdy, ptr = msgpack.decode(ptr, tonumber(tail-ptr)) + if bdy == nil then + -- not enough data + return + end + self:log('D', 'bdy:%s', json.encode(bdy)) + self.avail = tail - ptr + + self.on_schema:put({ + header = hdr, + body = bdy, + }) +end + +---Tarantool greeter +function greeter:on_read(is_last) + if self.stage == 'greeting' then + return self:on_greeting_read(is_last) + elseif self.stage == 'fetching_schema' then + return self:on_fetching_schema_read(is_last) + else + self:log('E', 'unknown stage %s', self.stage) + self.avail = 0 + end +end + +function g.test_greeting() + local cnn = greeter:new(server.net_box.host, server.net_box.port) + cnn.on_schema = fiber.channel() + cnn.connwait:get(1) + t.assert_equals(cnn.state, connection.S2S.CONNECTED, 'connection has been established') + + local packet = cnn.on_schema:get(5) + t.assert(packet, "packet with schema must be received") + + local schema_version = server:exec(function() + return box.info.schema_version or box.internal.schema_version() + end) + + t.assert_items_equals(packet.header, { + [0x00] = 0x00, -- REQUEST_TYPE:OK + [0x01] = 0x01, -- SYNC:1 + [0x05] = schema_version, -- SCHEMA_ID:83 + }, "packet.header is okay") + + t.assert_items_equals(packet.body, {}, "packet body is empty") + t.assert_equals(cnn.state, connection.S2S.CONNECTED, 'connection has been established') +end From 340f52f4f215a320eb0a94c47e5d39573dc2d18e Mon Sep 17 00:00:00 2001 From: Vladislav Grubov Date: Mon, 26 May 2025 16:17:40 +0300 Subject: [PATCH 3/7] fix annotations --- .github/workflows/tests.yml | 2 +- .luacheckrc | 12 ++++++++++++ .luacov | 21 +++++++++++++++++++++ 3 files changed, 34 insertions(+), 1 deletion(-) create mode 100644 .luacheckrc create mode 100644 .luacov diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0df105e..e0442df 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -16,7 +16,7 @@ jobs: with: tarantool-version: ${{matrix.version}} - name: run tests - run: luatest --no-capture --coverage -v + run: .rocks/bin/luatest --no-capture --coverage -v - name: rename luacov.stats.out run: mv luacov.stats.out luacov.stats.out-${{matrix.version}} - uses: actions/upload-artifact@master diff --git a/.luacheckrc b/.luacheckrc new file mode 100644 index 0000000..22a710f --- /dev/null +++ b/.luacheckrc @@ -0,0 +1,12 @@ +std="tarantool" + +codes=true +max_line_length=140 +include_files = {"connection.lua"} + +ignore = { + "212", -- unused argument + "431", -- shadwing upvalue + "432", -- shadowing upvalue argument + "542", -- empty if branch +} diff --git a/.luacov b/.luacov new file mode 100644 index 0000000..3ed9ca4 --- /dev/null +++ b/.luacov @@ -0,0 +1,21 @@ +runreport = false +deletestats = false + +exclude = { + "%.rocks/", + "builtin/", + "t/.+%.test", +} + +pathcorrect = { + { "^/source/connection/", "" }, +} + +coveralls = { + root = "/", + debug = true, + pathcorrect = { + { "^/home/runner/work/connection/connection/", "" }, + { "^/source/connection", "" }, + }, +} \ No newline at end of file From 44bc2e2d8d22970195ac28342526bee8d2d8f3b8 Mon Sep 17 00:00:00 2001 From: Vladislav Grubov Date: Mon, 26 May 2025 16:26:47 +0300 Subject: [PATCH 4/7] fix ci --- .github/workflows/tests.yml | 2 ++ .rocks/config-5.1.lua | 6 ++++++ connection-dev-1.rockspec | 23 +++++++++++++++++++++++ 3 files changed, 31 insertions(+) create mode 100644 .rocks/config-5.1.lua create mode 100644 connection-dev-1.rockspec diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index e0442df..81ae02a 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -15,6 +15,8 @@ jobs: - uses: ./.github/actions/setup-test with: tarantool-version: ${{matrix.version}} + - name: build lib + run: tt rocks make connection-dev-1.rockspec - name: run tests run: .rocks/bin/luatest --no-capture --coverage -v - name: rename luacov.stats.out diff --git a/.rocks/config-5.1.lua b/.rocks/config-5.1.lua new file mode 100644 index 0000000..59c2192 --- /dev/null +++ b/.rocks/config-5.1.lua @@ -0,0 +1,6 @@ +rocks_servers = { + "https://moonlibs.org", + "https://moonlibs.github.io/rocks", + "https://rocks.tarantool.org", + "https://luarocks.org", +} diff --git a/connection-dev-1.rockspec b/connection-dev-1.rockspec new file mode 100644 index 0000000..c8a1060 --- /dev/null +++ b/connection-dev-1.rockspec @@ -0,0 +1,23 @@ +package = "connection" +version = "dev-1" +source = { + url = "git+https://github.com/moonlibs/connection.git", + branch = "master" +} +description = { + summary = "Base class for tcp connections", + detailed = "Base class for tcp connections", + homepage = "https://github.com/moonlibs/connection.git", + license = "Artistic", + maintainer = "Mons Anderson " +} +dependencies = { + "lua >= 5.1", + "obj >= 0" +} +build = { + type = "builtin", + modules = { + connection = "connection.lua" + } +} From 138f05b7a6f6de2b212ce494ca06d96912a84317 Mon Sep 17 00:00:00 2001 From: Vladislav Grubov Date: Mon, 26 May 2025 16:31:36 +0300 Subject: [PATCH 5/7] fix tests --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 81ae02a..4346201 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -18,7 +18,7 @@ jobs: - name: build lib run: tt rocks make connection-dev-1.rockspec - name: run tests - run: .rocks/bin/luatest --no-capture --coverage -v + run: .rocks/bin/luatest --no-capture -v - name: rename luacov.stats.out run: mv luacov.stats.out luacov.stats.out-${{matrix.version}} - uses: actions/upload-artifact@master From f864aa31259f5e3e32b1407e195564e51486388a Mon Sep 17 00:00:00 2001 From: Vladislav Grubov Date: Mon, 26 May 2025 16:35:40 +0300 Subject: [PATCH 6/7] fix tests --- .github/actions/setup-test/action.yml | 6 ------ .github/workflows/tests.yml | 30 --------------------------- 2 files changed, 36 deletions(-) diff --git a/.github/actions/setup-test/action.yml b/.github/actions/setup-test/action.yml index aadb8c1..e6b494e 100644 --- a/.github/actions/setup-test/action.yml +++ b/.github/actions/setup-test/action.yml @@ -13,12 +13,6 @@ runs: uses: ./.github/actions/setup-tarantool with: tarantool-version: ${{ inputs.tarantool-version }} - - name: install luacov-coveralls 0.2.3 - shell: bash - run: tt rocks install --server https://luarocks.org luacov-coveralls 0.2.3 - - name: install luacov-console 1.2.0 - shell: bash - run: tt rocks --server http://moonlibs.github.io/rocks install luacov-console 1.2.0 - name: install luatest scm-1 shell: bash run: tt rocks install luatest 1.1.0 diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 4346201..ef9a457 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -19,33 +19,3 @@ jobs: run: tt rocks make connection-dev-1.rockspec - name: run tests run: .rocks/bin/luatest --no-capture -v - - name: rename luacov.stats.out - run: mv luacov.stats.out luacov.stats.out-${{matrix.version}} - - uses: actions/upload-artifact@master - with: - name: luacov.stats.out-${{matrix.version}} - path: luacov.stats.out-${{matrix.version}} - - run-coverage-report: - runs-on: ubuntu-latest - needs: ["run-unit-tests"] - steps: - - uses: actions/checkout@v4 - - uses: ./.github/actions/setup-test - with: - tarantool-version: '2.11' - - name: Download run artifacts - uses: actions/download-artifact@v4 - with: - pattern: luacov.stats.out-* - merge-multiple: true - - name: debug - run: ls -la . - - name: merge luacov.stats.out - run: cat luacov.stats.out-* | >luacov.stats.out tarantool -e 'm={} for k in io.lines() do local vs=io.read():split(" ") vs[#vs]=nil local r = m[k] if r then for i, v in pairs(vs) do r[i]=r[i]+v end else m[k]=vs end end; for k, v in pairs(m) do print(k) print(table.concat(v, " ")) end' - - name: prepare coverage report - run: .rocks/bin/luacov-console . && .rocks/bin/luacov-console -s - - name: publish coveralls report - env: - COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }} - run: .rocks/bin/luacov-coveralls -v \ No newline at end of file From 85114fdc47c06fcc899117c0e5ca0fd38c5949d3 Mon Sep 17 00:00:00 2001 From: Vladislav Grubov Date: Mon, 26 May 2025 18:08:23 +0300 Subject: [PATCH 7/7] readme.md --- README.md | 210 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 210 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..f12ece0 --- /dev/null +++ b/README.md @@ -0,0 +1,210 @@ +# Connection - Base Class for TCP Connections + +A Lua module providing a foundation for managing TCP socket connections in Tarantool applications. This library offers an object-oriented approach to handling network connections with support for asynchronous operations, automatic reconnection, and customizable event handling. + +## Features + +- Asynchronous TCP connection management +- Automatic reconnection with configurable timeout +- Buffer management for both reading and writing +- Event-based architecture with callbacks +- Non-blocking I/O + +## Installation + +### Using Tarantool Rocks + +```bash +tt rocks --server https://moonlibs.org install connection +``` + +### Manual Installation + +Clone the repository and install using the rockspec: + +```bash +git clone https://github.com/moonlibs/connection +cd connection +tt rocks make +``` + +## Usage + +### Basic Connection + +```lua +local connection = require('connection') + +-- Create a new connection +local conn = connection.new('example.com', 8080) + +-- Wait for connection to be established (if needed) +conn.connwait:get(timeout) + +-- Send data +conn:push_write("Hello, server!") +conn:flush() + +-- Close connection when done +conn:close() +``` + +### Connection Options + +```lua +local default_options = { + timeout = 1/3, -- Connection and request timeout in seconds + autoconnect = true, -- Connect immediately when created + reconnect = 1/3, -- Reconnect after 1/3 seconds if connection fails + maxbuf = 2 * 1024 * 1024 -- 2MB read buffer size for read +} + +local conn = connection.new('example.com', 8080, { + timeout = 1, -- Connection and request timeout in seconds + autoconnect = true, -- Connect immediately when created + reconnect = 0.5, -- Reconnect after 0.5 seconds if connection fails + maxbuf = 2 * 1024 * 1024 -- 2MB read buffer size for read +}) +``` + +### Custom Event Handlers + +```lua +local obj = require('obj') +local connection = require('connection') + +local MyConnection = obj.class({}, 'MyConnection', connection) + +function MyConnection:on_connected() + self:log('I', 'Successfully connected!') + -- Initialize session, authenticate, etc. +end + +function MyConnection:on_disconnect(err) + self:log('W', 'Disconnected: %s', err) + -- Clean up resources +end + +function MyConnection:on_read(is_last) + -- Process received data + local data = ffi.string(self.rbuf, self.avail) + self:log('D', 'Received data: %s', data) + + -- Important: reset buffer position after processing + self.avail = 0 +end + +-- Create instance of your connection +local conn = MyConnection:new('example.com', 8080) +``` + +## Connection States + +The connection can be in one of the following states: + +- `NOTCONNECTED` (0) - Initial state, not connected +- `CONNECTING` (1) - Connection attempt in progress +- `CONNECTED` (2) - Successfully connected +- `RECONNECTING` (3) - Attempting to reconnect after failure + +You can check the current state: + +```lua +if conn.state == connection.S2S.CONNECTED then + -- Connection is established +end +``` + +## API Reference + +### Methods (all methods never yields) + +- `new(host, port, options)` - Create a new connection +- `connect()` - Initiate connection +- `push_write(buf, len)` - Queue data to be sent, reallocates write buffer +- `flush()` - Send queued data +- `close()` - Close the connection, connection can be reused +- `destroy()` - Clean up resources, makes connection unusable +- `fdno()` - Returns file descriptor (or -1) + +### Callbacks (executed in separate fiber) + +- `on_connected()` - Called when connection is established, executed in separate fiber +- `on_disconnect(err: string)` - Called when connection is closed, executed in separate fiber +- `on_connect_failed(errno_code)` - Called when connection attempt fails, reconnection logic is performed here +- `on_connect_reset(errno_code)` - Called when connection is reset (closed due to an error) + +### Reading data + +- `on_read(is_last)` - Called when data is available for reading, from rw (read worker) fiber. + - **Important**: `on_read` is called directly from the read worker fiber. If your callback yields (using `fiber.sleep()` or other yielding operations), reading from the socket is stopped until the callback returns. + - If an exception is raised inside `on_read`, the connection will be reset. The connection can be reestablished automatically if the `reconnect` option is set. + +### Protected methods + +- `_cleanup(errno_code)` - Executes cleanup of all resources, closes socket, cancels fibers, drains buffers + +## Example: Echo Client + +Here's a simple echo client implementation that demonstrates basic usage: + +```lua +local connection = require('connection') +local fiber = require('fiber') +local ffi = require('ffi') +local obj = require('obj') + +local EchoClient = obj.class({}, 'EchoClient', connection) + +function EchoClient:on_connected() + self:log('I', 'Connected to echo server') + -- Send a message when connected + self:push_write("Hello, Echo Server!\n") + self:flush() +end + +function EchoClient:on_read(is_last) + -- Process the received echo response + local data = ffi.string(self.rbuf, self.avail) + self:log('I', 'Received echo: %s', data) + + -- Clear the buffer after processing + self.avail = 0 + + -- Send another message + if not is_last then + -- yielding in on_read callback stops read from socket. + fiber.sleep(1) + self:push_write("Another message!\n") + self:flush() + end +end + +function EchoClient:on_disconnect(err) + self:log('W', 'Disconnected from echo server: %s', err) +end + +-- Usage example +local function test_echo_client() + local client = EchoClient:new('localhost', 7777) + client.connwait:get(2) -- Wait up to 2 seconds for connection + + -- Keep the client running for a while + fiber.sleep(5) + + -- Close the connection + client:close() +end + +fiber.create(test_echo_client) +``` + +## Real-World Examples + +Some real world examples + +[connection-legacy](https://github.com/moonlibs/connection-legacy) repository, which implements a backward compatible API on top of this module. + +[connection-scribe](https://github.com/moonlibs/connection-scribe) repository, which implements Scribe protocol + +[tarantool1.5-replica](https://github.com/ochaton/migrate/blob/master/migrate/replica.lua) repository, which implements replication protocol of Tarantool 1.5