1
0
Fork 0

update zip stream logic

pull/1700/head
bethanyj28 2024-04-03 17:05:08 -04:00
parent f9bafe97e6
commit 3ca50668ea
1 changed files with 27 additions and 59 deletions

View File

@ -30,6 +30,7 @@ export async function createZipUploadStream(
core.debug( core.debug(
`Creating Artifact archive with compressionLevel: ${compressionLevel}` `Creating Artifact archive with compressionLevel: ${compressionLevel}`
) )
const zlibOptions = { const zlibOptions = {
zlib: { zlib: {
level: compressionLevel, level: compressionLevel,
@ -37,77 +38,43 @@ export async function createZipUploadStream(
} }
} }
const zip = new ZipStream.default(zlibOptions) const zip = new ZipStream.default(zlibOptions)
const bufferSize = getUploadChunkSize()
const zipUploadStream = new ZipUploadStream(bufferSize)
zip.pipe(zipUploadStream)
// register callbacks for various events during the zip lifecycle // register callbacks for various events during the zip lifecycle
zip.on('error', zipErrorCallback) zip.on('error', zipErrorCallback)
zip.on('warning', zipWarningCallback) zip.on('warning', zipWarningCallback)
zip.on('finish', zipFinishCallback) zip.on('finish', zipFinishCallback)
zip.on('end', zipEndCallback) zip.on('end', zipEndCallback)
// see https://caolan.github.io/async/v3/docs.html#queue for options const addFileToZip = (file: UploadZipSpecification, callback: (error?: Error) => void) => {
const fileUploadQueue = async.queue(function (fileItem, callback) {
try {
core.debug(`adding ${fileItem} to the file queue`)
callback()
} catch (err) {
core.error(`task experienced an error: ${fileItem} ${err}`)
callback(err)
}
}) // concurrency for uploads automatically set to 1
fileUploadQueue.error(function (err, task) {
core.error(`task experienced an error: ${task} ${err}`)
})
for (const file of uploadSpecification) {
if (file.sourcePath !== null) { if (file.sourcePath !== null) {
const readStream = createReadStream(file.sourcePath) zip.entry(createReadStream(file.sourcePath), { name: file.destinationPath }, (error: any) => {
readStream.on('end', () => { if (error) {
core.debug('The upload read stream is ending') callback(error)
}) return
readStream.on('error', function (err) {
core.debug(`${err}`)
}) // Catch any errors from createReadStream
const fileEntry = zip.entry(
readStream,
{name: file.destinationPath},
function (err, entry) {
if (err) {
core.error('A file entry error occurred')
core.info(err)
throw new Error('An error occurred during file entry')
}
core.debug(`File entry was succesful: ${entry.data.name}`)
} }
) callback()
})
fileUploadQueue.push(fileEntry)
} else { } else {
fileUploadQueue.push( zip.entry('', { name: file.destinationPath }, (error: any) => {
zip.entry( if (error) {
null, callback(error)
{name: `${file.destinationPath}/`}, return
function (err, entry) { }
if (err) { callback()
core.error('A directory entry error occurred') })
core.info(err)
throw new Error('An error occurred during directory entry')
}
core.debug(`File entry was succesful: ${entry.data.name}`)
}
)
)
} }
} }
core.debug(`Starting the finalizing of all entries`) async.eachSeries(uploadSpecification, addFileToZip, (error: any) => {
if (error) {
fileUploadQueue.drain(() => { core.error('Failed to add a file to the zip:')
core.debug('all items have been processed') core.info(error)
return
}
zip.finalize() // Finalize the archive once all files have been added
}) })
zip.finalize()
core.debug(`Finalizing entries`)
const bufferSize = getUploadChunkSize()
const zipUploadStream = new ZipUploadStream(bufferSize)
zip.pipe(zipUploadStream) // Pipe the zip stream into zipUploadStream
core.debug( core.debug(
`Zip write high watermark value ${zipUploadStream.writableHighWaterMark}` `Zip write high watermark value ${zipUploadStream.writableHighWaterMark}`
@ -115,6 +82,7 @@ export async function createZipUploadStream(
core.debug( core.debug(
`Zip read high watermark value ${zipUploadStream.readableHighWaterMark}` `Zip read high watermark value ${zipUploadStream.readableHighWaterMark}`
) )
return zipUploadStream return zipUploadStream
} }