diff --git a/hub_test.go b/hub_test.go index c555d93f..af0b177e 100644 --- a/hub_test.go +++ b/hub_test.go @@ -258,6 +258,7 @@ func (suite *eventHubSuite) TestPartitioned() { tests := map[string]func(context.Context, *testing.T, *Hub, string){ "TestSend": testBasicSend, "TestSendTooBig": testSendTooBig, + "TestHandleError": testHandleError, "TestSendAndReceive": testBasicSendAndReceive, "TestBatchSendAndReceive": testBatchSendAndReceive, "TestBatchSendTooLarge": testBatchSendTooLarge, @@ -283,6 +284,7 @@ func (suite *eventHubSuite) TestWebSocket() { tests := map[string]func(context.Context, *testing.T, *Hub, string){ "TestSend": testBasicSend, "TestSendTooBig": testSendTooBig, + "TestHandleError": testHandleError, "TestSendAndReceive": testBasicSendAndReceive, "TestBatchSendAndReceive": testBatchSendAndReceive, "TestBatchSendTooLarge": testBatchSendTooLarge, @@ -391,6 +393,47 @@ func testBasicSendAndReceive(ctx context.Context, t *testing.T, client *Hub, par } } +func testHandleError(ctx context.Context, t *testing.T, client *Hub, partitionID string) { + numMessages := 1 + var wg sync.WaitGroup + wg.Add(numMessages) + + messages := make([]string, numMessages) + for i := 0; i < numMessages; i++ { + messages[i] = test.RandomString("hello", 10) + } + + for idx, message := range messages { + if !assert.NoError(t, client.Send(ctx, NewEventFromString(message), SendWithMessageID(fmt.Sprintf("%d", idx)))) { + assert.FailNow(t, "unable to send event") + } + } + + expectedFailures := 2 + processedCount := 0 + totalReceiveCount := 0 + _, err := client.Receive(ctx, partitionID, func(ctx context.Context, event *Event) error { + assert.Equal(t, messages[processedCount], string(event.Data)) + totalReceiveCount++ + if totalReceiveCount <= expectedFailures { + return fmt.Errorf("Failed processing %d (expected failures: %d)", totalReceiveCount, expectedFailures) + } + processedCount++ + wg.Done() + return nil + }, ReceiveWithPrefetchCount(100)) + if err != nil { + fmt.Printf("Received error %s\n", err.Error()) + } + assert.NoError(t, err) + + end, _ := ctx.Deadline() + waitUntil(t, &wg, time.Until(end)) + + assert.Equal(t, totalReceiveCount, expectedFailures+numMessages) + assert.Equal(t, processedCount, numMessages) +} + func (suite *eventHubSuite) TestEpochReceivers() { tests := map[string]func(context.Context, *testing.T, *Hub, []string, string){ "TestEpochGreaterThenLess": testEpochGreaterThenLess,