1
0
Fork 0

test queue

pull/1700/head
Vallie Joseph 2024-04-01 20:01:19 +00:00
parent a8fa53b609
commit a2a8a724c2
1 changed files with 43 additions and 67 deletions

View File

@ -22,26 +22,7 @@ export class ZipUploadStream extends stream.Transform {
cb(null, chunk)
}
}
export async function zipFileUpload(file, zip, callback): Promise<void> {
const readStream = createReadStream(file.sourcePath)
if (file.sourcePath !== null) {
zip.entry(readStream, {name: file.destinationPath}, function (err) {
core.debug(`Is stream paused: ${readStream.isPaused()}`)
if (err) {
core.error('An error occurred:', err)
return callback(err)
}
core.debug(`Is stream paused: ${readStream.isPaused()}`)
})
} else {
zip.entry(null, {name: `${file.destinationPath}/`}, function (err) {
if (err) {
core.error('An error occurred:', err)
}
})
}
callback()
}
export async function createZipUploadStream(
uploadSpecification: UploadZipSpecification[],
compressionLevel: number = DEFAULT_COMPRESSION_LEVEL
@ -103,55 +84,50 @@ export async function createZipUploadStream(
// })
// }
// see https://caolan.github.io/async/v3/docs.html#queue for options
// const fileUploadQueue = async.queue() // concurrency for uploads automatically set to 1
const fileUploadQueue = async.queue() // concurrency for uploads automatically set to 1
// fileUploadQueue.error(function (err, task) {
// core.error(`task experienced an error: ${task} ${err}`)
// })
// for (const file of uploadSpecification) {
// if (file.sourcePath !== null) {
// const readStream = createReadStream(file.sourcePath)
// readStream.on('data', chunk => {
// core.debug(`Received ${chunk.length} bytes of data.`)
// })
// readStream.on('end', () => {
// core.debug('There will be no more data.')
// })
// readStream.on('error', function (err) {
// core.debug(`${err}`)
// }) // Catch any errors from createReadStream
// fileUploadQueue.push(
// zip.entry(readStream, {name: file.destinationPath}, function (err) {
// core.debug(`Is stream paused: ${readStream.isPaused()}`)
// if (err) {
// core.error('An error occurred:', err)
// }
// core.debug(`Is stream paused: ${readStream.isPaused()}`)
// })
// )
// } else {
// fileUploadQueue.push(
// zip.entry(null, {name: `${file.destinationPath}/`}, function (err) {
// if (err) {
// core.error('An error occurred:', err)
// }
// })
// )
// }
// }
// core.debug(`Starting the finalizing of all entries`)
// fileUploadQueue.drain(() => {
// core.debug('all items have been processed')
// })
async.forEachOf(uploadSpecification, zipFileUpload, function (err) {
if (err) {
core.error('An error occurred:', err)
}
fileUploadQueue.error(function (err, task) {
core.error(`task experienced an error: ${task} ${err}`)
})
for (const file of uploadSpecification) {
if (file.sourcePath !== null) {
const readStream = createReadStream(file.sourcePath)
readStream.on('data', chunk => {
core.debug(`Received ${chunk.length} bytes of data.`)
})
readStream.on('end', () => {
core.debug('There will be no more data.')
})
readStream.on('error', function (err) {
core.debug(`${err}`)
}) // Catch any errors from createReadStream
fileUploadQueue.push(
zip.entry(readStream, {name: file.destinationPath}, function (err) {
core.debug(`Is stream paused: ${readStream.isPaused()}`)
if (err) {
core.error('An error occurred:', err)
}
core.debug(`Is stream paused: ${readStream.isPaused()}`)
})
)
} else {
fileUploadQueue.push(
zip.entry(null, {name: `${file.destinationPath}/`}, function (err) {
if (err) {
core.error('An error occurred:', err)
}
})
)
}
}
core.debug(`Starting the finalizing of all entries`)
fileUploadQueue.drain(() => {
core.debug('all items have been processed')
})
zip.finalize()
core.debug(`Finalizing entries`)
const bufferSize = getUploadChunkSize()