From 2534c95fa357a7bd7b8f86b2e5217186047c84bd Mon Sep 17 00:00:00 2001 From: Alexandre Perrin Date: Fri, 22 Oct 2021 10:28:40 +0200 Subject: [PATCH] tasks: create taskResult to be returned by Drain Before this patch, the workerpool would keep a reference to the task struct even after its task func it has completed to provide the resulting error (if any) when Drain() is called. Since the task struct has a reference to the user-provided provided task func, the task func cannot be garbage collected until Drain() is called. This could be problematic as (1) the number of task is unbounded and (2) the workerpool has no control over the memory used by the task func. This patch introduce a taskResult struct satisfying the Task interface that doesn't keep a reference to the task func. Signed-off-by: Alexandre Perrin --- task.go | 18 +++++++++++------- workerpool.go | 5 +++-- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/task.go b/task.go index 1ac4aa5..55d1ea2 100644 --- a/task.go +++ b/task.go @@ -31,19 +31,23 @@ type Task interface { type task struct { id string run func(context.Context) error +} + +type taskResult struct { + id string err error } -// Ensure that task implements the Task interface. -var _ Task = &task{} +// Ensure that taskResult implements the Task interface. +var _ Task = &taskResult{} -// String implements fmt.Stringer for task. -func (t *task) String() string { +// String implements fmt.Stringer for taskResult. +func (t *taskResult) String() string { return t.id } -// Err returns the error resulting from processing the task. It ensures that -// the task struct implements the Task interface. -func (t *task) Err() error { +// Err returns the error resulting from processing the taskResult. It ensures +// that the taskResult struct implements the Task interface. +func (t *taskResult) Err() error { return t.err } diff --git a/workerpool.go b/workerpool.go index 28d0b45..da289c5 100644 --- a/workerpool.go +++ b/workerpool.go @@ -165,11 +165,12 @@ func (wp *WorkerPool) Close() error { func (wp *WorkerPool) run(ctx context.Context) { for t := range wp.tasks { t := t - wp.results = append(wp.results, t) + result := taskResult{id: t.id} + wp.results = append(wp.results, &result) wp.workers <- struct{}{} go func() { defer wp.wg.Done() - t.err = t.run(ctx) + result.err = t.run(ctx) <-wp.workers }() }