cloud/gcp/instance.go
//
// cloud/gcp/instance.go
//
// Copyright (c) 2016-2017 Junpei Kawamoto
//
// This file is part of Roadie.
//
// Roadie is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Roadie is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Roadie. If not, see <http://www.gnu.org/licenses/>.
//
package gcp
import (
"context"
"fmt"
"io/ioutil"
"log"
"net/http"
"sort"
"strings"
"time"
"golang.org/x/oauth2/google"
"github.com/jkawamoto/roadie/cloud"
"github.com/jkawamoto/roadie/script"
"google.golang.org/api/compute/v1"
)
const (
// SourceImage defines the ID of the source image to be used for instance.
SourceImage = "projects/coreos-cloud/global/images/coreos-stable-1298-7-0-v20170401"
)
var (
// RoadieSchemeURLOffset defines an offset value to remove scheme name from
// URLs.
RoadieSchemeURLOffset = len(script.RoadieSchemePrefix)
)
// ComputeService implements cloud.InstanceManager based on Google Cloud
// Platform.
type ComputeService struct {
Config *Config
Logger *log.Logger
SleepTime time.Duration
}
// NewComputeService creates a new compute service client.
func NewComputeService(cfg *Config, logger *log.Logger) *ComputeService {
if logger == nil {
logger = log.New(ioutil.Discard, "", log.LstdFlags)
}
return &ComputeService{
Config: cfg,
Logger: logger,
SleepTime: 10 * time.Second,
}
}
// newService creates a new service under a given context.
func (s *ComputeService) newService(ctx context.Context) (*compute.Service, error) {
// Create a client.
var client *http.Client
var err error
if s.Config.Token == nil || s.Config.Token.AccessToken == "" {
client, err = google.DefaultClient(ctx, gcpScope)
if err != nil {
return nil, err
}
} else {
cfg := NewAuthorizationConfig(0)
client = cfg.Client(ctx, s.Config.Token)
}
// Create a servicer.
return compute.New(client)
}
// AvailableRegions returns a slice of region information.
func (s *ComputeService) AvailableRegions(ctx context.Context) (regions []cloud.Region, err error) {
s.Logger.Println("Retrieving available regions")
service, err := s.newService(ctx)
if err != nil {
return
}
res, err := service.Zones.List(s.Config.Project).Do()
if err != nil {
return
}
regions = make([]cloud.Region, len(res.Items))
for i, v := range res.Items {
regions[i] = cloud.Region{
Name: v.Name,
Status: v.Status,
}
}
s.Logger.Println("Finished retrieving available regions")
return
}
// AvailableMachineTypes returns a slice of machie type names.
func (s *ComputeService) AvailableMachineTypes(ctx context.Context) (types []cloud.MachineType, err error) {
s.Logger.Println("Retrieving available machine types")
service, err := s.newService(ctx)
if err != nil {
return
}
res, err := service.MachineTypes.List(s.Config.Project, s.Config.Zone).Do()
if err != nil {
return
}
types = make([]cloud.MachineType, len(res.Items))
for i, v := range res.Items {
types[i] = cloud.MachineType{
Name: v.Name,
Description: v.Description,
}
}
s.Logger.Println("Finished retrieving available machine types")
return
}
// CreateInstance creates a new instance based on the builder's configuration.
func (s *ComputeService) CreateInstance(ctx context.Context, task *script.Script) (err error) {
if task.Image == "" {
task.Image = DefaultBaseImage
}
// Create an ignition config.
fluentd, err := FluentdUnit(task.Name)
if err != nil {
return
}
options := ""
if s.Config.NoShutdown {
options += "--no-shutdown"
}
roadie, err := RoadieUnit(task.Name, task.Image, options)
if err != nil {
return
}
logcast, err := LogcastUnit("roadie.service")
if err != nil {
return
}
ignition := NewIgnitionConfig().Append(fluentd).Append(roadie).Append(logcast).String()
s.Logger.Println("Ignition configuration is", ignition)
// Update URLs of which scheme is `roadie://` to `gs://`.
ReplaceURLScheme(s.Config, task)
s.Logger.Printf("Updated script file is \n%v\n", task.String())
scriptStr := task.String()
err = s.createInstance(ctx, task.Name, []*compute.MetadataItems{
&compute.MetadataItems{
Key: "script",
Value: &scriptStr,
},
&compute.MetadataItems{
Key: "user-data",
Value: &ignition,
},
})
if err != nil {
return
}
s.Logger.Println("Finished creating instance", task.Name)
return
}
// DeleteInstance deletes a given named instance.
func (s *ComputeService) DeleteInstance(ctx context.Context, name string) (err error) {
s.Logger.Println("Deleting instance", name)
service, err := s.newService(ctx)
if err != nil {
return
}
res, err := service.Instances.Delete(s.Config.Project, s.Config.Zone, name).Do()
if err == nil {
s.Logger.Println("Finished deleting instance")
if res.StatusMessage != "" {
s.Logger.Println("*", res.StatusMessage)
}
for _, v := range res.Warnings {
s.Logger.Println("*", v.Message)
}
}
return
}
// Instances is different from instances and sends names of instances which are
// not working for any queue into the given handler.
func (s *ComputeService) Instances(ctx context.Context, handler cloud.InstanceHandler) (err error) {
return s.instances(ctx, func(name, status string) error {
if strings.HasPrefix(name, "queue-") {
return nil
}
return handler(name, status)
})
}
// instances sends names of instances with their status into the given handler.
func (s *ComputeService) instances(ctx context.Context, handler cloud.InstanceHandler) (err error) {
s.Logger.Println("Retrieving running and terminated instances")
// key: instance name, value: true if the instance is still running.
instances := make(map[string]bool)
log := NewLogManager(s.Config, s.Logger)
err = log.OperationLogEntries(ctx, time.Time{}, func(_ time.Time, payload *ActivityPayload) error {
switch payload.EventSubtype {
case LogEventSubtypeInsert:
instances[payload.Resource.Name] = true
case LogEventSubtypeDelete:
instances[payload.Resource.Name] = false
}
return nil
})
if err != nil {
return
}
var instanceNames []string
for name := range instances {
instanceNames = append(instanceNames, name)
}
sort.Strings(instanceNames)
for _, name := range instanceNames {
if instances[name] {
err = handler(name, StatusRunning)
} else {
err = handler(name, StatusTerminated)
}
if err != nil {
return
}
}
s.Logger.Println("Finished retrieving instances")
return
}
// CreateInstance creates a new instance based on the builder's configuration.
func (s *ComputeService) createInstance(ctx context.Context, instanceName string, matadataItems []*compute.MetadataItems) (err error) {
s.Logger.Println("Creating instance", instanceName)
service, err := s.newService(ctx)
if err != nil {
return
}
blueprint := compute.Instance{
Name: strings.ToLower(instanceName),
Zone: s.Config.normalizedZone(),
MachineType: s.Config.normalizedMachineType(),
Disks: []*compute.AttachedDisk{
&compute.AttachedDisk{
Type: "PERSISTENT",
Boot: true,
Mode: "READ_WRITE",
AutoDelete: true,
InitializeParams: &compute.AttachedDiskInitializeParams{
SourceImage: SourceImage,
DiskType: s.Config.diskType(),
DiskSizeGb: s.Config.DiskSize,
},
},
},
CanIpForward: false,
NetworkInterfaces: []*compute.NetworkInterface{
&compute.NetworkInterface{
Network: s.Config.network(),
AccessConfigs: []*compute.AccessConfig{
&compute.AccessConfig{
Name: "External NAT",
Type: "ONE_TO_ONE_NAT",
},
},
},
},
Scheduling: &compute.Scheduling{
Preemptible: false,
OnHostMaintenance: "MIGRATE",
AutomaticRestart: truePtr,
},
ServiceAccounts: []*compute.ServiceAccount{
&compute.ServiceAccount{
Email: "default",
Scopes: []string{
"https://www.googleapis.com/auth/cloud-platform",
},
},
},
Metadata: &compute.Metadata{
Items: matadataItems,
},
}
res, err := service.Instances.Insert(s.Config.Project, s.Config.Zone, &blueprint).Do()
if err != nil {
return
}
if res.StatusMessage != "" {
s.Logger.Println(res.StatusMessage)
}
for _, v := range res.Warnings {
s.Logger.Println("*", v.Message)
}
log := NewLogManager(s.Config, s.Logger)
from := time.Now().In(time.UTC)
// Token for specifying a target instance has been started.
instanceStarted := fmt.Errorf("Target instance has been started.")
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(s.SleepTime):
}
err := log.OperationLogEntries(ctx, from, func(t time.Time, payload *ActivityPayload) (err error) {
if payload.EventSubtype == LogEventSubtypeInsert && payload.Resource.Name == blueprint.Name {
return instanceStarted
}
from = t
return
})
switch err {
case instanceStarted:
return nil
case nil:
continue
default:
return err
}
}
}