Skip to content

Commit

Permalink
Enhance PulsoidModule with reconnection logic
Browse files Browse the repository at this point in the history
Updated PulsoidModule.cs to track last message time and inactivity threshold. Improved WebSocket connection handling with automatic reconnections and token validation. Enhanced heart rate monitoring logic to check token validity on exceptions. UI changes in MainWindow.xaml include a new toggle button and layout adjustments. Updated project dependencies in MagicChatbox.csproj to newer versions.
  • Loading branch information
BoiHanny committed Feb 11, 2025
1 parent 05f55cf commit 2866c9c
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 85 deletions.
176 changes: 141 additions & 35 deletions vrcosc-magicchatbox/Classes/Modules/PulsoidModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
using Newtonsoft.Json;
using System.Net.WebSockets;
using System.Text;
using System.Collections.Concurrent;
using System.Timers;
using vrcosc_magicchatbox.ViewModels;
using vrcosc_magicchatbox.Classes.DataAndSecurity;
using vrcosc_magicchatbox.DataAndSecurity;
Expand Down Expand Up @@ -257,6 +255,8 @@ public partial class PulsoidModule : ObservableObject
private readonly Queue<Tuple<DateTime, int>> _heartRates = new();
private bool _isFetchingStatistics = false;
private DateTime _lastStateChangeTime = DateTime.MinValue;
private DateTime _lastMessageReceivedTime = DateTime.Now;
private readonly TimeSpan _inactivityThreshold = TimeSpan.FromSeconds(15);

// For OSC smoothing (count-based)
private readonly Queue<int> _oscHeartRates = new();
Expand Down Expand Up @@ -349,34 +349,76 @@ private void CheckMonitoringConditions()
}
}

private async Task ConnectToWebSocketAsync(string accessToken, CancellationToken cancellationToken)
private async Task ConnectToWebSocketWithReconnectAsync(string accessToken, CancellationToken cancellationToken)
{
_webSocket = new ClientWebSocket();
_webSocket.Options.SetRequestHeader("Authorization", $"Bearer {accessToken}");
_webSocket.Options.KeepAliveInterval = TimeSpan.FromSeconds(5);

try
int attempt = 0;
const int maxAttempts = 10; // Maximum reconnection attempts
while (!cancellationToken.IsCancellationRequested)
{
await _webSocket.ConnectAsync(new Uri("wss://dev.pulsoid.net/api/v1/data/real_time"), cancellationToken).ConfigureAwait(false);
try
{
// Initialize and configure a new WebSocket instance
_webSocket = new ClientWebSocket();
_webSocket.Options.SetRequestHeader("Authorization", $"Bearer {accessToken}");
_webSocket.Options.KeepAliveInterval = TimeSpan.FromSeconds(5);

Application.Current.Dispatcher.Invoke(() =>
// Attempt to connect to the Pulsoid WebSocket endpoint
await _webSocket.ConnectAsync(new Uri("wss://dev.pulsoid.net/api/v1/data/real_time"), cancellationToken);

// On successful connection, reset error flags and start the timer
Application.Current.Dispatcher.Invoke(() =>
{
PulsoidAccessError = false;
PulsoidAccessErrorTxt = "";
});
_processDataTimer.Start();

// Begin reading messages from the WebSocket
await HeartRateMonitoringLoopAsync(cancellationToken);
break; // Exit the loop on a clean exit
}
catch (WebSocketException ex)
{
PulsoidAccessError = false;
PulsoidAccessErrorTxt = "";
});
attempt++;
Logging.WriteInfo($"WebSocket connection attempt {attempt} failed: {ex.Message}");

_processDataTimer.Start();
// Immediately check if the token is still valid. If not, exit.
bool tokenValid = await PulsoidOAuthHandler.Instance.ValidateTokenAsync(accessToken);
if (!tokenValid)
{
Application.Current.Dispatcher.Invoke(() =>
{
PulsoidAccessError = true;
PulsoidAccessErrorTxt = "Access token invalid or revoked. Please reconnect.";
TriggerPulsoidAuthConnected(false);
});
return; // Stop further reconnection attempts.
}

await HeartRateMonitoringLoopAsync(cancellationToken).ConfigureAwait(false);
}
catch (WebSocketException ex)
{
Application.Current.Dispatcher.Invoke(() =>
if (attempt >= maxAttempts)
{
Application.Current.Dispatcher.Invoke(() =>
{
PulsoidAccessError = true;
PulsoidAccessErrorTxt = "Failed to connect after multiple attempts.";
});
return;
}
// Use exponential backoff (capped at 10 seconds)
int delayMs = Math.Min(10000, 2000 * (int)Math.Pow(2, attempt));
Logging.WriteInfo($"Retrying connection in {delayMs}ms...");
await Task.Delay(delayMs, cancellationToken);
}
catch (Exception ex)
{
PulsoidAccessError = true;
PulsoidAccessErrorTxt = ex.Message;
});
Logging.WriteException(ex, MSGBox: false);
Application.Current.Dispatcher.Invoke(() =>
{
PulsoidAccessError = true;
PulsoidAccessErrorTxt = ex.Message;
});
Logging.WriteException(ex);
return;
}
}
}

