Dynamic reload of consul agent name (#13)
This commit allows Consul agents running in client mode to be renamed dynamically via consul reload.

When a reload changes the node name, the agent leaves the serf cluster and rejoins it under its new name, using a subset of the IPs of the previously known peers as start addresses. Renaming agents in server mode is intentionally not handled, to mitigate potential risks to the cluster.
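
For illustration, here is a minimal sketch of how an operator might exercise this path with the official Go API client (github.com/hashicorp/consul/api), assuming `node_name` has already been edited in the agent's configuration file and the agent's HTTP API is reachable at its default address. The `Reload` call is equivalent to running `consul reload`; the verification step is illustrative and not part of this commit.

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	// Connect to the local agent (default address 127.0.0.1:8500).
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Ask the agent to re-read its configuration, like `consul reload`.
	// With this commit, a client agent whose node_name changed leaves the
	// serf cluster and rejoins it under the new name.
	if err := client.Agent().Reload(); err != nil {
		log.Fatal(err)
	}

	// Inspect the agent's view of itself to confirm the new name is in use.
	self, err := client.Agent().Self()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("node name:", self["Config"]["NodeName"])
}
```

Sending SIGHUP to the agent process triggers the same configuration reload path.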
clems4ever authored Jan 20, 2021
1 parent 78031c6 commit c339387
Showing 6 changed files with 354 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .changelog/9525.txt
@@ -0,0 +1,3 @@
```release-note:improvement
config reload: dynamically reload node name of consul agents.
```
63 changes: 63 additions & 0 deletions agent/agent.go
@@ -123,6 +123,7 @@ func ConfigSourceFromName(name string) (configSource, bool) {
// consul.Client and consul.Server.
type delegate interface {
GetLANCoordinate() (lib.CoordinateSet, error)
ReconnectSerfWithNewNodeName(oldNodeName string) error
Leave() error
LANMembers() []serf.Member
LANMembersAllSegments() ([]serf.Member, error)
@@ -1305,6 +1306,43 @@ func (a *Agent) RPC(method string, args interface{}, reply interface{}) error {
return a.delegate.RPC(method, args, reply)
}

// ReconnectSerfWithNewNodeName leaves the serf cluster and rejoins it under the new node name
func (a *Agent) ReconnectSerfWithNewNodeName(oldNodeName string) error {
oldMembers := a.delegate.LANMembers()

if err := a.delegate.ReconnectSerfWithNewNodeName(oldNodeName); err != nil {
return err
}

joinAddrs := []string{}
for _, m := range oldMembers {
// We take 10 addresses out of the pool of known members to start joining the cluster.
if len(joinAddrs) < 10 && m.Name != oldNodeName && m.Status == serf.StatusAlive {
joinAddrs = append(joinAddrs, net.JoinHostPort(m.Addr.String(), fmt.Sprint(m.Port)))
}

if len(joinAddrs) == 10 {
break
}
}

// If the node knows nobody, then use the start addresses provided in the config.
if len(joinAddrs) == 0 {
joinAddrs = a.config.StartJoinAddrsLAN
}

if len(joinAddrs) > 0 {
// Only join the LAN pool, not WAN, because node name hot reload is only supported on client agents.
if _, err := a.JoinLAN(joinAddrs); err != nil {
return err
}
}

// start retry join
go a.retryJoinLAN()
return nil
}

// Leave is used to prepare the agent for a graceful shutdown
func (a *Agent) Leave() error {
return a.delegate.Leave()
@@ -3617,6 +3655,17 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
return err
}

_, isServer := a.delegate.(*consul.Server)
nodeNameChanged := a.config.NodeName != newCfg.NodeName
oldNodeName := a.config.NodeName

if nodeNameChanged && !isServer {
a.config.NodeName = newCfg.NodeName
} else if nodeNameChanged && isServer {
a.logger.Warn("Dynamic reloading of the node name is not yet supported by nodes in server mode. " +
"The node name will remain until you restart the server.")
}

// create the config for the rpc server/client
consulCfg, err := newConsulConfig(a.config, a.logger)
if err != nil {
@@ -3627,6 +3676,20 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
return err
}

if nodeNameChanged && !isServer {
a.logger.Warn("Node name has been modified, leave and join with the new name is requested")

// Update the cached configuration, otherwise the local state keeps propagating
// service registrations attached to the old node name, which produces an infinite
// loop of registration/deregistration of the old node.
a.State.SetConfig(LocalConfig(newCfg))

err := a.ReconnectSerfWithNewNodeName(oldNodeName)
if err != nil {
return fmt.Errorf("Unable to reconnect to Serf cluster with new name: %v", err)
}
}

if a.cache.ReloadOptions(newCfg.Cache) {
a.logger.Info("Cache options have been updated")
} else {
245 changes: 245 additions & 0 deletions agent/agent_test.go
@@ -3438,6 +3438,251 @@ func TestAgent_SecurityChecks(t *testing.T) {
assert.Contains(t, bytesBuffer.String(), "using enable-script-checks without ACLs and without allow_write_http_from is DANGEROUS")
}

func TestAgent_ReloadConfigNodeName(t *testing.T) {
t.Parallel()

t.Run("Agent node name reloaded if client", func(t *testing.T) {
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
hcl := `
data_dir = "` + dataDir + `"
node_name = "name1"
server = false
bootstrap = false
`
a := NewTestAgent(t, hcl)
defer a.Shutdown()
require.Equal(t, "name1", a.config.NodeName)

hcl = `
data_dir = "` + dataDir + `"
node_name = "name2"
server = false
bootstrap = false
`
c := TestConfig(testutil.Logger(t), config.FileSource{Name: t.Name(), Format: "hcl", Data: hcl})
require.NoError(t, a.reloadConfigInternal(c))
require.Equal(t, "name2", a.config.NodeName)
})

t.Run("Agent node name not reloaded if server", func(t *testing.T) {
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
hcl := `
data_dir = "` + dataDir + `"
node_name = "name1"
`
a := NewTestAgent(t, hcl)
defer a.Shutdown()
require.Equal(t, "name1", a.config.NodeName)

hcl = `
data_dir = "` + dataDir + `"
node_name = "name2"
`
c := TestConfig(testutil.Logger(t), config.FileSource{Name: t.Name(), Format: "hcl", Data: hcl})
require.NoError(t, a.reloadConfigInternal(c))
require.Equal(t, "name1", a.config.NodeName)
})
}

type MemberMatcher struct {
Name string
Status serf.MemberStatus
}

func AssertContainsMembers(t *testing.T, members []serf.Member, expMembers ...MemberMatcher) {
if len(members) != len(expMembers) {
require.Fail(t, fmt.Sprintf("Unexpected number of elements: %d items while %d expected", len(members), len(expMembers)))
}

for _, em := range expMembers {
exists := false
for _, m := range members {
if em.Name == m.Name && em.Status == m.Status {
exists = true
}

if exists {
break
}
}
if !exists {
assert.Fail(t, fmt.Sprintf("Element %s with status %d not found", em.Name, em.Status))
}
}
}

func TestAgent_ReloadConfigNodeName_JoinSerfWithStartJoinAddresses(t *testing.T) {
t.Parallel()
dataDir := testutil.TempDir(t, "agent") // we manage the data dir

a1 := NewTestAgent(t, `node_name = "server1"`)
defer a1.Shutdown()
testrpc.WaitForLeader(t, a1.RPC, "dc1")

addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortLAN)
hcl := `
data_dir = "` + dataDir + `"
node_name = "name1"
server = false
bootstrap = false
start_join = ["` + addr + `"]
`

a2 := NewTestAgent(t, hcl)
defer a2.Shutdown()

a2.JoinLAN([]string{addr})

AssertContainsMembers(t, a1.LANMembers(),
MemberMatcher{Name: "server1", Status: serf.StatusAlive},
MemberMatcher{Name: "name1", Status: serf.StatusAlive})

hcl = `
data_dir = "` + dataDir + `"
node_name = "name2"
server = false
bootstrap = false
start_join = ["` + addr + `"]
`
c := TestConfig(testutil.Logger(t), config.FileSource{Name: t.Name(), Format: "hcl", Data: hcl})
require.NoError(t, a2.reloadConfigInternal(c))

AssertContainsMembers(t, a1.LANMembers(),
MemberMatcher{Name: "server1", Status: serf.StatusAlive},
MemberMatcher{Name: "name1", Status: serf.StatusLeft},
MemberMatcher{Name: "name2", Status: serf.StatusAlive})
}

func TestAgent_ReloadConfigNodeName_JoinSerfWithKnownMembers(t *testing.T) {
t.Parallel()
dataDir1 := testutil.TempDir(t, "agent1") // we manage the data dir
dataDir2 := testutil.TempDir(t, "agent2") // we manage the data dir

a1 := NewTestAgent(t, `node_name = "server1"`)
defer a1.Shutdown()
testrpc.WaitForLeader(t, a1.RPC, "dc1")

a2 := NewTestAgent(t, `
data_dir = "`+dataDir1+`"
node_name = "name1"
server = false
bootstrap = false`)
defer a2.Shutdown()

a2.JoinLAN([]string{fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortLAN)})

a3 := NewTestAgent(t, `
data_dir = "`+dataDir2+`"
node_name = "name2"
server = false
bootstrap = false`)
defer a3.Shutdown()

a3.JoinLAN([]string{fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortLAN)})

testrpc.WaitForTestAgent(t, a2.RPC, "dc1")
testrpc.WaitForTestAgent(t, a3.RPC, "dc1")

retry.Run(t, func(r *retry.R) {
if got, want := len(a1.LANMembers()), 3; got != want {
r.Fatalf("got %d LAN members want at least %d", got, want)
}
})

AssertContainsMembers(t, a1.LANMembers(),
MemberMatcher{Name: "server1", Status: serf.StatusAlive},
MemberMatcher{Name: "name1", Status: serf.StatusAlive},
MemberMatcher{Name: "name2", Status: serf.StatusAlive})

hcl := `
data_dir = "` + dataDir2 + `"
node_name = "name3"
server = false
bootstrap = false`

c := TestConfig(testutil.Logger(t), config.FileSource{Name: t.Name(), Format: "hcl", Data: hcl})
require.NoError(t, a3.reloadConfigInternal(c))

retry.Run(t, func(r *retry.R) {
if got, want := len(a1.LANMembers()), 4; got != want {
r.Fatalf("got %d LAN members want at least %d", got, want)
}
})

AssertContainsMembers(t, a1.LANMembers(),
MemberMatcher{Name: "server1", Status: serf.StatusAlive},
MemberMatcher{Name: "name1", Status: serf.StatusAlive},
MemberMatcher{Name: "name2", Status: serf.StatusLeft},
MemberMatcher{Name: "name3", Status: serf.StatusAlive})
}

func TestAgent_ReloadConfigNodeName_StateIsKept(t *testing.T) {
t.Parallel()
dataDir := testutil.TempDir(t, "agent1") // we manage the data dir

a1 := NewTestAgent(t, `node_name = "server1"`)
defer a1.Shutdown()
testrpc.WaitForLeader(t, a1.RPC, "dc1")

a2 := NewTestAgent(t, `
data_dir = "`+dataDir+`"
node_name = "name1"
server = false
bootstrap = false`)
defer a2.Shutdown()

a2.JoinLAN([]string{fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortLAN)})

testrpc.WaitForTestAgent(t, a2.RPC, "dc1")

srv := &structs.NodeService{
Service: "my_service",
ID: "my_service_id",
Port: 8100,
Address: "::5",
}

err := a2.AddService(srv, []*structs.CheckType{}, true, "", ConfigSourceRemote)
require.NoError(t, err)

health := &structs.HealthCheck{
Node: "foo",
CheckID: "http-check",
Name: "http-check",
Status: api.HealthCritical,
}
chk := &structs.CheckType{
CheckID: "http",
HTTP: "http://localhost:8080/mypath?query",
Interval: 20 * time.Millisecond,
TLSSkipVerify: true,
}
err = a2.AddCheck(health, chk, true, "", ConfigSourceLocal)
require.NoError(t, err)

hcl := `
data_dir = "` + dataDir + `"
node_name = "name2"
server = false
bootstrap = false`

c := TestConfig(testutil.Logger(t), config.FileSource{Name: t.Name(), Format: "hcl", Data: hcl})
require.NoError(t, a2.reloadConfigInternal(c))

retry.Run(t, func(r *retry.R) {
if got, want := len(a1.LANMembers()), 3; got != want {
r.Fatalf("got %d LAN members want at least %d", got, want)
}

// Verify that persisted services and checks are reloaded after the rename.
svc := getService(a2, srv.ID)
require.NotNil(r, svc, "missing service %q", srv.ID)

chk := getCheck(a2, health.CheckID)
require.NotNil(r, chk, "missing check %q", health.CheckID)
})
}

func TestAgent_ReloadConfigOutgoingRPCConfig(t *testing.T) {
t.Parallel()
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
34 changes: 34 additions & 0 deletions agent/consul/client.go
@@ -191,6 +191,37 @@ func (c *Client) Shutdown() error {
return nil
}

func (c *Client) ReconnectSerfWithNewNodeName(oldNodeName string) error {
c.logger.Info("client changing node name")

if err := c.Leave(); err != nil {
return fmt.Errorf("Failed to leave LAN Serf cluster: %v", err)
}

c.shutdownLock.Lock()
defer c.shutdownLock.Unlock()

// Leave the LAN pool
if c.serf != nil {
if err := c.serf.Shutdown(); err != nil {
return fmt.Errorf("Failed to shutdown serf member: %v", err)
}
}

c.logger.Info("Setup new membership with name", "nodename", c.config.NodeName)
s, err := c.setupSerf(c.config.SerfLANConfig, c.eventCh, serfLANSnapshot)
if err != nil {
c.Shutdown()
return fmt.Errorf("Failed to start lan serf: %v", err)
}
c.serf = s

// The event handler was stopped by the call to Shutdown above, so we need to
// restart it to keep handling events.
go c.lanEventHandler()
return nil
}

// Leave is used to prepare for a graceful shutdown
func (c *Client) Leave() error {
c.logger.Info("client starting leave")
@@ -405,5 +436,8 @@ func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) {
// relevant configuration information
func (c *Client) ReloadConfig(config *Config) error {
c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))

// The node name may have changed, which affects the agent's membership within the serf cluster.
c.config.NodeName = config.NodeName
return nil
}