jkawamoto/roadie

View on GitHub
cloud/storage.go

Summary

Maintainability
B
5 hrs
Test Coverage
//
// cloud/storage.go
//
// Copyright (c) 2016-2017 Junpei Kawamoto
//
// This file is part of Roadie.
//
// Roadie is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Roadie is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Roadie.  If not, see <http://www.gnu.org/licenses/>.
//

package cloud

import (
    "bufio"
    "context"
    "fmt"
    "io"
    "io/ioutil"
    "log"
    "net/url"
    "os"
    "path"
    "path/filepath"
    "strings"

    "golang.org/x/sync/errgroup"

    pb "gopkg.in/cheggaaa/pb.v1"

    "github.com/ulikunitz/xz"
)

// StorageManager defines the methods a storage service provider must provide.
// Each method takes a URL pointing to a file stored in the storage.
// The URL should have the form
// - roadie://category/path
// where category is one of
// - script.SourcePrefix
// - script.DataPrefix
// - script.ResultPrefix
type StorageManager interface {

    // Upload writes the contents read from a given stream to the file
    // pointed to by a given URL.
    Upload(ctx context.Context, loc *url.URL, in io.Reader) error

    // Download reads the file pointed to by a given URL and writes its
    // contents to a given stream.
    Download(ctx context.Context, loc *url.URL, out io.Writer) error

    // GetFileInfo retrieves information about the file pointed to by a given URL.
    GetFileInfo(ctx context.Context, loc *url.URL) (*FileInfo, error)

    // List enumerates the files whose URLs start with a given URL.
    // Information about each file found is passed to the given handler.
    List(ctx context.Context, loc *url.URL, handler FileInfoHandler) error

    // Delete removes the file pointed to by a given URL.
    Delete(ctx context.Context, loc *url.URL) error
}

// FileInfoHandler is a callback which receives the information of one file.
// StorageManager.List takes such a handler and passes found files to it; the
// handler reports a failure by returning a non-nil error.
type FileInfoHandler func(*FileInfo) error

// Storage provides high-level APIs to access a cloud storage. All storage
// operations are delegated to the wrapped StorageManager.
type Storage struct {
    // service is the storage backend used to upload, download, list, and
    // delete files.
    service StorageManager
    // TODO: Delete or update this.
    // Log is the writer which receives progress bars and status messages.
    Log io.Writer
}

// NewStorage creates a cloud storage accessor backed by a given servicer.
// Messages are written to the given log writer; if it is nil, they are
// discarded.
func NewStorage(servicer StorageManager, log io.Writer) (s *Storage) {
    sink := log
    if sink == nil {
        sink = ioutil.Discard
    }
    return &Storage{service: servicer, Log: sink}
}

// UploadFile uploads the local file named input to the location a given URL
// points to. A progress bar sized by the file's length and labelled with the
// base name of the URL path is written to s.Log while the upload runs.
// It returns any error from opening or inspecting the file, or from the
// underlying storage service.
func (s *Storage) UploadFile(ctx context.Context, loc *url.URL, input string) (err error) {
    src, err := os.Open(input)
    if err != nil {
        return err
    }
    defer src.Close()

    stat, err := src.Stat()
    if err != nil {
        return err
    }

    fmt.Fprintln(s.Log, "Uploading...")

    // Configure the progress bar step by step; the setters mutate the bar.
    progress := pb.New64(stat.Size())
    progress.SetUnits(pb.U_BYTES)
    progress.Prefix(path.Base(loc.Path))
    progress.Output = s.Log
    progress.AlwaysUpdate = true
    progress.Start()
    defer progress.Finish()

    // Every byte read by the service passes through the bar's proxy reader.
    body := progress.NewProxyReader(bufio.NewReader(src))
    return s.service.Upload(ctx, loc, body)
}

// ListupFiles lists up the files whose location matches a given URL by
// delegating to the storage service's List method.
// Information about each file found is passed to the handler, and the error
// returned by the service is returned as is.
func (s *Storage) ListupFiles(ctx context.Context, loc *url.URL, handler FileInfoHandler) (err error) {
    err = s.service.List(ctx, loc, handler)
    return err
}

