timoth-y/kicksware-api

View on GitHub
services/references/usecase/storage/mongo/repository.go

Summary

Maintainability
B
4 hrs
Test Coverage
package mongo

import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "io/ioutil"
    "time"

    "github.com/pkg/errors"
    "go.kicksware.com/api/shared/config"
    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
    "go.mongodb.org/mongo-driver/mongo/readpref"

    "github.com/golang/glog"
    TLS "go.kicksware.com/api/shared/core/meta"
    "go.kicksware.com/api/shared/util"

    "go.kicksware.com/api/shared/core/meta"

    "go.kicksware.com/api/services/references/core/model"
    "go.kicksware.com/api/services/references/core/repo"
    "go.kicksware.com/api/services/references/usecase/business"
)

type repository struct {
    client     *mongo.Client
    database   *mongo.Database
    collection *mongo.Collection
    timeout    time.Duration
}

func NewMongoRepository(config config.DataStoreConfig) (repo.SneakerReferenceRepository, error) {
    repo := &repository{
        timeout:  time.Duration(config.Timeout) * time.Second,
    }
    client, err := newMongoClient(config); if err != nil {
        return nil, errors.Wrap(err, "repository.NewMongoRepository")
    }
    repo.client = client
    database := client.Database(config.Database)
    repo.database = database
    repo.collection = database.Collection(config.Collection)
    return repo, nil
}

func newMongoClient(config config.DataStoreConfig) (*mongo.Client, error) {
    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(config.Timeout)*time.Second)
    defer cancel()
    client, err := mongo.Connect(ctx, options.Client().
        ApplyURI(config.URL),
    )
    err = client.Ping(ctx, readpref.Primary()); if err != nil {
        return nil, err
    }
    return client, nil
}

func newTLSConfig(tlsConfig *TLS.TLSCertificate) *tls.Config {
    if !tlsConfig.EnableTLS {
        return nil
    }
    certs := x509.NewCertPool()
    pem, err := ioutil.ReadFile(tlsConfig.CertFile); if err != nil {
        glog.Fatalln(err)
    }
    certs.AppendCertsFromPEM(pem)
    return &tls.Config{
        RootCAs: certs,
    }
}

func (r *repository) FetchOne(code string, params *meta.RequestParams) (*model.SneakerReference, error) {
    ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
    defer cancel()

    query := r.buildQueryPipeline(bson.M{"uniqueid": code}, params)
    cursor, err := r.collection.Aggregate(ctx, query); if err != nil {
        return nil, errors.Wrap(err, "repository.SneakerReference.FetchOne")
    }
    defer cursor.Close(ctx)

    var sneakerReferences []*model.SneakerReference
    if err = cursor.All(ctx, &sneakerReferences); err != nil {
        return nil, errors.Wrap(err, "repository.SneakerReference.FetchOne")
    }
    if sneakerReferences == nil || len(sneakerReferences) == 0 {
        if err == mongo.ErrNoDocuments{
            return nil, errors.Wrap(business.ErrReferenceNotFound, "repository.SneakerReference.FetchOne")
        }
        return nil, errors.Wrap(err, "repository.SneakerReference.FetchOne")
    }
    return sneakerReferences[0], nil
}

func (r *repository) Fetch(codes []string, params *meta.RequestParams) ([]*model.SneakerReference, error) {
    ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
    defer cancel()

    query := r.buildQueryPipeline(bson.M{"uniqueid": bson.M{"$in": codes}}, params)
    cursor, err := r.collection.Aggregate(ctx, query); if err != nil  {
        return nil, errors.Wrap(err, "repository.SneakerReference.Fetch")
    }
    defer cursor.Close(ctx)

    var sneakerReferences []*model.SneakerReference
    if err = cursor.All(ctx, &sneakerReferences); err != nil {
        return nil, errors.Wrap(err, "repository.SneakerReference.Fetch")
    }
    if sneakerReferences == nil || len(sneakerReferences) == 0 {
        if err == mongo.ErrNoDocuments {
            return nil, errors.Wrap(business.ErrReferenceNotFound, "repository.SneakerReference.Fetch")
        }
        return nil, errors.Wrap(err, "repository.SneakerReference.Fetch")
    }
    return sneakerReferences, nil
}

func (r *repository) FetchAll(params *meta.RequestParams) ([]*model.SneakerReference, error) {
    ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
    defer cancel()

    query := r.buildQueryPipeline(bson.M{}, params)
    cursor, err := r.collection.Aggregate(ctx, query); if err != nil  {
        return nil, errors.Wrap(err, "repository.SneakerReference.FetchAll")
    }
    defer cursor.Close(ctx)

    var sneakerReference []*model.SneakerReference
    if err = cursor.All(ctx, &sneakerReference); err != nil {
        if err == mongo.ErrNoDocuments {
            return nil, errors.Wrap(business.ErrReferenceNotFound, "repository.SneakerReference.FetchAll")
        }
        return nil, errors.Wrap(err, "repository.SneakerReference.FetchAll")
    }
    return sneakerReference, nil
}

