Bnei-Baruch/mdb

View on GitHub
importer/kmedia/common.go

Summary

Maintainability
F
4 days
Test Coverage
package kmedia

import (
    "crypto/sha1"
    "database/sql"
    "encoding/json"
    "fmt"
    "runtime/debug"
    "strconv"
    "strings"
    "time"

    log "github.com/Sirupsen/logrus"
    "github.com/pkg/errors"
    "github.com/spf13/viper"
    "github.com/volatiletech/null/v8"
    "github.com/volatiletech/sqlboiler/queries/qm"
    "github.com/volatiletech/sqlboiler/v4/boil"
    "github.com/volatiletech/sqlboiler/v4/queries"
    qm4 "github.com/volatiletech/sqlboiler/v4/queries/qm"

    "github.com/Bnei-Baruch/mdb/api"
    "github.com/Bnei-Baruch/mdb/common"
    "github.com/Bnei-Baruch/mdb/importer/kmedia/kmodels"
    "github.com/Bnei-Baruch/mdb/models"
    "github.com/Bnei-Baruch/mdb/utils"
)

// Paths of the CSV files that map kmedia catalogs onto MDB entities.
// They are passed as-is to utils.ReadCSV, so presumably they are resolved
// relative to the process working directory (TODO confirm).
const (
	// CATALOGS_SOURCES_MAPPINGS_FILE is read by initCatalogSourcesMappings.
	// The first row is skipped as a header; column 0 holds the kmedia catalog ID
	// and column 2 the MDB source path ("<author code>/<source name>/...").
	CATALOGS_SOURCES_MAPPINGS_FILE = "importer/kmedia/data/Catalogs Sources Mappings - final.csv"
	// CATALOGS_TAGS_MAPPINGS_FILE is read by initCatalogTagsMappings.
	// The first row is skipped as a header; column 0 holds the kmedia catalog ID
	// and column 1 the MDB tag ID.
	CATALOGS_TAGS_MAPPINGS_FILE    = "importer/kmedia/data/catalogs_tags.csv"
)

// Package-level state shared by the kmedia importer.
var (
	// mdb is the MDB connection opened by Init; Init also installs it as
	// boil's default DB.
	mdb                     *sql.DB
	// kmdb is the kmedia connection opened by Init.
	kmdb                    *sql.DB
	// stats collects import counters. It is not assigned in this file;
	// presumably the caller sets it up before importing (TODO confirm).
	stats                   *ImportStatistics
	// serverUrls maps a kmedia server name to its HTTP URL (filled by initServers).
	serverUrls              map[string]string
	// catalogsSourcesMappings maps a kmedia catalog ID to an MDB source
	// (filled by initCatalogSourcesMappings).
	catalogsSourcesMappings map[int]*models.Source
	// catalogsTagsMappings maps a kmedia catalog ID to an MDB tag
	// (filled by initCatalogTagsMappings).
	catalogsTagsMappings    map[int]*models.Tag
)

// Init prepares the kmedia importer. It does the following, in order:
//   - configures the log formatter;
//   - opens and pings the MDB and kmedia postgres connections (URLs come from
//     the viper keys "mdb.url" and "kmedia.url");
//   - registers MDB with boil;
//   - loads the MDB type registries, the kmedia "Lesson" content type, the
//     kmedia server URLs and both catalog mapping tables.
//
// Every error is handed to utils.Must. Init returns the time at which it started.
func Init() time.Time {
	startedAt := time.Now()

	log.SetFormatter(&log.TextFormatter{FullTimestamp: true})
	//log.SetLevel(log.WarnLevel)

	log.Info("Starting Kmedia migration")

	// connect logs msg, opens a postgres connection to the URL stored under
	// urlKey and verifies it with a ping.
	connect := func(msg, urlKey string) *sql.DB {
		log.Info(msg)
		db, openErr := sql.Open("postgres", viper.GetString(urlKey))
		utils.Must(openErr)
		utils.Must(db.Ping())
		return db
	}

	mdb = connect("Setting up connection to MDB", "mdb.url")
	boil.SetDB(mdb)
	//boil.DebugMode = true

	kmdb = connect("Setting up connection to Kmedia", "kmedia.url")

	log.Info("Initializing static data from MDB")
	utils.Must(common.InitTypeRegistries(mdb))

	log.Info("Initializing static data from Kmedia")
	var err error
	kmediaLessonCT, err = kmodels.ContentTypes(kmdb, qm.Where("name = ?", "Lesson")).One()
	utils.Must(err)
	serverUrls, err = initServers(kmdb)
	utils.Must(err)

	log.Info("Initializing catalogs sources mappings")
	catalogsSourcesMappings, err = initCatalogSourcesMappings()
	utils.Must(err)
	log.Infof("Got %d mappings", len(catalogsSourcesMappings))

	log.Info("Initializing catalogs tags mappings")
	catalogsTagsMappings, err = initCatalogTagsMappings()
	utils.Must(err)
	log.Infof("Got %d mappings", len(catalogsTagsMappings))

	return startedAt
}