Expand Down Expand Up @@ -456,6 +498,9 @@ private void HandleHeartRateMessage(string message)
int rawHR = ParseHeartRateFromMessage(message);
if (rawHR == -1) return;

// Record the time of the last valid message.
_lastMessageReceivedTime = DateTime.Now;

if (Settings.ApplyHeartRateAdjustment)
{
rawHR += Settings.HeartRateAdjustment;
Expand All @@ -480,19 +525,32 @@ private void HandleHeartRateMessage(string message)
private async Task HeartRateMonitoringLoopAsync(CancellationToken cancellationToken)
{
var buffer = new byte[1024];

try
{
while (_webSocket != null && _webSocket.State == WebSocketState.Open && !cancellationToken.IsCancellationRequested)
while (_webSocket != null &&
_webSocket.State == WebSocketState.Open &&
!cancellationToken.IsCancellationRequested)
{
WebSocketReceiveResult result;
try
{
result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), cancellationToken).ConfigureAwait(false);
result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), cancellationToken);
}
catch (WebSocketException wex)
{
Logging.WriteInfo($"WebSocketException: {wex.Message}");
Logging.WriteInfo($"WebSocket exception during receive: {wex.Message}");
// On exception, check token immediately.
bool tokenValid = await PulsoidOAuthHandler.Instance.ValidateTokenAsync(ViewModel.Instance.PulsoidAccessTokenOAuth);
if (!tokenValid)
{
Application.Current.Dispatcher.Invoke(() =>
{
PulsoidAccessError = true;
PulsoidAccessErrorTxt = "Access token invalid or revoked. Please reconnect.";
TriggerPulsoidAuthConnected(false);
});
return; // Exit the monitoring loop.
}
break;
}
catch (OperationCanceledException)
Expand All @@ -501,26 +559,43 @@ private async Task HeartRateMonitoringLoopAsync(CancellationToken cancellationTo
}
catch (IOException ioex)
{
Logging.WriteInfo($"IOException while reading from WebSocket: {ioex.Message}");
Logging.WriteInfo($"IO exception during receive: {ioex.Message}");
break;
}

// Check if the server signals a closure
if (result.MessageType == WebSocketMessageType.Close)
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", cancellationToken).ConfigureAwait(false);
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", cancellationToken);
break;
}

// Process the received message
string message = Encoding.UTF8.GetString(buffer, 0, result.Count);
HandleHeartRateMessage(message);
}
}
finally
{
if (ShouldStartMonitoring() && !isMonitoringStarted)
// Before attempting reconnection, check token validity
if (ShouldStartMonitoring() && !cancellationToken.IsCancellationRequested)
{
await Task.Delay(5000).ConfigureAwait(false);
await Application.Current.Dispatcher.InvokeAsync(() => StartMonitoringHeartRateAsync()).Task.ConfigureAwait(false);
bool valid = await PulsoidOAuthHandler.Instance.ValidateTokenAsync(ViewModel.Instance.PulsoidAccessTokenOAuth);
if (valid)
{
Logging.WriteInfo("WebSocket connection lost, attempting reconnection...");
await Task.Delay(5000, cancellationToken);
await Application.Current.Dispatcher.InvokeAsync(() => StartMonitoringHeartRateAsync());
}
else
{
Application.Current.Dispatcher.Invoke(() =>
{
PulsoidAccessError = true;
PulsoidAccessErrorTxt = "Access token invalid or revoked. Please reconnect.";
TriggerPulsoidAuthConnected(false);
});
}
}
}
}
Expand Down Expand Up @@ -627,7 +702,9 @@ private void SendMCBHeartRateInfo(int hrValueForOSC)

