Bnei-Baruch/mdb

View on GitHub
batch/kitei_makor.go

Summary

Maintainability
C
1 day
Test Coverage
package batch

import (
    "database/sql"
    "encoding/json"
    "fmt"
    "strconv"
    "strings"
    "time"

    log "github.com/Sirupsen/logrus"
    "github.com/pkg/errors"
    "github.com/spf13/viper"
    "github.com/volatiletech/sqlboiler/v4/boil"
    "github.com/volatiletech/sqlboiler/v4/queries"
    "github.com/volatiletech/sqlboiler/v4/queries/qm"

    "github.com/Bnei-Baruch/mdb/api"
    "github.com/Bnei-Baruch/mdb/common"
    "github.com/Bnei-Baruch/mdb/models"
    "github.com/Bnei-Baruch/mdb/utils"
)

// OrganizeKiteiMakor is the batch entry point that moves "kitei-makor" files
// into dedicated KITEI_MAKOR content units.
//
// It opens a connection to MDB using the "mdb.url" config value, stores it in
// the package-level mdb handle and in boil's global DB, initializes the type
// registries, loads every file whose name matches "kitei-makor" (with its
// ContentUnit relationship eager-loaded) and passes them to
// doOrganizeKiteiMakor. The function has no error return: every error is
// handed to utils.Must (presumably panics — confirm in utils).
func OrganizeKiteiMakor() {
	var err error
	clock := time.Now()

	log.SetFormatter(&log.TextFormatter{FullTimestamp: true})
	//log.SetLevel(log.WarnLevel)

	log.Info("Starting to organize kitei makor")

	log.Info("Setting up connection to MDB")
	mdb, err = sql.Open("postgres", viper.GetString("mdb.url"))
	utils.Must(err)
	// Register Close before the ping check so the handle is also released
	// when that check fails by panicking.
	defer mdb.Close()
	utils.Must(mdb.Ping())
	boil.SetDB(mdb)
	//boil.DebugMode = true

	log.Info("Initializing static data from MDB")
	utils.Must(common.InitTypeRegistries(mdb))

	log.Info("Loading kitei-makor files")
	ktFiles, err := models.Files(
		qm.Where("name ~ ?", "kitei-makor"),
		qm.Load("ContentUnit")).
		All(mdb)
	utils.Must(err)
	log.Infof("Got %d files", len(ktFiles))

	utils.Must(doOrganizeKiteiMakor(ktFiles))

	log.Info("Success")
	log.Infof("Total run time: %s", time.Since(clock).String())
}

