jkawamoto/roadie

View on GitHub
cloud/azure/storage.go

Summary

Maintainability
B
5 hrs
Test Coverage
//
// cloud/azure/storage.go
//
// Copyright (c) 2016-2017 Junpei Kawamoto
//
// This file is part of Roadie.
//
// Roadie is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Roadie is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Roadie.  If not, see <http://www.gnu.org/licenses/>.
//

package azure

import (
    "bufio"
    "context"
    "io"
    "io/ioutil"
    "log"
    "net/url"
    "path"
    "strings"
    "time"

    arm_storage "github.com/Azure/azure-sdk-for-go/arm/storage"
    "github.com/Azure/azure-sdk-for-go/storage"
    "github.com/Azure/go-autorest/autorest"
    "github.com/jkawamoto/roadie/cloud"
)

const (
    // DefaultAccessPolicyExpiryTime defines a default expiry time.
    DefaultAccessPolicyExpiryTime = 30 * 24 * time.Hour
)

// storageAccountManager provides functions to manage a storage account.
type storageAccountManager struct {
    // Configuration
    Config *Config
    // Logger
    Logger *log.Logger
    // client for Azure's account resource manager.
    client arm_storage.AccountsClient
}

// newStorageAccountManager creates a new account manager.
func newStorageAccountManager(cfg *Config, logger *log.Logger) *storageAccountManager {

    cli := arm_storage.NewAccountsClient(cfg.SubscriptionID)
    cli.Authorizer = autorest.NewBearerAuthorizer(&cfg.Token)
    return &storageAccountManager{
        Config: cfg,
        Logger: logger,
        client: cli,
    }

}

// createIfNotExists checks the associated storage account exists. If not, creates
// it.
func (s *storageAccountManager) createIfNotExists(ctx context.Context) (err error) {

    s.Logger.Printf("Checking storage account %q exists", s.Config.AccountName)
    accounts, err := s.client.List()
    if err != nil {
        return
    }

    var exist bool
    for _, a := range *accounts.Value {
        if *a.Name == s.Config.AccountName {
            exist = true
            break
        }
    }
    if !exist {
        s.Logger.Printf("Storage account %q doesn't exist and creating it", s.Config.AccountName)
        resCh, errCh := s.client.Create(s.Config.AccountName, s.Config.AccountName, arm_storage.AccountCreateParameters{
            Sku: &arm_storage.Sku{
                Name: arm_storage.StandardRAGRS,
                Tier: arm_storage.Standard,
            },
            Kind:     arm_storage.BlobStorage,
            Location: &s.Config.Location,
            AccountPropertiesCreateParameters: &arm_storage.AccountPropertiesCreateParameters{
                AccessTier: arm_storage.Hot,
            },
        }, ctx.Done())

        select {
        case _ = <-resCh:
        case err = <-errCh:
        case <-ctx.Done():
            err = ctx.Err()
        }

    }
    return

}

// getStorageAccountInfo retrieves information of the associated storage account.
func (s *storageAccountManager) getStorageAccountInfo() (arm_storage.Account, error) {

    return s.client.GetProperties(s.Config.AccountName, s.Config.AccountName)

}

// getStorageKey returns a storage access key. If the associated storage account
// doesn't have any access keys, this function will generate it.
func (s *storageAccountManager) getStorageKey(ctx context.Context) (key string, err error) {

    s.Logger.Printf("Obtaining an access key for storage %q", s.Config.AccountName)
    res, err := s.client.ListKeys(s.Config.AccountName, s.Config.AccountName)
    if err != nil {
        return
    }
    for len(*res.Keys) == 0 {
        select {
        case <-ctx.Done():
            err = ctx.Err()
            return
        default:
        }

        s.Logger.Printf("Access keys for %q don't exist and generating it", s.Config.AccountName)
        res, err = s.client.RegenerateKey(s.Config.AccountName, s.Config.AccountName, arm_storage.AccountRegenerateKeyParameters{
            KeyName: toPtr("key"),
        })
        if err != nil {
            return
        }
    }
    key = *(*res.Keys)[0].Value
    return

}

// delete deletes the associated storage account.
func (s *storageAccountManager) delete() (err error) {

    s.Logger.Printf("Deleting storage account %q", s.Config.AccountName)
    _, err = s.client.Delete(s.Config.AccountName, s.Config.AccountName)
    return

}

