Skip to content

Add public API with request object types #162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.

- SSL support (#155)
- IPROTO_PUSH messages support (#67)
- Public API with request object types (#126)

### Changed

- Add `Call16` method, support build tag `go_tarantool_call_17`
to choose behavior for `Call` method (#125)
- `IPROTO_*` constants that identify requests renamed from `<Name>Request` to
`<Name>RequestCode` (#126)

### Fixed

Expand Down
26 changes: 26 additions & 0 deletions call_16_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,29 @@ func TestConnection_Call(t *testing.T) {
t.Errorf("result is not {{1}} : %v", resp.Data)
}
}

func TestCallRequest(t *testing.T) {
var resp *Response
var err error

conn := connect(t, server, opts)
defer conn.Close()

req := NewCallRequest("simple_incr").Args([]interface{}{1})
resp, err = conn.Do(req)
if err != nil {
t.Errorf("Failed to use Call")
}
if resp.Data[0].([]interface{})[0].(uint64) != 2 {
t.Errorf("result is not {{1}} : %v", resp.Data)
}
}

func TestCallRequestCode(t *testing.T) {
req := NewCallRequest("simple_incrt")
code := req.Code()
expected := Call16RequestCode
if code != int32(expected) {
t.Errorf("CallRequest actual code %v != %v", code, expected)
}
}
26 changes: 26 additions & 0 deletions call_17_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,29 @@ func TestConnection_Call(t *testing.T) {
t.Errorf("result is not {{1}} : %v", resp.Data)
}
}

func TestCallRequest(t *testing.T) {
var resp *Response
var err error

conn := connect(t, server, opts)
defer conn.Close()

req := NewCallRequest("simple_incr").Args([]interface{}{1})
resp, err = conn.Do(req)
if err != nil {
t.Errorf("Failed to use Call")
}
if resp.Data[0].(uint64) != 2 {
t.Errorf("result is not {{1}} : %v", resp.Data)
}
}

func TestCallRequestCode(t *testing.T) {
req := NewCallRequest("simple_incrt")
code := req.Code()
expected := Call17RequestCode
if code != int32(expected) {
t.Errorf("CallRequest actual code %v != %v", code, expected)
}
}
73 changes: 73 additions & 0 deletions client_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,79 @@ func (o Op) EncodeMsgpack(enc *msgpack.Encoder) error {
return enc.Encode(o.Arg)
}

const (
appendOperator = "+"
subtractionOperator = "-"
bitwiseAndOperator = "&"
bitwiseOrOperator = "|"
bitwiseXorOperator = "^"
spliceOperator = ":"
insertOperator = "!"
deleteOperator = "#"
assignOperator = "="
)

// Operations is a collection of update operations.
type Operations struct {
ops []Op
}

// NewOperations returns a new empty collection of update operations.
func NewOperations() *Operations {
ops := new(Operations)
return ops
}

func (ops *Operations) append(op string, field int, arg interface{}) *Operations {
ops.ops = append(ops.ops, Op{op, field, arg})
return ops
}

// Add adds an additional operation to the collection of update operations.
func (ops *Operations) Add(field int, arg interface{}) *Operations {
return ops.append(appendOperator, field, arg)
}

// Subtract adds a subtraction operation to the collection of update operations.
func (ops *Operations) Subtract(field int, arg interface{}) *Operations {
return ops.append(subtractionOperator, field, arg)
}

// BitwiseAnd adds a bitwise AND operation to the collection of update operations.
func (ops *Operations) BitwiseAnd(field int, arg interface{}) *Operations {
return ops.append(bitwiseAndOperator, field, arg)
}

// BitwiseOr adds a bitwise OR operation to the collection of update operations.
func (ops *Operations) BitwiseOr(field int, arg interface{}) *Operations {
return ops.append(bitwiseOrOperator, field, arg)
}

// BitwiseXor adds a bitwise XOR operation to the collection of update operations.
func (ops *Operations) BitwiseXor(field int, arg interface{}) *Operations {
return ops.append(bitwiseXorOperator, field, arg)
}

// Splice adds a splice operation to the collection of update operations.
func (ops *Operations) Splice(field int, arg interface{}) *Operations {
return ops.append(spliceOperator, field, arg)
}

// Insert adds an insert operation to the collection of update operations.
func (ops *Operations) Insert(field int, arg interface{}) *Operations {
return ops.append(insertOperator, field, arg)
}

// Delete adds a delete operation to the collection of update operations.
func (ops *Operations) Delete(field int, arg interface{}) *Operations {
return ops.append(deleteOperator, field, arg)
}

// Assign adds an assign operation to the collection of update operations.
func (ops *Operations) Assign(field int, arg interface{}) *Operations {
return ops.append(assignOperator, field, arg)
}

type OpSplice struct {
Op string
Field int
Expand Down
86 changes: 61 additions & 25 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,18 +468,35 @@ func (conn *Connection) dial() (err error) {
return
}

func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
request := &Future{
requestId: 0,
requestCode: AuthRequest,
func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res SchemaResolver) (err error) {
hl := h.Len()
h.Write([]byte{
0xce, 0, 0, 0, 0, // Length.
0x82, // 2 element map.
KeyCode, byte(req.Code()), // Request code.
KeySync, 0xce,
byte(reqid >> 24), byte(reqid >> 16),
byte(reqid >> 8), byte(reqid),
})

if err = req.Body(res, enc); err != nil {
return
}

l := uint32(h.Len() - 5 - hl)
h.b[hl+1] = byte(l >> 24)
h.b[hl+2] = byte(l >> 16)
h.b[hl+3] = byte(l >> 8)
h.b[hl+4] = byte(l)

return
}

func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
var packet smallWBuf
err = request.pack(&packet, msgpack.NewEncoder(&packet), func(enc *msgpack.Encoder) error {
return enc.Encode(map[uint32]interface{}{
KeyUserName: conn.opts.User,
KeyTuple: []interface{}{string("chap-sha1"), string(scramble)},
})
})
req := newAuthRequest(conn.opts.User, string(scramble))
err = pack(&packet, msgpack.NewEncoder(&packet), 0, req, conn.Schema)

if err != nil {
return errors.New("auth: pack error " + err.Error())
}
Expand Down Expand Up @@ -704,7 +721,7 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
}
}

func (conn *Connection) newFuture(requestCode int32) (fut *Future) {
func (conn *Connection) newFuture() (fut *Future) {
fut = NewFuture()
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
select {
Expand All @@ -720,7 +737,6 @@ func (conn *Connection) newFuture(requestCode int32) (fut *Future) {
}
}
fut.requestId = conn.nextRequestId()
fut.requestCode = requestCode
shardn := fut.requestId & (conn.opts.Concurrency - 1)
shard := &conn.shard[shardn]
shard.rmut.Lock()
Expand Down Expand Up @@ -769,23 +785,16 @@ func (conn *Connection) newFuture(requestCode int32) (fut *Future) {
return
}

func (conn *Connection) sendFuture(fut *Future, body func(*msgpack.Encoder) error) *Future {
func (conn *Connection) send(req Request) *Future {
fut := conn.newFuture()
if fut.ready == nil {
return fut
}
conn.putFuture(fut, body)
conn.putFuture(fut, req)
return fut
}

func (conn *Connection) failFuture(fut *Future, err error) *Future {
if f := conn.fetchFuture(fut.requestId); f == fut {
fut.SetError(err)
conn.markDone(fut)
}
return fut
}

func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error) {
func (conn *Connection) putFuture(fut *Future, req Request) {
shardn := fut.requestId & (conn.opts.Concurrency - 1)
shard := &conn.shard[shardn]
shard.bufmut.Lock()
Expand All @@ -801,10 +810,11 @@ func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error
shard.enc = msgpack.NewEncoder(&shard.buf)
}
blen := shard.buf.Len()
if err := fut.pack(&shard.buf, shard.enc, body); err != nil {
reqid := fut.requestId
if err := pack(&shard.buf, shard.enc, reqid, req, conn.Schema); err != nil {
shard.buf.Trunc(blen)
shard.bufmut.Unlock()
if f := conn.fetchFuture(fut.requestId); f == fut {
if f := conn.fetchFuture(reqid); f == fut {
fut.SetError(err)
conn.markDone(fut)
} else if f != nil {
Expand Down Expand Up @@ -978,6 +988,32 @@ func (conn *Connection) nextRequestId() (requestId uint32) {
return atomic.AddUint32(&conn.requestId, 1)
}

// Do verifies, sends the request and returns a response.
//
// An error is returned if the request was formed incorrectly, or failed to
// communicate by the connection, or unable to decode the response.
func (conn *Connection) Do(req Request) (*Response, error) {
fut := conn.DoAsync(req)
return fut.Get()
}

// DoTyped verifies, sends the request and fills the typed result.
//
// An error is returned if the request was formed incorrectly, or failed to
// communicate by the connection, or unable to decode the response.
func (conn *Connection) DoTyped(req Request, result interface{}) error {
fut := conn.DoAsync(req)
return fut.GetTyped(result)
}

// DoAsync verifies, sends the request and returns a future.
//
// An error is returned if the request was formed incorrectly, or failed to
// create the future.
func (conn *Connection) DoAsync(req Request) *Future {
return conn.send(req)
}

// ConfiguredTimeout returns a timeout from connection config.
func (conn *Connection) ConfiguredTimeout() time.Duration {
return conn.opts.Timeout
Expand Down
30 changes: 30 additions & 0 deletions connection_pool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,36 @@ func (connPool *ConnectionPool) EvalAsync(expr string, args interface{}, userMod
return conn.EvalAsync(expr, args)
}

// Do sends the request and returns a response.
func (connPool *ConnectionPool) Do(req tarantool.Request, userMode Mode) (*tarantool.Response, error) {
conn, err := connPool.getNextConnection(userMode)
if err != nil {
return nil, err
}

return conn.Do(req)
}

// DoTyped sends the request and fills the typed result.
func (connPool *ConnectionPool) DoTyped(req tarantool.Request, result interface{}, userMode Mode) error {
conn, err := connPool.getNextConnection(userMode)
if err != nil {
return err
}

return conn.DoTyped(req, result)
}

// DoAsync sends the request and returns a future.
func (connPool *ConnectionPool) DoAsync(req tarantool.Request, userMode Mode) *tarantool.Future {
conn, err := connPool.getNextConnection(userMode)
if err != nil {
return tarantool.NewErrorFuture(err)
}

return conn.DoAsync(req)
}

//
// private
//
Expand Down
39 changes: 39 additions & 0 deletions connection_pool/connection_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1252,6 +1252,45 @@ func TestPing(t *testing.T) {
require.NotNilf(t, resp, "response is nil after Ping")
}

func TestDo(t *testing.T) {
roles := []bool{true, true, false, true, false}

err := test_helpers.SetClusterRO(servers, connOpts, roles)
require.Nilf(t, err, "fail to set roles for cluster")

connPool, err := connection_pool.Connect(servers, connOpts)
require.Nilf(t, err, "failed to connect")
require.NotNilf(t, connPool, "conn is nil after Connect")

defer connPool.Close()

req := tarantool.NewPingRequest()
// ANY
resp, err := connPool.Do(req, connection_pool.ANY)
require.Nilf(t, err, "failed to Ping")
require.NotNilf(t, resp, "response is nil after Ping")

// RW
resp, err = connPool.Do(req, connection_pool.RW)
require.Nilf(t, err, "failed to Ping")
require.NotNilf(t, resp, "response is nil after Ping")

// RO
resp, err = connPool.Do(req, connection_pool.RO)
require.Nilf(t, err, "failed to Ping")
require.NotNilf(t, resp, "response is nil after Ping")

// PreferRW
resp, err = connPool.Do(req, connection_pool.PreferRW)
require.Nilf(t, err, "failed to Ping")
require.NotNilf(t, resp, "response is nil after Ping")

// PreferRO
resp, err = connPool.Do(req, connection_pool.PreferRO)
require.Nilf(t, err, "failed to Ping")
require.NotNilf(t, resp, "response is nil after Ping")
}

// runTestMain is a body of TestMain function
// (see https://pkg.go.dev/testing#hdr-Main).
// Using defer + os.Exit is not works so TestMain body
Expand Down
19 changes: 19 additions & 0 deletions connection_pool/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,3 +529,22 @@ func ExampleConnectionPool_Eval() {
// Code 0
// Data [3]
}

func ExampleConnectionPool_Do() {
pool, err := examplePool(testRoles)
if err != nil {
fmt.Println(err)
}
defer pool.Close()

// Ping a Tarantool instance to check connection.
req := tarantool.NewPingRequest()
resp, err := pool.Do(req, connection_pool.ANY)
fmt.Println("Ping Code", resp.Code)
fmt.Println("Ping Data", resp.Data)
fmt.Println("Ping Error", err)
// Output:
// Ping Code 0
// Ping Data []
// Ping Error <nil>
}
Loading