func Shutdown() {
    utils.Must(mdb.Close())
    utils.Must(kmdb.Close())
}

// importContainerWOCollectionNewCU imports a kmedia container as a content unit
// that is not attached to any collection.
//
// Steps:
//   - load the container's file assets from kmedia;
//   - record an import operation;
//   - import the container itself;
//   - import every file asset, associating each with the unit and the operation.
//
// The unit is marked published (and that column saved) when at least one
// imported file is published. It returns the imported unit.
func importContainerWOCollectionNewCU(exec boil.Executor, container *kmodels.Container, cuType string) (*models.ContentUnit, error) {
	if err := container.L.LoadFileAssets(kmdb, true, container); err != nil {
		return nil, errors.Wrapf(err, "Load kmedia file assets %d", container.ID)
	}

	// Create import operation
	operation, err := api.CreateOperation(exec, common.OP_IMPORT_KMEDIA,
		api.Operation{WorkflowID: strconv.Itoa(container.ID)}, nil)
	if err != nil {
		return nil, errors.Wrapf(err, "Create operation %d", container.ID)
	}
	stats.OperationsCreated.Inc(1)

	// Import the container itself: no collection, empty CCU name, position 0.
	unit, err := importContainer(exec, container, nil, cuType, "", 0)
	if err != nil {
		return nil, errors.Wrapf(err, "Import container %d", container.ID)
	}

	// Import container files. importFileAsset may return a nil file with a nil
	// error for assets it skips.
	for _, fileAsset := range container.R.FileAssets {
		log.Infof("Processing file_asset %d", fileAsset.ID)
		stats.FileAssetsProcessed.Inc(1)

		// Create or update MDB file
		file, err := importFileAsset(exec, fileAsset, unit, operation)
		if err != nil {
			return nil, errors.Wrapf(err, "Import file_asset %d", fileAsset.ID)
		}
		if file != nil && file.Published {
			unit.Published = true
		}
	}

	if unit.Published {
		if _, err := unit.Update(exec, boil.Whitelist("published")); err != nil {
			return nil, errors.Wrapf(err, "Update unit published column %d", container.ID)
		}
	}

	return unit, nil
}

// importContainerWCollection imports a kmedia container as a content unit that
// belongs to collection, at the container's position.
//
// The content unit is looked up by properties.kmedia_id:
//   - no unit: the work is delegated to importContainerWCollectionNewCU;
//   - existing unit: it is re-imported in place, and its content type is
//     overridden with cuType when the two differ.
//
// Unknown content types are reported as errors.
func importContainerWCollection(exec boil.Executor, container *kmodels.Container, collection *models.Collection, cuType string) error {
	stats.ContainersProcessed.Inc(1)

	// Look the unit up through exec, the same executor importContainer uses,
	// so that rows written earlier in the caller's transaction (if exec is
	// one) are visible here too.
	unit, err := models.ContentUnits(qm4.Where("(properties->>'kmedia_id')::int = ?", container.ID)).One(exec)
	if err != nil {
		if err == sql.ErrNoRows {
			log.Infof("New CU %d %s", container.ID, container.Name.String)
			return importContainerWCollectionNewCU(exec, container, collection, cuType)
		}
		return errors.Wrapf(err, "Lookup content unit kmid %d", container.ID)
	}

	log.Infof("CU exists [%d] container: %s %d", unit.ID, container.Name.String, container.ID)
	_, err = importContainer(exec, container, collection, cuType,
		strconv.Itoa(container.Position.Int), container.Position.Int)
	if err != nil {
		return errors.Wrapf(err, "Import container %d", container.ID)
	}

	// Override the CU type when it differs from the requested one. Both
	// registry lookups are guarded: a missing entry used to be a nil dereference.
	if current := common.CONTENT_TYPE_REGISTRY.ByID[unit.TypeID]; current == nil || current.Name != cuType {
		wanted := common.CONTENT_TYPE_REGISTRY.ByName[cuType]
		if wanted == nil {
			return errors.Errorf("Unknown content type %s, unit %d", cuType, unit.ID)
		}
		log.Infof("Overriding CU Type to %s", cuType)
		unit.TypeID = wanted.ID
		if _, err := unit.Update(exec, boil.Whitelist("type_id")); err != nil {
			return errors.Wrapf(err, "Update CU type %d", unit.ID)
		}
	}

	return nil
}

