ipfs-search/ipfs-search

View on GitHub
components/index/opensearch/index.go

Summary

Maintainability
A
0 mins
Test Coverage
package opensearch

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "io"
    "log"

    opensearchutil "github.com/opensearch-project/opensearch-go/v2/opensearchutil"
    "go.opentelemetry.io/otel/codes"

    "github.com/ipfs-search/ipfs-search/components/index"
    "github.com/ipfs-search/ipfs-search/components/index/opensearch/bulkgetter"
)

const debug bool = false

// Index wraps an OpenSearch index to store documents
type Index struct {
    cfg *Config
    c   *Client
}

// New returns a new index.
func New(client *Client, cfg *Config) index.Index {
    if client == nil {
        panic("Index.New Client cannot be nil.")
    }

    if cfg == nil {
        panic("Index.New Config cannot be nil.")
    }

    index := &Index{
        c:   client,
        cfg: cfg,
    }

    return index
}

// String returns the name of the index, for convenient logging.
func (i *Index) String() string {
    return i.cfg.Name
}

func getBody(v interface{}) (io.ReadSeeker, error) {
    b, err := json.Marshal(v)
    if err != nil {
        return nil, err
    }

    return bytes.NewReader(b), nil
}

// index wraps BulkIndexer.Add().
func (i *Index) index(
    ctx context.Context,
    action string,
    id string,
    properties interface{},
) error {
    ctx, span := i.c.Tracer.Start(ctx, "index.opensearch.index")
    defer span.End()

    var (
        body io.ReadSeeker
        err  error
    )

    if properties != nil {
        if action == "update" {
            // For updates, the updated fields need to be wrapped in a `doc` field
            body, err = getBody(struct {
                Doc interface{} `json:"doc"`
            }{properties})
        } else {
            body, err = getBody(properties)
        }
        if err != nil {
            panic(err)
        }
    }

    item := opensearchutil.BulkIndexerItem{
        Index:      i.cfg.Name,
        Action:     action,
        Body:       body,
        DocumentID: id,
        Version:    nil,
        OnFailure: func(
            ctx context.Context,
            item opensearchutil.BulkIndexerItem,
            res opensearchutil.BulkIndexerResponseItem, err error,
        ) {
            if err == nil {
                err = fmt.Errorf("Error flushing: %+v (%s)", res, id)
            }

            span.RecordError(err)
            log.Println(err)

        },
    }

    ctx, span = i.c.Tracer.Start(ctx, "index.opensearch.bulkIndexer.Add")
    defer span.End()

    err = i.c.bulkIndexer.Add(ctx, item)
    if err != nil {
        span.RecordError(err)
        span.SetStatus(codes.Error, "Error adding to BulkIndexer.")
    }

    return err
}

// Index a document's properties, identified by id
func (i *Index) Index(ctx context.Context, id string, properties interface{}) error {
    return i.index(ctx, "create", id, properties)
}

// Update a document's properties, given id
func (i *Index) Update(ctx context.Context, id string, properties interface{}) error {
    return i.index(ctx, "update", id, properties)
}

// Delete item from index
func (i *Index) Delete(ctx context.Context, id string) error {
    return i.index(ctx, "delete", id, nil)
}

// Get retreives `fields` from document with `id` from the index, returning:
// - (true, decoding_error) if found (decoding error set when errors in json)
// - (false, nil) when not found
// - (false, error) otherwise
func (i *Index) Get(ctx context.Context, id string, dst interface{}, fields ...string) (bool, error) {
    ctx, span := i.c.Tracer.Start(ctx, "index.opensearch.Get")
    defer span.End()

    req := bulkgetter.GetRequest{
        Index:      i.cfg.Name,
        DocumentID: id,
        Fields:     fields,
    }

    resp := <-i.c.bulkGetter.Get(ctx, &req, dst)

    if debug {
        if resp.Found {
            log.Printf("opensearch: found %s in %s", id, i)
        } else {
            if resp.Error != nil {
                log.Printf("opensearch: error getting %s in %s: %v", id, i, resp.Error)
            }
        }
    }

    return resp.Found, resp.Error
}

// Compile-time assurance that implementation satisfies interface.
var _ index.Index = &Index{}