feat: emit metrics from CRT HTTP engine #1017

Open · wants to merge 7 commits into base: main

Changes from 2 commits
.changes/e7c3c7ab-749e-4371-8b25-42ea76aa870d.json (8 additions, 0 deletions)
@@ -0,0 +1,8 @@
{
"id": "e7c3c7ab-749e-4371-8b25-42ea76aa870d",
"type": "feature",
"description": "Emit metrics from CRT HTTP engine",
"issues": [
"https://github.com/awslabs/smithy-kotlin/issues/893"
]
}
@@ -13,9 +13,12 @@ import aws.smithy.kotlin.runtime.crt.SdkDefaultIO
import aws.smithy.kotlin.runtime.http.HttpErrorCode
import aws.smithy.kotlin.runtime.http.HttpException
import aws.smithy.kotlin.runtime.http.engine.ProxyConfig
import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.io.Closeable
import aws.smithy.kotlin.runtime.net.TlsVersion
import aws.smithy.kotlin.runtime.telemetry.metrics.measureSeconds
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
@@ -27,8 +30,10 @@ import aws.smithy.kotlin.runtime.net.TlsVersion as SdkTlsVersion

internal class ConnectionManager(
private val config: CrtHttpEngineConfig,
private val metrics: HttpClientMetrics,
) : Closeable {
private val leases = Semaphore(config.maxConnections.toInt())
private val pending = atomic(0L)

private val crtTlsContext: TlsContext = TlsContextOptionsBuilder()
.apply {
@@ -61,16 +66,22 @@ internal class ConnectionManager(
val manager = getManagerForUri(request.uri, proxyConfig)
var leaseAcquired = false

metrics.queuedRequests = pending.incrementAndGet()
Contributor:
correctness: If we are trying to acquire a connection then it's not queued, right? I think queued would be before we hit the requestLimiter.

    metrics.queuedRequests = pending.incrementAndGet()
    requestLimiter.withPermit {
        metrics.queuedRequests = pending.decrementAndGet()
    }

Contributor Author:
OK, if "queued" means "before we attempt to acquire a connection" then I'm guessing that the requestsQueuedDuration measurement below is also wrong. I'll move it too.

Contributor:
Had to look at the definitions I gave them again, and I think this would be in line with what it says:

queued = waiting to be executed (e.g. waiting for a thread to be available); in-flight = actively processing

If we are to the point of acquiring a connection, we are "actively processing" the request. I can see where the definition could be interpreted differently, as in "I have all the resources needed at this point to execute the request", but I think establishing a connection (or waiting on one to be available) is part of overall request processing. Curious if others disagree.

Contributor:
This still doesn't look correct; I don't think queuedRequests is calculated in the connection manager.

Contributor Author:
Ah, I think I was confusing the semaphore inside ConnectionManager with the one inside CrtHttpEngine. Switching.
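For reference, a minimal, self-contained sketch of the queued-vs-in-flight pattern being settled on here, assuming the gauge ends up next to the engine-level requestLimiter. The AtomicLong gauges, execute, and main are stand-ins for illustration, not actual smithy-kotlin code; the real change would write to HttpClientMetrics.queuedRequests and the in-flight counter instead.

    import java.util.concurrent.atomic.AtomicLong
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.joinAll
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking
    import kotlinx.coroutines.sync.Semaphore
    import kotlinx.coroutines.sync.withPermit

    // Stand-in gauges; the real code would update HttpClientMetrics instead.
    private val queued = AtomicLong()
    private val inFlight = AtomicLong()
    private val requestLimiter = Semaphore(permits = 2) // mirrors config.maxConcurrency

    suspend fun execute() {
        queued.incrementAndGet()            // waiting for a permit => "queued"
        requestLimiter.withPermit {
            queued.decrementAndGet()
            inFlight.incrementAndGet()      // permit held => "actively processing"
            try {
                delay(100)                  // stands in for acquiring a connection + round trip
            } finally {
                inFlight.decrementAndGet()
            }
        }
    }

    fun main() = runBlocking {
        List(5) { launch { execute() } }.joinAll()
        println("queued=${queued.get()} inFlight=${inFlight.get()}") // both back to 0
    }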


        return try {
-           // wait for an actual connection
-           val conn = withTimeout(config.connectionAcquireTimeout) {
-               // get a permit to acquire a connection (limits overall connections since managers are per/host)
-               leases.acquire()
-               leaseAcquired = true
-               manager.acquireConnection()
-           }
-
-           LeasedConnection(conn)
+           metrics.requestsQueuedDuration.measureSeconds {
+               // wait for an actual connection
+               val conn = withTimeout(config.connectionAcquireTimeout) {
+                   // get a permit to acquire a connection (limits overall connections since managers are per/host)
+                   leases.acquire()
+                   leaseAcquired = true
+                   metrics.connectionAcquireDuration.measureSeconds {
+                       manager.acquireConnection()
+                   }
+               }
+
+               LeasedConnection(conn)
+           }
} catch (ex: Exception) {
if (leaseAcquired) {
leases.release()
@@ -82,8 +93,18 @@
}

throw httpEx
} finally {
metrics.queuedRequests = pending.decrementAndGet()
emitConnections()
}
}

private fun emitConnections() {
Contributor:
nit / naming: emitConnectionsMetrics?

Contributor Author:
Since the class already has the word "connection" in it, renaming to simply emitMetrics.

val idleConnections = leases.availablePermits.toLong()
Contributor:
correctness: this semaphore isn't tied in any way to actual connections in CRT. You could have 100 max connections configured, but that doesn't mean you have 100 idle connections already connected and waiting to be used. I'm thinking this would have to come from the actual CRT connection manager.

Contributor Author:
Oh jeez, good point. Yes, this is clearly flawed.

That brings up an interesting wrinkle, though. We don't have just one CRT connection manager; we have one for each host. Does the Smithy metric for acquired connections equal the sum of acquired connections across all underlying CRT connection managers?

Moreover, we have a CRT config param called maxConnections. Contrary to its name, it's used as the maximum number of connections for each underlying CRT connection manager. Meaning if maxConnections is 100 and we connect to 3 hosts, there are actually 3 underlying CRT connection managers which each have a max of 100 connections, for a total of 300. How then do we measure the Smithy metric of idle connections? Is it the sum of idle connections across all underlying CRT connection managers (which may be more than maxConnections)? Or is it maxConnections minus the sum of acquired connections across all underlying CRT connection managers (which may be less than the actual number of open/idle connections)?

Contributor:
The fact that CRT uses a connection manager per host is an implementation detail; the config setting is at the engine level. That's what the leases semaphore is for: to control max connections across multiple managers (albeit it likely isn't perfect at ensuring we never cross the maxConnections threshold, due to idle connections).

The reason I allowed each connection manager to be configured with maxConnections is that we don't know how many hosts an engine will be used to connect to; it may be 1, or it may be split across many hosts, which will require many connection managers.

Contributor Author:
I see this as an area that may cause confusion later, especially if users begin correlating the idle connections metric with connections reported by the OS or JVM. I don't have a better suggestion, though, given how the CRT connection manager works. I'll switch the implementation to calculate the idle connections metric as maxConnections minus the sum of acquired connections across all underlying CRT connection managers.

Contributor Author:
Oh, and interrogating the number of active connections from the CRT connection managers requires another revision of aws-crt-kotlin: awslabs/aws-crt-kotlin#88
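A rough, self-contained sketch of the calculation described in the last two comments. ManagerSnapshot, acquiredConnectionCount, and idleConnectionsMetric are hypothetical names for illustration; the real per-manager counts would come from the CRT connection managers once aws-crt-kotlin exposes them.

    // Sketch only: idle = engine-level maxConnections minus acquired connections summed
    // across all per-host CRT connection managers.
    data class ManagerSnapshot(val host: String, val acquiredConnectionCount: Long)

    fun idleConnectionsMetric(maxConnections: Long, managers: List<ManagerSnapshot>): Long {
        val acquired = managers.sumOf { it.acquiredConnectionCount }
        // Each per-host manager allows up to maxConnections on its own, so the sum can
        // exceed the engine-level limit; clamp at zero rather than report a negative gauge.
        return (maxConnections - acquired).coerceAtLeast(0L)
    }

    fun main() {
        val managers = listOf(
            ManagerSnapshot("host-a", acquiredConnectionCount = 40),
            ManagerSnapshot("host-b", acquiredConnectionCount = 25),
        )
        println(idleConnectionsMetric(maxConnections = 100, managers = managers)) // 35
    }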

metrics.idleConnections = idleConnections
metrics.acquiredConnections = config.maxConnections.toLong() - idleConnections
}

private suspend fun getManagerForUri(uri: Uri, proxyConfig: ProxyConfig): HttpClientConnectionManager = mutex.withLock {
connManagers.getOrPut(uri.authority) {
val connOpts = options.apply {
@@ -105,6 +126,7 @@
HttpClientConnectionManager(connOpts)
}
}

override fun close() {
connManagers.forEach { entry -> entry.value.close() }
crtTlsContext.close()
@@ -117,6 +139,7 @@
delegate.close()
} finally {
leases.release()
emitConnections()
}
}
}
@@ -10,6 +10,7 @@ import aws.smithy.kotlin.runtime.http.config.EngineFactory
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineBase
import aws.smithy.kotlin.runtime.http.engine.callContext
import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.io.internal.SdkDispatchers
import aws.smithy.kotlin.runtime.operation.ExecutionContext
@@ -22,6 +23,8 @@ import kotlinx.coroutines.sync.withPermit
internal const val DEFAULT_WINDOW_SIZE_BYTES: Int = 16 * 1024
internal const val CHUNK_BUFFER_SIZE: Long = 64 * 1024

private const val TELEMETRY_SCOPE = "aws.smithy.kotlin.runtime.http.engine.crt"

/**
* [HttpClientEngine] based on the AWS Common Runtime HTTP client
*/
@@ -51,7 +54,11 @@ public class CrtHttpEngine(public override val config: CrtHttpEngineConfig) : Ht
// }

private val requestLimiter = Semaphore(config.maxConcurrency.toInt())
-    private val connectionManager = ConnectionManager(config)
+    private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, config.telemetryProvider).apply {
+        connectionsLimit = config.maxConnections.toLong()
+        requestConcurrencyLimit = config.maxConcurrency.toLong()
+    }
+    private val connectionManager = ConnectionManager(config, metrics)

override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall = requestLimiter.withPermit {
val callContext = callContext()
@@ -63,15 +70,15 @@ public class CrtHttpEngine(public override val config: CrtHttpEngineConfig) : Ht
val conn = connectionManager.acquire(request)
logger.trace { "Acquired connection ${conn.id}" }

-        val respHandler = SdkStreamResponseHandler(conn, callContext)
+        val respHandler = SdkStreamResponseHandler(conn, callContext, context, metrics)
callContext.job.invokeOnCompletion {
logger.trace { "completing handler; cause=$it" }
// ensures the stream is driven to completion regardless of what the downstream consumer does
respHandler.complete()
}

val reqTime = Instant.now()
-        val engineRequest = request.toCrtRequest(callContext)
+        val engineRequest = request.toCrtRequest(callContext, metrics)

val stream = mapCrtException {
conn.makeRequest(engineRequest, respHandler).also { stream ->
@@ -81,7 +88,7 @@

if (request.isChunked) {
withContext(SdkDispatchers.IO) {
-                stream.sendChunkedBody(request.body)
+                stream.sendChunkedBody(request.body, metrics)
}
}

@@ -8,7 +8,6 @@ package aws.smithy.kotlin.runtime.http.engine.crt
import aws.sdk.kotlin.crt.CRT
import aws.sdk.kotlin.crt.CrtRuntimeException
import aws.sdk.kotlin.crt.http.HeadersBuilder
-import aws.sdk.kotlin.crt.http.HttpRequestBodyStream
import aws.sdk.kotlin.crt.http.HttpStream
import aws.sdk.kotlin.crt.io.Protocol
import aws.sdk.kotlin.crt.io.Uri
@@ -18,10 +17,13 @@ import aws.smithy.kotlin.runtime.crt.SdkSourceBodyStream
import aws.smithy.kotlin.runtime.http.HttpBody
import aws.smithy.kotlin.runtime.http.HttpErrorCode
import aws.smithy.kotlin.runtime.http.HttpException
import aws.smithy.kotlin.runtime.http.engine.crt.io.reportingTo
import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.io.buffer
import aws.smithy.kotlin.runtime.io.readToByteArray
import aws.smithy.kotlin.runtime.io.source
import kotlinx.coroutines.job
import kotlin.coroutines.CoroutineContext

@@ -42,18 +44,27 @@ internal val HttpRequest.uri: Uri
}
}

-internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.kotlin.crt.http.HttpRequest {
+internal fun HttpRequest.toCrtRequest(
+    callContext: CoroutineContext,
+    metrics: HttpClientMetrics,
+): aws.sdk.kotlin.crt.http.HttpRequest {
val body = this.body
check(!body.isDuplex) { "CrtHttpEngine does not yet support full duplex streams" }
val bodyStream = if (isChunked) {
null
} else {
when (body) {
is HttpBody.Empty -> null
-            is HttpBody.Bytes -> HttpRequestBodyStream.fromByteArray(body.bytes())
-            is HttpBody.ChannelContent -> ReadChannelBodyStream(body.readFrom(), callContext)
+            is HttpBody.Bytes -> {
+                val source = body.bytes().source().reportingTo(metrics.bytesSent)
+                SdkSourceBodyStream(source)
+            }
+            is HttpBody.ChannelContent -> {
+                val source = body.readFrom().reportingTo(metrics.bytesSent)
+                ReadChannelBodyStream(source, callContext)
+            }
            is HttpBody.SourceContent -> {
-                val source = body.readFrom()
+                val source = body.readFrom().reportingTo(metrics.bytesSent)
callContext.job.invokeOnCompletion {
source.close()
}
@@ -85,10 +96,10 @@ internal val HttpRequest.isChunked: Boolean get() = (this.body is HttpBody.Sourc
* Send a chunked body using the CRT writeChunk bindings.
* @param body an HTTP body that has a chunked content encoding. Must be [HttpBody.SourceContent] or [HttpBody.ChannelContent]
*/
-internal suspend fun HttpStream.sendChunkedBody(body: HttpBody) {
+internal suspend fun HttpStream.sendChunkedBody(body: HttpBody, metrics: HttpClientMetrics) {
when (body) {
is HttpBody.SourceContent -> {
-            val source = body.readFrom()
+            val source = body.readFrom().reportingTo(metrics.bytesSent)
val bufferedSource = source.buffer()

while (!bufferedSource.exhausted()) {
@@ -97,7 +108,7 @@ internal suspend fun HttpStream.sendChunkedBody(body: HttpBody) {
}
}
is HttpBody.ChannelContent -> {
-            val chan = body.readFrom()
+            val chan = body.readFrom().reportingTo(metrics.bytesSent)
var buffer = SdkBuffer()
val nextBuffer = SdkBuffer()
var sentFirstChunk = false
@@ -10,16 +10,27 @@ import aws.sdk.kotlin.crt.io.Buffer
import aws.smithy.kotlin.runtime.http.*
import aws.smithy.kotlin.runtime.http.HeadersBuilder
import aws.smithy.kotlin.runtime.http.HttpException
import aws.smithy.kotlin.runtime.http.engine.EngineAttributes
import aws.smithy.kotlin.runtime.http.engine.crt.io.reportingTo
import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics
import aws.smithy.kotlin.runtime.http.response.HttpResponse
import aws.smithy.kotlin.runtime.http.response.copy
import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.io.SdkByteChannel
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
import aws.smithy.kotlin.runtime.io.source
import aws.smithy.kotlin.runtime.operation.ExecutionContext
import aws.smithy.kotlin.runtime.telemetry.logging.logger
import aws.smithy.kotlin.runtime.telemetry.metrics.recordSeconds
import aws.smithy.kotlin.runtime.time.Instant
import aws.smithy.kotlin.runtime.time.fromEpochNanoseconds
import aws.smithy.kotlin.runtime.util.derivedName
import kotlinx.atomicfu.locks.reentrantLock
import kotlinx.atomicfu.locks.withLock
import kotlinx.coroutines.*
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlin.coroutines.CoroutineContext

/**
@@ -30,6 +41,8 @@ import kotlin.coroutines.CoroutineContext
internal class SdkStreamResponseHandler(
private val conn: HttpClientConnection,
private val callContext: CoroutineContext,
private val execContext: ExecutionContext,
private val clientMetrics: HttpClientMetrics,
) : HttpStreamResponseHandler {
// TODO - need to cancel the stream when the body is closed from the caller side early.
// There is no great way to do that currently without either (1) closing the connection or (2) throwing an
@@ -57,6 +70,10 @@

private var streamCompleted = false

init {
clientMetrics.incrementInflightRequests()
}

/**
* Called by the response read channel as data is consumed
* @param size the number of bytes consumed
@@ -184,7 +201,30 @@
}

internal suspend fun waitForResponse(): HttpResponse =
-        responseReady.receive()
+        responseReady.receive().wrapBody()

private fun HttpResponse.wrapBody(): HttpResponse {
val wrappedBody = when (val originalBody = body) {
is HttpBody.Empty -> return this // Don't need an object copy since we're not wrapping the body
is HttpBody.Bytes ->
originalBody
.bytes()
.source()
.reportingTo(clientMetrics.bytesReceived)
.toHttpBody(originalBody.contentLength)
is HttpBody.SourceContent ->
originalBody
.readFrom()
.reportingTo(clientMetrics.bytesReceived)
.toHttpBody(originalBody.contentLength)
is HttpBody.ChannelContent ->
originalBody
.readFrom()
.reportingTo(clientMetrics.bytesReceived)
.toHttpBody(originalBody.contentLength)
}
return copy(body = wrappedBody)
}

/**
* Invoked only after the consumer is finished with the response and it is safe to cleanup resources
@@ -197,6 +237,8 @@ internal class SdkStreamResponseHandler(
// and more data is pending arrival). It can also happen if the coroutine for this request is cancelled
// before onResponseComplete fires.
lock.withLock {
clientMetrics.decrementInflightRequests()

val forceClose = !streamCompleted

if (forceClose) {
@@ -210,4 +252,19 @@ internal class SdkStreamResponseHandler(
conn.close()
}
}

override fun onMetrics(stream: HttpStream, metrics: HttpStreamMetrics) {
val sendEnd = positiveInstantOrNull(metrics.sendEndTimestampNs)
val receiveStart = positiveInstantOrNull(metrics.receiveStartTimestampNs)

if (sendEnd != null && receiveStart != null) {
val ttfb = receiveStart - sendEnd
if (ttfb.isPositive()) {
Contributor:
question: have you seen any instances where this is negative?

Contributor Author:
Not directly but event streams may begin receiving traffic before sending has concluded since the communication is bidirectional and duplexed. It seemed prudent in that situation to not report TTFB.
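To make the event-stream case concrete with made-up numbers: if sending finishes at sendEnd = 5.00 s into the request but the first response bytes arrived at receiveStart = 4.80 s, then ttfb = receiveStart - sendEnd = -0.20 s, and the isPositive() guard simply skips recording it.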

clientMetrics.timeToFirstByteDuration.recordSeconds(ttfb)
execContext[EngineAttributes.TimeToFirstByte] = ttfb
}
}
}
}

private fun positiveInstantOrNull(ns: Long): Instant? = if (ns > 0) Instant.fromEpochNanoseconds(ns) else null
Contributor:
comment: These new body types are missing tests

@@ -0,0 +1,21 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package aws.smithy.kotlin.runtime.http.engine.crt.io

import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
import aws.smithy.kotlin.runtime.telemetry.metrics.MonotonicCounter

private class ReportingByteReadChannel(
val delegate: SdkByteReadChannel,
val metric: MonotonicCounter,
) : SdkByteReadChannel by delegate {
override suspend fun read(sink: SdkBuffer, limit: Long): Long = delegate.read(sink, limit).also {
if (it > 0) metric.add(it)
}
}

internal fun SdkByteReadChannel.reportingTo(metric: MonotonicCounter): SdkByteReadChannel =
ReportingByteReadChannel(this, metric)
@@ -0,0 +1,17 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package aws.smithy.kotlin.runtime.http.engine.crt.io

import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.io.SdkSource
import aws.smithy.kotlin.runtime.telemetry.metrics.MonotonicCounter

private class ReportingSource(val delegate: SdkSource, val metric: MonotonicCounter) : SdkSource by delegate {
override fun read(sink: SdkBuffer, limit: Long): Long = delegate.read(sink, limit).also {
if (it > 0) metric.add(it)
}
}

internal fun SdkSource.reportingTo(metric: MonotonicCounter): SdkSource = ReportingSource(this, metric)
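Picking up the reviewer's note above that the new reporting body wrappers are missing tests, here is a minimal, self-contained sketch of what such a test could assert. CountingSource and the lambda counter stand in for ReportingSource and the real MonotonicCounter, and the class and test names are hypothetical.

    import aws.smithy.kotlin.runtime.io.SdkBuffer
    import aws.smithy.kotlin.runtime.io.SdkSource
    import aws.smithy.kotlin.runtime.io.source
    import kotlin.test.Test
    import kotlin.test.assertEquals

    class ReportingSourceTest {
        // Same delegation pattern as ReportingSource, but byte counts accumulate
        // through a plain lambda so the test can assert on them directly.
        private class CountingSource(
            private val delegate: SdkSource,
            private val onBytes: (Long) -> Unit,
        ) : SdkSource by delegate {
            override fun read(sink: SdkBuffer, limit: Long): Long =
                delegate.read(sink, limit).also { if (it > 0) onBytes(it) }
        }

        @Test
        fun testReportsAllBytesRead() {
            val payload = "Hello, metrics!".encodeToByteArray()
            var reported = 0L
            val source = CountingSource(payload.source()) { reported += it }

            val sink = SdkBuffer()
            while (source.read(sink, 8L) != -1L) { /* drain in small reads */ }

            assertEquals(payload.size.toLong(), reported)
        }
    }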