diff --git a/packages/artifact/package-lock.json b/packages/artifact/package-lock.json
index 41b90a9b..bc503a71 100644
--- a/packages/artifact/package-lock.json
+++ b/packages/artifact/package-lock.json
@@ -19,6 +19,7 @@
         "@octokit/request-error": "^5.0.0",
         "@protobuf-ts/plugin": "^2.2.3-alpha.1",
         "archiver": "^5.3.1",
+        "async": "^3.2.5",
         "crypto": "^1.0.1",
         "jwt-decode": "^3.1.2",
         "twirp-ts": "^2.5.0",
diff --git a/packages/artifact/package.json b/packages/artifact/package.json
index 292ba63d..104a5d67 100644
--- a/packages/artifact/package.json
+++ b/packages/artifact/package.json
@@ -50,6 +50,7 @@
     "@octokit/request-error": "^5.0.0",
     "@protobuf-ts/plugin": "^2.2.3-alpha.1",
     "archiver": "^5.3.1",
+    "async": "^3.2.5",
     "crypto": "^1.0.1",
     "jwt-decode": "^3.1.2",
     "twirp-ts": "^2.5.0",
diff --git a/packages/artifact/src/internal/upload/upload-artifact.ts b/packages/artifact/src/internal/upload/upload-artifact.ts
index b8f95201..e810ccc6 100644
--- a/packages/artifact/src/internal/upload/upload-artifact.ts
+++ b/packages/artifact/src/internal/upload/upload-artifact.ts
@@ -71,7 +71,11 @@ export async function uploadArtifact(
   const zipUploadStream = await createZipUploadStream(
     zipSpecification,
     options?.compressionLevel
-  )
+  ).catch(err => {
+    throw new InvalidResponseError(
+      `createZipUploadStream: response from backend was not ok: ${err}`
+    )
+  })

   // Upload zip to blob storage
   const uploadResult = await uploadZipToBlobStorage(
diff --git a/packages/artifact/src/internal/upload/zip.ts b/packages/artifact/src/internal/upload/zip.ts
index 639ef3f4..974df55a 100644
--- a/packages/artifact/src/internal/upload/zip.ts
+++ b/packages/artifact/src/internal/upload/zip.ts
@@ -1,6 +1,7 @@
 import * as stream from 'stream'
 import * as ZipStream from 'zip-stream'
 import * as core from '@actions/core'
+import async from 'async'
 import {createReadStream} from 'fs'
 import {UploadZipSpecification} from './upload-zip-specification'
 import {getUploadChunkSize} from '../shared/config'
@@ -42,39 +43,95 @@ export async function createZipUploadStream(
   zip.on('finish', zipFinishCallback)
   zip.on('end', zipEndCallback)

-  for (const file of uploadSpecification) {
-    await new Promise((resolve, reject) => {
-      if (file.sourcePath !== null) {
-        core.debug(`createReadStream with: ${file.sourcePath}`)
-        // Add a normal file to the zip
-        const readsstream = createReadStream(file.sourcePath)
-        readsstream.on('error', reject)
+  // for (const file of uploadSpecification) {
+  //   await new Promise((resolve, reject) => {
+  //     if (file.sourcePath !== null) {
+  //       core.debug(`createReadStream with: ${file.sourcePath}`)
+  //       // Add a normal file to the zip
+  //       const readStream = createReadStream(file.sourcePath)
+  //       readStream.on('data', chunk => {
+  //         core.debug(`Received ${chunk.length} bytes of data.`)
+  //       })
+  //       readStream.on('end', () => {
+  //         core.debug('There will be no more data.')
+  //       })
+  //       readStream.on('error', reject) // Catch any errors from createReadStream

-        zip.entry(
-          readsstream,
-          {name: file.destinationPath},
-          function (err, entry) {
-            core.debug(`${err}`)
-            if (err) reject(err)
-            else resolve(entry)
+  //       core.debug(`readsstream errors: ${readStream.errored}`)
+  //       const entry = zip.entry(
+  //         readStream,
+  //         {name: file.destinationPath},
+  //         function (err) {
+  //           core.debug(`Is stream paused: ${readStream.isPaused()}`)
+  //           if (err) {
+  //             core.error('An error occurred:', err)
+  //             reject(err)
+  //           }
+  //           core.debug(`Is stream paused: ${readStream.isPaused()}`)
+  //           resolve('resolved artifact')
+  //         }
+  //       )
+  //       readStream.pipe(entry)
+  //     } else {
+  //       zip.entry(null, {name: `${file.destinationPath}/`}, function (err) {
+  //         if (err) {
+  //           core.error('An error occurred:', err)
+  //           reject(err)
+  //         }
+  //         resolve('resolved artifact')
+  //       })
+  //     }
+  //   })
+  // }
+  const fileUploadQueue = async.queue(function (task, callback) {
+    core.info(`hello ${task.name}`)
+    callback()
+  }, 1)
+
+  fileUploadQueue.error(function (err, task) {
+    core.error(`task experienced an error: ${task} ${err}`)
+  })
+
+  for (const file of uploadSpecification) {
+    if (file.sourcePath !== null) {
+      const readStream = createReadStream(file.sourcePath)
+      readStream.on('data', chunk => {
+        core.debug(`Received ${chunk.length} bytes of data.`)
+      })
+      readStream.on('end', () => {
+        core.debug('There will be no more data.')
+      })
+      readStream.on('error', function (err) {
+        core.debug(`${err}`)
+      }) // Catch any errors from createReadStream
+      fileUploadQueue.push(
+        zip.entry(readStream, {name: file.destinationPath}, function (err) {
+          core.debug(`Is stream paused: ${readStream.isPaused()}`)
+          if (err) {
+            core.error('An error occurred:', err)
           }
-        )
-      } else {
-        // add directory to zip
-        core.debug(`add directory with: ${file.destinationPath}`)
-        zip.entry(
-          null,
-          {name: `${file.destinationPath}/`},
-          function (err, entry) {
-            core.debug(`${err}`)
-            if (err) reject(err)
-            else resolve(entry)
+          core.debug(`Is stream paused: ${readStream.isPaused()}`)
+        })
+      )
+    } else {
+      fileUploadQueue.push(
+        zip.entry(null, {name: `${file.destinationPath}/`}, function (err) {
+          if (err) {
+            core.error('An error occurred:', err)
          }
-        )
-      }
-    })
+        })
+      )
+    }
   }
+  core.debug(`Starting the finalizing of all entries`)
+
+  for (const item of fileUploadQueue) {
+    core.debug(`Starting the finalizing ${item}`)
+  }
+  fileUploadQueue.drain(() => {
+    core.debug('all items have been processed')
+  })

   zip.finalize()
   core.debug(`Finalizing entries`)
   const bufferSize = getUploadChunkSize()
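Note on the pattern: the zip.ts hunk swaps the per-file Promise loop for a queue from the async package. Below is a minimal, self-contained sketch of that queue flow (worker function, concurrency of 1, error handler, drain callback); the FileTask shape, the sample paths, and the console logging are assumptions for illustration only and are not code from the patch.

// Minimal sketch of the async.queue pattern introduced above.
// Assumptions: @types/async is available for typings; FileTask is hypothetical.
import async from 'async'

interface FileTask {
  sourcePath: string | null
  destinationPath: string
}

// Worker handles one task at a time (concurrency = 1), like fileUploadQueue in the diff.
const queue = async.queue<FileTask>((task, done) => {
  console.log(`adding ${task.destinationPath}`)
  done() // signal completion so the next queued task can start
}, 1)

// Errors thrown by the worker (or passed to done(err)) are reported here.
queue.error((err, task) => {
  console.error(`task ${task.destinationPath} failed: ${err}`)
})

// Enqueue work; with concurrency 1 the tasks run strictly in order.
queue.push({sourcePath: '/tmp/a.txt', destinationPath: 'a.txt'})
queue.push({sourcePath: null, destinationPath: 'dir'})

// Runs once the queue is empty and the last worker has finished.
queue.drain(() => {
  console.log('all items have been processed')
})

With a concurrency of 1 the queue processes entries strictly one after another, the same serialization the removed Promise-per-file loop provided; the difference is that push() only enqueues work and returns immediately, with overall completion observed via the drain() callback.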