mirror of https://github.com/actions/toolkit

Merge pull request #3 from WarpBuilds/hotfix-gcs-backup-download
Adds a backup download method for the streaming cache path (branch: pull/1935/head).
commit cea490e16b

package.json
@@ -1,6 +1,6 @@
 {
   "name": "github-actions.warp-cache",
-  "version": "1.1.3",
+  "version": "1.1.11",
   "preview": true,
   "description": "Github action to use WarpBuild's in-house cache offering",
   "keywords": [

@@ -217,12 +217,62 @@ export async function restoreCache(
         }
       }
 
+      try {
         await extractStreamingTar(
           readStream,
           archivePath,
           compressionMethod,
           downloadCommandPipe
         )
+      } catch (error) {
+        core.debug(`Failed to download cache: ${error}`)
+        core.info(
+          `Streaming download failed. Likely a cloud provider issue. Retrying with multipart download`
+        )
+        // Wait 1 second
+        await new Promise(resolve => setTimeout(resolve, 1000))
+        // Try to download the cache using the non-streaming method
+        try {
+          await cacheHttpClient.downloadCache(
+            cacheEntry.provider,
+            archiveLocation,
+            archivePath,
+            cacheEntry.gcs?.short_lived_token?.access_token ?? ''
+          )
+        } catch (error) {
+          core.debug(`Failed to download cache: ${error}`)
+          core.info(
+            `Multipart download failed. Likely a cloud provider issue. Retrying with basic download`
+          )
+          // Wait 1 second
+          await new Promise(resolve => setTimeout(resolve, 1000))
+          // Try to download the cache using the basic method
+          try {
+            await cacheHttpClient.downloadCacheSingleThread(
+              cacheEntry.provider,
+              archiveLocation,
+              archivePath,
+              cacheEntry.gcs?.short_lived_token?.access_token ?? ''
+            )
+          } catch (error) {
+            core.info('Cache Miss. Failed to download cache.')
+            return undefined
+          }
+        }
+
+        if (core.isDebug()) {
+          await listTar(archivePath, compressionMethod)
+        }
+
+        const archiveFileSize = utils.getArchiveFileSizeInBytes(archivePath)
+        core.info(
+          `Cache Size: ~${Math.round(
+            archiveFileSize / (1024 * 1024)
+          )} MB (${archiveFileSize} B)`
+        )
+
+        await extractTar(archivePath, compressionMethod)
+      }
       core.info('Cache restored successfully')
       break
     }

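The hunk above gives restoreCache a three-tier recovery path: if the streaming tar extraction fails, it retries with the multipart download, and if that also fails it falls back to a single-threaded download, sleeping one second between tiers and reporting a cache miss only after all three fail. Because the fallback tiers land the archive on disk rather than streaming it, the catch branch also re-runs listTar and extractTar, which the streaming path does not need. A minimal sketch of the pattern, with a hypothetical helper name standing in for the real calls:

// Sketch only: downloadWithFallbacks is a hypothetical helper; in the PR the
// tiers are extractStreamingTar, cacheHttpClient.downloadCache and
// cacheHttpClient.downloadCacheSingleThread.
type DownloadTier = {name: string; run: () => Promise<void>}

async function downloadWithFallbacks(tiers: DownloadTier[]): Promise<boolean> {
  for (let i = 0; i < tiers.length; i++) {
    try {
      await tiers[i].run()
      return true // this tier produced the archive
    } catch (error) {
      if (i === tiers.length - 1) {
        return false // every tier failed: treat it as a cache miss
      }
      console.info(
        `${tiers[i].name} failed (${error}), falling back to ${tiers[i + 1].name}`
      )
      // Wait 1 second before the next tier, mirroring the hunk above
      await new Promise(resolve => setTimeout(resolve, 1000))
    }
  }
  return false
}
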
@@ -15,6 +15,7 @@ import {
   InternalS3CompletedPart
 } from './contracts'
 import {
+  downloadCacheGCP,
   downloadCacheMultiConnection,
   downloadCacheMultipartGCP,
   downloadCacheStreamingGCP

@@ -230,6 +231,37 @@ export async function downloadCache(
   }
 }
 
+export async function downloadCacheSingleThread(
+  provider: string,
+  archiveLocation: string,
+  archivePath: string,
+  gcsToken?: string
+): Promise<void> {
+  switch (provider) {
+    case 's3':
+      break
+    case 'gcs': {
+      if (!gcsToken) {
+        throw new Error(
+          'Unable to download cache from GCS. GCP token is not provided.'
+        )
+      }
+
+      const oauth2Client = new OAuth2Client()
+      oauth2Client.setCredentials({access_token: gcsToken})
+      const storage = new Storage({
+        authClient: oauth2Client,
+        retryOptions: {
+          autoRetry: false,
+          maxRetries: 1
+        }
+      })
+      await downloadCacheGCP(storage, archiveLocation, archivePath)
+      break
+    }
+  }
+}
+
 export function downloadCacheStreaming(
   provider: string,
   archiveLocation: string,

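The new downloadCacheSingleThread builds its own Storage client from the short-lived token with autoRetry disabled, which fits its role as the final fallback tier: by the time it runs, the caller has already retried twice, so the client's own retry loop would only add latency. Note the s3 case is currently a no-op. A hypothetical call site for illustration; the module path, bucket path, local path, and token source are placeholders, not values from this PR:

// Assumed import path for the module shown above.
import {downloadCacheSingleThread} from './cacheHttpClient'

async function example(): Promise<void> {
  const token = process.env.WARP_CACHE_GCS_TOKEN ?? '' // placeholder token source
  await downloadCacheSingleThread(
    'gcs',
    'gs://example-bucket/cache/abc123.tzst', // archiveLocation (placeholder)
    '/tmp/cache/abc123.tzst', // archivePath (placeholder)
    token
  )
}
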
@@ -313,11 +313,48 @@ export async function downloadCacheMultipartGCP(
     await transferManager.downloadFileInChunks(objectName, {
       destination: archivePath,
       noReturnData: true,
-      chunkSizeBytes: 1024 * 1024 * 8
+      validation: 'crc32c'
     })
   } catch (error) {
     core.debug(`Failed to download cache: ${error}`)
     core.error(`Failed to download cache.`)
     throw error
   }
 }
+
+export async function downloadCacheGCP(
+  storage: Storage,
+  archiveLocation: string,
+  archivePath: string
+) {
+  try {
+    const timeoutDuration = 300000 // 5 minutes
+
+    const timeoutPromise = new Promise((_, reject) =>
+      setTimeout(() => reject(new Error('Download timed out')), timeoutDuration)
+    )
+
+    const {bucketName, objectName} =
+      utils.retrieveGCSBucketAndObjectName(archiveLocation)
+
+    const downloadPromise = storage
+      .bucket(bucketName)
+      .file(objectName)
+      .download({
+        destination: archivePath,
+        validation: 'crc32c'
+      })
+
+    try {
+      await Promise.race([downloadPromise, timeoutPromise])
+      core.debug(
+        `Download completed for bucket: ${bucketName}, object: ${objectName}`
+      )
+    } catch (error) {
+      core.debug(`Failed to download cache: ${error}`)
+      throw error
+    }
+  } catch (error) {
+    core.debug(`Failed to download cache: ${error}`)
+    throw error
+  }
+}

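downloadCacheGCP bounds the single-threaded download with a five-minute deadline by racing it against a timer. One property of this pattern worth keeping in mind: Promise.race only rejects the awaiting caller; it does not cancel the losing promise, so the underlying GCS transfer keeps running until it finishes or errors on its own. A generic sketch of the same pattern; the clearTimeout in the finally block is an addition not present in the hunk, and without it the pending timer keeps the Node event loop alive:

async function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: NodeJS.Timeout | undefined
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error('Download timed out')), ms)
  })
  try {
    // Rejects after `ms`, though `work` may still be in flight.
    return await Promise.race([work, timeout])
  } finally {
    if (timer !== undefined) clearTimeout(timer)
  }
}
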
@@ -347,7 +384,6 @@ export function downloadCacheStreamingGCP(
     return storage.bucket(bucketName).file(objectName).createReadStream()
   } catch (error) {
     core.debug(`Failed to download cache: ${error}`)
-    core.error(`Failed to download cache.`)
     throw error
   }
 }

@@ -441,21 +441,47 @@ export async function extractStreamingTar(
   }
 
   return new Promise((resolve, reject) => {
-    if (stream) {
-      stream.pipe(commandPipes[0].stdin)
-    }
-    for (let i = 0; i < commandPipes.length - 1; i++) {
-      commandPipes[i].stdout.pipe(commandPipes[i + 1].stdin)
-
-      commandPipes[i].stderr.on('data', data => {
-        reject(
-          new Error(`Error in ${commandPipes[i].spawnfile}: ${data.toString()}`)
-        )
-      })
-
-      commandPipes[i].on('error', error => {
-        reject(
-          new Error(`Error in ${commandPipes[i].spawnfile}: ${error.message}`)
-        )
-      })
-    }
+    const handleStreamError = (
+      stream: NodeJS.ReadableStream | NodeJS.WritableStream,
+      commandName: string
+    ) => {
+      stream.on('error', error => {
+        reject(new Error(`Error in ${commandName}: ${error.message}`))
+      })
+    }
+
+    // Attach error handlers and pipe the streams
+    commandPipes.forEach(commandPipe => {
+      handleStreamError(commandPipe.stdin, commandPipe.spawnfile)
+      handleStreamError(commandPipe.stdout, commandPipe.spawnfile)
+      handleStreamError(commandPipe.stderr, commandPipe.spawnfile)
+
+      commandPipe.stderr.on('data', data => {
+        reject(
+          new Error(`Error in ${commandPipe.spawnfile}: ${data.toString()}`)
+        )
+      })
+    })
+
+    if (stream) {
+      stream.pipe(commandPipes[0].stdin).on('error', error => {
+        reject(
+          new Error(
+            `Error piping to ${commandPipes[0].spawnfile}: ${error.message}`
+          )
+        )
+      })
+    }
+    for (let i = 0; i < commandPipes.length - 1; i++) {
+      commandPipes[i].stdout
+        .pipe(commandPipes[i + 1].stdin)
+        .on('error', error => {
+          reject(
+            new Error(
+              `Error piping between ${commandPipes[i].spawnfile} and ${
+                commandPipes[i + 1].spawnfile
+              }: ${error.message}`
+            )
+          )
+        })
+    }

@@ -472,6 +498,9 @@ export async function extractStreamingTar(
         reject(new Error(`Last command exited with code ${code}`))
       }
     })
+    lastCommand.on('error', error => {
+      reject(new Error(`Error in ${lastCommand.spawnfile}: ${error.message}`))
+    })
   })
 }

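The extractStreamingTar changes all serve one point: Node streams do not forward 'error' events across .pipe(), so a failure in any process or pipe in the chain could previously surface as an unhandled 'error' event (or leave the promise pending) instead of rejecting it. The rewrite centralizes the handler in handleStreamError, attaches it to stdin, stdout, and stderr of every process in the pipeline, guards every .pipe() call, and adds an 'error' handler on the final command so a spawn failure also rejects. A minimal self-contained sketch of the same wiring with a two-command pipeline; the commands are illustrative, not the ones the cache uses:

import {spawn} from 'child_process'

function runPipeline(): Promise<void> {
  const producer = spawn('cat', ['archive.tzst']) // placeholder source command
  const consumer = spawn('zstd', ['-d', '-o', '/tmp/out.tar']) // placeholder sink

  return new Promise((resolve, reject) => {
    const handleStreamError = (
      stream: NodeJS.ReadableStream | NodeJS.WritableStream,
      commandName: string
    ) =>
      stream.on('error', error =>
        reject(new Error(`Error in ${commandName}: ${error.message}`))
      )

    // 'error' does not propagate through .pipe(), so guard every stream.
    for (const child of [producer, consumer]) {
      handleStreamError(child.stdin, child.spawnfile)
      handleStreamError(child.stdout, child.spawnfile)
      handleStreamError(child.stderr, child.spawnfile)
    }

    producer.stdout.pipe(consumer.stdin).on('error', error =>
      reject(
        new Error(`Error piping to ${consumer.spawnfile}: ${error.message}`)
      )
    )

    consumer.on('close', code =>
      code === 0
        ? resolve()
        : reject(new Error(`Last command exited with code ${code}`))
    )
    consumer.on('error', error =>
      reject(new Error(`Error in ${consumer.spawnfile}: ${error.message}`))
    )
  })
}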