
adds streaming download support

pull/1716/head
Prajjwal 2024-04-10 10:14:06 +05:30
parent 6de528cc4a
commit 1fc87f92d9
9 changed files with 1365 additions and 51 deletions

View File

@@ -2,10 +2,10 @@ import * as cache from '../src/cache'
test('isFeatureAvailable returns true if server url is set', () => {
try {
process.env['WARP_CACHE_URL'] = 'http://cache.com'
process.env['WARPBUILD_CACHE_URL'] = 'http://cache.com'
expect(cache.isFeatureAvailable()).toBe(true)
} finally {
delete process.env['WARP_CACHE_URL']
delete process.env['WARPBUILD_CACHE_URL']
}
})
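For anyone verifying the rename locally, a minimal consumer sketch (hypothetical, not part of this diff; assumes the package name from package.json and the public cache exports):

import * as cache from 'github-actions.warp-cache'

// The feature gate now keys off WARPBUILD_CACHE_URL instead of WARP_CACHE_URL
process.env['WARPBUILD_CACHE_URL'] = 'https://cache.warpbuild.com'
if (!cache.isFeatureAvailable()) {
  throw new Error('warp-cache is not configured for this runner')
}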

File diff suppressed because it is too large

View File

@@ -1,6 +1,6 @@
{
"name": "github-actions.warp-cache",
"version": "0.2.0",
"version": "0.3.0",
"preview": true,
"description": "Github action to use WarpBuild's in-house cache offering",
"keywords": [
@@ -9,7 +9,7 @@
"cache",
"warpbuild"
],
"homepage": "https://github.com/actions/toolkit/tree/main/packages/cache",
"homepage": "https://github.com/WarpBuilds/toolkit/tree/main/packages/warp-cache",
"license": "MIT",
"main": "lib/cache.js",
"types": "lib/cache.d.ts",
@@ -26,7 +26,7 @@
},
"repository": {
"type": "git",
"url": "git+https://github.com/actions/toolkit.git",
"url": "git+https://github.com/WarpBuilds/toolkit.git",
"directory": "packages/cache"
},
"scripts": {
@@ -35,7 +35,7 @@
"tsc": "tsc"
},
"bugs": {
"url": "https://github.com/actions/toolkit/issues"
"url": "https://github.com/WarpBuilds/toolkit/issues"
},
"dependencies": {
"@actions/core": "^1.10.0",
@@ -46,6 +46,7 @@
"@azure/abort-controller": "^1.1.0",
"@azure/ms-rest-js": "^2.6.0",
"@azure/storage-blob": "^12.13.0",
"@google-cloud/storage": "^7.9.0",
"axios": "^1.6.2",
"semver": "^6.3.1",
"uuid": "^3.3.3"

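The new @google-cloud/storage dependency is what enables the streaming download path. A minimal sketch of the SDK surface this PR relies on (bucket and object names are placeholders; with no options the client uses Application Default Credentials):

import {Storage} from '@google-cloud/storage'

const storage = new Storage()
const readStream = storage
  .bucket('example-bucket') // placeholder
  .file('cache/deps-v1.tar.zst') // placeholder
  .createReadStream()
readStream.pipe(process.stdout)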
View File

@@ -2,7 +2,12 @@ import * as core from '@actions/core'
import * as path from 'path'
import * as utils from './internal/cacheUtils'
import * as cacheHttpClient from './internal/cacheHttpClient'
import {createTar, extractTar, listTar} from './internal/tar'
import {
createTar,
extractStreamingTar,
extractTar,
listTar
} from './internal/tar'
import {DownloadOptions, getUploadOptions} from './options'
export class ValidationError extends Error {
@@ -50,7 +55,7 @@ function checkKey(key: string): void {
*/
export function isFeatureAvailable(): boolean {
return !!process.env['WARP_CACHE_URL']
return !!process.env['WARPBUILD_CACHE_URL']
}
/**
@@ -95,14 +100,15 @@ export async function restoreCache(
compressionMethod,
enableCrossOsArchive
})
if (!cacheEntry?.pre_signed_url) {
// Cache not found
if (!cacheEntry) {
// Internal Error
return undefined
}
if (options?.lookupOnly) {
core.info('Lookup only - skipping download')
return cacheEntry.cache_key
return cacheEntry?.cache_key
}
archivePath = path.join(
@@ -111,8 +117,17 @@
)
core.debug(`Archive Path: ${archivePath}`)
// Download the cache from the cache entry
await cacheHttpClient.downloadCache(cacheEntry.pre_signed_url, archivePath)
switch (cacheEntry.provider) {
case 's3': {
if (!cacheEntry.pre_signed_url) {
// Cache not found
return undefined
}
await cacheHttpClient.downloadCache(
cacheEntry.pre_signed_url,
archivePath
)
if (core.isDebug()) {
await listTar(archivePath, compressionMethod)
@@ -127,6 +142,27 @@
await extractTar(archivePath, compressionMethod)
core.info('Cache restored successfully')
break
}
case 'gcs': {
// For GCS, we do a streaming download: the archive is extracted while it is still being downloaded.
const archiveLocation = cacheEntry.archive_location ?? ''
const readStream = cacheHttpClient.downloadCacheStreaming(
'gcs',
archiveLocation
)
if (!readStream) {
return undefined
}
await extractStreamingTar(readStream, archivePath, compressionMethod)
core.info('Cache restored successfully')
break
}
}
return cacheEntry.cache_key
} catch (error) {
@@ -134,7 +170,7 @@ export async function restoreCache(
if (typedError.name === ValidationError.name) {
throw error
} else {
// Supress all non-validation cache related errors because caching should be optional
// Suppress all non-validation cache related errors because caching should be optional
core.warning(`Failed to restore: ${(error as Error).message}`)
}
} finally {

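With the provider switch above, restoreCache now returns undefined for an internal error, for an s3 entry without a pre-signed URL, and for a gcs entry whose stream could not be opened. A hedged caller sketch (signature assumed from the surrounding code, @actions/cache style):

import * as core from '@actions/core'
import {restoreCache} from './cache'

async function run(): Promise<void> {
  const hitKey = await restoreCache(['node_modules'], 'deps-v1', ['deps-'])
  if (hitKey === undefined) {
    // Covers cache misses and suppressed cache errors alike
    core.info('cache not restored - continuing without it')
  }
}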
View File

@@ -21,7 +21,10 @@ import {
InternalS3CompletedPart,
CommitCacheResponse
} from './contracts'
import {downloadCacheMultiConnection} from './downloadUtils'
import {
downloadCacheMultiConnection,
downloadCacheStreamingGCP
} from './downloadUtils'
import {isSuccessStatusCode, retryTypedResponse} from './requestUtils'
import axios, {AxiosError} from 'axios'
@@ -29,12 +32,14 @@ const versionSalt = '1.0'
function getCacheApiUrl(resource: string): string {
const baseUrl: string =
process.env['WARP_CACHE_URL'] ?? 'https://cache.warpbuild.com'
process.env['WARPBUILD_CACHE_URL'] ?? 'https://cache.warpbuild.com'
if (!baseUrl) {
throw new Error('Cache Service Url not found, unable to restore cache.')
}
const url = `${baseUrl}/v1/${resource}`
const provider: string = process.env['STORAGE_PROVIDER'] ?? 'gcs'
const url = `${baseUrl}/v1/${resource}/${provider}`
core.debug(`Resource Url: ${url}`)
return url
}
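// Example (illustrative, not part of this diff): with WARPBUILD_CACHE_URL and
// STORAGE_PROVIDER unset, getCacheApiUrl('cache') now resolves to
// https://cache.warpbuild.com/v1/cache/gcs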
@@ -54,11 +59,11 @@ function getRequestOptions(): RequestOptions {
}
function createHttpClient(): HttpClient {
const token = process.env['WARP_RUNNER_VERIFICATION_TOKEN'] ?? ''
const token = process.env['WARPBUILD_RUNNER_VERIFICATION_TOKEN'] ?? ''
const bearerCredentialHandler = new BearerCredentialHandler(token)
return new HttpClient(
'actions/cache',
'warp/cache',
[bearerCredentialHandler],
getRequestOptions()
)
@@ -106,7 +111,7 @@ export async function getCacheEntry(
const response = await retryTypedResponse('getCacheEntry', async () =>
httpClient.getJson<ArtifactCacheEntry>(getCacheApiUrl(resource))
)
// Cache not found
if (response.statusCode === 204) {
// List cache for primary key only if cache miss occurs
if (core.isDebug()) {
@@ -119,9 +124,9 @@
}
const cacheResult = response.result
const cacheDownloadUrl = cacheResult?.pre_signed_url
const cacheDownloadUrl = cacheResult?.archive_location
if (!cacheDownloadUrl) {
// Cache achiveLocation not found. This should never happen, and hence bail out.
// Cache archiveLocation not found. This should never happen, and hence bail out.
throw new Error('Cache not found.')
}
core.setSecret(cacheDownloadUrl)
@@ -163,7 +168,20 @@ export async function downloadCache(
await downloadCacheMultiConnection(archiveLocation, archivePath, 8)
}
export function downloadCacheStreaming(
provider: string,
archiveLocation: string
): NodeJS.ReadableStream | undefined {
switch (provider) {
case 's3':
return undefined
case 'gcs':
return downloadCacheStreamingGCP(archiveLocation)
default:
return undefined
}
}

// Reserve Cache
export async function reserveCache(
cacheKey: string,
numberOfChunks: number,
@@ -240,7 +258,6 @@ async function uploadFileToS3(
preSignedURLs: string[],
archivePath: string
): Promise<InternalS3CompletedPart[]> {
// Upload Chunks
const fileSize = utils.getArchiveFileSizeInBytes(archivePath)
const numberOfChunks = preSignedURLs.length
@@ -334,7 +351,6 @@ export async function saveCache(
// Sort parts in ascending order by partNumber
completedParts.sort((a, b) => a.PartNumber - b.PartNumber)
// Commit Cache
core.debug('Committing cache')
const cacheSize = utils.getArchiveFileSizeInBytes(archivePath)
core.info(

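Note that downloadCacheStreaming only streams for gcs; for s3 (and unknown providers) it returns undefined so callers stay on the pre-signed-URL path. A hedged usage sketch (the gs:// URL is a placeholder):

import {downloadCacheStreaming} from './internal/cacheHttpClient'

const stream = downloadCacheStreaming('gcs', 'gs://example-bucket/deps-v1.tar.zst')
if (!stream) {
  // 's3' lands here: fall back to downloadCache(), which uses the
  // multi-connection download against the pre-signed URL
}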
View File

@@ -7,7 +7,10 @@ export interface ITypedResponseWithError<T> extends TypedResponse<T> {
}
export interface ArtifactCacheEntry {
provider: string
auth_method: string
cache_key?: string
archive_location?: string
pre_signed_url?: string
cache_version?: string
}
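Illustrative payloads for the extended entry, one per provider (field values are invented for the example, not taken from a live response; the import path assumes this contracts module):

import {ArtifactCacheEntry} from './internal/contracts'

const gcsEntry: ArtifactCacheEntry = {
  provider: 'gcs',
  auth_method: 'oauth2', // illustrative
  cache_key: 'deps-v1',
  archive_location: 'gs://example-bucket/deps-v1.tar.zst'
}

const s3Entry: ArtifactCacheEntry = {
  provider: 's3',
  auth_method: 'presigned_url', // illustrative
  cache_key: 'deps-v1',
  archive_location: 'https://example-bucket.s3.amazonaws.com/deps-v1.tar.zst',
  pre_signed_url: 'https://example-bucket.s3.amazonaws.com/deps-v1.tar.zst?X-Amz-Signature=placeholder'
}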

View File

@@ -13,6 +13,7 @@ import {DownloadOptions} from '../options'
import {retryHttpClientResponse} from './requestUtils'
import {AbortController} from '@azure/abort-controller'
import {Storage} from '@google-cloud/storage'
/**
* Pipes the body of a HTTP response to a stream
@@ -292,3 +293,47 @@ export async function downloadCacheMultiConnection(
await fileHandle?.close()
}
}
/**
* Download the cache as a readable stream using the Google Cloud SDK
*
* @param archiveLocation the URL for the cache
*/
export function downloadCacheStreamingGCP(
archiveLocation: string
): NodeJS.ReadableStream | undefined {
try {
const storage = new Storage({
token: process.env['GCP_ACCESS_TOKEN']
})
// The archiveLocation for GCP will be in the format of gs://<bucket-name>/<object-name>
const bucketName = archiveLocation.split('/')[2]
if (!bucketName || bucketName.length < 2) {
throw new Error(
`Invalid GCS URL: ${archiveLocation}. Should be in the format gs://<bucket-name>/<object-name>`
)
}
const fileName = archiveLocation.split('/').slice(3).join('/')
if (!fileName || fileName.length < 1) {
throw new Error(
`Invalid GCS URL: ${archiveLocation}. Should be in the format gs://<bucket-name>/<object-name>`
)
}
storage
.bucket(bucketName)
.file(fileName)
.getMetadata()
.then(data => {
core.info(`File size: ${data[0]?.size} bytes`)
})
return storage.bucket(bucketName).file(fileName).createReadStream()
} catch (error) {
core.debug(`Failed to download cache: ${error}`)
core.error(`Failed to download cache.`)
throw error
}
}
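A hedged smoke-test for the new helper: given gs://example-bucket/cache/deps.tar.zst, the split logic above yields bucketName 'example-bucket' and fileName 'cache/deps.tar.zst'. Paths and names below are placeholders:

import {createWriteStream} from 'fs'
import {downloadCacheStreamingGCP} from './internal/downloadUtils'

// Requires GCP_ACCESS_TOKEN in the environment, matching the Storage() options above
const readStream = downloadCacheStreamingGCP('gs://example-bucket/cache/deps.tar.zst')
readStream
  ?.pipe(createWriteStream('/tmp/deps.tar.zst'))
  .on('finish', () => console.log('download complete'))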

View File

@@ -11,9 +11,17 @@ import {
TarFilename,
ManifestFilename
} from './constants'
import {ChildProcessWithoutNullStreams, spawn} from 'child_process'
const IS_WINDOWS = process.platform === 'win32'
enum TAR_MODE {
CREATE = 'create',
EXTRACT = 'extract',
EXTRACT_STREAM = 'extractStream',
LIST = 'list'
}
// Returns tar path and type: BSD or GNU
async function getTarPath(): Promise<ArchiveTool> {
switch (process.platform) {
@@ -54,7 +62,7 @@
async function getTarArgs(
tarPath: ArchiveTool,
compressionMethod: CompressionMethod,
type: string,
type: TAR_MODE,
archivePath = ''
): Promise<string[]> {
const args = [`"${tarPath.path}"`]
@@ -69,7 +77,7 @@
// Method specific args
switch (type) {
case 'create':
case TAR_MODE.CREATE:
args.push(
'--posix',
'-cf',
@@ -87,7 +95,7 @@
ManifestFilename
)
break
case 'extract':
case TAR_MODE.EXTRACT:
args.push(
'-xf',
BSD_TAR_ZSTD
@@ -98,7 +106,16 @@
workingDirectory.replace(new RegExp(`\\${path.sep}`, 'g'), '/')
)
break
case 'list':
case TAR_MODE.EXTRACT_STREAM:
args.push(
'-xf',
'-',
'-P',
'-C',
workingDirectory.replace(new RegExp(`\\${path.sep}`, 'g'), '/')
)
break
case TAR_MODE.LIST:
args.push(
'-tf',
BSD_TAR_ZSTD
@@ -127,7 +144,7 @@
// Returns commands to run tar and compression program
async function getCommands(
compressionMethod: CompressionMethod,
type: string,
type: TAR_MODE,
archivePath = ''
): Promise<string[]> {
let args
@@ -139,8 +156,9 @@
type,
archivePath
)
const compressionArgs =
type !== 'create'
type !== TAR_MODE.CREATE
? await getDecompressionProgram(tarPath, compressionMethod, archivePath)
: await getCompressionProgram(tarPath, compressionMethod)
const BSD_TAR_ZSTD =
@@ -148,7 +166,7 @@
compressionMethod !== CompressionMethod.Gzip &&
IS_WINDOWS
if (BSD_TAR_ZSTD && type !== 'create') {
if (BSD_TAR_ZSTD && type !== TAR_MODE.CREATE) {
args = [[...compressionArgs].join(' '), [...tarArgs].join(' ')]
} else {
args = [[...tarArgs].join(' '), [...compressionArgs].join(' ')]
@@ -161,6 +179,42 @@
return [args.join(' ')]
}
/*
* Returns command pipes to stream data to the tar and compression programs.
* Only supports tar and zstd at the moment.
* @returns Array of ChildProcessWithoutNullStreams. Pipe data into the processes in the order they are returned.
*/
async function getCommandPipes(
compressionMethod: CompressionMethod,
type: TAR_MODE,
archivePath = ''
): Promise<ChildProcessWithoutNullStreams[]> {
const spawnedProcesses: ChildProcessWithoutNullStreams[] = []
const tarPath = await getTarPath()
const tarArgs = await getTarArgs(
tarPath,
compressionMethod,
type,
archivePath
)
// Remove tar executable from tarArgs
tarArgs.shift()
const zstdInfo =
type !== TAR_MODE.CREATE
? await getDecompressionProgramStream(tarPath, compressionMethod)
: await getCompressionProgramStream(tarPath, compressionMethod)
const zstdProcess = spawn(zstdInfo.command, zstdInfo.args)
spawnedProcesses.push(zstdProcess)
const tarProcess = spawn(tarPath.path, tarArgs)
spawnedProcesses.push(tarProcess)
return spawnedProcesses
}
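// For reference (assumed rendering, not output of this code): for
// TAR_MODE.EXTRACT_STREAM with zstd on GNU tar, the two processes spawned
// above are roughly the shell pipeline
//   unzstd --long=30 --stdout -T0 | tar -xf - -P -C "$GITHUB_WORKSPACE"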
function getWorkingDirectory(): string {
return process.env['GITHUB_WORKSPACE'] ?? process.cwd()
}
@@ -204,6 +258,39 @@
}
}
// Alternative to getDecompressionProgram that returns a zstd command which can be piped into
async function getDecompressionProgramStream(
tarPath: ArchiveTool,
compressionMethod: CompressionMethod
): Promise<{command: string; args: string[]}> {
const BSD_TAR_ZSTD =
tarPath.type === ArchiveToolType.BSD &&
compressionMethod !== CompressionMethod.Gzip &&
IS_WINDOWS
switch (compressionMethod) {
case CompressionMethod.Zstd:
return BSD_TAR_ZSTD
? {command: 'zstd', args: ['-d', '--long=30', '--force', '--stdout']}
: {
command: IS_WINDOWS ? 'zstd' : 'unzstd',
args: IS_WINDOWS
? ['-d', '--long=30', '--stdout', '-T0']
: ['--long=30', '--stdout', '-T0']
}
case CompressionMethod.ZstdWithoutLong:
return BSD_TAR_ZSTD
? {command: 'zstd', args: ['-d', '--force', '--stdout']}
: {
command: IS_WINDOWS ? 'zstd' : 'unzstd',
args: ['-d', '--stdout', '-T0']
}
default:
// Assuming gzip is the default method if none specified
return {command: 'gzip', args: ['-d']}
}
}
// Used for creating the archive
// -T#: Compress using # working thread. If # is 0, attempt to detect and use the number of physical CPU cores.
// zstdmt is equivalent to 'zstd -T0'
@@ -244,6 +331,44 @@
}
}
async function getCompressionProgramStream(
tarPath: ArchiveTool,
compressionMethod: CompressionMethod
): Promise<{command: string; args: string[]}> {
const BSD_TAR_ZSTD =
tarPath.type === ArchiveToolType.BSD &&
compressionMethod !== CompressionMethod.Gzip &&
IS_WINDOWS
switch (compressionMethod) {
case CompressionMethod.Zstd:
return BSD_TAR_ZSTD
? {
command: 'zstd',
args: ['-T0', '--long=30', '--force', '--stdout']
}
: {
command: IS_WINDOWS ? 'zstd' : 'zstdmt',
args: IS_WINDOWS
? ['-T0', '--long=30', '--stdout']
: ['--long=30', '--stdout', '-T0']
}
case CompressionMethod.ZstdWithoutLong:
return BSD_TAR_ZSTD
? {
command: 'zstd',
args: ['-T0', '--force', '--stdout']
}
: {
command: IS_WINDOWS ? 'zstd' : 'zstdmt',
args: ['-T0', '--stdout']
}
default:
// Assuming gzip is the default method if none specified
return {command: 'gzip', args: []}
}
}
// Executes all commands as separate processes
async function execCommands(commands: string[], cwd?: string): Promise<void> {
for (const command of commands) {
@@ -265,11 +390,14 @@
archivePath: string,
compressionMethod: CompressionMethod
): Promise<void> {
const commands = await getCommands(compressionMethod, 'list', archivePath)
const commands = await getCommands(
compressionMethod,
TAR_MODE.LIST,
archivePath
)
await execCommands(commands)
}
// Extract a tar
export async function extractTar(
archivePath: string,
compressionMethod: CompressionMethod
@@ -277,10 +405,67 @@
// Create directory to extract tar into
const workingDirectory = getWorkingDirectory()
await io.mkdirP(workingDirectory)
const commands = await getCommands(compressionMethod, 'extract', archivePath)
const commands = await getCommands(
compressionMethod,
TAR_MODE.EXTRACT,
archivePath
)
await execCommands(commands)
}
// Supports only archives created using tar and zstd
export async function extractStreamingTar(
stream: NodeJS.ReadableStream,
archivePath: string,
compressionMethod: CompressionMethod
): Promise<void> {
const workingDirectory = getWorkingDirectory()
await io.mkdirP(workingDirectory)
const commandPipes = await getCommandPipes(
compressionMethod,
TAR_MODE.EXTRACT_STREAM,
archivePath
)
if (commandPipes.length < 2) {
throw new Error(
'At least two processes are required: one to decompress the stream and one to extract the tar archive.'
)
}
return new Promise((resolve, reject) => {
stream.pipe(commandPipes[0].stdin)
for (let i = 0; i < commandPipes.length - 1; i++) {
commandPipes[i].stdout.pipe(commandPipes[i + 1].stdin)
commandPipes[i].stderr.on('data', data => {
reject(
new Error(`Error in ${commandPipes[i].spawnfile}: ${data.toString()}`)
)
})
commandPipes[i].on('error', error => {
reject(
new Error(`Error in ${commandPipes[i].spawnfile}: ${error.message}`)
)
})
}
const lastCommand = commandPipes[commandPipes.length - 1]
lastCommand.stderr.on('data', data => {
console.error(`Error in ${lastCommand.spawnfile}:`, data.toString())
reject(new Error(`Error in ${lastCommand.spawnfile}: ${data.toString()}`))
})
lastCommand.on('close', code => {
if (code === 0) {
resolve()
} else {
reject(new Error(`Last command exited with code ${code}`))
}
})
})
}
// Create a tar
export async function createTar(
archiveFolder: string,
@@ -292,6 +477,6 @@
path.join(archiveFolder, ManifestFilename),
sourceDirectories.join('\n')
)
const commands = await getCommands(compressionMethod, 'create')
const commands = await getCommands(compressionMethod, TAR_MODE.CREATE)
await execCommands(commands, archiveFolder)
}
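A minimal standalone sketch of the same streaming-extract idea, for local experimentation (assumes GNU tar and zstd on PATH; it mirrors, but is not, the getCommandPipes wiring):

import {spawn} from 'child_process'

// stdin -> unzstd -> tar, e.g.: curl -sL <archive-url> | node extract-sketch.js
const unzstd = spawn('unzstd', ['--long=30', '--stdout', '-T0'])
const tar = spawn('tar', ['-xf', '-', '-P', '-C', process.env['GITHUB_WORKSPACE'] ?? process.cwd()])

process.stdin.pipe(unzstd.stdin)
unzstd.stdout.pipe(tar.stdin)
tar.on('close', code => process.exit(code ?? 1))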

View File

@@ -1,6 +1,12 @@
import {exec, spawn} from 'child_process'
import {deleteCache, restoreCache, saveCache} from './cache'
import {downloadCacheStreamingGCP} from './internal/downloadUtils'
process.env['WARP_CACHE_URL'] = 'http://localhost:8002'
import fs from 'fs'
import {extractStreamingTar} from './internal/tar'
import {CompressionMethod} from './internal/constants'
process.env['WARPBUILD_CACHE_URL'] = 'http://localhost:8002'
process.env['RUNNER_TEMP'] = '/Users/prajjwal/Repos/warpbuild/playground/tmp_fs'
process.env['NODE_DEBUG'] = 'http'
@@ -19,3 +25,22 @@ process.env['NODE_DEBUG'] = 'http'
// )
// deleteCache(['test-fs-local-key'])
process.env['GCP_ACCESS_TOKEN'] =
'ya29.c.c0AY_VpZgcQopWxkSf9wIIo9NED0YFh3VIgZ1wx1ulvSCrq5iTiZWbrRGPej2vA835U2HkNdrLwaVKFLeL57v1s-guzSvihNnHMMJ4wUPJHZPQd-CJ90i6F0NYcjQuv7SC2EBkaKciM-Act0IDygPwzwwixCe-4iCxcUv3YUysZcee9Qknxq5UBPfGjqQArVKifC2fScJ7HnBmbbSc8t1mDp9mLiIpax9V31anOQ-4QK1kqSgi4gh0m-Cd7v24S7Kfc5IEcQLrVyI62W4Y4HywRJ2V_qBx3ZKFMmO1lV5Tl3wHX40XyD1J2Cc6kXbF4LHHPcMnRf85ylaXaUGMwDNlkDPFHRJmOkWnZF8-v_Y4868-Mmektdl8khWvCQwGSLHo_jCKehCJZl1qK1gzNfie7Rgm9qbooMAEg1KkPPiDBmMY_WUsBo1-a0vuHrE90IhtvKI_TNTeH-pUDjSFMsbgrhnbGu5oN6DXk--WyjHy9slW6r8TDjB8UjPE2uiaGbYrQZsRPoaKVAxVylc9tFONyPwJ10MUerPq3ESq49QUASdasuYCef0CZ_R3kJyIMQe7p6WBfOZ0L11ZTz_tnFn1Oa8JGHvYl1xvx79EbHjo4mvyr5WTAXa42g-gCnPnJFLaN649DRZbdRzbbc3-bQbqFuictuoSQmOjhrqW6_0_44wVhlga9Ok9kZ4_lx6Oqvq9SiI6IxIJSBVnXet3MgzoRdJur8Ws766sinJ_iFkZdsQdj2IQ_hj74vh61v1i84xIZY-bp-IrvQQf_vZm6bbBZXxaXhiVphpij7nY5Rz3qS2d0e3byc1iUW63jXlY1iIhlvsd1i2Zd4YVyQrfgSy_zpuXJOqhS1MwBrkddb4F-r3wQtRJ1ttmbpSJOpeYzewzSeVopk8pmOaUSd0rS4qQkY1UdhQoavyn54VMj5U8BiOkjo-wV2MUXl0FlVF7u3-c3vUhlZ1JrMj6xiWFXys_QBMtU55jMe31UV-saSFxM7f1-xk1_2xoou8'
const readStream = downloadCacheStreamingGCP(
'gs://cache-bench-test/custom_modules.tar.zst'
)
extractStreamingTar(
readStream!,
'/tmp/custom_modules',
CompressionMethod.ZstdWithoutLong
)
.then(() => {
console.log('done')
})
.catch(err => {
console.log(err)
})