Bnei-Baruch/mdb

View on GitHub
api/metadata_processor.go

Summary

Maintainability
F
1 wk
Test Coverage
package api

import (
    "database/sql"
    "encoding/json"
    "fmt"
    "strconv"
    "strings"
    "time"

    log "github.com/Sirupsen/logrus"
    "github.com/emirpasic/gods/sets/hashset"
    "github.com/lib/pq"
    "github.com/pkg/errors"
    "github.com/volatiletech/null/v8"
    "github.com/volatiletech/sqlboiler/v4/boil"
    "github.com/volatiletech/sqlboiler/v4/queries"
    "github.com/volatiletech/sqlboiler/v4/queries/qm"

    "github.com/Bnei-Baruch/mdb/common"
    "github.com/Bnei-Baruch/mdb/events"
    "github.com/Bnei-Baruch/mdb/models"
    "github.com/Bnei-Baruch/mdb/utils"
)

func ProcessCITMetadata(exec boil.Executor, metadata CITMetadata, original, proxy, source *models.File) ([]events.Event, error) {
    return doProcess(exec, metadata, original, proxy, source, nil)
}

// Do all stuff for processing metadata coming from Content Identification Tool.
//  1. Update properties for original and proxy (film_date, capture_date)
//  2. Update language of original
//  3. Create content_unit (content_type, dates)
//  4. Describe content unit (i18ns)
//  5. Add files to new unit
//  6. Add ancestor files to unit
//  7. Add peer ancestor (related captures)
//  8. Associate unit with sources, tags, and persons
//  9. Get or create collection
//  10. Update collection (content_type, dates, number) if full lesson or new lesson
//  11. Associate collection and unit
//  12. Associate unit and derived units
//  13. Set default permissions ?!
func doProcess(exec boil.Executor, metadata CITMetadata, original, proxy, source *models.File, cu *models.ContentUnit) ([]events.Event, error) {
    isUpdate := cu != nil
    log.Infof("Processing CITMetadata, isUpdate: %t", isUpdate)

    // Update properties for original and proxy (film_date, capture_date)
    filmDate := metadata.CaptureDate
    //if metadata.WeekDate != nil {
    //    filmDate = *metadata.WeekDate
    //}
    if metadata.FilmDate != nil {
        filmDate = *metadata.FilmDate
    }

    evnts := make([]events.Event, 0)

    props := map[string]interface{}{
        "capture_date":      metadata.CaptureDate,
        "film_date":         filmDate,
        "original_language": common.StdLang(metadata.Language),
    }
    log.Infof("Updating files properties: %v", props)
    err := UpdateFileProperties(exec, original, props)
    if err != nil {
        return nil, err
    }
    evnts = append(evnts, events.FileUpdateEvent(original))
    if proxy != nil {
        err = UpdateFileProperties(exec, proxy, props)
        if err != nil {
            return nil, err
        }
        evnts = append(evnts, events.FileUpdateEvent(proxy))
    }

    // Update language of original.
    // TODO: What about proxy !?
    if metadata.HasTranslation {
        original.Language = null.StringFrom(common.LANG_MULTI)
    } else {
        l := common.StdLang(metadata.Language)
        if l == common.LANG_UNKNOWN {
            log.Warnf("Unknown language in metadata %s", metadata.Language)
        }
        original.Language = null.StringFrom(l)
    }
    log.Infof("Updating original.Language to %s", original.Language.String)
    _, err = original.Update(exec, boil.Whitelist("language"))
    if err != nil {
        return nil, errors.Wrap(err, "Save original to DB")
    }

    // Create content_unit (content_type, dates)
    isDerived := metadata.ArtifactType.Valid && metadata.ArtifactType.String != "main"
    ct := metadata.ContentType
    if isDerived {
        // User input is verified below
        ct = strings.ToUpper(metadata.ArtifactType.String)
    }

    var originalProps map[string]interface{}
    err = original.Properties.Unmarshal(&originalProps)
    if err != nil {
        return nil, errors.Wrap(err, "json.Unmarshal original properties")
    }
    if duration, ok := originalProps["duration"]; ok {
        props["duration"] = int(duration.(float64))
    } else {
        log.Warnf("Original is missing duration property [%d]", original.ID)
    }

    if metadata.LabelID.Valid {
        props["label_id"] = metadata.LabelID.Int
    }
    if metadata.Number.Valid {
        props["number"] = metadata.Number.Int
    }
    if metadata.Part.Valid {
        props["part"] = metadata.Part.Int
    }

    if isUpdate {
        // content_type
        if ctVal, ok := common.CONTENT_TYPE_REGISTRY.ByName[ct]; !ok {
            return nil, errors.Errorf("Unknown content type %s", ct)
        } else if ctVal.ID != cu.TypeID {
            // update unit's content type
            cu.TypeID = ctVal.ID
            _, err = cu.Update(exec, boil.Whitelist("type_id"))
            if err != nil {
                return nil, errors.Wrap(err, "Update unit type in DB")
            }
        }

        // props
        propsBytes, err := json.Marshal(props)
        if err != nil {
            return nil, errors.Wrap(err, "json Marshal")
        }
        cu.Properties = null.JSONFrom(propsBytes)
        _, err = cu.Update(exec, boil.Whitelist("properties"))
        if err != nil {
            return nil, errors.Wrap(err, "Update unit properties in DB")
        }
    } else {
        log.Infof("Creating content unit of type %s", ct)
        cu, err = CreateContentUnit(exec, ct, props)
        if err != nil {
            return nil, errors.Wrap(err, "Create content unit")
        }
        evnts = append(evnts, events.ContentUnitCreateEvent(cu))

        log.Infof("Describing content unit [%d]", cu.ID)
        err = DescribeContentUnit(exec, cu, metadata)
        if err != nil {
            log.Errorf("Error describing content unit: %s", err.Error())
        }
    }

    // Add files to new unit
    log.Info("Adding files to unit")
    err = cu.AddFiles(exec, false, original)
    if err != nil {
        return nil, errors.Wrap(err, "Add original to unit")
    }
    if proxy != nil {
        err = cu.AddFiles(exec, false, proxy)
        if err != nil {
            return nil, errors.Wrap(err, "Add proxy to unit")
        }
    }
    if source != nil {
        ancestors, err := FindFileAncestors(exec, source.ID)
        if err != nil {
            return nil, errors.Wrap(err, "Find source's ancestors")
        }
        ancestors = append(ancestors, source)
        err = cu.AddFiles(exec, false, ancestors...)
        if err != nil {
            return nil, errors.Wrap(err, "Add source and his ancestors to unit")
        }
        for i := range ancestors {
            x := ancestors[i]
            evnts = append(evnts, events.FileUpdateEvent(x))
            log.Infof("%s [%d]", x.Name, x.ID)
        }
    }

    // Add ancestor files to unit (not for derived units)
    if !isDerived && !isUpdate {
        log.Info("Main unit, adding ancestors...")
        ancestors, err := FindFileAncestors(exec, original.ID)
        if err != nil {
            return nil, errors.Wrap(err, "Find original's ancestors")
        }

        if proxy != nil {
            err = proxy.L.LoadParent(exec, true, proxy, nil)
            if err != nil {
                return nil, errors.Wrap(err, "Load proxy's parent")
            }
            if proxy.R.Parent != nil {
                ancestors = append(ancestors, proxy.R.Parent)
            }
        }

        err = cu.AddFiles(exec, false, ancestors...)
        if err != nil {
            return nil, errors.Wrap(err, "Add ancestors to unit")
        }
        log.Infof("Added %d ancestors", len(ancestors))
        for i := range ancestors {
            x := ancestors[i]
            evnts = append(evnts, events.FileUpdateEvent(x))
            log.Infof("%s [%d]", x.Name, x.ID)
        }
    }

    // Associate unit with sources, tags, and persons
    if len(metadata.Sources) > 0 {
        log.Infof("Associating %d sources", len(metadata.Sources))
        sources, err := models.Sources(
            qm.WhereIn("uid in ?", utils.ConvertArgsString(metadata.Sources)...)).
            All(exec)
        if err != nil {
            return nil, errors.Wrap(err, "Lookup sources in DB")
        }

        // are we missing some source ?
        if len(sources) != len(metadata.Sources) {
            missing := make([]string, 0)
            for _, x := range metadata.Sources {
                found := false
                for _, y := range sources {
                    if x == y.UID {
                        found = true
                        break
                    }
                }
                if !found {
                    missing = append(missing, x)
                }
            }
            log.Warnf("Unknown sources: %s", missing)
        }

        err = cu.SetSources(exec, false, sources...)
        if err != nil {
            return nil, errors.Wrap(err, "Associate sources")
        }
    }

    if len(metadata.Tags) > 0 {
        log.Infof("Associating %d tags", len(metadata.Tags))
        tags, err := models.Tags(
            qm.WhereIn("uid in ?", utils.ConvertArgsString(metadata.Tags)...)).
            All(exec)
        if err != nil {
            return nil, errors.Wrap(err, "Lookup tags  in DB")
        }

        // are we missing some tag ?
        if len(tags) != len(metadata.Tags) {
            missing := make([]string, 0)
            for _, x := range metadata.Tags {
                found := false
                for _, y := range tags {
                    if x == y.UID {
                        found = true
                        break
                    }
                }
                if !found {
                    missing = append(missing, x)
                }
            }
            log.Warnf("Unknown sources: %s", missing)
        }
        err = cu.SetTags(exec, false, tags...)
        if err != nil {
            return nil, errors.Wrap(err, "Associate tags")
        }
    }
    if len(metadata.Likutim) > 0 {
        log.Infof("Associating %d likutim", len(metadata.Likutim))
        likutim, err := models.ContentUnits(
            qm.Select("distinct on (\"content_units\".id) \"content_units\".*"),
            qm.Where("type_id = ?", common.CONTENT_TYPE_REGISTRY.ByName[common.CT_LIKUTIM].ID),
            qm.WhereIn("uid in ?", utils.ConvertArgsString(metadata.Likutim)...)).
            All(exec)
        if err != nil {
            return nil, errors.Wrap(err, "Lookup tags  in DB")
        }

        // are we missing some unit ?
        if len(likutim) != len(metadata.Likutim) {
            missing := make([]string, 0)
            for _, x := range metadata.Likutim {
                found := false
                for _, y := range likutim {
                    if x == y.UID {
                        found = true
                        break
                    }
                }
                if !found {
                    missing = append(missing, x)
                }
            }
            log.Warnf("Unknown likutim: %s", missing)
        }

        derivations := make([]*models.ContentUnitDerivation, len(likutim))
        for i, l := range likutim {
            cud := &models.ContentUnitDerivation{
                SourceID:  cu.ID,
                DerivedID: l.ID,
            }
            derivations[i] = cud
        }
        err = cu.AddSourceContentUnitDerivations(exec, true, derivations...)
        if err != nil {
            return nil, errors.Wrap(err, "Associate likutim")
        }
        for _, l := range likutim {
            evnts = append(evnts, events.ContentUnitDerivativesChangeEvent(l))
        }
        evnts = append(evnts, events.ContentUnitDerivativesChangeEvent(cu))
    }

    // Handle persons ...
    if strings.ToLower(metadata.Lecturer) == common.P_RAV {
        log.Info("Associating unit to rav")
        cup := &models.ContentUnitsPerson{
            ContentUnitID: cu.ID,
            PersonID:      common.PERSON_REGISTRY.ByPattern[common.P_RAV].ID,
            RoleID:        common.CONTENT_ROLE_TYPE_REGISTRY.ByName[common.CR_LECTURER].ID,
        }

        // upsert make sure we either have such relation or insert a new one
        err = cup.Upsert(exec, false, nil, boil.Infer(), boil.Infer())
        if err != nil {
            return nil, errors.Wrap(err, "Associate persons")
        }
    } else if isUpdate && strings.ToLower(metadata.Lecturer) == "norav" {
        // in update mode, if norav so we remove relation to rav (if any)
        cup := &models.ContentUnitsPerson{
            ContentUnitID: cu.ID,
            PersonID:      common.PERSON_REGISTRY.ByPattern[common.P_RAV].ID,
        }
        _, err = cup.Delete(exec)
        if err != nil {
            return nil, errors.Wrap(err, "Delete Rav association")
        }
    } else {
        log.Infof("Unknown lecturer %s, skipping person association.", metadata.Lecturer)
    }

    // Get or create collection
    if metadata.CollectionUID.Valid {
        log.Infof("Specific collection %s", metadata.CollectionUID.String)

        // find collection
        c, err := models.Collections(qm.Where("uid = ?", metadata.CollectionUID.String)).One(exec)
        if err != nil {
            if err == sql.ErrNoRows {
                log.Warnf("No such collection %s", metadata.CollectionUID.String)
            } else {
                return nil, errors.Wrap(err, "Lookup collection in DB")
            }
        }

        // Associate unit to collection
        if c != nil &&
            (!metadata.ArtifactType.Valid || metadata.ArtifactType.String == "main") {
            err := associateUnitToCollection(exec, cu, c, metadata)
            if err != nil {
                return nil, errors.Wrap(err, "associate content_unit to collection")
            }
            evnts = append(evnts, events.CollectionContentUnitsChangeEvent(c))
        }
    }

    // Update mode ends here
    if isUpdate {
        return evnts, nil
    }

    // for lesson part that have source or likutim we check if its need add to lessons series collection
    // if we have number of lessons on period we create new collection
    if len(metadata.Likutim) > 0 && common.CONTENT_TYPE_REGISTRY.ByID[cu.TypeID].Name == common.CT_LESSON_PART {
        seriesEvnts, err := (&AssociateByLikutim{tx: exec, cu: cu}).Associate(metadata.Likutim)
        if err != nil {
            return nil, errors.Wrap(err, "Associate Lessons series collection by likutim")
        }
        evnts = append(evnts, seriesEvnts...)
    }

    if len(metadata.Sources) > 0 && common.CONTENT_TYPE_REGISTRY.ByID[cu.TypeID].Name == common.CT_LESSON_PART {
        seriesEvnts, err := (&AssociateBySources{tx: exec, cu: cu}).Associate(metadata.Sources)
        if err != nil {
            return nil, errors.Wrap(err, "Associate Lessons series collection by sources")
        }
        evnts = append(evnts, seriesEvnts...)
    }

    if ct == common.CT_LESSON_PART ||
        ct == common.CT_FULL_LESSON ||
        ct == common.CT_KTAIM_NIVCHARIM {
        log.Info("Lesson reconciliation")

        // we look up Original's capture_stop operation as it holds required information below
        var captureStopProps map[string]interface{}
        captureStop, err := FindUpChainOperation(exec, original.ID, common.OP_CAPTURE_STOP)
        if err != nil {
            if ex, ok := err.(UpChainOperationNotFound); ok {
                log.Warnf("capture_stop operation not found for original: %s", ex.Error())
            }
        } else {
            if captureStop.Properties.Valid {
                err = json.Unmarshal(captureStop.Properties.JSON, &captureStopProps)
                if err != nil {
                    return nil, errors.Wrap(err, "json Unmarshal")
                }
            }
        }

        // Reconcile or create new
        // Reconciliation is done by looking up the operation chain of original to capture_stop.
        // There we have a property of saying the capture_id of the full lesson capture.
        if captureID, ok := captureStopProps["collection_uid"]; ok {
            log.Infof("Reconcile by capture_id %s", captureID)
            var cct string
            if metadata.WeekDate == nil {
                cct = common.CT_DAILY_LESSON
            } else {
                cct = common.CT_SPECIAL_LESSON
            }

            // Keep this property on the collection for other parts to find it
            props["capture_id"] = captureID
            if metadata.Number.Valid {
                props["number"] = metadata.Number.Int
            }
            delete(props, "duration")
            delete(props, "part")

            // get or create collection
            c, err := FindCollectionByCaptureID(exec, captureID)
            if err != nil {
                if _, ok := err.(CollectionNotFound); !ok {
                    return nil, err
                }

                // Create new collection
                log.Info("Creating new collection")
                c, err = CreateCollection(exec, cct, props)
                if err != nil {
                    return nil, err
                }
                evnts = append(evnts, events.CollectionCreateEvent(c))
            } else if ct == common.CT_FULL_LESSON {
                // Update collection properties to those of full lesson
                log.Info("Full lesson, overriding collection properties")
                if c.TypeID != common.CONTENT_TYPE_REGISTRY.ByName[cct].ID {
                    log.Infof("Full lesson, content_type changed to %s", cct)
                    c.TypeID = common.CONTENT_TYPE_REGISTRY.ByName[cct].ID
                    _, err = c.Update(exec, boil.Whitelist("type_id"))
                    if err != nil {
                        return nil, errors.Wrap(err, "Update collection type in DB")
                    }
                }

                err = UpdateCollectionProperties(exec, c, props)
                if err != nil {
                    return nil, err
                }
                evnts = append(evnts, events.CollectionUpdateEvent(c))
            }

            // Associate unit to collection
            if c != nil &&
                (!metadata.ArtifactType.Valid ||
                    metadata.ArtifactType.String == "main" ||
                    metadata.ArtifactType.String == "KTAIM_NIVCHARIM") {
                err := associateUnitToCollection(exec, cu, c, metadata)
                if err != nil {
                    return nil, errors.Wrap(err, "associate content_unit to collection")
                }
                evnts = append(evnts, events.CollectionContentUnitsChangeEvent(c))
            }
        } else {
            log.Warn("capture_stop not found or its missing collection_uid. Skipping lesson reconciliation")
        }
    }

    // Associate unit and derived units
    // We take into account that a derived content unit arrives before it's source content unit.
    // Such cases are possible due to the studio operator actions sequence.
    err = original.L.LoadParent(exec, true, original, nil)
    if err != nil {
        return nil, errors.Wrap(err, "Load original's parent")
    }

    if original.R.Parent == nil {
        log.Warn("We don't have original's parent file. Skipping derived units association.")
    } else {
        log.Info("Processing derived units associations")
        if !metadata.ArtifactType.Valid ||
            metadata.ArtifactType.String == "main" {
            // main content unit
            log.Info("We're the main content unit")

            log.Info("Looking up pending derived units")
            derivedCUs, err := mainToDerived(exec, metadata, original)
            if err != nil {
                return nil, err
            }

            log.Infof("%d derived units pending our association", len(derivedCUs))
            for k, v := range derivedCUs {
                log.Infof("DerivedID: %d, Name: %s", k, v)
                cud := &models.ContentUnitDerivation{
                    DerivedID: k,
                    Name:      v,
                }
                err = cu.AddSourceContentUnitDerivations(exec, true, cud)
                if err != nil {
                    return nil, errors.Wrap(err, "Save derived unit association in DB")
                }

                _, err = queries.Raw(`UPDATE content_units SET properties = properties - 'artifact_type' WHERE id = $1`, k).
                    Exec(exec)
                if err != nil {
                    return nil, errors.Wrap(err, "Delete derived unit artifact_type property from DB")
                }
            }

            if len(derivedCUs) > 0 {
                evnts = append(evnts, events.ContentUnitDerivativesChangeEvent(cu))
            }

        } else {
            // derived content unit
            log.Info("We're the derived content unit")

            mainCUID, err := derivedToMain(exec, metadata, cu, original)
            if err != nil {
                return nil, err
            }

            if mainCUID == 0 {
                // save artifact type for later use (when main unit appears)
                log.Info("Main content unit not found, saving artifact_type property")
                err = UpdateContentUnitProperties(exec, cu, map[string]interface{}{
                    "artifact_type": metadata.ArtifactType.String,
                })
                if err != nil {
                    return nil, err
                }
            } else {
                // main content unit already exists
                log.Infof("Main content unit exists %d", mainCUID)
                cud := &models.ContentUnitDerivation{
                    SourceID: mainCUID,
                    Name:     metadata.ArtifactType.String,
                }
                err = cu.AddDerivedContentUnitDerivations(exec, true, cud)
                if err != nil {
                    return nil, errors.Wrap(err, "Save source unit in DB")
                }
                evnts = append(evnts, events.ContentUnitDerivativesChangeEvent(cu))
            }
        }
    }

    // set default permissions ?!

    return evnts, nil
}