private async Task StartMonitoringHeartRateAsync()
{
if (_cts != null || isMonitoringStarted) return;
// Avoid duplicate monitoring sessions
if (_cts != null || isMonitoringStarted)
return;

isMonitoringStarted = true;
string accessToken = ViewModel.Instance.PulsoidAccessTokenOAuth;
Expand All @@ -643,6 +720,7 @@ private async Task StartMonitoringHeartRateAsync()
return;
}

// Validate the access token before proceeding
bool isTokenValid = await PulsoidOAuthHandler.Instance.ValidateTokenAsync(accessToken).ConfigureAwait(false);
if (!isTokenValid)
{
Expand All @@ -656,16 +734,18 @@ private async Task StartMonitoringHeartRateAsync()
return;
}

// Create a cancellation token for this monitoring session and update UI text
_cts = new CancellationTokenSource();
UpdateFormattedHeartRateText();

try
{
await ConnectToWebSocketAsync(accessToken, _cts.Token).ConfigureAwait(false);
// Use the improved connection method that handles reconnection automatically
await ConnectToWebSocketWithReconnectAsync(accessToken, _cts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{

// Monitoring was cancelled; no further action required.
}
catch (Exception ex)
{
Expand Down Expand Up @@ -886,6 +966,32 @@ public void OnApplicationClosing()

public async void ProcessData()
{
// Check for inactivity: if no message has been received for a specified threshold
TimeSpan inactivity = DateTime.Now - _lastMessageReceivedTime;
if (inactivity > _inactivityThreshold)
{
bool tokenValid = await PulsoidOAuthHandler.Instance.ValidateTokenAsync(ViewModel.Instance.PulsoidAccessTokenOAuth);
if (!tokenValid)
{
Application.Current.Dispatcher.Invoke(() =>
{
PulsoidAccessError = true;
PulsoidAccessErrorTxt = "Access token invalid or revoked. Please reconnect.";
TriggerPulsoidAuthConnected(false);
});
StopMonitoringHeartRateAsync();
return;
}
else
{
Logging.WriteInfo($"No messages received for {inactivity.TotalSeconds} seconds, but token is still valid. Device might be offline.");
// Optionally, mark the device as offline:
PulsoidDeviceOnline = false;
return;
}
}

// Existing logic to determine device status based on heart rate changes
bool shouldBeOnline = HeartRateFromSocket > 0;

if (shouldBeOnline)
Expand Down
8 changes: 4 additions & 4 deletions vrcosc-magicchatbox/MagicChatbox.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,14 @@
<ItemGroup>
<PackageReference Include="CommunityToolkit.Mvvm" Version="8.4.0" />
<PackageReference Include="Dubya.WindowsMediaController" Version="2.5.5" />
<PackageReference Include="LibreHardwareMonitorLib" Version="0.9.5-pre370" />
<PackageReference Include="LibreHardwareMonitorLib" Version="0.9.5-pre387" />
<PackageReference Include="Microsoft.Xaml.Behaviors.Wpf" Version="1.1.135" />
<PackageReference Include="NAudio" Version="2.2.1" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="NHotkey.Wpf" Version="3.0.0" />
<PackageReference Include="NLog" Version="5.3.4" />
<PackageReference Include="OpenAI-DotNet" Version="8.4.1" />
<PackageReference Include="System.Management" Version="9.0.0" />
<PackageReference Include="NLog" Version="5.4.0" />
<PackageReference Include="OpenAI-DotNet" Version="8.5.4" />
<PackageReference Include="System.Management" Version="9.0.1" />
<PackageReference Include="VRChat.OSCQuery" Version="0.0.7" />
</ItemGroup>

Expand Down
Loading

0 comments on commit 2866c9c

Please sign in to comment.