// importContainerWCollectionNewCU imports a kmedia container as a content unit
// attached to collection at the container's position.
//
// Steps:
//   - load the container's file assets from kmedia;
//   - record an import operation;
//   - import the container;
//   - import each file asset.
//
// The unit is published when any imported file is. A failing file asset is
// logged, together with the current stack, before the wrapped error is returned.
func importContainerWCollectionNewCU(exec boil.Executor, container *kmodels.Container, collection *models.Collection, cuType string) error {
	if loadErr := container.L.LoadFileAssets(kmdb, true, container); loadErr != nil {
		return errors.Wrapf(loadErr, "Load kmedia file assets %d", container.ID)
	}

	op, err := api.CreateOperation(exec, common.OP_IMPORT_KMEDIA,
		api.Operation{WorkflowID: strconv.Itoa(container.ID)}, nil)
	if err != nil {
		return errors.Wrapf(err, "Create operation %d", container.ID)
	}
	stats.OperationsCreated.Inc(1)

	position := container.Position.Int
	cu, err := importContainer(exec, container, collection, cuType, strconv.Itoa(position), position)
	if err != nil {
		return errors.Wrapf(err, "Import container %d", container.ID)
	}

	for _, asset := range container.R.FileAssets {
		log.Infof("Processing file_asset %d", asset.ID)
		stats.FileAssetsProcessed.Inc(1)

		f, assetErr := importFileAsset(exec, asset, cu, op)
		if assetErr != nil {
			log.Error(assetErr)
			debug.PrintStack()
			return errors.Wrapf(assetErr, "Import container files %d", container.ID)
		}
		if f != nil && f.Published {
			cu.Published = true
		}
	}

	if !cu.Published {
		return nil
	}
	if _, err = cu.Update(exec, boil.Whitelist("published")); err != nil {
		return errors.Wrapf(err, "Update unit published column %d", container.ID)
	}
	return nil
}

