1
0
Fork 0

changes to multipart download

pull/1935/head
Prajjwal 2024-04-15 14:05:22 +05:30
parent a09029945f
commit a73dbad5a7
5 changed files with 101 additions and 24 deletions

View File

@ -1,6 +1,6 @@
{
"name": "github-actions.warp-cache",
"version": "1.0.1",
"version": "1.0.2",
"preview": true,
"description": "Github action to use WarpBuild's in-house cache offering",
"keywords": [

View File

@ -137,6 +137,7 @@ export async function restoreCache(
}
await cacheHttpClient.downloadCache(
cacheEntry.provider,
cacheEntry.s3?.pre_signed_url,
archivePath
)
@ -168,20 +169,41 @@ export async function restoreCache(
return cacheKey
}
// For GCS, we do a streaming download which means that we extract the archive while we are downloading it.
const archiveLocation = `gs://${cacheEntry.gcs?.bucket_name}/${cacheEntry.gcs?.cache_key}`
const readStream = cacheHttpClient.downloadCacheStreaming(
'gcs',
await cacheHttpClient.downloadCache(
cacheEntry.provider,
archiveLocation,
cacheEntry?.gcs?.short_lived_token?.access_token ?? ''
archivePath,
cacheEntry.gcs?.short_lived_token?.access_token ?? ''
)
if (!readStream) {
return undefined
if (core.isDebug()) {
await listTar(archivePath, compressionMethod)
}
await extractStreamingTar(readStream, archivePath, compressionMethod)
const archiveFileSize = utils.getArchiveFileSizeInBytes(archivePath)
core.info(
`Cache Size: ~${Math.round(
archiveFileSize / (1024 * 1024)
)} MB (${archiveFileSize} B)`
)
await extractTar(archivePath, compressionMethod)
// For GCS, we do a streaming download which means that we extract the archive while we are downloading it.
// const readStream = cacheHttpClient.downloadCacheStreaming(
// 'gcs',
// archiveLocation,
// cacheEntry?.gcs?.short_lived_token?.access_token ?? ''
// )
// if (!readStream) {
// return undefined
// }
// await extractStreamingTar(readStream, archivePath, compressionMethod)
core.info('Cache restored successfully')
break
}
@ -283,6 +305,14 @@ export async function saveCache(
)
if (!isSuccessStatusCode(reserveCacheResponse?.statusCode)) {
core.debug(`Failed to reserve cache: ${reserveCacheResponse?.statusCode}`)
core.debug(
`Reserve Cache Request: ${JSON.stringify({
key,
numberOfChunks,
cacheVersion
})}`
)
throw new Error(
reserveCacheResponse?.error?.message ??
`Cache size of ~${Math.round(

View File

@ -16,6 +16,7 @@ import {
} from './contracts'
import {
downloadCacheMultiConnection,
downloadCacheMultipartGCP,
downloadCacheStreamingGCP
} from './downloadUtils'
import {isSuccessStatusCode, retryTypedResponse} from './requestUtils'
@ -187,10 +188,31 @@ async function printCachesListForDiagnostics(
*/
export async function downloadCache(
provider: string,
archiveLocation: string,
archivePath: string
archivePath: string,
gcsToken?: string
): Promise<void> {
await downloadCacheMultiConnection(archiveLocation, archivePath, 8)
switch (provider) {
case 's3':
await downloadCacheMultiConnection(archiveLocation, archivePath, 8)
break
case 'gcs': {
if (!gcsToken) {
throw new Error(
'Unable to download cache from GCS. GCP token is not provided.'
)
}
const oauth2Client = new OAuth2Client()
oauth2Client.setCredentials({access_token: gcsToken})
const storage = new Storage({
authClient: oauth2Client
})
await downloadCacheMultipartGCP(storage, archiveLocation, archivePath)
break
}
}
}
export function downloadCacheStreaming(

View File

@ -13,7 +13,7 @@ import {DownloadOptions} from '../options'
import {retryHttpClientResponse} from './requestUtils'
import {AbortController} from '@azure/abort-controller'
import {Storage} from '@google-cloud/storage'
import {Storage, TransferManager} from '@google-cloud/storage'
/**
* Pipes the body of a HTTP response to a stream
@ -294,6 +294,31 @@ export async function downloadCacheMultiConnection(
}
}
/**
* Download cache in multipart using the Gcloud SDK
*
* @param archiveLocation the URL for the cache
*/
export async function downloadCacheMultipartGCP(
storage: Storage,
archiveLocation: string,
archivePath: string
) {
try {
const {bucketName, objectName} =
utils.retrieveGCSBucketAndObjectName(archiveLocation)
const transferManager = new TransferManager(storage.bucket(bucketName))
await transferManager.downloadFileInChunks(objectName, {
destination: archivePath
})
} catch (error) {
core.debug(`Failed to download cache: ${error}`)
core.error(`Failed to download cache.`)
throw error
}
}
/**
* Download the cache to a provider writable stream using GCloud SDK
*

View File

@ -12,11 +12,11 @@ process.env['WARPBUILD_RUNNER_VERIFICATION_TOKEN'] =
process.env['GITHUB_REPOSITORY'] = 'Warpbuilds/backend-cache'
process.env['GITHUB_REF'] = 'refs/heads/main'
saveCache(
['/Users/prajjwal/Repos/warpbuild/playground/test_fs'],
'test-fs-local-key',
true
)
// saveCache(
// ['/Users/prajjwal/Repos/warpbuild/playground/test_fs'],
// 'test-fs-local-key',
// true
// )
// saveCache(
// ['/Users/prajjwal/Repos/warpbuild/playground/test_fs'],
@ -31,14 +31,14 @@ saveCache(
// true
// )
// restoreCache(
// ['/Users/prajjwal/Repos/warpbuild/playground/test_fs'],
// 'test-fs-local-key-3',
// ['test-fs'],
// {},
// true,
// false
// )
restoreCache(
['/Users/prajjwal/Repos/warpbuild/playground/test_fs'],
'test-fs-local-key',
['test-fs'],
{},
true,
false
)
// deleteCache(
// ['/Users/prajjwal/Repos/warpbuild/playground/test_fs'],