// StorageService provides an interface for Azure's storage management service.
type StorageService struct {
    // Client of blob storage service
    Client storage.BlobStorageClient
    // Configuration
    Config *Config
    // Logger
    Logger *log.Logger
    // AccessPolicyExpiryTime defines the expiry time for accessing containers.
    AccessPolicyExpiryTime time.Duration
}

// NewStorageService creates an interface of the storage service which has a
// given name and belongs to given subscription and location.
// If log.Logger logger is given, verbose mode is on and logging information will
// be written to the logger.
func NewStorageService(ctx context.Context, cfg *Config, logger *log.Logger) (s *StorageService, err error) {

    if logger == nil {
        logger = log.New(ioutil.Discard, "", log.LstdFlags)
    }
    // Create a resource group if not exist.
    err = CreateResourceGroupIfNotExist(ctx, cfg, logger)
    if err != nil {
        return
    }

    // Create a storage account manager and prepare the account.
    manager := newStorageAccountManager(cfg, logger)
    err = manager.createIfNotExists(ctx)
    if err != nil {
        return
    }
    key, err := manager.getStorageKey(ctx)
    if err != nil {
        return
    }

    // Create a blob storage client.
    cli, err := storage.NewBasicClient(cfg.AccountName, key)
    if err != nil {
        return
    }

    s = &StorageService{
        Client:                 cli.GetBlobService(),
        Config:                 cfg,
        Logger:                 logger,
        AccessPolicyExpiryTime: DefaultAccessPolicyExpiryTime,
    }
    return

}

// TODO: The following methods should support both roadie based URL and Azure based URL.

// Upload a given stream to a given location.
func (s *StorageService) Upload(ctx context.Context, loc *url.URL, in io.Reader) (err error) {

    s.Logger.Println("Creating a blob at", loc)
    filename := strings.TrimPrefix(loc.Path, "/")
    return s.UploadWithMetadata(ctx, loc.Hostname(), filename, in, nil, nil)

}

// UploadWithMetadata a given stream in a given container as a file named a given file name.
func (s *StorageService) UploadWithMetadata(
    ctx context.Context, container, filename string, in io.Reader, props *storage.BlobProperties, metadata storage.BlobMetadata) (err error) {

    // Check the target container exists.
    containerRef := s.Client.GetContainerReference(container)
    created, err := containerRef.CreateIfNotExists(&storage.CreateContainerOptions{
        Access: storage.ContainerAccessTypeBlob,
    })
    if err != nil {
        return
    }
    if created {
        err = containerRef.SetPermissions(storage.ContainerPermissions{
            AccessType: storage.ContainerAccessTypeBlob,
            AccessPolicies: []storage.ContainerAccessPolicy{
                storage.ContainerAccessPolicy{
                    ID:         "full-access",
                    StartTime:  time.Now(),
                    ExpiryTime: time.Now().Add(s.AccessPolicyExpiryTime),
                    CanRead:    true,
                    CanWrite:   true,
                    CanDelete:  true,
                },
            },
        }, nil)
        if err != nil {
            return
        }
    }

    s.Logger.Printf("Checking the uploading file %v exists\n", filename)
    exists, err := containerRef.GetBlobReference(filename).DeleteIfExists(nil)
    if err != nil {
        s.Logger.Println("Cannot check the existence of the uploading file")
        return
    } else if exists {
        s.Logger.Println("Old file has been deleted")
    }

    s.Logger.Println("Creating blob", filename)
    blob := containerRef.GetBlobReference(filename)
    err = blob.PutAppendBlob(nil)
    if err != nil {
        s.Logger.Println("Cannot create the blob", filename)
        return
    }
    if props != nil {
        blob.Properties = *props
        err = blob.SetProperties(nil)
        if err != nil {
            s.Logger.Println("Cannot set properties to blob", filename)
            return
        }
    }
    if metadata != nil {
        blob.Metadata = metadata
        err = blob.SetMetadata(nil)
        if err != nil {
            s.Logger.Println("Cannot set metadata to blob", filename)
            return
        }
    }
    s.Logger.Println("Created append blob", filename)

    s.Logger.Println("Uploading data")
    reader := bufio.NewReader(in)
    buf := make([]byte, 1024*1024*4)
    var size int
    for {
        size, err = reader.Read(buf)
        if err == io.EOF {
            err = nil
            break
        } else if err != nil {
            break
        } else if size > 0 {
            err = blob.AppendBlock(buf[0:size], nil)
            if err != nil {
                break
            }
        }
    }

    if err != nil {
        s.Logger.Println("Uploading data didn't finish")
    } else {
        s.Logger.Println("Finish uploading data")
    }
    return

}