// importContainer creates or updates the MDB content unit that mirrors a kmedia
// container. The unit is matched by its properties.kmedia_id.
//
// A new unit is created with contentType. For an existing unit, a type that
// differs from a non-empty contentType is only logged.
//
// It then syncs, in order:
//   - the secure level;
//   - the properties (kmedia_id, original_language, film_date, duration);
//   - i18n rows, taken from the container descriptions, or from the container
//     name in EN/HE/RU/ES when no description has content;
//   - the collection association (ccuName, ccuPosition) when collection is non-nil;
//   - sources and tags: existing associations merged with the catalog mappings;
//   - a lecturer person row for known kmedia lecturers.
func importContainer(exec boil.Executor,
	container *kmodels.Container,
	collection *models.Collection,
	contentType string,
	ccuName string,
	ccuPosition int,
) (*models.ContentUnit, error) {

	// Get or create content unit by kmedia_id
	unit, err := models.ContentUnits(qm4.Where("(properties->>'kmedia_id')::int = ?", container.ID)).One(exec)
	if err == nil {
		stats.ContentUnitsUpdated.Inc(1)
		if contentType != "" {
			// Guarded lookup: an unregistered type_id would otherwise be a nil dereference.
			currentType := ""
			if ct := common.CONTENT_TYPE_REGISTRY.ByID[unit.TypeID]; ct != nil {
				currentType = ct.Name
			}
			if currentType != contentType {
				log.Warnf("Different CU type %d %s != %s", unit.ID, currentType, contentType)
			}
		}
	} else if err == sql.ErrNoRows {
		// Create new content unit
		unit, err = api.CreateContentUnit(exec, contentType, nil)
		if err != nil {
			return nil, errors.Wrapf(err, "Insert unit, container_id [%d]", container.ID)
		}
		stats.ContentUnitsCreated.Inc(1)
	} else {
		return nil, errors.Wrapf(err, "Lookup unit, container_id [%d]", container.ID)
	}

	// Secure
	unit.Secure = mapSecure(container.Secure)
	if _, err = unit.Update(exec, boil.Whitelist("secure")); err != nil {
		return nil, errors.Wrapf(err, "Update secure, unit [%d]", unit.ID)
	}

	// Properties: start from the stored ones. A decode failure is reported
	// rather than silently continuing with an empty map.
	props := make(map[string]interface{})
	if unit.Properties.Valid {
		if err = unit.Properties.Unmarshal(&props); err != nil {
			return nil, errors.Wrapf(err, "Unmarshal properties, unit [%d]", unit.ID)
		}
	}
	props["kmedia_id"] = container.ID
	if container.LangID.Valid {
		props["original_language"] = common.StdLang(container.LangID.String)
	}
	if container.Filmdate.Valid {
		props["film_date"] = container.Filmdate.Time.Format("2006-01-02")
	}
	if container.PlaytimeSecs.Valid {
		props["duration"] = container.PlaytimeSecs.Int
	}
	if err = api.UpdateContentUnitProperties(exec, unit, props); err != nil {
		return nil, errors.Wrapf(err, "Update properties, unit [%d]", unit.ID)
	}

	// TODO: what to do with censor workflow information ?

	// I18n
	descriptions, err := container.ContainerDescriptions(kmdb).All()
	if err != nil {
		return nil, errors.Wrapf(err, "Lookup container descriptions, container_id [%d]", container.ID)
	}

	hasI18n := false
	for _, d := range descriptions {
		if (d.ContainerDesc.Valid && d.ContainerDesc.String != "") ||
			(d.Descr.Valid && d.Descr.String != "") {
			hasI18n = true
			cui18n := models.ContentUnitI18n{
				ContentUnitID: unit.ID,
				Language:      common.LANG_MAP[d.LangID.String],
				Name:          null.NewString(d.ContainerDesc.String, d.ContainerDesc.Valid),
				Description:   null.NewString(d.Descr.String, d.Descr.Valid),
			}
			err = cui18n.Upsert(exec,
				true,
				[]string{"content_unit_id", "language"},
				boil.Whitelist("name", "description"),
				boil.Infer())
			if err != nil {
				return nil, errors.Wrapf(err, "Upsert unit i18n, unit [%d]", unit.ID)
			}
		}
	}

	// no i18n - use container name
	if !hasI18n && container.Name.Valid {
		for _, lang := range []string{common.LANG_ENGLISH, common.LANG_HEBREW, common.LANG_RUSSIAN, common.LANG_SPANISH} {
			cui18n := models.ContentUnitI18n{
				ContentUnitID: unit.ID,
				Language:      lang,
				Name:          null.NewString(container.Name.String, container.Name.Valid),
			}
			err = cui18n.Upsert(exec,
				true,
				[]string{"content_unit_id", "language"},
				boil.Whitelist("name", "description"),
				boil.Infer())
			if err != nil {
				return nil, errors.Wrapf(err, "Upsert unit i18n, unit [%d]", unit.ID)
			}
		}
	}

	if collection != nil {
		err = createOrUpdateCCU(exec, unit, models.CollectionsContentUnit{
			CollectionID:  collection.ID,
			ContentUnitID: unit.ID,
			Name:          ccuName,
			Position:      ccuPosition,
		})
		if err != nil {
			return nil, err
		}
	}

	// Associate sources & tags
	// we combine existing mappings with catalogs mappings
	if err = container.L.LoadCatalogs(kmdb, true, container); err != nil {
		return nil, errors.Wrapf(err, "Load catalogs, container [%d]", container.ID)
	}
	if err = unit.L.LoadSources(exec, true, unit, nil); err != nil {
		return nil, errors.Wrapf(err, "Load CU sources %d", unit.ID)
	}
	if err = unit.L.LoadTags(exec, true, unit, nil); err != nil {
		return nil, errors.Wrapf(err, "Load CU tags %d", unit.ID)
	}

	srcMap := make(map[int64]*models.Source)
	tagMap := make(map[int64]*models.Tag)
	for _, x := range container.R.Catalogs {
		// The mapping tables may contain nil values for catalogs whose target
		// source/tag was not found in MDB. Treat those as unmapped instead of
		// dereferencing nil.
		if s := catalogsSourcesMappings[x.ID]; s != nil {
			srcMap[s.ID] = s
		} else if t := catalogsTagsMappings[x.ID]; t != nil {
			tagMap[t.ID] = t
		} else {
			stats.UnkownCatalogs.Inc(fmt.Sprintf("%s [%d]", x.Name, x.ID), 1)
		}
	}
	for _, s := range unit.R.Sources {
		srcMap[s.ID] = s
	}
	for _, t := range unit.R.Tags {
		tagMap[t.ID] = t
	}

	// Set sources
	src := make([]*models.Source, 0, len(srcMap))
	for _, s := range srcMap {
		src = append(src, s)
	}
	if err = unit.SetSources(exec, false, src...); err != nil {
		return nil, errors.Wrapf(err, "Set sources, unit [%d]", unit.ID)
	}

	// Set tags
	tags := make([]*models.Tag, 0, len(tagMap))
	for _, t := range tagMap {
		tags = append(tags, t)
	}
	if err = unit.SetTags(exec, false, tags...); err != nil {
		return nil, errors.Wrapf(err, "Set tags, unit [%d]", unit.ID)
	}

	// person
	if container.LecturerID.Valid {
		if person := mapPerson(container.LecturerID.Int); person != nil {
			cup := models.ContentUnitsPerson{
				PersonID: person.ID,
				RoleID:   1, // lecturer
			}
			if err = unit.AddContentUnitsPersons(exec, true, &cup); err != nil {
				return nil, errors.Wrapf(err, "Add person, unit [%d]", unit.ID)
			}
		}
	}

	return unit, nil
}

