wait for workflow completion

This commit is contained in:
Manabu Mccloskey 2023-02-24 19:38:50 -08:00
parent 09195317e6
commit f8b55bdcbc
2 changed files with 61 additions and 9 deletions

View file

@ -19,8 +19,6 @@ export default async function createPlugin(
config: env.config,
reader: env.reader,
});
console.log(`env.logger ${env.logger}`)
env.logger.info("HIIIII")
const actions = [...builtInActions, createInvokeArgoAction(env.config, env.logger)];
return await createRouter({

View file

@ -24,7 +24,7 @@ class Workflow {
kind: string = argoWorkFlowKind
metadata: k8s.V1ObjectMeta = argoWorkFlowMetadataDefault
spec: workflowSpec
status?: workflowStatus
constructor(templateName: string, namespace: string, params?: parameter[], artifacts?: object[] ) {
this.metadata.namespace = namespace
const args: argument = {}
@ -49,6 +49,18 @@ type workflowSpec = {
workflowTemplateRef: workflowTemplateRef
}
type workflowStatus = {
conditions?: workflowStatusCondition[]
phase?: string
progress?: string
}
type workflowStatusCondition = {
message?: string
status?: string
type: string
}
type workflowTemplateRef = {
clusterScope?: boolean
name: string
@ -111,15 +123,19 @@ export function createInvokeArgoAction(config: Config, logger: Logger) {
output: {
type: 'object',
properties: {
ID: {
title: 'Workflow ID',
workflowName: {
title: 'Workflow name',
type: 'string',
},
workflowNamespace: {
title: 'Workflow namespace',
type: 'string',
},
},
},
},
async handler(ctx: ActionContext<argoInput>) {
logger.debug(`Invoked with ${ctx.input}`)
logger.debug(`Invoked with ${JSON.stringify(ctx.input)})`)
const targetCluster = getClusterConfig(ctx.input.clusterName, config)
const kc = new k8s.KubeConfig()
@ -143,14 +159,16 @@ export function createInvokeArgoAction(config: Config, logger: Logger) {
const client = kc.makeApiClient(k8s.CustomObjectsApi)
const wf = new Workflow(ctx.input.templateName, ctx.input.namespace, ctx.input.parameters)
// const body = generateBody(ctx.input.templateName, ctx.input.namespace)
try {
const resp = await client.createNamespacedCustomObject(
argoWorkflowsGroup, argoWorkflowsVersion, ctx.input.namespace,
argoWorkFlowPlural, wf
)
logger.debug(`response: ${resp.body}`)
ctx.output('ID', resp.body.toString())
const respBody = resp.body as Workflow
logger.debug(`Workflow ID: ${respBody.metadata.name}, namespace ${respBody.metadata.namespace}`)
ctx.output('workflowName', respBody.metadata.name!)
ctx.output('workflowNamespace', respBody.metadata.namespace!)
await wait(kc, respBody.metadata.namespace!, respBody.metadata.name!)
} catch (err) {
if (err instanceof HttpError) {
let msg = `${err.response.statusMessage}: `
@ -191,3 +209,39 @@ function getClusterConfig(name: string, config: Config): Config {
}
return clusters[0]
}
async function wait(kc: k8s.KubeConfig, namespace: string, name: string, timeoutSeconds: number = 120) {
const client = new k8s.Watch(kc)
return new Promise<void>( async (resolve, reject) => {
const result = await client.watch(
`/apis/${argoWorkflowsGroup}/${argoWorkflowsVersion}/namespaces/${namespace}/${argoWorkFlowPlural}`,
{
fieldSelector: `metadata.name=${name}`,
},
(_type, apiObj, _watchObj) => {
if (apiObj) {
const wf = apiObj as Workflow
if (wf.status && wf.status.conditions) {
const cond = wf.status.conditions.filter((val) => {
return val.type === 'Completed' && val.status === "True"
})
if (cond.length > 0) {
// result.abort()
resolve()
return
}
}
}
},
(err) => {
if (err instanceof Error) {
// logger.debug(`error encountered while waiting for workflow to complete: ${err.name} ${err.message}`)
}
}
)
setTimeout(() => {
result.abort()
reject(new Error("TIMEOUT"))
}, timeoutSeconds * 1000)
})
}