Skip to content

Commit

Permalink
Merge pull request zeromq#251 from bendikro/master-err-channel
Browse files Browse the repository at this point in the history
Fix zeromq#242: Unable to recover from panics when creating new Channeler
  • Loading branch information
taotetek authored Jan 21, 2019
2 parents ac82ea8 + 7494a0c commit 3608957
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 59 deletions.
35 changes: 28 additions & 7 deletions auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ func TestAuthIPAllow(t *testing.T) {
}
defer poller.Destroy()

s := poller.Wait(200)
s, err := poller.Wait(200)
if err != nil {
t.Error(err)
}
if want, have := server, s; want != have {
t.Errorf("want %#v, have %#v", want, have)
}
Expand Down Expand Up @@ -162,7 +165,10 @@ func TestAuthPlain(t *testing.T) {
}
defer poller.Destroy()

s := poller.Wait(200)
s, err := poller.Wait(200)
if err != nil {
t.Error(err)
}
if want, have := server, s; want != have {
t.Errorf("want %#v, have %#v", want, have)
}
Expand Down Expand Up @@ -195,7 +201,10 @@ func TestAuthPlain(t *testing.T) {
t.Error(err)
}

s = poller.Wait(200)
s, err = poller.Wait(200)
if err != nil {
t.Error(err)
}
if s != nil {
t.Errorf("want %#v, have %#v", nil, s)
}
Expand Down Expand Up @@ -274,7 +283,10 @@ func TestAuthCurveAllowAny(t *testing.T) {
}
defer poller.Destroy()

s := poller.Wait(2000)
s, err := poller.Wait(2000)
if err != nil {
t.Error(err)
}
if want, have := server, s; want != have {
t.Errorf("want %#v, have %#v", want, have)
}
Expand Down Expand Up @@ -302,7 +314,10 @@ func TestAuthCurveAllowAny(t *testing.T) {
t.Error(err)
}

s = poller.Wait(200)
s, err = poller.Wait(200)
if err != nil {
t.Error(err)
}
if s != nil {
t.Errorf("want %#v, have %#v", nil, s)
}
Expand Down Expand Up @@ -381,7 +396,10 @@ func TestAuthCurveAllowCertificate(t *testing.T) {
}
defer poller.Destroy()

s := poller.Wait(200)
s, err := poller.Wait(200)
if err != nil {
t.Error(err)
}
if want, have := server, s; want != have {
t.Errorf("want '%#v', have '%#v'", want, have)
}
Expand All @@ -400,7 +418,10 @@ func TestAuthCurveAllowCertificate(t *testing.T) {
t.Error(err)
}

s = poller.Wait(200)
s, err = poller.Wait(200)
if err != nil {
t.Error(err)
}
if s != nil {
t.Errorf("want '%#v', have '%#v", nil, s)
}
Expand Down
90 changes: 66 additions & 24 deletions channeler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,37 +27,52 @@ type Channeler struct {
commandChan chan<- string
SendChan chan<- [][]byte
RecvChan <-chan [][]byte
ErrChan <-chan error
errChan chan<- error
destroyed bool
}

// Destroy sends a message to the Channeler to shut it down
// and clean it up.
func (c *Channeler) Destroy() {
if c.destroyed {
return
}
c.commandChan <- "destroy"
}

// Subscribe to a Topic
func (c *Channeler) Subscribe(topic string) {
if c.destroyed {
return
}
c.commandChan <- fmt.Sprintf("subscribe %s", topic)
}

// Unsubscribe from a Topic
func (c *Channeler) Unsubscribe(topic string) {
if c.destroyed {
return
}
c.commandChan <- fmt.Sprintf("unsubscribe %s", topic)
}

