Bnei-Baruch/mdb

View on GitHub
importer/sources/sources.go

Summary

Maintainability
D
2 days
Test Coverage
package sources

import (
    "database/sql"
    "fmt"
    "os"
    "strconv"
    "strings"
    "time"

    log "github.com/Sirupsen/logrus"
    "github.com/pkg/errors"
    "github.com/spf13/viper"
    "github.com/volatiletech/null/v8"
    "github.com/volatiletech/sqlboiler/v4/boil"
    "github.com/volatiletech/sqlboiler/v4/queries/qm"

    "github.com/Bnei-Baruch/mdb/common"
    "github.com/Bnei-Baruch/mdb/models"
    "github.com/Bnei-Baruch/mdb/utils"
)

const (
    BASE_PATH        = "importer/sources/data/Sources - "
    AUTHORS_FILE     = BASE_PATH + "Authors.csv"
    COLLECTIONS_FILE = BASE_PATH + "Collections.csv"
)

var (
    LANGS = [7]string{
        common.LANG_ENGLISH,
        common.LANG_HEBREW,
        common.LANG_RUSSIAN,
        common.LANG_GERMAN,
        common.LANG_SPANISH,
        common.LANG_TURKISH,
        common.LANG_UKRAINIAN,
    }
)

func ImportSources() {
    var err error
    clock := time.Now()

    log.SetFormatter(&log.TextFormatter{FullTimestamp: true})
    //log.SetLevel(log.WarnLevel)

    log.Info("Starting sources import")

    log.Info("Setting up connection to MDB")
    mdb, err := sql.Open("postgres", viper.GetString("mdb.url"))
    utils.Must(err)
    utils.Must(mdb.Ping())
    defer mdb.Close()
    boil.SetDB(mdb)
    //boil.DebugMode = true

    utils.Must(common.InitTypeRegistries(mdb))

    utils.Must(handleAuthors(mdb))
    utils.Must(handleCollections(mdb))

    log.Info("Success")
    log.Infof("Total run time: %s", time.Now().Sub(clock).String())
}

func handleAuthors(db *sql.DB) error {
    records, err := utils.ReadCSV(AUTHORS_FILE)
    if err != nil {
        return errors.Wrap(err, "Read authors")
    }

    h, err := utils.ParseCSVHeader(records[0])
    if err != nil {
        return errors.Wrap(err, "Bad header")
    }

    tx, err := db.Begin()
    if err != nil {
        return errors.Wrap(err, "Start transaction")
    }

    for _, x := range records[1:] {
        if err = doAuthor(tx, h, x); err != nil {
            break
        }
    }

    if err == nil {
        err = tx.Commit()
        if err != nil {
            return errors.Wrap(err, "Commit transaction")
        }
    } else {
        if ex := tx.Rollback(); ex != nil {
            return errors.Wrap(ex, "Rollback transaction")
        }
        return err
    }

    return nil
}

func doAuthor(exec boil.Executor, header map[string]int, record []string) error {
    // Get or create Author
    author, err := models.Authors(qm.Where("code = ?", record[header["code"]])).One(exec)
    if err != nil {
        if err == sql.ErrNoRows {
            // Create
            author = &models.Author{
                Code:     record[header["code"]],
                Name:     record[header["name"]],
                FullName: null.NewString(record[header["full name"]], record[header["full name"]] != ""),
            }
            err = author.Insert(exec, boil.Infer())
            if err != nil {
                return errors.Wrapf(err, "Insert author [%s]", record)
            }
        } else {
            return errors.Wrapf(err, "Lookup author in db [%s]", record)
        }
    } else {
        author.Name = record[header["name"]]
        author.FullName = null.NewString(record[header["full name"]], record[header["full name"]] != "")
        _, err = author.Update(exec, boil.Whitelist("name", "full_name"))
        if err != nil {
            return errors.Wrapf(err, "Update author [%d] [%s]", author.ID, record)
        }
    }

    // i18n
    for _, l := range LANGS {
        n := record[header[l+".name"]]
        fn := record[header[l+".full_name"]]
        if n == "" && fn == "" {
            continue
        }

        ai18n := models.AuthorI18n{
            AuthorID: author.ID,
            Language: l,
            Name:     null.NewString(n, n != ""),
            FullName: null.NewString(fn, fn != ""),
        }
        err = ai18n.Upsert(exec, true,
            []string{"author_id", "language"},
            boil.Whitelist("name", "full_name"),
            boil.Infer())
        if err != nil {
            return errors.Wrapf(err, "Upsert author i18n")
        }
    }

    return nil
}

func handleCollections(db *sql.DB) error {
    records, err := utils.ReadCSV(COLLECTIONS_FILE)
    if err != nil {
        return errors.Wrap(err, "Read collections")
    }

    h, err := utils.ParseCSVHeader(records[0])
    if err != nil {
        return errors.Wrap(err, "Bad header")
    }

    var tx *sql.Tx
    for _, x := range records[1:] {
        tx, err = db.Begin()
        if err != nil {
            return errors.Wrap(err, "Start transaction")
        }

        err = doCollection(tx, h, x)

        if err == nil {
            err = tx.Commit()
            if err != nil {
                err = errors.Wrap(err, "Commit transaction")
                break
            }
        } else {
            if ex := tx.Rollback(); ex != nil {
                err = errors.Wrap(ex, "Rollback transaction")
            }
            break
        }
    }

    return err
}

