1
0
Fork 0

adding async handler back

pull/1700/head
Vallie Joseph 2024-04-01 16:54:55 +00:00
parent 6eff4e928d
commit 4778aebf5b
4 changed files with 92 additions and 29 deletions

View File

@ -19,6 +19,7 @@
"@octokit/request-error": "^5.0.0", "@octokit/request-error": "^5.0.0",
"@protobuf-ts/plugin": "^2.2.3-alpha.1", "@protobuf-ts/plugin": "^2.2.3-alpha.1",
"archiver": "^5.3.1", "archiver": "^5.3.1",
"async": "^3.2.5",
"crypto": "^1.0.1", "crypto": "^1.0.1",
"jwt-decode": "^3.1.2", "jwt-decode": "^3.1.2",
"twirp-ts": "^2.5.0", "twirp-ts": "^2.5.0",

View File

@ -50,6 +50,7 @@
"@octokit/request-error": "^5.0.0", "@octokit/request-error": "^5.0.0",
"@protobuf-ts/plugin": "^2.2.3-alpha.1", "@protobuf-ts/plugin": "^2.2.3-alpha.1",
"archiver": "^5.3.1", "archiver": "^5.3.1",
"async": "^3.2.5",
"crypto": "^1.0.1", "crypto": "^1.0.1",
"jwt-decode": "^3.1.2", "jwt-decode": "^3.1.2",
"twirp-ts": "^2.5.0", "twirp-ts": "^2.5.0",

View File

@ -71,7 +71,11 @@ export async function uploadArtifact(
const zipUploadStream = await createZipUploadStream( const zipUploadStream = await createZipUploadStream(
zipSpecification, zipSpecification,
options?.compressionLevel options?.compressionLevel
) ).catch(err => {
throw new InvalidResponseError(
`createZipUploadStream: response from backend was not ok: ${err}`
)
})
// Upload zip to blob storage // Upload zip to blob storage
const uploadResult = await uploadZipToBlobStorage( const uploadResult = await uploadZipToBlobStorage(

View File

@ -1,6 +1,7 @@
import * as stream from 'stream' import * as stream from 'stream'
import * as ZipStream from 'zip-stream' import * as ZipStream from 'zip-stream'
import * as core from '@actions/core' import * as core from '@actions/core'
import async from 'async'
import {createReadStream} from 'fs' import {createReadStream} from 'fs'
import {UploadZipSpecification} from './upload-zip-specification' import {UploadZipSpecification} from './upload-zip-specification'
import {getUploadChunkSize} from '../shared/config' import {getUploadChunkSize} from '../shared/config'
@ -42,39 +43,95 @@ export async function createZipUploadStream(
zip.on('finish', zipFinishCallback) zip.on('finish', zipFinishCallback)
zip.on('end', zipEndCallback) zip.on('end', zipEndCallback)
for (const file of uploadSpecification) { // for (const file of uploadSpecification) {
await new Promise((resolve, reject) => { // await new Promise((resolve, reject) => {
if (file.sourcePath !== null) { // if (file.sourcePath !== null) {
core.debug(`createReadStream with: ${file.sourcePath}`) // core.debug(`createReadStream with: ${file.sourcePath}`)
// Add a normal file to the zip // // Add a normal file to the zip
const readsstream = createReadStream(file.sourcePath) // const readStream = createReadStream(file.sourcePath)
readsstream.on('error', reject) // readStream.on('data', chunk => {
// core.debug(`Received ${chunk.length} bytes of data.`)
// })
// readStream.on('end', () => {
// core.debug('There will be no more data.')
// })
// readStream.on('error', reject) // Catch any errors from createReadStream
zip.entry( // core.debug(`readsstream errors: ${readStream.errored}`)
readsstream, // const entry = zip.entry(
{name: file.destinationPath}, // readStream,
function (err, entry) { // {name: file.destinationPath},
core.debug(`${err}`) // function (err) {
if (err) reject(err) // core.debug(`Is stream paused: ${readStream.isPaused()}`)
else resolve(entry) // if (err) {
// core.error('An error occurred:', err)
// reject(err)
// }
// core.debug(`Is stream paused: ${readStream.isPaused()}`)
// resolve('resolved artifact')
// }
// )
// readStream.pipe(entry)
// } else {
// zip.entry(null, {name: `${file.destinationPath}/`}, function (err) {
// if (err) {
// core.error('An error occurred:', err)
// reject(err)
// }
// resolve('resolved artifact')
// })
// }
// })
// }
const fileUploadQueue = async.queue(function (task, callback) {
core.info(`hello ${task.name}`)
callback()
}, 1)
fileUploadQueue.error(function (err, task) {
core.error(`task experienced an error: ${task} ${err}`)
})
for (const file of uploadSpecification) {
if (file.sourcePath !== null) {
const readStream = createReadStream(file.sourcePath)
readStream.on('data', chunk => {
core.debug(`Received ${chunk.length} bytes of data.`)
})
readStream.on('end', () => {
core.debug('There will be no more data.')
})
readStream.on('error', function (err) {
core.debug(`${err}`)
}) // Catch any errors from createReadStream
fileUploadQueue.push(
zip.entry(readStream, {name: file.destinationPath}, function (err) {
core.debug(`Is stream paused: ${readStream.isPaused()}`)
if (err) {
core.error('An error occurred:', err)
} }
) core.debug(`Is stream paused: ${readStream.isPaused()}`)
} else { })
// add directory to zip )
core.debug(`add directory with: ${file.destinationPath}`) } else {
zip.entry( fileUploadQueue.push(
null, zip.entry(null, {name: `${file.destinationPath}/`}, function (err) {
{name: `${file.destinationPath}/`}, if (err) {
function (err, entry) { core.error('An error occurred:', err)
core.debug(`${err}`)
if (err) reject(err)
else resolve(entry)
} }
) })
} )
}) }
} }
core.debug(`Starting the finalizing of all entries`)
for (const item of fileUploadQueue) {
core.debug(`Starting the finalizing ${item}`)
}
fileUploadQueue.drain(() => {
core.debug('all items have been processed')
})
zip.finalize() zip.finalize()
core.debug(`Finalizing entries`) core.debug(`Finalizing entries`)
const bufferSize = getUploadChunkSize() const bufferSize = getUploadChunkSize()