
View on GitHub


0 mins
Test Coverage
package repositories

import (

    apierrors ""
    korifiv1alpha1 ""

    corev1 ""

const (
    appLogSourceType = "APP"

type PodRepo struct {
    userClientFactory authorization.UserK8sClientFactory

func NewPodRepo(userClientFactory authorization.UserK8sClientFactory) *PodRepo {
    return &PodRepo{
        userClientFactory: userClientFactory,

func (r *PodRepo) listPods(ctx context.Context, authInfo authorization.Info, listOpts client.ListOptions) ([]corev1.Pod, error) {
    userClient, err := r.userClientFactory.BuildClient(authInfo)
    if err != nil {
        return nil, fmt.Errorf("failed to build user client: %w", err)

    podList := corev1.PodList{}
    err = userClient.List(ctx, &podList, &listOpts)
    if err != nil {
        return nil, fmt.Errorf("failed to list pods: %w", apierrors.FromK8sError(err, PodResourceType))

    return podList.Items, nil

type RuntimeLogsMessage struct {
    SpaceGUID   string
    AppGUID     string
    AppRevision string
    Limit       int64

func (r *PodRepo) GetRuntimeLogsForApp(ctx context.Context, logger logr.Logger, authInfo authorization.Info, message RuntimeLogsMessage) ([]LogRecord, error) {
    labelSelector, err := labels.ValidatedSelectorFromSet(map[string]string{
        korifiv1alpha1.CFAppGUIDLabelKey:  message.AppGUID,
        "": message.AppRevision,
    if err != nil {
        return nil, fmt.Errorf("failed to build labelSelector: %w", err)
    listOpts := client.ListOptions{Namespace: message.SpaceGUID, LabelSelector: labelSelector}

    pods, err := r.listPods(ctx, authInfo, listOpts)
    if err != nil {
        return nil, err

    appLogs := make([]LogRecord, 0, int64(len(pods))*message.Limit/2)

    k8sClient, err := r.userClientFactory.BuildK8sClient(authInfo)
    if err != nil {
        return nil, fmt.Errorf("failed to build user client: %w", err)

    for _, pod := range pods {
        var logReadCloser io.ReadCloser
        logReadCloser, err = k8sClient.CoreV1().Pods(message.SpaceGUID).GetLogs(pod.Name, &corev1.PodLogOptions{Timestamps: true, TailLines: &message.Limit}).Stream(ctx)
        if err != nil {
            // untested
            logger.Info("failed to fetch logs", "pod", pod.Name, "reason", err)

        r := bufio.NewReader(logReadCloser)
        for {
            var line []byte
            line, err = r.ReadBytes('\n')
            if err != nil {
                if err == io.EOF {
                } else {
                    _ = logReadCloser.Close()
                    return nil, fmt.Errorf("failed to parse pod logs: %w", err)

            logRecord := lineToAppLogRecord(line)

            appLogs = append(appLogs, logRecord)

        _ = logReadCloser.Close()

    return appLogs, nil

func lineToAppLogRecord(line []byte) LogRecord {
    logLine := string(line)
    var logTime int64
    logLine, logTime, _ = parseRFC3339NanoTime(logLine)

    // trim trailing newlines so that the CLI doesn't render extra log lines for them
    logLine = strings.TrimRight(logLine, "\r\n")

    logRecord := LogRecord{
        Message:   logLine,
        Timestamp: logTime,
        Tags: map[string]string{
            "source_type": appLogSourceType,
    return logRecord

func parseRFC3339NanoTime(input string) (string, int64, error) {
    if len(input) < 30 {
        return input, 0, fmt.Errorf("string not long enough")

    t, err := time.Parse(time.RFC3339Nano, input[:30])
    if err != nil {
        return input, 0, err

    return input[31:], t.UnixNano(), nil