
Cleanup implementation and use tarballs instead of streaming zip

pull/1857/head
Bassem Dghaidi 2024-10-21 05:21:32 -07:00 committed by GitHub
parent d399e33060
commit 89354f6540
6 changed files with 173 additions and 242 deletions

View File

@@ -14,14 +14,10 @@ import {
   GetCacheEntryDownloadURLRequest,
   GetCacheEntryDownloadURLResponse
 } from './generated/results/api/v1/cache'
-import { UploadCacheStream } from './internal/v2/upload-cache'
-import { StreamExtract } from './internal/v2/download-cache'
-import {
-  UploadZipSpecification,
-  getUploadZipSpecification
-} from '@actions/artifact/lib/internal/upload/upload-zip-specification'
-import { createZipUploadStream } from '@actions/artifact/lib/internal/upload/zip'
+import { UploadCacheFile } from './internal/v2/upload-cache'
+import { DownloadCacheFile } from './internal/v2/download-cache'
 import { getBackendIdsFromToken, BackendIds } from '@actions/artifact/lib/internal/shared/util'
+import { CacheFileSizeLimit } from './internal/constants'

 export class ValidationError extends Error {
   constructor(message: string) {
@@ -101,6 +97,16 @@ export async function restoreCache(
   }
 }

+/**
+ * Restores cache using the legacy Cache Service
+ *
+ * @param paths
+ * @param primaryKey
+ * @param restoreKeys
+ * @param options
+ * @param enableCrossOsArchive
+ * @returns
+ */
 async function restoreCachev1(
   paths: string[],
   primaryKey: string,
@@ -209,8 +215,7 @@ async function restoreCachev2(
   restoreKeys = restoreKeys || []
   const keys = [primaryKey, ...restoreKeys]

-  core.debug('Resolved Keys:')
-  core.debug(JSON.stringify(keys))
+  core.debug(`Resolved Keys: ${JSON.stringify(keys)}`)

   if (keys.length > 10) {
     throw new ValidationError(
@@ -224,7 +229,6 @@ async function restoreCachev2(
   let archivePath = ''
   try {
     const twirpClient = cacheTwirpClient.internalCacheTwirpClient()
-    // BackendIds are retrieved form the signed JWT
     const backendIds: BackendIds = getBackendIdsFromToken()
     const compressionMethod = await utils.getCompressionMethod()
@@ -240,11 +244,11 @@ async function restoreCachev2(
       ),
     }

+    core.debug(`GetCacheEntryDownloadURLRequest: ${JSON.stringify(twirpClient)}`)
     const response: GetCacheEntryDownloadURLResponse = await twirpClient.GetCacheEntryDownloadURL(request)
     core.debug(`GetCacheEntryDownloadURLResponse: ${JSON.stringify(response)}`)

     if (!response.ok) {
-      // Cache not found
       core.warning(`Cache not found for keys: ${keys.join(', ')}`)
       return undefined
     }
@@ -262,11 +266,13 @@ async function restoreCachev2(
     )
     core.debug(`Archive path: ${archivePath}`)

-    if (core.isDebug()) {
-      await listTar(archivePath, compressionMethod)
-    }
-
     core.debug(`Starting download of artifact to: ${archivePath}`)
+    await DownloadCacheFile(
+      response.signedDownloadUrl,
+      archivePath
+    )

     const archiveFileSize = utils.getArchiveFileSizeInBytes(archivePath)
     core.info(
       `Cache Size: ~${Math.round(
@@ -274,18 +280,16 @@ async function restoreCachev2(
       )} MB (${archiveFileSize} B)`
     )

-    // Download the cache from the cache entry
-    await cacheHttpClient.downloadCache(
-      response.signedDownloadUrl,
-      archivePath,
-      options
-    )
+    if (core.isDebug()) {
+      await listTar(archivePath, compressionMethod)
+    }

     await extractTar(archivePath, compressionMethod)
     core.info('Cache restored successfully')

     return request.key
   } catch (error) {
-    // TODO: handle all the possible error scenarios
     throw new Error(`Unable to download and extract cache: ${error.message}`)
   } finally {
     try {
@@ -294,6 +298,8 @@ async function restoreCachev2(
       core.debug(`Failed to delete archive: ${error}`)
     }
   }
+
+  return undefined
 }

 /**
@@ -325,6 +331,15 @@ export async function saveCache(
   }
 }

+/**
+ * Save cache using the legacy Cache Service
+ *
+ * @param paths
+ * @param key
+ * @param options
+ * @param enableCrossOsArchive
+ * @returns
+ */
 async function saveCachev1(
   paths: string[],
   key: string,
@@ -419,6 +434,15 @@ async function saveCachev1(
   return cacheId
 }

+/**
+ * Save cache using the new Cache Service
+ *
+ * @param paths
+ * @param key
+ * @param options
+ * @param enableCrossOsArchive
+ * @returns
+ */
 async function saveCachev2(
   paths: string[],
   key: string,
@@ -428,59 +452,103 @@ async function saveCachev2(
   // BackendIds are retrieved form the signed JWT
   const backendIds: BackendIds = getBackendIdsFromToken()
   const compressionMethod = await utils.getCompressionMethod()
-  const version = utils.getCacheVersion(
-    paths,
-    compressionMethod,
-    enableCrossOsArchive
-  )
   const twirpClient = cacheTwirpClient.internalCacheTwirpClient()
-  const request: CreateCacheEntryRequest = {
-    workflowRunBackendId: backendIds.workflowRunBackendId,
-    workflowJobRunBackendId: backendIds.workflowJobRunBackendId,
-    key: key,
-    version: version
-  }
-  const response: CreateCacheEntryResponse = await twirpClient.CreateCacheEntry(request)
-  core.info(`CreateCacheEntryResponse: ${JSON.stringify(response)}`)
+  let cacheId = -1

-  // Archive
-  // We're going to handle 1 path fow now. This needs to be fixed to handle all
-  // paths passed in.
-  const rootDir = path.dirname(paths[0])
-  const zipSpecs: UploadZipSpecification[] = getUploadZipSpecification(paths, rootDir)
-  if (zipSpecs.length === 0) {
+  const cachePaths = await utils.resolvePaths(paths)
+  core.debug('Cache Paths:')
+  core.debug(`${JSON.stringify(cachePaths)}`)
+
+  if (cachePaths.length === 0) {
     throw new Error(
-      `Error with zip specs: ${zipSpecs.flatMap(s => (s.sourcePath ? [s.sourcePath] : [])).join(', ')}`
+      `Path Validation Error: Path(s) specified in the action for caching do(es) not exist, hence no cache is being saved.`
     )
   }

-  // 0: No compression
-  // 1: Best speed
-  // 6: Default compression (same as GNU Gzip)
-  // 9: Best compression Higher levels will result in better compression, but will take longer to complete. For large files that are not easily compressed, a value of 0 is recommended for significantly faster uploads.
-  const zipUploadStream = await createZipUploadStream(
-    zipSpecs,
-    6
+  const archiveFolder = await utils.createTempDirectory()
+  const archivePath = path.join(
+    archiveFolder,
+    utils.getCacheFileName(compressionMethod)
   )

-  // Cache v2 upload
-  // inputs:
-  // - getSignedUploadURL
-  // - archivePath
-  core.info(`Saving Cache v2: ${paths[0]}`)
-  await UploadCacheStream(response.signedUploadUrl, zipUploadStream)
+  core.debug(`Archive Path: ${archivePath}`)

-  // Finalize the cache entry
-  const finalizeRequest: FinalizeCacheEntryUploadRequest = {
-    workflowRunBackendId: backendIds.workflowRunBackendId,
-    workflowJobRunBackendId: backendIds.workflowJobRunBackendId,
-    key: key,
-    version: version,
-    sizeBytes: "1024",
+  try {
+    await createTar(archiveFolder, cachePaths, compressionMethod)
+    if (core.isDebug()) {
+      await listTar(archivePath, compressionMethod)
+    }
+
+    const archiveFileSize = utils.getArchiveFileSizeInBytes(archivePath)
+    core.debug(`File Size: ${archiveFileSize}`)
+
+    // For GHES, this check will take place in ReserveCache API with enterprise file size limit
+    if (archiveFileSize > CacheFileSizeLimit && !utils.isGhes()) {
+      throw new Error(
+        `Cache size of ~${Math.round(
+          archiveFileSize / (1024 * 1024)
+        )} MB (${archiveFileSize} B) is over the 10GB limit, not saving cache.`
+      )
+    }
+
+    core.debug('Reserving Cache')
+    const version = utils.getCacheVersion(
+      paths,
+      compressionMethod,
+      enableCrossOsArchive
+    )
+    const request: CreateCacheEntryRequest = {
+      workflowRunBackendId: backendIds.workflowRunBackendId,
+      workflowJobRunBackendId: backendIds.workflowJobRunBackendId,
+      key: key,
+      version: version
+    }
+    const response: CreateCacheEntryResponse = await twirpClient.CreateCacheEntry(request)
+    core.info(`CreateCacheEntryResponse: ${JSON.stringify(response)}`)
+
+    // TODO: handle the error cases here
+    if (!response.ok) {
+      throw new ReserveCacheError(
+        `Unable to reserve cache with key ${key}, another job may be creating this cache.`
+      )
+    }
+
+    // TODO: mask the signed upload URL
+    core.debug(`Saving Cache to: ${response.signedUploadUrl}`)
+    await UploadCacheFile(
+      response.signedUploadUrl,
+      archivePath,
+    )
+
+    const finalizeRequest: FinalizeCacheEntryUploadRequest = {
+      workflowRunBackendId: backendIds.workflowRunBackendId,
+      workflowJobRunBackendId: backendIds.workflowJobRunBackendId,
+      key: key,
+      version: version,
+      sizeBytes: `${archiveFileSize}`,
+    }
+
+    const finalizeResponse: FinalizeCacheEntryUploadResponse = await twirpClient.FinalizeCacheEntryUpload(finalizeRequest)
+    core.debug(`FinalizeCacheEntryUploadResponse: ${JSON.stringify(finalizeResponse)}`)
+
+    if (!finalizeResponse.ok) {
+      throw new Error(
+        `Unable to finalize cache with key ${key}, another job may be finalizing this cache.`
+      )
+    }
+
+    // TODO: this is not great, we should handle the types without parsing
+    cacheId = parseInt(finalizeResponse.entryId)
+  } catch (error) {
+    const typedError = error as Error
+    core.debug(typedError.message)
+  } finally {
+    // Try to delete the archive to save space
+    try {
+      await utils.unlinkFile(archivePath)
+    } catch (error) {
+      core.debug(`Failed to delete archive: ${error}`)
+    }
   }

-  const finalizeResponse: FinalizeCacheEntryUploadResponse = await twirpClient.FinalizeCacheEntryUpload(finalizeRequest)
-  core.info(`FinalizeCacheEntryUploadResponse: ${JSON.stringify(finalizeResponse)}`)
-  return 0
+  return cacheId
 }
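The public saveCache and restoreCache entry points are unchanged by this commit; only the v2 internals now archive to a tarball on disk before reserving, uploading, and finalizing the entry. A hedged caller sketch with illustrative paths and keys (the -1 sentinel and undefined-on-miss behaviour follow from the diff above):

import * as cache from '@actions/cache'

async function run(): Promise<void> {
  const paths = ['node_modules']
  const primaryKey = 'npm-linux-abc123' // illustrative key, not from the source
  const restoreKeys = ['npm-linux-']

  // saveCachev2 resolves to the finalized entry id, or -1 when the save was skipped or failed
  const cacheId = await cache.saveCache(paths, primaryKey)
  console.log(`saveCache returned ${cacheId}`)

  // restoreCachev2 resolves to the matched key, or undefined on a cache miss
  const restoredKey = await cache.restoreCache(paths, primaryKey, restoreKeys)
  console.log(restoredKey ?? 'cache miss')
}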

View File

@@ -1,8 +1,8 @@
-import { HttpClient, HttpClientResponse, HttpCodes } from '@actions/http-client'
-import { BearerCredentialHandler } from '@actions/http-client/lib/auth'
 import { info, debug } from '@actions/core'
-import { CacheServiceClientJSON } from '../generated/results/api/v1/cache.twirp'
 import { getRuntimeToken, getCacheServiceURL } from './config'
+import { BearerCredentialHandler } from '@actions/http-client/lib/auth'
+import { HttpClient, HttpClientResponse, HttpCodes } from '@actions/http-client'
+import { CacheServiceClientJSON } from '../generated/results/api/v1/cache.twirp'

 // import {getUserAgentString} from './user-agent'
 // import {NetworkError, UsageError} from './errors'
@@ -16,6 +16,13 @@ interface Rpc {
   ): Promise<object | Uint8Array>
 }

+/**
+ * This class is a wrapper around the CacheServiceClientJSON class generated by Twirp.
+ *
+ * It adds retry logic to the request method, which is not present in the generated client.
+ *
+ * This class is used to interact with cache service v2.
+ */
 class CacheServiceClient implements Rpc {
   private httpClient: HttpClient
   private baseUrl: string
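The new doc comment says the wrapper adds retry logic around the generated CacheServiceClientJSON request method, but the body is outside this hunk. A hedged sketch of what such a retry helper could look like; the attempt count, backoff constants, and helper name are illustrative, not taken from the source:

import { debug } from '@actions/core'

// Illustrative only: retry with exponential backoff around a Twirp RPC call.
// The real CacheServiceClient may use different limits and retryability checks.
async function retryableRequest<T>(
  operation: string,
  call: () => Promise<T>,
  maxAttempts = 5,
  baseDelayMs = 1000
): Promise<T> {
  let lastError: Error | undefined
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await call()
    } catch (error) {
      lastError = error as Error
      debug(`${operation} attempt ${attempt} failed: ${lastError.message}`)
      // back off 1s, 2s, 4s, ... between attempts
      await new Promise(resolve => setTimeout(resolve, baseDelayMs * 2 ** (attempt - 1)))
    }
  }
  throw lastError ?? new Error(`${operation} failed after ${maxAttempts} attempts`)
}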

View File

@@ -36,3 +36,5 @@ export const SystemTarPathOnWindows = `${process.env['SYSTEMDRIVE']}\\Windows\\S
 export const TarFilename = 'cache.tar'
 export const ManifestFilename = 'manifest.txt'
+
+export const CacheFileSizeLimit = 10 * Math.pow(1024, 3) // 10GiB per repository
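The new constant works out to 10 * 1024^3 = 10,737,418,240 bytes. A minimal sketch of the client-side check it feeds in saveCachev2 above; isGhes is passed as a parameter here only to keep the sketch self-contained, whereas the real code calls utils.isGhes():

export const CacheFileSizeLimit = 10 * Math.pow(1024, 3) // 10GiB per repository

// Mirrors the pre-upload check added to saveCachev2: hosted runners enforce the
// limit client-side, while GHES defers to the ReserveCache API on the server.
export function assertWithinSizeLimit(archiveFileSize: number, isGhes: boolean): void {
  if (archiveFileSize > CacheFileSizeLimit && !isGhes) {
    throw new Error(
      `Cache size of ~${Math.round(
        archiveFileSize / (1024 * 1024)
      )} MB (${archiveFileSize} B) is over the 10GB limit, not saving cache.`
    )
  }
}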

View File

@@ -1,68 +1,25 @@
 import * as core from '@actions/core'
-import * as httpClient from '@actions/http-client'
-import unzip from 'unzip-stream'
-const packageJson = require('../../../package.json')
-
-export async function StreamExtract(url: string, directory: string): Promise<void> {
-  let retryCount = 0
-  while (retryCount < 5) {
-    try {
-      await streamExtractExternal(url, directory)
-      return
-    } catch (error) {
-      retryCount++
-      core.info(
-        `Failed to download cache after ${retryCount} retries due to ${error.message}. Retrying in 5 seconds...`
-      )
-      // wait 5 seconds before retrying
-      await new Promise(resolve => setTimeout(resolve, 5000))
-    }
-  }
-
-  throw new Error(`Cache download failed after ${retryCount} retries.`)
-}
-
-export async function streamExtractExternal(
-  url: string,
-  directory: string
-): Promise<void> {
-  const client = new httpClient.HttpClient(`@actions/cache-${packageJson.version}`)
-  const response = await client.get(url)
-  if (response.message.statusCode !== 200) {
-    core.info(`Failed to download cache. HTTP status code: ${response.message.statusCode}`)
-    throw new Error(
-      `Unexpected HTTP response from blob storage: ${response.message.statusCode} ${response.message.statusMessage}`
-    )
-  }
-
-  const timeout = 30 * 1000 // 30 seconds
-
-  return new Promise((resolve, reject) => {
-    const timerFn = (): void => {
-      response.message.destroy(
-        new Error(`Blob storage chunk did not respond in ${timeout}ms`)
-      )
-    }
-    const timer = setTimeout(timerFn, timeout)
-
-    response.message
-      .on('data', () => {
-        timer.refresh()
-      })
-      .on('error', (error: Error) => {
-        core.info(
-          `response.message: Cache download failed: ${error.message}`
-        )
-        clearTimeout(timer)
-        reject(error)
-      })
-      .pipe(unzip.Extract({path: directory}))
-      .on('close', () => {
-        clearTimeout(timer)
-        resolve()
-      })
-      .on('error', (error: Error) => {
-        reject(error)
-      })
-  })
-}
+import {
+  BlobClient,
+  BlockBlobClient,
+  BlobDownloadOptions,
+} from '@azure/storage-blob'
+
+export async function DownloadCacheFile(
+  signedUploadURL: string,
+  archivePath: string,
+): Promise<{}> {
+  const downloadOptions: BlobDownloadOptions = {
+    maxRetryRequests: 5,
+  }
+
+  // TODO: tighten the configuration and pass the appropriate user-agent
+  const blobClient: BlobClient = new BlobClient(signedUploadURL)
+  const blockBlobClient: BlockBlobClient = blobClient.getBlockBlobClient()
+
+  core.debug(`BlobClient: ${JSON.stringify(blobClient)}`)
+  core.debug(`blockBlobClient: ${JSON.stringify(blockBlobClient)}`)
+
+  return blockBlobClient.downloadToFile(archivePath, 0, undefined, downloadOptions)
+}
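A hedged usage sketch of the new helper: the signed URL is a complete Azure Blob SAS URL, so BlobClient can be constructed from it directly, and downloadToFile(archivePath, 0, undefined, options) writes the whole blob to disk, retrying the body download up to maxRetryRequests times if the stream is interrupted. The URL and path below are placeholders, not values from the source:

import { DownloadCacheFile } from './internal/v2/download-cache'

async function restoreFromBlob(): Promise<void> {
  // Placeholders: in restoreCachev2 the URL comes from
  // GetCacheEntryDownloadURLResponse.signedDownloadUrl.
  const signedDownloadUrl =
    'https://example.blob.core.windows.net/cache/archive.tzst?sv=...&sig=...'
  const archivePath = '/tmp/cache/archive.tzst'

  await DownloadCacheFile(signedDownloadUrl, archivePath)
}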

View File

@@ -1,130 +1,27 @@
 import * as core from '@actions/core'
-import { CreateCacheEntryResponse } from '../../generated/results/api/v1/cache'
-import { ZipUploadStream } from '@actions/artifact/lib/internal/upload/zip'
-import { NetworkError } from '@actions/artifact/'
-import { TransferProgressEvent } from '@azure/core-http'
-import * as stream from 'stream'
-import * as crypto from 'crypto'
 import {
   BlobClient,
   BlockBlobClient,
-  BlockBlobUploadStreamOptions,
   BlockBlobParallelUploadOptions
 } from '@azure/storage-blob'

-export async function UploadCacheStream(
-  signedUploadURL: string,
-  zipUploadStream: ZipUploadStream
-): Promise<{}> {
-  let uploadByteCount = 0
-  let lastProgressTime = Date.now()
-  let timeoutId: NodeJS.Timeout | undefined
-
-  const chunkTimer = (timeout: number): NodeJS.Timeout => {
-    // clear the previous timeout
-    if (timeoutId) {
-      clearTimeout(timeoutId)
-    }
-
-    timeoutId = setTimeout(() => {
-      const now = Date.now()
-      // if there's been more than 30 seconds since the
-      // last progress event, then we'll consider the upload stalled
-      if (now - lastProgressTime > timeout) {
-        throw new Error('Upload progress stalled.')
-      }
-    }, timeout)
-    return timeoutId
-  }
-
-  const maxConcurrency = 32
-  const bufferSize = 8 * 1024 * 1024 // 8 MB Chunks
-  const blobClient = new BlobClient(signedUploadURL)
-  const blockBlobClient = blobClient.getBlockBlobClient()
-  const timeoutDuration = 300000 // 30 seconds
-
-  core.debug(
-    `Uploading cache zip to blob storage with maxConcurrency: ${maxConcurrency}, bufferSize: ${bufferSize}`
-  )
-
-  const uploadCallback = (progress: TransferProgressEvent): void => {
-    core.info(`Uploaded bytes ${progress.loadedBytes}`)
-    uploadByteCount = progress.loadedBytes
-    chunkTimer(timeoutDuration)
-    lastProgressTime = Date.now()
-  }
-
-  const options: BlockBlobUploadStreamOptions = {
-    blobHTTPHeaders: { blobContentType: 'zip' },
-    onProgress: uploadCallback
-  }
-
-  let sha256Hash: string | undefined = undefined
-  const uploadStream = new stream.PassThrough()
-  const hashStream = crypto.createHash('sha256')
-
-  zipUploadStream.pipe(uploadStream) // This stream is used for the upload
-  zipUploadStream.pipe(hashStream).setEncoding('hex') // This stream is used to compute a hash of the zip content that gets used. Integrity check
-
-  core.info('Beginning upload of cache to blob storage')
-  try {
-    // Start the chunk timer
-    timeoutId = chunkTimer(timeoutDuration)
-    await blockBlobClient.uploadStream(
-      uploadStream,
-      bufferSize,
-      maxConcurrency,
-      options
-    )
-  } catch (error) {
-    if (NetworkError.isNetworkErrorCode(error?.code)) {
-      throw new NetworkError(error?.code)
-    }
-    throw error
-  } finally {
-    // clear the timeout whether or not the upload completes
-    if (timeoutId) {
-      clearTimeout(timeoutId)
-    }
-  }
-
-  core.info('Finished uploading cache content to blob storage!')
-
-  hashStream.end()
-  sha256Hash = hashStream.read() as string
-  core.info(`SHA256 hash of uploaded artifact zip is ${sha256Hash}`)
-  core.info(`Uploaded: ${uploadByteCount} bytes`)
-
-  if (uploadByteCount === 0) {
-    core.error(
-      `No data was uploaded to blob storage. Reported upload byte count is 0.`
-    )
-  }
-
-  return {
-    uploadSize: uploadByteCount,
-    sha256Hash
-  }
-}
-
 export async function UploadCacheFile(
-  uploadURL: CreateCacheEntryResponse,
+  signedUploadURL: string,
   archivePath: string,
 ): Promise<{}> {
-  core.info(`Uploading ${archivePath} to: ${JSON.stringify(uploadURL)}`)
-
+  // TODO: tighten the configuration and pass the appropriate user-agent
   // Specify data transfer options
   const uploadOptions: BlockBlobParallelUploadOptions = {
     blockSize: 4 * 1024 * 1024, // 4 MiB max block size
-    concurrency: 2, // maximum number of parallel transfer workers
+    concurrency: 4, // maximum number of parallel transfer workers
     maxSingleShotSize: 8 * 1024 * 1024, // 8 MiB initial transfer size
   };

-  const blobClient: BlobClient = new BlobClient(uploadURL.signedUploadUrl)
+  const blobClient: BlobClient = new BlobClient(signedUploadURL)
   const blockBlobClient: BlockBlobClient = blobClient.getBlockBlobClient()

-  core.info(`BlobClient: ${JSON.stringify(blobClient)}`)
-  core.info(`blockBlobClient: ${JSON.stringify(blockBlobClient)}`)
+  core.debug(`BlobClient: ${JSON.stringify(blobClient)}`)
+  core.debug(`blockBlobClient: ${JSON.stringify(blockBlobClient)}`)

   return blockBlobClient.uploadFile(archivePath, uploadOptions);
 }
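And a matching usage sketch for the upload side. uploadFile reads the finished tarball from disk and sends it in 4 MiB blocks with 4 parallel workers, or as a single request when the file is at or below the 8 MiB maxSingleShotSize; because the archive already exists on disk, the SDK can retry individual block uploads, replacing the stalled-stream timers of the removed UploadCacheStream. The URL and path below are placeholders:

import { UploadCacheFile } from './internal/v2/upload-cache'

async function uploadArchive(): Promise<void> {
  // Placeholders: in saveCachev2 the URL comes from
  // CreateCacheEntryResponse.signedUploadUrl.
  const signedUploadUrl =
    'https://example.blob.core.windows.net/cache/archive.tzst?sv=...&sig=...'
  const archivePath = '/tmp/cache/archive.tzst'

  await UploadCacheFile(signedUploadUrl, archivePath)
}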