Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unsubscribe feature for client #196

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
16 changes: 13 additions & 3 deletions src/Thruway/ClientSession.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,18 @@ public function subscribe($topicName, callable $callback, $options = null)
{
return $this->peer->getSubscriber()->subscribe($this, $topicName, $callback, $options);
}


/**
* Unsubscribe
*
* @param string $subscriptionId
* @return Promise
*/
public function unsubscribe($subscriptionId)
{
return $this->peer->getSubscriber()->unsubscribe($this, $subscriptionId);
}

/**
* Publish
*
Expand Down Expand Up @@ -129,7 +140,6 @@ public function close()
*/
public function onClose()
{

$this->state = static::STATE_DOWN;
}

}
78 changes: 73 additions & 5 deletions src/Thruway/Role/Subscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Thruway\Message\SubscribedMessage;
use Thruway\Message\SubscribeMessage;
use Thruway\Message\UnsubscribedMessage;
use Thruway\Message\UnsubscribeMessage;

/**
* Class Subscriber
Expand All @@ -28,14 +29,19 @@ class Subscriber extends AbstractRole
* @var array
*/
private $subscriptions;

/**
* @var array
*/
private $unsubscriptionsPromises;

/**
* Constructor
*/
public function __construct()
{

$this->subscriptions = [];
$this->unsubscriptionsPromises = [];
}

/**
Expand Down Expand Up @@ -86,7 +92,7 @@ protected function processError(AbstractSession $session, ErrorMessage $msg)
$this->processSubscribeError($session, $msg);
break;
case Message::MSG_UNSUBSCRIBE:
// TODO
$this->processUnsubscribeError($session, $msg);
break;
default:
Logger::critical($this, "Unhandled error");
Expand Down Expand Up @@ -128,6 +134,32 @@ protected function processSubscribed(ClientSession $session, SubscribedMessage $
}
}
}

/**
* Process unsubscribe error
*
* @param \Thruway\AbstractSession $session
* @param \Thruway\Message\ErrorMessage $msg
*/
protected function processUnsubscribeError(AbstractSession $session, ErrorMessage $msg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method should be called processUnsubscribe

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is called whenever the router responds with an ERROR message. Why should it be called processUnsubscribe? It follows the same logic as processSubscribeError.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

processUnsubscribeError is correct. Only the broker would handle the UNSUBSCRIBE message.

{
foreach ($this->subscriptions as $key => $subscription) {
if ($subscription["unsubscribed_request_id"] === $msg->getErrorRequestId()) {
// reject the promise
$subscription['unsubscribed_deferred']->reject($msg);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this deferred used anywhere? I see pieces of this idea in the code but I don't think it is used anywhere - we may be able to get rid of it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This deferred is the one that wraps the promise returned by the unsubscribe method, so yes, it is used (and is kind of important.) Or I didn't understand what you meant?

return;
}
}

// Execution continues up here in case the original unsubscribe request has not been found
foreach ($this->unsubscriptionsPromises as $key => $unsubscriptionPromise) {
if ($unsubscriptionPromise["request_id"] === $msg->getErrorRequestId()) {
$unsubscriptionPromise["deferred"]->reject($msg);
unset($this->unsubscriptionPromises[$key]);
return;
}
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the spec say anything about what the client should do if there is an error unsubscribing?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


/**
* process unsubscribed
Expand Down Expand Up @@ -219,13 +251,49 @@ public function subscribe(ClientSession $session, $topicName, callable $callback
"options" => $options,
"deferred" => $deferred
];

array_push($this->subscriptions, $subscription);

$subscribeMsg = new SubscribeMessage($requestId, $options, $topicName);
$session->sendMessage($subscribeMsg);

return $deferred->promise();
}


/**
* process unsubscribe
* @param ClientSession $session
* @param string $subscriptionId
* @return boolean
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do see that this actually returns promise - the docblock just needs updated

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, this has been fixed

*/
public function unsubscribe(ClientSession $session, $subscriptionId)
{
$requestId = Utils::getUniqueId();
$subscriptionExists = false;
$deferred = new Deferred();

foreach ($this->subscriptions as $i => $subscription) {
if ($subscription["subscription_id"] == $subscriptionId) {
$subscriptionExists = true;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should noop the subscription callback at this point so that the callback won't receive messages once it has asked to be unsubscribed - even if the router decides to keep sending them.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think is the best way to achieve this? We could set the subscription["callback"] to null, requiring us to update the processEvent method to handle this situation. Or maybe delete the subscription from the subscriptions list, but we have to deal with the promise that must be resolved or rejected when the router responds.

$this->subscriptions[$i]["unsubscribed_request_id"] = $requestId;
$this->subscriptions[$i]["unsubscribed_deferred"] = $deferred;
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is possible that more than one subscription to the same uri may exist. Difference routers handle this differently. With the Thruway router, the subscription is going to be a new subscription with a new id - giving two distinct subscriptions with an identical uri. I can't remember exactly what crossbar will do, I believe it will either send a SUBSCRIBED with the same subscription id or will send an error back.

This could be further compounded by the fact that subscriptions with the same uri may actually be different because of other reasons (pattern matching comes to mind).

It would be better to handle unsubscribe by using the subscription_id which can be obtained when the promise returned by subscribe resolves.

Copy link
Author

@vyk12 vyk12 May 24, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we take a look at Autobahn|JS client, the subscription method gives an object representing the subscription to the callback called when the promise is resolved. When unsubscribing, this object must be given to the unsubscribe method (the topic name is not given.) Maybe it is done this way to handle the situation you just described (the object Autobahn creates must contain the subscription id or something.) Maybe we should do something similar?

Here is the implementation I propose.
When the server responds, two cases can happen. First: the subscription succeeded. In that case, the promise is resolved and a Subscription object is given to the callback. This Subscription object contains a subscription_id attribute that is filled with the correct subscription ID given by the server. The callback must then do whatever it wants to handle this object, but must keep it somehow to give it later to the ClientSession::unsubscribe method. This method would be rewritten to handle this new behavior and not work with a topic name as it is now.
In the second case, if the subscription failed, then the promise is rejected as usual.

What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While it would be good to keep the pattern that Autobahn|JS uses (which would have subscribe return a promise that resolves to a Subscription object), This would be making a change to the API. Right now the promise resolves with the SubscribedMessage.

If unsubscribe were implemented to take the subscription_id, the SubscribedMessage would have enough information.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the code so that the unsubscribe method takes the subscriptionId instead of the topicName. As you said, the client can easily unsubscribe by retrieving the subscriptionId of the SubsribedMessage it has been given when the promise is resolved.


// In case the client never subscribed to this topic before
if ($subscriptionExists === false) {
$unsubscriptionPromise = [
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If false here - we should just return a rejected promise.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. This way, the unsubscriptionPromises attribute I created for this class became useless, so I got rid of it.

"request_id" => $requestId,
"deferred" => $deferred
];

array_push($this->unsubscriptionsPromises, $unsubscriptionPromise);
}

$unsubscribeMessage = new UnsubscribeMessage($requestId, $subscriptionId);
$session->sendMessage($unsubscribeMessage);

return $deferred->promise();
}
}
2 changes: 2 additions & 0 deletions src/Thruway/Subscription/SubscriptionGroup.php
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ public function processUnsubscribe(Session $session, UnsubscribeMessage $msg)
}

$this->removeSubscription($subscription);

Logger::debug($this, "Removed subscription to \"" . $this->getmatchType() . "\":\"" . $this->getUri() . "\"");

$session->sendMessage(new UnsubscribedMessage($msg->getRequestId()));
return $subscription;
Expand Down