// createOrUpdateCCU ensures the collection/content-unit association described
// by ccu exists.
//   - missing association: inserted through unit;
//   - stored name or position differs from ccu: that row's name and position are rewritten;
//   - otherwise nothing is written.
func createOrUpdateCCU(exec boil.Executor, unit *models.ContentUnit, ccu models.CollectionsContentUnit) error {
	existing, err := models.FindCollectionsContentUnit(exec, ccu.CollectionID, ccu.ContentUnitID)
	if err == sql.ErrNoRows {
		link := &models.CollectionsContentUnit{
			CollectionID:  ccu.CollectionID,
			ContentUnitID: ccu.ContentUnitID,
			Name:          ccu.Name,
			Position:      ccu.Position,
		}
		if addErr := unit.AddCollectionsContentUnits(exec, true, link); addErr != nil {
			return errors.Wrapf(addErr, "Create CCU [c,cu]=[%d,%d]", ccu.CollectionID, ccu.ContentUnitID)
		}
		return nil
	}
	if err != nil {
		return errors.Wrapf(err, "Find CCU [c,cu]=[%d,%d]", ccu.CollectionID, ccu.ContentUnitID)
	}

	// Already up to date: nothing to write.
	if existing.Name == ccu.Name && existing.Position == ccu.Position {
		return nil
	}

	existing.Name = ccu.Name
	existing.Position = ccu.Position
	if _, updErr := existing.Update(exec, boil.Whitelist("name", "position")); updErr != nil {
		return errors.Wrapf(updErr, "Update CCU [c,cu]=[%d,%d]", ccu.CollectionID, ccu.ContentUnitID)
	}
	return nil
}

// importFileAsset creates or updates the MDB file that mirrors a kmedia
// file_asset, then associates it with unit and with operation.
//
// Files are matched by SHA1:
//   - an asset without a SHA1 gets a deterministic fake one: the SHA1 of its
//     decimal ID;
//   - an asset whose SHA1 is that of an empty file is skipped, and (nil, nil)
//     is returned;
//   - an asset with a real SHA1 that has no match, but whose fake SHA1 matches
//     a previously imported file, reuses that file and gets the real hash.
//
// On failure it returns a nil file together with the error.
func importFileAsset(exec boil.Executor, fileAsset *kmodels.FileAsset, unit *models.ContentUnit,
	operation *models.Operation) (*models.File, error) {

	// Placeholder SHA1 used for assets that have none.
	fakeHash := fmt.Sprintf("%x", sha1.Sum([]byte(strconv.Itoa(fileAsset.ID))))

	// Get or Create MDB file by SHA1
	var hash string
	if fileAsset.Sha1.Valid {
		hash = fileAsset.Sha1.String

		// This sha1 is for empty files, i.e. physical size = 0
		// We skip them for now.
		// Hoping someone will find these files and get their real sha1...
		if hash == "da39a3ee5e6b4b0d3255bfef95601890afd80709" {
			return nil, nil
		}
	} else {
		hash = fakeHash
		stats.FileAssetsMissingSHA1.Inc(1)
	}

	file, hashB, err := api.FindFileBySHA1(exec, hash)
	if err == nil {
		stats.FilesUpdated.Inc(1)
	} else {
		if _, ok := err.(api.FileNotFound); !ok {
			return nil, errors.Wrapf(err, "Lookup file %s", hash)
		}

		shouldCreate := true

		// For unknown file assets with valid sha1 do second lookup before we create a new file.
		// This time with the fake sha1, if exists we replace fake hash with valid hash.
		// Note: this paragraph should not be executed on first import.
		if fileAsset.Sha1.Valid {
			file, _, err = api.FindFileBySHA1(exec, fakeHash)
			if err == nil {
				// Relies on FindFileBySHA1 returning the decoded hash alongside
				// FileNotFound (as the original code does) — TODO confirm.
				file.Sha1 = null.BytesFrom(hashB)
				shouldCreate = false
				stats.FilesUpdated.Inc(1)
			} else if _, ok := err.(api.FileNotFound); !ok {
				return nil, errors.Wrapf(err, "Second file lookup, file_asset [%d]", fileAsset.ID)
			}
		}

		if shouldCreate {
			// Create new file
			f := api.File{
				FileName:  fileAsset.Name.String,
				Sha1:      hash,
				Size:      int64(fileAsset.Size.Int),
				CreatedAt: &api.Timestamp{Time: fileAsset.Date.Time},
			}
			file, err = api.CreateFile(exec, nil, f, nil)
			if err != nil {
				return nil, errors.Wrap(err, "Create file")
			}
			stats.FilesCreated.Inc(1)
		}
	}

	// Media types
	if fileAsset.AssetType.Valid {
		if mt, ok := common.MEDIA_TYPE_REGISTRY.ByExtension[strings.ToLower(fileAsset.AssetType.String)]; ok {
			file.Type = mt.Type
			file.SubType = mt.SubType
			file.MimeType = null.NewString(mt.MimeType, mt.MimeType != "")
		} else {
			stats.FileAssetsWInvalidMT.Inc(1)
		}
	} else {
		stats.FileAssetsMissingType.Inc(1)
	}

	// Language
	if fileAsset.LangID.Valid {
		l := common.LANG_MAP[fileAsset.LangID.String]
		file.Language = null.NewString(l, l != "")
	}

	// Secure
	if fileAsset.Secure.Valid {
		file.Secure = mapSecure(fileAsset.Secure.Int)
	}
	file.Published = file.Secure == 0

	// Properties: merge into the stored ones. Decode/encode failures are
	// reported; ignoring them would overwrite the column with a partial map.
	props := make(map[string]interface{})
	if file.Properties.Valid {
		if err = file.Properties.Unmarshal(&props); err != nil {
			return nil, errors.Wrapf(err, "Unmarshal properties, file [%d]", file.ID)
		}
	}
	props["kmedia_id"] = fileAsset.ID
	props["url"] = serverUrls[fileAsset.ServernameID.String] + "/" + file.Name
	if fileAsset.PlaytimeSecs.Valid {
		props["duration"] = fileAsset.PlaytimeSecs.Int
	}
	p, err := json.Marshal(props)
	if err != nil {
		return nil, errors.Wrapf(err, "Marshal properties, file [%d]", file.ID)
	}
	file.Properties = null.JSONFrom(p)

	if _, err = file.Update(exec, boil.Infer()); err != nil {
		return nil, errors.Wrapf(err, "Update file [%d]", file.ID)
	}

	// i18n
	// We don't take anything from file_asset_descriptions as it`s mostly junk

	// Associate files with content unit
	if file.ContentUnitID.Valid && file.ContentUnitID.Int64 != unit.ID {
		log.Warnf("Changing file's unit association from %d to %d", file.ContentUnitID.Int64, unit.ID)
	}
	if err = unit.AddFiles(exec, false, file); err != nil {
		return nil, errors.Wrapf(err, "Associate file [%d] to unit [%d]", file.ID, unit.ID)
	}

	// Associate files with operation.
	// We use a raw query here to do nothing on conflicts.
	// These conflicts happen when different file_assets in the same lesson have identical SHA1.
	_, err = queries.Raw(
		`INSERT INTO files_operations (file_id, operation_id) VALUES ($1, $2) ON CONFLICT DO NOTHING`,
		file.ID, operation.ID).Exec(exec)
	if err != nil {
		return nil, errors.Wrapf(err, "Associate file [%d] to operation [%d]", file.ID, operation.ID)
	}

	return file, nil
}