// actor is a routine that handles communication with
// the zeromq socket.
func (c *Channeler) actor(recvChan chan<- [][]byte, options []SockOption) {
pipe, err := NewPair(fmt.Sprintf(">%s", c.commandAddr))

if err != nil {
panic(err)
c.errChan <- err
return
}
defer pipe.Destroy()
defer close(recvChan)

pull, err := NewPull(c.proxyAddr)
if err != nil {
panic(err)
c.errChan <- err
return
}
defer pull.Destroy()

Expand All @@ -67,13 +82,15 @@ func (c *Channeler) actor(recvChan chan<- [][]byte, options []SockOption) {
case Pub, Rep, Pull, Router, XPub:
err = sock.Attach(c.endpoints, true)
if err != nil {
panic(err)
c.errChan <- err
return
}

case Req, Push, Dealer, Pair, Stream, XSub:
err = sock.Attach(c.endpoints, false)
if err != nil {
panic(err)
c.errChan <- err
return
}

case Sub:
Expand All @@ -86,26 +103,34 @@ func (c *Channeler) actor(recvChan chan<- [][]byte, options []SockOption) {

err = sock.Attach(c.endpoints, false)
if err != nil {
panic(err)
c.errChan <- err
return
}

default:
panic(ErrInvalidSockType)
c.errChan <- ErrInvalidSockType
return
}

poller, err := NewPoller(sock, pull, pipe)
if err != nil {
panic(err)
c.errChan <- err
goto ExitActor
}
defer poller.Destroy()

for {
s := poller.Wait(-1)
s, err := poller.Wait(-1)
if err != nil {
c.errChan <- err
goto ExitActor
}
switch s {
case pipe:
cmd, err := pipe.RecvMessage()
if err != nil {
panic(err)
c.errChan <- err
goto ExitActor
}

switch string(cmd[0]) {
Expand All @@ -129,19 +154,22 @@ func (c *Channeler) actor(recvChan chan<- [][]byte, options []SockOption) {
case sock:
msg, err := s.RecvMessage()
if err != nil {
panic(err)
c.errChan <- err
goto ExitActor
}
recvChan <- msg

case pull:
msg, err := pull.RecvMessage()
if err != nil {
panic(err)
c.errChan <- err
goto ExitActor
}

err = sock.SendMessage(msg)
if err != nil {
panic(err)
c.errChan <- err
goto ExitActor
}
}
}
Expand All @@ -153,13 +181,15 @@ ExitActor:
func (c *Channeler) channeler(commandChan <-chan string, sendChan <-chan [][]byte) {
push, err := NewPush(c.proxyAddr)
if err != nil {
panic(err)
c.errChan <- err
return
}
defer push.Destroy()

pipe, err := NewPair(fmt.Sprintf("@%s", c.commandAddr))
if err != nil {
panic(err)
c.errChan <- err
goto ExitChanneler
}
defer pipe.Destroy()

Expand All @@ -168,13 +198,15 @@ func (c *Channeler) channeler(commandChan <-chan string, sendChan <-chan [][]byt
case cmd := <-commandChan:
switch cmd {
case "destroy":
c.destroyed = true
err = pipe.SendFrame([]byte("destroy"), FlagNone)
if err != nil {
panic(err)
}
_, err = pipe.RecvMessage()
if err != nil {
panic(err)
c.errChan <- err
} else {
_, err = pipe.RecvMessage()
if err != nil {
c.errChan <- err
}
}
goto ExitChanneler
default:
Expand All @@ -186,18 +218,18 @@ func (c *Channeler) channeler(commandChan <-chan string, sendChan <-chan [][]byt
}
err := pipe.SendMessage(message)
if err != nil {
panic(err)
c.errChan <- err
}
_, err = pipe.RecvMessage()
if err != nil {
panic(err)
c.errChan <- err
}
}

case msg := <-sendChan:
err := push.SendMessage(msg)
if err != nil {
panic(err)
c.errChan <- err
}
}
}
Expand All @@ -210,6 +242,7 @@ func newChanneler(sockType int, endpoints string, subscribe []string, options []
commandChan := make(chan string)
sendChan := make(chan [][]byte)
recvChan := make(chan [][]byte)
errChan := make(chan error)

C.Sock_init()
c := &Channeler{
Expand All @@ -219,6 +252,8 @@ func newChanneler(sockType int, endpoints string, subscribe []string, options []
commandChan: commandChan,
SendChan: sendChan,
RecvChan: recvChan,
ErrChan: errChan,
errChan: errChan,
}
c.commandAddr = fmt.Sprintf("inproc://actorcontrol%d", c.id)
c.proxyAddr = fmt.Sprintf("inproc://proxy%d", c.id)
Expand Down Expand Up @@ -248,6 +283,7 @@ func NewPubChanneler(endpoints string, options ...SockOption) *Channeler {
func NewSubChanneler(endpoints string, varargs ...interface{}) *Channeler {
subscribe := []string{}
options := []SockOption{}
var err error

for _, arg := range varargs {
switch x := arg.(type) {
Expand All @@ -256,11 +292,17 @@ func NewSubChanneler(endpoints string, varargs ...interface{}) *Channeler {
case SockOption:
options = append(options, x)
default:
panic(fmt.Sprintf("Don't know how to handle a %T argument to NewSubChanneler", arg))
err = fmt.Errorf("Don't know how to handle a %T argument to NewSubChanneler", arg)
break
}
}

return newChanneler(Sub, endpoints, subscribe, options)
channeler := newChanneler(Sub, endpoints, subscribe, options)

if err != nil {
go func() { channeler.errChan <- err }()
}
return channeler
}

// NewRepChanneler creates a new Channeler wrapping
Expand Down
40 changes: 40 additions & 0 deletions channeler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ func TestPubSubChannelerNoInitialSubscription(t *testing.T) {
}
}

func TestPubSubChannelerOptionError(t *testing.T) {
sub := NewSubChanneler("inproc://channelerpubsub2", 32)
defer sub.Destroy()
err := <-sub.ErrChan
expected := fmt.Errorf("Don't know how to handle a %T argument to NewSubChanneler", 32)
assertEqual(t, expected.Error(), err.Error())
}

func TestDealerRouterChanneler(t *testing.T) {
dealer := NewDealerChanneler("inproc://channelerdealerrouter")
defer dealer.Destroy()
Expand All @@ -127,6 +135,38 @@ func TestDealerRouterChanneler(t *testing.T) {
}
}

func TestDealerRouterChannelerAttachError(t *testing.T) {
dealer := NewDealerChanneler("bad endpoint")
defer dealer.Destroy()

dealer.SendChan <- [][]byte{[]byte("hello")}

select {
case resp := <-dealer.RecvChan:
if want, got := "world", string(resp[0]); want != got {
t.Errorf("want '%s', got '%s'", want, got)
}
case err := <-dealer.ErrChan:
assertEqual(t, ErrSockAttach, err)
}
}

func TestDealerRouterChannelerEmptyEndpointsError(t *testing.T) {
dealer := NewDealerChanneler("")
defer dealer.Destroy()

dealer.SendChan <- [][]byte{[]byte("hello")}

select {
case resp := <-dealer.RecvChan:
if want, got := "world", string(resp[0]); want != got {
t.Errorf("want '%s', got '%s'", want, got)
}
case err := <-dealer.ErrChan:
assertEqual(t, ErrSockAttachEmptyEndpoints, err)
}
}

func TestDealerChannelerRecvChanIsClosedOnDestroy(t *testing.T) {
test_router := NewRouterChanneler("inproc://channelerouter")

Expand Down
3 changes: 3 additions & 0 deletions goczmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ var (
// ErrSockAttach is returned when an attach call to a socket fails
ErrSockAttach = errors.New("error attaching zsock")

// ErrSockAttachEmptyEndpoints is returned when the endpoints value is empty
ErrSockAttachEmptyEndpoints = errors.New("Endpoints cannot be empty")

// ErrInvalidSockType is returned when a function is called
// against a socket type that is not applicable for that socket type
ErrInvalidSockType = errors.New("invalid socket type")
Expand Down
Loading

0 comments on commit 3608957

Please sign in to comment.