Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a connection reuse race #191

Merged
merged 1 commit into from
Nov 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 51 additions & 15 deletions lib/Cro/HTTP/Client.pm6
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ class Cro::HTTP::Client::Policy::Timeout does Cro::Policy::Timeout[%(
#| for multiple requests, as well as allowing configuration of common properties of
#| many requests at construction time.
class Cro::HTTP::Client {
my class PipelineClosedBeforeHeaders is Exception { }
my class Pipeline {
has Bool $.secure;
has Str $.host;
Expand All @@ -127,6 +128,7 @@ class Cro::HTTP::Client {
has Tap $!tap;
has $!next-response-vow;
has Bool $.dead = False;
has Lock::Async $!lock .= new;

submethod BUILD(:$!secure!, :$!host!, :$!port!, :$!in!, :$out!) {
$!tap = supply {
Expand All @@ -136,18 +138,22 @@ class Cro::HTTP::Client {
$vow.keep($_);
LAST {
$!dead = True;
if $!next-response-vow {
$!next-response-vow.break:
'Connection unexpectedly closed before response headers received';
$!next-response-vow = Nil;
$!lock.protect: {
if $!next-response-vow {
$!next-response-vow.break:
PipelineClosedBeforeHeaders.new;
$!next-response-vow = Nil;
}
}
}
QUIT {
default {
$!dead = True;
if $!next-response-vow {
$!next-response-vow.break($_);
$!next-response-vow = Nil;
$!lock.protect: {
if $!next-response-vow {
$!next-response-vow.break($_);
$!next-response-vow = Nil;
}
}
}
}
Expand All @@ -156,8 +162,21 @@ class Cro::HTTP::Client {
}

method send-request($request --> Promise) {
my $next-response-promise = Promise.new;
$!next-response-vow = $next-response-promise.vow;
my $next-response-promise;
my $broken = False;
$!lock.protect: {
if $!dead {
# Without https://github.com/MoarVM/MoarVM/pull/1782 merged,
# we can't put the below return here.
$broken = True;
}
else {
$next-response-promise = Promise.new;
$!next-response-vow = $next-response-promise.vow;
}
}
return Promise.broken(PipelineClosedBeforeHeaders.new) if $broken;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this line would be better in the if $!dead arm and no need for var $broken

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it would be better. But see the comment. ;-)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To elaborate a bit more, the PR referenced in the comment actually has been merged. But I don't intentionally want to break Cro for Rakudo versions <= 2024.05 yet.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh - yes I see the comment is related - missed that!


$!in.emit($request);
return $next-response-promise;
}
Expand Down Expand Up @@ -217,13 +236,22 @@ class Cro::HTTP::Client {

method send-request(Cro::HTTP::Request $request --> Promise) {
my $p = Promise.new;
my $broken = False;
$!lock.protect: {
my $stream-id = $!next-stream-id;
$!next-stream-id += 2;
$request.http2-stream-id = $stream-id;
$request.http-version = '2.0';
%!outstanding-stream-responses{$stream-id} = $p.vow;
if $!dead {
# Without https://github.com/MoarVM/MoarVM/pull/1782 merged,
# we can't put the below return here.
$broken = True;
}
else {
my $stream-id = $!next-stream-id;
$!next-stream-id += 2;
$request.http2-stream-id = $stream-id;
$request.http-version = '2.0';
%!outstanding-stream-responses{$stream-id} = $p.vow;
}
}
return Promise.broken(PipelineClosedBeforeHeaders.new) if $broken;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

$!in.emit($request);
$p
}
Expand Down Expand Up @@ -617,7 +645,6 @@ class Cro::HTTP::Client {

# Send the request.
whenever $pipeline.send-request($request-object) {
$headers-kept = True;
QUIT {
$request-log.end;
when GoAwayRetry {
Expand All @@ -628,7 +655,16 @@ class Cro::HTTP::Client {
.goaway-exception.rethrow;
}
}
when PipelineClosedBeforeHeaders {
if $goaway-retries > 0 && !$headers-kept {
$retry-supplier.emit: True;
}
else {
.rethrow;
}
}
}
$headers-kept = True;

# Consider adding the connection back into the cache to use it
# again.
Expand Down
Loading