// initServers loads every kmedia server through exec. It returns a lookup from
// server name to the server's Httpurl string value.
func initServers(exec boil.Executor) (map[string]string, error) {
	all, err := kmodels.Servers(exec).All()
	if err != nil {
		return nil, errors.Wrap(err, "Load kmedia servers")
	}

	urls := make(map[string]string, len(all))
	for i := range all {
		urls[all[i].Servername] = all[i].Httpurl.String
	}
	return urls, nil
}

// initCatalogSourcesMappings builds the lookup from kmedia catalog ID to MDB source.
//
// It reads CATALOGS_SOURCES_MAPPINGS_FILE: the first row is a header, column 0
// is the catalog ID and column 2 the source path. Each path is resolved against
// the full paths of all MDB sources, built as "<author code>/<name>/<child>/...".
//
// Rows whose path matches no MDB source are logged and left out, so every value
// in the returned map is non-nil.
func initCatalogSourcesMappings() (map[int]*models.Source, error) {
	// read mappings file
	records, err := utils.ReadCSV(CATALOGS_SOURCES_MAPPINGS_FILE)
	if err != nil {
		return nil, errors.Wrap(err, "Read catalogs sources mappings")
	}
	log.Infof("Catalogs Sources Mappings has %d rows", len(records))
	if len(records) == 0 {
		// records[1:] below would panic without at least the header row.
		return nil, errors.New("Catalogs sources mappings file is empty")
	}

	// read MDB sources
	rows, err := queries.Raw(`WITH RECURSIVE rec_sources AS (
  SELECT
    s.id,
    concat(a.code, '/', s.name) path
  FROM sources s INNER JOIN authors_sources x ON s.id = x.source_id
    INNER JOIN authors a ON x.author_id = a.id
  WHERE s.parent_id IS NULL
  UNION
  SELECT
    s.id,
    concat(rs.path, '/', s.name)
  FROM sources s INNER JOIN rec_sources rs ON s.parent_id = rs.id
)
SELECT *
FROM rec_sources;`).Query(mdb)
	if err != nil {
		return nil, errors.Wrap(err, "Read MDB sources")
	}

	defer rows.Close()

	tmp := make(map[string]*models.Source)
	for rows.Next() {
		var id int64
		var path string
		if err := rows.Scan(&id, &path); err != nil {
			return nil, errors.Wrap(err, "Scan row")
		}
		tmp[path] = &models.Source{ID: id}
	}
	if err = rows.Err(); err != nil {
		return nil, errors.Wrap(err, "Iterating MDB sources")
	}
	log.Infof("%d MDB Sources", len(tmp))

	mappings := make(map[int]*models.Source, len(records)-1)
	for i, r := range records[1:] {
		row := i + 2 // 1-based record number, counting the header as row 1
		if len(r) < 3 {
			return nil, errors.Errorf("Too few columns, row [%d]", row)
		}

		catalogID, err := strconv.Atoi(r[0])
		if err != nil {
			return nil, errors.Wrapf(err, "Bad catalog_id, row [%d]", row)
		}

		sourcePath := strings.TrimSpace(r[2])
		s, ok := tmp[sourcePath]
		if !ok {
			// Do not store a nil value: consumers that only check for the key's
			// presence would treat it as a valid mapping and dereference it.
			log.Warnf("Unknown source, path=%s", sourcePath)
			continue
		}
		mappings[catalogID] = s
	}

	return mappings, nil
}

