wait/utils.go

Summary

Maintainability
A
0 mins
Test Coverage
B
88%
// Copyright (c) 2019-2022 Wibowo Arindrarto <contact@arindrarto.dev>
// SPDX-License-Identifier: BSD-3-Clause

package wait

import (
    "net"
    "os"
    "sync"
    "syscall"
)

// statusValues are the string representation of the Status enums.
var statusValues = []string{"start", "ready", "failed"}

// Status enumerates possible waiting status.
type Status int

const (
    // Start is the status emitted at the beginning of the wait operation.
    Start Status = iota
    // Ready is the status for when the wait operation finishes successfully.
    Ready
    // Failed is the status for when the wait operation failed.
    Failed
)

// String returns the string representation of the Status enum.
func (s Status) String() string {
    return statusValues[s]
}

// shouldWait checks that a given error represents a condition in which we should still wait and
// attempt a connection or not.
// Currently this covers two broad classes of errors:
//        1) I/O timeout errors
//        2) connection refused (server not ready) errors. Note that this has only been tested on
//           POSIX systems.
func shouldWait(err error) bool {
    // First case: i/o timeout.
    if os.IsTimeout(err) {
        return true
    }

    // Second case: connection refused -- remote server not ready.
    if opErr, isOpErr := err.(*net.OpError); isOpErr {
        ierr := opErr.Unwrap()
        if syscallErr, isSyscallErr := ierr.(*os.SyscallError); isSyscallErr {
            iierr := syscallErr.Unwrap()

            return iierr == syscall.ECONNREFUSED
        }
    }

    return false
}

// merge merges an array of channels into one channel.
// Adapted from: https://blog.golang.org/pipelines
func merge(chs []<-chan *TCPMessage) <-chan *TCPMessage {
    var wg sync.WaitGroup
    merged := make(chan *TCPMessage)

    forward := func(ch <-chan *TCPMessage) {
        for msg := range ch {
            merged <- msg
        }
        wg.Done()
    }

    wg.Add(len(chs))
    for _, ch := range chs {
        go forward(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}