func associateUnitToCollection(exec boil.Executor, cu *models.ContentUnit, c *models.Collection, metadata CITMetadata) error {
    log.Infof("Associating unit and collection [c-cu]=[%d-%d]", c.ID, cu.ID)

    ccu := &models.CollectionsContentUnit{
        CollectionID:  c.ID,
        ContentUnitID: cu.ID,
    }

    switch common.CONTENT_TYPE_REGISTRY.ByID[cu.TypeID].Name {
    case common.CT_FULL_LESSON:
        if c.TypeID == common.CONTENT_TYPE_REGISTRY.ByName[common.CT_DAILY_LESSON].ID ||
            c.TypeID == common.CONTENT_TYPE_REGISTRY.ByName[common.CT_SPECIAL_LESSON].ID {
            ccu.Name = "full"
        } else if metadata.Number.Valid {
            ccu.Name = strconv.Itoa(metadata.Number.Int)
        }
        break
    case common.CT_LESSON_PART:
        if metadata.Part.Valid {
            ccu.Name = strconv.Itoa(metadata.Part.Int)
        }
        break
    case common.CT_VIDEO_PROGRAM_CHAPTER:
        if metadata.Episode.Valid {
            ccu.Name = metadata.Episode.String
        }
        break
    default:
        if metadata.Number.Valid {
            ccu.Name = strconv.Itoa(metadata.Number.Int)
        }

        // first 3 event part types are lesson, YH and meal, we skip them.
        if metadata.PartType.Valid && metadata.PartType.Int > 2 {
            idx := metadata.PartType.Int - 3
            if idx < len(common.MISC_EVENT_PART_TYPES) {
                ccu.Name = common.MISC_EVENT_PART_TYPES[idx] + ccu.Name
            } else {
                log.Warnf("Unknown event part type: %d", metadata.PartType.Int)
            }
        }
        break
    }
    if metadata.ArtifactType.Valid &&
        metadata.ArtifactType.String != "main" {
        ccu.Name = fmt.Sprintf("%s_%s", metadata.ArtifactType.String, ccu.Name)
    }

    // Make this new unit the last one in this collection
    var err error
    ccu.Position, err = GetNextPositionInCollection(exec, c.ID)
    if err != nil {
        return errors.Wrap(err, "Get last position in collection")
    }

    log.Infof("Association name: %s", ccu.Name)
    err = ccu.Upsert(exec, true,
        []string{"collection_id", "content_unit_id"},
        boil.Whitelist("name", "position"),
        boil.Infer())
    if err != nil {
        return errors.Wrap(err, "Save collection and content unit association in DB")
    }

    return nil
}

