Skip to content

Commit

Permalink
Implement client-side cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
danog committed Jun 2, 2023
1 parent a4e7a90 commit ac2fe4b
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ private function methodAbstractions(string &$method, array &$arguments): ?Deferr
}
} elseif ($method === 'messages.sendMultiMedia') {
foreach ($arguments['multi_media'] as &$singleMedia) {
if (is_string($singleMedia['media'])
if (\is_string($singleMedia['media'])
|| $singleMedia['media']['_'] === 'inputMediaUploadedPhoto'
|| $singleMedia['media']['_'] === 'inputMediaUploadedDocument'
) {
Expand Down
10 changes: 8 additions & 2 deletions src/Loop/Connection/CheckLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,14 @@ protected function loop(): ?float
case 4:
if ($chr & 32) {
if ($message->getSent() + $this->resendTimeout < \time()) {
$this->logger->logger("Message $message received by server and is being processed for way too long, resending request...", Logger::ERROR);
$this->connection->methodRecall(['message_id' => $message_id, 'postpone' => true]);
if ($message->isCancellationRequested()) {
unset($this->connection->new_outgoing[$message_id]);
unset($this->connection->outgoing_messages[$message_id]);
$this->logger->logger("Cancelling $message...", Logger::ERROR);
} else {
$this->logger->logger("Message $message received by server and is being processed for way too long, resending request...", Logger::ERROR);
$this->connection->methodRecall(['message_id' => $message_id, 'postpone' => true]);
}
} else {
$this->logger->logger("Message $message received by server and is being processed, waiting...", Logger::ERROR);
}
Expand Down
15 changes: 14 additions & 1 deletion src/MTProto/OutgoingMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

namespace danog\MadelineProto\MTProto;

use Amp\Cancellation;
use Amp\CancelledException;
use Amp\DeferredFuture;
use Amp\Future;
use danog\MadelineProto\Exception;
Expand Down Expand Up @@ -139,6 +141,8 @@ class OutgoingMessage extends Message
*/
private int $tries = 0;

private ?Cancellation $cancellation = null;

/**
* Create outgoing message.
*
Expand All @@ -149,7 +153,7 @@ class OutgoingMessage extends Message
* @param boolean $unencrypted Is this an unencrypted message?
* @param null|DeferredFuture $deferred Response deferred
*/
public function __construct(array|callable $body, string $constructor, string $type, bool $method, bool $unencrypted, ?DeferredFuture $deferred = null)
public function __construct(array|callable $body, string $constructor, string $type, bool $method, bool $unencrypted, ?DeferredFuture $deferred = null, ?Cancellation $cancellation = null)
{
$this->body = $body;
$this->constructor = $constructor;
Expand All @@ -161,6 +165,15 @@ public function __construct(array|callable $body, string $constructor, string $t
}

$this->contentRelated = !isset(Message::NOT_CONTENT_RELATED[$constructor]);
$this->cancellation = $cancellation;
$cancellation?->subscribe(fn (CancelledException $e) => $this->reply(fn () => throw $e));
}

/**
* Whether cancellation is requested.
*/
public function isCancellationRequested(): bool {
return $this->cancellation?->isRequested() ?? false;
}

/**
Expand Down
3 changes: 2 additions & 1 deletion src/MTProtoSession/CallHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ public function methodCallAsyncWrite(string $method, array|callable $args = [],
$methodInfo['type'],
true,
!$this->shared->hasTempAuthKey() && \strpos($method, '.') === false && $method !== 'ping_delay_disconnect',
$response
$response,
$aargs['cancellation'] ?? null
);
if (isset($aargs['queue'])) {
$message->setQueueId($aargs['queue']);
Expand Down
22 changes: 18 additions & 4 deletions src/Wrappers/DialogHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@

namespace danog\MadelineProto\Wrappers;

use Amp\CancelledException;
use Amp\Sync\LocalMutex;
use Amp\TimeoutCancellation;
use Amp\TimeoutException;
use danog\MadelineProto\Exception;
use danog\MadelineProto\MTProto;
use danog\MadelineProto\Settings;
use Throwable;

use function Amp\async;
use function Amp\delay;

/**
* Dialog handler.
*
Expand Down Expand Up @@ -76,10 +82,18 @@ private function cacheAllBotUsers(): void
$state['pts_total_limit'] = 2147483647;
while ($bottom <= $top) {
$state['pts'] = ($bottom+$top)>>1;
$result = $this->methodCallAsyncRead(
'updates.getDifference',
$state
)['_'];
while (1) {
try {
$result = $this->methodCallAsyncRead(
'updates.getDifference',
$state
)['_'];
break;
} catch (Throwable $e) {
$this->logger->logger("Got {$e->getMessage()} while getting difference, retrying...");
delay(1.0);
}
}
$this->logger("$bottom, {$state['pts']}, $top");
$this->logger($result);
if ($result === 'updates.differenceTooLong') {
Expand Down

0 comments on commit ac2fe4b

Please sign in to comment.