1
0
Fork 0

test async eachof

pull/1700/head
Vallie Joseph 2024-04-01 19:57:11 +00:00
parent 23039a4345
commit a8fa53b609
1 changed files with 66 additions and 43 deletions

View File

@ -22,7 +22,26 @@ export class ZipUploadStream extends stream.Transform {
cb(null, chunk) cb(null, chunk)
} }
} }
export async function zipFileUpload(file, zip, callback): Promise<void> {
const readStream = createReadStream(file.sourcePath)
if (file.sourcePath !== null) {
zip.entry(readStream, {name: file.destinationPath}, function (err) {
core.debug(`Is stream paused: ${readStream.isPaused()}`)
if (err) {
core.error('An error occurred:', err)
return callback(err)
}
core.debug(`Is stream paused: ${readStream.isPaused()}`)
})
} else {
zip.entry(null, {name: `${file.destinationPath}/`}, function (err) {
if (err) {
core.error('An error occurred:', err)
}
})
}
callback()
}
export async function createZipUploadStream( export async function createZipUploadStream(
uploadSpecification: UploadZipSpecification[], uploadSpecification: UploadZipSpecification[],
compressionLevel: number = DEFAULT_COMPRESSION_LEVEL compressionLevel: number = DEFAULT_COMPRESSION_LEVEL
@ -83,51 +102,55 @@ export async function createZipUploadStream(
// } // }
// }) // })
// } // }
const fileUploadQueue = async.queue(function (task, callback) { // see https://caolan.github.io/async/v3/docs.html#queue for options
core.debug(`adding file to upload queue ${task}`) // const fileUploadQueue = async.queue() // concurrency for uploads automatically set to 1
callback()
}, 1)
fileUploadQueue.error(function (err, task) { // fileUploadQueue.error(function (err, task) {
core.error(`task experienced an error: ${task} ${err}`) // core.error(`task experienced an error: ${task} ${err}`)
}) // })
for (const file of uploadSpecification) { // for (const file of uploadSpecification) {
if (file.sourcePath !== null) { // if (file.sourcePath !== null) {
const readStream = createReadStream(file.sourcePath) // const readStream = createReadStream(file.sourcePath)
readStream.on('data', chunk => { // readStream.on('data', chunk => {
core.debug(`Received ${chunk.length} bytes of data.`) // core.debug(`Received ${chunk.length} bytes of data.`)
}) // })
readStream.on('end', () => { // readStream.on('end', () => {
core.debug('There will be no more data.') // core.debug('There will be no more data.')
}) // })
readStream.on('error', function (err) { // readStream.on('error', function (err) {
core.debug(`${err}`) // core.debug(`${err}`)
}) // Catch any errors from createReadStream // }) // Catch any errors from createReadStream
fileUploadQueue.push( // fileUploadQueue.push(
zip.entry(readStream, {name: file.destinationPath}, function (err) { // zip.entry(readStream, {name: file.destinationPath}, function (err) {
core.debug(`Is stream paused: ${readStream.isPaused()}`) // core.debug(`Is stream paused: ${readStream.isPaused()}`)
if (err) { // if (err) {
core.error('An error occurred:', err) // core.error('An error occurred:', err)
} // }
core.debug(`Is stream paused: ${readStream.isPaused()}`) // core.debug(`Is stream paused: ${readStream.isPaused()}`)
}) // })
) // )
} else { // } else {
fileUploadQueue.push( // fileUploadQueue.push(
zip.entry(null, {name: `${file.destinationPath}/`}, function (err) { // zip.entry(null, {name: `${file.destinationPath}/`}, function (err) {
if (err) { // if (err) {
core.error('An error occurred:', err) // core.error('An error occurred:', err)
} // }
}) // })
) // )
// }
// }
// core.debug(`Starting the finalizing of all entries`)
// fileUploadQueue.drain(() => {
// core.debug('all items have been processed')
// })
async.forEachOf(uploadSpecification, zipFileUpload, function (err) {
if (err) {
core.error('An error occurred:', err)
} }
}
core.debug(`Starting the finalizing of all entries`)
fileUploadQueue.drain(() => {
core.debug('all items have been processed')
}) })
zip.finalize() zip.finalize()
core.debug(`Finalizing entries`) core.debug(`Finalizing entries`)