// doOrganizeKiteiMakor groups ktFiles by the content unit that owns them and
// makes sure they end up in a KITEI_MAKOR content unit derived from the main
// content unit.
//
// Files of type "image" are skipped. Files without a content unit are only
// logged. For every content unit that owns kitei-makor files:
//   - if the unit is itself a KITEI_MAKOR unit, its source derivations are
//     inspected and logged; nothing is changed;
//   - otherwise, if it has no derived units, a new KITEI_MAKOR unit is created,
//     linked as derived, and the files are moved into it;
//   - if it has exactly one derived unit, the files are moved into that unit;
//   - if it has more than one derived unit, a warning is logged and nothing
//     changes.
//
// Each move runs in its own transaction. The first failure stops processing
// and is returned; changes committed for previously processed units are kept.
// ktFiles are expected to have their ContentUnit relationship eager-loaded
// (OrganizeKiteiMakor does this with qm.Load("ContentUnit")).
func doOrganizeKiteiMakor(ktFiles []*models.File) error {
	cuByID := make(map[int64]*models.ContentUnit)
	cuMap := make(map[int64][]*models.File)
	noCU := make([]*models.File, 0)
	for _, f := range ktFiles {
		// skip sketches with buggy file name
		if f.Type == "image" {
			continue
		}

		if !f.ContentUnitID.Valid {
			noCU = append(noCU, f)
			continue
		}

		cuID := f.ContentUnitID.Int64
		// Fail with a clear error instead of a nil dereference further down
		// when the relationship was not eager-loaded.
		if f.R == nil || f.R.ContentUnit == nil {
			return errors.Errorf("file %d: content unit %d not loaded", f.ID, cuID)
		}
		cuByID[cuID] = f.R.ContentUnit
		cuMap[cuID] = append(cuMap[cuID], f)
	}

	log.Infof("Here comes %d noCU", len(noCU))
	for _, f := range noCU {
		log.Infof("%d %s", f.ID, f.Name)
	}

	// inTx runs fn in its own transaction: it commits when fn succeeds and
	// rolls back otherwise. A failed rollback is only logged so that it does
	// not hide the error returned by fn.
	inTx := func(fn func(tx *sql.Tx) error) error {
		tx, err := mdb.Begin()
		if err != nil {
			return errors.Wrap(err, "begin transaction")
		}
		if err := fn(tx); err != nil {
			if rbErr := tx.Rollback(); rbErr != nil {
				log.Errorf("rollback transaction: %s", rbErr.Error())
			}
			return err
		}
		if err := tx.Commit(); err != nil {
			return errors.Wrap(err, "commit transaction")
		}
		return nil
	}

	log.Infof("len(cuMap) %d", len(cuMap))
	for k, v := range cuMap {
		cu := cuByID[k]
		log.Infof("CU [%d] type_id %d has %d kitei-makor files", k, cu.TypeID, len(v))

		// we need to keep a state where main CU --derives--> KITEI_MAKOR CU
		// so follow link from either end.
		// 1. if KT CU doesn't exist then create it and link it
		// 2. move all 'kitei-makor' files into KT CU
		if common.CONTENT_TYPE_REGISTRY.ByID[cu.TypeID].Name == common.CT_KITEI_MAKOR {
			// The files already live in a KT CU: only report its source link.
			if err := cu.L.LoadDerivedContentUnitDerivations(mdb, true, cu, nil); err != nil {
				return errors.Wrapf(err, "Load Source CUs for %d", cu.ID)
			}

			switch cuds := cu.R.DerivedContentUnitDerivations; len(cuds) {
			case 0:
				log.Infof("KT CU has no source: %d", cu.ID)
			case 1:
				sourceID := cuds[0].SourceID
				if vv, ok := cuMap[sourceID]; ok {
					log.Infof("KT CU %d has Source CU %d with %d KT files", cu.ID, sourceID, len(vv))
				} else {
					log.Infof("KT CU %d, Source CU %d has no KT files", cu.ID, sourceID)
				}
			default:
				log.Warnf("KT CU %d has too many source CU %d", cu.ID, len(cuds))
			}
			continue
		}

		if err := cu.L.LoadSourceContentUnitDerivations(mdb, true, cu, nil); err != nil {
			return errors.Wrapf(err, "Load Derived CUs for %d", cu.ID)
		}

		var err error
		switch cuds := cu.R.SourceContentUnitDerivations; len(cuds) {
		case 0:
			log.Infof("Main CU has no derived CUs: %d", cu.ID)
			err = inTx(func(tx *sql.Tx) error {
				// create and associate KT CU and move KT files there
				ktCU, err := createKTCU(tx, cu, v)
				if err != nil {
					return err
				}
				return moveKTFiles(tx, v, ktCU.ID)
			})
		case 1:
			// NOTE(review): the single derived CU is assumed to be the KT CU;
			// neither its content type nor the derivation name is checked.
			derivedID := cuds[0].DerivedID
			if vv, ok := cuMap[derivedID]; ok {
				log.Infof("Main CU %d has KT CU %d with %d files", cu.ID, derivedID, len(vv))
			} else {
				log.Infof("Main CU %d, KT CU %d has no files", cu.ID, derivedID)
			}

			// move files from main cu to kt cu
			err = inTx(func(tx *sql.Tx) error {
				return moveKTFiles(tx, v, derivedID)
			})
		default:
			log.Warnf("Main CU %d has too many derived CUs %d", cu.ID, len(cuds))
		}
		if err != nil {
			return errors.Wrapf(err, "mainCU %d", cu.ID)
		}
	}

	return nil
}

