From 84879c0f57de56860f04631d9e333c6cc8e0e7b3 Mon Sep 17 00:00:00 2001 From: Prajjwal Date: Wed, 22 Nov 2023 17:42:10 +0530 Subject: [PATCH] feat: modify the implementation of save and restore, and introduce delete. --- packages/warp-cache/.vscode/launch.json | 21 ++ packages/warp-cache/package-lock.json | 80 +++++ packages/warp-cache/package.json | 1 + packages/warp-cache/src/cache.ts | 72 ++-- .../src/internal/cacheHttpClient.ts | 269 +++++++------- .../warp-cache/src/internal/cacheUtils.ts | 9 + .../warp-cache/src/internal/contracts.d.ts | 36 +- .../warp-cache/src/internal/downloadUtils.ts | 327 +++++------------- packages/warp-cache/src/test.ts | 24 +- 9 files changed, 415 insertions(+), 424 deletions(-) create mode 100644 packages/warp-cache/.vscode/launch.json diff --git a/packages/warp-cache/.vscode/launch.json b/packages/warp-cache/.vscode/launch.json new file mode 100644 index 00000000..f7ac9595 --- /dev/null +++ b/packages/warp-cache/.vscode/launch.json @@ -0,0 +1,21 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "node", + "request": "launch", + "name": "Launch Test.ts", + "skipFiles": [ + "<node_internals>/**" + ], + "program": "${workspaceFolder}/src/test.ts", + "preLaunchTask": "tsc: build - tsconfig.json", + "outFiles": [ + "${workspaceFolder}/lib/**/*.js" + ] + } + ] +} \ No newline at end of file diff --git a/packages/warp-cache/package-lock.json b/packages/warp-cache/package-lock.json index d8f3fe58..a9e84023 100644 --- a/packages/warp-cache/package-lock.json +++ b/packages/warp-cache/package-lock.json @@ -17,6 +17,7 @@ "@azure/abort-controller": "^1.1.0", "@azure/ms-rest-js": "^2.6.0", "@azure/storage-blob": "^12.13.0", + "axios": "^1.6.2", "semver": "^6.3.1", "uuid": "^3.3.3" }, @@ -318,6 +319,29 @@ "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", "integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==" }, + "node_modules/axios": { + "version": "1.6.2", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.6.2.tgz", + "integrity": "sha512-7i24Ri4pmDRfJTR7LDBhsOTtcm+9kjX5WiY1X3wIisx6G9So3pfMkEiU7emUBe46oceVImccTEM3k6C5dbVW8A==", + "dependencies": { + "follow-redirects": "^1.15.0", + "form-data": "^4.0.0", + "proxy-from-env": "^1.1.0" + } + }, + "node_modules/axios/node_modules/form-data": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.0.tgz", + "integrity": "sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==", + "dependencies": { + "asynckit": "^0.4.0", + "combined-stream": "^1.0.8", + "mime-types": "^2.1.12" + }, + "engines": { + "node": ">= 6" + } + }, "node_modules/balanced-match": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.2.tgz", @@ -372,6 +396,25 @@ "node": ">=0.8.x" } }, + "node_modules/follow-redirects": { + "version": "1.15.3", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.3.tgz", + "integrity": "sha512-1VzOtuEM8pC9SFU1E+8KfTjZyMztRsgEfwQl44z8A25uy13jSzTj6dyK2Df52iV0vgHCfBwLhDWevLn95w5v6Q==", + "funding": [ + { + "type": "individual", + "url": "https://github.com/sponsors/RubenVerborgh" + } + ], + "engines": { + "node": ">=4.0" + }, + "peerDependenciesMeta": { + "debug": { + "optional": true + } + } + }, 
"node_modules/form-data": { "version": "2.5.1", "resolved": "https://registry.npmjs.org/form-data/-/form-data-2.5.1.tgz", @@ -442,6 +485,11 @@ "node": ">= 0.6.0" } }, + "node_modules/proxy-from-env": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz", + "integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==" + }, "node_modules/sax": { "version": "1.2.4", "resolved": "https://registry.npmjs.org/sax/-/sax-1.2.4.tgz", @@ -783,6 +831,28 @@ "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", "integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==" }, + "axios": { + "version": "1.6.2", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.6.2.tgz", + "integrity": "sha512-7i24Ri4pmDRfJTR7LDBhsOTtcm+9kjX5WiY1X3wIisx6G9So3pfMkEiU7emUBe46oceVImccTEM3k6C5dbVW8A==", + "requires": { + "follow-redirects": "^1.15.0", + "form-data": "^4.0.0", + "proxy-from-env": "^1.1.0" + }, + "dependencies": { + "form-data": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.0.tgz", + "integrity": "sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==", + "requires": { + "asynckit": "^0.4.0", + "combined-stream": "^1.0.8", + "mime-types": "^2.1.12" + } + } + } + }, "balanced-match": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.2.tgz", @@ -825,6 +895,11 @@ "resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz", "integrity": "sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q==" }, + "follow-redirects": { + "version": "1.15.3", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.3.tgz", + "integrity": "sha512-1VzOtuEM8pC9SFU1E+8KfTjZyMztRsgEfwQl44z8A25uy13jSzTj6dyK2Df52iV0vgHCfBwLhDWevLn95w5v6Q==" + }, "form-data": { "version": "2.5.1", "resolved": "https://registry.npmjs.org/form-data/-/form-data-2.5.1.tgz", @@ -869,6 +944,11 @@ "resolved": "https://registry.npmjs.org/process/-/process-0.11.10.tgz", "integrity": "sha512-cdGef/drWFoydD1JsMzuFf8100nZl+GT+yacc2bEced5f9Rjk4z+WtFUTBu9PhOi9j/jfmBPu0mMEY4wIdAF8A==" }, + "proxy-from-env": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz", + "integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==" + }, "sax": { "version": "1.2.4", "resolved": "https://registry.npmjs.org/sax/-/sax-1.2.4.tgz", diff --git a/packages/warp-cache/package.json b/packages/warp-cache/package.json index 9244656b..7aa554dd 100644 --- a/packages/warp-cache/package.json +++ b/packages/warp-cache/package.json @@ -46,6 +46,7 @@ "@azure/abort-controller": "^1.1.0", "@azure/ms-rest-js": "^2.6.0", "@azure/storage-blob": "^12.13.0", + "axios": "^1.6.2", "semver": "^6.3.1", "uuid": "^3.3.3" }, diff --git a/packages/warp-cache/src/cache.ts b/packages/warp-cache/src/cache.ts index 3a84d140..d4f8576a 100644 --- a/packages/warp-cache/src/cache.ts +++ b/packages/warp-cache/src/cache.ts @@ -3,7 +3,7 @@ import * as path from 'path' import * as utils from './internal/cacheUtils' import * as cacheHttpClient from './internal/cacheHttpClient' import {createTar, extractTar, listTar} from './internal/tar' -import {DownloadOptions, UploadOptions} from './options' +import {DownloadOptions, getUploadOptions} from 
'./options' export class ValidationError extends Error { constructor(message: string) { @@ -95,14 +95,14 @@ export async function restoreCache( compressionMethod, enableCrossOsArchive }) - if (!cacheEntry?.archiveLocation) { + if (!cacheEntry?.pre_signed_url) { // Cache not found return undefined } if (options?.lookupOnly) { core.info('Lookup only - skipping download') - return cacheEntry.cacheKey + return cacheEntry.cache_key } archivePath = path.join( @@ -112,11 +112,7 @@ export async function restoreCache( core.debug(`Archive Path: ${archivePath}`) // Download the cache from the cache entry - await cacheHttpClient.downloadCache( - cacheEntry.archiveLocation, - archivePath, - options - ) + await cacheHttpClient.downloadCache(cacheEntry.pre_signed_url, archivePath) if (core.isDebug()) { await listTar(archivePath, compressionMethod) @@ -132,7 +128,7 @@ export async function restoreCache( await extractTar(archivePath, compressionMethod) core.info('Cache restored successfully') - return cacheEntry.cacheKey + return cacheEntry.cache_key } catch (error) { const typedError = error as Error if (typedError.name === ValidationError.name) { @@ -165,16 +161,15 @@ export async function restoreCache( export async function saveCache( paths: string[], key: string, - options?: UploadOptions, enableCrossOsArchive = false -): Promise<number> { +): Promise<string> { checkPaths(paths) checkKey(key) const compressionMethod = await utils.getCompressionMethod() - let cacheId = -1 const cachePaths = await utils.resolvePaths(paths) + let cacheKey = '' core.debug('Cache Paths:') core.debug(`${JSON.stringify(cachePaths)}`) @@ -197,7 +192,7 @@ export async function saveCache( if (core.isDebug()) { await listTar(archivePath, compressionMethod) } - const fileSizeLimit = 10 * 1024 * 1024 * 1024 // 10GB per repo limit + const fileSizeLimit = 20 * 1024 * 1024 * 1024 // 20GB per repo limit const archiveFileSize = utils.getArchiveFileSizeInBytes(archivePath) core.debug(`File Size: ${archiveFileSize}`) @@ -211,9 +206,13 @@ export async function saveCache( } core.debug('Reserving Cache') + // Calculate number of chunks required + const uploadOptions = getUploadOptions() + const maxChunkSize = uploadOptions?.uploadChunkSize ?? 32 * 1024 * 1024 // Default 32MB + const numberOfChunks = Math.ceil(archiveFileSize / maxChunkSize) // Ceil so archives smaller than one chunk still get a single chunk const reserveCacheResponse = await cacheHttpClient.reserveCache( key, - paths, + numberOfChunks, { compressionMethod, enableCrossOsArchive, @@ -221,23 +220,29 @@ export async function saveCache( } ) - if (reserveCacheResponse?.result?.cacheId) { - cacheId = reserveCacheResponse?.result?.cacheId - } else if (reserveCacheResponse?.statusCode === 400) { + if (reserveCacheResponse?.statusCode === 400) { throw new Error( reserveCacheResponse?.error?.message ?? `Cache size of ~${Math.round( archiveFileSize / (1024 * 1024) )} MB (${archiveFileSize} B) is over the data cap limit, not saving cache.` ) - } else { - throw new ReserveCacheError( - `Unable to reserve cache with key ${key}, another job may be creating this cache. More details: ${reserveCacheResponse?.error?.message}` - ) } - core.debug(`Saving Cache (ID: ${cacheId})`) - await cacheHttpClient.saveCache(cacheId, archivePath, options) + core.debug(`Saving Cache`) + cacheKey = await cacheHttpClient.saveCache( + key, + cacheHttpClient.getCacheVersion( + paths, + compressionMethod, + enableCrossOsArchive + ), + reserveCacheResponse?.result?.upload_id ?? '', + reserveCacheResponse?.result?.upload_key ?? 
'', + numberOfChunks, + reserveCacheResponse?.result?.pre_signed_urls ?? [], + archivePath + ) } catch (error) { const typedError = error as Error if (typedError.name === ValidationError.name) { @@ -256,5 +261,24 @@ export async function saveCache( } } - return cacheId + return cacheKey +} + +/** + * Deletes an entire cache by cache key. + * @param keys The cache keys + */ +export async function deleteCache(keys: string[]): Promise<void> { + for (const key of keys) { + checkKey(key) + } + + core.debug('Deleting Cache') + core.debug(`Cache Keys: ${keys}`) + + try { + await cacheHttpClient.deleteCache(keys) + } catch (error) { + core.warning(`Failed to delete cache: ${error}`) + } } diff --git a/packages/warp-cache/src/internal/cacheHttpClient.ts b/packages/warp-cache/src/internal/cacheHttpClient.ts index 6d580b39..2b28f073 100644 --- a/packages/warp-cache/src/internal/cacheHttpClient.ts +++ b/packages/warp-cache/src/internal/cacheHttpClient.ts @@ -7,7 +7,6 @@ import { } from '@actions/http-client/lib/interfaces' import * as crypto from 'crypto' import * as fs from 'fs' -import {URL} from 'url' import * as utils from './cacheUtils' import {CompressionMethod} from './constants' @@ -18,34 +17,24 @@ import { ReserveCacheRequest, ReserveCacheResponse, ITypedResponseWithError, - ArtifactCacheList + ArtifactCacheList, + InternalS3CompletedPart, + CommitCacheResponse } from './contracts' -import { - downloadCacheHttpClient, - downloadCacheHttpClientConcurrent, - downloadCacheStorageSDK -} from './downloadUtils' -import { - DownloadOptions, - UploadOptions, - getDownloadOptions, - getUploadOptions -} from '../options' -import { - isSuccessStatusCode, - retryHttpClientResponse, - retryTypedResponse -} from './requestUtils' +import {downloadCacheMultiConnection} from './downloadUtils' +import {isSuccessStatusCode, retry, retryTypedResponse} from './requestUtils' +import axios, {AxiosError} from 'axios' const versionSalt = '1.0' function getCacheApiUrl(resource: string): string { - const baseUrl: string = process.env['ACTIONS_CACHE_URL'] ?? 'localhost:8000' + const baseUrl: string = + process.env['ACTIONS_CACHE_URL'] ?? 'http://127.0.0.1:8002' if (!baseUrl) { throw new Error('Cache Service Url not found, unable to restore cache.') } - const url = `${baseUrl}/v1/cache/${resource}` + const url = `${baseUrl}/v1/${resource}` core.debug(`Resource Url: ${url}`) return url } @@ -130,7 +119,7 @@ export async function getCacheEntry( } const cacheResult = response.result - const cacheDownloadUrl = cacheResult?.archiveLocation + const cacheDownloadUrl = cacheResult?.pre_signed_url if (!cacheDownloadUrl) { // Cache archiveLocation not found. This should never happen, and hence bail out. throw new Error('Cache not found.') @@ -158,9 +147,9 @@ async function printCachesListForDiagnostics( core.debug( `No matching cache found for cache key '${key}', version '${version}' and scope '${process.env['GITHUB_REF']}'. There exist one or more cache(s) with similar key but they have different version or scope. See more info on cache matching here: https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#matching-a-cache-key \nOther caches with similar key:` ) - for (const cacheEntry of cacheListResult?.artifactCaches || []) { + for (const cacheEntry of cacheListResult?.artifactCaches ?? 
[]) { core.debug( - `Cache Key: ${cacheEntry?.cacheKey}, Cache Version: ${cacheEntry?.cacheVersion}, Cache Scope: ${cacheEntry?.scope}, Cache Created: ${cacheEntry?.creationTime}` + `Cache Key: ${cacheEntry?.cache_key}, Cache Version: ${cacheEntry?.cache_version}` ) } } } @@ -169,57 +158,27 @@ export async function downloadCache( archiveLocation: string, - archivePath: string, - options?: DownloadOptions + archivePath: string ): Promise<void> { - const archiveUrl = new URL(archiveLocation) - const downloadOptions = getDownloadOptions(options) - - if (archiveUrl.hostname.endsWith('.blob.core.windows.net')) { - if (downloadOptions.useAzureSdk) { - // Use Azure storage SDK to download caches hosted on Azure to improve speed and reliability. - await downloadCacheStorageSDK( - archiveLocation, - archivePath, - downloadOptions - ) - } else if (downloadOptions.concurrentBlobDownloads) { - // Use concurrent implementation with HttpClient to work around blob SDK issue - await downloadCacheHttpClientConcurrent( - archiveLocation, - archivePath, - downloadOptions - ) - } else { - // Otherwise, download using the Actions http-client. - await downloadCacheHttpClient(archiveLocation, archivePath) - } - } else { - await downloadCacheHttpClient(archiveLocation, archivePath) - } + await downloadCacheMultiConnection(archiveLocation, archivePath, 8) } // Reserve Cache export async function reserveCache( - key: string, - paths: string[], + cacheKey: string, + numberOfChunks: number, options?: InternalCacheOptions ): Promise<ITypedResponseWithError<ReserveCacheResponse>> { const httpClient = createHttpClient() - const version = getCacheVersion( - paths, - options?.compressionMethod, - options?.enableCrossOsArchive - ) const reserveCacheRequest: ReserveCacheRequest = { - key, - version, - cacheSize: options?.cacheSize + cache_key: cacheKey, + number_of_chunks: numberOfChunks, + content_type: 'application/zstd' } const response = await retryTypedResponse('reserveCache', async () => httpClient.postJson<ReserveCacheResponse>( - getCacheApiUrl('caches'), + getCacheApiUrl('cache/reserve'), reserveCacheRequest ) ) return response } @@ -236,12 +195,12 @@ function getContentRange(start: number, end: number): string { } async function uploadChunk( - httpClient: HttpClient, resourceUrl: string, openStream: () => NodeJS.ReadableStream, + partNumber: number, start: number, end: number -): Promise<void> { +): Promise<InternalS3CompletedPart> { core.debug( `Uploading chunk of size ${ end - start + 1 } bytes at offset ${start} with content range: ${getContentRange( start, end )}` ) - const additionalHeaders = { - 'Content-Type': 'application/octet-stream', - 'Content-Range': getContentRange(start, end) - } - const uploadChunkResponse = await retryHttpClientResponse( - `uploadChunk (start: ${start}, end: ${end})`, - async () => - httpClient.sendStream( - 'PATCH', - resourceUrl, - openStream(), - additionalHeaders - ) - ) + // Manually convert the readable stream to a buffer. S3 doesn't allow stream as input + const chunks = await utils.streamToBuffer(openStream()) - if (!isSuccessStatusCode(uploadChunkResponse.message.statusCode)) { + try { + // HACK: Using axios here as S3 API doesn't allow readable stream as input and Github's HTTP client is not able to send buffer as body + const response = await axios.request({ + method: 'PUT', + url: resourceUrl, + headers: { + 'Content-Type': 'application/octet-stream' + }, + data: chunks + }) + return { + ETag: response.headers.etag ?? 
'', + PartNumber: partNumber + } + } catch (error) { throw new Error( - `Cache service responded with ${uploadChunkResponse.message.statusCode} during upload chunk.` + `Cache service responded with ${ + (error as AxiosError).response?.status + } during upload chunk.` ) } } -async function uploadFile( - httpClient: HttpClient, - cacheId: number, - archivePath: string, - options?: UploadOptions -): Promise<void> { +async function uploadFileToS3( + preSignedURLs: string[], + archivePath: string +): Promise<InternalS3CompletedPart[]> { // Upload Chunks const fileSize = utils.getArchiveFileSizeInBytes(archivePath) - const resourceUrl = getCacheApiUrl(`caches/${cacheId.toString()}`) + const numberOfChunks = preSignedURLs.length + const fd = fs.openSync(archivePath, 'r') - const uploadOptions = getUploadOptions(options) - const concurrency = utils.assertDefined( - 'uploadConcurrency', - uploadOptions.uploadConcurrency - ) - const maxChunkSize = utils.assertDefined( - 'uploadChunkSize', - uploadOptions.uploadChunkSize - ) - - const parallelUploads = [...new Array(concurrency).keys()] core.debug('Awaiting all uploads') let offset = 0 try { - await Promise.all( - parallelUploads.map(async () => { - while (offset < fileSize) { - const chunkSize = Math.min(fileSize - offset, maxChunkSize) - const start = offset - const end = offset + chunkSize - 1 - offset += maxChunkSize + const completedParts = await Promise.all( + preSignedURLs.map(async (presignedURL, index) => { + const chunkSize = Math.ceil(fileSize / numberOfChunks) + const start = offset + const end = offset + chunkSize - 1 + offset += chunkSize - await uploadChunk( - httpClient, - resourceUrl, - () => - fs - .createReadStream(archivePath, { - fd, - start, - end, - autoClose: false - }) - .on('error', error => { - throw new Error( - `Cache upload failed because file read failed with ${error.message}` - ) - }), - start, - end - ) - } + return await uploadChunk( + presignedURL, + () => + fs + .createReadStream(archivePath, { + fd, + start, + end, + autoClose: false + }) + .on('error', error => { + throw new Error( + `Cache upload failed because file read failed with ${error.message}` + ) + }), + index + 1, + start, + end + ) }) ) + + return completedParts } finally { fs.closeSync(fd) } } async function commitCache( httpClient: HttpClient, - cacheId: number, - filesize: number -): Promise<TypedResponse<null>> { - const commitCacheRequest: CommitCacheRequest = {size: filesize} + cacheKey: string, + cacheVersion: string, + uploadKey: string, + uploadID: string, + parts: InternalS3CompletedPart[] +): Promise<ITypedResponseWithError<CommitCacheResponse>> { + const commitCacheRequest: CommitCacheRequest = { + cache_key: cacheKey, + cache_version: cacheVersion, + upload_key: uploadKey, + upload_id: uploadID, + parts: parts, + os: process.env['RUNNER_OS'] ?? 
'Linux', + vcs_type: 'github' + } return await retryTypedResponse('commitCache', async () => - httpClient.postJson<null>( - getCacheApiUrl(`caches/${cacheId.toString()}`), + httpClient.postJson<CommitCacheResponse>( + getCacheApiUrl(`cache/commit`), commitCacheRequest ) ) } export async function saveCache( - cacheId: number, - archivePath: string, - options?: UploadOptions -): Promise<void> { + cacheKey: string, + cacheVersion: string, + uploadId: string, + uploadKey: string, + numberOfChunks: number, + preSignedURLs: string[], + archivePath: string +): Promise<string> { + // Number of chunks should match the number of pre-signed URLs + if (numberOfChunks !== preSignedURLs.length) { + throw new Error( + `Number of chunks (${numberOfChunks}) should match the number of pre-signed URLs (${preSignedURLs.length}).` + ) + } + const httpClient = createHttpClient() core.debug('Upload cache') - await uploadFile(httpClient, cacheId, archivePath, options) + const completedParts = await uploadFileToS3(preSignedURLs, archivePath) + + // Sort parts in ascending order by partNumber + completedParts.sort((a, b) => a.PartNumber - b.PartNumber) // Commit Cache - core.debug('Commiting cache') + core.debug('Committing cache') const cacheSize = utils.getArchiveFileSizeInBytes(archivePath) core.info( `Cache Size: ~${Math.round(cacheSize / (1024 * 1024))} MB (${cacheSize} B)` ) - const commitCacheResponse = await commitCache(httpClient, cacheId, cacheSize) + const commitCacheResponse = await commitCache( + httpClient, + cacheKey, + cacheVersion, + uploadKey, + uploadId, + completedParts + ) if (!isSuccessStatusCode(commitCacheResponse.statusCode)) { throw new Error( `Cache service responded with ${commitCacheResponse.statusCode} during commit cache.` ) } core.info('Cache saved successfully') + return commitCacheResponse.result?.cache_key ?? 
'' +} + +export async function deleteCache(keys: string[]) { + const httpClient = createHttpClient() + const resource = `cache?keys=${encodeURIComponent(keys.join(','))}` + const response = await httpClient.del(getCacheApiUrl(resource)) + if (!isSuccessStatusCode(response.message.statusCode)) { + throw new Error( + `Cache service responded with ${response.message.statusCode}` + ) + } } diff --git a/packages/warp-cache/src/internal/cacheUtils.ts b/packages/warp-cache/src/internal/cacheUtils.ts index 650653ad..8513d1c3 100644 --- a/packages/warp-cache/src/internal/cacheUtils.ts +++ b/packages/warp-cache/src/internal/cacheUtils.ts @@ -137,3 +137,12 @@ export function isGhes(): boolean { ) return ghUrl.hostname.toUpperCase() !== 'GITHUB.COM' } + +export function streamToBuffer(stream: NodeJS.ReadableStream): Promise<Buffer> { + return new Promise((resolve, reject) => { + const buffer: Buffer[] = [] + stream.on('data', (chunk: Buffer) => buffer.push(chunk)) + stream.on('error', reject) + stream.on('end', () => resolve(Buffer.concat(buffer))) + }) +} diff --git a/packages/warp-cache/src/internal/contracts.d.ts b/packages/warp-cache/src/internal/contracts.d.ts index 6fcd9427..39058e82 100644 --- a/packages/warp-cache/src/internal/contracts.d.ts +++ b/packages/warp-cache/src/internal/contracts.d.ts @@ -7,11 +7,9 @@ export interface ITypedResponseWithError<T> extends TypedResponse<T> { } export interface ArtifactCacheEntry { - cacheKey?: string - scope?: string - cacheVersion?: string - creationTime?: string - archiveLocation?: string + cache_key?: string + pre_signed_url?: string + cache_version?: string } export interface ArtifactCacheList { @@ -20,17 +18,30 @@ } export interface CommitCacheRequest { - size: number + cache_key: string + cache_version: string + upload_key: string + upload_id: string + parts: InternalS3CompletedPart[] + os: string + vcs_type: string +} + +export interface CommitCacheResponse { + cache_key: string + cache_version: string } export interface ReserveCacheRequest { - key: string - version?: string - cacheSize?: number + cache_key: string + content_type: string + number_of_chunks: number } export interface ReserveCacheResponse { - cacheId: number + pre_signed_urls: string[] + upload_key: string + upload_id: string } export interface InternalCacheOptions { @@ -43,3 +54,8 @@ export interface ArchiveTool { path: string type: string } + +export interface InternalS3CompletedPart { + ETag: string + PartNumber: number +} diff --git a/packages/warp-cache/src/internal/downloadUtils.ts b/packages/warp-cache/src/internal/downloadUtils.ts index de57ed78..269a1ff8 100644 --- a/packages/warp-cache/src/internal/downloadUtils.ts +++ b/packages/warp-cache/src/internal/downloadUtils.ts @@ -22,10 +22,25 @@ import {AbortController} from '@azure/abort-controller' */ async function pipeResponseToStream( response: HttpClientResponse, - output: NodeJS.WritableStream + output: NodeJS.WritableStream, + progress?: DownloadProgress ): Promise<void> { const pipeline = util.promisify(stream.pipeline) - await pipeline(response.message, output) + await pipeline( + response.message, + new stream.Transform({ + transform(chunk, encoding, callback) { + if (progress) { + progress.setReceivedBytes( + progress.getTransferredBytes() + chunk.length + ) + } + this.push(chunk) + callback() + } + }), + output + ) } /** @@ -204,260 +219,76 @@ export async function downloadCacheHttpClient( } /** - * Download the cache using the Actions toolkit http-client concurrently + * Download the cache using 
the Actions toolkit http-client with multiple connections * * @param archiveLocation the URL for the cache * @param archivePath the local path where the cache is saved + * @param connections number of connections to use */ -export async function downloadCacheHttpClientConcurrent( - archiveLocation: string, - archivePath: fs.PathLike, - options: DownloadOptions -): Promise<void> { - const archiveDescriptor = await fs.promises.open(archivePath, 'w') - const httpClient = new HttpClient('actions/cache', undefined, { - socketTimeout: options.timeoutInMs, - keepAlive: true - }) - try { - const res = await retryHttpClientResponse( - 'downloadCacheMetadata', - async () => await httpClient.request('HEAD', archiveLocation, null, {}) - ) - - const lengthHeader = res.message.headers['content-length'] - if (lengthHeader === undefined || lengthHeader === null) { - throw new Error('Content-Length not found on blob response') - } - - const length = parseInt(lengthHeader) - if (Number.isNaN(length)) { - throw new Error(`Could not interpret Content-Length: ${length}`) - } - - const downloads: { - offset: number - promiseGetter: () => Promise<DownloadSegment> - }[] = [] - const blockSize = 4 * 1024 * 1024 - - for (let offset = 0; offset < length; offset += blockSize) { - const count = Math.min(blockSize, length - offset) - downloads.push({ - offset, - promiseGetter: async () => { - return await downloadSegmentRetry( - httpClient, - archiveLocation, - offset, - count - ) - } - }) - } - - // reverse to use .pop instead of .shift - downloads.reverse() - let actives = 0 - let bytesDownloaded = 0 - const progress = new DownloadProgress(length) - progress.startDisplayTimer() - const progressFn = progress.onProgress() - - const activeDownloads: {[offset: number]: Promise<DownloadSegment>} = [] - let nextDownload: - | {offset: number; promiseGetter: () => Promise<DownloadSegment>} - | undefined - - const waitAndWrite: () => Promise<void> = async () => { - const segment = await Promise.race(Object.values(activeDownloads)) - await archiveDescriptor.write( - segment.buffer, - 0, - segment.count, - segment.offset - ) - actives-- - delete activeDownloads[segment.offset] - bytesDownloaded += segment.count - progressFn({loadedBytes: bytesDownloaded}) - } - - while ((nextDownload = downloads.pop())) { - activeDownloads[nextDownload.offset] = nextDownload.promiseGetter() - actives++ - - if (actives >= (options.downloadConcurrency ?? 
10)) { - await waitAndWrite() - } - } - - while (actives > 0) { - await waitAndWrite() - } - } finally { - httpClient.dispose() - await archiveDescriptor.close() - } -} - -async function downloadSegmentRetry( - httpClient: HttpClient, - archiveLocation: string, - offset: number, - count: number -): Promise<DownloadSegment> { - const retries = 5 - let failures = 0 - - while (true) { - try { - const timeout = 30000 - const result = await promiseWithTimeout( - timeout, - downloadSegment(httpClient, archiveLocation, offset, count) - ) - if (typeof result === 'string') { - throw new Error('downloadSegmentRetry failed due to timeout') - } - - return result - } catch (err) { - if (failures >= retries) { - throw err - } - - failures++ - } - } -} - -async function downloadSegment( - httpClient: HttpClient, - archiveLocation: string, - offset: number, - count: number -): Promise<DownloadSegment> { - const partRes = await retryHttpClientResponse( - 'downloadCachePart', - async () => - await httpClient.get(archiveLocation, { - Range: `bytes=${offset}-${offset + count - 1}` - }) - ) - - if (!partRes.readBodyBuffer) { - throw new Error('Expected HttpClientResponse to implement readBodyBuffer') - } - - return { - offset, - count, - buffer: await partRes.readBodyBuffer() - } -} - declare class DownloadSegment { offset: number count: number buffer: Buffer } /** - * Download the cache using the Azure Storage SDK. Only call this method if the - * URL points to an Azure Storage endpoint. - * - * @param archiveLocation the URL for the cache - * @param archivePath the local path where the cache is saved - * @param options the download options with the defaults set - */ -export async function downloadCacheStorageSDK( +export async function downloadCacheMultiConnection( archiveLocation: string, archivePath: string, - options: DownloadOptions + connections: number ): Promise<void> { - const client = new BlockBlobClient(archiveLocation, undefined, { - retryOptions: { - // Override the timeout used when downloading each 4 MB chunk - // The default is 2 min / MB, which is way too slow - tryTimeoutInMs: options.timeoutInMs - } - }) - - const properties = await client.getProperties() - const contentLength = properties.contentLength ?? -1 - - if (contentLength < 0) { - // We should never hit this condition, but just in case fall back to downloading the - // file as one large stream - core.debug( - 'Unable to determine content length, downloading file with http-client...' + let fileHandle: fs.promises.FileHandle | null = null + let downloadProgress: DownloadProgress | null = null + try { + fileHandle = await fs.promises.open(archivePath, 'w+') + const httpClient = new HttpClient('actions/cache') + // Request the first two bytes (0-1) so the Content-Range header reveals the total size + const metadataResponse = await retryHttpClientResponse( + 'downloadCache', + async () => + httpClient.get(archiveLocation, { + Range: 'bytes=0-1' + }) ) - - await downloadCacheHttpClient(archiveLocation, archivePath) - } else { - // Use downloadToBuffer for faster downloads, since internally it splits the - // file into 4 MB chunks which can then be parallelized and retried independently - // - // If the file exceeds the buffer maximum length (~1 GB on 32-bit systems and ~2 GB - // on 64-bit systems), split the download into multiple segments - // ~2 GB = 2147483647, beyond this, we start getting out of range error. So, capping it accordingly. 
- - // Updated segment size to 128MB = 134217728 bytes, to complete a segment faster and fail fast - const maxSegmentSize = Math.min(134217728, buffer.constants.MAX_LENGTH) - const downloadProgress = new DownloadProgress(contentLength) - - const fd = fs.openSync(archivePath, 'w') - - try { - downloadProgress.startDisplayTimer() - const controller = new AbortController() - const abortSignal = controller.signal - while (!downloadProgress.isDone()) { - const segmentStart = - downloadProgress.segmentOffset + downloadProgress.segmentSize - - const segmentSize = Math.min( - maxSegmentSize, - contentLength - segmentStart - ) - - downloadProgress.nextSegment(segmentSize) - const result = await promiseWithTimeout( - options.segmentTimeoutInMs || 3600000, - client.downloadToBuffer(segmentStart, segmentSize, { - abortSignal, - concurrency: options.downloadConcurrency, - onProgress: downloadProgress.onProgress() - }) - ) - if (result === 'timeout') { - controller.abort() - throw new Error( - 'Aborting cache download as the download time exceeded the timeout.' - ) - } else if (Buffer.isBuffer(result)) { - fs.writeFileSync(fd, result) - } - } - } finally { - downloadProgress.stopDisplayTimer() - fs.closeSync(fd) + const contentRange = metadataResponse.message.headers['content-range'] + if (!contentRange) { + core.debug(await metadataResponse.readBody()) + throw new Error('Range request not supported by server') } + const match = RegExp(/bytes \d+-\d+\/(\d+)/).exec(contentRange) + if (!match) { + throw new Error( + 'Content-Range header in server response not in correct format' + ) + } + const totalLength = parseInt(match[1]) + await fileHandle.truncate(totalLength) + await fileHandle.sync() + downloadProgress = new DownloadProgress(totalLength) + downloadProgress.startDisplayTimer() + const segmentSize = Math.ceil(totalLength / connections) + const promises: Promise<void>[] = [] + for (let i = 0; i < connections; i++) { + promises.push( + (async () => { + const rangeStart = i * segmentSize + const rangeEnd = Math.min((i + 1) * segmentSize - 1, totalLength - 1) + const downloadResponse = await retryHttpClientResponse( + 'downloadCache', + async () => + httpClient.get(archiveLocation, { + Range: `bytes=${rangeStart}-${rangeEnd}` + }) + ) + const writeStream = fs.createWriteStream(archivePath, { + fd: fileHandle.fd, + autoClose: false, + start: rangeStart + }) + await pipeResponseToStream( + downloadResponse, + writeStream, + downloadProgress + ) + })() + ) + } + await Promise.all(promises) + } finally { + downloadProgress?.stopDisplayTimer() + await fileHandle?.close() } } - -const promiseWithTimeout = async <T>( - timeoutMs: number, - promise: Promise<T> -): Promise<T | string> => { - let timeoutHandle: NodeJS.Timeout - const timeoutPromise = new Promise<string>(resolve => { - timeoutHandle = setTimeout(() => resolve('timeout'), timeoutMs) - }) - - return Promise.race([promise, timeoutPromise]).then(result => { - clearTimeout(timeoutHandle) - return result - }) -} diff --git a/packages/warp-cache/src/test.ts b/packages/warp-cache/src/test.ts index f54134d5..4a0e02f9 100644 --- a/packages/warp-cache/src/test.ts +++ b/packages/warp-cache/src/test.ts @@ -1,6 +1,20 @@ -import {restoreCache} from './cache' +import {deleteCache, restoreCache, saveCache} from './cache' -restoreCache(['/home/runner/work/_temp'], 'cache-key', [ - 'cache-key-1', - 'cache-key-2' -]) +process.env['RUNNER_TEMP'] = '/Users/prajjwal/Repos/warpbuild/playground/tmp_fs' +process.env['NODE_DEBUG'] = 'http' + +// saveCache( +// 
['/Users/prajjwal/Repos/warpbuild/playground/test_fs'], +// 'test-fs-local-key', +// true +// ) + +// restoreCache( +// ['/Users/prajjwal/Repos/warpbuild/playground/test_fs'], +// 'test-fs-local-key', +// [], +// {}, +// true +// ) + +// deleteCache(['test-fs-local-key'])
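
A minimal usage sketch of the API after this patch, for reference only (not part of the diff): it assumes a cache service reachable through ACTIONS_CACHE_URL, a writable RUNNER_TEMP, and placeholder paths and keys.

import {deleteCache, restoreCache, saveCache} from './cache'

async function main(): Promise<void> {
  const paths = ['/tmp/warp-cache-playground'] // placeholder path
  const key = 'test-fs-local-key' // placeholder key

  // saveCache now resolves to the cache key reported by the service ('' on failure).
  const savedKey = await saveCache(paths, key, false)

  // restoreCache resolves to the matched cache key, or undefined on a miss.
  const restoredKey = await restoreCache(paths, key, [])

  // deleteCache removes every entry for the given keys.
  await deleteCache([key])

  console.log({savedKey, restoredKey})
}

main().catch(err => {
  console.error(err)
  process.exitCode = 1
})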