
Add cache service v2 client

pull/1857/head
Bassem Dghaidi 2024-09-24 03:17:44 -07:00 committed by GitHub
parent 70e5684b1f
commit 07e51a445e
9 changed files with 2828 additions and 1218 deletions

View File

@@ -1,25 +1,27 @@
 import * as core from '@actions/core'
 import * as path from 'path'
 import * as utils from './internal/cacheUtils'
-import {CacheServiceVersion, CacheUrl} from './internal/constants'
+import { CacheServiceVersion, CacheUrl } from './internal/constants'
 import * as cacheHttpClient from './internal/cacheHttpClient'
 import * as cacheTwirpClient from './internal/cacheTwirpClient'
-import {createTar, extractTar, listTar} from './internal/tar'
-import {DownloadOptions, UploadOptions} from './options'
+import { createTar, extractTar, listTar } from './internal/tar'
+import { DownloadOptions, UploadOptions } from './options'
 import {
-  GetCacheBlobUploadURLRequest,
-  GetCacheBlobUploadURLResponse,
-  GetCachedBlobRequest,
-  GetCachedBlobResponse
-} from './generated/results/api/v1/blobcache'
-import {UploadCacheStream} from './internal/v2/upload-cache'
-import {StreamExtract} from './internal/v2/download-cache'
+  CreateCacheEntryRequest,
+  CreateCacheEntryResponse,
+  FinalizeCacheEntryUploadRequest,
+  FinalizeCacheEntryUploadResponse,
+  GetCacheEntryDownloadURLRequest,
+  GetCacheEntryDownloadURLResponse
+} from './generated/results/api/v1/cache'
+import { UploadCacheStream } from './internal/v2/upload-cache'
+import { StreamExtract } from './internal/v2/download-cache'
 import {
   UploadZipSpecification,
   getUploadZipSpecification
 } from '@actions/artifact/lib/internal/upload/upload-zip-specification'
-import {createZipUploadStream} from '@actions/artifact/lib/internal/upload/zip'
-import {getBackendIdsFromToken, BackendIds} from '@actions/artifact/lib/internal/shared/util'
+import { createZipUploadStream } from '@actions/artifact/lib/internal/upload/zip'
+import { getBackendIdsFromToken, BackendIds } from '@actions/artifact/lib/internal/shared/util'
 export class ValidationError extends Error {
   constructor(message: string) {
@@ -193,7 +195,7 @@ async function restoreCachev2(
   options?: DownloadOptions,
   enableCrossOsArchive = false
 ) {
   restoreKeys = restoreKeys || []
   const keys = [primaryKey, ...restoreKeys]
@@ -212,24 +214,31 @@
   try {
     // BackendIds are retrieved from the signed JWT
     const backendIds: BackendIds = getBackendIdsFromToken()
-    const twirpClient = cacheTwirpClient.internalBlobCacheTwirpClient()
-    const getSignedDownloadURLRequest: GetCachedBlobRequest = {
+    const compressionMethod = await utils.getCompressionMethod()
+    const twirpClient = cacheTwirpClient.internalCacheTwirpClient()
+    const request: GetCacheEntryDownloadURLRequest = {
       workflowRunBackendId: backendIds.workflowRunBackendId,
       workflowJobRunBackendId: backendIds.workflowJobRunBackendId,
-      keys: keys,
+      key: primaryKey,
+      restoreKeys: restoreKeys,
+      version: utils.getCacheVersion(
+        paths,
+        compressionMethod,
+        enableCrossOsArchive,
+      ),
     }
-    const signedDownloadURL: GetCachedBlobResponse = await twirpClient.GetCachedBlob(getSignedDownloadURLRequest)
-    core.info(`GetCachedBlobResponse: ${JSON.stringify(signedDownloadURL)}`)
-    if (signedDownloadURL.blobs.length === 0) {
+    const response: GetCacheEntryDownloadURLResponse = await twirpClient.GetCacheEntryDownloadURL(request)
+    core.info(`GetCacheEntryDownloadURLResponse: ${JSON.stringify(response)}`)
+    if (!response.ok) {
       // Cache not found
       core.warning(`Cache not found for keys: ${keys.join(', ')}`)
       return undefined
     }
-    core.info(`Cache hit for: ${signedDownloadURL.blobs[0].key}`)
+    core.info(`Cache hit for: ${request.key}`)
     core.info(`Starting download of artifact to: ${paths[0]}`)
-    await StreamExtract(signedDownloadURL.blobs[0].signedUrl, path.dirname(paths[0]))
+    await StreamExtract(response.signedDownloadUrl, path.dirname(paths[0]))
     core.info(`Artifact download completed successfully.`)
     return keys[0]
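
Read as a whole, this hunk replaces the blob-list lookup with a single GetCacheEntryDownloadURL round-trip: a miss is now signalled by response.ok being false rather than by an empty blobs array. A minimal sketch of the new restore flow, using only calls that appear in this diff (imports as in the file above; the key and path values are illustrative):

const backendIds: BackendIds = getBackendIdsFromToken() // IDs decoded from the signed JWT
const compressionMethod = await utils.getCompressionMethod()
const twirpClient = cacheTwirpClient.internalCacheTwirpClient()

// One Twirp call resolves key + restore keys + version to a signed download URL.
const request: GetCacheEntryDownloadURLRequest = {
  workflowRunBackendId: backendIds.workflowRunBackendId,
  workflowJobRunBackendId: backendIds.workflowJobRunBackendId,
  key: 'npm-cache-linux-x64',   // illustrative primary key
  restoreKeys: ['npm-cache-'],  // illustrative fallback prefix
  version: utils.getCacheVersion(['node_modules'], compressionMethod)
}
const response = await twirpClient.GetCacheEntryDownloadURL(request)
if (response.ok) {
  // Stream the archive straight from blob storage and extract it in place.
  await StreamExtract(response.signedDownloadUrl, path.dirname('node_modules'))
}
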
@@ -255,7 +264,7 @@ export async function saveCache(
 ): Promise<number> {
   checkPaths(paths)
   checkKey(key)
   console.debug(`Cache Service Version: ${CacheServiceVersion}`)
   switch (CacheServiceVersion) {
     case "v2":
@@ -327,9 +336,9 @@ async function saveCachev1(
   } else if (reserveCacheResponse?.statusCode === 400) {
     throw new Error(
       reserveCacheResponse?.error?.message ??
         `Cache size of ~${Math.round(
           archiveFileSize / (1024 * 1024)
         )} MB (${archiveFileSize} B) is over the data cap limit, not saving cache.`
     )
   } else {
     throw new ReserveCacheError(
@@ -368,15 +377,21 @@ async function saveCachev2(
 ): Promise<number> {
   // BackendIds are retrieved from the signed JWT
   const backendIds: BackendIds = getBackendIdsFromToken()
-  const twirpClient = cacheTwirpClient.internalBlobCacheTwirpClient()
-  const getSignedUploadURL: GetCacheBlobUploadURLRequest = {
+  const compressionMethod = await utils.getCompressionMethod()
+  const version = utils.getCacheVersion(
+    paths,
+    compressionMethod,
+    enableCrossOsArchive
+  )
+  const twirpClient = cacheTwirpClient.internalCacheTwirpClient()
+  const request: CreateCacheEntryRequest = {
     workflowRunBackendId: backendIds.workflowRunBackendId,
     workflowJobRunBackendId: backendIds.workflowJobRunBackendId,
-    organization: "github",
-    keys: [key],
+    key: key,
+    version: version
   }
-  const signedUploadURL: GetCacheBlobUploadURLResponse = await twirpClient.GetCacheBlobUploadURL(getSignedUploadURL)
-  core.info(`GetCacheBlobUploadURLResponse: ${JSON.stringify(signedUploadURL)}`)
+  const response: CreateCacheEntryResponse = await twirpClient.CreateCacheEntry(request)
+  core.info(`CreateCacheEntryResponse: ${JSON.stringify(response)}`)
   // Archive
   // We're going to handle 1 path for now. This needs to be fixed to handle all
@@ -403,7 +418,19 @@ async function saveCachev2(
   // - getSignedUploadURL
   // - archivePath
   core.info(`Saving Cache v2: ${paths[0]}`)
-  await UploadCacheStream(signedUploadURL.urls[0].url, zipUploadStream)
+  await UploadCacheStream(response.signedUploadUrl, zipUploadStream)
+
+  // Finalize the cache entry
+  const finalizeRequest: FinalizeCacheEntryUploadRequest = {
+    workflowRunBackendId: backendIds.workflowRunBackendId,
+    workflowJobRunBackendId: backendIds.workflowJobRunBackendId,
+    key: key,
+    version: version,
+    sizeBytes: "1024",
+  }
+  const finalizeResponse: FinalizeCacheEntryUploadResponse = await twirpClient.FinalizeCacheEntryUpload(finalizeRequest)
+  core.info(`FinalizeCacheEntryUploadResponse: ${JSON.stringify(finalizeResponse)}`)
   return 0
 }
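
Combined with the previous hunk, saving under v2 is now a three-step protocol: create the entry (which returns a signed upload URL), stream the archive to blob storage, then finalize. A condensed sketch restricted to calls shown in this diff (key and version computed as in saveCachev2; note that sizeBytes is still the hardcoded "1024" placeholder at this point in the PR):

// 1. Reserve: register key + version and receive a signed upload URL.
const createResponse: CreateCacheEntryResponse = await twirpClient.CreateCacheEntry({
  workflowRunBackendId: backendIds.workflowRunBackendId,
  workflowJobRunBackendId: backendIds.workflowJobRunBackendId,
  key,
  version
})

// 2. Upload: stream the zipped archive directly to blob storage.
await UploadCacheStream(createResponse.signedUploadUrl, zipUploadStream)

// 3. Finalize: mark the entry complete so restores can see it.
await twirpClient.FinalizeCacheEntryUpload({
  workflowRunBackendId: backendIds.workflowRunBackendId,
  workflowJobRunBackendId: backendIds.workflowJobRunBackendId,
  key,
  version,
  sizeBytes: '1024' // placeholder in this commit, not the real archive size
})
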

View File

@@ -1,513 +0,0 @@
// @generated by protobuf-ts 2.9.1 with parameter long_type_string,client_none,generate_dependencies
// @generated from protobuf file "results/api/v1/blobcache.proto" (package "github.actions.results.api.v1", syntax proto3)
// tslint:disable
import { ServiceType } from "@protobuf-ts/runtime-rpc";
import type { BinaryWriteOptions } from "@protobuf-ts/runtime";
import type { IBinaryWriter } from "@protobuf-ts/runtime";
import { WireType } from "@protobuf-ts/runtime";
import type { BinaryReadOptions } from "@protobuf-ts/runtime";
import type { IBinaryReader } from "@protobuf-ts/runtime";
import { UnknownFieldHandler } from "@protobuf-ts/runtime";
import type { PartialMessage } from "@protobuf-ts/runtime";
import { reflectionMergePartial } from "@protobuf-ts/runtime";
import { MESSAGE_TYPE } from "@protobuf-ts/runtime";
import { MessageType } from "@protobuf-ts/runtime";
import { Timestamp } from "../../../google/protobuf/timestamp";
/**
* @generated from protobuf message github.actions.results.api.v1.GetCachedBlobRequest
*/
export interface GetCachedBlobRequest {
/**
* Workflow run backend ID
*
* @generated from protobuf field: string workflow_run_backend_id = 1;
*/
workflowRunBackendId: string;
/**
* Workflow job run backend ID
*
* @generated from protobuf field: string workflow_job_run_backend_id = 2;
*/
workflowJobRunBackendId: string;
/**
* Key(s) of the blob(s) to be retrieved
*
* @generated from protobuf field: repeated string keys = 3;
*/
keys: string[];
}
/**
* @generated from protobuf message github.actions.results.api.v1.GetCachedBlobResponse
*/
export interface GetCachedBlobResponse {
/**
* List of blobs that match the requested keys
*
* @generated from protobuf field: repeated github.actions.results.api.v1.GetCachedBlobResponse.Blob blobs = 1;
*/
blobs: GetCachedBlobResponse_Blob[];
}
/**
* @generated from protobuf message github.actions.results.api.v1.GetCachedBlobResponse.Blob
*/
export interface GetCachedBlobResponse_Blob {
/**
* Key of the blob
*
* @generated from protobuf field: string key = 1;
*/
key: string;
/**
* Download url for the cached blob
*
* @generated from protobuf field: string signed_url = 2;
*/
signedUrl: string;
/**
* Version of the cached blob entry
*
* @generated from protobuf field: int32 version = 3;
*/
version: number;
/**
* Checksum of the blob
*
* @generated from protobuf field: string checksum = 4;
*/
checksum: string;
/**
* Timestamp for when the blob cache entry expires
*
* @generated from protobuf field: google.protobuf.Timestamp expires_at = 5;
*/
expiresAt?: Timestamp;
/**
* Timestamp for when the blob cache entry was created
*
* @generated from protobuf field: google.protobuf.Timestamp created_at = 6;
*/
createdAt?: Timestamp;
}
/**
* @generated from protobuf message github.actions.results.api.v1.GetCacheBlobUploadURLRequest
*/
export interface GetCacheBlobUploadURLRequest {
/**
* Workflow run backend ID
*
* @generated from protobuf field: string workflow_run_backend_id = 1;
*/
workflowRunBackendId: string;
/**
* Workflow job run backend ID
*
* @generated from protobuf field: string workflow_job_run_backend_id = 2;
*/
workflowJobRunBackendId: string;
/**
* Owner of the blob(s) to be retrieved
*
* @generated from protobuf field: string organization = 3;
*/
organization: string;
/**
* Key(s) of the blob(s) to be retrieved
*
* @generated from protobuf field: repeated string keys = 4;
*/
keys: string[];
}
/**
* @generated from protobuf message github.actions.results.api.v1.GetCacheBlobUploadURLResponse
*/
export interface GetCacheBlobUploadURLResponse {
/**
* List of upload URLs that match the requested keys
*
* @generated from protobuf field: repeated github.actions.results.api.v1.GetCacheBlobUploadURLResponse.Url urls = 1;
*/
urls: GetCacheBlobUploadURLResponse_Url[];
}
/**
* @generated from protobuf message github.actions.results.api.v1.GetCacheBlobUploadURLResponse.Url
*/
export interface GetCacheBlobUploadURLResponse_Url {
/**
* Key of the blob
*
* @generated from protobuf field: string key = 1;
*/
key: string;
/**
* URL to the blob
*
* @generated from protobuf field: string url = 2;
*/
url: string;
}
// @generated message type with reflection information, may provide speed optimized methods
class GetCachedBlobRequest$Type extends MessageType<GetCachedBlobRequest> {
constructor() {
super("github.actions.results.api.v1.GetCachedBlobRequest", [
{ no: 1, name: "workflow_run_backend_id", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 2, name: "workflow_job_run_backend_id", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 3, name: "keys", kind: "scalar", repeat: 2 /*RepeatType.UNPACKED*/, T: 9 /*ScalarType.STRING*/ }
]);
}
create(value?: PartialMessage<GetCachedBlobRequest>): GetCachedBlobRequest {
const message = { workflowRunBackendId: "", workflowJobRunBackendId: "", keys: [] };
globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
reflectionMergePartial<GetCachedBlobRequest>(this, message, value);
return message;
}
internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: GetCachedBlobRequest): GetCachedBlobRequest {
let message = target ?? this.create(), end = reader.pos + length;
while (reader.pos < end) {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case /* string workflow_run_backend_id */ 1:
message.workflowRunBackendId = reader.string();
break;
case /* string workflow_job_run_backend_id */ 2:
message.workflowJobRunBackendId = reader.string();
break;
case /* repeated string keys */ 3:
message.keys.push(reader.string());
break;
default:
let u = options.readUnknownField;
if (u === "throw")
throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`);
let d = reader.skip(wireType);
if (u !== false)
(u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d);
}
}
return message;
}
internalBinaryWrite(message: GetCachedBlobRequest, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter {
/* string workflow_run_backend_id = 1; */
if (message.workflowRunBackendId !== "")
writer.tag(1, WireType.LengthDelimited).string(message.workflowRunBackendId);
/* string workflow_job_run_backend_id = 2; */
if (message.workflowJobRunBackendId !== "")
writer.tag(2, WireType.LengthDelimited).string(message.workflowJobRunBackendId);
/* repeated string keys = 3; */
for (let i = 0; i < message.keys.length; i++)
writer.tag(3, WireType.LengthDelimited).string(message.keys[i]);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
return writer;
}
}
/**
* @generated MessageType for protobuf message github.actions.results.api.v1.GetCachedBlobRequest
*/
export const GetCachedBlobRequest = new GetCachedBlobRequest$Type();
// @generated message type with reflection information, may provide speed optimized methods
class GetCachedBlobResponse$Type extends MessageType<GetCachedBlobResponse> {
constructor() {
super("github.actions.results.api.v1.GetCachedBlobResponse", [
{ no: 1, name: "blobs", kind: "message", repeat: 1 /*RepeatType.PACKED*/, T: () => GetCachedBlobResponse_Blob }
]);
}
create(value?: PartialMessage<GetCachedBlobResponse>): GetCachedBlobResponse {
const message = { blobs: [] };
globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
reflectionMergePartial<GetCachedBlobResponse>(this, message, value);
return message;
}
internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: GetCachedBlobResponse): GetCachedBlobResponse {
let message = target ?? this.create(), end = reader.pos + length;
while (reader.pos < end) {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case /* repeated github.actions.results.api.v1.GetCachedBlobResponse.Blob blobs */ 1:
message.blobs.push(GetCachedBlobResponse_Blob.internalBinaryRead(reader, reader.uint32(), options));
break;
default:
let u = options.readUnknownField;
if (u === "throw")
throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`);
let d = reader.skip(wireType);
if (u !== false)
(u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d);
}
}
return message;
}
internalBinaryWrite(message: GetCachedBlobResponse, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter {
/* repeated github.actions.results.api.v1.GetCachedBlobResponse.Blob blobs = 1; */
for (let i = 0; i < message.blobs.length; i++)
GetCachedBlobResponse_Blob.internalBinaryWrite(message.blobs[i], writer.tag(1, WireType.LengthDelimited).fork(), options).join();
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
return writer;
}
}
/**
* @generated MessageType for protobuf message github.actions.results.api.v1.GetCachedBlobResponse
*/
export const GetCachedBlobResponse = new GetCachedBlobResponse$Type();
// @generated message type with reflection information, may provide speed optimized methods
class GetCachedBlobResponse_Blob$Type extends MessageType<GetCachedBlobResponse_Blob> {
constructor() {
super("github.actions.results.api.v1.GetCachedBlobResponse.Blob", [
{ no: 1, name: "key", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 2, name: "signed_url", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 3, name: "version", kind: "scalar", T: 5 /*ScalarType.INT32*/ },
{ no: 4, name: "checksum", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 5, name: "expires_at", kind: "message", T: () => Timestamp },
{ no: 6, name: "created_at", kind: "message", T: () => Timestamp }
]);
}
create(value?: PartialMessage<GetCachedBlobResponse_Blob>): GetCachedBlobResponse_Blob {
const message = { key: "", signedUrl: "", version: 0, checksum: "" };
globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
reflectionMergePartial<GetCachedBlobResponse_Blob>(this, message, value);
return message;
}
internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: GetCachedBlobResponse_Blob): GetCachedBlobResponse_Blob {
let message = target ?? this.create(), end = reader.pos + length;
while (reader.pos < end) {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case /* string key */ 1:
message.key = reader.string();
break;
case /* string signed_url */ 2:
message.signedUrl = reader.string();
break;
case /* int32 version */ 3:
message.version = reader.int32();
break;
case /* string checksum */ 4:
message.checksum = reader.string();
break;
case /* google.protobuf.Timestamp expires_at */ 5:
message.expiresAt = Timestamp.internalBinaryRead(reader, reader.uint32(), options, message.expiresAt);
break;
case /* google.protobuf.Timestamp created_at */ 6:
message.createdAt = Timestamp.internalBinaryRead(reader, reader.uint32(), options, message.createdAt);
break;
default:
let u = options.readUnknownField;
if (u === "throw")
throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`);
let d = reader.skip(wireType);
if (u !== false)
(u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d);
}
}
return message;
}
internalBinaryWrite(message: GetCachedBlobResponse_Blob, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter {
/* string key = 1; */
if (message.key !== "")
writer.tag(1, WireType.LengthDelimited).string(message.key);
/* string signed_url = 2; */
if (message.signedUrl !== "")
writer.tag(2, WireType.LengthDelimited).string(message.signedUrl);
/* int32 version = 3; */
if (message.version !== 0)
writer.tag(3, WireType.Varint).int32(message.version);
/* string checksum = 4; */
if (message.checksum !== "")
writer.tag(4, WireType.LengthDelimited).string(message.checksum);
/* google.protobuf.Timestamp expires_at = 5; */
if (message.expiresAt)
Timestamp.internalBinaryWrite(message.expiresAt, writer.tag(5, WireType.LengthDelimited).fork(), options).join();
/* google.protobuf.Timestamp created_at = 6; */
if (message.createdAt)
Timestamp.internalBinaryWrite(message.createdAt, writer.tag(6, WireType.LengthDelimited).fork(), options).join();
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
return writer;
}
}
/**
* @generated MessageType for protobuf message github.actions.results.api.v1.GetCachedBlobResponse.Blob
*/
export const GetCachedBlobResponse_Blob = new GetCachedBlobResponse_Blob$Type();
// @generated message type with reflection information, may provide speed optimized methods
class GetCacheBlobUploadURLRequest$Type extends MessageType<GetCacheBlobUploadURLRequest> {
constructor() {
super("github.actions.results.api.v1.GetCacheBlobUploadURLRequest", [
{ no: 1, name: "workflow_run_backend_id", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 2, name: "workflow_job_run_backend_id", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 3, name: "organization", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 4, name: "keys", kind: "scalar", repeat: 2 /*RepeatType.UNPACKED*/, T: 9 /*ScalarType.STRING*/ }
]);
}
create(value?: PartialMessage<GetCacheBlobUploadURLRequest>): GetCacheBlobUploadURLRequest {
const message = { workflowRunBackendId: "", workflowJobRunBackendId: "", organization: "", keys: [] };
globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
reflectionMergePartial<GetCacheBlobUploadURLRequest>(this, message, value);
return message;
}
internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: GetCacheBlobUploadURLRequest): GetCacheBlobUploadURLRequest {
let message = target ?? this.create(), end = reader.pos + length;
while (reader.pos < end) {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case /* string workflow_run_backend_id */ 1:
message.workflowRunBackendId = reader.string();
break;
case /* string workflow_job_run_backend_id */ 2:
message.workflowJobRunBackendId = reader.string();
break;
case /* string organization */ 3:
message.organization = reader.string();
break;
case /* repeated string keys */ 4:
message.keys.push(reader.string());
break;
default:
let u = options.readUnknownField;
if (u === "throw")
throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`);
let d = reader.skip(wireType);
if (u !== false)
(u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d);
}
}
return message;
}
internalBinaryWrite(message: GetCacheBlobUploadURLRequest, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter {
/* string workflow_run_backend_id = 1; */
if (message.workflowRunBackendId !== "")
writer.tag(1, WireType.LengthDelimited).string(message.workflowRunBackendId);
/* string workflow_job_run_backend_id = 2; */
if (message.workflowJobRunBackendId !== "")
writer.tag(2, WireType.LengthDelimited).string(message.workflowJobRunBackendId);
/* string organization = 3; */
if (message.organization !== "")
writer.tag(3, WireType.LengthDelimited).string(message.organization);
/* repeated string keys = 4; */
for (let i = 0; i < message.keys.length; i++)
writer.tag(4, WireType.LengthDelimited).string(message.keys[i]);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
return writer;
}
}
/**
* @generated MessageType for protobuf message github.actions.results.api.v1.GetCacheBlobUploadURLRequest
*/
export const GetCacheBlobUploadURLRequest = new GetCacheBlobUploadURLRequest$Type();
// @generated message type with reflection information, may provide speed optimized methods
class GetCacheBlobUploadURLResponse$Type extends MessageType<GetCacheBlobUploadURLResponse> {
constructor() {
super("github.actions.results.api.v1.GetCacheBlobUploadURLResponse", [
{ no: 1, name: "urls", kind: "message", repeat: 1 /*RepeatType.PACKED*/, T: () => GetCacheBlobUploadURLResponse_Url }
]);
}
create(value?: PartialMessage<GetCacheBlobUploadURLResponse>): GetCacheBlobUploadURLResponse {
const message = { urls: [] };
globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
reflectionMergePartial<GetCacheBlobUploadURLResponse>(this, message, value);
return message;
}
internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: GetCacheBlobUploadURLResponse): GetCacheBlobUploadURLResponse {
let message = target ?? this.create(), end = reader.pos + length;
while (reader.pos < end) {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case /* repeated github.actions.results.api.v1.GetCacheBlobUploadURLResponse.Url urls */ 1:
message.urls.push(GetCacheBlobUploadURLResponse_Url.internalBinaryRead(reader, reader.uint32(), options));
break;
default:
let u = options.readUnknownField;
if (u === "throw")
throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`);
let d = reader.skip(wireType);
if (u !== false)
(u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d);
}
}
return message;
}
internalBinaryWrite(message: GetCacheBlobUploadURLResponse, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter {
/* repeated github.actions.results.api.v1.GetCacheBlobUploadURLResponse.Url urls = 1; */
for (let i = 0; i < message.urls.length; i++)
GetCacheBlobUploadURLResponse_Url.internalBinaryWrite(message.urls[i], writer.tag(1, WireType.LengthDelimited).fork(), options).join();
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
return writer;
}
}
/**
* @generated MessageType for protobuf message github.actions.results.api.v1.GetCacheBlobUploadURLResponse
*/
export const GetCacheBlobUploadURLResponse = new GetCacheBlobUploadURLResponse$Type();
// @generated message type with reflection information, may provide speed optimized methods
class GetCacheBlobUploadURLResponse_Url$Type extends MessageType<GetCacheBlobUploadURLResponse_Url> {
constructor() {
super("github.actions.results.api.v1.GetCacheBlobUploadURLResponse.Url", [
{ no: 1, name: "key", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 2, name: "url", kind: "scalar", T: 9 /*ScalarType.STRING*/ }
]);
}
create(value?: PartialMessage<GetCacheBlobUploadURLResponse_Url>): GetCacheBlobUploadURLResponse_Url {
const message = { key: "", url: "" };
globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
reflectionMergePartial<GetCacheBlobUploadURLResponse_Url>(this, message, value);
return message;
}
internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: GetCacheBlobUploadURLResponse_Url): GetCacheBlobUploadURLResponse_Url {
let message = target ?? this.create(), end = reader.pos + length;
while (reader.pos < end) {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case /* string key */ 1:
message.key = reader.string();
break;
case /* string url */ 2:
message.url = reader.string();
break;
default:
let u = options.readUnknownField;
if (u === "throw")
throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`);
let d = reader.skip(wireType);
if (u !== false)
(u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d);
}
}
return message;
}
internalBinaryWrite(message: GetCacheBlobUploadURLResponse_Url, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter {
/* string key = 1; */
if (message.key !== "")
writer.tag(1, WireType.LengthDelimited).string(message.key);
/* string url = 2; */
if (message.url !== "")
writer.tag(2, WireType.LengthDelimited).string(message.url);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
return writer;
}
}
/**
* @generated MessageType for protobuf message github.actions.results.api.v1.GetCacheBlobUploadURLResponse.Url
*/
export const GetCacheBlobUploadURLResponse_Url = new GetCacheBlobUploadURLResponse_Url$Type();
/**
* @generated ServiceType for protobuf service github.actions.results.api.v1.BlobCacheService
*/
export const BlobCacheService = new ServiceType("github.actions.results.api.v1.BlobCacheService", [
{ name: "GetCachedBlob", options: {}, I: GetCachedBlobRequest, O: GetCachedBlobResponse },
{ name: "GetCacheBlobUploadURL", options: {}, I: GetCacheBlobUploadURLRequest, O: GetCacheBlobUploadURLResponse }
]);

View File

@@ -1,433 +0,0 @@
import {
TwirpContext,
TwirpServer,
RouterEvents,
TwirpError,
TwirpErrorCode,
Interceptor,
TwirpContentType,
chainInterceptors,
} from "twirp-ts";
import {
GetCachedBlobRequest,
GetCachedBlobResponse,
GetCacheBlobUploadURLRequest,
GetCacheBlobUploadURLResponse,
} from "./blobcache";
//==================================//
// Client Code //
//==================================//
interface Rpc {
request(
service: string,
method: string,
contentType: "application/json" | "application/protobuf",
data: object | Uint8Array
): Promise<object | Uint8Array>;
}
export interface BlobCacheServiceClient {
GetCachedBlob(request: GetCachedBlobRequest): Promise<GetCachedBlobResponse>;
GetCacheBlobUploadURL(
request: GetCacheBlobUploadURLRequest
): Promise<GetCacheBlobUploadURLResponse>;
}
export class BlobCacheServiceClientJSON implements BlobCacheServiceClient {
private readonly rpc: Rpc;
constructor(rpc: Rpc) {
this.rpc = rpc;
this.GetCachedBlob.bind(this);
this.GetCacheBlobUploadURL.bind(this);
}
GetCachedBlob(request: GetCachedBlobRequest): Promise<GetCachedBlobResponse> {
const data = GetCachedBlobRequest.toJson(request, {
useProtoFieldName: true,
emitDefaultValues: false,
});
const promise = this.rpc.request(
"github.actions.results.api.v1.BlobCacheService",
"GetCachedBlob",
"application/json",
data as object
);
return promise.then((data) =>
GetCachedBlobResponse.fromJson(data as any, { ignoreUnknownFields: true })
);
}
GetCacheBlobUploadURL(
request: GetCacheBlobUploadURLRequest
): Promise<GetCacheBlobUploadURLResponse> {
const data = GetCacheBlobUploadURLRequest.toJson(request, {
useProtoFieldName: true,
emitDefaultValues: false,
});
const promise = this.rpc.request(
"github.actions.results.api.v1.BlobCacheService",
"GetCacheBlobUploadURL",
"application/json",
data as object
);
return promise.then((data) =>
GetCacheBlobUploadURLResponse.fromJson(data as any, {
ignoreUnknownFields: true,
})
);
}
}
export class BlobCacheServiceClientProtobuf implements BlobCacheServiceClient {
private readonly rpc: Rpc;
constructor(rpc: Rpc) {
this.rpc = rpc;
this.GetCachedBlob.bind(this);
this.GetCacheBlobUploadURL.bind(this);
}
GetCachedBlob(request: GetCachedBlobRequest): Promise<GetCachedBlobResponse> {
const data = GetCachedBlobRequest.toBinary(request);
const promise = this.rpc.request(
"github.actions.results.api.v1.BlobCacheService",
"GetCachedBlob",
"application/protobuf",
data
);
return promise.then((data) =>
GetCachedBlobResponse.fromBinary(data as Uint8Array)
);
}
GetCacheBlobUploadURL(
request: GetCacheBlobUploadURLRequest
): Promise<GetCacheBlobUploadURLResponse> {
const data = GetCacheBlobUploadURLRequest.toBinary(request);
const promise = this.rpc.request(
"github.actions.results.api.v1.BlobCacheService",
"GetCacheBlobUploadURL",
"application/protobuf",
data
);
return promise.then((data) =>
GetCacheBlobUploadURLResponse.fromBinary(data as Uint8Array)
);
}
}
//==================================//
// Server Code //
//==================================//
export interface BlobCacheServiceTwirp<T extends TwirpContext = TwirpContext> {
GetCachedBlob(
ctx: T,
request: GetCachedBlobRequest
): Promise<GetCachedBlobResponse>;
GetCacheBlobUploadURL(
ctx: T,
request: GetCacheBlobUploadURLRequest
): Promise<GetCacheBlobUploadURLResponse>;
}
export enum BlobCacheServiceMethod {
GetCachedBlob = "GetCachedBlob",
GetCacheBlobUploadURL = "GetCacheBlobUploadURL",
}
export const BlobCacheServiceMethodList = [
BlobCacheServiceMethod.GetCachedBlob,
BlobCacheServiceMethod.GetCacheBlobUploadURL,
];
export function createBlobCacheServiceServer<
T extends TwirpContext = TwirpContext
>(service: BlobCacheServiceTwirp<T>) {
return new TwirpServer<BlobCacheServiceTwirp, T>({
service,
packageName: "github.actions.results.api.v1",
serviceName: "BlobCacheService",
methodList: BlobCacheServiceMethodList,
matchRoute: matchBlobCacheServiceRoute,
});
}
function matchBlobCacheServiceRoute<T extends TwirpContext = TwirpContext>(
method: string,
events: RouterEvents<T>
) {
switch (method) {
case "GetCachedBlob":
return async (
ctx: T,
service: BlobCacheServiceTwirp,
data: Buffer,
interceptors?: Interceptor<
T,
GetCachedBlobRequest,
GetCachedBlobResponse
>[]
) => {
ctx = { ...ctx, methodName: "GetCachedBlob" };
await events.onMatch(ctx);
return handleBlobCacheServiceGetCachedBlobRequest(
ctx,
service,
data,
interceptors
);
};
case "GetCacheBlobUploadURL":
return async (
ctx: T,
service: BlobCacheServiceTwirp,
data: Buffer,
interceptors?: Interceptor<
T,
GetCacheBlobUploadURLRequest,
GetCacheBlobUploadURLResponse
>[]
) => {
ctx = { ...ctx, methodName: "GetCacheBlobUploadURL" };
await events.onMatch(ctx);
return handleBlobCacheServiceGetCacheBlobUploadURLRequest(
ctx,
service,
data,
interceptors
);
};
default:
events.onNotFound();
const msg = `no handler found`;
throw new TwirpError(TwirpErrorCode.BadRoute, msg);
}
}
function handleBlobCacheServiceGetCachedBlobRequest<
T extends TwirpContext = TwirpContext
>(
ctx: T,
service: BlobCacheServiceTwirp,
data: Buffer,
interceptors?: Interceptor<T, GetCachedBlobRequest, GetCachedBlobResponse>[]
): Promise<string | Uint8Array> {
switch (ctx.contentType) {
case TwirpContentType.JSON:
return handleBlobCacheServiceGetCachedBlobJSON<T>(
ctx,
service,
data,
interceptors
);
case TwirpContentType.Protobuf:
return handleBlobCacheServiceGetCachedBlobProtobuf<T>(
ctx,
service,
data,
interceptors
);
default:
const msg = "unexpected Content-Type";
throw new TwirpError(TwirpErrorCode.BadRoute, msg);
}
}
function handleBlobCacheServiceGetCacheBlobUploadURLRequest<
T extends TwirpContext = TwirpContext
>(
ctx: T,
service: BlobCacheServiceTwirp,
data: Buffer,
interceptors?: Interceptor<
T,
GetCacheBlobUploadURLRequest,
GetCacheBlobUploadURLResponse
>[]
): Promise<string | Uint8Array> {
switch (ctx.contentType) {
case TwirpContentType.JSON:
return handleBlobCacheServiceGetCacheBlobUploadURLJSON<T>(
ctx,
service,
data,
interceptors
);
case TwirpContentType.Protobuf:
return handleBlobCacheServiceGetCacheBlobUploadURLProtobuf<T>(
ctx,
service,
data,
interceptors
);
default:
const msg = "unexpected Content-Type";
throw new TwirpError(TwirpErrorCode.BadRoute, msg);
}
}
async function handleBlobCacheServiceGetCachedBlobJSON<
T extends TwirpContext = TwirpContext
>(
ctx: T,
service: BlobCacheServiceTwirp,
data: Buffer,
interceptors?: Interceptor<T, GetCachedBlobRequest, GetCachedBlobResponse>[]
) {
let request: GetCachedBlobRequest;
let response: GetCachedBlobResponse;
try {
const body = JSON.parse(data.toString() || "{}");
request = GetCachedBlobRequest.fromJson(body, {
ignoreUnknownFields: true,
});
} catch (e) {
if (e instanceof Error) {
const msg = "the json request could not be decoded";
throw new TwirpError(TwirpErrorCode.Malformed, msg).withCause(e, true);
}
}
if (interceptors && interceptors.length > 0) {
const interceptor = chainInterceptors(...interceptors) as Interceptor<
T,
GetCachedBlobRequest,
GetCachedBlobResponse
>;
response = await interceptor(ctx, request!, (ctx, inputReq) => {
return service.GetCachedBlob(ctx, inputReq);
});
} else {
response = await service.GetCachedBlob(ctx, request!);
}
return JSON.stringify(
GetCachedBlobResponse.toJson(response, {
useProtoFieldName: true,
emitDefaultValues: false,
}) as string
);
}
async function handleBlobCacheServiceGetCacheBlobUploadURLJSON<
T extends TwirpContext = TwirpContext
>(
ctx: T,
service: BlobCacheServiceTwirp,
data: Buffer,
interceptors?: Interceptor<
T,
GetCacheBlobUploadURLRequest,
GetCacheBlobUploadURLResponse
>[]
) {
let request: GetCacheBlobUploadURLRequest;
let response: GetCacheBlobUploadURLResponse;
try {
const body = JSON.parse(data.toString() || "{}");
request = GetCacheBlobUploadURLRequest.fromJson(body, {
ignoreUnknownFields: true,
});
} catch (e) {
if (e instanceof Error) {
const msg = "the json request could not be decoded";
throw new TwirpError(TwirpErrorCode.Malformed, msg).withCause(e, true);
}
}
if (interceptors && interceptors.length > 0) {
const interceptor = chainInterceptors(...interceptors) as Interceptor<
T,
GetCacheBlobUploadURLRequest,
GetCacheBlobUploadURLResponse
>;
response = await interceptor(ctx, request!, (ctx, inputReq) => {
return service.GetCacheBlobUploadURL(ctx, inputReq);
});
} else {
response = await service.GetCacheBlobUploadURL(ctx, request!);
}
return JSON.stringify(
GetCacheBlobUploadURLResponse.toJson(response, {
useProtoFieldName: true,
emitDefaultValues: false,
}) as string
);
}
async function handleBlobCacheServiceGetCachedBlobProtobuf<
T extends TwirpContext = TwirpContext
>(
ctx: T,
service: BlobCacheServiceTwirp,
data: Buffer,
interceptors?: Interceptor<T, GetCachedBlobRequest, GetCachedBlobResponse>[]
) {
let request: GetCachedBlobRequest;
let response: GetCachedBlobResponse;
try {
request = GetCachedBlobRequest.fromBinary(data);
} catch (e) {
if (e instanceof Error) {
const msg = "the protobuf request could not be decoded";
throw new TwirpError(TwirpErrorCode.Malformed, msg).withCause(e, true);
}
}
if (interceptors && interceptors.length > 0) {
const interceptor = chainInterceptors(...interceptors) as Interceptor<
T,
GetCachedBlobRequest,
GetCachedBlobResponse
>;
response = await interceptor(ctx, request!, (ctx, inputReq) => {
return service.GetCachedBlob(ctx, inputReq);
});
} else {
response = await service.GetCachedBlob(ctx, request!);
}
return Buffer.from(GetCachedBlobResponse.toBinary(response));
}
async function handleBlobCacheServiceGetCacheBlobUploadURLProtobuf<
T extends TwirpContext = TwirpContext
>(
ctx: T,
service: BlobCacheServiceTwirp,
data: Buffer,
interceptors?: Interceptor<
T,
GetCacheBlobUploadURLRequest,
GetCacheBlobUploadURLResponse
>[]
) {
let request: GetCacheBlobUploadURLRequest;
let response: GetCacheBlobUploadURLResponse;
try {
request = GetCacheBlobUploadURLRequest.fromBinary(data);
} catch (e) {
if (e instanceof Error) {
const msg = "the protobuf request could not be decoded";
throw new TwirpError(TwirpErrorCode.Malformed, msg).withCause(e, true);
}
}
if (interceptors && interceptors.length > 0) {
const interceptor = chainInterceptors(...interceptors) as Interceptor<
T,
GetCacheBlobUploadURLRequest,
GetCacheBlobUploadURLResponse
>;
response = await interceptor(ctx, request!, (ctx, inputReq) => {
return service.GetCacheBlobUploadURL(ctx, inputReq);
});
} else {
response = await service.GetCacheBlobUploadURL(ctx, request!);
}
return Buffer.from(GetCacheBlobUploadURLResponse.toBinary(response));
}

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@@ -1,16 +1,13 @@
 import * as core from '@actions/core'
-import {HttpClient} from '@actions/http-client'
-import {BearerCredentialHandler} from '@actions/http-client/lib/auth'
+import { HttpClient } from '@actions/http-client'
+import { BearerCredentialHandler } from '@actions/http-client/lib/auth'
 import {
   RequestOptions,
   TypedResponse
 } from '@actions/http-client/lib/interfaces'
-import * as crypto from 'crypto'
 import * as fs from 'fs'
-import {URL} from 'url'
+import { URL } from 'url'
 import * as utils from './cacheUtils'
-import {CompressionMethod} from './constants'
 import {
   ArtifactCacheEntry,
   InternalCacheOptions,
@@ -36,9 +33,7 @@ import {
   retryHttpClientResponse,
   retryTypedResponse
 } from './requestUtils'
-import {CacheUrl} from './constants'
-const versionSalt = '1.0'
+import { CacheUrl } from './constants'
 function getCacheApiUrl(resource: string): string {
   const baseUrl: string = CacheUrl || ''
@@ -76,43 +71,18 @@ function createHttpClient(): HttpClient {
   )
 }
-export function getCacheVersion(
-  paths: string[],
-  compressionMethod?: CompressionMethod,
-  enableCrossOsArchive = false
-): string {
-  // don't pass changes upstream
-  const components = paths.slice()
-  // Add compression method to cache version to restore
-  // compressed cache as per compression method
-  if (compressionMethod) {
-    components.push(compressionMethod)
-  }
-  // Only check for windows platforms if enableCrossOsArchive is false
-  if (process.platform === 'win32' && !enableCrossOsArchive) {
-    components.push('windows-only')
-  }
-  // Add salt to cache version to support breaking changes in cache entry
-  components.push(versionSalt)
-  return crypto.createHash('sha256').update(components.join('|')).digest('hex')
-}
 export async function getCacheEntry(
   keys: string[],
   paths: string[],
   options?: InternalCacheOptions
 ): Promise<ArtifactCacheEntry | null> {
   const httpClient = createHttpClient()
-  const version = getCacheVersion(
+  const version = utils.getCacheVersion(
     paths,
     options?.compressionMethod,
     options?.enableCrossOsArchive
   )
   const resource = `cache?keys=${encodeURIComponent(
     keys.join(',')
   )}&version=${version}`
@@ -209,7 +179,7 @@ export async function reserveCache(
   options?: InternalCacheOptions
 ): Promise<ITypedResponseWithError<ReserveCacheResponse>> {
   const httpClient = createHttpClient()
-  const version = getCacheVersion(
+  const version = utils.getCacheVersion(
     paths,
     options?.compressionMethod,
     options?.enableCrossOsArchive
@@ -246,8 +216,7 @@
   end: number
 ): Promise<void> {
   core.debug(
-    `Uploading chunk of size ${
-      end - start + 1
+    `Uploading chunk of size ${end - start + 1
     } bytes at offset ${start} with content range: ${getContentRange(
       start,
       end
@@ -343,7 +312,7 @@ async function commitCache(
   cacheId: number,
   filesize: number
 ): Promise<TypedResponse<null>> {
-  const commitCacheRequest: CommitCacheRequest = {size: filesize}
+  const commitCacheRequest: CommitCacheRequest = { size: filesize }
   return await retryTypedResponse('commitCache', async () =>
     httpClient.postJson<null>(
       getCacheApiUrl(`caches/${cacheId.toString()}`),
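
For contrast with the v2 flow, the v1 protocol this file continues to implement is reserve → chunked upload → commit, where commitCache posts the final archive size as { size: filesize }. A rough sketch of the calling sequence built from this file's exports (the archive path and options are illustrative):

// Hypothetical v1 save sequence using this file's helpers.
const reservation = await reserveCache(key, paths, {
  compressionMethod,
  enableCrossOsArchive
})
const cacheId = reservation.result?.cacheId
if (cacheId) {
  // Uploads the archive in fixed-size chunks, then commits { size: filesize }.
  await saveCache(cacheId, archivePath)
}
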

View File

@@ -1,197 +1,196 @@
-import {HttpClient, HttpClientResponse, HttpCodes} from '@actions/http-client'
-import {BearerCredentialHandler} from '@actions/http-client/lib/auth'
-import {info, debug} from '@actions/core'
-import {BlobCacheServiceClientJSON} from '../generated/results/api/v1/blobcache.twirp'
-import {CacheUrl} from './constants'
-import {getRuntimeToken} from './config'
+import { HttpClient, HttpClientResponse, HttpCodes } from '@actions/http-client'
+import { BearerCredentialHandler } from '@actions/http-client/lib/auth'
+import { info, debug } from '@actions/core'
+import { CacheServiceClientJSON } from '../generated/results/api/v1/cache.twirp'
+import { CacheUrl } from './constants'
+import { getRuntimeToken } from './config'
 // import {getUserAgentString} from './user-agent'
 // import {NetworkError, UsageError} from './errors'
 // The twirp http client must implement this interface
 interface Rpc {
   request(
     service: string,
     method: string,
     contentType: 'application/json' | 'application/protobuf',
     data: object | Uint8Array
   ): Promise<object | Uint8Array>
 }
-class BlobCacheServiceClient implements Rpc {
+class CacheServiceClient implements Rpc {
   private httpClient: HttpClient
   private baseUrl: string
   private maxAttempts = 5
   private baseRetryIntervalMilliseconds = 3000
   private retryMultiplier = 1.5
   constructor(
     userAgent: string,
     maxAttempts?: number,
     baseRetryIntervalMilliseconds?: number,
     retryMultiplier?: number
   ) {
     const token = getRuntimeToken()
     this.baseUrl = CacheUrl
     if (maxAttempts) {
       this.maxAttempts = maxAttempts
     }
     if (baseRetryIntervalMilliseconds) {
       this.baseRetryIntervalMilliseconds = baseRetryIntervalMilliseconds
     }
     if (retryMultiplier) {
       this.retryMultiplier = retryMultiplier
     }
     this.httpClient = new HttpClient(userAgent, [
       new BearerCredentialHandler(token)
     ])
   }
   // This function satisfies the Rpc interface. It is compatible with the
   // JSON generated client.
   async request(
     service: string,
     method: string,
     contentType: 'application/json' | 'application/protobuf',
     data: object | Uint8Array
   ): Promise<object | Uint8Array> {
     const url = new URL(`/twirp/${service}/${method}`, this.baseUrl).href
     debug(`[Request] ${method} ${url}`)
     const headers = {
       'Content-Type': contentType
     }
     try {
-      const {body} = await this.retryableRequest(async () =>
+      const { body } = await this.retryableRequest(async () =>
         this.httpClient.post(url, JSON.stringify(data), headers)
       )
       return body
     } catch (error) {
       throw new Error(`Failed to ${method}: ${error.message}`)
     }
   }
   async retryableRequest(
     operation: () => Promise<HttpClientResponse>
-  ): Promise<{response: HttpClientResponse; body: object}> {
+  ): Promise<{ response: HttpClientResponse; body: object }> {
     let attempt = 0
     let errorMessage = ''
     let rawBody = ''
     while (attempt < this.maxAttempts) {
       let isRetryable = false
       try {
         const response = await operation()
         const statusCode = response.message.statusCode
         rawBody = await response.readBody()
         debug(`[Response] - ${response.message.statusCode}`)
         debug(`Headers: ${JSON.stringify(response.message.headers, null, 2)}`)
         const body = JSON.parse(rawBody)
         debug(`Body: ${JSON.stringify(body, null, 2)}`)
         if (this.isSuccessStatusCode(statusCode)) {
-          return {response, body}
+          return { response, body }
         }
         isRetryable = this.isRetryableHttpStatusCode(statusCode)
         errorMessage = `Failed request: (${statusCode}) ${response.message.statusMessage}`
         if (body.msg) {
           // if (UsageError.isUsageErrorMessage(body.msg)) {
           //   throw new UsageError()
           // }
           errorMessage = `${errorMessage}: ${body.msg}`
         }
       } catch (error) {
         if (error instanceof SyntaxError) {
           debug(`Raw Body: ${rawBody}`)
         }
         // if (error instanceof UsageError) {
         //   throw error
         // }
         // if (NetworkError.isNetworkErrorCode(error?.code)) {
         //   throw new NetworkError(error?.code)
         // }
         isRetryable = true
         errorMessage = error.message
       }
       if (!isRetryable) {
         throw new Error(`Received non-retryable error: ${errorMessage}`)
       }
       if (attempt + 1 === this.maxAttempts) {
         throw new Error(
           `Failed to make request after ${this.maxAttempts} attempts: ${errorMessage}`
         )
       }
       const retryTimeMilliseconds =
         this.getExponentialRetryTimeMilliseconds(attempt)
       info(
-        `Attempt ${attempt + 1} of ${
-          this.maxAttempts
+        `Attempt ${attempt + 1} of ${this.maxAttempts
         } failed with error: ${errorMessage}. Retrying request in ${retryTimeMilliseconds} ms...`
       )
       await this.sleep(retryTimeMilliseconds)
       attempt++
     }
     throw new Error(`Request failed`)
   }
   isSuccessStatusCode(statusCode?: number): boolean {
     if (!statusCode) return false
     return statusCode >= 200 && statusCode < 300
   }
   isRetryableHttpStatusCode(statusCode?: number): boolean {
     if (!statusCode) return false
     const retryableStatusCodes = [
       HttpCodes.BadGateway,
       HttpCodes.GatewayTimeout,
       HttpCodes.InternalServerError,
       HttpCodes.ServiceUnavailable,
       HttpCodes.TooManyRequests
     ]
     return retryableStatusCodes.includes(statusCode)
   }
   async sleep(milliseconds: number): Promise<void> {
     return new Promise(resolve => setTimeout(resolve, milliseconds))
   }
   getExponentialRetryTimeMilliseconds(attempt: number): number {
     if (attempt < 0) {
       throw new Error('attempt should be a positive integer')
     }
     if (attempt === 0) {
       return this.baseRetryIntervalMilliseconds
     }
     const minTime =
       this.baseRetryIntervalMilliseconds * this.retryMultiplier ** attempt
     const maxTime = minTime * this.retryMultiplier
     // returns a random number between minTime and maxTime (exclusive)
     return Math.trunc(Math.random() * (maxTime - minTime) + minTime)
   }
 }
-export function internalBlobCacheTwirpClient(options?: {
+export function internalCacheTwirpClient(options?: {
   maxAttempts?: number
   retryIntervalMs?: number
   retryMultiplier?: number
-}): BlobCacheServiceClientJSON {
-  const client = new BlobCacheServiceClient(
+}): CacheServiceClientJSON {
+  const client = new CacheServiceClient(
     'actions/cache',
     options?.maxAttempts,
     options?.retryIntervalMs,
     options?.retryMultiplier
   )
-  return new BlobCacheServiceClientJSON(client)
+  return new CacheServiceClientJSON(client)
 }
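
The retry policy above is worth making concrete. With the class defaults (5 attempts, a 3000 ms base interval, a 1.5 multiplier), the first retry waits exactly the base interval and later retries draw a jittered wait from an exponentially growing window. A small sketch of the bounds, mirroring getExponentialRetryTimeMilliseconds (values computed from the defaults above):

// Wait-time bounds per retry attempt, using the defaults above.
const base = 3000
const multiplier = 1.5
function waitBoundsMs(attempt: number): [number, number] {
  if (attempt === 0) return [base, base] // first retry waits exactly the base interval
  const min = base * multiplier ** attempt
  return [min, min * multiplier] // later retries are uniform in [min, min * multiplier)
}
// waitBoundsMs(0) -> [3000, 3000]
// waitBoundsMs(1) -> [4500, 6750]
// waitBoundsMs(2) -> [6750, 10125]
// waitBoundsMs(3) -> [10125, 15187.5]
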

View File

@@ -6,13 +6,16 @@ import * as fs from 'fs'
 import * as path from 'path'
 import * as semver from 'semver'
 import * as util from 'util'
-import {v4 as uuidV4} from 'uuid'
+import { v4 as uuidV4 } from 'uuid'
+import * as crypto from 'crypto'
 import {
   CacheFilename,
   CompressionMethod,
   GnuTarPathOnWindows
 } from './constants'
+const versionSalt = '1.0'
 // From https://github.com/actions/toolkit/blob/main/packages/tool-cache/src/tool-cache.ts#L23
 export async function createTempDirectory(): Promise<string> {
   const IS_WINDOWS = process.platform === 'win32'
@@ -143,3 +146,28 @@ export function isGhes(): boolean {
   return !isGitHubHost && !isGheHost
 }
+export function getCacheVersion(
+  paths: string[],
+  compressionMethod?: CompressionMethod,
+  enableCrossOsArchive = false
+): string {
+  // don't pass changes upstream
+  const components = paths.slice()
+  // Add compression method to cache version to restore
+  // compressed cache as per compression method
+  if (compressionMethod) {
+    components.push(compressionMethod)
+  }
+  // Only check for windows platforms if enableCrossOsArchive is false
+  if (process.platform === 'win32' && !enableCrossOsArchive) {
+    components.push('windows-only')
+  }
+  // Add salt to cache version to support breaking changes in cache entry
+  components.push(versionSalt)
+  return crypto.createHash('sha256').update(components.join('|')).digest('hex')
+}
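
To make the hashing concrete: the version is the SHA-256 of the path list, the optional compression method, an optional 'windows-only' marker, and the salt, joined with '|'. A worked example (the path and compression method are illustrative; 'zstd' is the CompressionMethod.Zstd constant from ./constants):

import * as crypto from 'crypto'

// What getCacheVersion(['node_modules'], 'zstd') hashes on Linux:
//   'node_modules' + '|' + 'zstd' + '|' + '1.0'
const version = crypto
  .createHash('sha256')
  .update(['node_modules', 'zstd', '1.0'].join('|'))
  .digest('hex')
// On Windows with enableCrossOsArchive left false, 'windows-only' is pushed
// before the salt, so identical paths hash to a different version.
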

View File

@@ -1,13 +1,14 @@
 import * as core from '@actions/core'
-import {GetCacheBlobUploadURLResponse} from '../../generated/results/api/v1/blobcache'
-import {ZipUploadStream} from '@actions/artifact/lib/internal/upload/zip'
-import {NetworkError} from '@actions/artifact/'
-import {TransferProgressEvent} from '@azure/core-http'
+import { CreateCacheEntryResponse } from '../../generated/results/api/v1/cache'
+import { ZipUploadStream } from '@actions/artifact/lib/internal/upload/zip'
+import { NetworkError } from '@actions/artifact/'
+import { TransferProgressEvent } from '@azure/core-http'
 import * as stream from 'stream'
 import * as crypto from 'crypto'
 import {
   BlobClient,
   BlockBlobClient,
   BlockBlobUploadStreamOptions,
   BlockBlobParallelUploadOptions
 } from '@azure/storage-blob'
@@ -55,7 +56,7 @@
   }
   const options: BlockBlobUploadStreamOptions = {
-    blobHTTPHeaders: {blobContentType: 'zip'},
+    blobHTTPHeaders: { blobContentType: 'zip' },
     onProgress: uploadCallback
   }
@@ -89,7 +90,7 @@
   }
   core.info('Finished uploading cache content to blob storage!')
   hashStream.end()
   sha256Hash = hashStream.read() as string
   core.info(`SHA256 hash of uploaded artifact zip is ${sha256Hash}`)
@@ -107,11 +108,11 @@
 }
 export async function UploadCacheFile(
-  uploadURL: GetCacheBlobUploadURLResponse,
+  uploadURL: CreateCacheEntryResponse,
   archivePath: string,
 ): Promise<{}> {
   core.info(`Uploading ${archivePath} to: ${JSON.stringify(uploadURL)}`)
   // Specify data transfer options
   const uploadOptions: BlockBlobParallelUploadOptions = {
     blockSize: 4 * 1024 * 1024, // 4 MiB max block size
@@ -119,8 +120,7 @@ export async function UploadCacheFile(
     maxSingleShotSize: 8 * 1024 * 1024, // 8 MiB initial transfer size
   };
-  // const blobClient: BlobClient = new BlobClient(uploadURL.urls[0])
-  const blobClient: BlobClient = new BlobClient(uploadURL.urls[0].url)
+  const blobClient: BlobClient = new BlobClient(uploadURL.signedUploadUrl)
   const blockBlobClient: BlockBlobClient = blobClient.getBlockBlobClient()
   core.info(`BlobClient: ${JSON.stringify(blobClient)}`)
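
The rest of UploadCacheFile falls outside this hunk, but once a BlockBlobClient is in hand the transfer is a single SDK call. A hedged sketch of how the signed URL and the options above would plausibly be used (uploadFile is the standard @azure/storage-blob helper, not necessarily the exact line in this PR; the concurrency field is an assumption, since the hunk cuts off between blockSize and maxSingleShotSize):

import { BlobClient, BlockBlobParallelUploadOptions } from '@azure/storage-blob'

async function uploadArchive(signedUploadUrl: string, archivePath: string): Promise<void> {
  const uploadOptions: BlockBlobParallelUploadOptions = {
    blockSize: 4 * 1024 * 1024,        // 4 MiB max block size (as in the diff)
    concurrency: 4,                    // assumption: not visible in the hunk above
    maxSingleShotSize: 8 * 1024 * 1024 // 8 MiB initial transfer size (as in the diff)
  }
  const blockBlobClient = new BlobClient(signedUploadUrl).getBlockBlobClient()
  // Node-only convenience wrapper that parallelizes block uploads.
  await blockBlobClient.uploadFile(archivePath, uploadOptions)
}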