From f8b55bdcbcddd9b35208e9f20aabdba3484cde91 Mon Sep 17 00:00:00 2001 From: Manabu Mccloskey Date: Fri, 24 Feb 2023 19:38:50 -0800 Subject: [PATCH] wait for workflow completion --- packages/backend/src/plugins/scaffolder.ts | 2 - packages/backend/src/plugins/workflow-argo.ts | 68 +++++++++++++++++-- 2 files changed, 61 insertions(+), 9 deletions(-) diff --git a/packages/backend/src/plugins/scaffolder.ts b/packages/backend/src/plugins/scaffolder.ts index f052114..5fb6481 100644 --- a/packages/backend/src/plugins/scaffolder.ts +++ b/packages/backend/src/plugins/scaffolder.ts @@ -19,8 +19,6 @@ export default async function createPlugin( config: env.config, reader: env.reader, }); - console.log(`env.logger ${env.logger}`) - env.logger.info("HIIIII") const actions = [...builtInActions, createInvokeArgoAction(env.config, env.logger)]; return await createRouter({ diff --git a/packages/backend/src/plugins/workflow-argo.ts b/packages/backend/src/plugins/workflow-argo.ts index 00278fa..fd1f57a 100644 --- a/packages/backend/src/plugins/workflow-argo.ts +++ b/packages/backend/src/plugins/workflow-argo.ts @@ -24,7 +24,7 @@ class Workflow { kind: string = argoWorkFlowKind metadata: k8s.V1ObjectMeta = argoWorkFlowMetadataDefault spec: workflowSpec - + status?: workflowStatus constructor(templateName: string, namespace: string, params?: parameter[], artifacts?: object[] ) { this.metadata.namespace = namespace const args: argument = {} @@ -49,6 +49,18 @@ type workflowSpec = { workflowTemplateRef: workflowTemplateRef } +type workflowStatus = { + conditions?: workflowStatusCondition[] + phase?: string + progress?: string +} + +type workflowStatusCondition = { + message?: string + status?: string + type: string +} + type workflowTemplateRef = { clusterScope?: boolean name: string @@ -111,15 +123,19 @@ export function createInvokeArgoAction(config: Config, logger: Logger) { output: { type: 'object', properties: { - ID: { - title: 'Workflow ID', + workflowName: { + title: 'Workflow name', + type: 'string', + }, + workflowNamespace: { + title: 'Workflow namespace', type: 'string', }, }, }, }, async handler(ctx: ActionContext) { - logger.debug(`Invoked with ${ctx.input}`) + logger.debug(`Invoked with ${JSON.stringify(ctx.input)})`) const targetCluster = getClusterConfig(ctx.input.clusterName, config) const kc = new k8s.KubeConfig() @@ -143,14 +159,16 @@ export function createInvokeArgoAction(config: Config, logger: Logger) { const client = kc.makeApiClient(k8s.CustomObjectsApi) const wf = new Workflow(ctx.input.templateName, ctx.input.namespace, ctx.input.parameters) // const body = generateBody(ctx.input.templateName, ctx.input.namespace) - try { const resp = await client.createNamespacedCustomObject( argoWorkflowsGroup, argoWorkflowsVersion, ctx.input.namespace, argoWorkFlowPlural, wf ) - logger.debug(`response: ${resp.body}`) - ctx.output('ID', resp.body.toString()) + const respBody = resp.body as Workflow + logger.debug(`Workflow ID: ${respBody.metadata.name}, namespace ${respBody.metadata.namespace}`) + ctx.output('workflowName', respBody.metadata.name!) + ctx.output('workflowNamespace', respBody.metadata.namespace!) + await wait(kc, respBody.metadata.namespace!, respBody.metadata.name!) } catch (err) { if (err instanceof HttpError) { let msg = `${err.response.statusMessage}: ` @@ -191,3 +209,39 @@ function getClusterConfig(name: string, config: Config): Config { } return clusters[0] } + +async function wait(kc: k8s.KubeConfig, namespace: string, name: string, timeoutSeconds: number = 120) { + const client = new k8s.Watch(kc) + return new Promise( async (resolve, reject) => { + const result = await client.watch( + `/apis/${argoWorkflowsGroup}/${argoWorkflowsVersion}/namespaces/${namespace}/${argoWorkFlowPlural}`, + { + fieldSelector: `metadata.name=${name}`, + }, + (_type, apiObj, _watchObj) => { + if (apiObj) { + const wf = apiObj as Workflow + if (wf.status && wf.status.conditions) { + const cond = wf.status.conditions.filter((val) => { + return val.type === 'Completed' && val.status === "True" + }) + if (cond.length > 0) { + // result.abort() + resolve() + return + } + } + } + }, + (err) => { + if (err instanceof Error) { + // logger.debug(`error encountered while waiting for workflow to complete: ${err.name} ${err.message}`) + } + } + ) + setTimeout(() => { + result.abort() + reject(new Error("TIMEOUT")) + }, timeoutSeconds * 1000) + }) +}