
feat: modifies implementation of save and restore, and introduces delete.

pull/1716/head
Prajjwal 2023-11-22 17:42:10 +05:30
parent c7fc05d955
commit 84879c0f57
9 changed files with 415 additions and 424 deletions
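
This commit reshapes the public API of the cache package: saveCache now resolves with the saved cache key (a string) instead of a numeric cache ID and no longer accepts upload options, restoreCache reads the cache_key and pre_signed_url fields returned by the new backend, and a deleteCache helper is introduced. A rough usage sketch of the resulting surface (placeholder paths and keys, based on the changed signatures in this diff):

import {deleteCache, restoreCache, saveCache} from './cache'

async function exampleUsage(): Promise<void> {
  // saveCache now resolves with the cache key rather than a numeric cache ID
  const savedKey = await saveCache(['/tmp/build-output'], 'example-cache-key')

  // restoreCache resolves with the matched cache_key, or undefined when no entry is found
  const restoredKey = await restoreCache(['/tmp/build-output'], 'example-cache-key', [])

  // deleteCache removes entries by key; failures surface as warnings, not thrown errors
  await deleteCache(['example-cache-key'])

  console.log({savedKey, restoredKey})
}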

packages/warp-cache/.vscode/launch.json vendored Normal file

@@ -0,0 +1,21 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "node",
"request": "launch",
"name": "Launch Test.ts",
"skipFiles": [
"<node_internals>/**"
],
"program": "${workspaceFolder}/src/test.ts",
"preLaunchTask": "tsc: build - tsconfig.json",
"outFiles": [
"${workspaceFolder}/lib/**/*.js"
]
}
]
}


@@ -17,6 +17,7 @@
"@azure/abort-controller": "^1.1.0",
"@azure/ms-rest-js": "^2.6.0",
"@azure/storage-blob": "^12.13.0",
"axios": "^1.6.2",
"semver": "^6.3.1",
"uuid": "^3.3.3"
},
@@ -318,6 +319,29 @@
"resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz",
"integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q=="
},
"node_modules/axios": {
"version": "1.6.2",
"resolved": "https://registry.npmjs.org/axios/-/axios-1.6.2.tgz",
"integrity": "sha512-7i24Ri4pmDRfJTR7LDBhsOTtcm+9kjX5WiY1X3wIisx6G9So3pfMkEiU7emUBe46oceVImccTEM3k6C5dbVW8A==",
"dependencies": {
"follow-redirects": "^1.15.0",
"form-data": "^4.0.0",
"proxy-from-env": "^1.1.0"
}
},
"node_modules/axios/node_modules/form-data": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.0.tgz",
"integrity": "sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==",
"dependencies": {
"asynckit": "^0.4.0",
"combined-stream": "^1.0.8",
"mime-types": "^2.1.12"
},
"engines": {
"node": ">= 6"
}
},
"node_modules/balanced-match": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.2.tgz",
@@ -372,6 +396,25 @@
"node": ">=0.8.x"
}
},
"node_modules/follow-redirects": {
"version": "1.15.3",
"resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.3.tgz",
"integrity": "sha512-1VzOtuEM8pC9SFU1E+8KfTjZyMztRsgEfwQl44z8A25uy13jSzTj6dyK2Df52iV0vgHCfBwLhDWevLn95w5v6Q==",
"funding": [
{
"type": "individual",
"url": "https://github.com/sponsors/RubenVerborgh"
}
],
"engines": {
"node": ">=4.0"
},
"peerDependenciesMeta": {
"debug": {
"optional": true
}
}
},
"node_modules/form-data": {
"version": "2.5.1",
"resolved": "https://registry.npmjs.org/form-data/-/form-data-2.5.1.tgz",
@@ -442,6 +485,11 @@
"node": ">= 0.6.0"
}
},
"node_modules/proxy-from-env": {
"version": "1.1.0",
"resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz",
"integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg=="
},
"node_modules/sax": {
"version": "1.2.4",
"resolved": "https://registry.npmjs.org/sax/-/sax-1.2.4.tgz",
@@ -783,6 +831,28 @@
"resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz",
"integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q=="
},
"axios": {
"version": "1.6.2",
"resolved": "https://registry.npmjs.org/axios/-/axios-1.6.2.tgz",
"integrity": "sha512-7i24Ri4pmDRfJTR7LDBhsOTtcm+9kjX5WiY1X3wIisx6G9So3pfMkEiU7emUBe46oceVImccTEM3k6C5dbVW8A==",
"requires": {
"follow-redirects": "^1.15.0",
"form-data": "^4.0.0",
"proxy-from-env": "^1.1.0"
},
"dependencies": {
"form-data": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.0.tgz",
"integrity": "sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==",
"requires": {
"asynckit": "^0.4.0",
"combined-stream": "^1.0.8",
"mime-types": "^2.1.12"
}
}
}
},
"balanced-match": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.2.tgz",
@@ -825,6 +895,11 @@
"resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz",
"integrity": "sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q=="
},
"follow-redirects": {
"version": "1.15.3",
"resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.3.tgz",
"integrity": "sha512-1VzOtuEM8pC9SFU1E+8KfTjZyMztRsgEfwQl44z8A25uy13jSzTj6dyK2Df52iV0vgHCfBwLhDWevLn95w5v6Q=="
},
"form-data": {
"version": "2.5.1",
"resolved": "https://registry.npmjs.org/form-data/-/form-data-2.5.1.tgz",
@@ -869,6 +944,11 @@
"resolved": "https://registry.npmjs.org/process/-/process-0.11.10.tgz",
"integrity": "sha512-cdGef/drWFoydD1JsMzuFf8100nZl+GT+yacc2bEced5f9Rjk4z+WtFUTBu9PhOi9j/jfmBPu0mMEY4wIdAF8A=="
},
"proxy-from-env": {
"version": "1.1.0",
"resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz",
"integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg=="
},
"sax": {
"version": "1.2.4",
"resolved": "https://registry.npmjs.org/sax/-/sax-1.2.4.tgz",


@@ -46,6 +46,7 @@
"@azure/abort-controller": "^1.1.0",
"@azure/ms-rest-js": "^2.6.0",
"@azure/storage-blob": "^12.13.0",
"axios": "^1.6.2",
"semver": "^6.3.1",
"uuid": "^3.3.3"
},


@@ -3,7 +3,7 @@ import * as path from 'path'
import * as utils from './internal/cacheUtils'
import * as cacheHttpClient from './internal/cacheHttpClient'
import {createTar, extractTar, listTar} from './internal/tar'
import {DownloadOptions, UploadOptions} from './options'
import {DownloadOptions, getUploadOptions} from './options'
export class ValidationError extends Error {
constructor(message: string) {
@@ -95,14 +95,14 @@ export async function restoreCache(
compressionMethod,
enableCrossOsArchive
})
if (!cacheEntry?.archiveLocation) {
if (!cacheEntry?.pre_signed_url) {
// Cache not found
return undefined
}
if (options?.lookupOnly) {
core.info('Lookup only - skipping download')
return cacheEntry.cacheKey
return cacheEntry.cache_key
}
archivePath = path.join(
@@ -112,11 +112,7 @@
core.debug(`Archive Path: ${archivePath}`)
// Download the cache from the cache entry
await cacheHttpClient.downloadCache(
cacheEntry.archiveLocation,
archivePath,
options
)
await cacheHttpClient.downloadCache(cacheEntry.pre_signed_url, archivePath)
if (core.isDebug()) {
await listTar(archivePath, compressionMethod)
@@ -132,7 +128,7 @@
await extractTar(archivePath, compressionMethod)
core.info('Cache restored successfully')
return cacheEntry.cacheKey
return cacheEntry.cache_key
} catch (error) {
const typedError = error as Error
if (typedError.name === ValidationError.name) {
@@ -165,16 +161,15 @@ export async function restoreCache(
export async function saveCache(
paths: string[],
key: string,
options?: UploadOptions,
enableCrossOsArchive = false
): Promise<number> {
): Promise<string> {
checkPaths(paths)
checkKey(key)
const compressionMethod = await utils.getCompressionMethod()
let cacheId = -1
const cachePaths = await utils.resolvePaths(paths)
let cacheKey = ''
core.debug('Cache Paths:')
core.debug(`${JSON.stringify(cachePaths)}`)
@@ -197,7 +192,7 @@ export async function saveCache(
if (core.isDebug()) {
await listTar(archivePath, compressionMethod)
}
const fileSizeLimit = 10 * 1024 * 1024 * 1024 // 10GB per repo limit
const fileSizeLimit = 20 * 1024 * 1024 * 1024 // 20GB per repo limit
const archiveFileSize = utils.getArchiveFileSizeInBytes(archivePath)
core.debug(`File Size: ${archiveFileSize}`)
@@ -211,9 +206,13 @@
}
core.debug('Reserving Cache')
// Calculate number of chunks required
const uploadOptions = getUploadOptions()
const maxChunkSize = uploadOptions?.uploadChunkSize ?? 32 * 1024 * 1024 // Default 32MB
const numberOfChunks = Math.floor(archiveFileSize / maxChunkSize)
const reserveCacheResponse = await cacheHttpClient.reserveCache(
key,
paths,
numberOfChunks,
{
compressionMethod,
enableCrossOsArchive,
@@ -221,23 +220,29 @@
}
)
if (reserveCacheResponse?.result?.cacheId) {
cacheId = reserveCacheResponse?.result?.cacheId
} else if (reserveCacheResponse?.statusCode === 400) {
if (reserveCacheResponse?.statusCode === 400) {
throw new Error(
reserveCacheResponse?.error?.message ??
`Cache size of ~${Math.round(
archiveFileSize / (1024 * 1024)
)} MB (${archiveFileSize} B) is over the data cap limit, not saving cache.`
)
} else {
throw new ReserveCacheError(
`Unable to reserve cache with key ${key}, another job may be creating this cache. More details: ${reserveCacheResponse?.error?.message}`
)
}
core.debug(`Saving Cache (ID: ${cacheId})`)
await cacheHttpClient.saveCache(cacheId, archivePath, options)
core.debug(`Saving Cache`)
cacheKey = await cacheHttpClient.saveCache(
key,
cacheHttpClient.getCacheVersion(
paths,
compressionMethod,
enableCrossOsArchive
),
reserveCacheResponse?.result?.upload_id ?? '',
reserveCacheResponse?.result?.upload_key ?? '',
numberOfChunks,
reserveCacheResponse?.result?.pre_signed_urls ?? [],
archivePath
)
} catch (error) {
const typedError = error as Error
if (typedError.name === ValidationError.name) {
@@ -256,5 +261,24 @@ export async function saveCache(
}
}
return cacheId
return cacheKey
}
/**
* Deletes an entire cache by cache key.
* @param keys The cache keys
*/
export async function deleteCache(keys: string[]): Promise<void> {
for (const key of keys) {
checkKey(key)
}
core.debug('Deleting Cache')
core.debug(`Cache Keys: ${keys}`)
try {
await cacheHttpClient.deleteCache(keys)
} catch (error) {
core.warning(`Failed to delete cache: ${error}`)
}
}


@@ -7,7 +7,6 @@ import {
} from '@actions/http-client/lib/interfaces'
import * as crypto from 'crypto'
import * as fs from 'fs'
import {URL} from 'url'
import * as utils from './cacheUtils'
import {CompressionMethod} from './constants'
@@ -18,34 +17,24 @@
ReserveCacheRequest,
ReserveCacheResponse,
ITypedResponseWithError,
ArtifactCacheList
ArtifactCacheList,
InternalS3CompletedPart,
CommitCacheResponse
} from './contracts'
import {
downloadCacheHttpClient,
downloadCacheHttpClientConcurrent,
downloadCacheStorageSDK
} from './downloadUtils'
import {
DownloadOptions,
UploadOptions,
getDownloadOptions,
getUploadOptions
} from '../options'
import {
isSuccessStatusCode,
retryHttpClientResponse,
retryTypedResponse
} from './requestUtils'
import {downloadCacheMultiConnection} from './downloadUtils'
import {isSuccessStatusCode, retry, retryTypedResponse} from './requestUtils'
import axios, {AxiosError} from 'axios'
const versionSalt = '1.0'
function getCacheApiUrl(resource: string): string {
const baseUrl: string = process.env['ACTIONS_CACHE_URL'] ?? 'localhost:8000'
const baseUrl: string =
process.env['ACTIONS_CACHE_URL'] ?? 'http://127.0.0.1:8002'
if (!baseUrl) {
throw new Error('Cache Service Url not found, unable to restore cache.')
}
const url = `${baseUrl}/v1/cache/${resource}`
const url = `${baseUrl}/v1/${resource}`
core.debug(`Resource Url: ${url}`)
return url
}
@@ -130,7 +119,7 @@ export async function getCacheEntry(
}
const cacheResult = response.result
const cacheDownloadUrl = cacheResult?.archiveLocation
const cacheDownloadUrl = cacheResult?.pre_signed_url
if (!cacheDownloadUrl) {
// Cache archiveLocation not found. This should never happen, and hence bail out.
throw new Error('Cache not found.')
@@ -158,9 +147,9 @@ async function printCachesListForDiagnostics(
core.debug(
`No matching cache found for cache key '${key}', version '${version}' and scope '${process.env['GITHUB_REF']}'. There exist one or more cache(s) with a similar key but a different version or scope. See more info on cache matching here: https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#matching-a-cache-key \nOther caches with similar key:`
)
for (const cacheEntry of cacheListResult?.artifactCaches || []) {
for (const cacheEntry of cacheListResult?.artifactCaches ?? []) {
core.debug(
`Cache Key: ${cacheEntry?.cacheKey}, Cache Version: ${cacheEntry?.cacheVersion}, Cache Scope: ${cacheEntry?.scope}, Cache Created: ${cacheEntry?.creationTime}`
`Cache Key: ${cacheEntry?.cache_key}, Cache Version: ${cacheEntry?.cache_version}`
)
}
}
@@ -169,57 +158,27 @@
export async function downloadCache(
archiveLocation: string,
archivePath: string,
options?: DownloadOptions
archivePath: string
): Promise<void> {
const archiveUrl = new URL(archiveLocation)
const downloadOptions = getDownloadOptions(options)
if (archiveUrl.hostname.endsWith('.blob.core.windows.net')) {
if (downloadOptions.useAzureSdk) {
// Use Azure storage SDK to download caches hosted on Azure to improve speed and reliability.
await downloadCacheStorageSDK(
archiveLocation,
archivePath,
downloadOptions
)
} else if (downloadOptions.concurrentBlobDownloads) {
// Use concurrent implementation with HttpClient to work around blob SDK issue
await downloadCacheHttpClientConcurrent(
archiveLocation,
archivePath,
downloadOptions
)
} else {
// Otherwise, download using the Actions http-client.
await downloadCacheHttpClient(archiveLocation, archivePath)
}
} else {
await downloadCacheHttpClient(archiveLocation, archivePath)
}
await downloadCacheMultiConnection(archiveLocation, archivePath, 8)
}
// Reserve Cache
export async function reserveCache(
key: string,
paths: string[],
cacheKey: string,
numberOfChunks: number,
options?: InternalCacheOptions
): Promise<ITypedResponseWithError<ReserveCacheResponse>> {
const httpClient = createHttpClient()
const version = getCacheVersion(
paths,
options?.compressionMethod,
options?.enableCrossOsArchive
)
const reserveCacheRequest: ReserveCacheRequest = {
key,
version,
cacheSize: options?.cacheSize
cache_key: cacheKey,
number_of_chunks: numberOfChunks,
content_type: 'application/zstd'
}
const response = await retryTypedResponse('reserveCache', async () =>
httpClient.postJson<ReserveCacheResponse>(
getCacheApiUrl('caches'),
getCacheApiUrl('cache/reserve'),
reserveCacheRequest
)
)
@@ -236,12 +195,12 @@ function getContentRange(start: number, end: number): string {
}
async function uploadChunk(
httpClient: HttpClient,
resourceUrl: string,
openStream: () => NodeJS.ReadableStream,
partNumber: number,
start: number,
end: number
): Promise<void> {
): Promise<InternalS3CompletedPart> {
core.debug(
`Uploading chunk of size ${
end - start + 1
@@ -250,85 +209,77 @@ async function uploadChunk(
end
)}`
)
const additionalHeaders = {
'Content-Type': 'application/octet-stream',
'Content-Range': getContentRange(start, end)
}
const uploadChunkResponse = await retryHttpClientResponse(
`uploadChunk (start: ${start}, end: ${end})`,
async () =>
httpClient.sendStream(
'PATCH',
resourceUrl,
openStream(),
additionalHeaders
)
)
// Manually convert the readable stream to a buffer. S3 doesn't allow stream as input
const chunks = await utils.streamToBuffer(openStream())
if (!isSuccessStatusCode(uploadChunkResponse.message.statusCode)) {
try {
// HACK: Using axios here as the S3 API doesn't allow a readable stream as input and GitHub's HTTP client is not able to send a buffer as the body
const response = await axios.request({
method: 'PUT',
url: resourceUrl,
headers: {
'Content-Type': 'application/octet-stream'
},
data: chunks
})
return {
ETag: response.headers.etag ?? '',
PartNumber: partNumber
}
} catch (error) {
throw new Error(
`Cache service responded with ${uploadChunkResponse.message.statusCode} during upload chunk.`
`Cache service responded with ${
(error as AxiosError).status
} during upload chunk.`
)
}
}
async function uploadFile(
httpClient: HttpClient,
cacheId: number,
archivePath: string,
options?: UploadOptions
): Promise<void> {
async function uploadFileToS3(
preSignedURLs: string[],
archivePath: string
): Promise<InternalS3CompletedPart[]> {
// Upload Chunks
const fileSize = utils.getArchiveFileSizeInBytes(archivePath)
const resourceUrl = getCacheApiUrl(`caches/${cacheId.toString()}`)
const numberOfChunks = preSignedURLs.length
const fd = fs.openSync(archivePath, 'r')
const uploadOptions = getUploadOptions(options)
const concurrency = utils.assertDefined(
'uploadConcurrency',
uploadOptions.uploadConcurrency
)
const maxChunkSize = utils.assertDefined(
'uploadChunkSize',
uploadOptions.uploadChunkSize
)
const parallelUploads = [...new Array(concurrency).keys()]
core.debug('Awaiting all uploads')
let offset = 0
try {
await Promise.all(
parallelUploads.map(async () => {
while (offset < fileSize) {
const chunkSize = Math.min(fileSize - offset, maxChunkSize)
const start = offset
const end = offset + chunkSize - 1
offset += maxChunkSize
const completedParts = await Promise.all(
preSignedURLs.map(async (presignedURL, index) => {
const chunkSize = Math.ceil(fileSize / numberOfChunks)
const start = offset
const end = offset + chunkSize - 1
offset += chunkSize
await uploadChunk(
httpClient,
resourceUrl,
() =>
fs
.createReadStream(archivePath, {
fd,
start,
end,
autoClose: false
})
.on('error', error => {
throw new Error(
`Cache upload failed because file read failed with ${error.message}`
)
}),
start,
end
)
}
return await uploadChunk(
presignedURL,
() =>
fs
.createReadStream(archivePath, {
fd,
start,
end,
autoClose: false
})
.on('error', error => {
throw new Error(
`Cache upload failed because file read failed with ${error.message}`
)
}),
index + 1,
start,
end
)
})
)
return completedParts
} finally {
fs.closeSync(fd)
}
@@ -336,36 +287,68 @@ async function uploadFile(
async function commitCache(
httpClient: HttpClient,
cacheId: number,
filesize: number
): Promise<TypedResponse<null>> {
const commitCacheRequest: CommitCacheRequest = {size: filesize}
cacheKey: string,
cacheVersion: string,
uploadKey: string,
uploadID: string,
parts: InternalS3CompletedPart[]
): Promise<TypedResponse<CommitCacheResponse>> {
const commitCacheRequest: CommitCacheRequest = {
cache_key: cacheKey,
cache_version: cacheVersion,
upload_key: uploadKey,
upload_id: uploadID,
parts: parts,
os: process.env['RUNNER_OS'] ?? 'Linux',
vcs_type: 'github'
}
return await retryTypedResponse('commitCache', async () =>
httpClient.postJson<null>(
getCacheApiUrl(`caches/${cacheId.toString()}`),
httpClient.postJson<CommitCacheResponse>(
getCacheApiUrl(`cache/commit`),
commitCacheRequest
)
)
}
export async function saveCache(
cacheId: number,
archivePath: string,
options?: UploadOptions
): Promise<void> {
cacheKey: string,
cacheVersion: string,
uploadId: string,
uploadKey: string,
numberOfChunks: number,
preSignedURLs: string[],
archivePath: string
): Promise<string> {
// Number of chunks should match the number of pre-signed URLs
if (numberOfChunks !== preSignedURLs.length) {
throw new Error(
`Number of chunks (${numberOfChunks}) should match the number of pre-signed URLs (${preSignedURLs.length}).`
)
}
const httpClient = createHttpClient()
core.debug('Upload cache')
await uploadFile(httpClient, cacheId, archivePath, options)
const completedParts = await uploadFileToS3(preSignedURLs, archivePath)
// Sort parts in ascending order by partNumber
completedParts.sort((a, b) => a.PartNumber - b.PartNumber)
// Commit Cache
core.debug('Commiting cache')
core.debug('Committing cache')
const cacheSize = utils.getArchiveFileSizeInBytes(archivePath)
core.info(
`Cache Size: ~${Math.round(cacheSize / (1024 * 1024))} MB (${cacheSize} B)`
)
const commitCacheResponse = await commitCache(httpClient, cacheId, cacheSize)
const commitCacheResponse = await commitCache(
httpClient,
cacheKey,
cacheVersion,
uploadKey,
uploadId,
completedParts
)
if (!isSuccessStatusCode(commitCacheResponse.statusCode)) {
throw new Error(
`Cache service responded with ${commitCacheResponse.statusCode} during commit cache.`
@@ -373,4 +356,16 @@ export async function saveCache(
}
core.info('Cache saved successfully')
return commitCacheResponse.result?.cache_key ?? ''
}
export async function deleteCache(keys: string[]) {
const httpClient = createHttpClient()
const resource = `cache?keys=${encodeURIComponent(keys.join(','))}`
const response = await httpClient.del(getCacheApiUrl(resource))
if (!isSuccessStatusCode(response.message.statusCode)) {
throw new Error(
`Cache service responded with ${response.message.statusCode}`
)
}
}


@@ -137,3 +137,12 @@ export function isGhes(): boolean {
)
return ghUrl.hostname.toUpperCase() !== 'GITHUB.COM'
}
export function streamToBuffer(stream: NodeJS.ReadableStream): Promise<Buffer> {
return new Promise<Buffer>((resolve, reject) => {
const buffer: Buffer[] = []
stream.on('data', (chunk: Buffer) => buffer.push(chunk))
stream.on('error', reject)
stream.on('end', () => resolve(Buffer.concat(buffer)))
})
}


@@ -7,11 +7,9 @@ export interface ITypedResponseWithError<T> extends TypedResponse<T> {
}
export interface ArtifactCacheEntry {
cacheKey?: string
scope?: string
cacheVersion?: string
creationTime?: string
archiveLocation?: string
cache_key?: string
pre_signed_url?: string
cache_version?: string
}
export interface ArtifactCacheList {
@@ -20,17 +18,30 @@ export interface ArtifactCacheList {
}
export interface CommitCacheRequest {
size: number
cache_key: string
cache_version: string
upload_key: string
upload_id: string
parts: InternalS3CompletedPart[]
os: string
vcs_type: string
}
export interface CommitCacheResponse {
cache_key: string
cache_version: string
}
export interface ReserveCacheRequest {
key: string
version?: string
cacheSize?: number
cache_key: string
content_type: string
number_of_chunks: number
}
export interface ReserveCacheResponse {
cacheId: number
pre_signed_urls: string[]
upload_key: string
upload_id: string
}
export interface InternalCacheOptions {
@@ -43,3 +54,8 @@ export interface ArchiveTool {
path: string
type: string
}
export interface InternalS3CompletedPart {
ETag: string
PartNumber: number
}
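
Taken together, these contracts describe the new upload protocol: reserve a cache with a chunk count, receive one pre-signed URL per chunk plus upload identifiers, upload each chunk and record its ETag, then commit the collected parts. A minimal sketch of how the shapes fit together, with placeholder values rather than real API data:

import {
  CommitCacheRequest,
  InternalS3CompletedPart,
  ReserveCacheRequest,
  ReserveCacheResponse
} from './contracts'

// 1. Reserve: tell the cache service how many chunks will be uploaded
const reserveRequest: ReserveCacheRequest = {
  cache_key: 'example-key',
  content_type: 'application/zstd',
  number_of_chunks: 2
}

// 2. The service answers with one pre-signed URL per chunk plus upload identifiers
const reserveResponse: ReserveCacheResponse = {
  pre_signed_urls: ['https://example.invalid/part-1', 'https://example.invalid/part-2'],
  upload_key: 'example-upload-key',
  upload_id: 'example-upload-id'
}

// 3. Uploading each chunk to its pre-signed URL yields an ETag/part-number pair
const parts: InternalS3CompletedPart[] = [
  {ETag: 'etag-1', PartNumber: 1},
  {ETag: 'etag-2', PartNumber: 2}
]

// 4. Commit: hand the parts back so the service can complete the multipart upload
const commitRequest: CommitCacheRequest = {
  cache_key: 'example-key',
  cache_version: 'example-version',
  upload_key: reserveResponse.upload_key,
  upload_id: reserveResponse.upload_id,
  parts,
  os: 'Linux',
  vcs_type: 'github'
}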


@@ -22,10 +22,25 @@ import {AbortController} from '@azure/abort-controller'
*/
async function pipeResponseToStream(
response: HttpClientResponse,
output: NodeJS.WritableStream
output: NodeJS.WritableStream,
progress?: DownloadProgress
): Promise<void> {
const pipeline = util.promisify(stream.pipeline)
await pipeline(response.message, output)
await pipeline(
response.message,
new stream.Transform({
transform(chunk, encoding, callback) {
if (progress) {
progress.setReceivedBytes(
progress.getTransferredBytes() + chunk.length
)
}
this.push(chunk)
callback()
}
}),
output
)
}
/**
@@ -204,260 +219,76 @@ export async function downloadCacheHttpClient(
}
/**
* Download the cache using the Actions toolkit http-client concurrently
* Download the cache using the Actions toolkit http-client with multiple connections
*
* @param archiveLocation the URL for the cache
* @param archivePath the local path where the cache is saved
* @param connections number of connections to use
*/
export async function downloadCacheHttpClientConcurrent(
archiveLocation: string,
archivePath: fs.PathLike,
options: DownloadOptions
): Promise<void> {
const archiveDescriptor = await fs.promises.open(archivePath, 'w')
const httpClient = new HttpClient('actions/cache', undefined, {
socketTimeout: options.timeoutInMs,
keepAlive: true
})
try {
const res = await retryHttpClientResponse(
'downloadCacheMetadata',
async () => await httpClient.request('HEAD', archiveLocation, null, {})
)
const lengthHeader = res.message.headers['content-length']
if (lengthHeader === undefined || lengthHeader === null) {
throw new Error('Content-Length not found on blob response')
}
const length = parseInt(lengthHeader)
if (Number.isNaN(length)) {
throw new Error(`Could not interpret Content-Length: ${length}`)
}
const downloads: {
offset: number
promiseGetter: () => Promise<DownloadSegment>
}[] = []
const blockSize = 4 * 1024 * 1024
for (let offset = 0; offset < length; offset += blockSize) {
const count = Math.min(blockSize, length - offset)
downloads.push({
offset,
promiseGetter: async () => {
return await downloadSegmentRetry(
httpClient,
archiveLocation,
offset,
count
)
}
})
}
// reverse to use .pop instead of .shift
downloads.reverse()
let actives = 0
let bytesDownloaded = 0
const progress = new DownloadProgress(length)
progress.startDisplayTimer()
const progressFn = progress.onProgress()
const activeDownloads: {[offset: number]: Promise<DownloadSegment>} = []
let nextDownload:
| {offset: number; promiseGetter: () => Promise<DownloadSegment>}
| undefined
const waitAndWrite: () => Promise<void> = async () => {
const segment = await Promise.race(Object.values(activeDownloads))
await archiveDescriptor.write(
segment.buffer,
0,
segment.count,
segment.offset
)
actives--
delete activeDownloads[segment.offset]
bytesDownloaded += segment.count
progressFn({loadedBytes: bytesDownloaded})
}
while ((nextDownload = downloads.pop())) {
activeDownloads[nextDownload.offset] = nextDownload.promiseGetter()
actives++
if (actives >= (options.downloadConcurrency ?? 10)) {
await waitAndWrite()
}
}
while (actives > 0) {
await waitAndWrite()
}
} finally {
httpClient.dispose()
await archiveDescriptor.close()
}
}
async function downloadSegmentRetry(
httpClient: HttpClient,
archiveLocation: string,
offset: number,
count: number
): Promise<DownloadSegment> {
const retries = 5
let failures = 0
while (true) {
try {
const timeout = 30000
const result = await promiseWithTimeout(
timeout,
downloadSegment(httpClient, archiveLocation, offset, count)
)
if (typeof result === 'string') {
throw new Error('downloadSegmentRetry failed due to timeout')
}
return result
} catch (err) {
if (failures >= retries) {
throw err
}
failures++
}
}
}
async function downloadSegment(
httpClient: HttpClient,
archiveLocation: string,
offset: number,
count: number
): Promise<DownloadSegment> {
const partRes = await retryHttpClientResponse(
'downloadCachePart',
async () =>
await httpClient.get(archiveLocation, {
Range: `bytes=${offset}-${offset + count - 1}`
})
)
if (!partRes.readBodyBuffer) {
throw new Error('Expected HttpClientResponse to implement readBodyBuffer')
}
return {
offset,
count,
buffer: await partRes.readBodyBuffer()
}
}
declare class DownloadSegment {
offset: number
count: number
buffer: Buffer
}
/**
* Download the cache using the Azure Storage SDK. Only call this method if the
* URL points to an Azure Storage endpoint.
*
* @param archiveLocation the URL for the cache
* @param archivePath the local path where the cache is saved
* @param options the download options with the defaults set
*/
export async function downloadCacheStorageSDK(
export async function downloadCacheMultiConnection(
archiveLocation: string,
archivePath: string,
options: DownloadOptions
connections: number
): Promise<void> {
const client = new BlockBlobClient(archiveLocation, undefined, {
retryOptions: {
// Override the timeout used when downloading each 4 MB chunk
// The default is 2 min / MB, which is way too slow
tryTimeoutInMs: options.timeoutInMs
}
})
const properties = await client.getProperties()
const contentLength = properties.contentLength ?? -1
if (contentLength < 0) {
// We should never hit this condition, but just in case fall back to downloading the
// file as one large stream
core.debug(
'Unable to determine content length, downloading file with http-client...'
let fileHandle: fs.promises.FileHandle | null = null
let downloadProgress: DownloadProgress | null = null
try {
fileHandle = await fs.promises.open(archivePath, 'w+')
const httpClient = new HttpClient('actions/cache')
// Request the first bytes to get the total content size from the Content-Range header
const metadataResponse = await retryHttpClientResponse(
'downloadCache',
async () =>
httpClient.get(archiveLocation, {
Range: 'bytes=0-1'
})
)
await downloadCacheHttpClient(archiveLocation, archivePath)
} else {
// Use downloadToBuffer for faster downloads, since internally it splits the
// file into 4 MB chunks which can then be parallelized and retried independently
//
// If the file exceeds the buffer maximum length (~1 GB on 32-bit systems and ~2 GB
// on 64-bit systems), split the download into multiple segments
// ~2 GB = 2147483647, beyond this, we start getting out of range error. So, capping it accordingly.
// Updated segment size to 128MB = 134217728 bytes, to complete a segment faster and fail fast
const maxSegmentSize = Math.min(134217728, buffer.constants.MAX_LENGTH)
const downloadProgress = new DownloadProgress(contentLength)
const fd = fs.openSync(archivePath, 'w')
try {
downloadProgress.startDisplayTimer()
const controller = new AbortController()
const abortSignal = controller.signal
while (!downloadProgress.isDone()) {
const segmentStart =
downloadProgress.segmentOffset + downloadProgress.segmentSize
const segmentSize = Math.min(
maxSegmentSize,
contentLength - segmentStart
)
downloadProgress.nextSegment(segmentSize)
const result = await promiseWithTimeout(
options.segmentTimeoutInMs || 3600000,
client.downloadToBuffer(segmentStart, segmentSize, {
abortSignal,
concurrency: options.downloadConcurrency,
onProgress: downloadProgress.onProgress()
})
)
if (result === 'timeout') {
controller.abort()
throw new Error(
'Aborting cache download as the download time exceeded the timeout.'
)
} else if (Buffer.isBuffer(result)) {
fs.writeFileSync(fd, result)
}
}
} finally {
downloadProgress.stopDisplayTimer()
fs.closeSync(fd)
const contentRange = metadataResponse.message.headers['content-range']
if (!contentRange) {
console.log(await metadataResponse.readBody())
throw new Error('Range request not supported by server')
}
const match = RegExp(/bytes \d+-\d+\/(\d+)/).exec(contentRange)
if (!match) {
throw new Error(
'Content-Range header in server response not in correct format'
)
}
const totalLength = parseInt(match[1])
await fileHandle.truncate(totalLength)
await fileHandle.sync()
downloadProgress = new DownloadProgress(totalLength)
downloadProgress.startDisplayTimer()
const segmentSize = Math.ceil(totalLength / connections)
const promises: Promise<void>[] = []
for (let i = 0; i < connections; i++) {
promises.push(
(async () => {
const rangeStart = i * segmentSize
const rangeEnd = Math.min((i + 1) * segmentSize - 1, totalLength - 1)
const downloadResponse = await retryHttpClientResponse(
'downloadCache',
async () =>
httpClient.get(archiveLocation, {
Range: `bytes=${rangeStart}-${rangeEnd}`
})
)
const writeStream = fs.createWriteStream(archiveLocation, {
fd: fileHandle.fd,
autoClose: false,
start: rangeStart
})
await pipeResponseToStream(
downloadResponse,
writeStream,
downloadProgress
)
})()
)
}
await Promise.all(promises)
} finally {
downloadProgress?.stopDisplayTimer()
await fileHandle?.close()
}
}
const promiseWithTimeout = async <T>(
timeoutMs: number,
promise: Promise<T>
): Promise<T | string> => {
let timeoutHandle: NodeJS.Timeout
const timeoutPromise = new Promise<string>(resolve => {
timeoutHandle = setTimeout(() => resolve('timeout'), timeoutMs)
})
return Promise.race([promise, timeoutPromise]).then(result => {
clearTimeout(timeoutHandle)
return result
})
}


@@ -1,6 +1,20 @@
import {restoreCache} from './cache'
import {deleteCache, restoreCache, saveCache} from './cache'
restoreCache(['/home/runner/work/_temp'], 'cache-key', [
'cache-key-1',
'cache-key-2'
])
process.env['RUNNER_TEMP'] = '/Users/prajjwal/Repos/warpbuild/playground/tmp_fs'
process.env['NODE_DEBUG'] = 'http'
// saveCache(
// ['/Users/prajjwal/Repos/warpbuild/playground/test_fs'],
// 'test-fs-local-key',
// true
// )
// restoreCache(
// ['/Users/prajjwal/Repos/warpbuild/playground/test_fs'],
// 'test-fs-local-key',
// [],
// {},
// true
// )
// deleteCache(['test-fs-local-key'])