// createKTCU creates a new, published KITEI_MAKOR content unit for mainCU and
// links it through a ContentUnitDerivation whose source is mainCU and whose
// name is CT_KITEI_MAKOR. All DB work goes through exec.
//
// The new unit's properties are mainCU's properties without "kmedia_id". Its
// "duration" is the truncated average of the positive numeric "duration"
// values found in ktFiles' properties; when there are none, "duration" is
// removed.
//
// It returns an error if mainCU's properties are not valid, if any properties
// JSON cannot be decoded, or if creating, publishing or linking the unit fails.
func createKTCU(exec boil.Executor, mainCU *models.ContentUnit, ktFiles []*models.File) (*models.ContentUnit, error) {
	log.Infof("Create KT CU for %d", mainCU.ID)

	if !mainCU.Properties.Valid {
		return nil, errors.Errorf("mainCU invalid props %d", mainCU.ID)
	}

	var props map[string]interface{}
	if err := json.Unmarshal(mainCU.Properties.JSON, &props); err != nil {
		return nil, errors.Wrapf(err, "json.Unmarshal mainCU properties %d", mainCU.ID)
	}
	delete(props, "kmedia_id")

	// calculate duration if possible
	var sum float64
	var count int
	for _, f := range ktFiles {
		if !f.Properties.Valid {
			continue
		}

		var fProps map[string]interface{}
		if err := json.Unmarshal(f.Properties.JSON, &fProps); err != nil {
			return nil, errors.Wrapf(err, "json.Unmarshal file properties %d", f.ID)
		}

		// encoding/json decodes JSON numbers into interface{} as float64; any
		// other type (e.g. a string) is ignored rather than panicking on an
		// unchecked type assertion.
		if d, ok := fProps["duration"].(float64); ok && d > 0 {
			sum += d
			count++
		}
	}

	if count > 0 {
		// Properties stored as JSON null decode into a nil map, and assigning
		// to a nil map panics.
		if props == nil {
			props = make(map[string]interface{})
		}
		props["duration"] = int64(sum / float64(count))
		log.Infof("mainCU %d duration: (%f/%d)=%d", mainCU.ID, sum, count, props["duration"])
	} else {
		delete(props, "duration")
		log.Infof("mainCU %d duration invalid", mainCU.ID)
	}

	ktCU, err := api.CreateContentUnit(exec, common.CT_KITEI_MAKOR, props)
	if err != nil {
		return nil, errors.Wrap(err, "Create KT CU")
	}

	ktCU.Published = true
	if _, err := ktCU.Update(exec, boil.Whitelist("published")); err != nil {
		return nil, errors.Wrapf(err, "Update KT CU published %d", ktCU.ID)
	}

	cud := &models.ContentUnitDerivation{
		SourceID: mainCU.ID,
		Name:     common.CT_KITEI_MAKOR,
	}
	if err := ktCU.AddDerivedContentUnitDerivations(exec, true, cud); err != nil {
		return nil, errors.Wrap(err, "Save CUD in DB")
	}

	return ktCU, nil
}

// moveKTFiles reassigns every file in ktFiles to content unit cuID with a
// single UPDATE statement executed through exec. An empty ktFiles is a no-op,
// because an empty "IN ()" list is invalid SQL.
func moveKTFiles(exec boil.Executor, ktFiles []*models.File, cuID int64) error {
	log.Infof("Moving %d files to %d", len(ktFiles), cuID)

	if len(ktFiles) == 0 {
		return nil
	}

	// The IN list is built only from integer IDs formatted by strconv, so the
	// string-built SQL cannot be injected. FormatInt avoids truncating int64
	// IDs on 32-bit platforms (strconv.Itoa(int(id)) would).
	ids := make([]string, len(ktFiles))
	for i, f := range ktFiles {
		ids[i] = strconv.FormatInt(int64(f.ID), 10)
	}

	_, err := queries.Raw(
		fmt.Sprintf("UPDATE files SET content_unit_id=%d WHERE id IN (%s)", cuID, strings.Join(ids, ","))).
		Exec(exec)
	if err != nil {
		return errors.Wrap(err, "move KT files")
	}

	return nil
}