Skip to content

Commit

Permalink
Add test for message processing error handling (Azure#168)
Browse files Browse the repository at this point in the history
  • Loading branch information
askingcat committed Mar 23, 2020
1 parent bc2987d commit b8f9eff
Showing 1 changed file with 43 additions and 0 deletions.
43 changes: 43 additions & 0 deletions hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ func (suite *eventHubSuite) TestPartitioned() {
tests := map[string]func(context.Context, *testing.T, *Hub, string){
"TestSend": testBasicSend,
"TestSendTooBig": testSendTooBig,
"TestHandleError": testHandleError,
"TestSendAndReceive": testBasicSendAndReceive,
"TestBatchSendAndReceive": testBatchSendAndReceive,
"TestBatchSendTooLarge": testBatchSendTooLarge,
Expand All @@ -283,6 +284,7 @@ func (suite *eventHubSuite) TestWebSocket() {
tests := map[string]func(context.Context, *testing.T, *Hub, string){
"TestSend": testBasicSend,
"TestSendTooBig": testSendTooBig,
"TestHandleError": testHandleError,
"TestSendAndReceive": testBasicSendAndReceive,
"TestBatchSendAndReceive": testBatchSendAndReceive,
"TestBatchSendTooLarge": testBatchSendTooLarge,
Expand Down Expand Up @@ -391,6 +393,47 @@ func testBasicSendAndReceive(ctx context.Context, t *testing.T, client *Hub, par
}
}

func testHandleError(ctx context.Context, t *testing.T, client *Hub, partitionID string) {
numMessages := 1
var wg sync.WaitGroup
wg.Add(numMessages)

messages := make([]string, numMessages)
for i := 0; i < numMessages; i++ {
messages[i] = test.RandomString("hello", 10)
}

for idx, message := range messages {
if !assert.NoError(t, client.Send(ctx, NewEventFromString(message), SendWithMessageID(fmt.Sprintf("%d", idx)))) {
assert.FailNow(t, "unable to send event")
}
}

expectedFailures := 2
processedCount := 0
totalReceiveCount := 0
_, err := client.Receive(ctx, partitionID, func(ctx context.Context, event *Event) error {
assert.Equal(t, messages[processedCount], string(event.Data))
totalReceiveCount++
if totalReceiveCount <= expectedFailures {
return fmt.Errorf("Failed processing %d (expected failures: %d)", totalReceiveCount, expectedFailures)
}
processedCount++
wg.Done()
return nil
}, ReceiveWithPrefetchCount(100))
if err != nil {
fmt.Printf("Received error %s\n", err.Error())
}
assert.NoError(t, err)

end, _ := ctx.Deadline()
waitUntil(t, &wg, time.Until(end))

assert.Equal(t, totalReceiveCount, expectedFailures+numMessages)
assert.Equal(t, processedCount, numMessages)
}

func (suite *eventHubSuite) TestEpochReceivers() {
tests := map[string]func(context.Context, *testing.T, *Hub, []string, string){
"TestEpochGreaterThenLess": testEpochGreaterThenLess,
Expand Down

0 comments on commit b8f9eff

Please sign in to comment.