Skip to content

Commit

Permalink
Carael/add delete option (#41)
Browse files Browse the repository at this point in the history
* Add delete option for peeked messages
  • Loading branch information
Carael authored Feb 13, 2024
1 parent 3e1af2d commit b8c82cd
Show file tree
Hide file tree
Showing 7 changed files with 427 additions and 194 deletions.
14 changes: 11 additions & 3 deletions src/Core/ServiceBus.Contracts/IMessageService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,28 @@ Task<Result> SendMessagesAsync(
string queueOrTopicName,
IReadOnlyList<SendMessage> messages,
CancellationToken cancellationToken);

IAsyncEnumerable<PurgeResult> PurgeAsync(
string connectionName,
string topicOrQueueName,
string? subscriptionName,
SubQueue subQueue,
long totalCount,
CancellationToken cancellationToken);

IAsyncEnumerable<ResendResult> ResendAsync(string connectionName,
string topicOrQueueName,
string? subscriptionName,
SubQueue subQueue,
string destinationTopicOrQueueName,
long totalCount,
CancellationToken cancellationToken);
}

Task<Result> DeleteMessage(
string connectionName,
string queueOrTopicName,
string? subscriptionName,
SubQueue subQueue,
long sequenceNumber,
CancellationToken cancellationToken);
}
45 changes: 42 additions & 3 deletions src/Core/ServiceBus/MessageService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,46 @@ await SendMessagesInternalAsync(
yield return new ResendResult(totalResend);
}

public async Task<Result> DeleteMessage(
string connectionName,
string queueOrTopicName,
string? subscriptionName,
SubQueue subQueue,
long sequenceNumber,
CancellationToken cancellationToken)
{
var connection =
await _connectionManagement.GetAsync(connectionName, cancellationToken);

await using var client = new ServiceBusClient(connection.ConnectionString);

await using ServiceBusReceiver receiver =
GetReceiver(client, queueOrTopicName, subscriptionName, subQueue, ReceiveMode.PeekLock);

var messageCompleted = false;

try
{
await foreach (var message in receiver.ReceiveMessagesAsync(cancellationToken))
{
if (message.SequenceNumber == sequenceNumber)
{
await receiver.CompleteMessageAsync(message, cancellationToken);

messageCompleted = true;
break;
}

await receiver.AbandonMessageAsync(message, null, cancellationToken);
}
}
catch (TaskCanceledException)
{
}

return new Result(messageCompleted ? 1 : 0);
}

private ServiceBusReceiver GetReceiver(
ServiceBusClient client,
string queueOrTopicName,
Expand Down Expand Up @@ -198,10 +238,9 @@ private ServiceBusReceiver GetReceiver(
fromSequenceNumber,
cancellationToken);
}

return await receiver.ReceiveMessagesAsync(
maxMessages ?? maxReceiverMessagesCount,
null,
cancellationToken: cancellationToken);
}

Expand Down Expand Up @@ -262,4 +301,4 @@ private async Task<Result> SendMessagesInternalAsync(
messageBatch?.Dispose();
}
}
}
}
2 changes: 1 addition & 1 deletion src/Ui/Website.Host/electron.manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"appId": "com.crossbusexplorer.app",
"productName": "Cross Bus Explorer",
"copyright": "Copyright © 2024",
"buildVersion": "0.4.0",
"buildVersion": "0.4.1",
"compression": "maximum",
"directories": {
"output": "../../../bin/Desktop"
Expand Down
113 changes: 113 additions & 0 deletions src/Ui/Website/Jobs/DeleteMessageJob.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
using System;
using System.ComponentModel;
using System.Threading;
using System.Threading.Tasks;
using CrossBusExplorer.ServiceBus.Contracts;
using CrossBusExplorer.ServiceBus.Contracts.Types;
using CrossBusExplorer.Website.Extensions;
using MudBlazor;
namespace CrossBusExplorer.Website.Jobs;

public class DeleteMessageJob : IJob
{
private readonly string _connectionName;
private readonly string _queueOrTopicName;
private readonly string? _subscriptionName;
private readonly SubQueue _subQueue;
private readonly long _sequenceNumber;
private readonly IMessageService _messageService;
private readonly ISnackbar _snackbar;
private readonly CancellationTokenSource _cancellationTokenSource;

public event PropertyChangedEventHandler? PropertyChanged;

public event JobCompletedEventHandler? OnCompleted;

public DeleteMessageJob(

Check warning on line 26 in src/Ui/Website/Jobs/DeleteMessageJob.cs

View workflow job for this annotation

GitHub Actions / publish-linux-x64

Non-nullable field '_snackbar' must contain a non-null value when exiting constructor. Consider declaring the field as nullable.

Check warning on line 26 in src/Ui/Website/Jobs/DeleteMessageJob.cs

View workflow job for this annotation

GitHub Actions / publish-macos-x64

Non-nullable field '_snackbar' must contain a non-null value when exiting constructor. Consider declaring the field as nullable.
string connectionName,
string queueOrTopicName,
string? subscriptionName,
SubQueue subQueue,
long sequenceNumber,
IMessageService messageService)
{
_connectionName = connectionName;
_queueOrTopicName = queueOrTopicName;
_subscriptionName = subscriptionName;
_subQueue = subQueue;
_sequenceNumber = sequenceNumber;

_messageService = messageService;
_cancellationTokenSource = new CancellationTokenSource();
}

private int _progress;
public int Progress
{
get => _progress;
private set
{
_progress = value;
this.Notify(PropertyChanged);

Check warning on line 51 in src/Ui/Website/Jobs/DeleteMessageJob.cs

View workflow job for this annotation

GitHub Actions / publish-linux-x64

Possible null reference argument for parameter 'handler' in 'void INotifyPropertyChangedExtensions.Notify(INotifyPropertyChanged sender, PropertyChangedEventHandler handler, string propertyName = "")'.

Check warning on line 51 in src/Ui/Website/Jobs/DeleteMessageJob.cs

View workflow job for this annotation

GitHub Actions / publish-macos-x64

Possible null reference argument for parameter 'handler' in 'void INotifyPropertyChangedExtensions.Notify(INotifyPropertyChanged sender, PropertyChangedEventHandler handler, string propertyName = "")'.
}
}

public string Name =>
$"Delete message {_sequenceNumber} from {_queueOrTopicName} {_subscriptionName}".Trim();

public bool ViewDetails { get; set; }

private JobStatus _status;
public JobStatus Status
{
get => _status;
private set
{
_status = value;
this.Notify(PropertyChanged);
}
}
public string? ErrorMessage { get; private set; }
public string? WarningMessage { get; private set; }

public async Task ExecuteAsync()
{
Status = JobStatus.Running;

try
{
Result result = await _messageService.DeleteMessage(
_connectionName,
_queueOrTopicName,
_subscriptionName,
_subQueue,
_sequenceNumber,
_cancellationTokenSource.Token);

Progress = JobsHelper.GetProgress(1, result.Count);

Status = JobStatus.Succeeded;

if (result.Count == 0)
{
WarningMessage = "Message was not not deleted";
}
}
catch (TaskCanceledException)
{
Status = JobStatus.Cancelled;
}
catch (Exception ex)
{
ErrorMessage = $"Job {Name} failed. Error: {ex.Message}.";
Status = JobStatus.Failed;
}

await OnCompleted(_connectionName, _queueOrTopicName, _subscriptionName);
}

public void Cancel()
{
_cancellationTokenSource.Cancel();
}
}
Loading

0 comments on commit b8c82cd

Please sign in to comment.