From 89354f65407afc6d6c4d740ba32d64ab02e1fab3 Mon Sep 17 00:00:00 2001
From: Bassem Dghaidi <568794+Link-@users.noreply.github.com>
Date: Mon, 21 Oct 2024 05:21:32 -0700
Subject: [PATCH] Cleanup implementation and use tarballs instead of streaming zip

---
 packages/cache/src/cache.ts                   | 198 ++++++++++++------
 .../cache/src/internal/cacheTwirpClient.ts    |  13 +-
 packages/cache/src/internal/constants.ts      |   4 +-
 .../cache/src/internal/v2/download-cache.ts   |  85 ++------
 .../cache/src/internal/v2/upload-cache.ts     | 115 +---------
 packages/cache/src/internal/v2/zip.ts         |   0
 6 files changed, 173 insertions(+), 242 deletions(-)
 delete mode 100644 packages/cache/src/internal/v2/zip.ts

diff --git a/packages/cache/src/cache.ts b/packages/cache/src/cache.ts
index 37250c47..7354f649 100644
--- a/packages/cache/src/cache.ts
+++ b/packages/cache/src/cache.ts
@@ -14,14 +14,10 @@ import {
   GetCacheEntryDownloadURLRequest,
   GetCacheEntryDownloadURLResponse
 } from './generated/results/api/v1/cache'
-import { UploadCacheStream } from './internal/v2/upload-cache'
-import { StreamExtract } from './internal/v2/download-cache'
-import {
-  UploadZipSpecification,
-  getUploadZipSpecification
-} from '@actions/artifact/lib/internal/upload/upload-zip-specification'
-import { createZipUploadStream } from '@actions/artifact/lib/internal/upload/zip'
+import { UploadCacheFile } from './internal/v2/upload-cache'
+import { DownloadCacheFile } from './internal/v2/download-cache'
 import { getBackendIdsFromToken, BackendIds } from '@actions/artifact/lib/internal/shared/util'
+import { CacheFileSizeLimit } from './internal/constants'
 
 export class ValidationError extends Error {
   constructor(message: string) {
@@ -101,6 +97,16 @@ export async function restoreCache(
   }
 }
 
+/**
+ * Restores cache using the legacy Cache Service
+ *
+ * @param paths a list of file paths to restore from the cache
+ * @param primaryKey an explicit key for restoring the cache
+ * @param restoreKeys an optional ordered list of keys to use for restoring the cache if no cache hit occurred for primaryKey
+ * @param options cache download options
+ * @param enableCrossOsArchive an optional boolean enabled to restore on Windows any cache created on any platform
+ * @returns string returns the key for the cache hit, otherwise returns undefined
+ */
 async function restoreCachev1(
   paths: string[],
   primaryKey: string,
@@ -209,8 +215,7 @@ async function restoreCachev2(
   restoreKeys = restoreKeys || []
   const keys = [primaryKey, ...restoreKeys]
 
-  core.debug('Resolved Keys:')
-  core.debug(JSON.stringify(keys))
+  core.debug(`Resolved Keys: ${JSON.stringify(keys)}`)
 
   if (keys.length > 10) {
     throw new ValidationError(
@@ -224,7 +229,6 @@ async function restoreCachev2(
   let archivePath = ''
   try {
     const twirpClient = cacheTwirpClient.internalCacheTwirpClient()
-    // BackendIds are retrieved form the signed JWT
     const backendIds: BackendIds = getBackendIdsFromToken()
     const compressionMethod = await utils.getCompressionMethod()
 
@@ -240,11 +244,11 @@ async function restoreCachev2(
       ),
     }
 
+    core.debug(`GetCacheEntryDownloadURLRequest: ${JSON.stringify(request)}`)
     const response: GetCacheEntryDownloadURLResponse = await twirpClient.GetCacheEntryDownloadURL(request)
     core.debug(`GetCacheEntryDownloadURLResponse: ${JSON.stringify(response)}`)
 
     if (!response.ok) {
-      // Cache not found
       core.warning(`Cache not found for keys: ${keys.join(', ')}`)
       return undefined
     }
@@ -262,11 +266,13 @@ async function restoreCachev2(
     )
     core.debug(`Archive path: ${archivePath}`)
-    if (core.isDebug()) {
-      await listTar(archivePath, compressionMethod)
-    }
-    core.debug(`Starting download of artifact to: ${archivePath}`)
+
+    await DownloadCacheFile(
+      response.signedDownloadUrl,
+      archivePath
+    )
+
     const archiveFileSize = utils.getArchiveFileSizeInBytes(archivePath)
     core.info(
       `Cache Size: ~${Math.round(
        archiveFileSize / (1024 * 1024)
      )} MB (${archiveFileSize} B)`
     )
-    // Download the cache from the cache entry
-    await cacheHttpClient.downloadCache(
-      response.signedDownloadUrl,
-      archivePath,
-      options
-    )
+    if (core.isDebug()) {
+      await listTar(archivePath, compressionMethod)
+    }
 
     await extractTar(archivePath, compressionMethod)
     core.info('Cache restored successfully')
 
     return request.key
   } catch (error) {
+    // TODO: handle all the possible error scenarios
     throw new Error(`Unable to download and extract cache: ${error.message}`)
   } finally {
     try {
@@ -294,6 +298,8 @@ async function restoreCachev2(
       core.debug(`Failed to delete archive: ${error}`)
     }
   }
+
+  return undefined
 }
@@ -325,6 +331,15 @@ export async function saveCache(
   }
 }
 
+/**
+ * Save cache using the legacy Cache Service
+ *
+ * @param paths a list of file paths to be cached
+ * @param key an explicit key for saving the cache
+ * @param options cache upload options
+ * @param enableCrossOsArchive an optional boolean enabled to save a cache on Windows which could be restored on any platform
+ * @returns number returns cacheId if the cache was saved successfully and throws an error if save fails
+ */
 async function saveCachev1(
   paths: string[],
   key: string,
@@ -419,6 +434,15 @@ async function saveCachev1(
   return cacheId
 }
 
+/**
+ * Save cache using the new Cache Service
+ *
+ * @param paths a list of file paths to be cached
+ * @param key an explicit key for saving the cache
+ * @param options cache upload options
+ * @param enableCrossOsArchive an optional boolean enabled to save a cache on Windows which could be restored on any platform
+ * @returns number returns cacheId if the cache was saved successfully and throws an error if save fails
+ */
 async function saveCachev2(
   paths: string[],
   key: string,
@@ -428,59 +452,103 @@ async function saveCachev2(
   options?: UploadOptions,
   enableCrossOsArchive = false
 ): Promise<number> {
   // BackendIds are retrieved from the signed JWT
   const backendIds: BackendIds = getBackendIdsFromToken()
   const compressionMethod = await utils.getCompressionMethod()
-  const version = utils.getCacheVersion(
-    paths,
-    compressionMethod,
-    enableCrossOsArchive
-  )
   const twirpClient = cacheTwirpClient.internalCacheTwirpClient()
-  const request: CreateCacheEntryRequest = {
-    workflowRunBackendId: backendIds.workflowRunBackendId,
-    workflowJobRunBackendId: backendIds.workflowJobRunBackendId,
-    key: key,
-    version: version
-  }
-  const response: CreateCacheEntryResponse = await twirpClient.CreateCacheEntry(request)
-  core.info(`CreateCacheEntryResponse: ${JSON.stringify(response)}`)
+  let cacheId = -1
 
-  // Archive
-  // We're going to handle 1 path fow now. This needs to be fixed to handle all
-  // paths passed in.
-  const rootDir = path.dirname(paths[0])
-  const zipSpecs: UploadZipSpecification[] = getUploadZipSpecification(paths, rootDir)
-  if (zipSpecs.length === 0) {
+  const cachePaths = await utils.resolvePaths(paths)
+  core.debug('Cache Paths:')
+  core.debug(`${JSON.stringify(cachePaths)}`)
+
+  if (cachePaths.length === 0) {
     throw new Error(
-      `Error with zip specs: ${zipSpecs.flatMap(s => (s.sourcePath ? [s.sourcePath] : [])).join(', ')}`
+      `Path Validation Error: Path(s) specified in the action for caching do(es) not exist, hence no cache is being saved.`
     )
   }
 
-  // 0: No compression
-  // 1: Best speed
-  // 6: Default compression (same as GNU Gzip)
-  // 9: Best compression Higher levels will result in better compression, but will take longer to complete. For large files that are not easily compressed, a value of 0 is recommended for significantly faster uploads.
-  const zipUploadStream = await createZipUploadStream(
-    zipSpecs,
-    6
-  )
+  const archiveFolder = await utils.createTempDirectory()
+  const archivePath = path.join(
+    archiveFolder,
+    utils.getCacheFileName(compressionMethod)
+  )
 
-  // Cache v2 upload
-  // inputs:
-  // - getSignedUploadURL
-  // - archivePath
-  core.info(`Saving Cache v2: ${paths[0]}`)
-  await UploadCacheStream(response.signedUploadUrl, zipUploadStream)
+  core.debug(`Archive Path: ${archivePath}`)
 
-  // Finalize the cache entry
-  const finalizeRequest: FinalizeCacheEntryUploadRequest = {
-    workflowRunBackendId: backendIds.workflowRunBackendId,
-    workflowJobRunBackendId: backendIds.workflowJobRunBackendId,
-    key: key,
-    version: version,
-    sizeBytes: "1024",
+  try {
+    await createTar(archiveFolder, cachePaths, compressionMethod)
+    if (core.isDebug()) {
+      await listTar(archivePath, compressionMethod)
+    }
+
+    const archiveFileSize = utils.getArchiveFileSizeInBytes(archivePath)
+    core.debug(`File Size: ${archiveFileSize}`)
+
+    // For GHES, this check will take place in ReserveCache API with enterprise file size limit
+    if (archiveFileSize > CacheFileSizeLimit && !utils.isGhes()) {
+      throw new Error(
+        `Cache size of ~${Math.round(
+          archiveFileSize / (1024 * 1024)
+        )} MB (${archiveFileSize} B) is over the 10GB limit, not saving cache.`
+      )
+    }
+
+    core.debug('Reserving Cache')
+    const version = utils.getCacheVersion(
+      paths,
+      compressionMethod,
+      enableCrossOsArchive
+    )
+    const request: CreateCacheEntryRequest = {
+      workflowRunBackendId: backendIds.workflowRunBackendId,
+      workflowJobRunBackendId: backendIds.workflowJobRunBackendId,
+      key: key,
+      version: version
+    }
+    const response: CreateCacheEntryResponse = await twirpClient.CreateCacheEntry(request)
+    core.info(`CreateCacheEntryResponse: ${JSON.stringify(response)}`)
+    // TODO: handle the error cases here
+    if (!response.ok) {
+      throw new ReserveCacheError(
+        `Unable to reserve cache with key ${key}, another job may be creating this cache.`
+      )
+    }
+
+    // TODO: mask the signed upload URL
+    core.debug(`Saving Cache to: ${response.signedUploadUrl}`)
+    await UploadCacheFile(
+      response.signedUploadUrl,
+      archivePath,
+    )
+
+    const finalizeRequest: FinalizeCacheEntryUploadRequest = {
+      workflowRunBackendId: backendIds.workflowRunBackendId,
+      workflowJobRunBackendId: backendIds.workflowJobRunBackendId,
+      key: key,
+      version: version,
+      sizeBytes: `${archiveFileSize}`,
+    }
+
+    const finalizeResponse: FinalizeCacheEntryUploadResponse = await twirpClient.FinalizeCacheEntryUpload(finalizeRequest)
+    core.debug(`FinalizeCacheEntryUploadResponse: ${JSON.stringify(finalizeResponse)}`)
+
+    if (!finalizeResponse.ok) {
+      throw new Error(
+        `Unable to finalize cache with key ${key}, another job may be finalizing this cache.`
+      )
+    }
+
+    // TODO: this is not great, we should handle the types without parsing
+    cacheId = parseInt(finalizeResponse.entryId)
+  } catch (error) {
+    const typedError = error as Error
+    core.debug(typedError.message)
+  } finally {
+    // Try to delete the archive to save space
+    try {
+      await utils.unlinkFile(archivePath)
+    } catch (error) {
+      core.debug(`Failed to delete archive: ${error}`)
+    }
+  }
 
-  const finalizeResponse: FinalizeCacheEntryUploadResponse = await twirpClient.FinalizeCacheEntryUpload(finalizeRequest)
-  core.info(`FinalizeCacheEntryUploadResponse: ${JSON.stringify(finalizeResponse)}`)
-
-  return 0
+  return cacheId
 }
\ No newline at end of file
diff --git a/packages/cache/src/internal/cacheTwirpClient.ts b/packages/cache/src/internal/cacheTwirpClient.ts
index cc365ec6..3cb3422e 100644
--- a/packages/cache/src/internal/cacheTwirpClient.ts
+++ b/packages/cache/src/internal/cacheTwirpClient.ts
@@ -1,8 +1,8 @@
-import { HttpClient, HttpClientResponse, HttpCodes } from '@actions/http-client'
-import { BearerCredentialHandler } from '@actions/http-client/lib/auth'
 import { info, debug } from '@actions/core'
-import { CacheServiceClientJSON } from '../generated/results/api/v1/cache.twirp'
 import { getRuntimeToken, getCacheServiceURL } from './config'
+import { BearerCredentialHandler } from '@actions/http-client/lib/auth'
+import { HttpClient, HttpClientResponse, HttpCodes } from '@actions/http-client'
+import { CacheServiceClientJSON } from '../generated/results/api/v1/cache.twirp'
 
 // import {getUserAgentString} from './user-agent'
 // import {NetworkError, UsageError} from './errors'
@@ -16,6 +16,13 @@ interface Rpc {
   ): Promise<object>
 }
 
+/**
+ * This class is a wrapper around the CacheServiceClientJSON class generated by Twirp.
+ *
+ * It adds retry logic to the request method, which is not present in the generated client.
+ *
+ * It is used to interact with cache service v2.
+ */
 class CacheServiceClient implements Rpc {
   private httpClient: HttpClient
   private baseUrl: string
diff --git a/packages/cache/src/internal/constants.ts b/packages/cache/src/internal/constants.ts
index b2cddf96..bc4e1d7a 100644
--- a/packages/cache/src/internal/constants.ts
+++ b/packages/cache/src/internal/constants.ts
@@ -35,4 +35,6 @@ export const SystemTarPathOnWindows = `${process.env['SYSTEMDRIVE']}\\Windows\\S
 
 export const TarFilename = 'cache.tar'
 
-export const ManifestFilename = 'manifest.txt'
\ No newline at end of file
+export const ManifestFilename = 'manifest.txt'
+
+export const CacheFileSizeLimit = 10 * Math.pow(1024, 3) // 10GiB per repository
\ No newline at end of file
diff --git a/packages/cache/src/internal/v2/download-cache.ts b/packages/cache/src/internal/v2/download-cache.ts
index 19563181..1820cb70 100644
--- a/packages/cache/src/internal/v2/download-cache.ts
+++ b/packages/cache/src/internal/v2/download-cache.ts
@@ -1,68 +1,25 @@
 import * as core from '@actions/core'
-import * as httpClient from '@actions/http-client'
-import unzip from 'unzip-stream'
-const packageJson = require('../../../package.json')
 
-export async function StreamExtract(url: string, directory: string): Promise<void> {
-  let retryCount = 0
-  while (retryCount < 5) {
-    try {
-      await streamExtractExternal(url, directory)
-      return
-    } catch (error) {
-      retryCount++
-      core.info(
-        `Failed to download cache after ${retryCount} retries due to ${error.message}. Retrying in 5 seconds...`
-      )
-      // wait 5 seconds before retrying
-      await new Promise(resolve => setTimeout(resolve, 5000))
-    }
-  }
+import {
+  BlobClient,
+  BlockBlobClient,
+  BlobDownloadOptions,
+} from '@azure/storage-blob'
 
-  throw new Error(`Cache download failed after ${retryCount} retries.`)
-}
+export async function DownloadCacheFile(
+  signedDownloadURL: string,
+  archivePath: string,
+): Promise<{}> {
+  const downloadOptions: BlobDownloadOptions = {
+    maxRetryRequests: 5,
+  }
 
-export async function streamExtractExternal(
-  url: string,
-  directory: string
-): Promise<void> {
-  const client = new httpClient.HttpClient(`@actions/cache-${packageJson.version}`)
-  const response = await client.get(url)
-  if (response.message.statusCode !== 200) {
-    core.info(`Failed to download cache. HTTP status code: ${response.message.statusCode}`)
-    throw new Error(
-      `Unexpected HTTP response from blob storage: ${response.message.statusCode} ${response.message.statusMessage}`
-    )
-  }
-
-  const timeout = 30 * 1000 // 30 seconds
-
-  return new Promise((resolve, reject) => {
-    const timerFn = (): void => {
-      response.message.destroy(
-        new Error(`Blob storage chunk did not respond in ${timeout}ms`)
-      )
-    }
-    const timer = setTimeout(timerFn, timeout)
-
-    response.message
-      .on('data', () => {
-        timer.refresh()
-      })
-      .on('error', (error: Error) => {
-        core.info(
-          `response.message: Cache download failed: ${error.message}`
-        )
-        clearTimeout(timer)
-        reject(error)
-      })
-      .pipe(unzip.Extract({path: directory}))
-      .on('close', () => {
-        clearTimeout(timer)
-        resolve()
-      })
-      .on('error', (error: Error) => {
-        reject(error)
-      })
-  })
-}
\ No newline at end of file
+  // TODO: tighten the configuration and pass the appropriate user-agent
+  const blobClient: BlobClient = new BlobClient(signedDownloadURL)
+  const blockBlobClient: BlockBlobClient = blobClient.getBlockBlobClient()
+
+  core.debug(`BlobClient: ${JSON.stringify(blobClient)}`)
+  core.debug(`blockBlobClient: ${JSON.stringify(blockBlobClient)}`)
+
+  return blockBlobClient.downloadToFile(archivePath, 0, undefined, downloadOptions)
+}
\ No newline at end of file
diff --git a/packages/cache/src/internal/v2/upload-cache.ts b/packages/cache/src/internal/v2/upload-cache.ts
index b3ed530d..e4572d20 100644
--- a/packages/cache/src/internal/v2/upload-cache.ts
+++ b/packages/cache/src/internal/v2/upload-cache.ts
@@ -1,130 +1,27 @@
 import * as core from '@actions/core'
-import { CreateCacheEntryResponse } from '../../generated/results/api/v1/cache'
-import { ZipUploadStream } from '@actions/artifact/lib/internal/upload/zip'
-import { NetworkError } from '@actions/artifact/'
-import { TransferProgressEvent } from '@azure/core-http'
-import * as stream from 'stream'
-import * as crypto from 'crypto'
-
 import {
   BlobClient,
   BlockBlobClient,
-  BlockBlobUploadStreamOptions,
   BlockBlobParallelUploadOptions
 } from '@azure/storage-blob'
 
-export async function UploadCacheStream(
-  signedUploadURL: string,
-  zipUploadStream: ZipUploadStream
-): Promise<{}> {
-  let uploadByteCount = 0
-  let lastProgressTime = Date.now()
-  let timeoutId: NodeJS.Timeout | undefined
-
-  const chunkTimer = (timeout: number): NodeJS.Timeout => {
-    // clear the previous timeout
-    if (timeoutId) {
-      clearTimeout(timeoutId)
-    }
-
-    timeoutId = setTimeout(() => {
-      const now = Date.now()
-      // if there's been more than 30 seconds since the
-      // last progress event, then we'll consider the upload stalled
-      if (now - lastProgressTime > timeout) {
-        throw new Error('Upload progress stalled.')
-      }
-    }, timeout)
-    return timeoutId
-  }
-
-  const maxConcurrency = 32
-  const bufferSize = 8 * 1024 * 1024 // 8 MB Chunks
-  const blobClient = new BlobClient(signedUploadURL)
-  const blockBlobClient = blobClient.getBlockBlobClient()
-  const timeoutDuration = 300000 // 30 seconds
-
-  core.debug(
-    `Uploading cache zip to blob storage with maxConcurrency: ${maxConcurrency}, bufferSize: ${bufferSize}`
-  )
-
-  const uploadCallback = (progress: TransferProgressEvent): void => {
-    core.info(`Uploaded bytes ${progress.loadedBytes}`)
-    uploadByteCount = progress.loadedBytes
-    chunkTimer(timeoutDuration)
-    lastProgressTime = Date.now()
-  }
-
-  const options: BlockBlobUploadStreamOptions = {
-    blobHTTPHeaders: { blobContentType: 'zip' },
-    onProgress: uploadCallback
-  }
-
-  let sha256Hash: string | undefined = undefined
-  const uploadStream = new stream.PassThrough()
-  const hashStream = crypto.createHash('sha256')
-
-  zipUploadStream.pipe(uploadStream) // This stream is used for the upload
-  zipUploadStream.pipe(hashStream).setEncoding('hex') // This stream is used to compute a hash of the zip content that gets used. Integrity check
-
-  core.info('Beginning upload of cache to blob storage')
-  try {
-    // Start the chunk timer
-    timeoutId = chunkTimer(timeoutDuration)
-    await blockBlobClient.uploadStream(
-      uploadStream,
-      bufferSize,
-      maxConcurrency,
-      options
-    )
-  } catch (error) {
-    if (NetworkError.isNetworkErrorCode(error?.code)) {
-      throw new NetworkError(error?.code)
-    }
-    throw error
-  } finally {
-    // clear the timeout whether or not the upload completes
-    if (timeoutId) {
-      clearTimeout(timeoutId)
-    }
-  }
-
-  core.info('Finished uploading cache content to blob storage!')
-
-  hashStream.end()
-  sha256Hash = hashStream.read() as string
-  core.info(`SHA256 hash of uploaded artifact zip is ${sha256Hash}`)
-  core.info(`Uploaded: ${uploadByteCount} bytes`)
-
-  if (uploadByteCount === 0) {
-    core.error(
-      `No data was uploaded to blob storage. Reported upload byte count is 0.`
-    )
-  }
-  return {
-    uploadSize: uploadByteCount,
-    sha256Hash
-  }
-}
-
 export async function UploadCacheFile(
-  uploadURL: CreateCacheEntryResponse,
+  signedUploadURL: string,
   archivePath: string,
 ): Promise<{}> {
-  core.info(`Uploading ${archivePath} to: ${JSON.stringify(uploadURL)}`)
-
+  // TODO: tighten the configuration and pass the appropriate user-agent
   // Specify data transfer options
   const uploadOptions: BlockBlobParallelUploadOptions = {
     blockSize: 4 * 1024 * 1024, // 4 MiB max block size
-    concurrency: 2, // maximum number of parallel transfer workers
+    concurrency: 4, // maximum number of parallel transfer workers
     maxSingleShotSize: 8 * 1024 * 1024, // 8 MiB initial transfer size
   };
 
-  const blobClient: BlobClient = new BlobClient(uploadURL.signedUploadUrl)
+  const blobClient: BlobClient = new BlobClient(signedUploadURL)
   const blockBlobClient: BlockBlobClient = blobClient.getBlockBlobClient()
 
-  core.info(`BlobClient: ${JSON.stringify(blobClient)}`)
-  core.info(`blockBlobClient: ${JSON.stringify(blockBlobClient)}`)
+  core.debug(`BlobClient: ${JSON.stringify(blobClient)}`)
+  core.debug(`blockBlobClient: ${JSON.stringify(blockBlobClient)}`)
 
   return blockBlobClient.uploadFile(archivePath, uploadOptions);
 }
\ No newline at end of file
diff --git a/packages/cache/src/internal/v2/zip.ts b/packages/cache/src/internal/v2/zip.ts
deleted file mode 100644
index e69de29b..00000000
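
Usage note: this patch changes only the v2 internals (tar archives moved as Azure block blobs); the public @actions/cache API is untouched. A minimal consumer-level sketch of the flow these functions serve — the key naming and paths below are illustrative, not taken from the patch:

import * as cache from '@actions/cache'

async function run(): Promise<void> {
  const paths = ['node_modules']
  const primaryKey = `deps-${process.env['GITHUB_SHA'] ?? 'local'}`

  // restoreCachev2 resolves to the matched key on a hit, undefined on a miss
  const hitKey = await cache.restoreCache(paths, primaryKey, ['deps-'])

  if (hitKey === undefined) {
    // ... produce node_modules, then upload a fresh tarball ...
    // saveCachev2 resolves to the numeric cache id, or -1 if saving failed
    const cacheId = await cache.saveCache(paths, primaryKey)
    console.log(`Saved cache with id ${cacheId}`)
  }
}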
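With this change both transfer directions delegate chunking, parallelism, and in-flight retries to @azure/storage-blob instead of hand-rolled streaming. A sketch showing the two SDK calls the patch relies on, side by side; the SAS URLs are the ones returned by CreateCacheEntry and GetCacheEntryDownloadURL, and the option values mirror the patch:

import {
  BlobClient,
  BlobDownloadOptions,
  BlockBlobParallelUploadOptions
} from '@azure/storage-blob'

async function transferArchive(
  signedUploadURL: string,
  signedDownloadURL: string,
  archivePath: string
): Promise<void> {
  // Upload: stage 4 MiB blocks with 4 parallel workers; archives under
  // 8 MiB go up in a single request instead of being chunked
  const uploadOptions: BlockBlobParallelUploadOptions = {
    blockSize: 4 * 1024 * 1024,
    concurrency: 4,
    maxSingleShotSize: 8 * 1024 * 1024
  }
  await new BlobClient(signedUploadURL)
    .getBlockBlobClient()
    .uploadFile(archivePath, uploadOptions)

  // Download: write straight to disk, letting the SDK retry an
  // interrupted body read up to 5 times
  const downloadOptions: BlobDownloadOptions = { maxRetryRequests: 5 }
  await new BlobClient(signedDownloadURL)
    .getBlockBlobClient()
    .downloadToFile(archivePath, 0, undefined, downloadOptions)
}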
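The new doc comment on CacheServiceClient says it adds retry logic around the generated Twirp client's request method, but that code sits outside the hunks shown here. Purely as an illustration of the pattern — the helper name, attempt count, and backoff schedule are assumptions, not the actual implementation:

import { debug } from '@actions/core'

// Hypothetical helper: retry an async operation with exponential backoff.
async function withRetries<T>(
  operation: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 1000
): Promise<T> {
  let lastError: Error | undefined
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation()
    } catch (error) {
      lastError = error as Error
      debug(`Attempt ${attempt} of ${maxAttempts} failed: ${lastError.message}`)
      if (attempt < maxAttempts) {
        // back off 1s, 2s, 4s, ... before the next attempt
        await new Promise(resolve =>
          setTimeout(resolve, baseDelayMs * 2 ** (attempt - 1))
        )
      }
    }
  }
  throw lastError
}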