ReanGD/go-web-search

View on GitHub
content/db_worker.go

Summary

Maintainability
B
5 hrs
Test Coverage
package content

import (
    "database/sql"
    "fmt"
    "log"
    "sync"

    "github.com/ReanGD/go-web-search/database"
    "github.com/ReanGD/go-web-search/proxy"
    "github.com/jinzhu/gorm"
)

// DBWorker - worker for save data to db
type DBWorker struct {
    DB   *DBrw
    ChDB <-chan *proxy.PageData
}

func (w *DBWorker) getURLIDByStr(tr *DBrw, urlStr string) (sql.NullInt64, error) {
    var urlRec URL
    err := tr.Where("url = ?", urlStr).First(&urlRec).Error
    if err == gorm.ErrRecordNotFound {
        return sql.NullInt64{Valid: false}, nil
    } else if err != nil {
        return sql.NullInt64{Valid: false}, fmt.Errorf("find in 'URL' table for URL %s, message: %s", urlStr, err)
    } else {
        return sql.NullInt64{Int64: urlRec.ID, Valid: true}, nil
    }
}

func (w *DBWorker) markURLLoaded(tr *DBrw, id sql.NullInt64, urlStr string, hostID sql.NullInt64) (int64, error) {
    var errID int64
    var urlRec URL
    if !id.Valid {
        urlRec = URL{
            URL:    urlStr,
            HostID: hostID,
            Loaded: true}
        err := tr.Create(&urlRec).Error
        if err != nil {
            return errID, fmt.Errorf("add new 'URL' record for URL %s, message: %s", urlStr, err)
        }
        return urlRec.ID, nil
    }
    err := tr.Model(&urlRec).Where("id = ?", id.Int64).Update("Loaded", true).Error
    if err != nil {
        return errID, fmt.Errorf("update 'URL' table with URL %s, message: %s", urlStr, err)
    }
    return id.Int64, nil
}

func (w *DBWorker) insertURLIfNotExists(tr *DBrw, urlStr string, hostID sql.NullInt64) (int64, error) {
    var rec URL
    err := tr.Where("url = ?", urlStr).First(&rec).Error
    if err == gorm.ErrRecordNotFound {
        rec = URL{URL: urlStr, HostID: hostID, Loaded: false}
        err = tr.Create(&rec).Error
        if err != nil {
            return rec.ID, fmt.Errorf("add new 'URL' record for URL %s, message: %s", urlStr, err)
        }
    } else if err != nil {
        return rec.ID, fmt.Errorf("find in 'URL' table for URL %s, message: %s", urlStr, err)
    }

    return rec.ID, nil
}

func (w *DBWorker) insertLinkIfNotExists(tr *DBrw, master int64, slave int64) error {
    var rec Link
    err := tr.Where("master = ? and slave = ?", master, slave).First(&rec).Error
    if err == gorm.ErrRecordNotFound {
        rec = Link{Master: master, Slave: slave}
        err = tr.Create(&rec).Error
        if err != nil {
            return fmt.Errorf("add new 'Link' record for master %d and slave %d, message: %s",
                uint64(master), uint64(slave), err)
        }
    } else if err != nil {
        return fmt.Errorf("find in 'Link' table for master %d and slave %d, message: %s",
            uint64(master), uint64(slave), err)
    }

    return nil
}

func (w *DBWorker) saveMeta(tr *DBrw, meta *proxy.Meta, origin sql.NullInt64) error {
    hostID := meta.GetHostID()
    urlStr := meta.GetURL()
    urlNullID, err := w.getURLIDByStr(tr, urlStr)
    if err != nil {
        return err
    }
    urlID, err := w.markURLLoaded(tr, urlNullID, urlStr, hostID)
    if err != nil {
        return err
    }
    if origin.Valid {
        err = w.insertLinkIfNotExists(tr, urlID, origin.Int64)
        if err != nil {
            return err
        }
    }

    var metaRec database.Meta
    err = tr.Where("url = ?", urlID).First(&metaRec).Error
    if err == gorm.ErrRecordNotFound {
        if !origin.Valid {
            hash := meta.GetHash()
            if len(hash) != 0 {
                origin = tr.FindOrigin(hash)
            }
        }
        meta.SetOrigin(origin)

        if meta.GetState() == database.StateSuccess {
            content := meta.GetContent()
            if content == nil {
                return fmt.Errorf("field 'content' is nil")
            }
            err = tr.Create(content.GetContent(urlID)).Error
            if err != nil {
                return fmt.Errorf("add new 'Content' record for URL %s, message: %s", urlStr, err)
            }
        }

        err = tr.Create(meta.GetMeta(urlID)).Error
        if err != nil {
            return fmt.Errorf("add new 'Meta' record for URL %s, message: %s", urlStr, err)
        }
        hash := meta.GetHash()
        if len(hash) != 0 {
            tr.AddHash(hash, urlID)
        }

        if meta.GetReferer() != nil {
            err = w.saveMeta(tr, meta.GetReferer(), sql.NullInt64{Int64: urlID, Valid: true})
            if err != nil {
                return err
            }
        }
    } else if err != nil {
        return fmt.Errorf("find in 'Meta' table for URL %s, message: %s", urlStr, err)
    } else {
        if meta.GetReferer() != nil {
            err = w.saveMeta(tr, meta.GetReferer(), sql.NullInt64{Int64: urlID, Valid: true})
            if err != nil {
                return err
            }
        }
    }

    return nil
}

func (w *DBWorker) savePageData(tr *DBrw, data *proxy.PageData) error {
    err := w.saveMeta(tr, data.GetMeta(), sql.NullInt64{Valid: false})
    if err != nil {
        return err
    }

    var id int64
    for urlStr, hostID := range data.GetURLs() {
        id, err = w.insertURLIfNotExists(tr, urlStr, hostID)
        if err != nil {
            return err
        }
        err = w.insertLinkIfNotExists(tr, data.GetParentURL(), id)
        if err != nil {
            return err
        }
    }

    return nil
}

// Start - run db write worker
func (w *DBWorker) Start(wgParent *sync.WaitGroup) {
    defer wgParent.Done()

    finish := false
    for !finish {
        err := w.DB.Transaction(func(tr *DBrw) error {
            for i := 0; i != 100; i++ {
                data, more := <-w.ChDB
                if !more {
                    finish = true
                    break
                }

                err := w.savePageData(tr, data)
                if err != nil {
                    return err
                }
            }
            return nil
        })

        if err != nil {
            log.Printf("ERROR: %s", err)
        }
    }
}