Bnei-Baruch/mdb

View on GitHub
importer/kmedia/mixed_lessons.go

Summary

Maintainability
B
5 hrs
Test Coverage
package kmedia

import (
    "database/sql"
    "encoding/json"
    "fmt"
    "sort"
    "strings"
    "sync"
    "time"

    "github.com/Bnei-Baruch/mdb/common"

    log "github.com/Sirupsen/logrus"
    "github.com/emirpasic/gods/maps/treemap"
    godsutil "github.com/emirpasic/gods/utils"
    "github.com/pkg/errors"
    "github.com/volatiletech/sqlboiler/v4/queries/qm"

    "github.com/Bnei-Baruch/mdb/importer/kmedia/kmodels"
    "github.com/Bnei-Baruch/mdb/models"
    "github.com/Bnei-Baruch/mdb/utils"
)

// 4728    lessons-part
// 3630    lessons-part/lesson_preparation
// 3629    lessons-part/lesson_first-part
// 3631    lessons-part/lesson_second-part
// 3632    lessons-part/lesson_third-part
// 4020    lessons-part/lesson_fourth-part
// 4541    lessons-part/lesson_fifth-part
// 4841    lessons-part/lesson_six-part
// 4862    lessons-part/lesson_sixth-part
var LessonPartsCatalogs = map[int]int{
    4728: -1,
    3630: 0,
    3629: 1,
    3631: 2,
    3632: 3,
    4020: 4,
    4541: 5,
    4841: 6,
    4862: 6,
}

var cuMapByCNID map[int]*models.ContentUnit

func ImportMixedLessons() {
    clock := Init()

    var err error
    var cnMap map[int]*kmodels.Container

    log.Info("Loading all mixed lessons containers")
    cnMap, cuMapByCNID, err = loadContainersInCatalogsAndCUs(
        11, 6932, 6933, 4772, 4541, 3629, 4020, 4700, 3630, 3631, 4841, 4862, 4728, 4016, 4761, 3632)
    utils.Must(err)
    log.Infof("Got %d containers", len(cnMap))

    // Process lessons
    stats = NewImportStatistics()

    log.Info("Setting up workers")
    jobs := make(chan *kmodels.Container, 100)
    reconcileJobs := make(chan *kmodels.Container, 100)
    var workersWG sync.WaitGroup
    for w := 1; w <= 5; w++ {
        workersWG.Add(1)
        go mixedLessonWorker(jobs, reconcileJobs, &workersWG)
    }

    log.Infof("Setting up lesson reconciler")
    var reconcilerWG sync.WaitGroup
    reconcilerWG.Add(1)
    go lessonReconciler(reconcileJobs, &reconcilerWG)

    log.Info("Queueing work")
    for _, v := range cnMap {
        jobs <- v
    }

    log.Info("Closing jobs channel")
    close(jobs)

    log.Info("Waiting for workers to finish")
    workersWG.Wait()

    log.Info("Closing reconciliation jobs channel")
    close(reconcileJobs)

    log.Info("Waiting for reconciler to finish")
    reconcilerWG.Wait()

    stats.dump()

    Shutdown()
    log.Info("Success")
    log.Infof("Total run time: %s", time.Now().Sub(clock).String())
}

func mixedLessonWorker(jobs <-chan *kmodels.Container, reconcileJobs chan *kmodels.Container, wg *sync.WaitGroup) {
    for cn := range jobs {
        stats.ContainersProcessed.Inc(1)

        if unit, ok := cuMapByCNID[cn.ID]; ok {
            if err := analyzeExistingMixedContainer(cn, unit); err != nil {
                log.Errorf("Analyze existing %d: %s", cn.ID, err.Error())
            }
        } else {
            if shouldReconcile, err := analyzeNewMixedContainer(cn); err != nil {
                log.Errorf("Analyze new %d: %s", cn.ID, err.Error())
            } else if shouldReconcile {
                reconcileJobs <- cn
            }
        }
    }

    wg.Done()
}

func analyzeExistingMixedContainer(cn *kmodels.Container, cu *models.ContentUnit) error {
    stats.ContentUnitsUpdated.Inc(1)
    return nil
}