// Download a file associated from a given location and write it to a given
// writer.
func (s *StorageService) Download(ctx context.Context, loc *url.URL, out io.Writer) (err error) {

    s.Logger.Println("Downloading a blob from", loc)
    filename := strings.TrimPrefix(loc.Path, "/")
    reader, err := s.Client.GetContainerReference(loc.Hostname()).GetBlobReference(filename).Get(nil)
    if err != nil {
        return
    }
    defer reader.Close()

    _, err = io.Copy(out, reader)
    s.Logger.Println("Downloaded blob", filename)
    return

}

// GetFileInfo gets information of file in a given location.
func (s *StorageService) GetFileInfo(ctx context.Context, loc *url.URL) (info *cloud.FileInfo, err error) {

    s.Logger.Println("Retrieving information of file in", loc)
    filename := strings.TrimPrefix(loc.Path, "/")
    blob := s.Client.GetContainerReference(loc.Hostname()).GetBlobReference(filename)
    err = blob.GetProperties(
        &storage.GetBlobPropertiesOptions{})
    if err != nil {
        return
    }

    info = &cloud.FileInfo{
        Name:        path.Base(filename),
        URL:         loc,
        TimeCreated: time.Time(blob.Properties.LastModified),
        Size:        blob.Properties.ContentLength,
    }

    s.Logger.Println("Retrieved information of file", filename)
    return

}

// GetMetadata retrives metadata of a given named file.
func (s *StorageService) GetMetadata(ctx context.Context, container, filename string) (metadata map[string]string, err error) {

    s.Logger.Println("Retrieving metadata of file", filename)
    blob := s.Client.GetContainerReference(container).GetBlobReference(filename)
    err = blob.GetMetadata(nil)
    if err != nil {
        s.Logger.Println("Get metadata of file", filename)
    }
    metadata = blob.Metadata
    return

}

// List up files matching a given prefix.
// It takes a handler; information of found files are sent to it.
func (s *StorageService) List(ctx context.Context, loc *url.URL, handler cloud.FileInfoHandler) (err error) {

    s.Logger.Println("Retrieving blobs matching", loc)
    prefix := strings.TrimPrefix(loc.Path, "/")
    res, err := s.Client.GetContainerReference(loc.Hostname()).ListBlobs(storage.ListBlobsParameters{
        Prefix: prefix,
    })
    if err != nil {
        switch e := err.(type) {
        case storage.AzureStorageServiceError:
            if e.StatusCode == 404 {
                s.Logger.Println("Finished retrieving blobs")
                err = nil
            }
        }
        return
    }

    for _, v := range res.Blobs {

        u := *loc
        u.Path = path.Join("/", v.Name)
        err = handler(&cloud.FileInfo{
            Name:        path.Base(v.Name),
            URL:         &u,
            TimeCreated: time.Time(v.Properties.LastModified),
            Size:        v.Properties.ContentLength,
        })
        if err != nil {
            return
        }

    }
    return

}

// Delete a file in a given location.
func (s *StorageService) Delete(ctx context.Context, loc *url.URL) (err error) {

    s.Logger.Println("Deleting a blob in", loc)
    filename := strings.TrimPrefix(loc.Path, "/")
    err = s.Client.GetContainerReference(loc.Hostname()).GetBlobReference(filename).Delete(nil)
    if err != nil {
        s.Logger.Println("Cannot delete blob", filename, ":", err.Error())
    } else {
        s.Logger.Println("Deleted blob", filename)
    }
    return

}

// getFileURL returns an Azure storage URL of a file identified by the given
// container name and file name.
// Note that, this URL shouldn't use out of this package. In other packages,
// URL must starts with `roadie://`.
func (s *StorageService) getFileURL(container, filename string) string {
    return s.Client.GetContainerReference(container).GetBlobReference(filename).GetURL()
}

// deleteAccount deletes the associated storage account.
func (s *StorageService) deleteAccount(ctx context.Context) (err error) {

    manager := newStorageAccountManager(s.Config, s.Logger)
    return manager.delete()

}