Add support for aborting running promises cleanly
parent
0dad963cd8
commit
8f6e82f562
|
@ -184,6 +184,8 @@ class InstallationManager
|
||||||
$runCleanup = function () use (&$cleanupPromises, $loop) {
|
$runCleanup = function () use (&$cleanupPromises, $loop) {
|
||||||
$promises = array();
|
$promises = array();
|
||||||
|
|
||||||
|
$loop->abortJobs();
|
||||||
|
|
||||||
foreach ($cleanupPromises as $cleanup) {
|
foreach ($cleanupPromises as $cleanup) {
|
||||||
$promises[] = new \React\Promise\Promise(function ($resolve, $reject) use ($cleanup) {
|
$promises[] = new \React\Promise\Promise(function ($resolve, $reject) use ($cleanup) {
|
||||||
$promise = $cleanup();
|
$promise = $cleanup();
|
||||||
|
|
|
@ -23,6 +23,7 @@ use Composer\Util\HttpDownloader;
|
||||||
use React\Promise\Promise;
|
use React\Promise\Promise;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @internal
|
||||||
* @author Jordi Boggiano <j.boggiano@seld.be>
|
* @author Jordi Boggiano <j.boggiano@seld.be>
|
||||||
* @author Nicolas Grekas <p@tchwork.com>
|
* @author Nicolas Grekas <p@tchwork.com>
|
||||||
*/
|
*/
|
||||||
|
@ -90,6 +91,9 @@ class CurlDownloader
|
||||||
$this->authHelper = new AuthHelper($io, $config);
|
$this->authHelper = new AuthHelper($io, $config);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return int internal job id
|
||||||
|
*/
|
||||||
public function download($resolve, $reject, $origin, $url, $options, $copyTo = null)
|
public function download($resolve, $reject, $origin, $url, $options, $copyTo = null)
|
||||||
{
|
{
|
||||||
$attributes = array();
|
$attributes = array();
|
||||||
|
@ -101,6 +105,9 @@ class CurlDownloader
|
||||||
return $this->initDownload($resolve, $reject, $origin, $url, $options, $copyTo, $attributes);
|
return $this->initDownload($resolve, $reject, $origin, $url, $options, $copyTo, $attributes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return int internal job id
|
||||||
|
*/
|
||||||
private function initDownload($resolve, $reject, $origin, $url, $options, $copyTo = null, array $attributes = array())
|
private function initDownload($resolve, $reject, $origin, $url, $options, $copyTo = null, array $attributes = array())
|
||||||
{
|
{
|
||||||
$attributes = array_merge(array(
|
$attributes = array_merge(array(
|
||||||
|
@ -199,8 +206,29 @@ class CurlDownloader
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->checkCurlResult(curl_multi_add_handle($this->multiHandle, $curlHandle));
|
$this->checkCurlResult(curl_multi_add_handle($this->multiHandle, $curlHandle));
|
||||||
// TODO progress
|
// TODO progress
|
||||||
//$params['notification'](STREAM_NOTIFY_RESOLVE, STREAM_NOTIFY_SEVERITY_INFO, '', 0, 0, 0, false);
|
//$params['notification'](STREAM_NOTIFY_RESOLVE, STREAM_NOTIFY_SEVERITY_INFO, '', 0, 0, 0, false);
|
||||||
|
|
||||||
|
return (int) $curlHandle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function abortRequest($id)
|
||||||
|
{
|
||||||
|
if (isset($this->jobs[$id]) && isset($this->jobs[$id]['handle'])) {
|
||||||
|
$job = $this->jobs[$id];
|
||||||
|
curl_multi_remove_handle($this->multiHandle, $job['handle']);
|
||||||
|
curl_close($job['handle']);
|
||||||
|
if (is_resource($job['headerHandle'])) {
|
||||||
|
fclose($job['headerHandle']);
|
||||||
|
}
|
||||||
|
if (is_resource($job['bodyHandle'])) {
|
||||||
|
fclose($job['bodyHandle']);
|
||||||
|
}
|
||||||
|
if ($job['filename']) {
|
||||||
|
@unlink($job['filename'].'~');
|
||||||
|
}
|
||||||
|
unset($this->jobs[$id]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public function tick()
|
public function tick()
|
||||||
|
@ -235,7 +263,7 @@ class CurlDownloader
|
||||||
$statusCode = null;
|
$statusCode = null;
|
||||||
$response = null;
|
$response = null;
|
||||||
try {
|
try {
|
||||||
// TODO progress
|
// TODO progress
|
||||||
//$this->onProgress($curlHandle, $job['callback'], $progress, $job['progress']);
|
//$this->onProgress($curlHandle, $job['callback'], $progress, $job['progress']);
|
||||||
if (CURLE_OK !== $errno || $error) {
|
if (CURLE_OK !== $errno || $error) {
|
||||||
throw new TransportException($error);
|
throw new TransportException($error);
|
||||||
|
@ -285,8 +313,6 @@ class CurlDownloader
|
||||||
// fail 4xx and 5xx responses and capture the response
|
// fail 4xx and 5xx responses and capture the response
|
||||||
if ($statusCode >= 400 && $statusCode <= 599) {
|
if ($statusCode >= 400 && $statusCode <= 599) {
|
||||||
throw $this->failResponse($job, $response, $response->getStatusMessage());
|
throw $this->failResponse($job, $response, $response->getStatusMessage());
|
||||||
// TODO progress
|
|
||||||
// $this->io->overwriteError("Downloading (<error>failed</error>)", false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($job['attributes']['storeAuth']) {
|
if ($job['attributes']['storeAuth']) {
|
||||||
|
|
|
@ -31,6 +31,7 @@ class HttpDownloader
|
||||||
const STATUS_STARTED = 2;
|
const STATUS_STARTED = 2;
|
||||||
const STATUS_COMPLETED = 3;
|
const STATUS_COMPLETED = 3;
|
||||||
const STATUS_FAILED = 4;
|
const STATUS_FAILED = 4;
|
||||||
|
const STATUS_ABORTED = 5;
|
||||||
|
|
||||||
private $io;
|
private $io;
|
||||||
private $config;
|
private $config;
|
||||||
|
@ -184,8 +185,20 @@ class HttpDownloader
|
||||||
|
|
||||||
$downloader = $this;
|
$downloader = $this;
|
||||||
$io = $this->io;
|
$io = $this->io;
|
||||||
|
$curl = $this->curl;
|
||||||
|
|
||||||
$canceler = function () {};
|
$canceler = function () use (&$job, $curl) {
|
||||||
|
if ($job['status'] === self::STATUS_QUEUED) {
|
||||||
|
$job['status'] = self::STATUS_ABORTED;
|
||||||
|
}
|
||||||
|
if ($job['status'] !== self::STATUS_STARTED) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
$job['status'] = self::STATUS_ABORTED;
|
||||||
|
if (isset($job['curl_id'])) {
|
||||||
|
$curl->abortRequest($job['curl_id']);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
$promise = new Promise($resolver, $canceler);
|
$promise = new Promise($resolver, $canceler);
|
||||||
$promise->then(function ($response) use (&$job, $downloader) {
|
$promise->then(function ($response) use (&$job, $downloader) {
|
||||||
|
@ -242,9 +255,9 @@ class HttpDownloader
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($job['request']['copyTo']) {
|
if ($job['request']['copyTo']) {
|
||||||
$this->curl->download($resolve, $reject, $origin, $url, $options, $job['request']['copyTo']);
|
$job['curl_id'] = $this->curl->download($resolve, $reject, $origin, $url, $options, $job['request']['copyTo']);
|
||||||
} else {
|
} else {
|
||||||
$this->curl->download($resolve, $reject, $origin, $url, $options);
|
$job['curl_id'] = $this->curl->download($resolve, $reject, $origin, $url, $options);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -294,14 +307,11 @@ class HttpDownloader
|
||||||
}
|
}
|
||||||
|
|
||||||
if (null !== $index) {
|
if (null !== $index) {
|
||||||
if ($this->jobs[$index]['status'] === self::STATUS_COMPLETED || $this->jobs[$index]['status'] === self::STATUS_FAILED) {
|
return $this->jobs[$index]['status'] < self::STATUS_COMPLETED;
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
foreach ($this->jobs as $job) {
|
foreach ($this->jobs as $job) {
|
||||||
if (!in_array($job['status'], array(self::STATUS_COMPLETED, self::STATUS_FAILED), true)) {
|
if ($job['status'] < self::STATUS_COMPLETED) {
|
||||||
return true;
|
return true;
|
||||||
} elseif (!$job['sync']) {
|
} elseif (!$job['sync']) {
|
||||||
unset($this->jobs[$job['id']]);
|
unset($this->jobs[$job['id']]);
|
||||||
|
|
|
@ -76,4 +76,13 @@ class Loop
|
||||||
throw $uncaught;
|
throw $uncaught;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function abortJobs()
|
||||||
|
{
|
||||||
|
if ($this->currentPromises) {
|
||||||
|
foreach ($this->currentPromises as $promise) {
|
||||||
|
$promise->cancel();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -281,15 +281,11 @@ class ProcessExecutor
|
||||||
}
|
}
|
||||||
|
|
||||||
if (null !== $index) {
|
if (null !== $index) {
|
||||||
if ($this->jobs[$index]['status'] === self::STATUS_COMPLETED || $this->jobs[$index]['status'] === self::STATUS_FAILED || $this->jobs[$index]['status'] === self::STATUS_ABORTED) {
|
return $this->jobs[$index]['status'] < self::STATUS_COMPLETED;
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
foreach ($this->jobs as $job) {
|
foreach ($this->jobs as $job) {
|
||||||
if (!in_array($job['status'], array(self::STATUS_COMPLETED, self::STATUS_FAILED, self::STATUS_ABORTED), true)) {
|
if ($job['status'] < self::STATUS_COMPLETED) {
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
unset($this->jobs[$job['id']]);
|
unset($this->jobs[$job['id']]);
|
||||||
|
|
|
@ -20,6 +20,7 @@ use Composer\Util\HttpDownloader;
|
||||||
use Composer\Util\Http\Response;
|
use Composer\Util\Http\Response;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @internal
|
||||||
* @author François Pluchino <francois.pluchino@opendisplay.com>
|
* @author François Pluchino <francois.pluchino@opendisplay.com>
|
||||||
* @author Jordi Boggiano <j.boggiano@seld.be>
|
* @author Jordi Boggiano <j.boggiano@seld.be>
|
||||||
* @author Nils Adermann <naderman@naderman.de>
|
* @author Nils Adermann <naderman@naderman.de>
|
||||||
|
|
Loading…
Reference in New Issue