diff --git a/plugins/apache-spark/package.json b/plugins/apache-spark/package.json
index 28ec3df..f39369e 100644
--- a/plugins/apache-spark/package.json
+++ b/plugins/apache-spark/package.json
@@ -35,7 +35,8 @@
     "yaml": "^2.3.1"
   },
   "peerDependencies": {
-    "react": "^16.13.1 || ^17.0.0"
+    "react": "^16.13.1 || ^17.0.0",
+    "react-router-dom": "^6.14.1"
   },
   "devDependencies": {
     "@backstage/cli": "^0.22.7",
diff --git a/plugins/apache-spark/pi-success.yaml b/plugins/apache-spark/pi-success.yaml
new file mode 100644
index 0000000..4f88afe
--- /dev/null
+++ b/plugins/apache-spark/pi-success.yaml
@@ -0,0 +1,55 @@
+#
+# Copyright 2017 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: "sparkoperator.k8s.io/v1beta2"
+kind: SparkApplication
+metadata:
+# name: spark-pi
+  generateName: spark-pi
+  namespace: default
+spec:
+  type: Python
+  pythonVersion: "3"
+  mode: cluster
+  image: "public.ecr.aws/r1l5w1y9/spark-operator:3.2.1-hadoop-3.3.1-java-11-scala-2.12-python-3.8-latest"
+  mainApplicationFile: "local:///opt/spark/examples/src/main/python/pi.py"
+  sparkVersion: "3.1.1"
+  restartPolicy:
+    type: Never
+  volumes:
+    - name: "test-volume"
+      hostPath:
+        path: "/tmp"
+        type: Directory
+  driver:
+    cores: 1
+    coreLimit: "1200m"
+    memory: "512m"
+    labels:
+      version: 3.1.1
+    serviceAccount: spark
+    volumeMounts:
+      - name: "test-volume"
+        mountPath: "/tmp"
+  executor:
+    cores: 1
+    instances: 1
+    memory: "512m"
+    labels:
+      version: 3.1.1
+    volumeMounts:
+      - name: "test-volume"
+        mountPath: "/tmp"
+
diff --git a/plugins/apache-spark/pi.yaml b/plugins/apache-spark/pi.yaml
index 4e90932..28a46a3 100644
--- a/plugins/apache-spark/pi.yaml
+++ b/plugins/apache-spark/pi.yaml
@@ -24,9 +24,7 @@ spec:
   pythonVersion: "3"
   mode: cluster
   image: "public.ecr.aws/m8u6z8z4/manabu-test:test-spark"
-  imagePullPolicy: Always
-  mainClass: org.apache.spark.examples.SparkPi
-  mainApplicationFile: "local:///opt/spark/examples/src/main/python/pi.ps"
+  mainApplicationFile: "local:///opt/spark/examples/src/main/python/pi.py"
   sparkVersion: "3.1.1"
   restartPolicy:
     type: Never
diff --git a/plugins/apache-spark/src/api/index.ts b/plugins/apache-spark/src/api/index.ts
index eb32777..16c560a 100644
--- a/plugins/apache-spark/src/api/index.ts
+++ b/plugins/apache-spark/src/api/index.ts
@@ -1,5 +1,5 @@
 import { createApiRef } from '@backstage/core-plugin-api';
-import { ApacheSpark, ApacheSparkList } from './model';
+import { ApacheSpark, ApacheSparkList, Pod } from './model';
 import { KubernetesApi } from '@backstage/plugin-kubernetes';
 
 export const apacheSparkApiRef = createApiRef<ApacheSparkApi>({
@@ -22,6 +22,20 @@ export interface ApacheSparkApi {
     namespace: string | undefined,
     name: string,
   ): Promise<ApacheSpark>;
+
+  getLogs(
+    clusterName: string | undefined,
+    namespace: string | undefined,
+    podName: string,
+    containerName?: string | undefined,
+    tailLine?: number,
+  ): Promise<string>;
+
+  getContainers(
+    clusterName: string | undefined,
+    namespace: string | undefined,
+    podName: string,
+  ): Promise<string[]>;
 }
 
 export class ApacheSparkClient implements ApacheSparkApi {
@@ -82,6 +96,63 @@ export class ApacheSparkClient implements ApacheSparkApi {
     return out;
   }
 
+  async getLogs(
+    clusterName: string | undefined,
+    namespace: string | undefined,
+    podName: string,
+    containerName: string | undefined,
+    tailLine: number = 1000,
+  ): Promise<string> {
+    const ns = namespace !== undefined ? namespace : 'default';
+    const path = `/api/v1/namespaces/${ns}/pods/${podName}/log`;
+    const query = new URLSearchParams({
+      tailLines: tailLine.toString(),
+    });
+    if (containerName) {
+      query.set('container', containerName);
+    }
+
+    const resp = await this.kubernetesApi.proxy({
+      clusterName:
+        clusterName !== undefined ? clusterName : await this.getFirstCluster(),
+      path: `${path}?${query.toString()}`,
+    });
+    if (!resp.ok) {
+      return Promise.reject(
+        `failed to fetch logs: ${resp.status}, ${
+          resp.statusText
+        }, ${await resp.text()}`,
+      );
+    }
+    return resp.text();
+  }
+
+  async getContainers(
+    clusterName: string | undefined,
+    namespace: string | undefined,
+    podName: string,
+  ): Promise<string[]> {
+    const ns = namespace !== undefined ? namespace : 'default';
+    const path = `/api/v1/namespaces/${ns}/pods/${podName}`;
+    const query = new URLSearchParams({
+      [K8s_API_TIMEOUT]: '30',
+    });
+    const resp = await this.kubernetesApi.proxy({
+      clusterName:
+        clusterName !== undefined ? clusterName : await this.getFirstCluster(),
+      path: `${path}?${query.toString()}`,
+    });
+    if (!resp.ok) {
+      throw new Error(
+        `failed to fetch pod: ${resp.status}, ${
+          resp.statusText
+        }, ${await resp.text()}`,
+      );
+    }
+    const pod = JSON.parse(await resp.text()) as Pod;
+    return pod.spec.containers.map(c => c.name);
+  }
+
   async getFirstCluster(): Promise<string> {
     const clusters = await this.kubernetesApi.getClusters();
     if (clusters.length > 0) {
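
Reviewer note: a minimal sketch of how the two new `ApacheSparkClient` methods compose, assuming an already-constructed client. The cluster, namespace, and pod names below are illustrative placeholders, not values from this diff.

```typescript
import { ApacheSparkClient } from './api';

// Hypothetical usage; 'my-cluster', 'default', and 'spark-pi-driver'
// are placeholders for real cluster, namespace, and pod names.
async function printDriverLogs(client: ApacheSparkClient): Promise<void> {
  // Discover which containers run in the driver pod.
  const containers = await client.getContainers(
    'my-cluster',
    'default',
    'spark-pi-driver',
  );
  // Tail the last 200 lines from the first container.
  const logs = await client.getLogs(
    'my-cluster',
    'default',
    'spark-pi-driver',
    containers[0],
    200,
  );
  console.log(logs);
}
```
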
diff --git a/plugins/apache-spark/src/api/model.ts b/plugins/apache-spark/src/api/model.ts
index 2c1c282..e2cac1c 100644
--- a/plugins/apache-spark/src/api/model.ts
+++ b/plugins/apache-spark/src/api/model.ts
@@ -61,7 +61,7 @@
     webUIServiceName: string;
   };
   executionAttempts: number;
-  executorState: Record<string, string>;
+  executorState: { [key: string]: string };
   lastSubmissionAttemptTime: string;
   sparkApplicationId: string;
   submissionAttempts: number;
@@ -82,3 +82,17 @@ export type ApacheSparkList = {
   kind: string;
   items?: ApacheSpark[];
 };
+
+export type Pod = {
+  apiVersion: string;
+  kind: string;
+  metadata: Metadata;
+  spec: PodSpec;
+};
+
+export type PodSpec = {
+  containers: {
+    image: string;
+    name: string;
+  }[];
+};
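
Reviewer note: the index-signature form of `executorState` is interchangeable with `Record<string, string>`, and `PodSpec` models only the fields that `getContainers` reads. A sketch with invented sample values:

```typescript
import { PodSpec } from './model';

// Sample data, invented for illustration only.
const spec: PodSpec = {
  containers: [
    { image: 'spark:3.1.1', name: 'spark-kubernetes-driver' },
    { image: 'istio/proxyv2:1.18', name: 'istio-proxy' },
  ],
};
// Mirrors what ApacheSparkClient.getContainers returns for a pod.
const containerNames = spec.containers.map(c => c.name);

// executorState maps executor pod names to their reported state.
const executorState: { [key: string]: string } = {
  'spark-pi-exec-1': 'RUNNING',
  'spark-pi-exec-2': 'PENDING',
};
const running = Object.keys(executorState).filter(
  k => executorState[k] === 'RUNNING',
);
```
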
diff --git a/plugins/apache-spark/src/components/ApacheSparkLogs/ApacheSparkLogs.tsx b/plugins/apache-spark/src/components/ApacheSparkLogs/ApacheSparkLogs.tsx
new file mode 100644
index 0000000..162d0b6
--- /dev/null
+++ b/plugins/apache-spark/src/components/ApacheSparkLogs/ApacheSparkLogs.tsx
@@ -0,0 +1,95 @@
+import { useApi } from '@backstage/core-plugin-api';
+import { apacheSparkApiRef } from '../../api';
+import useAsync from 'react-use/lib/useAsync';
+import { ApacheSpark, ApacheSparkList } from '../../api/model';
+import {
+  LogViewer,
+  Progress,
+  Select,
+  SelectedItems,
+  SelectItem,
+} from '@backstage/core-components';
+import Alert from '@material-ui/lab/Alert';
+import React, { useEffect, useState } from 'react';
+
+export const ApacheSparkDriverLogs = (props: { sparkApp: ApacheSpark }) => {
+  const apiClient = useApi(apacheSparkApiRef);
+
+  const { value, loading, error } = useAsync(async (): Promise<string> => {
+    return await apiClient.getLogs(
+      'cnoe-packaging-2',
+      'default',
+      props.sparkApp.status.driverInfo.podName,
+      'spark-kubernetes-driver',
+    );
+  }, [props]);
+  if (loading) {
+    return <Progress />;
+  } else if (error) {
+    return <Alert severity="error">{`${error}`}</Alert>;
+  }
+  return <LogViewer text={value ?? ''} />;
+};
+
+const ExecutorLogs = (props: { name: string }) => {
+  const apiClient = useApi(apacheSparkApiRef);
+  const [logs, setLogs] = useState('');
+  useEffect(() => {
+    async function getLogs() {
+      try {
+        const val = await apiClient.getLogs(
+          'cnoe-packaging-2',
+          'default',
+          props.name,
+          'spark-kubernetes-executor',
+        );
+        setLogs(val);
+      } catch (e) {
+        if (typeof e === 'string') {
+          setLogs(e);
+        }
+      }
+    }
+    if (props.name !== '') {
+      getLogs();
+    }
+  }, [apiClient, props]);
+
+  return <LogViewer text={logs} />;
+};
+
+export const ApacheSparkExecutorLogs = (props: { sparkApp: ApacheSpark }) => {
+  const [selected, setSelected] = useState('');
+  if (props.sparkApp.status.applicationState.state !== 'RUNNING') {
+    return (
+      <Alert severity="info">
+        Executor logs are only available for Spark Applications in RUNNING
+        state
+      </Alert>
+    );
+  }
+  const executors: SelectItem[] = [{ label: '', value: '' }];
+  for (const key in props.sparkApp.status.executorState) {
+    if (props.sparkApp.status.executorState.hasOwnProperty(key)) {
+      executors.push({ label: key, value: key });
+    }
+  }
+
+  const handleChange = (item: SelectedItems) => {
+    if (typeof item === 'string' && item !== '' && item !== '[]') {
+      setSelected(item);
+    }
+  };
+  return (
+    <>
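
Reviewer note: the component file is truncated above, but the two exported components are enough to sketch one possible composition. `SparkAppLogsPanel` below is a hypothetical wrapper, not part of this PR.

```tsx
import React from 'react';
import { ApacheSpark } from '../../api/model';
import {
  ApacheSparkDriverLogs,
  ApacheSparkExecutorLogs,
} from './ApacheSparkLogs';

// Hypothetical wrapper: renders driver logs above the executor log
// selector for a single SparkApplication resource.
export const SparkAppLogsPanel = (props: { app: ApacheSpark }) => (
  <>
    <ApacheSparkDriverLogs sparkApp={props.app} />
    <ApacheSparkExecutorLogs sparkApp={props.app} />
  </>
);
```
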