Skip to content

Commit

Permalink
fix warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
stanleysmall-microsoft committed Apr 24, 2024
1 parent 6247c64 commit 767b7ef
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ await corpusClient.SetMetadataAsync(new Dictionary<string, string>
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to embed: {Name}, error: {Message}", blobName, ex.Message);
logger.LogError(ex, "Failed to embed: {Name}, error: {Message} stackTrace: {StackTrace}", blobName, ex.Message, ex.StackTrace);
throw;
}
}
Expand Down
13 changes: 6 additions & 7 deletions app/shared/Shared/Services/AzureCacheSearchService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public async Task<SupportingContentRecord[]> QueryDocumentsAsync(
}
catch (Exception e)
{
throw new Exception($"Failed to get embeddings: openAiEndpoint={openAiEndpoint} openAiEmbeddingDeployment={openAiEmbeddingDeployment} {e.Message}");
throw new InvalidOperationException($"Failed to get embeddings: openAiEndpoint={openAiEndpoint} openAiEmbeddingDeployment={openAiEmbeddingDeployment} {e.Message}");
}
}

Expand All @@ -54,8 +54,8 @@ public async Task<SupportingContentRecord[]> QueryDocumentsAsync(
var sb = new List<SupportingContentRecord>();
foreach (var doc in searchResults)
{
string sourcePage = doc.GetValueOrDefault("sourcepage").ToString();
string content = doc.GetValueOrDefault("content").ToString();
string sourcePage = doc.GetValueOrDefault("sourcepage")?.ToString() ?? string.Empty;
string content = doc.GetValueOrDefault("content")?.ToString() ?? string.Empty;
content = content.Replace('\r', ' ').Replace('\n', ' ');
sb.Add(new SupportingContentRecord(sourcePage, content));
}
Expand All @@ -70,8 +70,7 @@ private async Task<List<Dictionary<string, object>>> SearchVectorIndexAsync(stri
var searchCommand = new List<object>
{
indexName,
//@category:{category}
$"*=>[KNN {topK} @embedding $query_vector]",
$"@category:{category}=>[KNN {topK} @embedding $query_vector]",
"PARAMS", "2",
"query_vector", queryVector,
"RETURN", "6", "__embedding_score", "id", "content", "category", "sourcepage", "sourcefile",
Expand Down Expand Up @@ -140,8 +139,8 @@ public async Task<SupportingImageRecord[]> QueryImagesAsync(
var sb = new List<SupportingImageRecord>();
foreach (var doc in searchResults)
{
string name = doc.GetValueOrDefault("content").ToString();
string url = doc.GetValueOrDefault("id").ToString();
string name = doc.GetValueOrDefault("content")?.ToString() ?? string.Empty;
string url = doc.GetValueOrDefault("id")?.ToString() ?? string.Empty;
sb.Add(new SupportingImageRecord(name, url));
}

Expand Down
275 changes: 145 additions & 130 deletions app/shared/Shared/Services/RedisConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,173 +3,188 @@
using System.Net.Sockets;
using StackExchange.Redis;

namespace EmbedFunctions.Services
namespace EmbedFunctions.Services;

public class RedisConnection : IDisposable
{
public class RedisConnection : IDisposable
private long _lastReconnectTicks = DateTimeOffset.MinValue.UtcTicks;
private DateTimeOffset _firstErrorTime = DateTimeOffset.MinValue;
private DateTimeOffset _previousErrorTime = DateTimeOffset.MinValue;

// StackExchange.Redis will also be trying to reconnect internally,
// so limit how often we recreate the ConnectionMultiplexer instance
// in an attempt to reconnect
private readonly TimeSpan _reconnectMinInterval = TimeSpan.FromSeconds(60);

// If errors occur for longer than this threshold, StackExchange.Redis
// may be failing to reconnect internally, so we'll recreate the
// ConnectionMultiplexer instance
private readonly TimeSpan _reconnectErrorThreshold = TimeSpan.FromSeconds(30);
private readonly TimeSpan _restartConnectionTimeout = TimeSpan.FromSeconds(15);
private const int RetryMaxAttempts = 5;

private SemaphoreSlim _reconnectSemaphore = new SemaphoreSlim(initialCount: 1, maxCount: 1);
private readonly string _connectionString;
private ConnectionMultiplexer? _connection;
private IDatabase? _database;

private RedisConnection(string connectionString)
{
private long _lastReconnectTicks = DateTimeOffset.MinValue.UtcTicks;
private DateTimeOffset _firstErrorTime = DateTimeOffset.MinValue;
private DateTimeOffset _previousErrorTime = DateTimeOffset.MinValue;

// StackExchange.Redis will also be trying to reconnect internally,
// so limit how often we recreate the ConnectionMultiplexer instance
// in an attempt to reconnect
private readonly TimeSpan _reconnectMinInterval = TimeSpan.FromSeconds(60);

// If errors occur for longer than this threshold, StackExchange.Redis
// may be failing to reconnect internally, so we'll recreate the
// ConnectionMultiplexer instance
private readonly TimeSpan _reconnectErrorThreshold = TimeSpan.FromSeconds(30);
private readonly TimeSpan _restartConnectionTimeout = TimeSpan.FromSeconds(15);
private const int RetryMaxAttempts = 5;

private SemaphoreSlim _reconnectSemaphore = new SemaphoreSlim(initialCount: 1, maxCount: 1);
private readonly string _connectionString;
private ConnectionMultiplexer _connection;
private IDatabase _database;

private RedisConnection(string connectionString)
{
_connectionString = connectionString;
}
_connectionString = connectionString;
}

public static async Task<RedisConnection> InitializeAsync(string connectionString)
{
var redisConnection = new RedisConnection(connectionString);
await redisConnection.ForceReconnectAsync(initializing: true);
public static async Task<RedisConnection> InitializeAsync(string connectionString)
{
var redisConnection = new RedisConnection(connectionString);
await redisConnection.ForceReconnectAsync(initializing: true);

return redisConnection;
}
return redisConnection;
}

// In real applications, consider using a framework such as
// Polly to make it easier to customize the retry approach.
// For more info, please see: https://github.com/App-vNext/Polly
public async Task<T> BasicRetryAsync<T>(Func<IDatabase, Task<T>> func)
{
int reconnectRetry = 0;
// In real applications, consider using a framework such as
// Polly to make it easier to customize the retry approach.
// For more info, please see: https://github.com/App-vNext/Polly
public async Task<T> BasicRetryAsync<T>(Func<IDatabase, Task<T>> func)
{
int reconnectRetry = 0;

while (true)
while (true)
{
try
{
try
if (_database != null)
{
return await func(_database);
}
catch (Exception ex) when (ex is RedisConnectionException || ex is SocketException || ex is ObjectDisposedException)
else
{
reconnectRetry++;
if (reconnectRetry > RetryMaxAttempts)
{
throw;
}

try
{
await ForceReconnectAsync();
}
catch (ObjectDisposedException) { }
throw new InvalidOperationException("Database connection is null.");
}
}
catch (Exception ex) when (ex is RedisConnectionException || ex is SocketException || ex is ObjectDisposedException)
{
reconnectRetry++;
if (reconnectRetry > RetryMaxAttempts)
{
throw;
}

try
{
await ForceReconnectAsync();
}
catch (ObjectDisposedException) { }
}
}
}

/// <summary>
/// Force a new ConnectionMultiplexer to be created.
/// NOTES:
/// 1. Users of the ConnectionMultiplexer MUST handle ObjectDisposedExceptions, which can now happen as a result of calling ForceReconnectAsync().
/// 2. Call ForceReconnectAsync() for RedisConnectionExceptions and RedisSocketExceptions. You can also call it for RedisTimeoutExceptions,
/// but only if you're using generous ReconnectMinInterval and ReconnectErrorThreshold. Otherwise, establishing new connections can cause
/// a cascade failure on a server that's timing out because it's already overloaded.
/// 3. The code will:
/// a. wait to reconnect for at least the "ReconnectErrorThreshold" time of repeated errors before actually reconnecting
/// b. not reconnect more frequently than configured in "ReconnectMinInterval"
/// </summary>
/// <param name="initializing">Should only be true when ForceReconnect is running at startup.</param>
private async Task ForceReconnectAsync(bool initializing = false)
{
long previousTicks = Interlocked.Read(ref _lastReconnectTicks);
var previousReconnectTime = new DateTimeOffset(previousTicks, TimeSpan.Zero);
TimeSpan elapsedSinceLastReconnect = DateTimeOffset.UtcNow - previousReconnectTime;

/// <summary>
/// Force a new ConnectionMultiplexer to be created.
/// NOTES:
/// 1. Users of the ConnectionMultiplexer MUST handle ObjectDisposedExceptions, which can now happen as a result of calling ForceReconnectAsync().
/// 2. Call ForceReconnectAsync() for RedisConnectionExceptions and RedisSocketExceptions. You can also call it for RedisTimeoutExceptions,
/// but only if you're using generous ReconnectMinInterval and ReconnectErrorThreshold. Otherwise, establishing new connections can cause
/// a cascade failure on a server that's timing out because it's already overloaded.
/// 3. The code will:
/// a. wait to reconnect for at least the "ReconnectErrorThreshold" time of repeated errors before actually reconnecting
/// b. not reconnect more frequently than configured in "ReconnectMinInterval"
/// </summary>
/// <param name="initializing">Should only be true when ForceReconnect is running at startup.</param>
private async Task ForceReconnectAsync(bool initializing = false)
// We want to limit how often we perform this top-level reconnect, so we check how long it's been since our last attempt.
if (elapsedSinceLastReconnect < _reconnectMinInterval)
{
long previousTicks = Interlocked.Read(ref _lastReconnectTicks);
var previousReconnectTime = new DateTimeOffset(previousTicks, TimeSpan.Zero);
TimeSpan elapsedSinceLastReconnect = DateTimeOffset.UtcNow - previousReconnectTime;
return;
}

// We want to limit how often we perform this top-level reconnect, so we check how long it's been since our last attempt.
if (elapsedSinceLastReconnect < _reconnectMinInterval)
{
return;
}
bool lockTaken = await _reconnectSemaphore.WaitAsync(_restartConnectionTimeout);
if (!lockTaken)
{
// If we fail to enter the semaphore, then it is possible that another thread has already done so.
// ForceReconnectAsync() can be retried while connectivity problems persist.
return;
}

bool lockTaken = await _reconnectSemaphore.WaitAsync(_restartConnectionTimeout);
if (!lockTaken)
try
{
var utcNow = DateTimeOffset.UtcNow;
previousTicks = Interlocked.Read(ref _lastReconnectTicks);
previousReconnectTime = new DateTimeOffset(previousTicks, TimeSpan.Zero);
elapsedSinceLastReconnect = utcNow - previousReconnectTime;

if (_firstErrorTime == DateTimeOffset.MinValue && !initializing)
{
// If we fail to enter the semaphore, then it is possible that another thread has already done so.
// ForceReconnectAsync() can be retried while connectivity problems persist.
// We haven't seen an error since last reconnect, so set initial values.
_firstErrorTime = utcNow;
_previousErrorTime = utcNow;
return;
}

try
if (elapsedSinceLastReconnect < _reconnectMinInterval)
{
var utcNow = DateTimeOffset.UtcNow;
previousTicks = Interlocked.Read(ref _lastReconnectTicks);
previousReconnectTime = new DateTimeOffset(previousTicks, TimeSpan.Zero);
elapsedSinceLastReconnect = utcNow - previousReconnectTime;

if (_firstErrorTime == DateTimeOffset.MinValue && !initializing)
{
// We haven't seen an error since last reconnect, so set initial values.
_firstErrorTime = utcNow;
_previousErrorTime = utcNow;
return;
}

if (elapsedSinceLastReconnect < _reconnectMinInterval)
{
return; // Some other thread made it through the check and the lock, so nothing to do.
}
return; // Some other thread made it through the check and the lock, so nothing to do.
}

TimeSpan elapsedSinceFirstError = utcNow - _firstErrorTime;
TimeSpan elapsedSinceMostRecentError = utcNow - _previousErrorTime;
TimeSpan elapsedSinceFirstError = utcNow - _firstErrorTime;
TimeSpan elapsedSinceMostRecentError = utcNow - _previousErrorTime;

bool shouldReconnect =
elapsedSinceFirstError >= _reconnectErrorThreshold // Make sure we gave the multiplexer enough time to reconnect on its own if it could.
&& elapsedSinceMostRecentError <= _reconnectErrorThreshold; // Make sure we aren't working on stale data (e.g. if there was a gap in errors, don't reconnect yet).
bool shouldReconnect =
elapsedSinceFirstError >= _reconnectErrorThreshold // Make sure we gave the multiplexer enough time to reconnect on its own if it could.
&& elapsedSinceMostRecentError <= _reconnectErrorThreshold; // Make sure we aren't working on stale data (e.g. if there was a gap in errors, don't reconnect yet).

// Update the previousErrorTime timestamp to be now (e.g. this reconnect request).
_previousErrorTime = utcNow;
// Update the previousErrorTime timestamp to be now (e.g. this reconnect request).
_previousErrorTime = utcNow;

if (!shouldReconnect && !initializing)
{
return;
}
if (!shouldReconnect && !initializing)
{
return;
}

_firstErrorTime = DateTimeOffset.MinValue;
_previousErrorTime = DateTimeOffset.MinValue;
_firstErrorTime = DateTimeOffset.MinValue;
_previousErrorTime = DateTimeOffset.MinValue;

// Create a new connection
ConnectionMultiplexer _newConnection = await ConnectionMultiplexer.ConnectAsync(_connectionString);
// Create a new connection
ConnectionMultiplexer _newConnection = await ConnectionMultiplexer.ConnectAsync(_connectionString);

// Swap current connection with the new connection
ConnectionMultiplexer oldConnection = Interlocked.Exchange(ref _connection, _newConnection);
// Swap current connection with the new connection
ConnectionMultiplexer oldConnection = Interlocked.Exchange(ref _connection!, _newConnection);

Interlocked.Exchange(ref _lastReconnectTicks, utcNow.UtcTicks);
IDatabase newDatabase = _connection.GetDatabase();
Interlocked.Exchange(ref _database, newDatabase);
Interlocked.Exchange(ref _lastReconnectTicks, utcNow.UtcTicks);
IDatabase newDatabase = _connection.GetDatabase();
Interlocked.Exchange(ref _database, newDatabase);

if (oldConnection != null)
if (oldConnection != null)
{
try
{
try
{
await oldConnection.CloseAsync();
}
catch
{
// Ignore any errors from the old connection
}
await oldConnection.CloseAsync();
}
catch
{
// Ignore any errors from the old connection
}
}
finally
{
_reconnectSemaphore.Release();
}
}
finally
{
_reconnectSemaphore.Release();
}
}

public void Dispose()
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

protected virtual void Dispose(bool disposing)
{
if (disposing)
{
try { _connection?.Dispose(); } catch { }
}
Expand Down

0 comments on commit 767b7ef

Please sign in to comment.