1
0
Fork 0

Merge pull request #1486 from actions/bethanyj28/add-twirp-client

Add twirp client
pull/1488/head
Bethany 2023-08-09 10:46:24 -04:00 committed by GitHub
commit 92695f58da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 422 additions and 34 deletions

View File

@ -0,0 +1,200 @@
import * as http from 'http'
import * as net from 'net'
import {HttpClient} from '@actions/http-client'
import * as config from '../src/internal/shared/config'
import {createArtifactTwirpClient} from '../src/internal/shared/artifact-twirp-client'
import * as core from '@actions/core'
jest.mock('@actions/http-client')
describe('artifact-http-client', () => {
beforeAll(() => {
// mock all output so that there is less noise when running tests
jest.spyOn(console, 'log').mockImplementation(() => {})
jest.spyOn(core, 'debug').mockImplementation(() => {})
jest.spyOn(core, 'info').mockImplementation(() => {})
jest.spyOn(core, 'warning').mockImplementation(() => {})
jest
.spyOn(config, 'getResultsServiceUrl')
.mockReturnValue('http://localhost:8080')
jest.spyOn(config, 'getRuntimeToken').mockReturnValue('token')
})
beforeEach(() => {
jest.clearAllMocks()
})
it('should successfully create a client', () => {
const client = createArtifactTwirpClient('upload')
expect(client).toBeDefined()
})
it('should make a request', async () => {
const mockPost = jest.fn(() => {
const msg = new http.IncomingMessage(new net.Socket())
msg.statusCode = 200
return {
message: msg,
readBody: async () => {
return Promise.resolve(
`{"ok": true, "signedUploadUrl": "http://localhost:8080/upload"}`
)
}
}
})
const mockHttpClient = (
HttpClient as unknown as jest.Mock
).mockImplementation(() => {
return {
post: mockPost
}
})
const client = createArtifactTwirpClient('upload')
const artifact = await client.CreateArtifact({
workflowRunBackendId: '1234',
workflowJobRunBackendId: '5678',
name: 'artifact',
version: 4
})
expect(mockHttpClient).toHaveBeenCalledTimes(1)
expect(mockPost).toHaveBeenCalledTimes(1)
expect(artifact).toBeDefined()
expect(artifact.ok).toBe(true)
expect(artifact.signedUploadUrl).toBe('http://localhost:8080/upload')
})
it('should retry if the request fails', async () => {
const mockPost = jest
.fn(() => {
const msgSucceeded = new http.IncomingMessage(new net.Socket())
msgSucceeded.statusCode = 200
return {
message: msgSucceeded,
readBody: async () => {
return Promise.resolve(
`{"ok": true, "signedUploadUrl": "http://localhost:8080/upload"}`
)
}
}
})
.mockImplementationOnce(() => {
const msgFailed = new http.IncomingMessage(new net.Socket())
msgFailed.statusCode = 500
msgFailed.statusMessage = 'Internal Server Error'
return {
message: msgFailed,
readBody: async () => {
return Promise.resolve(`{"ok": false}`)
}
}
})
const mockHttpClient = (
HttpClient as unknown as jest.Mock
).mockImplementation(() => {
return {
post: mockPost
}
})
const client = createArtifactTwirpClient(
'upload',
5, // retry 5 times
1, // wait 1 ms
1.5 // backoff factor
)
const artifact = await client.CreateArtifact({
workflowRunBackendId: '1234',
workflowJobRunBackendId: '5678',
name: 'artifact',
version: 4
})
expect(mockHttpClient).toHaveBeenCalledTimes(1)
expect(artifact).toBeDefined()
expect(artifact.ok).toBe(true)
expect(artifact.signedUploadUrl).toBe('http://localhost:8080/upload')
expect(mockPost).toHaveBeenCalledTimes(2)
})
it('should fail if the request fails 5 times', async () => {
const mockPost = jest.fn(() => {
const msgFailed = new http.IncomingMessage(new net.Socket())
msgFailed.statusCode = 500
msgFailed.statusMessage = 'Internal Server Error'
return {
message: msgFailed,
readBody: async () => {
return Promise.resolve(`{"ok": false}`)
}
}
})
const mockHttpClient = (
HttpClient as unknown as jest.Mock
).mockImplementation(() => {
return {
post: mockPost
}
})
const client = createArtifactTwirpClient(
'upload',
5, // retry 5 times
1, // wait 1 ms
1.5 // backoff factor
)
await expect(async () => {
await client.CreateArtifact({
workflowRunBackendId: '1234',
workflowJobRunBackendId: '5678',
name: 'artifact',
version: 4
})
}).rejects.toThrowError(
'Failed to make request after 5 attempts: Failed request: (500) Internal Server Error'
)
expect(mockHttpClient).toHaveBeenCalledTimes(1)
expect(mockPost).toHaveBeenCalledTimes(5)
})
it('should fail immediately if there is a non-retryable error', async () => {
const mockPost = jest.fn(() => {
const msgFailed = new http.IncomingMessage(new net.Socket())
msgFailed.statusCode = 401
msgFailed.statusMessage = 'Unauthorized'
return {
message: msgFailed,
readBody: async () => {
return Promise.resolve(`{"ok": false}`)
}
}
})
const mockHttpClient = (
HttpClient as unknown as jest.Mock
).mockImplementation(() => {
return {
post: mockPost
}
})
const client = createArtifactTwirpClient(
'upload',
5, // retry 5 times
1, // wait 1 ms
1.5 // backoff factor
)
await expect(async () => {
await client.CreateArtifact({
workflowRunBackendId: '1234',
workflowJobRunBackendId: '5678',
name: 'artifact',
version: 4
})
}).rejects.toThrowError(
'Received non-retryable error: Failed request: (401) Unauthorized'
)
expect(mockHttpClient).toHaveBeenCalledTimes(1)
expect(mockPost).toHaveBeenCalledTimes(1)
})
})

