
Initial implementation of support for caching and restoring buffers.

pull/1762/head
Ian Clanton-Thuon 2024-06-16 16:21:51 -04:00
parent 361a115e53
commit 2dbcd8740f
2 changed files with 154 additions and 68 deletions
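For context, a minimal sketch of how the new buffer-based entry points added by this commit might be consumed. Only downloadCacheToBuffer and saveCacheBuffer come from the diff below; the import path, the archiveLocation URL, the cacheId value, and the roundTrip wrapper are illustrative placeholders.

import {downloadCacheToBuffer, saveCacheBuffer} from './internal/cacheHttpClient' // path is an assumption

async function roundTrip(): Promise<void> {
  // Restore a cache entry straight into memory instead of a temporary file on disk.
  const archiveLocation = 'https://example.blob.core.windows.net/container/cache-archive' // placeholder
  const restored: Buffer = await downloadCacheToBuffer(archiveLocation)

  // Upload an in-memory archive for a previously reserved cache entry.
  const cacheId = 42 // placeholder; in practice obtained when the cache entry is reserved
  await saveCacheBuffer(cacheId, restored)
}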

View File

@ -8,6 +8,7 @@ import {
import * as crypto from 'crypto'
import * as fs from 'fs'
import {URL} from 'url'
import {Readable as ReadableStream} from 'stream'
import * as utils from './cacheUtils'
import {CompressionMethod} from './constants'
@ -21,6 +22,7 @@ import {
ArtifactCacheList
} from './contracts'
import {
type ChunkWriteCallback,
downloadCacheHttpClient,
downloadCacheHttpClientConcurrent,
downloadCacheStorageSDK
@ -39,6 +41,8 @@ import {
const versionSalt = '1.0'
type GetStreamForChunk = (start: number, end: number) => NodeJS.ReadableStream
function getCacheApiUrl(resource: string): string {
const baseUrl: string = process.env['ACTIONS_CACHE_URL'] || ''
if (!baseUrl) {
@ -168,10 +172,66 @@ async function printCachesListForDiagnostics(
}
}
export async function downloadCacheToBuffer(
  archiveLocation: string,
  options?: DownloadOptions
): Promise<Buffer> {
let lastWrittenChunkLocation: number = 0
let writtenBytes: number = 0
interface IChunk {
chunkBuffer: Buffer
chunkLength: number
bufferPosition: number
}
const bufferChunks: IChunk[] = []
await downloadCacheInternal(archiveLocation,
(chunkBuffer, chunkLength = chunkBuffer.length, bufferPosition = lastWrittenChunkLocation) => {
bufferChunks.push({ chunkBuffer, chunkLength, bufferPosition })
lastWrittenChunkLocation = bufferPosition + chunkLength
writtenBytes = writtenBytes > lastWrittenChunkLocation ? writtenBytes : lastWrittenChunkLocation
},
() => writtenBytes,
options
)
if (bufferChunks.length === 1) {
const [{ chunkBuffer }] = bufferChunks
    return chunkBuffer
} else if (bufferChunks.length === 0) {
return Buffer.alloc(0)
} else {
const finalBuffer = Buffer.alloc(writtenBytes)
    for (const { chunkBuffer: buffer, bufferPosition: position, chunkLength: length } of bufferChunks) {
buffer.copy(finalBuffer, position, 0, length)
}
return finalBuffer
}
}
export async function downloadCache(
archiveLocation: string,
archivePath: string,
options?: DownloadOptions
): Promise<void> {
const archiveDescriptor = await fs.promises.open(archivePath, 'w')
try {
await downloadCacheInternal(archiveLocation,
async (buffer, length, position) => {
await archiveDescriptor.write(buffer, 0, length, position)
},
() => utils.getArchiveFileSizeInBytes(archivePath),
options
)
} finally {
await archiveDescriptor.close()
}
}
async function downloadCacheInternal(
archiveLocation: string,
onChunk: ChunkWriteCallback,
getWrittenLength: () => number,
options: DownloadOptions | undefined
): Promise<void> {
const archiveUrl = new URL(archiveLocation)
const downloadOptions = getDownloadOptions(options)
@ -181,22 +241,23 @@ export async function downloadCache(
// Use Azure storage SDK to download caches hosted on Azure to improve speed and reliability.
await downloadCacheStorageSDK(
archiveLocation,
archivePath,
onChunk,
getWrittenLength,
downloadOptions
)
} else if (downloadOptions.concurrentBlobDownloads) {
// Use concurrent implementation with HttpClient to work around blob SDK issue
await downloadCacheHttpClientConcurrent(
archiveLocation,
archivePath,
onChunk,
downloadOptions
)
} else {
// Otherwise, download using the Actions http-client.
await downloadCacheHttpClient(archiveLocation, archivePath)
await downloadCacheHttpClient(archiveLocation, onChunk, getWrittenLength)
}
} else {
await downloadCacheHttpClient(archiveLocation, archivePath)
await downloadCacheHttpClient(archiveLocation, onChunk, getWrittenLength)
}
}
@ -277,13 +338,12 @@ async function uploadChunk(
async function uploadFile(
httpClient: HttpClient,
cacheId: number,
archivePath: string,
fileSize: number,
getReadStreamForChunk: GetStreamForChunk,
options?: UploadOptions
): Promise<void> {
// Upload Chunks
const fileSize = utils.getArchiveFileSizeInBytes(archivePath)
const resourceUrl = getCacheApiUrl(`caches/${cacheId.toString()}`)
const fd = fs.openSync(archivePath, 'r')
const uploadOptions = getUploadOptions(options)
const concurrency = utils.assertDefined(
@ -299,41 +359,24 @@ async function uploadFile(
core.debug('Awaiting all uploads')
let offset = 0
try {
await Promise.all(
parallelUploads.map(async () => {
while (offset < fileSize) {
const chunkSize = Math.min(fileSize - offset, maxChunkSize)
const start = offset
const end = offset + chunkSize - 1
offset += maxChunkSize
await Promise.all(
parallelUploads.map(async () => {
while (offset < fileSize) {
const chunkSize = Math.min(fileSize - offset, maxChunkSize)
const start = offset
const end = offset + chunkSize - 1
offset += maxChunkSize
await uploadChunk(
httpClient,
resourceUrl,
() =>
fs
.createReadStream(archivePath, {
fd,
start,
end,
autoClose: false
})
.on('error', error => {
throw new Error(
`Cache upload failed because file read failed with ${error.message}`
)
}),
start,
end
)
}
})
)
} finally {
fs.closeSync(fd)
}
return
await uploadChunk(
httpClient,
resourceUrl,
() => getReadStreamForChunk(start, end),
start,
end
)
}
})
)
}
async function commitCache(
@ -354,15 +397,65 @@ export async function saveCache(
cacheId: number,
archivePath: string,
options?: UploadOptions
): Promise<void> {
const fileSize = utils.getArchiveFileSizeInBytes(archivePath)
const fd = fs.openSync(archivePath, 'r')
try {
await saveCacheInner(
cacheId,
(start, end) => fs
.createReadStream(archivePath, {
fd,
start,
end,
autoClose: false
})
.on('error', error => {
throw new Error(
`Cache upload failed because file read failed with ${error.message}`
)
}),
fileSize,
options
)
} finally {
fs.closeSync(fd)
}
}
export async function saveCacheBuffer(
cacheId: number,
buffer: Buffer,
options?: UploadOptions
): Promise<void> {
await saveCacheInner(
cacheId,
(start, end) => ReadableStream.from(buffer.subarray(start, end + 1)),
buffer.length,
options
)
}
async function saveCacheInner(
cacheId: number,
getReadStreamForChunk: GetStreamForChunk,
cacheSize: number,
options?: UploadOptions
): Promise<void> {
const httpClient = createHttpClient()
core.debug('Upload cache')
await uploadFile(httpClient, cacheId, archivePath, options)
await uploadFile(
httpClient,
cacheId,
cacheSize,
getReadStreamForChunk,
options
)
// Commit Cache
core.debug('Commiting cache')
const cacheSize = utils.getArchiveFileSizeInBytes(archivePath)
core.debug('Committing cache')
core.info(
`Cache Size: ~${Math.round(cacheSize / (1024 * 1024))} MB (${cacheSize} B)`
)
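On the upload side, uploadFile no longer opens the archive itself: it now receives a GetStreamForChunk factory that must return a readable stream for an inclusive [start, end] byte range. Below is a rough sketch of the two factories the diff above constructs inline (file-backed in saveCache, buffer-backed in saveCacheBuffer); the factory names are illustrative, not part of the commit.

import * as fs from 'fs'
import {Readable} from 'stream'

type GetStreamForChunk = (start: number, end: number) => NodeJS.ReadableStream

// File-backed chunks: fs.createReadStream treats `end` as inclusive, so the
// requested range maps directly onto the stream options.
function fileChunkFactory(archivePath: string, fd: number): GetStreamForChunk {
  return (start, end) =>
    fs.createReadStream(archivePath, {fd, start, end, autoClose: false})
}

// Buffer-backed chunks: Buffer.subarray's end argument is exclusive, hence
// `end + 1`; subarray shares memory with the source buffer, so nothing is copied.
function bufferChunkFactory(buffer: Buffer): GetStreamForChunk {
  return (start, end) => Readable.from(buffer.subarray(start, end + 1))
}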

View File

@ -3,17 +3,17 @@ import {HttpClient, HttpClientResponse} from '@actions/http-client'
import {BlockBlobClient} from '@azure/storage-blob'
import {TransferProgressEvent} from '@azure/ms-rest-js'
import * as buffer from 'buffer'
import * as fs from 'fs'
import * as stream from 'stream'
import * as util from 'util'
import * as utils from './cacheUtils'
import {SocketTimeout} from './constants'
import {DownloadOptions} from '../options'
import {retryHttpClientResponse} from './requestUtils'
import {AbortController} from '@azure/abort-controller'
export type ChunkWriteCallback = (
  chunk: Buffer,
  count: number | undefined,
  offset: number | undefined
) => Promise<void> | void
/**
* Pipes the body of a HTTP response to a stream
*
@ -169,9 +169,9 @@ export class DownloadProgress {
*/
export async function downloadCacheHttpClient(
archiveLocation: string,
archivePath: string
onChunk: ChunkWriteCallback,
getWrittenLength: () => number
): Promise<void> {
const writeStream = fs.createWriteStream(archivePath)
const httpClient = new HttpClient('actions/cache')
const downloadResponse = await retryHttpClientResponse(
'downloadCache',
@ -184,14 +184,16 @@ export async function downloadCacheHttpClient(
core.debug(`Aborting download, socket timed out after ${SocketTimeout} ms`)
})
await pipeResponseToStream(downloadResponse, writeStream)
// readBodyBuffer is always defined in http-client
const responseBuffer: Buffer = await downloadResponse.readBodyBuffer!()
  await onChunk(responseBuffer, undefined, undefined)
// Validate download size.
const contentLengthHeader = downloadResponse.message.headers['content-length']
if (contentLengthHeader) {
const expectedLength = parseInt(contentLengthHeader)
const actualLength = utils.getArchiveFileSizeInBytes(archivePath)
const actualLength = getWrittenLength()
if (actualLength !== expectedLength) {
throw new Error(
@ -211,10 +213,9 @@ export async function downloadCacheHttpClient(
*/
export async function downloadCacheHttpClientConcurrent(
archiveLocation: string,
archivePath: fs.PathLike,
onChunk: ChunkWriteCallback,
options: DownloadOptions
): Promise<void> {
const archiveDescriptor = await fs.promises.open(archivePath, 'w')
const httpClient = new HttpClient('actions/cache', undefined, {
socketTimeout: options.timeoutInMs,
keepAlive: true
@ -270,16 +271,11 @@ export async function downloadCacheHttpClientConcurrent(
| undefined
const waitAndWrite: () => Promise<void> = async () => {
const segment = await Promise.race(Object.values(activeDownloads))
await archiveDescriptor.write(
segment.buffer,
0,
segment.count,
segment.offset
)
    const {buffer, count, offset} = await Promise.race(
      Object.values(activeDownloads)
    )
    await onChunk(buffer, count, offset)
actives--
delete activeDownloads[segment.offset]
bytesDownloaded += segment.count
delete activeDownloads[offset]
bytesDownloaded += count
progressFn({loadedBytes: bytesDownloaded})
}
@ -297,7 +293,6 @@ export async function downloadCacheHttpClientConcurrent(
}
} finally {
httpClient.dispose()
await archiveDescriptor.close()
}
}
@ -373,7 +368,8 @@ declare class DownloadSegment {
*/
export async function downloadCacheStorageSDK(
archiveLocation: string,
archivePath: string,
onChunk: ChunkWriteCallback,
getWrittenLength: () => number,
options: DownloadOptions
): Promise<void> {
const client = new BlockBlobClient(archiveLocation, undefined, {
@ -394,7 +390,7 @@ export async function downloadCacheStorageSDK(
'Unable to determine content length, downloading file with http-client...'
)
await downloadCacheHttpClient(archiveLocation, archivePath)
await downloadCacheHttpClient(archiveLocation, onChunk, getWrittenLength)
} else {
// Use downloadToBuffer for faster downloads, since internally it splits the
// file into 4 MB chunks which can then be parallelized and retried independently
@ -407,8 +403,6 @@ export async function downloadCacheStorageSDK(
const maxSegmentSize = Math.min(134217728, buffer.constants.MAX_LENGTH)
const downloadProgress = new DownloadProgress(contentLength)
const fd = fs.openSync(archivePath, 'w')
try {
downloadProgress.startDisplayTimer()
const controller = new AbortController()
@ -437,12 +431,11 @@ export async function downloadCacheStorageSDK(
'Aborting cache download as the download time exceeded the timeout.'
)
} else if (Buffer.isBuffer(result)) {
fs.writeFileSync(fd, result)
          await onChunk(result, undefined, undefined)
}
}
} finally {
downloadProgress.stopDisplayTimer()
fs.closeSync(fd)
}
}
}
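On the download side, every transport now reports data through the ChunkWriteCallback exported above instead of writing to a file path. Here is a hedged sketch of a file-backed consumer in the spirit of what downloadCache wires up in the first file; writeChunksTo and its parameters are illustrative names only. Since the callback may return a promise, callers should await it before validating the written length.

import * as fs from 'fs'

type ChunkWriteCallback = (
  chunk: Buffer,
  count: number | undefined,
  offset: number | undefined
) => Promise<void> | void

async function writeChunksTo(path: string, chunks: Buffer[]): Promise<void> {
  const handle = await fs.promises.open(path, 'w')
  // File-backed consumer: undefined count/offset mean "whole chunk, in order".
  const onChunk: ChunkWriteCallback = async (chunk, count, offset) => {
    await handle.write(chunk, 0, count ?? chunk.length, offset)
  }
  try {
    let position = 0
    for (const chunk of chunks) {
      await onChunk(chunk, chunk.length, position)
      position += chunk.length
    }
  } finally {
    await handle.close()
  }
}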