diff --git a/cmd/p2psim/main.go b/cmd/p2psim/main.go index 451b0d942d..18bc3b888d 100644 --- a/cmd/p2psim/main.go +++ b/cmd/p2psim/main.go @@ -41,9 +41,13 @@ import ( "fmt" "io" "os" + "os/signal" "strings" + "syscall" "text/tabwriter" + "time" + "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" @@ -102,6 +106,22 @@ func main() { Usage: "load a network snapshot from stdin", Action: loadSnapshot, }, + { + Name: "network", + Usage: "manage the simulation network", + Subcommands: []*cli.Command{ + { + Name: "start", + Usage: "start all nodes in the network", + Action: startNetwork, + }, + { + Name: "peer-stats", + Usage: "show peer stats", + Action: getNetworkPeerStats, + }, + }, + }, { Name: "node", Usage: "manage simulation nodes", @@ -132,6 +152,74 @@ func main() { Value: "", Usage: "node private key (hex encoded)", }, + &cli.BoolFlag{ + Name: "sim.dialer", + Usage: "Use the simulation dialer", + }, + &cli.BoolFlag{ + Name: "fake.iplistener", + Usage: "Use the fake listener", + }, + &cli.BoolFlag{ + Name: "start", + Usage: "start the node after creation", + }, + utils.NoDiscoverFlag, + utils.DHTBucketSizeFlag, + utils.BootnodesFlag, + &cli.BoolFlag{ + Name: "autofill.bootnodes", + Usage: "autofill bootnodes with existing bootnodes from manager", + }, + }, + }, + { + Name: "create-multi", + Usage: "create a node", + Action: createMultiNode, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "name", + Value: "", + Usage: "node name", + }, + &cli.IntFlag{ + Name: "count", + Value: 1, + Usage: "number of nodes to create", + }, + &cli.StringFlag{ + Name: "services", + Value: "", + Usage: "node services (comma separated)", + }, + &cli.BoolFlag{ + Name: "sim.dialer", + Usage: "Use the simulation dialer", + }, + &cli.BoolFlag{ + Name: "fake.iplistener", + Usage: "Use the fake listener", + }, + &cli.BoolFlag{ + Name: "start", + Usage: "start the node after creation", + }, + utils.NoDiscoverFlag, + utils.DHTBucketSizeFlag, + utils.BootnodesFlag, + &cli.BoolFlag{ + Name: "only.bootnode", + Usage: "only create bootnodes", + }, + &cli.DurationFlag{ + Name: "interval", + Usage: "create interval", + }, + &cli.BoolFlag{ + Name: "autofill.bootnodes", + Usage: "autofill bootnodes with existing bootnodes from manager", + }, }, }, { @@ -176,6 +264,29 @@ func main() { }, }, }, + { + Name: "peer-stats", + Usage: "show peer stats", + ArgsUsage: "", + Action: getNodePeerStats, + }, + }, + }, + { + Name: "log-stats", + Usage: "log peer stats to a CSV file", + Action: startLogStats, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "file", + Usage: "output file", + Value: "stats.csv", + }, + &cli.DurationFlag{ + Name: "interval", + Usage: "log interval", + Value: 15 * time.Second, + }, }, }, } @@ -287,6 +398,28 @@ func createNode(ctx *cli.Context) error { config.ID = enode.PubkeyToIDV4(&privKey.PublicKey) config.PrivateKey = privKey } + if ctx.Bool(utils.NoDiscoverFlag.Name) { + config.NoDiscovery = true + } + if ctx.Bool("sim.dialer") { + config.UseTCPDialer = false + } else { + config.UseTCPDialer = true + } + if ctx.Bool("fake.iplistener") { + config.UseFakeIPListener = true + } + config.BootstrapNodeURLs = ctx.String(utils.BootnodesFlag.Name) + if ctx.Bool("autofill.bootnodes") { + bootnodeURLs, err := getBootnodes() + if err != nil { + return err + } + if bootnodeURLs != "" { + config.BootstrapNodeURLs += "," + bootnodeURLs + } + } + config.DHTBucketSize = ctx.Int(utils.DHTBucketSizeFlag.Name) if services := ctx.String("services"); services != "" { config.Lifecycles = strings.Split(services, ",") } @@ -295,6 +428,75 @@ func createNode(ctx *cli.Context) error { return err } fmt.Fprintln(ctx.App.Writer, "Created", node.Name) + + // Start node if needed + if ctx.Bool("start") { + if err := client.StartNode(node.Name); err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Started", node.Name) + } + + return nil +} + +func getBootnodes() (string, error) { + nodes, err := client.GetNodes() + if err != nil { + return "", err + } + + bootnodes := make([]string, 0) + for _, node := range nodes { + if strings.HasPrefix(node.Name, "bootnode") { + bootnodes = append(bootnodes, node.Enode) + } + } + + return strings.Join(bootnodes, ","), nil +} + +func createMultiNode(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + + t := time.Now() + + onlyCreateBootnode := ctx.Bool("only.bootnode") + createInterval := ctx.Duration("interval") + bootNodeURLs := ctx.String(utils.BootnodesFlag.Name) + if ctx.Bool("autofill.bootnodes") { + existedBootnodeURLs, err := getBootnodes() + if err != nil { + return err + } + if existedBootnodeURLs != "" { + bootNodeURLs += "," + existedBootnodeURLs + } + } + + // Overwrite the bootnodes flag if not create bootnodes + if !onlyCreateBootnode { + ctx.Set(utils.BootnodesFlag.Name, bootNodeURLs) + } + + // Create nodes + count := ctx.Int("count") + for i := 0; i < count; i++ { + nodeName := fmt.Sprintf("node-%d-%d", t.Unix(), i) + if onlyCreateBootnode { + nodeName = fmt.Sprintf("bootnode-%d-%d", t.Unix(), i) + } + ctx.Set("name", nodeName) + if err := createNode(ctx); err != nil { + return err + } + if createInterval > 0 { + time.Sleep(createInterval) + } + } + return nil } @@ -429,3 +631,91 @@ func rpcSubscribe(client *rpc.Client, out io.Writer, method string, args ...stri } } } + +func startNetwork(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + if err := client.StartNetwork(); err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Started network") + return nil +} + +func getNodePeerStats(ctx *cli.Context) error { + if ctx.Args().Len() != 1 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + nodeName := ctx.Args().Get(0) + stats, err := client.GetNodePeerStats(nodeName) + if err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Peer stats of", ctx.String("node")) + fmt.Fprintln(ctx.App.Writer, "Peer count: ", stats.PeerCount) + fmt.Fprintln(ctx.App.Writer, "Tried: ", stats.Tried) + fmt.Fprintln(ctx.App.Writer, "Failed: ", stats.Failed) + fmt.Fprintln(ctx.App.Writer, "Nodes count: ", stats.DifferentNodesDiscovered) + fmt.Fprintln(ctx.App.Writer, "DHT: ", stats.DHTBuckets) + return nil +} + +func getNetworkPeerStats(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + stats, err := client.GetAllNodePeerStats() + if err != nil { + return err + } + for nodeID, stats := range stats { + fmt.Fprintln(ctx.App.Writer, "Peer stats of", nodeID) + fmt.Fprintln(ctx.App.Writer, "Peer count: ", stats.PeerCount) + fmt.Fprintln(ctx.App.Writer, "Tried: ", stats.Tried) + fmt.Fprintln(ctx.App.Writer, "Failed: ", stats.Failed) + fmt.Fprintln(ctx.App.Writer, "Nodes count: ", stats.DifferentNodesDiscovered) + fmt.Fprintln(ctx.App.Writer, "DHT: ", stats.DHTBuckets) + } + return nil +} + +func startLogStats(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + csvFile := ctx.String("file") + f, err := os.OpenFile(csvFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + if err != nil { + return err + } + defer f.Close() + + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + timer := time.NewTicker(ctx.Duration("interval")) + + f.WriteString("node,timestamp,type,value\n") + +loop: + for { + select { + case <-sig: + return nil + case <-timer.C: + stats, err := client.GetAllNodePeerStats() + if err != nil { + fmt.Fprintln(ctx.App.Writer, err) + goto loop + } + for nodeID, stats := range stats { + t := time.Now() + f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "PeerCount", stats.PeerCount)) + f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "Tried", stats.Tried)) + f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "Failed", stats.Failed)) + f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "DifferentNodesDiscovered", stats.DifferentNodesDiscovered)) + f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "DHTBuckets", stats.DHTBuckets)) + } + } + } +} diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index bf1e9080a4..6024db4a6d 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -155,6 +155,21 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { return t, nil } +func (t *UDPv4) BucketSizes() []int { + if t == nil || t.tab == nil { + return []int{} + } + sizes := make([]int, len(t.tab.buckets)) + for i, bucket := range t.tab.buckets { + if bucket == nil { + sizes[i] = 0 + } else { + sizes[i] = len(bucket.entries) + } + } + return sizes +} + // Self returns the local node. func (t *UDPv4) Self() *enode.Node { return t.localNode.Node() diff --git a/p2p/server.go b/p2p/server.go index 2577a32218..5bf67483b1 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -308,6 +308,15 @@ func (c *conn) set(f connFlag, val bool) { } } +// SetListenFunc sets the function used to accept inbound connections (testing only) +func (srv *Server) SetListenFunc(f func(network, addr string) (net.Listener, error)) { + srv.listenFunc = f +} + +func (srv *Server) UDPv4() *discover.UDPv4 { + return srv.ntab +} + // LocalNode returns the local node record. func (srv *Server) LocalNode() *enode.LocalNode { return srv.localnode diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index 35ccdfb068..9ca7ba30e0 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -364,6 +364,11 @@ func (n *ExecNode) Snapshots() (map[string][]byte, error) { return snapshots, n.client.Call(&snapshots, "simulation_snapshot") } +// Empty PeerStats +func (n *ExecNode) PeerStats() *PeerStats { + return &PeerStats{} +} + // execNodeConfig is used to serialize the node configuration so it can be // passed to the child process as a JSON encoded environment variable type execNodeConfig struct { diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go index 1cb26a8ea0..10b1b59382 100644 --- a/p2p/simulations/adapters/inproc.go +++ b/p2p/simulations/adapters/inproc.go @@ -21,8 +21,11 @@ import ( "errors" "fmt" "math" + "math/rand" "net" + "strings" "sync" + "time" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" @@ -91,14 +94,32 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { return nil, err } + p2pCfg := p2p.Config{ + PrivateKey: config.PrivateKey, + MaxPeers: math.MaxInt32, + NoDiscovery: config.NoDiscovery, + EnableMsgEvents: config.EnableMsgEvents, + DHTBucketSize: config.DHTBucketSize, + } + if !config.DisableTCP { + p2pCfg.ListenAddr = fmt.Sprintf(":%d", config.Port) + } + if len(config.BootstrapNodeURLs) > 0 { + for _, url := range strings.Split(config.BootstrapNodeURLs, ",") { + if len(url) == 0 { + continue + } + n, err := enode.Parse(enode.ValidSchemes, url) + if err != nil { + log.Warn("invalid bootstrap node URL", "url", url, "err", err) + continue + } + p2pCfg.BootstrapNodes = append(p2pCfg.BootstrapNodes, n) + } + } + n, err := node.New(&node.Config{ - P2P: p2p.Config{ - PrivateKey: config.PrivateKey, - MaxPeers: math.MaxInt32, - NoDiscovery: true, - Dialer: s, - EnableMsgEvents: config.EnableMsgEvents, - }, + P2P: p2pCfg, ExternalSigner: config.ExternalSigner, Logger: log.New("node.id", id.String()), }) @@ -106,6 +127,10 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { return nil, err } + if config.UseFakeIPListener { + n.Server().SetListenFunc(listenFakeAddrFunc) + } + simNode := &SimNode{ ID: id, config: config, @@ -113,6 +138,16 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { adapter: s, running: make(map[string]node.Lifecycle), } + if !config.UseTCPDialer { + n.Server().Dialer = s + } else { + simNode.dialer = &wrapTCPDialerStats{ + d: &net.Dialer{Timeout: 15 * time.Second}, + resultCh: make(chan resultDial, 10000), + } + n.Server().Dialer = simNode.dialer + } + s.nodes[id] = simNode return simNode, nil } @@ -162,6 +197,8 @@ func (s *SimAdapter) GetNode(id enode.ID) (*SimNode, bool) { // net.Pipe (see SimAdapter.Dial), running devp2p protocols directly over that // pipe type SimNode struct { + ctx context.Context + cancel context.CancelFunc lock sync.RWMutex ID enode.ID config *NodeConfig @@ -170,6 +207,11 @@ type SimNode struct { running map[string]node.Lifecycle client *rpc.Client registerOnce sync.Once + dialer *wrapTCPDialerStats + + // Track different nodes discovered by the node + discoveredNodes sync.Map + differentNodeCount int } // Close closes the underlaying node.Node to release @@ -240,6 +282,15 @@ func (sn *SimNode) Snapshots() (map[string][]byte, error) { // Start registers the services and starts the underlying devp2p node func (sn *SimNode) Start(snapshots map[string][]byte) error { + sn.lock.Lock() + if sn.cancel != nil { + sn.lock.Unlock() + return errors.New("node already started") + } + + sn.ctx, sn.cancel = context.WithCancel(context.Background()) + sn.lock.Unlock() + // ensure we only register the services once in the case of the node // being stopped and then started again var regErr error @@ -282,6 +333,8 @@ func (sn *SimNode) Start(snapshots map[string][]byte) error { sn.client = client sn.lock.Unlock() + go sn.trackDiscoveredNode() + return nil } @@ -292,6 +345,10 @@ func (sn *SimNode) Stop() error { sn.client.Close() sn.client = nil } + if sn.cancel != nil { + sn.cancel() + sn.cancel = nil + } sn.lock.Unlock() return sn.node.Close() } @@ -351,3 +408,96 @@ func (sn *SimNode) NodeInfo() *p2p.NodeInfo { } return server.NodeInfo() } + +func (sn *SimNode) PeerStats() *PeerStats { + if sn.dialer == nil || sn.node.Server() == nil || sn.node.Server().UDPv4() == nil { + return &PeerStats{} + } + + nodesCount := 0 + sn.discoveredNodes.Range(func(_, _ interface{}) bool { + nodesCount++ + return true + }) + return &PeerStats{ + PeerCount: sn.node.Server().PeerCount(), + Failed: sn.dialer.failed, + Tried: sn.dialer.tried, + DifferentNodesDiscovered: nodesCount, + DHTBuckets: sn.node.Server().UDPv4().BucketSizes(), + } +} + +func (sn *SimNode) trackDiscoveredNode() { + if sn.dialer == nil { + return + } + + for { + select { + case <-sn.ctx.Done(): + return + case r := <-sn.dialer.resultCh: + if _, ok := sn.discoveredNodes.LoadOrStore(r.node, struct{}{}); !ok { + sn.differentNodeCount++ + } + if r.err != nil { + log.Info("dial failed", "node", r.node, "err", r.err) + sn.dialer.failed++ + } + log.Info("dial tried", "from", sn.ID, "to", r.node) + sn.dialer.tried++ + } + } +} + +func listenFakeAddrFunc(network, laddr string) (net.Listener, error) { + l, err := net.Listen(network, laddr) + if err != nil { + return nil, err + } + fakeAddr := &net.TCPAddr{IP: net.IP{byte(rand.Intn(255)), byte(rand.Intn(255)), byte(rand.Intn(255)), byte(rand.Intn(255))}, Port: rand.Intn(65535)} + return &fakeAddrListener{l, fakeAddr}, nil +} + +// fakeAddrListener is a listener that creates connections with a mocked remote address. +type fakeAddrListener struct { + net.Listener + remoteAddr net.Addr +} + +type fakeAddrConn struct { + net.Conn + remoteAddr net.Addr +} + +func (l *fakeAddrListener) Accept() (net.Conn, error) { + c, err := l.Listener.Accept() + if err != nil { + return nil, err + } + return &fakeAddrConn{c, l.remoteAddr}, nil +} + +func (c *fakeAddrConn) RemoteAddr() net.Addr { + return c.remoteAddr +} + +type wrapTCPDialerStats struct { + d *net.Dialer + failed int + tried int + resultCh chan resultDial +} + +type resultDial struct { + err error + node enode.ID +} + +func (d wrapTCPDialerStats) Dial(ctx context.Context, dest *enode.Node) (net.Conn, error) { + nodeAddr := &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()} + conn, err := d.d.DialContext(ctx, "tcp", nodeAddr.String()) + d.resultCh <- resultDial{err, dest.ID()} + return conn, err +} diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go index aeb8ef7772..28e89684d2 100644 --- a/p2p/simulations/adapters/types.go +++ b/p2p/simulations/adapters/types.go @@ -42,7 +42,6 @@ import ( // * SimNode - An in-memory node // * ExecNode - A child process node // * DockerNode - A Docker container node -// type Node interface { // Addr returns the node's address (e.g. an Enode URL) Addr() []byte @@ -65,6 +64,9 @@ type Node interface { // Snapshots creates snapshots of the running services Snapshots() (map[string][]byte, error) + + // PeerStats returns the node's peer statistics + PeerStats() *PeerStats } // NodeAdapter is used to create Nodes in a simulation network @@ -119,7 +121,8 @@ type NodeConfig struct { // function to sanction or prevent suggesting a peer Reachable func(id enode.ID) bool - Port uint16 + Port uint16 + DisableTCP bool // LogFile is the log file name of the p2p node at runtime. // @@ -131,34 +134,61 @@ type NodeConfig struct { // // The default verbosity is INFO. LogVerbosity log.Lvl + + // NoDiscovery disables the peer discovery mechanism (manual peer addition) + NoDiscovery bool + + // Use default TCP dialer + UseTCPDialer bool + + // UseFakeIPListener is used to fake the remote IP address when accepting incoming connections + UseFakeIPListener bool + + // DHTBucketSize is the bucket size of DHT + DHTBucketSize int + + // BootstrapNodes is the list of bootstrap nodes + BootstrapNodeURLs string } // nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding // all fields as strings type nodeConfigJSON struct { - ID string `json:"id"` - PrivateKey string `json:"private_key"` - Name string `json:"name"` - Lifecycles []string `json:"lifecycles"` - Properties []string `json:"properties"` - EnableMsgEvents bool `json:"enable_msg_events"` - Port uint16 `json:"port"` - LogFile string `json:"logfile"` - LogVerbosity int `json:"log_verbosity"` + ID string `json:"id"` + PrivateKey string `json:"private_key"` + Name string `json:"name"` + Lifecycles []string `json:"lifecycles"` + Properties []string `json:"properties"` + EnableMsgEvents bool `json:"enable_msg_events"` + Port uint16 `json:"port"` + DisableTCP bool `json:"disable_tcp"` + LogFile string `json:"logfile"` + LogVerbosity int `json:"log_verbosity"` + NoDiscovery bool `json:"no_discovery"` + UseTCPDialer bool `json:"use_tcp_dialer"` + UseFakeIPListener bool `json:"use_fake_ip_listener"` + DHTBucketSize int `json:"dht_bucket_size"` + BootstrapNodeURLs string `json:"bootstrap_node_urls"` } // MarshalJSON implements the json.Marshaler interface by encoding the config // fields as strings func (n *NodeConfig) MarshalJSON() ([]byte, error) { confJSON := nodeConfigJSON{ - ID: n.ID.String(), - Name: n.Name, - Lifecycles: n.Lifecycles, - Properties: n.Properties, - Port: n.Port, - EnableMsgEvents: n.EnableMsgEvents, - LogFile: n.LogFile, - LogVerbosity: int(n.LogVerbosity), + ID: n.ID.String(), + Name: n.Name, + Lifecycles: n.Lifecycles, + Properties: n.Properties, + Port: n.Port, + DisableTCP: n.DisableTCP, + EnableMsgEvents: n.EnableMsgEvents, + LogFile: n.LogFile, + LogVerbosity: int(n.LogVerbosity), + NoDiscovery: n.NoDiscovery, + UseTCPDialer: n.UseTCPDialer, + UseFakeIPListener: n.UseFakeIPListener, + DHTBucketSize: n.DHTBucketSize, + BootstrapNodeURLs: n.BootstrapNodeURLs, } if n.PrivateKey != nil { confJSON.PrivateKey = hex.EncodeToString(crypto.FromECDSA(n.PrivateKey)) @@ -196,9 +226,15 @@ func (n *NodeConfig) UnmarshalJSON(data []byte) error { n.Lifecycles = confJSON.Lifecycles n.Properties = confJSON.Properties n.Port = confJSON.Port + n.DisableTCP = confJSON.DisableTCP n.EnableMsgEvents = confJSON.EnableMsgEvents n.LogFile = confJSON.LogFile n.LogVerbosity = log.Lvl(confJSON.LogVerbosity) + n.NoDiscovery = confJSON.NoDiscovery + n.UseTCPDialer = confJSON.UseTCPDialer + n.UseFakeIPListener = confJSON.UseFakeIPListener + n.DHTBucketSize = confJSON.DHTBucketSize + n.BootstrapNodeURLs = confJSON.BootstrapNodeURLs return nil } @@ -324,3 +360,12 @@ func (n *NodeConfig) initEnode(ip net.IP, tcpport int, udpport int) error { func (n *NodeConfig) initDummyEnode() error { return n.initEnode(net.IPv4(127, 0, 0, 1), int(n.Port), 0) } + +// PeerStats is a struct that holds the statistics of a node's peers +type PeerStats struct { + PeerCount int `json:"peer_count"` + Tried int `json:"tried"` + DifferentNodesDiscovered int `json:"different_nodes_discovered"` + Failed int `json:"failed"` + DHTBuckets []int `json:"dht_buckets"` +} diff --git a/p2p/simulations/dht/dht.go b/p2p/simulations/dht/dht.go new file mode 100644 index 0000000000..e38b25d723 --- /dev/null +++ b/p2p/simulations/dht/dht.go @@ -0,0 +1,52 @@ +package main + +import ( + "flag" + "fmt" + "net/http" + "os" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p/simulations" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" +) + +var ( + verbosity = flag.Int("verbosity", 3, "logging verbosity") + port = flag.Int("port", 8888, "port to listen on") +) + +func main() { + flag.Parse() + + log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(false)))) + + services := map[string]adapters.LifecycleConstructor{ + "empty": func(ctx *adapters.ServiceContext, stack *node.Node) (node.Lifecycle, error) { + return &emptyService{}, nil + }, + } + adapters.RegisterLifecycles(services) + + adapter := adapters.NewSimAdapter(services) + + log.Info("starting simulation server", "port", *port) + network := simulations.NewNetwork(adapter, &simulations.NetworkConfig{ + DefaultService: "empty", + }) + if err := http.ListenAndServe(fmt.Sprintf(":%d", *port), simulations.NewServer(network)); err != nil { + log.Crit("error starting simulation server", "err", err) + } +} + +type emptyService struct { +} + +func (s *emptyService) Start() error { + return nil +} + +func (s *emptyService) Stop() error { + return nil +} diff --git a/p2p/simulations/dht/dht.sh b/p2p/simulations/dht/dht.sh new file mode 100755 index 0000000000..9dec902fe1 --- /dev/null +++ b/p2p/simulations/dht/dht.sh @@ -0,0 +1,39 @@ +#!/bin/bash + +main_cmd="go run ." +p2psim_cmd="go run ../../../cmd/p2psim" + +bench_with_bucket_size() { + local bucket_size=$1 + local sleep_time=$2 + + pids=() + echo "Start server $bucket_size..." + $main_cmd > tmp_$bucket_size.log & + echo "Start stats $bucket_size..." + $p2psim_cmd log-stats --file ./stats_$bucket_size.csv & + + echo "Start bootnodes $bucket_size..." + $p2psim_cmd node create-multi --count 2 --fake.iplistener --start --dht.bucketsize $bucket_size --only.bootnode + + $p2psim_cmd node create-multi --count 100 --fake.iplistener --start --dht.bucketsize 16 --autofill.bootnodes --interval 2s + echo "Sleep $sleep_time..." + sleep $sleep_time + + $p2psim_cmd node create-multi --count 100 --fake.iplistener --start --dht.bucketsize 16 --autofill.bootnodes --interval 2s + echo "Sleep $sleep_time..." + sleep $sleep_time + + echo "Kill server and stats $bucket_size..." + kill -9 $(lsof -t -i:8888) + ps aux | grep $p2psim_cmd | grep -v "grep" | awk '{print $2}' | xargs kill -9 + + sleep 10 +} + +bench_with_bucket_size 8 1200 +bench_with_bucket_size 16 1200 +bench_with_bucket_size 32 1200 +bench_with_bucket_size 64 1200 +bench_with_bucket_size 128 1200 +bench_with_bucket_size 256 1200 \ No newline at end of file diff --git a/p2p/simulations/http.go b/p2p/simulations/http.go index 27ed5b75d2..1f4a1f81bb 100644 --- a/p2p/simulations/http.go +++ b/p2p/simulations/http.go @@ -211,6 +211,18 @@ func (c *Client) RPCClient(ctx context.Context, nodeID string) (*rpc.Client, err return rpc.DialWebsocket(ctx, fmt.Sprintf("%s/nodes/%s/rpc", baseURL, nodeID), "") } +// GetNodePeerStats returns the peer stats of a node +func (c *Client) GetNodePeerStats(nodeID string) (*adapters.PeerStats, error) { + stats := &adapters.PeerStats{} + return stats, c.Get(fmt.Sprintf("/peerstats/%s", nodeID), stats) +} + +// GetAllNodePeerStats returns the peer stats of all nodes +func (c *Client) GetAllNodePeerStats() (map[string]*adapters.PeerStats, error) { + stats := make(map[string]*adapters.PeerStats) + return stats, c.Get("/peerstats", &stats) +} + // Get performs a HTTP GET request decoding the resulting JSON response // into "out" func (c *Client) Get(path string, out interface{}) error { @@ -296,6 +308,8 @@ func NewServer(network *Network) *Server { s.POST("/nodes/:nodeid/conn/:peerid", s.ConnectNode) s.DELETE("/nodes/:nodeid/conn/:peerid", s.DisconnectNode) s.GET("/nodes/:nodeid/rpc", s.NodeRPC) + s.GET("/peerstats/:nodeid", s.GetNodePeerStats) + s.GET("/peerstats", s.GetAllNodePeerStats) return s } @@ -642,6 +656,22 @@ func (s *Server) DisconnectNode(w http.ResponseWriter, req *http.Request) { s.JSON(w, http.StatusOK, node.NodeInfo()) } +// GetNodePeerStats returns the peer stats of a node +func (s *Server) GetNodePeerStats(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + s.JSON(w, http.StatusOK, node.PeerStats()) +} + +// GetAllNodePeerStats returns the peer stats of all nodes +func (s *Server) GetAllNodePeerStats(w http.ResponseWriter, req *http.Request) { + stats := make(map[string]*adapters.PeerStats) + for _, node := range s.network.GetNodes() { + stats[node.Config.Name] = node.PeerStats() + } + + s.JSON(w, http.StatusOK, stats) +} + // Options responds to the OPTIONS HTTP method by returning a 200 OK response // with the "Access-Control-Allow-Headers" header set to "Content-Type" func (s *Server) Options(w http.ResponseWriter, req *http.Request) { diff --git a/p2p/simulations/http_test.go b/p2p/simulations/http_test.go index 6d7f0b6d7a..b4d504463f 100644 --- a/p2p/simulations/http_test.go +++ b/p2p/simulations/http_test.go @@ -621,6 +621,8 @@ func TestHTTPSnapshot(t *testing.T) { nodes := make([]*p2p.NodeInfo, nodeCount) for i := 0; i < nodeCount; i++ { config := adapters.RandomNodeConfig() + // No need to use TCP for this test + config.DisableTCP = true node, err := client.CreateNode(config) if err != nil { t.Fatalf("error creating node: %s", err) diff --git a/p2p/simulations/network_test.go b/p2p/simulations/network_test.go index d5651441a2..a3a428e07b 100644 --- a/p2p/simulations/network_test.go +++ b/p2p/simulations/network_test.go @@ -64,6 +64,8 @@ func TestSnapshot(t *testing.T) { ids := make([]enode.ID, nodeCount) for i := 0; i < nodeCount; i++ { conf := adapters.RandomNodeConfig() + // No need to use TCP for this test + conf.DisableTCP = true node, err := network.NewNodeWithConfig(conf) if err != nil { t.Fatalf("error creating node: %s", err)