From a73dbad5a756133dd757bcda34b18777ddba01a9 Mon Sep 17 00:00:00 2001 From: Prajjwal Date: Mon, 15 Apr 2024 14:05:22 +0530 Subject: [PATCH] changes to multipart download --- packages/warp-cache/package.json | 2 +- packages/warp-cache/src/cache.ts | 44 ++++++++++++++++--- .../src/internal/cacheHttpClient.ts | 26 ++++++++++- .../warp-cache/src/internal/downloadUtils.ts | 27 +++++++++++- packages/warp-cache/src/test.ts | 26 +++++------ 5 files changed, 101 insertions(+), 24 deletions(-) diff --git a/packages/warp-cache/package.json b/packages/warp-cache/package.json index 092faa98..85024a22 100644 --- a/packages/warp-cache/package.json +++ b/packages/warp-cache/package.json @@ -1,6 +1,6 @@ { "name": "github-actions.warp-cache", - "version": "1.0.1", + "version": "1.0.2", "preview": true, "description": "Github action to use WarpBuild's in-house cache offering", "keywords": [ diff --git a/packages/warp-cache/src/cache.ts b/packages/warp-cache/src/cache.ts index 8eee08fb..71cc60dd 100644 --- a/packages/warp-cache/src/cache.ts +++ b/packages/warp-cache/src/cache.ts @@ -137,6 +137,7 @@ export async function restoreCache( } await cacheHttpClient.downloadCache( + cacheEntry.provider, cacheEntry.s3?.pre_signed_url, archivePath ) @@ -168,20 +169,41 @@ export async function restoreCache( return cacheKey } - // For GCS, we do a streaming download which means that we extract the archive while we are downloading it. const archiveLocation = `gs://${cacheEntry.gcs?.bucket_name}/${cacheEntry.gcs?.cache_key}` - const readStream = cacheHttpClient.downloadCacheStreaming( - 'gcs', + await cacheHttpClient.downloadCache( + cacheEntry.provider, archiveLocation, - cacheEntry?.gcs?.short_lived_token?.access_token ?? '' + archivePath, + cacheEntry.gcs?.short_lived_token?.access_token ?? '' ) - if (!readStream) { - return undefined + if (core.isDebug()) { + await listTar(archivePath, compressionMethod) } - await extractStreamingTar(readStream, archivePath, compressionMethod) + const archiveFileSize = utils.getArchiveFileSizeInBytes(archivePath) + core.info( + `Cache Size: ~${Math.round( + archiveFileSize / (1024 * 1024) + )} MB (${archiveFileSize} B)` + ) + + await extractTar(archivePath, compressionMethod) + + // For GCS, we do a streaming download which means that we extract the archive while we are downloading it. + + // const readStream = cacheHttpClient.downloadCacheStreaming( + // 'gcs', + // archiveLocation, + // cacheEntry?.gcs?.short_lived_token?.access_token ?? '' + // ) + + // if (!readStream) { + // return undefined + // } + + // await extractStreamingTar(readStream, archivePath, compressionMethod) core.info('Cache restored successfully') break } @@ -283,6 +305,14 @@ export async function saveCache( ) if (!isSuccessStatusCode(reserveCacheResponse?.statusCode)) { + core.debug(`Failed to reserve cache: ${reserveCacheResponse?.statusCode}`) + core.debug( + `Reserve Cache Request: ${JSON.stringify({ + key, + numberOfChunks, + cacheVersion + })}` + ) throw new Error( reserveCacheResponse?.error?.message ?? `Cache size of ~${Math.round( diff --git a/packages/warp-cache/src/internal/cacheHttpClient.ts b/packages/warp-cache/src/internal/cacheHttpClient.ts index 8fab972f..c41e179a 100644 --- a/packages/warp-cache/src/internal/cacheHttpClient.ts +++ b/packages/warp-cache/src/internal/cacheHttpClient.ts @@ -16,6 +16,7 @@ import { } from './contracts' import { downloadCacheMultiConnection, + downloadCacheMultipartGCP, downloadCacheStreamingGCP } from './downloadUtils' import {isSuccessStatusCode, retryTypedResponse} from './requestUtils' @@ -187,10 +188,31 @@ async function printCachesListForDiagnostics( */ export async function downloadCache( + provider: string, archiveLocation: string, - archivePath: string + archivePath: string, + gcsToken?: string ): Promise { - await downloadCacheMultiConnection(archiveLocation, archivePath, 8) + switch (provider) { + case 's3': + await downloadCacheMultiConnection(archiveLocation, archivePath, 8) + break + case 'gcs': { + if (!gcsToken) { + throw new Error( + 'Unable to download cache from GCS. GCP token is not provided.' + ) + } + + const oauth2Client = new OAuth2Client() + oauth2Client.setCredentials({access_token: gcsToken}) + const storage = new Storage({ + authClient: oauth2Client + }) + await downloadCacheMultipartGCP(storage, archiveLocation, archivePath) + break + } + } } export function downloadCacheStreaming( diff --git a/packages/warp-cache/src/internal/downloadUtils.ts b/packages/warp-cache/src/internal/downloadUtils.ts index 9019f2d4..9faeb3aa 100644 --- a/packages/warp-cache/src/internal/downloadUtils.ts +++ b/packages/warp-cache/src/internal/downloadUtils.ts @@ -13,7 +13,7 @@ import {DownloadOptions} from '../options' import {retryHttpClientResponse} from './requestUtils' import {AbortController} from '@azure/abort-controller' -import {Storage} from '@google-cloud/storage' +import {Storage, TransferManager} from '@google-cloud/storage' /** * Pipes the body of a HTTP response to a stream @@ -294,6 +294,31 @@ export async function downloadCacheMultiConnection( } } +/** + * Download cache in multipart using the Gcloud SDK + * + * @param archiveLocation the URL for the cache + */ +export async function downloadCacheMultipartGCP( + storage: Storage, + archiveLocation: string, + archivePath: string +) { + try { + const {bucketName, objectName} = + utils.retrieveGCSBucketAndObjectName(archiveLocation) + + const transferManager = new TransferManager(storage.bucket(bucketName)) + await transferManager.downloadFileInChunks(objectName, { + destination: archivePath + }) + } catch (error) { + core.debug(`Failed to download cache: ${error}`) + core.error(`Failed to download cache.`) + throw error + } +} + /** * Download the cache to a provider writable stream using GCloud SDK * diff --git a/packages/warp-cache/src/test.ts b/packages/warp-cache/src/test.ts index 65340121..9620df75 100644 --- a/packages/warp-cache/src/test.ts +++ b/packages/warp-cache/src/test.ts @@ -12,11 +12,11 @@ process.env['WARPBUILD_RUNNER_VERIFICATION_TOKEN'] = process.env['GITHUB_REPOSITORY'] = 'Warpbuilds/backend-cache' process.env['GITHUB_REF'] = 'refs/heads/main' -saveCache( - ['/Users/prajjwal/Repos/warpbuild/playground/test_fs'], - 'test-fs-local-key', - true -) +// saveCache( +// ['/Users/prajjwal/Repos/warpbuild/playground/test_fs'], +// 'test-fs-local-key', +// true +// ) // saveCache( // ['/Users/prajjwal/Repos/warpbuild/playground/test_fs'], @@ -31,14 +31,14 @@ saveCache( // true // ) -// restoreCache( -// ['/Users/prajjwal/Repos/warpbuild/playground/test_fs'], -// 'test-fs-local-key-3', -// ['test-fs'], -// {}, -// true, -// false -// ) +restoreCache( + ['/Users/prajjwal/Repos/warpbuild/playground/test_fs'], + 'test-fs-local-key', + ['test-fs'], + {}, + true, + false +) // deleteCache( // ['/Users/prajjwal/Repos/warpbuild/playground/test_fs'],