// initCatalogTagsMappings builds the lookup from kmedia catalog ID to MDB tag.
//
// It reads CATALOGS_TAGS_MAPPINGS_FILE: the first row is a header, column 0 is
// the catalog ID and column 1 the MDB tag ID. Each tag ID is resolved against
// all tags loaded from MDB.
//
// Rows referencing an unknown tag are logged and left out, so every value in
// the returned map is non-nil.
func initCatalogTagsMappings() (map[int]*models.Tag, error) {
	// Read mappings file
	records, err := utils.ReadCSV(CATALOGS_TAGS_MAPPINGS_FILE)
	if err != nil {
		return nil, errors.Wrap(err, "Read catalogs tags mappings")
	}
	log.Infof("Catalogs Tags Mappings has %d rows", len(records))
	if len(records) == 0 {
		// records[1:] and the len(records)-1 size hint below need the header row.
		return nil, errors.New("Catalogs tags mappings file is empty")
	}

	// Read all tags from MDB
	tags, err := models.Tags().All(mdb)
	if err != nil {
		return nil, errors.Wrap(err, "Fetch tags from MDB")
	}
	tmp := make(map[int64]*models.Tag, len(tags))
	for _, t := range tags {
		tmp[t.ID] = t
	}

	// Create mappings
	mappings := make(map[int]*models.Tag, len(records)-1)
	for i, r := range records[1:] {
		row := i + 2 // 1-based record number, counting the header as row 1
		if len(r) < 2 {
			return nil, errors.Errorf("Too few columns, row [%d]", row)
		}

		catalogID, err := strconv.Atoi(r[0])
		if err != nil {
			return nil, errors.Wrapf(err, "Bad catalog_id, row [%d]", row)
		}
		tagID, err := strconv.Atoi(r[1])
		if err != nil {
			return nil, errors.Wrapf(err, "Bad tag_id, row [%d]", row)
		}

		t, ok := tmp[int64(tagID)]
		if !ok {
			// Do not store a nil value: presence-only lookups would dereference it.
			log.Warnf("Unknown tag, id=%d", tagID)
			continue
		}
		mappings[catalogID] = t
	}

	return mappings, nil
}

// mapSecure translates a kmedia secure level into an MDB one:
//   - 0 is public;
//   - any other value below 4 (including negative values) is sensitive;
//   - 4 and above is private.
func mapSecure(kmVal int) int16 {
	switch {
	case kmVal == 0:
		return common.SEC_PUBLIC
	case kmVal < 4:
		return common.SEC_SENSITIVE
	default:
		return common.SEC_PRIVATE
	}
}

// mapPerson returns the MDB person registered for a kmedia lecturer ID, or nil
// for lecturers without a mapping. Only kmedia IDs 1 (pattern "rav") and 8
// (pattern "rb") are mapped.
func mapPerson(kmID int) *models.Person {
	var pattern string
	switch kmID {
	case 1:
		pattern = "rav"
	case 8:
		pattern = "rb"
	default:
		return nil
	}
	return common.PERSON_REGISTRY.ByPattern[pattern]
}

