cloud/gcp/log_manager.go
//
// cloud/gcp/log_manager.go
//
// Copyright (c) 2016-2017 Junpei Kawamoto
//
// This file is part of Roadie.
//
// Roadie is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Roadie is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Roadie. If not, see <http://www.gnu.org/licenses/>.
//
package gcp
import (
"context"
"fmt"
"io"
"io/ioutil"
"log"
"strings"
"time"
"github.com/jkawamoto/roadie/cloud"
"cloud.google.com/go/logging/apiv2"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
loggingpb "google.golang.org/genproto/googleapis/logging/v2"
)
// LogManager implements the cloud.LogManager interface.
// It requests log entries from the Google Cloud Logging service.
type LogManager struct {
// Config is a reference to the GCP configuration. Entries reads its Token
// and Project fields to authenticate and to choose the project to query.
Config *Config
// Logger receives the progress messages written while retrieving logs.
// NewLogManager substitutes a discarding logger when nil is supplied.
Logger *log.Logger
// SleepTime is set to 30 seconds by NewLogManager.
// NOTE(review): no method shown in this file reads it; presumably a polling
// interval used elsewhere -- confirm.
SleepTime time.Duration
}
// EntryHandler is a function type to handle log entries.
// Entries calls it once for every retrieved entry and stops at the first
// non-nil error, which it returns to its own caller.
type EntryHandler func(*loggingpb.LogEntry) error
// RoadiePayloadHandler is a function type to handle RoadiePayloads.
// InstanceLogEntries calls it with the timestamp of a log entry and the
// string stored in the MESSAGE field of that entry's JSON payload.
type RoadiePayloadHandler func(time.Time, string) error
// ActivityPayloadHandler is a function type to handle ActivityPayloads.
// OperationLogEntries calls it with the timestamp of a log entry and the
// ActivityPayload built from that entry's JSON payload.
type ActivityPayloadHandler func(time.Time, *ActivityPayload) error
// NewLogManager creates a new log manager bound to the given configuration.
//
// cfg is stored as-is in the returned manager. logger may be nil, in which
// case a logger that discards everything written to it is installed instead.
// SleepTime is initialised to 30 seconds.
func NewLogManager(cfg *Config, logger *log.Logger) (m *LogManager) {
	m = &LogManager{
		Config:    cfg,
		Logger:    logger,
		SleepTime: 30 * time.Second,
	}
	if m.Logger == nil {
		// Methods log unconditionally, so always provide a usable logger.
		m.Logger = log.New(ioutil.Discard, "", log.LstdFlags)
	}
	return m
}
// Get requests log entries of the given named instance.
//
// It works in two passes. The first pass scans operation logs newer than from
// and, for every insert operation whose resource name equals instanceName,
// moves from forward to that operation's timestamp; the last such operation
// seen wins. The second pass retrieves the instance's log entries newer than
// the resulting from and hands each one to handler.
//
// A payload equal to instanceName makes the second pass stop with io.EOF,
// which is returned to the caller like any other handler error.
func (s *LogManager) Get(ctx context.Context, instanceName string, from time.Time, handler cloud.LogHandler) (err error) {
	// Determine when the newest instance starts.
	findStart := func(timestamp time.Time, payload *ActivityPayload) error {
		if payload.Resource.Name == instanceName && payload.EventSubtype == LogEventSubtypeInsert {
			from = timestamp
		}
		return nil
	}
	if err = s.OperationLogEntries(ctx, from, findStart); err != nil {
		return err
	}

	// Request log entries.
	forward := func(timestamp time.Time, payload string) error {
		if payload == instanceName {
			return io.EOF
		}
		return handler(timestamp, payload, false)
	}
	return s.InstanceLogEntries(ctx, instanceName, from, forward)
}
// Delete is a no-op and always returns nil.
//
// In GCP, it is not necessary to delete log entries because old entries will
// be deleted automatically, so neither ctx nor instanceName is used.
func (s *LogManager) Delete(ctx context.Context, instanceName string) error {
	return nil
}
// GetQueueLog retrieves log entries from a queue.
//
// Entries are looked up under the key produced by queueLogKey(queue), with no
// lower bound on the timestamp (the zero time is used). Each payload is passed
// to handler after a leading "task-" prefix, if present, has been removed.
func (s *LogManager) GetQueueLog(ctx context.Context, queue string, handler cloud.LogHandler) (err error) {
	forward := func(timestamp time.Time, payload string) error {
		line := strings.TrimPrefix(payload, "task-")
		return handler(timestamp, line, false)
	}
	err = s.InstanceLogEntries(ctx, queueLogKey(queue), time.Time{}, forward)
	return err
}
// GetTaskLog retrieves log entries for a task in a queue.
//
// It reads the same entries as GetQueueLog but forwards only the payloads
// starting with "task-<task>:". That marker is removed before the payload is
// passed to handler; every other payload is silently skipped.
func (s *LogManager) GetTaskLog(ctx context.Context, queue, task string, handler cloud.LogHandler) (err error) {
	marker := fmt.Sprintf("task-%v:", task)
	onPayload := func(timestamp time.Time, payload string) error {
		if !strings.HasPrefix(payload, marker) {
			// Belongs to another task of the same queue.
			return nil
		}
		return handler(timestamp, strings.TrimPrefix(payload, marker), false)
	}
	return s.InstanceLogEntries(ctx, queueLogKey(queue), time.Time{}, onPayload)
}
// Entries gets the log entries of the configured project that match the given
// filter and passes them to handler one by one.
//
// A logging client is created for each call and closed before returning. When
// the configuration carries a non-empty access token the client is built with
// a token source derived from it; otherwise a default client is used.
//
// Retrieval ends when the iterator is exhausted (nil is returned), when ctx is
// done (ctx.Err() is returned), when fetching the next entry fails, or when
// handler returns a non-nil error; in the last two cases that error is
// returned unchanged.
func (s *LogManager) Entries(ctx context.Context, filter string, handler EntryHandler) (err error) {
	s.Logger.Println("Retrieving log:", filter)

	// Explicit credentials are attached only when a usable token is present;
	// an empty option list yields the default client.
	var opts []option.ClientOption
	if token := s.Config.Token; token != nil && token.AccessToken != "" {
		cfg := NewAuthorizationConfig(0)
		opts = append(opts, option.WithTokenSource(cfg.TokenSource(ctx, token)))
	}
	client, err := logging.NewClient(ctx, opts...)
	if err != nil {
		return err
	}
	defer client.Close()

	req := &loggingpb.ListLogEntriesRequest{
		ResourceNames: []string{fmt.Sprintf("projects/%v", s.Config.Project)},
		Filter:        filter,
	}
	it := client.ListLogEntries(ctx, req)

	for {
		// Give up before fetching another entry once the context is done.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		entry, nextErr := it.Next()
		switch {
		case nextErr == iterator.Done:
			s.Logger.Println("Finished retrieving log")
			return nil
		case nextErr != nil:
			return nextErr
		}

		if handlerErr := handler(entry); handlerErr != nil {
			return handlerErr
		}
	}
}
// InstanceLogEntries requests log entries of a given instance.
//
// Entries of resource type "gce_instance" whose jsonPayload.instance_name
// equals instanceName and whose timestamp is later than from are retrieved.
// For each entry whose JSON payload has a non-empty string in its MESSAGE
// field, handler is called with the entry's timestamp (in local time) and that
// string; entries without such a field are skipped.
//
// If the handler returns a non-nil value, obtaining log entries is canceled
// immediately and that value is returned. Errors from Entries are returned
// unchanged.
func (s *LogManager) InstanceLogEntries(ctx context.Context, instanceName string, from time.Time, handler RoadiePayloadHandler) error {
	// Instead of logName, which is specified by the TAG env in roadie-gcp,
	// the instance name is used to distinguish instances. As a result all
	// logs share the same log name, docker, so that they can be stored into
	// GCS easily.
	filter := fmt.Sprintf(
		`resource.type = "gce_instance" AND jsonPayload.instance_name = "%s" AND timestamp > "%s"`,
		instanceName, from.In(time.UTC).Format(LogTimeFormat))

	return s.Entries(ctx, filter, func(entry *loggingpb.LogEntry) error {
		value, ok := entry.GetJsonPayload().GetFields()["MESSAGE"]
		if !ok {
			return nil
		}
		msg := value.GetStringValue()
		if msg == "" {
			return nil
		}

		// The timestamp is a message-typed field and may be nil; fall back to
		// the Unix epoch instead of dereferencing a nil pointer.
		var sec, nsec int64
		if ts := entry.GetTimestamp(); ts != nil {
			sec, nsec = ts.Seconds, int64(ts.Nanos)
		}
		return handler(time.Unix(sec, nsec).In(time.Local), msg)
	})
}
// OperationLogEntries requests log entries about google cloud platform
// operations.
//
// Entries whose jsonPayload.event_type is "GCE_OPERATION_DONE" and whose
// timestamp is later than from are retrieved. Each entry's JSON payload is
// converted with NewActivityPayload and passed to handler together with the
// entry's timestamp (in local time).
//
// If the conversion fails or the handler returns a non-nil value, obtaining
// log entries is canceled immediately and that error is returned. Errors from
// Entries are returned unchanged.
func (s *LogManager) OperationLogEntries(ctx context.Context, from time.Time, handler ActivityPayloadHandler) error {
	filter := fmt.Sprintf(
		`jsonPayload.event_type = "GCE_OPERATION_DONE" AND timestamp > "%s"`,
		from.In(time.UTC).Format(LogTimeFormat))

	return s.Entries(ctx, filter, func(entry *loggingpb.LogEntry) error {
		payload, err := NewActivityPayload(entry.GetJsonPayload())
		if err != nil {
			return err
		}

		// The timestamp is a message-typed field and may be nil; fall back to
		// the Unix epoch instead of dereferencing a nil pointer.
		var sec, nsec int64
		if ts := entry.GetTimestamp(); ts != nil {
			sec, nsec = ts.Seconds, int64(ts.Nanos)
		}
		return handler(time.Unix(sec, nsec).In(time.Local), payload)
	})
}