1
0
Fork 0

Merge pull request #661 from yacaovsnc/main

Retry artifact download when response stream is truncated (continued)
pull/665/head
Chris Sidi 2020-12-04 13:10:40 -05:00 committed by GitHub
commit 7ad5004f87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 160 additions and 18 deletions

View File

@ -124,7 +124,7 @@ describe('Download Tests', () => {
) )
const targetPath = path.join(root, 'FileA.txt') const targetPath = path.join(root, 'FileA.txt')
setupDownloadItemResponse(true, 200, fileContents) setupDownloadItemResponse(fileContents, true, 200, false, false)
const downloadHttpClient = new DownloadHttpClient() const downloadHttpClient = new DownloadHttpClient()
const items: DownloadItem[] = [] const items: DownloadItem[] = []
@ -147,7 +147,7 @@ describe('Download Tests', () => {
) )
const targetPath = path.join(root, 'FileB.txt') const targetPath = path.join(root, 'FileB.txt')
setupDownloadItemResponse(false, 200, fileContents) setupDownloadItemResponse(fileContents, false, 200, false, false)
const downloadHttpClient = new DownloadHttpClient() const downloadHttpClient = new DownloadHttpClient()
const items: DownloadItem[] = [] const items: DownloadItem[] = []
@ -171,7 +171,7 @@ describe('Download Tests', () => {
const fileContents = Buffer.from('try, try again\n', defaultEncoding) const fileContents = Buffer.from('try, try again\n', defaultEncoding)
const targetPath = path.join(root, `FileC-${statusCode}.txt`) const targetPath = path.join(root, `FileC-${statusCode}.txt`)
setupDownloadItemResponse(false, statusCode, fileContents) setupDownloadItemResponse(fileContents, false, statusCode, false, true)
const downloadHttpClient = new DownloadHttpClient() const downloadHttpClient = new DownloadHttpClient()
const items: DownloadItem[] = [] const items: DownloadItem[] = []
@ -188,6 +188,52 @@ describe('Download Tests', () => {
} }
}) })
it('Test retry on truncated response with gzip', async () => {
const fileContents = Buffer.from(
'Sometimes gunzip fails on the first try\n',
defaultEncoding
)
const targetPath = path.join(root, 'FileD.txt')
setupDownloadItemResponse(fileContents, true, 200, true, true)
const downloadHttpClient = new DownloadHttpClient()
const items: DownloadItem[] = []
items.push({
sourceLocation: `${configVariables.getRuntimeUrl()}_apis/resources/Containers/13?itemPath=my-artifact%2FFileD.txt`,
targetPath
})
await expect(
downloadHttpClient.downloadSingleArtifact(items)
).resolves.not.toThrow()
await checkDestinationFile(targetPath, fileContents)
})
it('Test retry on truncated response without gzip', async () => {
const fileContents = Buffer.from(
'You have to inspect the content-length header to know if you got everything\n',
defaultEncoding
)
const targetPath = path.join(root, 'FileE.txt')
setupDownloadItemResponse(fileContents, false, 200, true, true)
const downloadHttpClient = new DownloadHttpClient()
const items: DownloadItem[] = []
items.push({
sourceLocation: `${configVariables.getRuntimeUrl()}_apis/resources/Containers/13?itemPath=my-artifact%2FFileD.txt`,
targetPath
})
await expect(
downloadHttpClient.downloadSingleArtifact(items)
).resolves.not.toThrow()
await checkDestinationFile(targetPath, fileContents)
})
/** /**
* Helper used to setup mocking for the HttpClient * Helper used to setup mocking for the HttpClient
*/ */
@ -251,19 +297,27 @@ describe('Download Tests', () => {
* @param firstHttpResponseCode the http response code that should be returned * @param firstHttpResponseCode the http response code that should be returned
*/ */
function setupDownloadItemResponse( function setupDownloadItemResponse(
fileContents: Buffer,
isGzip: boolean, isGzip: boolean,
firstHttpResponseCode: number, firstHttpResponseCode: number,
fileContents: Buffer truncateFirstResponse: boolean,
retryExpected: boolean
): void { ): void {
const spyInstance = jest const spyInstance = jest
.spyOn(HttpClient.prototype, 'get') .spyOn(HttpClient.prototype, 'get')
.mockImplementationOnce(async () => { .mockImplementationOnce(async () => {
if (firstHttpResponseCode === 200) { if (firstHttpResponseCode === 200) {
const fullResponse = await constructResponse(isGzip, fileContents)
const actualResponse = truncateFirstResponse
? fullResponse.subarray(0, 3)
: fullResponse
return { return {
message: getDownloadResponseMessage( message: getDownloadResponseMessage(
firstHttpResponseCode, firstHttpResponseCode,
isGzip, isGzip,
await constructResponse(isGzip, fileContents) fullResponse.length,
actualResponse
), ),
readBody: emptyMockReadBody readBody: emptyMockReadBody
} }
@ -272,6 +326,7 @@ describe('Download Tests', () => {
message: getDownloadResponseMessage( message: getDownloadResponseMessage(
firstHttpResponseCode, firstHttpResponseCode,
false, false,
0,
null null
), ),
readBody: emptyMockReadBody readBody: emptyMockReadBody
@ -280,14 +335,16 @@ describe('Download Tests', () => {
}) })
// set up a second mock only if we expect a retry. Otherwise this mock will affect other tests. // set up a second mock only if we expect a retry. Otherwise this mock will affect other tests.
if (firstHttpResponseCode !== 200) { if (retryExpected) {
spyInstance.mockImplementationOnce(async () => { spyInstance.mockImplementationOnce(async () => {
// chained response, if the HTTP GET function gets called again, return a successful response // chained response, if the HTTP GET function gets called again, return a successful response
const fullResponse = await constructResponse(isGzip, fileContents)
return { return {
message: getDownloadResponseMessage( message: getDownloadResponseMessage(
200, 200,
isGzip, isGzip,
await constructResponse(isGzip, fileContents) fullResponse.length,
fullResponse
), ),
readBody: emptyMockReadBody readBody: emptyMockReadBody
} }
@ -311,6 +368,7 @@ describe('Download Tests', () => {
function getDownloadResponseMessage( function getDownloadResponseMessage(
httpResponseCode: number, httpResponseCode: number,
isGzip: boolean, isGzip: boolean,
contentLength: number,
response: Buffer | null response: Buffer | null
): http.IncomingMessage { ): http.IncomingMessage {
let readCallCount = 0 let readCallCount = 0
@ -335,7 +393,9 @@ describe('Download Tests', () => {
}) })
mockMessage.statusCode = httpResponseCode mockMessage.statusCode = httpResponseCode
mockMessage.headers = {} mockMessage.headers = {
'content-length': contentLength.toString()
}
if (isGzip) { if (isGzip) {
mockMessage.headers['content-encoding'] = 'gzip' mockMessage.headers['content-encoding'] = 'gzip'

View File

@ -9,7 +9,9 @@ import {
isThrottledStatusCode, isThrottledStatusCode,
getExponentialRetryTimeInMilliseconds, getExponentialRetryTimeInMilliseconds,
tryGetRetryAfterValueTimeInMilliseconds, tryGetRetryAfterValueTimeInMilliseconds,
displayHttpDiagnostics displayHttpDiagnostics,
getFileSize,
rmFile
} from './utils' } from './utils'
import {URL} from 'url' import {URL} from 'url'
import {StatusReporter} from './status-reporter' import {StatusReporter} from './status-reporter'
@ -151,7 +153,7 @@ export class DownloadHttpClient {
): Promise<void> { ): Promise<void> {
let retryCount = 0 let retryCount = 0
const retryLimit = getRetryLimit() const retryLimit = getRetryLimit()
const destinationStream = fs.createWriteStream(downloadPath) let destinationStream = fs.createWriteStream(downloadPath)
const headers = getDownloadHeaders('application/json', true, true) const headers = getDownloadHeaders('application/json', true, true)
// a single GET request is used to download a file // a single GET request is used to download a file
@ -201,11 +203,39 @@ export class DownloadHttpClient {
} }
} }
const isAllBytesReceived = (
expected?: string,
received?: number
): boolean => {
// be lenient, if any input is missing, assume success, i.e. not truncated
if (
!expected ||
!received ||
process.env['ACTIONS_ARTIFACT_SKIP_DOWNLOAD_VALIDATION']
) {
core.info('Skipping download validation.')
return true
}
return parseInt(expected) === received
}
const resetDestinationStream = async (
fileDownloadPath: string
): Promise<void> => {
destinationStream.close()
await rmFile(fileDownloadPath)
destinationStream = fs.createWriteStream(fileDownloadPath)
}
// keep trying to download a file until a retry limit has been reached // keep trying to download a file until a retry limit has been reached
while (retryCount <= retryLimit) { while (retryCount <= retryLimit) {
let response: IHttpClientResponse let response: IHttpClientResponse
try { try {
response = await makeDownloadRequest() response = await makeDownloadRequest()
if (core.isDebug()) {
displayHttpDiagnostics(response)
}
} catch (error) { } catch (error) {
// if an error is caught, it is usually indicative of a timeout so retry the download // if an error is caught, it is usually indicative of a timeout so retry the download
core.info('An error occurred while attempting to download a file') core.info('An error occurred while attempting to download a file')
@ -217,19 +247,37 @@ export class DownloadHttpClient {
continue continue
} }
let forceRetry = false
if (isSuccessStatusCode(response.message.statusCode)) { if (isSuccessStatusCode(response.message.statusCode)) {
// The body contains the contents of the file however calling response.readBody() causes all the content to be converted to a string // The body contains the contents of the file however calling response.readBody() causes all the content to be converted to a string
// which can cause some gzip encoded data to be lost // which can cause some gzip encoded data to be lost
// Instead of using response.readBody(), response.message is a readableStream that can be directly used to get the raw body contents // Instead of using response.readBody(), response.message is a readableStream that can be directly used to get the raw body contents
return this.pipeResponseToFile( try {
response, const isGzipped = isGzip(response.message.headers)
destinationStream, await this.pipeResponseToFile(response, destinationStream, isGzipped)
isGzip(response.message.headers)
if (
isGzipped ||
isAllBytesReceived(
response.message.headers['content-length'],
await getFileSize(downloadPath)
) )
} else if (isRetryableStatusCode(response.message.statusCode)) { ) {
return
} else {
forceRetry = true
}
} catch (error) {
// retry on error, most likely streams were corrupted
forceRetry = true
}
}
if (forceRetry || isRetryableStatusCode(response.message.statusCode)) {
core.info( core.info(
`A ${response.message.statusCode} response code has been received while attempting to download an artifact` `A ${response.message.statusCode} response code has been received while attempting to download an artifact`
) )
resetDestinationStream(downloadPath)
// if a throttled status code is received, try to get the retryAfter header value, else differ to standard exponential backoff // if a throttled status code is received, try to get the retryAfter header value, else differ to standard exponential backoff
isThrottledStatusCode(response.message.statusCode) isThrottledStatusCode(response.message.statusCode)
? await backOff( ? await backOff(
@ -263,26 +311,48 @@ export class DownloadHttpClient {
if (isGzip) { if (isGzip) {
const gunzip = zlib.createGunzip() const gunzip = zlib.createGunzip()
response.message response.message
.on('error', error => {
core.error(
`An error occurred while attempting to read the response stream`
)
gunzip.close()
destinationStream.close()
reject(error)
})
.pipe(gunzip) .pipe(gunzip)
.on('error', error => {
core.error(
`An error occurred while attempting to decompress the response stream`
)
destinationStream.close()
reject(error)
})
.pipe(destinationStream) .pipe(destinationStream)
.on('close', () => { .on('close', () => {
resolve() resolve()
}) })
.on('error', error => { .on('error', error => {
core.error( core.error(
`An error has been encountered while decompressing and writing a downloaded file to ${destinationStream.path}` `An error occurred while writing a downloaded file to ${destinationStream.path}`
) )
reject(error) reject(error)
}) })
} else { } else {
response.message response.message
.on('error', error => {
core.error(
`An error occurred while attempting to read the response stream`
)
destinationStream.close()
reject(error)
})
.pipe(destinationStream) .pipe(destinationStream)
.on('close', () => { .on('close', () => {
resolve() resolve()
}) })
.on('error', error => { .on('error', error => {
core.error( core.error(
`An error has been encountered while writing a downloaded file to ${destinationStream.path}` `An error occurred while writing a downloaded file to ${destinationStream.path}`
) )
reject(error) reject(error)
}) })

View File

@ -303,6 +303,18 @@ export async function createEmptyFilesForArtifact(
} }
} }
export async function getFileSize(filePath: string): Promise<number> {
const stats = await fs.stat(filePath)
debug(
`${filePath} size:(${stats.size}) blksize:(${stats.blksize}) blocks:(${stats.blocks})`
)
return stats.size
}
export async function rmFile(filePath: string): Promise<void> {
await fs.unlink(filePath)
}
export function getProperRetention( export function getProperRetention(
retentionInput: number, retentionInput: number,
retentionSetting: string | undefined retentionSetting: string | undefined