From af6daa948cc3b9de6209de0308a3a563468318e5 Mon Sep 17 00:00:00 2001 From: Ian Clanton-Thuon Date: Sun, 16 Jun 2024 16:27:24 -0400 Subject: [PATCH] fixup! Initial implementation of support for caching and restoring buffers. --- .../cache/src/internal/cacheHttpClient.ts | 63 ++++++++++++------- packages/cache/src/internal/downloadUtils.ts | 10 ++- 2 files changed, 48 insertions(+), 25 deletions(-) diff --git a/packages/cache/src/internal/cacheHttpClient.ts b/packages/cache/src/internal/cacheHttpClient.ts index 3fc3345e..ec7fd95c 100644 --- a/packages/cache/src/internal/cacheHttpClient.ts +++ b/packages/cache/src/internal/cacheHttpClient.ts @@ -8,7 +8,7 @@ import { import * as crypto from 'crypto' import * as fs from 'fs' import {URL} from 'url' -import {Readable as ReadableStream} from 'stream'; +import {Readable as ReadableStream} from 'stream' import * as utils from './cacheUtils' import {CompressionMethod} from './constants' @@ -172,7 +172,10 @@ async function printCachesListForDiagnostics( } } -export async function downloadCacheToBuffer(archiveLocation: string, options?: DownloadOptions): Promise { +export async function downloadCacheToBuffer( + archiveLocation: string, + options?: DownloadOptions +): Promise { let lastWrittenChunkLocation: number = 0 let writtenBytes: number = 0 interface IChunk { @@ -182,29 +185,41 @@ export async function downloadCacheToBuffer(archiveLocation: string, options?: D } const bufferChunks: IChunk[] = [] - await downloadCacheInternal(archiveLocation, - (chunkBuffer, chunkLength = chunkBuffer.length, bufferPosition = lastWrittenChunkLocation) => { - bufferChunks.push({ chunkBuffer, chunkLength, bufferPosition }) + await downloadCacheInternal( + archiveLocation, + ( + chunkBuffer, + chunkLength = chunkBuffer.length, + bufferPosition = lastWrittenChunkLocation + ) => { + bufferChunks.push({chunkBuffer, chunkLength, bufferPosition}) lastWrittenChunkLocation = bufferPosition + chunkLength - writtenBytes = writtenBytes > lastWrittenChunkLocation ? writtenBytes : lastWrittenChunkLocation + writtenBytes = + writtenBytes > lastWrittenChunkLocation + ? writtenBytes + : lastWrittenChunkLocation }, () => writtenBytes, options ) if (bufferChunks.length === 1) { - const [{ chunkBuffer }] = bufferChunks - return chunkBuffer; + const [{chunkBuffer}] = bufferChunks + return chunkBuffer } else if (bufferChunks.length === 0) { return Buffer.alloc(0) } else { const finalBuffer = Buffer.alloc(writtenBytes) - for (const { chunkBuffer: buffer, bufferPosition: position, chunkLength: length} of bufferChunks) { + for (const { + chunkBuffer: buffer, + bufferPosition: position, + chunkLength: length + } of bufferChunks) { buffer.copy(finalBuffer, position, 0, length) } return finalBuffer -} + } } export async function downloadCache( @@ -215,7 +230,8 @@ export async function downloadCache( const archiveDescriptor = await fs.promises.open(archivePath, 'w') try { - await downloadCacheInternal(archiveLocation, + await downloadCacheInternal( + archiveLocation, async (buffer, length, position) => { await archiveDescriptor.write(buffer, 0, length, position) }, @@ -404,18 +420,19 @@ export async function saveCache( try { await saveCacheInner( cacheId, - (start, end) => fs - .createReadStream(archivePath, { - fd, - start, - end, - autoClose: false - }) - .on('error', error => { - throw new Error( - `Cache upload failed because file read failed with ${error.message}` - ) - }), + (start, end) => + fs + .createReadStream(archivePath, { + fd, + start, + end, + autoClose: false + }) + .on('error', error => { + throw new Error( + `Cache upload failed because file read failed with ${error.message}` + ) + }), fileSize, options ) diff --git a/packages/cache/src/internal/downloadUtils.ts b/packages/cache/src/internal/downloadUtils.ts index 34f1dd7c..144ea200 100644 --- a/packages/cache/src/internal/downloadUtils.ts +++ b/packages/cache/src/internal/downloadUtils.ts @@ -12,7 +12,11 @@ import {retryHttpClientResponse} from './requestUtils' import {AbortController} from '@azure/abort-controller' -export type ChunkWriteCallback = (chunk: Buffer, count: number | undefined, offset: number | undefined) => Promise | void +export type ChunkWriteCallback = ( + chunk: Buffer, + count: number | undefined, + offset: number | undefined +) => Promise | void /** * Pipes the body of a HTTP response to a stream @@ -271,7 +275,9 @@ export async function downloadCacheHttpClientConcurrent( | undefined const waitAndWrite: () => Promise = async () => { - const { buffer, count, offset} = await Promise.race(Object.values(activeDownloads)) + const {buffer, count, offset} = await Promise.race( + Object.values(activeDownloads) + ) onChunk(buffer, count, offset) actives-- delete activeDownloads[offset]