From 29d5ba46f7361020904216e0686657fd775124c5 Mon Sep 17 00:00:00 2001 From: Vladimir Vivien Date: Sat, 14 Aug 2021 17:50:21 -0400 Subject: [PATCH] Command Builder This patch implements command-builder which allows multiple command strings to be batched, then executed using one of several execution policies, including - in series - concurrent - piped (not ready) - exit on error - continue on error Changes also include refactor and updated tests. --- exec/builder.go | 143 +++++++++++++++++++++++++++++++ exec/builder_test.go | 194 +++++++++++++++++++++++++++++++++++++++++++ exec/proc.go | 49 ++++++++--- exec/proc_test.go | 10 +++ procs_test.go | 12 ++- 5 files changed, 392 insertions(+), 16 deletions(-) create mode 100644 exec/builder.go create mode 100644 exec/builder_test.go diff --git a/exec/builder.go b/exec/builder.go new file mode 100644 index 0000000..8b726ea --- /dev/null +++ b/exec/builder.go @@ -0,0 +1,143 @@ +package exec + +import ( + "sync" +) + +type CommandPolicy byte + +const ( + CmdOnErrContinue CommandPolicy = 1 << iota + CmdOnErrExit + CmdExecSerial + CmdExecConcurrent + CmdExecPipe +) + +type CommandProcs struct { + procs []*Proc +} +type CommandBuilder struct { + cmdPolicy CommandPolicy + procs []*Proc + procChan chan *Proc +} + +// Commands creates a *CommandBuilder used to collect +// command strings to be executed. +func Commands(cmds ...string) *CommandBuilder { + cb := new(CommandBuilder) + for _, cmd := range cmds { + cb.procs = append(cb.procs, NewProc(cmd)) + } + return cb +} + +// WithPolicy sets one or more command policy mask values, i.e. (CmdOnErrContinue | CmdExecConcurrent) +func (cb *CommandBuilder) WithPolicy(policyMask CommandPolicy) *CommandBuilder { + cb.cmdPolicy = policyMask + return cb +} + +// Add adds a new command string to the builder +func (cb *CommandBuilder) Add(cmds ...string) *CommandBuilder { + for _, cmd := range cmds { + cb.procs = append(cb.procs, NewProc(cmd)) + } + return cb +} + +// Run is a shortcut for executing the procs serially: +// +// cb.WithPolicy(CmdOnErrContinue).Start().Wait() +// +func (cb *CommandBuilder) Run() CommandProcs { + return cb.WithPolicy(CmdOnErrContinue).Start().Wait() +} + +// ConcurRun is a shortcut for executing procs concurrently: +// +// cb.WithPolicy(CmdExecConcurrent).Start().Wait() +// +func (cb *CommandBuilder) ConcurRun() CommandProcs { + return cb.WithPolicy(CmdOnErrContinue | CmdExecConcurrent).Start().Wait() +} + +// Start starts running the registered procs serially and returns immediately. +// This should be followed by a call to Wait to retrieve results. +func (cb *CommandBuilder) Start() *CommandBuilder { + if len(cb.procs) == 0 { + return cb + } + + cb.procChan = make(chan *Proc, len(cb.procs)) + switch { + case hasPolicy(cb.cmdPolicy, CmdExecConcurrent): + // launch each command in its own goroutine + go func() { + defer close(cb.procChan) + var gate sync.WaitGroup + for _, proc := range cb.procs { + gate.Add(1) + go func(wg *sync.WaitGroup, ch chan<- *Proc, p *Proc) { + defer wg.Done() + ch <- p.Start() + }(&gate, cb.procChan, proc) + } + // wait for procs to launch + gate.Wait() + }() + + case hasPolicy(cb.cmdPolicy, CmdExecPipe): + // pipe successive commands serially + go func(ch chan<- *Proc) { + defer close(cb.procChan) + if len(cb.procs) == 1 { + ch <- cb.procs[0].Start() + return + } + }(cb.procChan) + default: + // launch all procs (serially), return immediately + go func(ch chan<- *Proc) { + defer close(cb.procChan) + for _, proc := range cb.procs { + ch <- proc.Start() + } + }(cb.procChan) + } + return cb +} + +func (cb *CommandBuilder) Wait() CommandProcs { + if len(cb.procs) == 0 || cb.procChan == nil { + return CommandProcs{procs: []*Proc{}} + } + + var result CommandProcs + for proc := range cb.procChan { + result.procs = append(result.procs, proc) + + // check for start err + if proc.Err() != nil { + if hasPolicy(cb.cmdPolicy, CmdOnErrExit) { + break + } + } + + // wait for command to complete + if err := proc.Wait().Err(); err != nil { + if hasPolicy(cb.cmdPolicy, CmdOnErrExit) { + break + } + } + } + return result +} + +func hasPolicy(mask, pol CommandPolicy) bool { + return (mask & pol) != 0 +} + +// TODO - add termination methods +// - Pipe() - Runs each command, piping result of prev command into std input of next command diff --git a/exec/builder_test.go b/exec/builder_test.go new file mode 100644 index 0000000..42ab988 --- /dev/null +++ b/exec/builder_test.go @@ -0,0 +1,194 @@ +package exec + +import ( + "testing" +) + +func TestCommandBuilder(t *testing.T) { + tests := []struct { + name string + initCmds []string + additionalCmds []string + }{ + {name: "no procs"}, + { + name: "initial procs only", + initCmds: []string{"echo 'hello world'", "date", "ls -al"}, + }, + { + name: "initial and one additional", + initCmds: []string{"echo 'hello world'", "date", "ls -al"}, + additionalCmds: []string{"git commit --signoff"}, + }, + { + name: "initial and multiple additional", + initCmds: []string{"echo 'hello world'", "date", "ls -al"}, + additionalCmds: []string{"git commit --signoff", "history", "man time", "man man"}, + }, + { + name: "no initial multiple additional", + additionalCmds: []string{"git commit --signoff", "history", "man time", "man man"}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + c := Commands(test.initCmds...) + if len(test.initCmds) != len(c.procs) { + t.Error("unexpected command count in CommandBuilder") + } + c.Add(test.additionalCmds...) + if (len(test.initCmds) + len(test.additionalCmds)) != len(c.procs) { + t.Error("procs are not added to builder properly") + } + }) + } +} + +func TestCommandBuilder_Run(t *testing.T) { + tests := []struct { + name string + commands []string + expectedCmds int + }{ + { + name: "zero procs", + }, + { + name: "no error in procs", + commands: []string{"echo 'hello world'", "date", "ls -al"}, + expectedCmds: 3, + }, + { + name: "continue on 1 error", + commands: []string{"foobar", "echo 'hello world'", "date", "ls -al"}, + expectedCmds: 4, + }, + { + name: "continue on 2 errors", + commands: []string{"foobar", "echo 'hello world'", "daftpunk", "date", "ls -al"}, + expectedCmds: 5, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + c := Commands(test.commands...).Run() + if len(c.procs) != test.expectedCmds { + t.Errorf("expecting %d procs to run, got %d", test.expectedCmds, len(c.procs)) + } + }) + } +} + +func TestCommandBuilder_ConcurRun(t *testing.T) { + tests := []struct { + name string + commands []string + expectedCmds int + }{ + { + name: "zero procs", + }, + { + name: "no error in procs", + commands: []string{"echo 'hello world'", "date", "ls -al"}, + expectedCmds: 3, + }, + { + name: "continue on 1 error", + commands: []string{"foobar", "echo 'hello world'", "date", "ls -al"}, + expectedCmds: 4, + }, + { + name: "continue on 2 errors", + commands: []string{"foobar", "echo 'hello world'", "daftpunk", "date", "ls -al"}, + expectedCmds: 5, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + c := Commands(test.commands...).ConcurRun() + if len(c.procs) != test.expectedCmds { + t.Errorf("expecting %d procs to run, got %d", test.expectedCmds, len(c.procs)) + } + }) + } +} + +func TestCommandBuilder_StartWait(t *testing.T) { + tests := []struct { + name string + commands []string + policy CommandPolicy + expectedCmds int + }{ + { + name: "zero procs", + }, + { + name: "no error in procs", + commands: []string{"echo 'hello world'", "date", "ls -al"}, + policy: CmdOnErrContinue, + expectedCmds: 3, + }, + { + name: "continue on 1 error", + commands: []string{"foobar", "echo 'hello world'", "date", "ls -al"}, + policy: CmdOnErrContinue, + expectedCmds: 4, + }, + { + name: "break on 1 error", + commands: []string{"foobar", "echo 'hello world'", "date", "ls -al"}, + policy: CmdOnErrExit, + expectedCmds: 1, + }, + { + name: "continue on 2 errors", + commands: []string{"foobar", "echo 'hello world'", "daftpunk", "date", "ls -al"}, + policy: CmdOnErrContinue, + expectedCmds: 5, + }, + { + name: "break on 2 errors", + commands: []string{"foobar", "echo 'hello world'", "daftpunk", "date", "ls -al"}, + policy: CmdOnErrExit, + expectedCmds: 1, + }, + { + name: "concurrently no errors", + commands: []string{"echo 'hello world'", "date", "ls -al"}, + policy: CmdExecConcurrent, + expectedCmds: 3, + }, + { + name: "concurrent 1 error", + commands: []string{"foobar", "echo 'hello world'", "date", "ls -al"}, + policy: CmdExecConcurrent, + expectedCmds: 4, + }, + { + name: "continue on 2 errors", + commands: []string{"foobar", "echo 'hello world'", "daftpunk", "date", "ls -al"}, + policy: CmdExecConcurrent, + expectedCmds: 5, + }, + { + name: "Concurr|Continue with 1 err", + commands: []string{"man cat", "echo 'hello world'", "foo", "ls -al"}, + policy: CmdOnErrContinue | CmdExecConcurrent, + expectedCmds: 4, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + c := Commands(test.commands...).WithPolicy(test.policy).Start().Wait() + if len(c.procs) != test.expectedCmds { + t.Errorf("expecting %d procs to run, got %d", test.expectedCmds, len(c.procs)) + } + }) + } +} diff --git a/exec/proc.go b/exec/proc.go index 4d2f5ce..71f5368 100644 --- a/exec/proc.go +++ b/exec/proc.go @@ -23,9 +23,9 @@ type Proc struct { process *os.Process } -// StartProc creates a *Proc and starts an OS process but does not wait for -// it to complete (use proc.Wait for that) -func StartProc(cmdStr string) *Proc { +// NewProc sets up command string to be started as an OS process, however +// does not start the process. +func NewProc(cmdStr string) *Proc { words, err := parse(cmdStr) if err != nil { return &Proc{err: err} @@ -40,25 +40,28 @@ func StartProc(cmdStr string) *Proc { return &Proc{err: fmt.Errorf("%s; %s", outerr, errerr)} } - if err := command.Start(); err != nil { - return &Proc{cmd: command, err: err} - } - return &Proc{ - id: command.Process.Pid, cmd: command, - process: command.Process, - state: command.ProcessState, stdoutPipe: pipeout, stderrPipe: pipeerr, output: output, } } +// StartProc sets up and starts an OS process, but does not wait for +// it to complete (use proc.Wait for that) +func StartProc(cmdStr string) *Proc { + proc := NewProc(cmdStr) + if proc.Err() != nil { + return proc + } + return proc.Start() +} + // RunProc creates, runs, and waits for a process to complete and // return *Proc with result info. This call must be followed by -// Proc.Result() to access to a string value of the process result. -// NOTE: using proc.Out() to access the reader will be empty. +// Proc.Result() to access command result as a string value. +// NOTE: using proc.Out(), after this call, will be empty. func RunProc(cmdStr string) *Proc { proc := StartProc(cmdStr) if proc.Err() != nil { @@ -92,7 +95,27 @@ func Run(cmdStr string) (result string) { return proc.Result() } -// Command returns the os/exec.Cmd that started the process +// Start starts the associated command as an OS process +// Errors can be accessed using p.Err() +func (p *Proc) Start() *Proc { + if p.cmd == nil { + p.err = fmt.Errorf("cmd is nill") + return p + } + + if err := p.cmd.Start(); err != nil { + p.err = err + return p + } + + p.process = p.cmd.Process + p.id = p.cmd.Process.Pid + p.state = p.cmd.ProcessState + + return p +} + +// Commands returns the os/exec.Cmd that started the process func (p *Proc) Command() *osexec.Cmd { return p.cmd } diff --git a/exec/proc_test.go b/exec/proc_test.go index 58bd236..4e6a814 100644 --- a/exec/proc_test.go +++ b/exec/proc_test.go @@ -116,6 +116,16 @@ func TestProc(t *testing.T) { } }, }, + { + name: "bad command", + cmdStr: `foobar "HELLO WORLD!"`, + exec: func(cmd string) { + result := RunProc(cmd) + if result.Err() != nil { + t.Log(result.Err()) + } + }, + }, } for _, test := range tests { diff --git a/procs_test.go b/procs_test.go index aa49f87..9d590ad 100644 --- a/procs_test.go +++ b/procs_test.go @@ -5,6 +5,7 @@ import ( "io" "regexp" "strings" + "sync" "testing" ) @@ -59,16 +60,21 @@ func TestEchoRun(t *testing.T) { t.Fatal(p.Err()) } + var mtex sync.RWMutex buf := &bytes.Buffer{} - go func() { - if _, err := io.Copy(buf, p.StdOut()); err != nil { + go func(b *bytes.Buffer) { + defer mtex.Unlock() + mtex.Lock() + if _, err := io.Copy(b, p.StdOut()); err != nil { t.Error(err) } - }() + }(buf) p.Wait() + mtex.RLock() result := strings.TrimSpace(buf.String()) + mtex.RUnlock() result = spaces.ReplaceAllString(result, " ") if !strings.Contains(result, "HELLO WORLD!") { t.Fatal("Unexpected result:", result)