Skip to content

Commit

Permalink
feat(grpc): add txn/setnx (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
spacewander authored Oct 20, 2022
1 parent e0457f9 commit a892f4e
Show file tree
Hide file tree
Showing 3 changed files with 301 additions and 29 deletions.
88 changes: 88 additions & 0 deletions lib/resty/etcd/proto.lua
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ service KV {
// A delete request increments the revision of the key-value store
// and generates a delete event in the event history for every deleted key.
rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse) {}
// Txn processes multiple requests in a single transaction.
// A txn request increments the revision of the key-value store
// and generates events with the same revision for every completed request.
// It is not allowed to modify the same key several times within one txn.
rpc Txn(TxnRequest) returns (TxnResponse) {}
}
message ResponseHeader {
Expand Down Expand Up @@ -181,6 +187,88 @@ message DeleteRangeResponse {
repeated KeyValue prev_kvs = 3;
}
message RequestOp {
// request is a union of request types accepted by a transaction.
oneof request {
RangeRequest request_range = 1;
PutRequest request_put = 2;
DeleteRangeRequest request_delete_range = 3;
TxnRequest request_txn = 4;
}
}
message ResponseOp {
// response is a union of response types returned by a transaction.
oneof response {
RangeResponse response_range = 1;
PutResponse response_put = 2;
DeleteRangeResponse response_delete_range = 3;
TxnResponse response_txn = 4;
}
}
message Compare {
enum CompareResult {
EQUAL = 0;
GREATER = 1;
LESS = 2;
NOT_EQUAL = 3;
}
enum CompareTarget {
VERSION = 0;
CREATE = 1;
MOD = 2;
VALUE = 3;
LEASE = 4;
}
// result is logical comparison operation for this comparison.
CompareResult result = 1;
// target is the key-value field to inspect for the comparison.
CompareTarget target = 2;
// key is the subject key for the comparison operation.
bytes key = 3;
oneof target_union {
// version is the version of the given key
int64 version = 4;
// create_revision is the creation revision of the given key
int64 create_revision = 5;
// mod_revision is the last modified revision of the given key.
int64 mod_revision = 6;
// value is the value of the given key, in bytes.
bytes value = 7;
// lease is the lease id of the given key.
int64 lease = 8;
// leave room for more target_union field tags, jump to 64
}
// range_end compares the given target to all keys in the range [key, range_end).
// See RangeRequest for more details on key ranges.
bytes range_end = 64;
// TODO: fill out with most of the rest of RangeRequest fields when needed.
}
message TxnRequest {
// compare is a list of predicates representing a conjunction of terms.
// If the comparisons succeed, then the success requests will be processed in order,
// and the response will contain their respective responses in order.
// If the comparisons fail, then the failure requests will be processed in order,
// and the response will contain their respective responses in order.
repeated Compare compare = 1;
// success is a list of requests which will be applied when compare evaluates to true.
repeated RequestOp success = 2;
// failure is a list of requests which will be applied when compare evaluates to false.
repeated RequestOp failure = 3;
}
message TxnResponse {
ResponseHeader header = 1;
// succeeded is set to true if the compare evaluated to true or false otherwise.
bool succeeded = 2;
// responses is a list of responses corresponding to the results from applying
// success if succeeded is true or failure if succeeded is false.
repeated ResponseOp responses = 3;
}
message Event {
enum EventType {
PUT = 0;
Expand Down
93 changes: 64 additions & 29 deletions lib/resty/etcd/v3.lua
Original file line number Diff line number Diff line change
Expand Up @@ -661,13 +661,19 @@ local function txn(self, opts_arg, compare, success, failure)
return nil, "success and failure couldn't be empty at the same time"
end

local body = {
compare = compare,
success = success,
failure = failure,
}

if self.use_grpc then
return self:grpc_call("Txn", body, nil, nil, opts_arg)
end

local timeout = opts_arg and opts_arg.timeout
local opts = {
body = {
compare = compare,
success = success,
failure = failure,
},
body = body,
}

return _request_uri(self, "POST", "/kv/txn", opts, timeout or self.timeout)
Expand Down Expand Up @@ -1127,25 +1133,38 @@ end
local failure = {}
function _M.setnx(self, key, val, opts)
clear_tab(compare)
clear_tab(success)

key = utils.get_real_key(self.key_prefix, key)

compare[1] = {}
compare[1].target = "CREATE"
compare[1].key = encode_base64(key)
compare[1].createRevision = 0
if self.use_grpc then
compare[1] = {}
compare[1].target = "CREATE"
compare[1].key = key
compare[1].create_revision = 0

clear_tab(success)
success[1] = {}
success[1].requestPut = {}
success[1].requestPut.key = encode_base64(key)
success[1] = {}
success[1].request_put = {}
success[1].request_put.key = key

local err
val, err = serialize_and_encode_base64(self.serializer.serialize, val)
if not val then
return nil, "failed to encode val: " .. err
success[1].request_put.value = val
else
compare[1] = {}
compare[1].target = "CREATE"
compare[1].key = encode_base64(key)
compare[1].createRevision = 0

success[1] = {}
success[1].requestPut = {}
success[1].requestPut.key = encode_base64(key)

local err
val, err = serialize_and_encode_base64(self.serializer.serialize, val)
if not val then
return nil, "failed to encode val: " .. err
end
success[1].requestPut.value = val
end
success[1].requestPut.value = val

return txn(self, opts, compare, success, nil)
end
Expand Down Expand Up @@ -1187,12 +1206,17 @@ function _M.txn(self, compare, success, failure, opts)
for i, rule in ipairs(compare) do
rule = tab_clone(rule)

rule.key = encode_base64(utils.get_real_key(self.key_prefix, rule.key))
if self.use_grpc then
rule.key = utils.get_real_key(self.key_prefix, rule.key)
else
rule.key = encode_base64(utils.get_real_key(self.key_prefix, rule.key))

if rule.value then
rule.value, err = serialize_and_encode_base64(self.serializer.serialize, rule.value)
if not rule.value then
return nil, "failed to encode value: " .. err
if rule.value then
rule.value, err =
serialize_and_encode_base64(self.serializer.serialize, rule.value)
if not rule.value then
return nil, "failed to encode value: " .. err
end
end
end

Expand All @@ -1207,14 +1231,23 @@ function _M.txn(self, compare, success, failure, opts)
rule = tab_clone(rule)
if rule.requestPut then
local requestPut = tab_clone(rule.requestPut)
requestPut.key = encode_base64(utils.get_real_key(self.key_prefix, requestPut.key))
requestPut.value, err = serialize_and_encode_base64(self.serializer.serialize,
requestPut.value)
if not requestPut.value then
return nil, "failed to encode value: " .. err
if self.use_grpc then
requestPut.key = utils.get_real_key(self.key_prefix, requestPut.key)
else
requestPut.key =
encode_base64(utils.get_real_key(self.key_prefix, requestPut.key))
requestPut.value, err = serialize_and_encode_base64(self.serializer.serialize,
requestPut.value)
if not requestPut.value then
return nil, "failed to encode value: " .. err
end
end

rule.requestPut = requestPut
if self.use_grpc then
rule.request_put = requestPut
else
rule.requestPut = requestPut
end
end
new_rules[i] = rule
end
Expand Down Expand Up @@ -1348,6 +1381,8 @@ local implemented_grpc_methods = {
set = true,
delete = true,
readdir = true,
txn = true,
setnx = true,
}
for k, v in pairs(_M) do
_http_M[k] = v
Expand Down
149 changes: 149 additions & 0 deletions t/v3/grpc/txn.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use Test::Nginx::Socket::Lua;

log_level('info');
no_long_string();
repeat_each(1);

my $etcd_version = `etcd --version`;
if ($etcd_version =~ /^etcd Version: 2/ || $etcd_version =~ /^etcd Version: 3.1./) {
plan(skip_all => "etcd is too old, skip v3 protocol");
} else {
plan 'no_plan';
}

our $HttpConfig = <<'_EOC_';
lua_socket_log_errors off;
lua_package_path 'lib/?.lua;/usr/local/share/lua/5.3/?.lua;/usr/share/lua/5.1/?.lua;;';
init_by_lua_block {
local cjson = require("cjson.safe")
function check_res(data, err, val, status)
if err then
ngx.say("err: ", err)
ngx.exit(200)
end
if val then
if data.body.kvs==nil then
ngx.exit(404)
end
if data.body.kvs and val ~= data.body.kvs[1].value then
ngx.say("failed to check value")
ngx.log(ngx.ERR, "failed to check value, got: ",data.body.kvs[1].value,
", expect: ", val)
ngx.exit(200)
else
ngx.say("checked val as expect: ", val)
end
end
if status and status ~= data.status then
ngx.exit(data.status)
end
end
}
_EOC_

add_block_preprocessor(sub {
my ($block) = @_;

if (!$block->main_config) {
$block->set_value("main_config", "thread_pool grpc-client-nginx-module threads=1;");
}

if (!$block->request) {
$block->set_value("request", "GET /t");
}

if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
$block->set_value("no_error_log", "[error]");
}
});

run_tests();

__DATA__
=== TEST 1: txn("EQUAL") and get
--- http_config eval: $::HttpConfig
--- config
location /t {
content_by_lua_block {
local etcd, err = require "resty.etcd" .new({protocol = "v3", use_grpc = true})
check_res(etcd, err)
local res, err = etcd:set("/test", "abc")
check_res(res, err)
local data, err = etcd:get("/test")
check_res(data, err, "abc")
local data, err = etcd:txn(
{{key = "/test", result = "EQUAL", value = "abc", target = "VALUE"}},
{{requestPut = {key = "/test", value = "ddd"}}}
)
check_res(data, err)
local data, err = etcd:get("/test")
check_res(data, err, "ddd")
}
}
--- response_body
checked val as expect: abc
checked val as expect: ddd
=== TEST 2: txn(not "EQUAL") and get
--- http_config eval: $::HttpConfig
--- config
location /t {
content_by_lua_block {
local etcd, err = require "resty.etcd" .new({protocol = "v3", use_grpc = true})
check_res(etcd, err)
local res, err = etcd:set("/test", "abc")
check_res(res, err)
local data, err = etcd:get("/test")
check_res(data, err, "abc")
local data, err = etcd:txn(
{{key = "/test", result = "EQUAL", value = "not equal", target = "VALUE"}},
{{requestPut = {key = "/test", value = "ddd"}}}
)
check_res(data, err)
local data, err = etcd:get("/test")
check_res(data, err, "abc")
}
}
--- response_body
checked val as expect: abc
checked val as expect: abc
=== TEST 3: setnx(key, val)
--- http_config eval: $::HttpConfig
--- config
location /t {
content_by_lua_block {
local etcd, err = require "resty.etcd" .new({protocol = "v3"})
check_res(etcd, err)
local res, err = etcd:delete("/setnx")
check_res(res, err, nil, 200)
local res, err = etcd:setnx("/setnx", "aaa")
check_res(res, err, nil, 200)
local res, err = etcd:setnx("/setnx", "bbb")
check_res(res, err, nil, 200)
local data, err = etcd:get("/setnx")
check_res(data, err, "aaa", 200)
}
}
--- response_body
checked val as expect: aaa

0 comments on commit a892f4e

Please sign in to comment.