func (r *repository) FetchQuery(query meta.RequestQuery, params *meta.RequestParams) ([]*model.SneakerReference, error) {
    ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
    defer cancel()

    filter, err := query.ToBson(); if err != nil {
        return nil, errors.Wrap(err, "repository.SneakerReference.FetchQuery")
    }
    queryPipe := r.buildQueryPipeline(filter, params)
    cursor, err := r.collection.Aggregate(ctx, queryPipe)
    if err != nil  {
        return nil, errors.Wrap(err, "repository.SneakerReference.FetchQuery")
    }
    defer cursor.Close(ctx)

    var sneakerReferences []*model.SneakerReference
    if err = cursor.All(ctx, &sneakerReferences); err != nil {
        return nil, errors.Wrap(err, "repository.SneakerReference.FetchQuery")
    }
    if sneakerReferences == nil || len(sneakerReferences) == 0 {
        return nil, errors.Wrap(business.ErrReferenceNotFound, "repository.SneakerReference.FetchQuery")
    }
    return sneakerReferences, nil
}

func (r *repository) StoreOne(sneakerReference *model.SneakerReference) error {
    ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
    defer cancel()
    _, err := r.collection.InsertOne(ctx, sneakerReference)
    if err != nil {
        return errors.Wrap(err, "repository.SneakerReference.Store")
    }
    return nil
}

func (r *repository) Store(sneakerReferences []*model.SneakerReference) error {
    ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
    defer cancel()
    bulk := make([]interface{}, len(sneakerReferences))
    for i := range sneakerReferences {
        bulk[i] = sneakerReferences[i]
    }
    _, err := r.collection.InsertMany(ctx, bulk)
    if err != nil {
        return errors.Wrap(err, "repository.SneakerReference.Store")
    }
    return nil
}

func (r *repository) Modify(sneakerReference *model.SneakerReference) error {
    ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
    defer cancel()
    doc, err := util.ToBsonMap(sneakerReference); if err != nil {
        return errors.Wrap(err, "repository.SneakerReference.Modify")
    }
    update := bson.D{
        {"$set", doc},
    }
    filter := bson.M{"uniqueid": sneakerReference.UniqueId}
    if _, err = r.collection.UpdateOne(ctx, filter, update); err != nil {
        return errors.Wrap(err, "repository.SneakerReference.Modify")
    }
    return nil
}

func (r *repository) Count(query meta.RequestQuery, params *meta.RequestParams) (int, error) {
    ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
    defer cancel()

    filter, err := query.ToBson(); if err != nil {
        return 0, errors.Wrap(err, "repository.SneakerReference.Count")
    }

    count, err := r.collection.CountDocuments(ctx, filter); if err != nil {
        return 0, errors.Wrap(err, "repository.SneakerReference.Count")
    }
    return int(count), nil
}

func (r *repository) CountAll() (int, error) {
    ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
    defer cancel()

    filter := bson.M{}
    count, err := r.collection.CountDocuments(ctx, filter); if err != nil {
        return 0, errors.Wrap(err, "repository.SneakerReference.Count")
    }
    return int(count), nil
}


func (r *repository) buildQueryPipeline(matchQuery bson.M, param *meta.RequestParams) mongo.Pipeline {
    pipe := mongo.Pipeline{}
    pipe = append(pipe, bson.D{{"$match", matchQuery}})

    pipe = append(pipe, bson.D{
        {"$lookup", bson.M {
            "from": "likes",
            "localField": "uniqueid",
            "foreignField": "entity_id",
            "as": "like",
        }},
    })

    pipe = append(pipe, bson.D {
        { "$addFields", bson.M {
            "likes": bson.M{"$size": "$like"},
        }},
    })

    if param != nil {
        if param.SortBy() != "" {
            pipe = append(pipe, bson.D {
                {"$sort", bson.M {param.SortBy(): param.SortDirectionNum(), "added_date": -1 }},
            })
        }
        if param.Offset() != 0 {
            pipe = append(pipe, bson.D {
                {"$skip", param.Offset()},
            })
        }
        if param.Limit() != 0 {
            pipe = append(pipe, bson.D {
                {"$limit",  param.Limit()},
            })
        }
    }

    if param != nil && len(param.UserID()) != 0 {
        pipe = append(pipe, bson.D {
            { "$addFields", bson.M {
                "liked": bson.M{"$in": bson.A{param.UserID(),"$like.user_id"}},
            }},
        })
    }

    pipe = append(pipe, bson.D {
        { "$project", bson.M {
            "like": 0,
        }},
    })

    pipe = append(pipe, bson.D {
        {"$lookup", bson.M {
            "from": "brands",
            "localField": "brand",
            "foreignField": "uniqueid",
            "as": "brand",
        }},
    })
    pipe = append(pipe, bson.D {{ "$unwind", bson.M{
        "path": "$brand",
        "preserveNullAndEmptyArrays": true,
    }},
    })

    pipe = append(pipe, bson.D {
        {"$lookup", bson.M {
            "from": "models",
            "localField": "model",
            "foreignField": "uniqueid",
            "as": "model",
        }},
    })
    pipe = append(pipe, bson.D {{ "$unwind", bson.M{
        "path": "$model",
        "preserveNullAndEmptyArrays": true,
    }},
    })

    pipe = append(pipe, bson.D{
        {"$lookup", bson.M {
            "from": "models",
            "localField": "basemodel",
            "foreignField": "uniqueid",
            "as": "basemodel",
        }},
    })
    pipe = append(pipe, bson.D{{ "$unwind", bson.M{
        "path": "$basemodel",
        "preserveNullAndEmptyArrays": true,
    }},
    })

    return pipe
}