func doCollection(exec boil.Executor, header map[string]int, record []string) error {
    authorCode := record[header["author"]]
    if authorCode == "" {
        return errors.New("Empty author code")
    }
    name := record[header["name"]]
    if name == "" {
        return errors.New("Empty collection name")
    }
    pattern := record[header["pattern"]]
    log.Infof("Author: %s, Name: %s", authorCode, name)

    // Fetch author
    author, err := models.Authors(qm.Where("code = ?", authorCode)).One(exec)
    if err != nil {
        return errors.Wrapf(err, "Fetch author [%s]", authorCode)
    }

    // Get or create collection source
    collection, err := models.Sources(
        qm.InnerJoin("authors_sources x on x.source_id = sources.id and author_id = ?", author.ID),
        qm.Where("name = ? and parent_id is null", name)).
        One(exec)
    if err == nil {
        // update
        if collection.Pattern.Valid || pattern != "" {
            collection.Pattern = null.NewString(pattern, pattern != "")
            _, err = collection.Update(exec, boil.Whitelist("pattern"))
            if err != nil {
                return errors.Wrapf(err, "Update collection [%d]", collection.ID)
            }
        }
    } else {
        if err == sql.ErrNoRows {
            // create
            collection = &models.Source{
                UID:     utils.GenerateUID(8),
                Name:    name,
                Pattern: null.NewString(pattern, pattern != ""),
                TypeID:  common.SOURCE_TYPE_REGISTRY.ByName[common.SRC_COLLECTION].ID,
            }
            err = author.AddSources(exec, true, collection)
            if err != nil {
                return errors.Wrapf(err, "Create collection [%s %s]", authorCode, name)
            }
        } else {
            return errors.Wrapf(err, "Lookup collection in db [%s, %s]", authorCode, name)
        }
    }

    // i18n
    for _, l := range LANGS {
        n := record[header[l+".name"]]
        d := record[header[l+".description"]]
        if n == "" && d == "" {
            continue
        }

        si18n := models.SourceI18n{
            SourceID:    collection.ID,
            Language:    l,
            Name:        null.NewString(n, n != ""),
            Description: null.NewString(d, d != ""),
        }
        err = si18n.Upsert(exec, true,
            []string{"source_id", "language"},
            boil.Whitelist("name", "description"),
            boil.Infer())
        if err != nil {
            return errors.Wrapf(err, "Upsert collection i18n")
        }
    }

    // Content
    fn := fmt.Sprintf("%s%s-%s.csv",
        BASE_PATH,
        strings.ToLower(authorCode),
        strings.Replace(strings.ToLower(name), " ", "-", -1))

    records, err := utils.ReadCSV(fn)
    if err != nil {
        if os.IsNotExist(err) {
            log.Warnf("Input missing: %s", fn)
            return nil
        }
        return errors.Wrap(err, "Read collection contents")
    }

    h, err := utils.ParseCSVHeader(records[0])
    if err != nil {
        return errors.Wrap(err, "Bad header")
    }

    var parents = []*models.Source{collection}
    for i, x := range records[1:] {
        if utils.IsEmpty(x) {
            continue
        }

        xLevel := x[h["level"]]
        level, err := strconv.Atoi(xLevel)
        if err != nil {
            return errors.Wrapf(err, "Bad level at row %d: %s", i+1, xLevel)
        }

        xType := x[h["type"]]
        sType, ok := common.SOURCE_TYPE_REGISTRY.ByName[xType]
        if !ok {
            return errors.Errorf("Unknown source type at row %d: %s", i+1, xType)
        }

        name := x[h["name"]]
        if name == "" {
            return errors.Errorf("Missing name at row %d", i)
        }

        position := -1
        xPosition := x[h["position"]]
        if xPosition != "" {
            position, err = strconv.Atoi(xPosition)
            if err != nil {
                return errors.Wrapf(err, "Bad position: %s", xPosition)
            }
        }

        pattern = x[h["pattern"]]
        description := x[h["description"]]

        // Get or Create source
        parent := parents[level-1]
        source, err := models.Sources(
            qm.Where("type_id = ? and parent_id = ? and name = ?", sType.ID, parent.ID, name)).
            One(exec)
        if err == nil {
            // update
            source.Description = null.NewString(description, description != "")
            source.Pattern = null.NewString(pattern, pattern != "")
            source.Position = null.NewInt(position, position != -1)
            _, err = source.Update(exec, boil.Whitelist("description", "pattern", "position"))
            if err != nil {
                return errors.Wrapf(err, "Update source [%d %d %s]", sType.ID, parent.ID, name)
            }
        } else {
            if err == sql.ErrNoRows {
                // create
                source = &models.Source{
                    UID:         utils.GenerateUID(8),
                    TypeID:      sType.ID,
                    Name:        name,
                    Description: null.NewString(description, description != ""),
                    Pattern:     null.NewString(pattern, pattern != ""),
                    ParentID:    null.Int64From(parent.ID),
                    Position:    null.NewInt(position, position != -1),
                }
                err = source.Insert(exec, boil.Infer())
                if err != nil {
                    return errors.Wrapf(err, "Insert source [%s]", x)
                }
            } else {
                return errors.Wrapf(err, "Fetch source [%d %d %s]", sType.ID, parent.ID, name)
            }
        }

        // Source i18n
        for _, l := range LANGS {
            n := x[h[l+".name"]]
            d := x[h[l+".description"]]
            if n == "" && d == "" {
                continue
            }

            si18n := models.SourceI18n{
                SourceID:    source.ID,
                Language:    l,
                Name:        null.NewString(n, n != ""),
                Description: null.NewString(d, d != ""),
            }
            err = si18n.Upsert(exec, true,
                []string{"source_id", "language"},
                boil.Whitelist("name", "description"),
                boil.Infer())
            if err != nil {
                return errors.Wrapf(err, "Upsert source [%d] i18n [%s]", source.ID, l)
            }
        }

        if level == len(parents) {
            parents = append(parents, source)
        } else {
            parents[level] = source
        }
    }

    return nil
}