func mainToDerived(exec boil.Executor, metadata CITMetadata, original *models.File) (map[int64]string, error) {
    part := -888 // something we never use for part
    if metadata.Part.Valid {
        part = metadata.Part.Int
    }

    // We lookup original's siblings for derived content units that arrived before us.
    // We then associate them with us and remove their "unprocessed" mark.
    // Meaning, the presence of "artifact_type" property
    rows, err := queries.Raw(
        `SELECT
  cu.id,
  cu.properties ->> 'artifact_type'
FROM content_units cu
  INNER JOIN files f ON f.content_unit_id = cu.id AND f.parent_id = $1
WHERE cu.properties ? 'artifact_type' AND (cu.properties ->> 'part') :: INT = $2`,
        original.ParentID.Int64, part).
        Query(exec)
    if err != nil {
        return nil, errors.Wrap(err, "Load derived content units")
    }

    // put results in a map first since we can't process them while iterating.
    // see this bug:  https://github.com/lib/pq/issues/81
    derivedCUs := make(map[int64]string)
    for rows.Next() {
        var cuid int64
        var artifactType string
        err = rows.Scan(&cuid, &artifactType)
        if err != nil {
            return nil, errors.Wrap(err, "Scan row")
        }
        derivedCUs[cuid] = artifactType
    }
    err = rows.Err()
    if err != nil {
        return nil, errors.Wrap(err, "Iter rows")
    }
    err = rows.Close()
    if err != nil {
        return nil, errors.Wrap(err, "Close rows")
    }

    return derivedCUs, nil
}