// DownloadFiles downloads files matching a given prefix and queries.
// Downloaded files will be put in a given directory, which is created when
// os.Stat cannot find it. Each matching file is downloaded in its own
// goroutine, and a progress bar per file is written to s.Log.
//
// The function returns only after every started download has finished.
// If listing fails, in-flight downloads are cancelled and the listing error
// is returned; otherwise the first download error (if any) is returned.
// Errors raised while flushing or closing a downloaded file are reported as
// download errors instead of being dropped.
func (s *Storage) DownloadFiles(ctx context.Context, prefix *url.URL, dir string, queries []string) (err error) {

    var stat os.FileInfo
    if stat, err = os.Stat(dir); err != nil {
        // Given dir does not exist.
        if err = os.MkdirAll(dir, 0777); err != nil {
            return
        }
    } else if !stat.IsDir() {
        return fmt.Errorf("Cannot create the directory tree: %s", dir)
    }

    fmt.Fprintln(s.Log, "")
    pool, err := pb.StartPool()
    if err != nil {
        // Progress bars are optional; keep downloading without them.
        log.Println("cannot create a progress bar:", err.Error())
    } else {
        pool.Output = s.Log
        defer pool.Stop()
    }

    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    eg, ctx := errgroup.WithContext(ctx)
    err = s.ListupFiles(ctx, prefix, func(info *FileInfo) error {

        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        // If Name is empty, it might be a folder or a special file.
        if info.Name == "" || !match(queries, info.Name) {
            return nil
        }

        bar := pb.New64(int64(info.Size)).SetUnits(pb.U_BYTES).Prefix(info.Name)
        if pool != nil {
            pool.Add(bar)
        }

        eg.Go(func() error {
            filename := filepath.Join(dir, info.Name)
            f, goerr := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
            if goerr != nil {
                bar.FinishPrint(fmt.Sprintf("Cannot create file %s (%s)", filename, goerr.Error()))
                return goerr
            }

            writer := bufio.NewWriter(f)
            goerr = s.service.Download(ctx, info.URL, io.MultiWriter(writer, bar))

            // Always flush so that partially downloaded data still reaches
            // the file, but keep the first error that occurred.
            if ferr := writer.Flush(); goerr == nil {
                goerr = ferr
            }
            if cerr := f.Close(); goerr == nil {
                goerr = cerr
            }

            if goerr != nil {
                bar.FinishPrint(fmt.Sprintf("Cannot download %s (%s)", info.Name, goerr.Error()))
            } else {
                bar.Finish()
            }
            return goerr
        })
        return nil
    })

    if err != nil {
        // Abort downloads which have already started and wait for them, so
        // that no goroutine outlives this call or touches the progress bars
        // after the pool is stopped. The listing error takes precedence.
        cancel()
        _ = eg.Wait()
        return err
    }
    return eg.Wait()

}

// DeleteFiles deletes files matching a given URL prefix and queries.
// Each matching file is deleted in its own goroutine; the URL of every
// deleted file, or a message describing the failure, is written to s.Log.
//
// The function returns only after every started deletion has finished.
// If listing fails, the context passed to in-flight deletions is cancelled
// and the listing error is returned; otherwise the first deletion error
// (if any) is returned.
func (s *Storage) DeleteFiles(ctx context.Context, prefix *url.URL, queries []string) (err error) {

    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    eg, ctx := errgroup.WithContext(ctx)
    err = s.ListupFiles(ctx, prefix, func(info *FileInfo) error {

        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        // If Name is empty, it might be a folder or a special file.
        if info.Name == "" || !match(queries, info.Name) {
            return nil
        }

        eg.Go(func() error {
            if derr := s.service.Delete(ctx, info.URL); derr != nil {
                fmt.Fprintf(s.Log, "Cannot delete %v: %v\n", info.URL, derr.Error())
                return derr
            }
            fmt.Fprintln(s.Log, info.URL)
            return nil
        })
        return nil

    })

    if err != nil {
        // Wait for deletions which have already started so that no goroutine
        // outlives this call and keeps writing to s.Log. The listing error
        // takes precedence over any deletion error.
        cancel()
        _ = eg.Wait()
        return err
    }
    return eg.Wait()

}

// PrintFileBody prints the bodies of files which are found under a given
// prefix and whose names start with query.
// If header is true, a line "*** name ***" is printed before each body.
// Files whose names end with ".xz" are decompressed before being written to
// output; all other files are written as they are downloaded.
//
// The function returns the first error reported by the listing, a download,
// or the decompression of a file.
func (s *Storage) PrintFileBody(ctx context.Context, prefix *url.URL, query string, output io.Writer, header bool) error {

    return s.ListupFiles(ctx, prefix, func(info *FileInfo) error {

        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        if info.Name == "" || !strings.HasPrefix(info.Name, query) {
            return nil
        }

        if header {
            fmt.Fprintf(output, "*** %s ***\n", info.Name)
        }

        if !strings.HasSuffix(info.Name, ".xz") {
            return s.service.Download(ctx, info.URL, output)
        }

        // Downloaded bytes are piped into an xz decoder which writes the
        // decompressed data to output. The pipe is local to this file, so
        // output itself is never replaced and stays usable for later files.
        pipeReader, pipeWriter := io.Pipe()
        done := make(chan error, 1)
        go func() {
            // The decoder is created here rather than in the caller because
            // it may read from the pipe right away, which would block until
            // the download below starts writing.
            xzReader, xzErr := xz.NewReader(pipeReader)
            if xzErr == nil {
                _, xzErr = io.Copy(output, xzReader)
            }
            // Unblock the downloader in case decompression stopped early.
            pipeReader.CloseWithError(xzErr)
            done <- xzErr
        }()

        err := s.service.Download(ctx, info.URL, pipeWriter)
        // Signal the end of the stream (or the download failure) to the
        // decoder, then wait until all decompressed data has been written.
        pipeWriter.CloseWithError(err)
        if xzErr := <-done; err == nil {
            err = xzErr
        }
        return err

    })

}

// match reports whether name matches at least one of the given patterns.
// Patterns use the filepath.Match syntax; a pattern which cannot be matched
// (including a malformed one) is simply skipped.
func match(patterns []string, name string) bool {
    for i := range patterns {
        // The error is ignored on purpose: a failed match counts as no match.
        matched, _ := filepath.Match(patterns[i], name)
        if !matched {
            continue
        }
        return true
    }
    return false
}