1
0
Fork 0

fixup! Initial implementation of support for caching and restoring buffers.

pull/1762/head
Ian Clanton-Thuon 2024-06-16 16:27:24 -04:00
parent 2dbcd8740f
commit af6daa948c
2 changed files with 48 additions and 25 deletions

View File

@ -8,7 +8,7 @@ import {
import * as crypto from 'crypto' import * as crypto from 'crypto'
import * as fs from 'fs' import * as fs from 'fs'
import {URL} from 'url' import {URL} from 'url'
import {Readable as ReadableStream} from 'stream'; import {Readable as ReadableStream} from 'stream'
import * as utils from './cacheUtils' import * as utils from './cacheUtils'
import {CompressionMethod} from './constants' import {CompressionMethod} from './constants'
@ -172,7 +172,10 @@ async function printCachesListForDiagnostics(
} }
} }
export async function downloadCacheToBuffer(archiveLocation: string, options?: DownloadOptions): Promise<Buffer> { export async function downloadCacheToBuffer(
archiveLocation: string,
options?: DownloadOptions
): Promise<Buffer> {
let lastWrittenChunkLocation: number = 0 let lastWrittenChunkLocation: number = 0
let writtenBytes: number = 0 let writtenBytes: number = 0
interface IChunk { interface IChunk {
@ -182,29 +185,41 @@ export async function downloadCacheToBuffer(archiveLocation: string, options?: D
} }
const bufferChunks: IChunk[] = [] const bufferChunks: IChunk[] = []
await downloadCacheInternal(archiveLocation, await downloadCacheInternal(
(chunkBuffer, chunkLength = chunkBuffer.length, bufferPosition = lastWrittenChunkLocation) => { archiveLocation,
bufferChunks.push({ chunkBuffer, chunkLength, bufferPosition }) (
chunkBuffer,
chunkLength = chunkBuffer.length,
bufferPosition = lastWrittenChunkLocation
) => {
bufferChunks.push({chunkBuffer, chunkLength, bufferPosition})
lastWrittenChunkLocation = bufferPosition + chunkLength lastWrittenChunkLocation = bufferPosition + chunkLength
writtenBytes = writtenBytes > lastWrittenChunkLocation ? writtenBytes : lastWrittenChunkLocation writtenBytes =
writtenBytes > lastWrittenChunkLocation
? writtenBytes
: lastWrittenChunkLocation
}, },
() => writtenBytes, () => writtenBytes,
options options
) )
if (bufferChunks.length === 1) { if (bufferChunks.length === 1) {
const [{ chunkBuffer }] = bufferChunks const [{chunkBuffer}] = bufferChunks
return chunkBuffer; return chunkBuffer
} else if (bufferChunks.length === 0) { } else if (bufferChunks.length === 0) {
return Buffer.alloc(0) return Buffer.alloc(0)
} else { } else {
const finalBuffer = Buffer.alloc(writtenBytes) const finalBuffer = Buffer.alloc(writtenBytes)
for (const { chunkBuffer: buffer, bufferPosition: position, chunkLength: length} of bufferChunks) { for (const {
chunkBuffer: buffer,
bufferPosition: position,
chunkLength: length
} of bufferChunks) {
buffer.copy(finalBuffer, position, 0, length) buffer.copy(finalBuffer, position, 0, length)
} }
return finalBuffer return finalBuffer
} }
} }
export async function downloadCache( export async function downloadCache(
@ -215,7 +230,8 @@ export async function downloadCache(
const archiveDescriptor = await fs.promises.open(archivePath, 'w') const archiveDescriptor = await fs.promises.open(archivePath, 'w')
try { try {
await downloadCacheInternal(archiveLocation, await downloadCacheInternal(
archiveLocation,
async (buffer, length, position) => { async (buffer, length, position) => {
await archiveDescriptor.write(buffer, 0, length, position) await archiveDescriptor.write(buffer, 0, length, position)
}, },
@ -404,18 +420,19 @@ export async function saveCache(
try { try {
await saveCacheInner( await saveCacheInner(
cacheId, cacheId,
(start, end) => fs (start, end) =>
.createReadStream(archivePath, { fs
fd, .createReadStream(archivePath, {
start, fd,
end, start,
autoClose: false end,
}) autoClose: false
.on('error', error => { })
throw new Error( .on('error', error => {
`Cache upload failed because file read failed with ${error.message}` throw new Error(
) `Cache upload failed because file read failed with ${error.message}`
}), )
}),
fileSize, fileSize,
options options
) )

View File

@ -12,7 +12,11 @@ import {retryHttpClientResponse} from './requestUtils'
import {AbortController} from '@azure/abort-controller' import {AbortController} from '@azure/abort-controller'
export type ChunkWriteCallback = (chunk: Buffer, count: number | undefined, offset: number | undefined) => Promise<void> | void export type ChunkWriteCallback = (
chunk: Buffer,
count: number | undefined,
offset: number | undefined
) => Promise<void> | void
/** /**
* Pipes the body of a HTTP response to a stream * Pipes the body of a HTTP response to a stream
@ -271,7 +275,9 @@ export async function downloadCacheHttpClientConcurrent(
| undefined | undefined
const waitAndWrite: () => Promise<void> = async () => { const waitAndWrite: () => Promise<void> = async () => {
const { buffer, count, offset} = await Promise.race(Object.values(activeDownloads)) const {buffer, count, offset} = await Promise.race(
Object.values(activeDownloads)
)
onChunk(buffer, count, offset) onChunk(buffer, count, offset)
actives-- actives--
delete activeDownloads[offset] delete activeDownloads[offset]