cloudfoundry/stratos

View on GitHub
src/frontend/packages/kubernetes/src/kubernetes/kube-config-registration/kube-config-import/kube-config-import.component.ts

Summary

Maintainability
B
4 hrs
Test Coverage
import { Component, ComponentFactoryResolver, Injector, OnDestroy } from '@angular/core';
import { FormBuilder } from '@angular/forms';
import { Store } from '@ngrx/store';
import { BehaviorSubject, Observable, of as observableOf, Subscription } from 'rxjs';
import { distinctUntilChanged, filter, first, map, pairwise, startWith, withLatestFrom } from 'rxjs/operators';

import { EndpointsService } from '../../../../../core/src/core/endpoints.service';
import { safeUnsubscribe } from '../../../../../core/src/core/utils.service';
import {
  ConnectEndpointConfig,
  ConnectEndpointData,
  ConnectEndpointService,
} from '../../../../../core/src/features/endpoints/connect.service';
import {
  IActionMonitorComponentState,
} from '../../../../../core/src/shared/components/app-action-monitor-icon/app-action-monitor-icon.component';
import {
  ITableListDataSource,
  RowState,
} from '../../../../../core/src/shared/components/list/data-sources-controllers/list-data-source-types';
import { ITableColumn } from '../../../../../core/src/shared/components/list/list-table/table.types';
import { StepOnNextFunction } from '../../../../../core/src/shared/components/stepper/step/step.component';
import { AppState } from '../../../../../store/src/public-api';
import { ActionState } from '../../../../../store/src/reducers/api-request-reducer/types';
import { stratosEntityCatalog } from '../../../../../store/src/stratos-entity-catalog';
import { KUBERNETES_ENDPOINT_TYPE } from '../../kubernetes-entity-factory';
import { KubeConfigAuthHelper } from '../kube-config-auth.helper';
import { KubeConfigFileCluster, KubeConfigImportAction, KubeImportState } from '../kube-config.types';
import {
  KubeConfigTableImportStatusComponent,
} from './kube-config-table-import-status/kube-config-table-import-status.component';

const REGISTER_ACTION = 'Register endpoint';
const CONNECT_ACTION = 'Connect endpoint';

@Component({
  selector: 'app-kube-config-import',
  templateUrl: './kube-config-import.component.html',
  styleUrls: ['./kube-config-import.component.scss']
})
export class KubeConfigImportComponent implements OnDestroy {

  done = new BehaviorSubject<boolean>(false);
  done$ = this.done.asObservable();
  busy = new BehaviorSubject<boolean>(false);
  busy$ = this.busy.asObservable();
  data = new BehaviorSubject<KubeConfigImportAction[]>([]);
  data$ = this.data.asObservable();

  public dataSource: ITableListDataSource<KubeConfigImportAction> = {
    connect: () => this.data$,
    disconnect: () => { },
    // Ensure unique per entry to step (in case user went back step and updated)
    trackBy: (index, item) => item.cluster.name + this.iteration,
    isTableLoading$: this.data$.pipe(map(data => !(data && data.length > 0))),
    getRowState: (row: KubeConfigImportAction): Observable<RowState> => {
      return row ? row.state.asObservable() : observableOf({});
    }
  };
  public columns: ITableColumn<KubeConfigImportAction>[] = [
    {
      columnId: 'action', headerCell: () => 'Action',
      cellDefinition: {
        valuePath: 'action'
      },
      cellFlex: '1',
    },
    {
      columnId: 'description', headerCell: () => 'Description',
      cellDefinition: {
        valuePath: 'description'
      },
      cellFlex: '4',
    },
    // Right-hand column to show the action progress
    {
      columnId: 'monitorState',
      cellComponent: KubeConfigTableImportStatusComponent,
      cellConfig: (row) => row.actionState.asObservable(),
      cellFlex: '0 0 24px'
    }
  ];

  subs: Subscription[] = [];
  applyStarted: boolean;
  private iteration = 0;

  private connectService: ConnectEndpointService;

  constructor(
    public store: Store<AppState>,
    public resolver: ComponentFactoryResolver,
    private injector: Injector,
    private fb: FormBuilder,
    private endpointsService: EndpointsService,
  ) {
  }

  // Process the next action in the list
  private processAction(actions: KubeConfigImportAction[]) {
    if (actions.length === 0) {
      // We are done
      this.done.next(true);
      this.busy.next(false);
      return;
    }

    // Get the next action
    const i = actions.shift();
    if (i.action === REGISTER_ACTION) {
      this.doRegister(i, actions);
    } else if (i.action === CONNECT_ACTION) {
      this.doConnect(i, actions);
    } else {
      // Do the next action
      this.processAction(actions);
    }
  }

  private doRegister(reg: KubeConfigImportAction, next: KubeConfigImportAction[]) {
    const obs$ = this.registerEndpoint(
      reg.cluster.name,
      reg.cluster.cluster.server,
      reg.cluster.cluster['insecure-skip-tls-verify'],
      reg.cluster._subType
    );
    const mainObs$ = this.getUpdatingState(obs$).pipe(
      startWith({ busy: true, error: false, completed: false })
    );

    this.subs.push(mainObs$.subscribe(reg.actionState));

    const sub = reg.actionState.subscribe(progress => {
      // Not sure what the status is used for?
      reg.status = progress;
      if (progress.error && progress.message) {
        // Mark all dependency jobs as skip
        next.forEach(action => {
          if (action.depends === reg) {
            // Mark it as skipped by setting the action to null
            action.action = null;
            action.state.next({ message: 'Skipping action as endpoint could not be registered', warning: true });
          }
        });
        reg.state.next({ message: progress.message, error: true });
      }
      if (progress.completed) {
        if (!progress.error) {
          // If we created okay, then guid is in the message
          reg.cluster._guid = progress.message;
        }
        sub.unsubscribe();
        // Do the next one
        this.processAction(next);
      }
    });
    this.subs.push(sub);
  }