// loadContainersInCatalogsAndCUs loads every kmedia container attached to the
// given catalogs or to any of their descendant catalogs, with each container's
// Catalogs relationship eagerly loaded. It also loads the MDB content units
// whose kmedia_id property references those containers.
//
// Containers are keyed by container ID and content units by their kmedia_id.
// Both are fetched in pages of 2500 IDs. With no catalogIDs it returns two
// empty maps.
func loadContainersInCatalogsAndCUs(catalogIDs ...int) (map[int]*kmodels.Container, map[int]*models.ContentUnit, error) {
	// An empty "IN ()" list is a SQL syntax error, and there is nothing to load anyway.
	if len(catalogIDs) == 0 {
		return make(map[int]*kmodels.Container), make(map[int]*models.ContentUnit), nil
	}

	q := `WITH RECURSIVE rec_catalogs AS (
  SELECT c.id
  FROM catalogs c
  WHERE id IN (%s)
  UNION
  SELECT c.id
  FROM catalogs c INNER JOIN rec_catalogs rc ON c.parent_id = rc.id
)
SELECT
  DISTINCT cc.container_id
FROM rec_catalogs rc INNER JOIN catalogs_containers cc ON rc.id = cc.catalog_id`

	// The IDs are Go ints formatted with strconv, so interpolating them is safe.
	catIDs := make([]string, len(catalogIDs))
	for i, id := range catalogIDs {
		catIDs[i] = strconv.Itoa(id)
	}

	rows, err := queries.Raw(fmt.Sprintf(q, strings.Join(catIDs, ","))).Query(kmdb)
	if err != nil {
		return nil, nil, errors.Wrap(err, "Load containers")
	}
	defer rows.Close()

	var cnIDs []int
	for rows.Next() {
		var x int
		if err := rows.Scan(&x); err != nil {
			return nil, nil, errors.Wrap(err, "rows.Scan")
		}
		cnIDs = append(cnIDs, x)
	}
	if err := rows.Err(); err != nil {
		return nil, nil, errors.Wrap(err, "rows.Err")
	}
	log.Infof("len(cnIDS) = %d", len(cnIDs))

	const pageSize = 2500
	cnMap := make(map[int]*kmodels.Container, len(cnIDs))
	cuMap := make(map[int]*models.ContentUnit, len(cnIDs))
	for page := 0; page*pageSize < len(cnIDs); page++ {
		s := page * pageSize
		e := utils.Min(len(cnIDs), s+pageSize)
		ids := utils.ConvertArgsInt(cnIDs[s:e]...)

		cns, err := kmodels.Containers(kmdb,
			qm.WhereIn("id in ?", ids...),
			qm.Load("Catalogs")).
			All()
		if err != nil {
			return nil, nil, errors.Wrapf(err, "Load containers page %d", page)
		}
		for _, cn := range cns {
			cnMap[cn.ID] = cn
		}

		cus, err := models.ContentUnits(
			qm4.WhereIn("(properties->>'kmedia_id')::int in ?", ids...)).
			All(mdb)
		if err != nil {
			return nil, nil, errors.Wrapf(err, "Load content units page %d", page)
		}
		for _, cu := range cus {
			var props map[string]interface{}
			if err := json.Unmarshal(cu.Properties.JSON, &props); err != nil {
				return nil, nil, errors.Wrapf(err, "json.Unmarshal CU properties %d", cu.ID)
			}
			// JSON numbers decode as float64. A kmedia_id stored as a JSON string
			// still passes the SQL ::int cast, so report it instead of panicking
			// on the type assertion.
			kmID, ok := props["kmedia_id"].(float64)
			if !ok {
				return nil, nil, errors.Errorf("CU %d has no numeric kmedia_id property", cu.ID)
			}
			cuMap[int(kmID)] = cu
		}
	}
	log.Infof("len(cnMap) = %d", len(cnMap))
	log.Infof("len(cuMap) = %d", len(cuMap))

	return cnMap, cuMap, nil
}

// loadContainersByTypeAndCUs loads every kmedia container whose
// content_type_id is typeID, with each container's Catalogs relationship
// eagerly loaded. It also loads the MDB content units whose kmedia_id property
// references those containers.
//
// Containers are keyed by container ID and content units by their kmedia_id.
// Content units are fetched in pages of 2500 IDs.
func loadContainersByTypeAndCUs(typeID int) (map[int]*kmodels.Container, map[int]*models.ContentUnit, error) {
	containers, err := kmodels.Containers(kmdb,
		qm.Where("content_type_id = ?", typeID),
		qm.Load("Catalogs")).
		All()
	if err != nil {
		return nil, nil, errors.Wrap(err, "Load containers")
	}

	cnMap := make(map[int]*kmodels.Container, len(containers))
	for _, cn := range containers {
		cnMap[cn.ID] = cn
	}

	cnIDs := make([]interface{}, 0, len(cnMap))
	for k := range cnMap {
		cnIDs = append(cnIDs, k)
	}

	const pageSize = 2500
	cuMap := make(map[int]*models.ContentUnit, len(cnIDs))
	for page := 0; page*pageSize < len(cnIDs); page++ {
		s := page * pageSize
		e := utils.Min(len(cnIDs), s+pageSize)

		cus, err := models.ContentUnits(
			qm4.WhereIn("(properties->>'kmedia_id')::int in ?", cnIDs[s:e]...)).
			All(mdb)
		if err != nil {
			return nil, nil, errors.Wrapf(err, "Load content units page %d", page)
		}
		for _, cu := range cus {
			var props map[string]interface{}
			if err := json.Unmarshal(cu.Properties.JSON, &props); err != nil {
				return nil, nil, errors.Wrapf(err, "json.Unmarshal CU properties %d", cu.ID)
			}
			// JSON numbers decode as float64. A kmedia_id stored as a JSON string
			// still passes the SQL ::int cast, so report it instead of panicking
			// on the type assertion.
			kmID, ok := props["kmedia_id"].(float64)
			if !ok {
				return nil, nil, errors.Errorf("CU %d has no numeric kmedia_id property", cu.ID)
			}
			cuMap[int(kmID)] = cu
		}
	}
	log.Infof("len(cnMap) = %d", len(cnMap))
	log.Infof("len(cuMap) = %d", len(cuMap))

	return cnMap, cuMap, nil
}