View File

@ -1,4 +1,4 @@
// @generated by protobuf-ts 2.2.3-alpha.1 with parameter client_none,generate_dependencies
// @generated by protobuf-ts 2.9.1 with parameter long_type_string,client_none,generate_dependencies
// @generated from protobuf file "google/protobuf/timestamp.proto" (package "google.protobuf", syntax proto3)
// tslint:disable
//
@ -139,7 +139,7 @@ export interface Timestamp {
*
* @generated from protobuf field: int64 seconds = 1;
*/
seconds: bigint;
seconds: string;
/**
* Non-negative fractions of a second at nanosecond resolution. Negative
* second values with fractions must still have non-negative nanos values
@ -154,7 +154,7 @@ export interface Timestamp {
class Timestamp$Type extends MessageType<Timestamp> {
constructor() {
super("google.protobuf.Timestamp", [
{ no: 1, name: "seconds", kind: "scalar", T: 3 /*ScalarType.INT64*/, L: 0 /*LongType.BIGINT*/ },
{ no: 1, name: "seconds", kind: "scalar", T: 3 /*ScalarType.INT64*/ },
{ no: 2, name: "nanos", kind: "scalar", T: 5 /*ScalarType.INT32*/ }
]);
}
@ -164,7 +164,7 @@ class Timestamp$Type extends MessageType<Timestamp> {
now(): Timestamp {
const msg = this.create();
const ms = Date.now();
msg.seconds = PbLong.from(Math.floor(ms / 1000)).toBigInt();
msg.seconds = PbLong.from(Math.floor(ms / 1000)).toString();
msg.nanos = (ms % 1000) * 1000000;
return msg;
}
@ -180,7 +180,7 @@ class Timestamp$Type extends MessageType<Timestamp> {
fromDate(date: Date): Timestamp {
const msg = this.create();
const ms = date.getTime();
msg.seconds = PbLong.from(Math.floor(ms / 1000)).toBigInt();
msg.seconds = PbLong.from(Math.floor(ms / 1000)).toString();
msg.nanos = (ms % 1000) * 1000000;
return msg;
}
@ -223,14 +223,14 @@ class Timestamp$Type extends MessageType<Timestamp> {
throw new globalThis.Error("Unable to parse Timestamp from JSON. Must be from 0001-01-01T00:00:00Z to 9999-12-31T23:59:59Z inclusive.");
if (!target)
target = this.create();
target.seconds = PbLong.from(ms / 1000).toBigInt();
target.seconds = PbLong.from(ms / 1000).toString();
target.nanos = 0;
if (matches[7])
target.nanos = (parseInt("1" + matches[7] + "0".repeat(9 - matches[7].length)) - 1000000000);
return target;
}
create(value?: PartialMessage<Timestamp>): Timestamp {
const message = { seconds: 0n, nanos: 0 };
const message = { seconds: "0", nanos: 0 };
globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
reflectionMergePartial<Timestamp>(this, message, value);
@ -242,7 +242,7 @@ class Timestamp$Type extends MessageType<Timestamp> {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case /* int64 seconds */ 1:
message.seconds = reader.int64().toBigInt();
message.seconds = reader.int64().toString();
break;
case /* int32 nanos */ 2:
message.nanos = reader.int32();
@ -260,7 +260,7 @@ class Timestamp$Type extends MessageType<Timestamp> {
}
internalBinaryWrite(message: Timestamp, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter {
/* int64 seconds = 1; */
if (message.seconds !== 0n)
if (message.seconds !== "0")
writer.tag(1, WireType.Varint).int64(message.seconds);
/* int32 nanos = 2; */
if (message.nanos !== 0)

View File

@ -1,4 +1,4 @@
// @generated by protobuf-ts 2.2.3-alpha.1 with parameter client_none,generate_dependencies
// @generated by protobuf-ts 2.9.1 with parameter long_type_string,client_none,generate_dependencies
// @generated from protobuf file "google/protobuf/wrappers.proto" (package "google.protobuf", syntax proto3)
// tslint:disable
//
@ -96,7 +96,7 @@ export interface Int64Value {
*
* @generated from protobuf field: int64 value = 1;
*/
value: bigint;
value: string;
}
/**
* Wrapper message for `uint64`.
@ -111,7 +111,7 @@ export interface UInt64Value {
*
* @generated from protobuf field: uint64 value = 1;
*/
value: bigint;
value: string;
}
/**
* Wrapper message for `int32`.
@ -316,7 +316,7 @@ export const FloatValue = new FloatValue$Type();
class Int64Value$Type extends MessageType<Int64Value> {
constructor() {
super("google.protobuf.Int64Value", [
{ no: 1, name: "value", kind: "scalar", T: 3 /*ScalarType.INT64*/, L: 0 /*LongType.BIGINT*/ }
{ no: 1, name: "value", kind: "scalar", T: 3 /*ScalarType.INT64*/ }
]);
}
/**
@ -331,11 +331,11 @@ class Int64Value$Type extends MessageType<Int64Value> {
internalJsonRead(json: JsonValue, options: JsonReadOptions, target?: Int64Value): Int64Value {
if (!target)
target = this.create();
target.value = this.refJsonReader.scalar(json, ScalarType.INT64, LongType.BIGINT, "value") as any;
target.value = this.refJsonReader.scalar(json, ScalarType.INT64, LongType.STRING, "value") as any;
return target;
}
create(value?: PartialMessage<Int64Value>): Int64Value {
const message = { value: 0n };
const message = { value: "0" };
globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
reflectionMergePartial<Int64Value>(this, message, value);
@ -347,7 +347,7 @@ class Int64Value$Type extends MessageType<Int64Value> {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case /* int64 value */ 1:
message.value = reader.int64().toBigInt();
message.value = reader.int64().toString();
break;
default:
let u = options.readUnknownField;
@ -362,7 +362,7 @@ class Int64Value$Type extends MessageType<Int64Value> {
}
internalBinaryWrite(message: Int64Value, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter {
/* int64 value = 1; */
if (message.value !== 0n)
if (message.value !== "0")
writer.tag(1, WireType.Varint).int64(message.value);
let u = options.writeUnknownFields;
if (u !== false)
@ -378,7 +378,7 @@ export const Int64Value = new Int64Value$Type();
class UInt64Value$Type extends MessageType<UInt64Value> {
constructor() {
super("google.protobuf.UInt64Value", [
{ no: 1, name: "value", kind: "scalar", T: 4 /*ScalarType.UINT64*/, L: 0 /*LongType.BIGINT*/ }
{ no: 1, name: "value", kind: "scalar", T: 4 /*ScalarType.UINT64*/ }
]);
}
/**
@ -393,11 +393,11 @@ class UInt64Value$Type extends MessageType<UInt64Value> {
internalJsonRead(json: JsonValue, options: JsonReadOptions, target?: UInt64Value): UInt64Value {
if (!target)
target = this.create();
target.value = this.refJsonReader.scalar(json, ScalarType.UINT64, LongType.BIGINT, "value") as any;
target.value = this.refJsonReader.scalar(json, ScalarType.UINT64, LongType.STRING, "value") as any;
return target;
}
create(value?: PartialMessage<UInt64Value>): UInt64Value {
const message = { value: 0n };
const message = { value: "0" };
globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
reflectionMergePartial<UInt64Value>(this, message, value);
@ -409,7 +409,7 @@ class UInt64Value$Type extends MessageType<UInt64Value> {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case /* uint64 value */ 1:
message.value = reader.uint64().toBigInt();
message.value = reader.uint64().toString();
break;
default:
let u = options.readUnknownField;
@ -424,7 +424,7 @@ class UInt64Value$Type extends MessageType<UInt64Value> {
}
internalBinaryWrite(message: UInt64Value, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter {
/* uint64 value = 1; */
if (message.value !== 0n)
if (message.value !== "0")
writer.tag(1, WireType.Varint).uint64(message.value);
let u = options.writeUnknownFields;
if (u !== false)

View File

@ -1,4 +1,4 @@
// @generated by protobuf-ts 2.2.3-alpha.1 with parameter client_none,generate_dependencies
// @generated by protobuf-ts 2.9.1 with parameter long_type_string,client_none,generate_dependencies
// @generated from protobuf file "results/api/v1/artifact.proto" (package "github.actions.results.api.v1", syntax proto3)
// tslint:disable
import { ServiceType } from "@protobuf-ts/runtime-rpc";
@ -71,7 +71,7 @@ export interface FinalizeArtifactRequest {
/**
* @generated from protobuf field: int64 size = 4;
*/
size: bigint;
size: string;
/**
* @generated from protobuf field: google.protobuf.StringValue hash = 5;
*/
@ -88,7 +88,7 @@ export interface FinalizeArtifactResponse {
/**
* @generated from protobuf field: int64 artifact_id = 2;
*/
artifactId: bigint;
artifactId: string;
}
// @generated message type with reflection information, may provide speed optimized methods
class CreateArtifactRequest$Type extends MessageType<CreateArtifactRequest> {
@ -226,12 +226,12 @@ class FinalizeArtifactRequest$Type extends MessageType<FinalizeArtifactRequest>
{ no: 1, name: "workflow_run_backend_id", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 2, name: "workflow_job_run_backend_id", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 3, name: "name", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 4, name: "size", kind: "scalar", T: 3 /*ScalarType.INT64*/, L: 0 /*LongType.BIGINT*/ },
{ no: 4, name: "size", kind: "scalar", T: 3 /*ScalarType.INT64*/ },
{ no: 5, name: "hash", kind: "message", T: () => StringValue }
]);
}
create(value?: PartialMessage<FinalizeArtifactRequest>): FinalizeArtifactRequest {
const message = { workflowRunBackendId: "", workflowJobRunBackendId: "", name: "", size: 0n };
const message = { workflowRunBackendId: "", workflowJobRunBackendId: "", name: "", size: "0" };
globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
reflectionMergePartial<FinalizeArtifactRequest>(this, message, value);
@ -252,7 +252,7 @@ class FinalizeArtifactRequest$Type extends MessageType<FinalizeArtifactRequest>
message.name = reader.string();
break;
case /* int64 size */ 4:
message.size = reader.int64().toBigInt();
message.size = reader.int64().toString();
break;
case /* google.protobuf.StringValue hash */ 5:
message.hash = StringValue.internalBinaryRead(reader, reader.uint32(), options, message.hash);
@ -279,7 +279,7 @@ class FinalizeArtifactRequest$Type extends MessageType<FinalizeArtifactRequest>
if (message.name !== "")
writer.tag(3, WireType.LengthDelimited).string(message.name);
/* int64 size = 4; */
if (message.size !== 0n)
if (message.size !== "0")
writer.tag(4, WireType.Varint).int64(message.size);
/* google.protobuf.StringValue hash = 5; */
if (message.hash)
@ -299,11 +299,11 @@ class FinalizeArtifactResponse$Type extends MessageType<FinalizeArtifactResponse
constructor() {
super("github.actions.results.api.v1.FinalizeArtifactResponse", [
{ no: 1, name: "ok", kind: "scalar", T: 8 /*ScalarType.BOOL*/ },
{ no: 2, name: "artifact_id", kind: "scalar", T: 3 /*ScalarType.INT64*/, L: 0 /*LongType.BIGINT*/ }
{ no: 2, name: "artifact_id", kind: "scalar", T: 3 /*ScalarType.INT64*/ }
]);
}
create(value?: PartialMessage<FinalizeArtifactResponse>): FinalizeArtifactResponse {
const message = { ok: false, artifactId: 0n };
const message = { ok: false, artifactId: "0" };
globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
reflectionMergePartial<FinalizeArtifactResponse>(this, message, value);
@ -318,7 +318,7 @@ class FinalizeArtifactResponse$Type extends MessageType<FinalizeArtifactResponse
message.ok = reader.bool();
break;
case /* int64 artifact_id */ 2:
message.artifactId = reader.int64().toBigInt();
message.artifactId = reader.int64().toString();
break;
default:
let u = options.readUnknownField;
@ -336,7 +336,7 @@ class FinalizeArtifactResponse$Type extends MessageType<FinalizeArtifactResponse
if (message.ok !== false)
writer.tag(1, WireType.Varint).bool(message.ok);
/* int64 artifact_id = 2; */
if (message.artifactId !== 0n)
if (message.artifactId !== "0")
writer.tag(2, WireType.Varint).int64(message.artifactId);
let u = options.writeUnknownFields;
if (u !== false)

View File

@ -0,0 +1,174 @@
import {HttpClient, HttpClientResponse, HttpCodes} from '@actions/http-client'
import {BearerCredentialHandler} from '@actions/http-client/lib/auth'
import {info} from '@actions/core'
import {ArtifactServiceClientJSON} from '../../generated'
import {getResultsServiceUrl, getRuntimeToken} from './config'
// The twirp http client must implement this interface
interface Rpc {
request(
service: string,
method: string,
contentType: 'application/json' | 'application/protobuf',
data: object | Uint8Array
): Promise<object | Uint8Array>
}
class ArtifactHttpClient implements Rpc {
private httpClient: HttpClient
private baseUrl: string
private maxAttempts = 5
private baseRetryIntervalMilliseconds = 3000
private retryMultiplier = 1.5
constructor(
userAgent: string,
maxAttempts?: number,
baseRetryIntervalMilliseconds?: number,
retryMultiplier?: number
) {
const token = getRuntimeToken()
this.baseUrl = getResultsServiceUrl()
if (maxAttempts) {
this.maxAttempts = maxAttempts
}
if (baseRetryIntervalMilliseconds) {
this.baseRetryIntervalMilliseconds = baseRetryIntervalMilliseconds
}
if (retryMultiplier) {
this.retryMultiplier = retryMultiplier
}
this.httpClient = new HttpClient(userAgent, [
new BearerCredentialHandler(token)
])
}
// This function satisfies the Rpc interface. It is compatible with the JSON
// JSON generated client.
async request(
service: string,
method: string,
contentType: 'application/json' | 'application/protobuf',
data: object | Uint8Array
): Promise<object | Uint8Array> {
const url = `${this.baseUrl}/twirp/${service}/${method}`
const headers = {
'Content-Type': contentType
}
info(`Making request to ${url} with data: ${JSON.stringify(data)}`)
try {
const response = await this.retryableRequest(async () =>
this.httpClient.post(url, JSON.stringify(data), headers)
)
const body = await response.readBody()
return JSON.parse(body)
} catch (error) {
throw new Error(error.message)
}
}
async retryableRequest(
operation: () => Promise<HttpClientResponse>
): Promise<HttpClientResponse> {
let attempt = 0
let errorMessage = ''
while (attempt < this.maxAttempts) {
let isRetryable = false
try {
const response = await operation()
const statusCode = response.message.statusCode
if (this.isSuccessStatusCode(statusCode)) {
return response
}
isRetryable = this.isRetryableHttpStatusCode(statusCode)
errorMessage = `Failed request: (${statusCode}) ${response.message.statusMessage}`
} catch (error) {
isRetryable = true
errorMessage = error.message
}
if (!isRetryable) {
throw new Error(`Received non-retryable error: ${errorMessage}`)
}
if (attempt + 1 === this.maxAttempts) {
throw new Error(
`Failed to make request after ${this.maxAttempts} attempts: ${errorMessage}`
)
}
const retryTimeMilliseconds =
this.getExponentialRetryTimeMilliseconds(attempt)
info(
`Attempt ${attempt + 1} of ${
this.maxAttempts
} failed with error: ${errorMessage}. Retrying request in ${retryTimeMilliseconds} ms...`
)
await this.sleep(retryTimeMilliseconds)
attempt++
}
throw new Error(`Request failed`)
}
isSuccessStatusCode(statusCode?: number): boolean {
if (!statusCode) return false
return statusCode >= 200 && statusCode < 300
}
isRetryableHttpStatusCode(statusCode?: number): boolean {
if (!statusCode) return false
const retryableStatusCodes = [
HttpCodes.BadGateway,
HttpCodes.GatewayTimeout,
HttpCodes.InternalServerError,
HttpCodes.ServiceUnavailable,
HttpCodes.TooManyRequests,
413 // Payload Too Large
]
return retryableStatusCodes.includes(statusCode)
}
async sleep(milliseconds: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, milliseconds))
}
getExponentialRetryTimeMilliseconds(attempt: number): number {
if (attempt < 0) {
throw new Error('attempt should be a positive integer')
}
if (attempt === 0) {
return this.baseRetryIntervalMilliseconds
}
const minTime =
this.baseRetryIntervalMilliseconds * this.retryMultiplier ** attempt
const maxTime = minTime * this.retryMultiplier
// returns a random number between minTime and maxTime (exclusive)
return Math.trunc(Math.random() * (maxTime - minTime) + minTime)
}
}
export function createArtifactTwirpClient(
type: 'upload' | 'download',
maxAttempts?: number,
baseRetryIntervalMilliseconds?: number,
retryMultiplier?: number
): ArtifactServiceClientJSON {
const client = new ArtifactHttpClient(
`@actions/artifact-${type}`,
maxAttempts,
baseRetryIntervalMilliseconds,
retryMultiplier
)
return new ArtifactServiceClientJSON(client)
}

View File

@ -0,0 +1,15 @@
export function getRuntimeToken(): string {
const token = process.env['ACTIONS_RUNTIME_TOKEN']
if (!token) {
throw new Error('Unable to get the ACTIONS_RUNTIME_TOKEN env variable')
}
return token
}
export function getResultsServiceUrl(): string {
const resultsUrl = process.env['ACTIONS_RESULTS_URL']
if (!resultsUrl) {
throw new Error('Unable to get the ACTIONS_RESULTS_URL env variable')
}
return resultsUrl
}

View File

@ -13,7 +13,6 @@
]
},
"useUnknownInCatchVariables": false,
"target": "es2020"
},
"include": [
"./src"