cloud/azure/batch.go
//
// cloud/azure/batch.go
//
// Copyright (c) 2016-2017 Junpei Kawamoto
//
// This file is part of Roadie.
//
// Roadie is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Roadie is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Roadie. If not, see <http://www.gnu.org/licenses/>.
//
// This source file is associated with Azure's Batch API of which Swagger's
// clients are stored in `batch` directory.
package azure
import (
"bufio"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"path/filepath"
"regexp"
"sort"
"strings"
"time"
"golang.org/x/net/context/ctxhttp"
arm_storage "github.com/Azure/azure-sdk-for-go/arm/storage"
"github.com/Azure/azure-sdk-for-go/storage"
httptransport "github.com/go-openapi/runtime/client"
"github.com/go-openapi/strfmt"
"github.com/jkawamoto/roadie/cloud"
"github.com/jkawamoto/roadie/cloud/azure/batch/client"
"github.com/jkawamoto/roadie/cloud/azure/batch/client/accounts"
"github.com/jkawamoto/roadie/cloud/azure/batch/client/compute_nodes"
"github.com/jkawamoto/roadie/cloud/azure/batch/client/jobs"
"github.com/jkawamoto/roadie/cloud/azure/batch/client/pools"
"github.com/jkawamoto/roadie/cloud/azure/batch/client/tasks"
"github.com/jkawamoto/roadie/cloud/azure/batch/models"
"github.com/jkawamoto/roadie/script"
)
const (
// BatchAPIVersion defines API version of batch service.
BatchAPIVersion = "2016-07-01.3.1"
// RoadieAzureArchiveName is an archive name of roarie-azure command.
RoadieAzureArchiveName = "roadie-azure_linux_amd64.tar.gz"
// RoadieAzureVersion is the version of the job manager program.
RoadieAzureVersion = "v0.3.5"
)
var (
// RoadieAzureURL is a template of URL where the job manager program is published.
RoadieAzureURL = fmt.Sprintf("https://github.com/jkawamoto/roadie-azure/releases/download/%v/%v", RoadieAzureVersion, RoadieAzureArchiveName)
)
// MinimalJSONProducer is a provider which marshals message bodies as a JSON
// object and remove null values from it.
type MinimalJSONProducer struct {
regexp *regexp.Regexp
blank []byte
}
// NewMinimalJSONProducer creates a new MinimalJSONProducer.
func NewMinimalJSONProducer() *MinimalJSONProducer {
return &MinimalJSONProducer{
regexp: regexp.MustCompile("(\"[^\"]+?\":null,?|,\"[^\"]+\":null)"),
blank: []byte(""),
}
}
// Produce creates a message body from a given request message.
func (p *MinimalJSONProducer) Produce(out io.Writer, msg interface{}) (err error) {
data, err := json.Marshal(msg)
if err != nil {
return
}
data = p.regexp.ReplaceAllLiteral(data, p.blank)
_, err = out.Write(data)
return
}
// AuthorizedTransporter is a transporter which adds authentication information
// to each request before transporting it.
type AuthorizedTransporter struct {
http.RoundTripper
account string
key []byte
}
// NewAuthorizedTransporter creates a new authorized transporter with a given
// account name and shared key.
func NewAuthorizedTransporter(transport http.RoundTripper, account string, key []byte) *AuthorizedTransporter {
return &AuthorizedTransporter{
RoundTripper: transport,
account: account,
key: key,
}
}
// RoundTrip computes a shared key from a given request and add Authorization
// header to the request.
func (t *AuthorizedTransporter) RoundTrip(req *http.Request) (*http.Response, error) {
var ocpHeaderKeys []string
for key := range req.Header {
if strings.HasPrefix(key, "Ocp-") || strings.HasPrefix(key, "ocp-") {
ocpHeaderKeys = append(ocpHeaderKeys, key)
}
}
sort.Strings(ocpHeaderKeys)
var ocpHeaders []string
for _, key := range ocpHeaderKeys {
ocpHeaders = append(ocpHeaders, fmt.Sprintf("%s:%s", strings.ToLower(key), req.Header.Get(key)))
}
canonicalizedHeaders := strings.Join(ocpHeaders, "\n")
canonicalizedResource := "/" + t.account + req.URL.Path
var queryKeys []string
for key := range req.URL.Query() {
queryKeys = append(queryKeys, key)
}
sort.Strings(queryKeys)
for _, key := range queryKeys {
v, _ := url.QueryUnescape(req.URL.Query().Get(key))
canonicalizedResource += "\n" + strings.ToLower(key) + ":" + v
}
stringToSign := strings.Join([]string{
strings.ToUpper(req.Method),
req.Header.Get("Content-Encoding"),
req.Header.Get("Content-Language"),
getContentLength(req),
req.Header.Get("Content-MD5"),
req.Header.Get("Content-Type"),
req.Header.Get("Date"),
req.Header.Get("If-Modified-Since"),
req.Header.Get("If-Match"),
req.Header.Get("If-None-Match"),
req.Header.Get("If-Unmodified-Since"),
req.Header.Get("Range"),
canonicalizedHeaders,
canonicalizedResource,
}, "\n")
if apiAccessDebugMode {
fmt.Println("StringToSign:", stringToSign)
}
hash := hmac.New(sha256.New, t.key)
hash.Write([]byte(stringToSign))
signature := base64.StdEncoding.EncodeToString(hash.Sum(nil))
req.Header.Add("Authorization", fmt.Sprintf("SharedKey %v:%v", t.account, signature))
return t.RoundTripper.RoundTrip(req)
}
// getContentLength returns a string representing the context length of a given
// request for calculating a shared key associated with the request.
// If the request doesn't have Content-Length header, it returns an empty
// string.
func getContentLength(r *http.Request) string {
if r.ContentLength <= 0 {
return ""
}
return fmt.Sprintf("%v", r.ContentLength)
}
// BatchService provides an interface for Azure's batch service.
type BatchService struct {
client *client.BatchService
storage *StorageService
gmt *time.Location
Config *Config
Logger *log.Logger
SleepTime time.Duration
}
// JobSet represents a set of jobs.
type JobSet map[string]*models.CloudJob
// TaskSet represents a set of tasks.
type TaskSet map[string]*models.CloudTask
// NewBatchService creates a new batch service interface assosiated with
// a given config; to authorize a authentication token
// is required.
func NewBatchService(ctx context.Context, cfg *Config, logger *log.Logger) (service *BatchService, err error) {
if logger == nil {
logger = log.New(ioutil.Discard, "", log.LstdFlags)
}
// Create a resource group if not exist.
err = CreateResourceGroupIfNotExist(ctx, cfg, logger)
if err != nil {
return
}
// Create a management client.
manager, err := newBatchAccountManager(ctx, cfg, logger)
if err != nil {
return
}
accounts, err := manager.accounts(ctx)
if err != nil {
return
}
var exist bool
for _, a := range accounts {
if *a.Name == cfg.AccountName && *a.Location == cfg.Location {
exist = true
break
}
}
if !exist {
// Check Storage ID.
storage := newStorageAccountManager(cfg, logger)
var storageInfo arm_storage.Account
err = storage.createIfNotExists(ctx)
if err != nil {
return
}
storageInfo, err = storage.getStorageAccountInfo()
if err != nil {
return
}
err = manager.create(ctx, *storageInfo.ID)
if err != nil {
return
}
}
key, err := manager.getKey(ctx)
if err != nil {
return
}
// Create a service client.
scli := client.NewHTTPClient(strfmt.NewFormats())
switch transport := scli.Transport.(type) {
case *httptransport.Runtime:
// Update the host to {account-name}.{region-id}.batch.azure.com
transport.Host = fmt.Sprintf("%v.%v.batch.azure.com", cfg.AccountName, cfg.Location)
transport.Debug = apiAccessDebugMode
transport.Transport = NewAuthorizedTransporter(transport.Transport, cfg.AccountName, key)
transport.Producers["application/json; odata=minimalmetadata"] = NewMinimalJSONProducer()
scli.Accounts.SetTransport(transport)
scli.Jobs.SetTransport(transport)
scli.Pools.SetTransport(transport)
}
// Create a storage service client.
storage, err := NewStorageService(ctx, cfg, logger)
if err != nil {
return
}
gmt, _ := time.LoadLocation("GMT")
service = &BatchService{
client: scli,
storage: storage,
gmt: gmt,
Config: cfg,
Logger: logger,
SleepTime: DefaultSleepTime,
}
return
}
// Nodes retrieves information of compute nodes in a given named pool.
func (s *BatchService) Nodes(ctx context.Context, pool string) (nodes []*models.ComputeNode, err error) {
s.Logger.Println("Retrieving compute nodes in pool", pool)
res, err := s.client.ComputeNodes.ComputeNodeList(
compute_nodes.NewComputeNodeListParamsWithContext(ctx).
WithAPIVersion(BatchAPIVersion).
WithClientRequestID(toPtr(ClientID)).
WithPoolID(pool).
WithOcpDate(s.getOcpDate()))
if err != nil {
return
}
nodes = res.Payload.Value
s.Logger.Println("Finished retrieving compute nodes in pool", pool)
return
}
// GetPoolInfo retrieves information of a given named pool.
func (s *BatchService) GetPoolInfo(ctx context.Context, name string) (info *models.CloudPool, err error) {
s.Logger.Println("Retrieving information of pool", name)
res, err := s.client.Pools.PoolGet(
pools.NewPoolGetParamsWithContext(ctx).
WithAPIVersion(BatchAPIVersion).
WithClientRequestID(toPtr(ClientID)).
WithPoolID(name).
WithOcpDate(s.getOcpDate()))
if err != nil {
return
}
info = res.Payload
s.Logger.Println("Finished retrieving information of pool", name)
return
}
// UpdatePoolSize requests updating the size of the given named pool to size.
// Note that: resizing pool size is an asynchronous operation.
func (s *BatchService) UpdatePoolSize(ctx context.Context, name string, size int32) (err error) {
s.Logger.Println("Updating the size of pool", name)
_, err = s.client.Pools.PoolResize(
pools.NewPoolResizeParamsWithContext(ctx).
WithAPIVersion(BatchAPIVersion).
WithClientRequestID(toPtr(ClientID)).
WithPoolID(name).
WithOcpDate(s.getOcpDate()).
WithPoolResizeParameter(&models.PoolResizeParameter{
TargetDedicated: &size,
}))
if err != nil {
return
}
s.Logger.Println("The update request is accepted")
return
}
// CreateJob creates a job which has a given name.
func (s *BatchService) CreateJob(ctx context.Context, name string) (err error) {
// 1. Check metadata, if error returns, it means no app exists, then upload.
// 2. If version metadata is old or snapshot, upload new version.
// 3. otherwise create url and use it.
var execURL string
metadata, err := s.storage.GetMetadata(ctx, BinContainer, RoadieAzureArchiveName)
if err == nil && metadata["version"] == RoadieAzureVersion {
execURL = s.storage.getFileURL(BinContainer, RoadieAzureArchiveName)
} else {
s.Logger.Println("Job management program is not found")
var fp *os.File
// From GitHub.
var res *http.Response
res, err = ctxhttp.Get(ctx, nil, RoadieAzureURL)
if err != nil {
return
}
fp, err = ioutil.TempFile("", "")
if err != nil {
return
}
defer os.Remove(fp.Name())
defer fp.Close()
_, err = io.Copy(fp, bufio.NewReader(res.Body))
if err != nil {
return
}
_, err = fp.Seek(0, 0)
if err != nil {
return
}
// From a local snapshot.
// fp, err = os.Open(filepath.Join(os.Getenv("GOPATH"), "src/github.com/jkawamoto/roadie-azure/pkg/v0.3.4/roadie-azure_linux_amd64.tar.gz"))
// if err != nil {
// return
// }
// defer fp.Close()
err = s.storage.UploadWithMetadata(ctx, BinContainer, RoadieAzureArchiveName, bufio.NewReader(fp), &storage.BlobProperties{
ContentType: "application/tar+gzip",
}, map[string]string{
"version": RoadieAzureVersion,
})
if err != nil {
return
}
execURL = s.storage.getFileURL(BinContainer, RoadieAzureArchiveName)
s.Logger.Println("Job management program is uploaded at", execURL)
}
// Upload the config file.
configFilename := fmt.Sprintf("%v%v-init.cfg", name, time.Now().Unix())
configString, err := s.Config.String()
if err != nil {
return
}
err = s.storage.UploadWithMetadata(ctx, StartupContainer, configFilename, strings.NewReader(configString), &storage.BlobProperties{
ContentType: "text/yaml",
}, nil)
if err != nil {
return
}
configURL := s.storage.getFileURL(StartupContainer, configFilename)
s.Logger.Println("Creating job", name)
_, err = s.client.Jobs.JobAdd(
jobs.NewJobAddParamsWithContext(ctx).
WithAPIVersion(BatchAPIVersion).
WithClientRequestID(toPtr(ClientID)).
WithOcpDate(s.getOcpDate()).
WithJob(&models.JobAddParameter{
ID: &name,
PoolInfo: &models.PoolInformation{
AutoPoolSpecification: &models.AutoPoolSpecification{
AutoPoolIDPrefix: name,
KeepAlive: false,
Pool: &models.PoolSpecification{
VMSize: &s.Config.MachineType,
VirtualMachineConfiguration: &models.VirtualMachineConfiguration{
ImageReference: &models.ImageReference{
Publisher: &s.Config.OS.PublisherName,
Offer: &s.Config.OS.Offer,
Sku: &s.Config.OS.Skus,
Version: s.Config.OS.Version,
},
NodeAgentSKUID: toPtr("batch.node.ubuntu 16.04"),
},
TargetDedicated: 1,
},
PoolLifetimeOption: toPtr(models.AutoPoolSpecificationPoolLifetimeOptionJob),
},
},
// For debugging, comment out the following section.
JobPreparationTask: &models.JobPreparationTask{
CommandLine: toPtr(fmt.Sprintf(
`sh -c "tar -zxvf %v -C ${AZ_BATCH_NODE_SHARED_DIR} --strip-components=1 && sudo -n -E ${AZ_BATCH_NODE_SHARED_DIR}/roadie-azure init %v %v"`,
RoadieAzureArchiveName, configFilename, name)),
ResourceFiles: []*models.ResourceFile{
&models.ResourceFile{
BlobSource: &execURL,
FilePath: toPtr(RoadieAzureArchiveName),
},
&models.ResourceFile{
BlobSource: &configURL,
FilePath: &configFilename,
},
},
RunElevated: true,
},
OnAllTasksComplete: models.CloudJobOnAllTasksCompleteTerminateJob,
OnTaskFailure: models.CloudJobOnTaskFailurePerformExitOptionsJobAction,
}))
if err != nil {
err = NewAPIError(err)
} else {
var set JobSet
for {
if set, err = s.Jobs(ctx); err != nil {
break
} else if _, exist := set[name]; exist {
s.Logger.Println("Created job", name)
// For debugging: uncomment the following section.
// s.client.Tasks.TaskAdd(
// tasks.NewTaskAddParamsWithContext(ctx).
// WithAPIVersion(BatchAPIVersion).
// WithClientRequestID(toPtr(ClientID)).
// WithJobID(name).
// WithOcpDate(s.getOcpDate()).
// WithTask(&models.TaskAddParameter{
// ID: toPtr("init"),
// CommandLine: toPtr(fmt.Sprintf(
// `sh -c "tar -zxvf %v -C ${AZ_BATCH_NODE_SHARED_DIR} --strip-components=1 && sudo -n -E ${AZ_BATCH_NODE_SHARED_DIR}/roadie-azure init %v %v"`,
// RoadieAzureArchiveName, configFilename, name)),
// ResourceFiles: []*models.ResourceFile{
// &models.ResourceFile{
// BlobSource: &execURL,
// FilePath: toPtr(RoadieAzureArchiveName),
// },
// &models.ResourceFile{
// BlobSource: &configURL,
// FilePath: &configFilename,
// },
// },
// RunElevated: true,
// }))
return
}
select {
case <-ctx.Done():
err = ctx.Err()
break
case <-time.After(s.SleepTime):
}
}
}
s.Logger.Println("Cannot create a job:", err.Error())
return
}
// EnableJob starts a given named job which was stopped.
func (s *BatchService) EnableJob(ctx context.Context, name string) (err error) {
s.Logger.Println("Enabling job", name)
_, err = s.client.Jobs.JobEnable(
jobs.NewJobEnableParamsWithContext(ctx).
WithAPIVersion(BatchAPIVersion).
WithClientRequestID(toPtr(ClientID)).
WithJobID(name).
WithOcpDate(s.getOcpDate()))
if err != nil {
err = NewAPIError(err)
s.Logger.Println("Cannot enable job", name, ":", err.Error())
} else {
s.Logger.Println("Enabled job", name)
}
return
}
// DisableJob stops a given named job.
func (s *BatchService) DisableJob(ctx context.Context, name string) (err error) {
s.Logger.Println("Disabling job", name)
_, err = s.client.Jobs.JobDisable(
jobs.NewJobDisableParamsWithContext(ctx).
WithAPIVersion(BatchAPIVersion).
WithClientRequestID(toPtr(ClientID)).
WithJobID(name).
WithOcpDate(s.getOcpDate()).
WithJobDisableParameter(&models.JobDisableParameter{
DisableTasks: toPtr(models.JobDisableParameterDisableTasksRequeue),
}))
if err != nil {
err = NewAPIError(err)
s.Logger.Println("Cannot disable job", name, ":", err.Error())
} else {
s.Logger.Println("Disabled job", name)
}
return
}
// Jobs retrieves a set of jobs defined in the batch account specified in
// the configuration.
func (s *BatchService) Jobs(ctx context.Context) (set JobSet, err error) {
s.Logger.Println("Retriving jobs")
res, err := s.client.Jobs.JobList(
jobs.NewJobListParamsWithContext(ctx).
WithAPIVersion(BatchAPIVersion).
WithClientRequestID(toPtr(ClientID)).
WithOcpDate(s.getOcpDate()))
if err != nil {
err = NewAPIError(err)
s.Logger.Println("Cannot retrieve jobs:", err.Error())
return
}
set = make(JobSet)
for _, v := range res.Payload.Value {
set[v.ID] = v
}
s.Logger.Println("Retrieved jobs")
return
}
// DeleteJob deletes a given named job.
func (s *BatchService) DeleteJob(ctx context.Context, name string) (err error) {
s.Logger.Println("Deleting job", name)
_, err = s.client.Jobs.JobDelete(
jobs.NewJobDeleteParamsWithContext(ctx).
WithAPIVersion(BatchAPIVersion).
WithClientRequestID(toPtr(ClientID)).
WithOcpDate(s.getOcpDate()).
WithJobID(name))
if err != nil {
err = NewAPIError(err)
} else {
var set JobSet
for {
if set, err = s.Jobs(ctx); err != nil {
break
} else if _, exist := set[name]; !exist {
s.Logger.Println("Deleted job", name)
return
}
select {
case <-ctx.Done():
err = ctx.Err()
break
case <-time.After(s.SleepTime):
}
}
}
s.Logger.Println("Cannot delete job", name, ":", err.Error())
return
}
// GetJobInfo retrives the information of the given named job.
func (s *BatchService) GetJobInfo(ctx context.Context, job string) (info *models.CloudJob, err error) {
s.Logger.Println("Retrieving information of job", job)
res, err := s.client.Jobs.JobGet(
jobs.NewJobGetParamsWithContext(ctx).
WithAPIVersion(BatchAPIVersion).
WithClientRequestID(toPtr(ClientID)).
WithJobID(job).
WithOcpDate(s.getOcpDate()))
if err != nil {
return
}
info = res.Payload
s.Logger.Println("Finished retrieving information of job", job)
return
}
// CreateTask adds a given task to a given named job.
func (s *BatchService) CreateTask(ctx context.Context, job string, task *script.Script) (err error) {
var resourceFiles []*models.ResourceFile
// Update source section.
// If the URL schema of the source file is `roadie`, remove the URL and put it
// as a resource file.
if strings.HasPrefix(task.Source, script.RoadieSchemePrefix) {
u, err2 := url.Parse(task.Source)
if err2 != nil {
return err2
}
filename := filepath.Base(u.Path)
task.Source = fmt.Sprintf("file://./%v", filename)
resourceFiles = append(resourceFiles, &models.ResourceFile{
BlobSource: toPtr(s.storage.getFileURL(u.Hostname(), u.Path)),
FilePath: &filename,
})
}
// Update data section.
var newData []string
for _, v := range task.Data {
if strings.HasPrefix(v, script.RoadieSchemePrefix) {
src, dest := parseRenamableURL(v)
u, err2 := url.Parse(src)
if err2 != nil {
return err2
}
resourceFiles = append(resourceFiles, &models.ResourceFile{
BlobSource: toPtr(s.storage.getFileURL(u.Hostname(), u.Path)),
FilePath: &dest,
})
} else {
newData = append(newData, v)
}
}
task.Data = newData
now := time.Now().Unix()
// Create a startup script and upload it.
startupFilename := fmt.Sprintf("%v%v.yml", task.Name, now)
err = s.storage.UploadWithMetadata(ctx, StartupContainer, startupFilename, strings.NewReader(task.String()), &storage.BlobProperties{
ContentType: "text/yaml",
}, nil)
if err != nil {
return
}
resourceFiles = append(resourceFiles, &models.ResourceFile{
BlobSource: toPtr(s.storage.getFileURL(StartupContainer, startupFilename)),
FilePath: &startupFilename,
})
// Upload the config file.
configFilename := fmt.Sprintf("%v%v.cfg", task.Name, now)
configString, err := s.Config.String()
if err != nil {
return
}
err = s.storage.UploadWithMetadata(ctx, StartupContainer, configFilename, strings.NewReader(configString), &storage.BlobProperties{
ContentType: "text/yaml",
}, nil)
if err != nil {
return
}
resourceFiles = append(resourceFiles, &models.ResourceFile{
BlobSource: toPtr(s.storage.getFileURL(StartupContainer, configFilename)),
FilePath: &configFilename,
})
// Create an instance.
s.Logger.Println("Creating a task in job", job)
_, err = s.client.Tasks.TaskAdd(
tasks.NewTaskAddParamsWithContext(ctx).
WithAPIVersion(BatchAPIVersion).
WithClientRequestID(toPtr(ClientID)).
WithJobID(job).
WithOcpDate(s.getOcpDate()).
WithTask(&models.TaskAddParameter{
ID: &task.Name,
CommandLine: toPtr(fmt.Sprintf(`sh -c "sudo -n -E ${AZ_BATCH_NODE_SHARED_DIR}/roadie-azure exec %v %v %v"`, configFilename, startupFilename, task.Name)),
ResourceFiles: resourceFiles,
RunElevated: true,
}))
if err != nil {
err = NewAPIError(err)
} else {
var set TaskSet
for {
if set, err = s.Tasks(ctx, job); err != nil {
break
} else if _, exist := set[task.Name]; exist {
s.Logger.Println("Created task", task.Name)
return
}
select {
case <-ctx.Done():
err = ctx.Err()
break
case <-time.After(s.SleepTime):
}
}
}
s.Logger.Println("Cannot create a task:", err.Error())
return
}
// Tasks retrieves tasks in a given named job.
func (s *BatchService) Tasks(ctx context.Context, job string) (set TaskSet, err error) {
s.Logger.Println("Retrieving tasks in job", job)
res, err := s.client.Tasks.TaskList(
tasks.NewTaskListParamsWithContext(ctx).
WithAPIVersion(BatchAPIVersion).
WithClientRequestID(toPtr(ClientID)).
WithJobID(job).
WithOcpDate(s.getOcpDate()))
if err != nil {
err = NewAPIError(err)
s.Logger.Println("Cannot retrieve tasks:", err)
return
}
set = make(TaskSet)
for _, v := range res.Payload.Value {
set[v.ID] = v
}
s.Logger.Println("Retrieved tasks in job", job)
return
}
// DeleteTask deletes a given named task from a given named job.
func (s *BatchService) DeleteTask(ctx context.Context, job, task string) (err error) {
// TODO: Delete related files, such as script, config, from the storage.
s.Logger.Println("Deleting task", task)
_, err = s.client.Tasks.TaskDelete(
tasks.NewTaskDeleteParamsWithContext(ctx).
WithAPIVersion(BatchAPIVersion).
WithClientRequestID(toPtr(ClientID)).
WithJobID(job).
WithOcpDate(s.getOcpDate()).
WithTaskID(task))
if err != nil {
err = NewAPIError(err)
} else {
var set TaskSet
for {
if set, err = s.Tasks(ctx, job); err != nil {
break
} else if _, exist := set[task]; !exist {
s.Logger.Println("Deleted task", task)
return
}
select {
case <-ctx.Done():
err = ctx.Err()
break
case <-time.After(s.SleepTime):
}
}
}
s.Logger.Println("Cannot delete task", task, ":", err.Error())
return
}
// AvailableMachineTypes returns a list of supported machine types.
// Batch supports all Azure VM sizes except STANDARD_A0 and those with premium
// storage (STANDARD_GS, STANDARD_DS, and STANDARD_DSV2 series).
func (s *BatchService) AvailableMachineTypes(ctx context.Context) (types []cloud.MachineType, err error) {
service, err := NewComputeService(ctx, s.Config, s.Logger)
if err != nil {
return
}
aux, err := service.AvailableMachineTypes(ctx)
if err != nil {
return
}
for _, v := range aux {
switch {
case v.Name == "Standard_A0":
// Not supported
case strings.HasPrefix(v.Name, "Standard_GS"):
// Not supported
case strings.HasPrefix(v.Name, "Standard_DS"):
// Not supported
default:
types = append(types, v)
}
}
return
}
// AvailableOSImages returns a list of available OS images.
func (s *BatchService) AvailableOSImages(ctx context.Context) (images []string, err error) {
s.Logger.Println("Retrieving available os images")
res, err := s.client.Accounts.AccountListNodeAgentSkus(
accounts.NewAccountListNodeAgentSkusParamsWithContext(ctx).
WithAPIVersion(BatchAPIVersion).
WithClientRequestID(toPtr(ClientID)).
WithOcpDate(s.getOcpDate()))
if err != nil {
err = NewAPIError(err)
s.Logger.Println("Cannot retrieve available os images")
return
}
for _, v := range res.Payload.Value {
for _, image := range v.VerifiedImageReferences {
images = append(
images,
fmt.Sprintf("%v:%v:%v:%v", *image.Publisher, *image.Offer, *image.Sku, image.Version))
}
}
return
}
// getOcpDate returns a pointer for the string representing current date and
// time in RFC 1123 format. This pointer will be used with WithOcpDate method.
func (s *BatchService) getOcpDate() *string {
now := time.Now().In(s.gmt).Format(time.RFC1123)
return &now
}