func derivedToMain(exec boil.Executor, metadata CITMetadata, cu *models.ContentUnit, original *models.File) (int64, error) {
    part := -888 // something we never use for part
    if metadata.Part.Valid {
        part = metadata.Part.Int
    }

    mainCT, ok := common.CONTENT_TYPE_REGISTRY.ByName[metadata.ContentType]
    if !ok {
        return 0, errors.Errorf("Unknown content type %s", metadata.ContentType)
    }

    var cuID int64
    err := queries.Raw(`
SELECT cu.id
FROM content_units cu
  INNER JOIN files f ON f.content_unit_id = cu.id AND f.parent_id = $1 AND cu.id != $2 AND cu.type_id = $3
WHERE (cu.properties ->> 'part') :: INT = $4`,
        original.ParentID.Int64, cu.ID, mainCT.ID, part).QueryRow(exec).Scan(&cuID)

    if err != nil {
        if err == sql.ErrNoRows {
            return 0, nil
        } else {
            return 0, errors.Wrap(err, "Query main CU ID")
        }
    }

    return cuID, nil
}

/*
    send-fix

Sometimes, after a unit was created in a send operation,
we need to fix it.

Either the metadata that was given is wrong or we might need a different trim.
For such cases a new button in the trim admin "fix" is made.

The workflow simply shows the same CIT screen and unit selection.
We should:

1. re-process the metadata in "update" mode
2. figure out files for removal
3. mark those as removed
4. update the unit's published status
*/
func ProcessCITMetadataUpdate(exec boil.Executor, metadata CITMetadata, original, proxy, source *models.File) ([]events.Event, error) {
    unit, err := models.ContentUnits(qm.Where("uid = ?", metadata.UnitToFixUID.String)).One(exec)
    if err != nil {
        return nil, errors.Wrapf(err, "lookup unit UID %s", metadata.UnitToFixUID.String)
    }

    evnts, err := doProcess(exec, metadata, original, proxy, source, unit)
    if err != nil {
        return nil, errors.Wrap(err, "doProcess")
    }

    // We remove only files generated in convert (carbon)
    // and previous trimmed not in our path.
    // Other, manually inserted, files are not touched and are left to admin
    // to figure out what to do with them.

    // Figure out merged set of file IDs
    // which are either ancestor of original or proxy of source
    // These should be excluded from removal.
    mutualAncestors := hashset.New()

    oPath, err := FindFileAncestors(exec, original.ID)
    if err != nil {
        return nil, errors.Wrapf(err, "lookup original ancestors %d", original.ID)
    }
    for i := range oPath {
        mutualAncestors.Add(oPath[i].ID)
    }

    if proxy != nil {
        pPath, err := FindFileAncestors(exec, proxy.ID)
        if err != nil {
            return nil, errors.Wrapf(err, "lookup proxy ancestors %d", original.ID)
        }
        for i := range pPath {
            mutualAncestors.Add(pPath[i].ID)
        }
    }

    if source != nil {
        sPath, err := FindFileAncestors(exec, source.ID)
        if err != nil {
            return nil, errors.Wrapf(err, "lookup source ancestors %d", original.ID)
        }
        for i := range sPath {
            mutualAncestors.Add(sPath[i].ID)
        }
    }

    ancestorsIDs := mutualAncestors.Values()
    // These are the fix. Not the problem. Don't remove them
    ancestorsIDs = append(ancestorsIDs, original.ID)
    if proxy != nil {
        ancestorsIDs = append(ancestorsIDs, original.ID, proxy.ID)
    }

    log.Infof("ancestorsIDs: %v", ancestorsIDs)

    // fetch file IDs to remove
    var fIDs pq.Int64Array
    q := `SELECT array_agg(distinct f.id)
FROM files f
  INNER JOIN files_operations fo ON f.id = fo.file_id
  INNER JOIN operations o ON fo.operation_id = o.id AND o.type_id = ANY($1)
WHERE f.content_unit_id = $2 AND NOT f.id = ANY($3) 
`
    err = queries.Raw(q, pq.Array([]int64{
        common.OPERATION_TYPE_REGISTRY.ByName[common.OP_TRIM].ID,
        common.OPERATION_TYPE_REGISTRY.ByName[common.OP_CONVERT].ID,
    }), unit.ID, pq.Array(ancestorsIDs)).QueryRow(exec).Scan(&fIDs)
    if err != nil {
        return nil, errors.Wrap(err, "fetch file IDs to remove")
    }

    log.Infof("%d files to remove: %v", len(fIDs), fIDs)
    wasPublished := false
    if len(fIDs) > 0 {
        // actual removal
        _, err = models.Files(
            qm.WhereIn("id in ?", utils.ConvertArgsInt64(fIDs)...)).
            UpdateAll(exec, models.M{
                "removed_at": null.TimeFrom(time.Now().UTC()),
            })
        if err != nil {
            return nil, errors.Wrap(err, "Update files to remove")
        }

        // file removed events
        removedFiles, err := models.Files(
            qm.Select("id", "uid", "published"),
            qm.WhereIn("id in ?", utils.ConvertArgsInt64(fIDs)...)).
            All(exec)
        if err != nil {
            return nil, errors.Wrap(err, "Refresh files to remove")
        }

        for i := range removedFiles {
            evnts = append(evnts, events.FileRemoveEvent(removedFiles[i]))
            wasPublished = wasPublished || removedFiles[i].Published
        }

    }

    // unit published status change
    impact, err := FileLeftUnitImpact(exec, wasPublished, unit.ID)
    if err != nil {
        return nil, errors.Wrap(err, "File left impact")
    }
    evnts = append(evnts, impact.Events()...)

    return evnts, nil
}