  private doConnect(connect: KubeConfigImportAction, next: KubeConfigImportAction[]) {
    if (!connect.user) {
      connect.state.next({ message: 'Can not connect - no user specified', error: true });
      return;
    }
    const helper = new KubeConfigAuthHelper();
    const data = helper.getAuthDataForConnect(this.resolver, this.injector, this.fb, connect.user);
    if (data) {
      const obs$ = this.connectEndpoint(connect, data);

      // Echo obs$ to the behaviour subject
      this.subs.push(obs$.subscribe(connect.actionState));

      this.subs.push(connect.actionState.pipe(filter(status => status.completed), first()).subscribe(status => {
        if (status.error) {
          connect.state.next({ message: status.message, error: true });
        }
        this.processAction(next);
      }));
    } else {
      connect.state.next({ message: 'Can not connect - could not get user auth data', error: true });
    }
  }

  ngOnDestroy() {
    safeUnsubscribe(...this.subs);

    if (this.connectService) {
      this.connectService.destroy();
    }
  }

  // Register the endpoint
  private registerEndpoint(name: string, url: string, skipSslValidation: boolean, subType: string) {
    return stratosEntityCatalog.endpoint.api.register<ActionState>(
      KUBERNETES_ENDPOINT_TYPE,
      subType,
      name,
      url,
      skipSslValidation,
      '',
      '',
      false
    ).pipe(
      filter(update => !!update)
    );
  }

  // Connect to an endpoint
  private connectEndpoint(action: KubeConfigImportAction, pData: ConnectEndpointData): Observable<IActionMonitorComponentState> {
    const config: ConnectEndpointConfig = {
      name: action.cluster.name,
      guid: action.depends.cluster._guid || action.cluster._guid,
      type: KUBERNETES_ENDPOINT_TYPE,
      subType: action.user._authData.subType,
      ssoAllowed: false
    };

    if (this.connectService) {
      this.connectService.destroy();
    }
    this.connectService = new ConnectEndpointService(this.endpointsService, config);
    this.connectService.setData(pData);
    return this.connectService.submit().pipe(
      map(updateSection => ({
        busy: false,
        error: !updateSection.success,
        completed: true,
        message: updateSection.errorMessage
      })),
      startWith({
        message: '',
        busy: true,
        completed: false,
        error: false
      })
    );
  }

  // Enter the step - process the list of clusters to import
  onEnter = (data: KubeConfigFileCluster[]) => {
    this.applyStarted = false;
    this.iteration += 1;
    const imports: KubeConfigImportAction[] = [];
    data.forEach(item => {
      if (item._selected) {
        const register = {
          action: REGISTER_ACTION,
          description: `Register "${item.name}" with the URL "${item.cluster.server}"`,
          cluster: item,
          state: new BehaviorSubject<RowState>({}),
          actionState: new BehaviorSubject<any>({}),
        };
        // Only include if the endpoint does not already exist
        if (!item._guid) {
          imports.push(register);
        }
        if (item._additionalUserInfo) {
          return;
        }
        const user = item._users.find(u => u.name === item._user);
        if (user) {
          imports.push({
            action: CONNECT_ACTION,
            description: `Connect "${item.name}" with the user "${user.name}"`,
            cluster: item,
            user,
            state: new BehaviorSubject<RowState>({}),
            depends: register,
            actionState: new BehaviorSubject<any>({}),
          });
        }
      }
    });
    this.data.next(imports);
  };

  // Finish - go back to the endpoints view
  onNext: StepOnNextFunction = () => {
    if (this.applyStarted) {
      // this.store.dispatch(new RouterNav({ path: ['endpoints'] }));
      return observableOf({ success: true, redirect: true });

    } else {
      this.applyStarted = true;
      this.busy.next(true);
      this.data$.pipe(
        filter((data => data && data.length > 0)),
        first()
      ).subscribe(imports => {
        // Go through the imports and dispatch the actions to perform them in sequence
        this.processAction([...imports]);
      });
      return observableOf({ success: true, ignoreSuccess: true });
    }
  };

  // These two should be somewhere else
  private getUpdatingState(actionState$: Observable<ActionState>): Observable<KubeImportState> {
    const completed$ = this.getHasCompletedObservable(actionState$.pipe(map(requestState => requestState.busy)));
    return actionState$.pipe(
      pairwise(),
      withLatestFrom(completed$),
      map(([[, requestState], completed]) => {
        return {
          busy: requestState.busy,
          error: requestState.error,
          completed,
          message: requestState.message,
        };
      })
    );
  }

  private getHasCompletedObservable(busy$: Observable<boolean>) {
    return busy$.pipe(
      distinctUntilChanged(),
      pairwise(),
      map(([oldBusy, newBusy]) => oldBusy && !newBusy),
      startWith(false),
    );
  }

}