Skip to content

Commit

Permalink
tasks: create taskResult to be returned by Drain
Browse files Browse the repository at this point in the history
Before this patch, the workerpool would keep a reference to the task
struct even after its task func it has completed to provide the
resulting error (if any) when Drain() is called. Since the task struct
has a reference to the user-provided provided task func, the task func
cannot be garbage collected until Drain() is called.  This could be
problematic as (1) the number of task is unbounded and (2) the
workerpool has no control over the memory used by the task func.

This patch introduce a taskResult struct satisfying the Task interface
that doesn't keep a reference to the task func.

Signed-off-by: Alexandre Perrin <[email protected]>
  • Loading branch information
kaworu authored and rolinh committed Oct 22, 2021
1 parent dfc7446 commit 2534c95
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 9 deletions.
18 changes: 11 additions & 7 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,23 @@ type Task interface {
type task struct {
id string
run func(context.Context) error
}

type taskResult struct {
id string
err error
}

// Ensure that task implements the Task interface.
var _ Task = &task{}
// Ensure that taskResult implements the Task interface.
var _ Task = &taskResult{}

// String implements fmt.Stringer for task.
func (t *task) String() string {
// String implements fmt.Stringer for taskResult.
func (t *taskResult) String() string {
return t.id
}

// Err returns the error resulting from processing the task. It ensures that
// the task struct implements the Task interface.
func (t *task) Err() error {
// Err returns the error resulting from processing the taskResult. It ensures
// that the taskResult struct implements the Task interface.
func (t *taskResult) Err() error {
return t.err
}
5 changes: 3 additions & 2 deletions workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,12 @@ func (wp *WorkerPool) Close() error {
func (wp *WorkerPool) run(ctx context.Context) {
for t := range wp.tasks {
t := t
wp.results = append(wp.results, t)
result := taskResult{id: t.id}
wp.results = append(wp.results, &result)
wp.workers <- struct{}{}
go func() {
defer wp.wg.Done()
t.err = t.run(ctx)
result.err = t.run(ctx)
<-wp.workers
}()
}
Expand Down

0 comments on commit 2534c95

Please sign in to comment.