core/metrics/otlp.go
package metrics
import (
"context"
"crypto/tls"
"fmt"
"google.golang.org/grpc/credentials"
"strings"
"time"
"github.com/synapsecns/sanguine/core"
"github.com/synapsecns/sanguine/core/config"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
)
type otlpHandler struct {
*baseHandler
buildInfo config.BuildInfo
}
// NewOTLPMetricsHandler creates a new newrelic metrics handler.
func NewOTLPMetricsHandler(buildInfo config.BuildInfo) Handler {
return &otlpHandler{
buildInfo: buildInfo,
baseHandler: newBaseHandler(buildInfo),
}
}
func (n *otlpHandler) Start(ctx context.Context) (err error) {
var exporters []tracesdk.SpanExporter
primaryExporter, err := makeOTLPExporter(ctx, "")
if err != nil {
return fmt.Errorf("could not create default client: %w", err)
}
exporters = append(exporters, primaryExporter)
// Loop to create additional exporters
for i := 1; ; i++ {
envSuffix := fmt.Sprintf("_%d", i)
// if this is empty we can assume no config exists at all.
endpointEnv := otelEndpointEnv + envSuffix
// no more transports to add.
if !core.HasEnv(endpointEnv) {
break
}
exporter, err := makeOTLPExporter(ctx, envSuffix)
if err != nil {
return fmt.Errorf("could not create exporter %d: %w", i, err)
}
exporters = append(exporters, exporter)
}
// create the multi-exporter with all the exporters
multiExporter := NewMultiExporter(exporters...)
n.baseHandler = newBaseHandler(
n.buildInfo,
tracesdk.WithBatcher(
multiExporter,
tracesdk.WithMaxQueueSize(defaultMaxQueueSize),
tracesdk.WithMaxExportBatchSize(defaultMaxExportBatch),
),
tracesdk.WithSampler(tracesdk.AlwaysSample()),
)
// start the new parent
err = n.baseHandler.Start(ctx)
if err != nil {
return fmt.Errorf("could not start base handler: %w", err)
}
go func() {
handleShutdown(ctx, n.baseHandler.unwrappedTP)
}()
return nil
}
func (n *otlpHandler) Type() HandlerType {
return OTLP
}
// wait for the context to be canceled.
// then flush the traces and shutdown the exporter.
func handleShutdown(ctx context.Context, provider *tracesdk.TracerProvider) {
<-ctx.Done()
const spanWaitTime = time.Millisecond
const shutdownAllowance = time.Second * 10
// allow only 10 seconds for graceful shutdown.
// we use without cancel to copy the parents values while making sure our derived context is not canceled.
shutdownCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), shutdownAllowance)
defer cancel()
// don't shutdown immediately, wait for a bit to allow the last spans to be sent. This is in process and should be aymptotic to instant.
time.Sleep(spanWaitTime)
err := provider.ForceFlush(shutdownCtx)
if err != nil {
logger.Warnf("could not flush traces: %v", err)
}
err = provider.Shutdown(shutdownCtx)
if err != nil {
logger.Warnf("could not shutdown traces: %v", err)
}
}
const (
otelEndpointEnv = "OTEL_EXPORTER_OTLP_ENDPOINT"
otelTransportEnv = "OTEL_EXPORTER_OTLP_TRANSPORT"
otelInsecureEvn = "OTEL_EXPORTER_OTLP_SECURE_MODE"
otelHeadersEnv = "OTEL_EXPORTER_OTLP_HEADERS"
)
//go:generate go run golang.org/x/tools/cmd/stringer -type=otlpTransportType -linecomment
type otlpTransportType uint8
const (
otlpTransportHTTP otlpTransportType = iota + 1 // http
otlpTransportGRPC // grpc
)
// getEnvSuffix returns the value of an environment variable with a suffix.
func getEnvSuffix(env, suffix, defaultRet string) string {
return core.GetEnv(makeEnv(env, suffix), defaultRet)
}
func makeEnv(env, suffix string) string {
return env + suffix
}
// makeOTLPTrace creates a new OTLP client based on the transport type and url.
func makeOTLPExporter(ctx context.Context, envSuffix string) (*otlptrace.Exporter, error) {
transport := transportFromString(getEnvSuffix(otelTransportEnv, envSuffix, otlpTransportGRPC.String()))
url := getEnvSuffix(otelEndpointEnv, envSuffix, "")
secure := core.GetEnvBool(makeEnv(otelInsecureEvn, envSuffix), false)
headers := getEnvSuffix(otelHeadersEnv, envSuffix, "")
isCorrect := envSuffix != ""
if isCorrect != secure {
return nil, fmt.Errorf("could not create exporter: secure mode is not set correctly")
}
// I spent about 2 hours trying to figure out why this was failing to no avail. I'm going to leave it as is for now.
// My best guess is the issue is around the tsl config.
// Should you attempt to fix this and fail, please increment the counter above, although I send my umost encouragement.
if secure && transport == otlpTransportHTTP {
return nil, fmt.Errorf("could not create exporter: http transport does not support secure mode")
}
oteltraceClient, err := buildClientFromTransport(
transport,
withURL(url),
// defaults to true
withSecure(secure),
withHeaders(headers),
)
if err != nil {
return nil, fmt.Errorf("could not create client from transport: %w", err)
}
exporter, err := otlptrace.New(ctx, oteltraceClient)
if err != nil {
return nil, fmt.Errorf("ocould not create client: %w", err)
}
return exporter, nil
}
// buildClientFromTransport creates a new OTLP client based on the transport type and url.
func buildClientFromTransport(transport otlpTransportType, options ...Option) (otlptrace.Client, error) {
to := transportOptions{}
for _, option := range options {
if err := option(&to); err != nil {
return nil, fmt.Errorf("could not apply option: %w", err)
}
}
// TODO: make sure url is validated
switch transport {
case otlpTransportHTTP:
return otlptracehttp.NewClient(to.httpOptions...), nil
case otlpTransportGRPC:
return otlptracegrpc.NewClient(to.grpcOptions...), nil
default:
return nil, fmt.Errorf("unknown transport type: %s", transport.String())
}
}
type transportOptions struct {
// httpOptions are the options for the http transport.
httpOptions []otlptracehttp.Option
// grpcOptions are the options for the grpc transport.
grpcOptions []otlptracegrpc.Option
}
// Option Each option appends the correspond option for both http and grpc options.
// only one will be used in creating the actual client.
type Option func(*transportOptions) error
func withURL(url string) Option {
return func(o *transportOptions) error {
o.httpOptions = append(o.httpOptions, otlptracehttp.WithEndpointURL(url))
o.grpcOptions = append(o.grpcOptions, otlptracegrpc.WithEndpointURL(url))
return nil
}
}
func withSecure(secure bool) Option {
return func(o *transportOptions) error {
if secure {
tlsCreds := credentials.NewClientTLSFromCert(nil, "")
// note: you do not need to specify the tls creds for http, this happens automatically when https:// is used as the protocol scheme.
o.grpcOptions = append(o.grpcOptions, otlptracegrpc.WithTLSCredentials(tlsCreds))
tlsConfig := &tls.Config{
MinVersion: tls.VersionTLS12,
// RootCAs is nil, which means the default system root CAs are used
}
o.httpOptions = append(o.httpOptions, otlptracehttp.WithTLSClientConfig(tlsConfig))
} else {
o.httpOptions = append(o.httpOptions, otlptracehttp.WithInsecure())
o.grpcOptions = append(o.grpcOptions, otlptracegrpc.WithInsecure())
}
return nil
}
}
func withHeaders(headers string) Option {
return func(o *transportOptions) error {
realHeaders := headersToMap(headers)
o.httpOptions = append(o.httpOptions, otlptracehttp.WithHeaders(realHeaders))
o.grpcOptions = append(o.grpcOptions, otlptracegrpc.WithHeaders(realHeaders))
return nil
}
}
func headersToMap(input string) map[string]string {
// Initialize the map
result := make(map[string]string)
// Split the input string by comma to get key=value pairs
pairs := strings.Split(input, ",")
// Iterate over each pair
for _, pair := range pairs {
// Split each pair by '=' to get the key and value
kv := strings.SplitN(pair, "=", 2)
if len(kv) == 2 {
key := kv[0]
value := kv[1]
// Add the key and value to the map
result[key] = value
}
}
return result
}
// transportFromString converts a string to a transport type.
// Defaults to http if the string is not recognized.
func transportFromString(transport string) otlpTransportType {
switch strings.ToLower(transport) {
case otlpTransportHTTP.String():
return otlpTransportHTTP
case otlpTransportGRPC.String():
return otlpTransportGRPC
}
// will be unknown since we use iota+1
// (see uber's go stye guide for details)
return otlpTransportType(0)
}
const (
defaultMaxQueueSize = 1000000
defaultMaxExportBatch = 2000
)