From 5d2b3276ebf6b98114eb73a5bd315e59f0f08c0a Mon Sep 17 00:00:00 2001 From: Jordi Boggiano Date: Fri, 16 Nov 2018 15:38:01 +0100 Subject: [PATCH] Avoid starting all jobs immediately --- src/Composer/Util/Http/CurlDownloader.php | 36 +++---- src/Composer/Util/HttpDownloader.php | 110 +++++++++++++++------- 2 files changed, 92 insertions(+), 54 deletions(-) diff --git a/src/Composer/Util/Http/CurlDownloader.php b/src/Composer/Util/Http/CurlDownloader.php index 83f07c44a..4015b0bac 100644 --- a/src/Composer/Util/Http/CurlDownloader.php +++ b/src/Composer/Util/Http/CurlDownloader.php @@ -95,12 +95,17 @@ class CurlDownloader public function download($resolve, $reject, $origin, $url, $options, $copyTo = null) { - return $this->initDownload($resolve, $reject, $origin, $url, $options, $copyTo); + $attributes = array(); + if (isset($options['retry-auth-failure'])) { + $attributes['retryAuthFailure'] = $options['retry-auth-failure']; + unset($options['retry-auth-failure']); + } + + return $this->initDownload($resolve, $reject, $origin, $url, $options, $copyTo, $attributes); } private function initDownload($resolve, $reject, $origin, $url, $options, $copyTo = null, array $attributes = array()) { - // TODO allow setting attributes somehow $attributes = array_merge(array( 'retryAuthFailure' => true, 'redirects' => 1, @@ -193,12 +198,12 @@ class CurlDownloader $this->io->writeError('Downloading ' . $url . $usingProxy . $ifModified, true, IOInterface::DEBUG); $this->checkCurlResult(curl_multi_add_handle($this->multiHandle, $curlHandle)); +// TODO progress //$params['notification'](STREAM_NOTIFY_RESOLVE, STREAM_NOTIFY_SEVERITY_INFO, '', 0, 0, 0, false); } public function tick() { - // TODO check we have active handles before doing this if (!$this->jobs) { return; } @@ -229,6 +234,7 @@ class CurlDownloader $statusCode = null; $response = null; try { +// TODO progress //$this->onProgress($curlHandle, $job['callback'], $progress, $job['progress']); if (CURLE_OK !== $errno) { throw new TransportException($error); @@ -263,7 +269,6 @@ class CurlDownloader // handle 3xx redirects, 304 Not Modified is excluded if ($statusCode >= 300 && $statusCode <= 399 && $statusCode !== 304 && $job['redirects'] < $this->maxRedirects) { - // TODO $location = $this->handleRedirect($job, $response); if ($location) { $this->restartJob($job, $location, array('redirects' => $job['attributes']['redirects'] + 1)); @@ -274,6 +279,7 @@ class CurlDownloader // fail 4xx and 5xx responses and capture the response if ($statusCode >= 400 && $statusCode <= 599) { throw $this->failResponse($job, $response, $response->getStatusMessage()); +// TODO progress // $this->io->overwriteError("Downloading (failed)", false); } @@ -320,24 +326,13 @@ class CurlDownloader if ($this->jobs[$i]['progress'] !== $progress) { $previousProgress = $this->jobs[$i]['progress']; $this->jobs[$i]['progress'] = $progress; - try { - //$this->onProgress($curlHandle, $this->jobs[$i]['callback'], $progress, $previousProgress); - } catch (TransportException $e) { - var_dump('Caught '.$e->getMessage());die; - unset($this->jobs[$i]); - curl_multi_remove_handle($this->multiHandle, $curlHandle); - curl_close($curlHandle); - fclose($job['headerHandle']); - fclose($job['bodyHandle']); - if ($job['filename']) { - @unlink($job['filename'].'~'); - } - call_user_func($job['reject'], $e); - } + // TODO + //$this->onProgress($curlHandle, $this->jobs[$i]['callback'], $progress, $previousProgress); } } } catch (\Exception $e) { + // TODO var_dump('Caught2', get_class($e), $e->getMessage(), $e);die; } } @@ -444,13 +439,10 @@ class CurlDownloader private function onProgress($curlHandle, callable $notify, array $progress, array $previousProgress) { + // TODO add support for progress if (300 <= $progress['http_code'] && $progress['http_code'] < 400) { return; } - if (!$previousProgress['http_code'] && $progress['http_code'] && $progress['http_code'] < 200 || 400 <= $progress['http_code']) { - $code = 403 === $progress['http_code'] ? STREAM_NOTIFY_AUTH_RESULT : STREAM_NOTIFY_FAILURE; - $notify($code, STREAM_NOTIFY_SEVERITY_ERR, curl_error($curlHandle), $progress['http_code'], 0, 0, false); - } if ($previousProgress['download_content_length'] < $progress['download_content_length']) { $notify(STREAM_NOTIFY_FILE_SIZE_IS, STREAM_NOTIFY_SEVERITY_INFO, '', 0, 0, (int) $progress['download_content_length'], false); } diff --git a/src/Composer/Util/HttpDownloader.php b/src/Composer/Util/HttpDownloader.php index b259ef3e5..0e882c4a3 100644 --- a/src/Composer/Util/HttpDownloader.php +++ b/src/Composer/Util/HttpDownloader.php @@ -33,8 +33,8 @@ class HttpDownloader private $config; private $jobs = array(); private $options = array(); - private $index; - private $progress; + private $runningJobs = 0; + private $maxJobs = 10; private $lastProgress; private $disableTls = false; private $curl; @@ -42,8 +42,6 @@ class HttpDownloader private $idGen = 0; /** - * Constructor. - * * @param IOInterface $io The IO instance * @param Config $config The config * @param array $options The options @@ -131,35 +129,24 @@ class HttpDownloader 'status' => self::STATUS_QUEUED, 'request' => $request, 'sync' => $sync, + 'origin' => Url::getOrigin($this->config, $request['url']), ); - $origin = Url::getOrigin($this->config, $job['request']['url']); - // capture username/password from URL if there is one if (preg_match('{^https?://([^:/]+):([^@/]+)@([^/]+)}i', $request['url'], $match)) { - $this->io->setAuthentication($origin, rawurldecode($match[1]), rawurldecode($match[2])); + $this->io->setAuthentication($job['origin'], rawurldecode($match[1]), rawurldecode($match[2])); } - $curl = $this->curl; $rfs = $this->rfs; - $io = $this->io; - if ($curl && preg_match('{^https?://}i', $job['request']['url'])) { - $resolver = function ($resolve, $reject) use (&$job, $curl, $origin) { - // start job - $url = $job['request']['url']; - $options = $job['request']['options']; - - $job['status'] = HttpDownloader::STATUS_STARTED; - - if ($job['request']['copyTo']) { - $curl->download($resolve, $reject, $origin, $url, $options, $job['request']['copyTo']); - } else { - $curl->download($resolve, $reject, $origin, $url, $options); - } + if ($this->curl && preg_match('{^https?://}i', $job['request']['url'])) { + $resolver = function ($resolve, $reject) use (&$job) { + $job['status'] = HttpDownloader::STATUS_QUEUED; + $job['resolve'] = $resolve; + $job['reject'] = $reject; }; } else { - $resolver = function ($resolve, $reject) use (&$job, $rfs, $curl, $origin) { + $resolver = function ($resolve, $reject) use (&$job, $rfs) { // start job $url = $job['request']['url']; $options = $job['request']['options']; @@ -167,11 +154,11 @@ class HttpDownloader $job['status'] = HttpDownloader::STATUS_STARTED; if ($job['request']['copyTo']) { - $result = $rfs->copy($origin, $url, $job['request']['copyTo'], false /* TODO progress */, $options); + $result = $rfs->copy($job['origin'], $url, $job['request']['copyTo'], false /* TODO progress */, $options); $resolve($result); } else { - $body = $rfs->getContents($origin, $url, false /* TODO progress */, $options); + $body = $rfs->getContents($job['origin'], $url, false /* TODO progress */, $options); $headers = $rfs->getLastHeaders(); $response = new Http\Response($job['request'], $rfs->findStatusCode($headers), $headers, $body); @@ -180,26 +167,85 @@ class HttpDownloader }; } + $downloader = $this; + $io = $this->io; + $canceler = function () {}; $promise = new Promise($resolver, $canceler); - $promise->then(function ($response) use (&$job) { + $promise->then(function ($response) use (&$job, $downloader) { $job['status'] = HttpDownloader::STATUS_COMPLETED; $job['response'] = $response; - // TODO look for more jobs to start once we throttle to max X jobs - }, function ($e) use ($io, &$job) { - // var_dump(__CLASS__ . __LINE__); - // var_dump(get_class($e)); - // var_dump($e->getMessage()); - // die; + + // TODO 3.0 this should be done directly on $this when PHP 5.3 is dropped + $downloader->markJobDone(); + $downloader->scheduleNextJob(); + + return $response; + }, function ($e) use ($io, &$job, $downloader) { $job['status'] = HttpDownloader::STATUS_FAILED; $job['exception'] = $e; + + $downloader->markJobDone(); + + throw $e; }); $this->jobs[$job['id']] =& $job; + if ($this->runningJobs < $this->maxJobs) { + $this->startJob($job['id']); + } + return array($job, $promise); } + private function startJob($id) + { + $job =& $this->jobs[$id]; + if ($job['status'] !== self::STATUS_QUEUED) { + return; + } + + // start job + $job['status'] = self::STATUS_STARTED; + $this->runningJobs++; + + $resolve = $job['resolve']; + $reject = $job['reject']; + $url = $job['request']['url']; + $options = $job['request']['options']; + $origin = $job['origin']; + + if ($job['request']['copyTo']) { + $this->curl->download($resolve, $reject, $origin, $url, $options, $job['request']['copyTo']); + } else { + $this->curl->download($resolve, $reject, $origin, $url, $options); + } + } + + /** + * @private + */ + public function markJobDone() + { + $this->runningJobs--; + } + + /** + * @private + */ + public function scheduleNextJob() + { + foreach ($this->jobs as $job) { + if ($job['status'] === self::STATUS_QUEUED) { + $this->startJob($job['id']); + if ($this->runningJobs >= $this->maxJobs) { + return; + } + } + } + } + public function wait($index = null, $progress = false) { while (true) {