func analyzeNewMixedContainer(cn *kmodels.Container) (bool, error) {
    stats.ContentUnitsCreated.Inc(1)
    //log.Infof("New container\t%d\t%s", cn.ID, cn.Name.String)

    if cn.VirtualLessonID.Valid && cn.VirtualLessonID.Int != 0 {
        log.Warnf("New container %d %s has virtual_lesson_id %d", cn.ID, cn.Name.String, cn.VirtualLessonID.Int)

        c, err := models.Collections(
            qm.Where("(properties->>'kmedia_id')::int = ?", cn.VirtualLessonID.Int)).
            One(mdb)
        if err != nil {
            if err == sql.ErrNoRows {
                log.Warnf("No collection with kmedia_id %d", cn.VirtualLessonID.Int)
            } else {
                return false, errors.Wrapf(err, "Load Collection kmedia_id %d", cn.VirtualLessonID.Int)
            }
        } else {
            log.Warnf("Collection %d missing unit container_id %d", c.ID, cn.ID)
        }

        return false, nil
    }

    return cn.ContentTypeID.Int == 4, nil

    //err := cn.L.LoadCatalogs(kmdb, true, cn)
    //if err != nil {
    //    return false, errors.Wrapf(err, "Load catalogs cn.ID %d", cn.ID)
    //}
    //
    //isLessonPart := false
    //for i := range cn.R.Catalogs {
    //    c := cn.R.Catalogs[i]
    //    if _, ok := LessonPartsCatalogs[c.ID]; ok {
    //        isLessonPart = true
    //        break
    //    }
    //}
    //
    //if !isLessonPart {
    //    log.Infof("New non-lesson-part container\t%d\t%s", cn.ID, cn.Name.String)
    //}
    //
    //return isLessonPart, nil
}

func lessonReconciler(jobs <-chan *kmodels.Container, wg *sync.WaitGroup) {
    containers := make([]*kmodels.Container, 0)

    skipped := 0
    for cn := range jobs {
        //log.Infof("Reconcile %d\t%s\t%s", cn.ID, cn.Filmdate.Time.Format("2006-01-02"), cn.Name.String)

        skip := !cn.Filmdate.Valid ||
            strings.HasPrefix(cn.Name.String, "maamar_zohoraim") ||
            strings.HasPrefix(cn.Name.String, "ML_Maamar_zohoraim") ||
            strings.HasSuffix(cn.Name.String, "mzohoraim_bb") ||
            strings.HasPrefix(cn.Name.String, "UlpanIvrit") ||
            !strings.Contains(cn.Name.String, cn.Filmdate.Time.Format("2006-01-02"))

        if skip {
            skipped++
        } else {
            containers = append(containers, cn)
        }
    }

    log.Infof("%d containers needs reconciliation", len(containers))
    log.Infof("%d skipped", skipped)

    sort.Slice(containers, func(i, j int) bool {
        a := containers[i]
        b := containers[j]

        if a.Filmdate.Time.Before(b.Filmdate.Time) {
            return true
        } else if b.Filmdate.Time.Before(a.Filmdate.Time) {
            return false
        }

        return a.Name.String < b.Name.String
    })

    byDate := treemap.NewWith(godsutil.TimeComparator)
    for i := range containers {
        cn := containers[i]

        k := cn.Filmdate.Time
        v, ok := byDate.Get(k)
        if !ok {
            v = make([]*kmodels.Container, 0)
        }
        byDate.Put(k, append(v.([]*kmodels.Container), cn))
    }

    byDate.Each(func(k, v interface{}) {
        filmDate := k.(time.Time).Format("2006-01-02")
        log.Infof("%s\t%d", filmDate, len(v.([]*kmodels.Container)))
        for _, cn := range v.([]*kmodels.Container) {
            log.Infof("%d\t%s", cn.ID, cn.Name.String)
        }

        cus, err := models.ContentUnits(
            qm.Where("type_id = ?", 1),
            qm.Where("properties->>'film_date' = ?", filmDate),
            qm.Load("ContentUnitI18ns"),
        ).
            All(mdb)
        if err != nil {
            log.Errorf("Load CUs in %s: %s", filmDate, err.Error())
            return
        }
        log.Infof("\t\tCUs in %s [%d]", filmDate, len(cus))
        for i := range cus {
            cu := cus[i]

            name := ""
            for i := range cu.R.ContentUnitI18ns {
                i18n := cu.R.ContentUnitI18ns[i]
                if i18n.Language == common.LANG_HEBREW {
                    name = utils.Reverse(i18n.Name.String)
                }
            }

            if cu.Properties.Valid {
                var props map[string]interface{}
                if err := json.Unmarshal(cu.Properties.JSON, &props); err != nil {
                    log.Errorf("json.Unmarshal CU properties %d", cu.ID)
                } else {
                    if kmid, ok := props["kmedia_id"]; ok {
                        name = fmt.Sprintf("%s\t%d", name, int(kmid.(float64)))
                    }
                }
            }

            log.Infof("\t\t\t%d\t%s", cu.ID, name)
        }
    })

    wg.Done()
}