1set/starlet

View on GitHub
dataconv/share.go

Summary

Maintainability
A
1 hr
Test Coverage
package dataconv

import (
    "fmt"
    "sort"
    "sync"

    tps "github.com/1set/starlet/dataconv/types"
    itn "github.com/1set/starlet/internal"
    stdjson "go.starlark.net/lib/json"
    "go.starlark.net/starlark"
    "go.starlark.net/syntax"
)

// SharedDict represents a thread-safe dictionary that can be concurrently accessed and modified by multiple Starlark threads.
// This synchronization is crucial in concurrent environments where Starlark scripts are executed in parallel, ensuring data consistency and preventing race conditions.
//
// The internal state of a SharedDict includes a standard Starlark dictionary (`*starlark.Dict`), a mutex (`sync.RWMutex`) to manage concurrent access,
// and a boolean flag indicating whether the dictionary is frozen. A frozen SharedDict cannot be modified, aligning with Starlark's immutability rules for frozen values.
// Additionally, SharedDict supports custom naming through the 'name' field, allowing for more descriptive representations and debugging.
//
// Constructors:
// - NewSharedDict: Initializes a new SharedDict with default settings.
// - NewNamedSharedDict: Creates a new SharedDict with a specified name, providing clarity when multiple SharedDicts are used.
// - NewSharedDictFromDict: Generates a new SharedDict based on an existing Starlark dictionary. It attempts to clone the original dictionary to preserve immutability.
//
// Methods like Len, CloneDict, ToJSON, LoadJSON provide additional functionalities like determining the dictionary's length, cloning, JSON serialization, and deserialization, enhancing the utility of SharedDict in various use cases.
//
// SharedDict integrates tightly with Starlark's concurrency model, offering a robust solution for managing shared state across threads.
// By encapsulating thread safety mechanisms and providing a familiar dictionary interface, SharedDict facilitates the development of concurrent Starlark scripts with shared mutable state.
type SharedDict struct {
    // presumably a marker that prevents comparing SharedDict values with == — confirm against the internal package
    _ itn.DoNotCompare
    // embedded lock; most methods of SharedDict hold it while touching dict and frozen
    sync.RWMutex
    dict   *starlark.Dict // underlying dictionary; may be nil, which the methods treat as an empty dictionary
    frozen bool           // set by Freeze; SetKey refuses to write once it is true
    name   string         // custom type name; when empty, defaultSharedDictName is reported instead
}

const (
    // defaultSharedDictSize is the size passed to starlark.NewDict whenever a SharedDict allocates its own dictionary.
    defaultSharedDictSize = 8
    // defaultSharedDictName is the type name reported when no custom name has been set.
    defaultSharedDictName = "shared_dict"
)

// Compile-time assertions that *SharedDict implements the Starlark interfaces listed below.
var (
    _ starlark.Value      = (*SharedDict)(nil)
    _ starlark.Comparable = (*SharedDict)(nil)
    _ starlark.Mapping    = (*SharedDict)(nil)
    _ starlark.HasAttrs   = (*SharedDict)(nil)
    _ starlark.HasSetKey  = (*SharedDict)(nil)
)

// NewSharedDict returns an empty, unnamed SharedDict backed by a freshly allocated Starlark dictionary.
func NewSharedDict() *SharedDict {
    sd := new(SharedDict)
    sd.dict = starlark.NewDict(defaultSharedDictSize)
    return sd
}

// NewNamedSharedDict returns an empty SharedDict that reports name as its type name.
// An empty name makes the instance fall back to the default type name.
func NewNamedSharedDict(name string) *SharedDict {
    sd := new(SharedDict)
    sd.name = name
    sd.dict = starlark.NewDict(defaultSharedDictSize)
    return sd
}

// NewSharedDictFromDict wraps the contents of d in a new SharedDict.
// The SharedDict is backed by a clone of d produced by CloneDict; only if cloning
// reports an error is d itself used as the backing dictionary.
func NewSharedDictFromDict(d *starlark.Dict) *SharedDict {
    backing := d
    if cloned, err := CloneDict(d); err == nil {
        backing = cloned
    }
    return &SharedDict{dict: backing}
}

// String returns the textual form of the SharedDict: the type name followed by the
// underlying dictionary's string form in parentheses.
// A nil receiver, or a receiver without an underlying dictionary, renders with empty
// contents; a nil receiver is reported under the default type name.
func (s *SharedDict) String() string {
    if s == nil {
        // The type-name lookup reads a field of s, so a nil receiver must be handled
        // before it; otherwise the nil guard on the dictionary below would be pointless.
        return fmt.Sprintf("%s(%s)", defaultSharedDictName, "")
    }

    var v string
    if s.dict != nil {
        v = s.dict.String()
    }
    return fmt.Sprintf("%s(%s)", s.getTypeName(), v)
}

// SetTypeName sets the custom type name reported by Type and used in String and error messages.
// Passing an empty string restores the default type name.
// NOTE(review): the name is written without taking the lock — confirm callers only set it before sharing the value.
func (s *SharedDict) SetTypeName(name string) {
    s.name = name
}

// getTypeName returns the custom type name if one has been set, or the default
// type name otherwise. It is safe to call on a nil receiver, which reports the
// default name; this keeps the nil guards in the callers from being defeated by a
// nil dereference here.
func (s *SharedDict) getTypeName() string {
    if s == nil || s.name == "" {
        return defaultSharedDictName
    }
    return s.name
}

// Type returns the type name of the SharedDict: the custom name if one was set,
// otherwise the default name. It is part of the starlark.Value interface.
func (s *SharedDict) Type() string {
    return s.getTypeName()
}

// Freeze marks the SharedDict as frozen and freezes the underlying dictionary, if any.
// The write lock is held for the whole operation.
func (s *SharedDict) Freeze() {
    s.Lock()
    defer s.Unlock()

    if d := s.dict; d != nil {
        d.Freeze()
    }
    s.frozen = true
}

// Truth returns the truth value of the SharedDict, which is the truth value of the
// underlying dictionary. A nil receiver or a missing dictionary is reported as False.
func (s *SharedDict) Truth() starlark.Bool {
    // The nil check has to come before locking: the lock lives inside the struct,
    // so locking through a nil receiver would panic.
    if s == nil {
        return starlark.False
    }

    s.RLock()
    defer s.RUnlock()

    return s.dict != nil && s.dict.Truth()
}

// Hash always fails: a SharedDict is not hashable.
// It returns 0 and an "unhashable type" error that includes the type name.
func (s *SharedDict) Hash() (uint32, error) {
    return 0, fmt.Errorf("unhashable type: %s", s.getTypeName())
}

// Get looks up k in the underlying dictionary while holding the read lock.
// When there is no underlying dictionary it reports the key as not found without an error.
// It implements the starlark.Mapping interface.
func (s *SharedDict) Get(k starlark.Value) (v starlark.Value, found bool, err error) {
    s.RLock()
    defer s.RUnlock()

    if s.dict == nil {
        return nil, false, nil
    }
    return s.dict.Get(k)
}

// SetKey stores v under k, which is what makes the x[k] = v syntax work on a SharedDict.
// It holds the write lock, fails on a frozen SharedDict, allocates the underlying
// dictionary on first use, and refuses to store another SharedDict as a value.
// It implements the starlark.HasSetKey interface.
func (s *SharedDict) SetKey(k, v starlark.Value) error {
    s.Lock()
    defer s.Unlock()

    // writes are not allowed once frozen
    if s.frozen {
        return fmt.Errorf("frozen %s", s.Type())
    }

    // lazily allocate the backing dictionary
    if s.dict == nil {
        s.dict = starlark.NewDict(defaultSharedDictSize)
    }

    switch nested := v.(type) {
    case *SharedDict:
        // nesting shared dicts is rejected
        return fmt.Errorf("unsupported value: %s", nested.Type())
    default:
        return s.dict.SetKey(k, v)
    }
}

// Attr resolves the attribute called name and returns it as a builtin that takes the
// SharedDict's write lock for the duration of each call.
// Custom SharedDict methods take precedence; any other name is looked up on the
// underlying dictionary, and a nil or failed lookup is returned as-is. It returns
// (nil, nil) when there is no underlying dictionary, and an error when the resolved
// attribute is not a builtin.
// It implements the starlark.HasAttrs interface.
func (s *SharedDict) Attr(name string) (starlark.Value, error) {
    s.Lock()
    defer s.Unlock()

    if s.dict == nil {
        return nil, nil
    }

    // resolve the builtin that does the real work
    var target *starlark.Builtin
    if custom, found := customSharedDictMethods[name]; found {
        target = custom.BindReceiver(s.dict)
    } else {
        native, err := s.dict.Attr(name)
        if native == nil || err != nil {
            return native, err
        }
        bound, isBuiltin := native.(*starlark.Builtin)
        if !isBuiltin {
            return nil, fmt.Errorf("unsupported attribute: %s", name)
        }
        target = bound
    }

    // every invocation of the returned builtin runs under the write lock
    guarded := func(thread *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
        s.Lock()
        defer s.Unlock()

        return starlark.Call(thread, target, args, kwargs)
    }
    return starlark.NewBuiltin(name, guarded), nil
}

// AttrNames returns the sorted attribute names of the SharedDict: those of the
// underlying dictionary plus the custom SharedDict methods.
// It returns nil when there is no underlying dictionary.
// It implements the starlark.HasAttrs interface.
func (s *SharedDict) AttrNames() []string {
    if s.dict == nil {
        return nil
    }

    all := s.dict.AttrNames()
    for extra := range customSharedDictMethods {
        all = append(all, extra)
    }
    sort.Strings(all)
    return all
}

// CompareSameType compares the SharedDict with yv using op.
// Only == and != are supported on the SharedDict itself; when both sides have an
// underlying dictionary the comparison is delegated to those dictionaries.
//
// Two SharedDicts are equal when they are the same object, when both lack an
// underlying dictionary, or when their underlying dictionaries compare equal.
// A yv that is not a *SharedDict is treated as unequal instead of panicking: Starlark
// can consider values to be of the same type based on their type names, and a
// SharedDict's type name is configurable. A nil SharedDict pointer on exactly one
// side is likewise unequal.
// It implements the starlark.Comparable interface.
func (s *SharedDict) CompareSameType(op syntax.Token, yv starlark.Value, depth int) (bool, error) {
    // maps an equality result to the answer for op, rejecting ordering operators
    retEqualCheck := func(equal bool, op syntax.Token) (bool, error) {
        switch op {
        case syntax.EQL:
            return equal, nil
        case syntax.NEQ:
            return !equal, nil
        default:
            return false, fmt.Errorf("unsupported operator: %s", op)
        }
    }

    // anything that is not a shared dict cannot be equal to one
    y, ok := yv.(*SharedDict)
    if !ok {
        return retEqualCheck(false, op)
    }

    // if they are the same object (including two nil pointers), they are equal
    if s == y {
        return retEqualCheck(true, op)
    }

    // exactly one side is a nil pointer; it must not be locked or dereferenced
    if s == nil || y == nil {
        return retEqualCheck(false, op)
    }

    // lock both objects for reading
    s.RLock()
    defer s.RUnlock()
    y.RLock()
    defer y.RUnlock()

    switch {
    case s.dict != nil && y.dict != nil:
        // compare the underlying dictionaries
        return s.dict.CompareSameType(op, y.dict, depth)
    case s.dict == nil && y.dict == nil:
        // neither side has a dictionary, so they are equal
        return retEqualCheck(true, op)
    default:
        // only one side has a dictionary, so they are not equal
        return retEqualCheck(false, op)
    }
}

// Len reports the number of entries in the underlying dictionary, or 0 when there is none.
// It is not required by the starlark.Value interface; it exists for the convenience of Go callers.
func (s *SharedDict) Len() int {
    s.RLock()
    defer s.RUnlock()

    if s.dict == nil {
        return 0
    }
    return s.dict.Len()
}

// CloneDict returns a shallow copy of the underlying Starlark dictionary, taken while
// holding the read lock. Adding or removing keys on the returned dictionary does not
// affect the SharedDict; the keys and values themselves are shared with the original,
// not copied.
func (s *SharedDict) CloneDict() (*starlark.Dict, error) {
    s.RLock()
    defer s.RUnlock()

    snapshot, err := CloneDict(s.dict)
    return snapshot, err
}

// ToJSON serializes the underlying dictionary of the SharedDict into a JSON string.
// This makes it easy to export or log the data held by the SharedDict.
//
// The read lock is held while encoding so that the dictionary cannot be modified by
// another goroutine halfway through, matching the other read-only accessors such as
// Len and CloneDict. Encoding is delegated to EncodeStarlarkJSON, and any error it
// reports is returned unchanged.
func (s *SharedDict) ToJSON() (string, error) {
    s.RLock()
    defer s.RUnlock()

    return EncodeStarlarkJSON(s.dict)
}

// LoadJSON updates the SharedDict with the key-value pairs decoded from jsonStr.
// This is a convenient way to populate or update a SharedDict from data received as
// JSON, such as a configuration file or a network response.
//
// The decoded pairs are merged into the existing contents; where keys overlap, the
// values from the JSON string overwrite those already present. The merge is not
// atomic: if storing a pair fails, the pairs stored before it remain.
//
// It returns an error when the receiver is nil, when jsonStr cannot be decoded, when
// the decoded value is not a dictionary, or when the underlying dictionary rejects a
// write. A SharedDict that has no underlying dictionary yet gets one allocated on
// demand, unless it is frozen, in which case an error is returned.
func (s *SharedDict) LoadJSON(jsonStr string) error {
    // check the dict itself
    if s == nil {
        return fmt.Errorf("nil shared dict")
    }

    // json decode
    val, err := DecodeStarlarkJSON([]byte(jsonStr))
    if err != nil {
        return err
    }

    // convert to dict
    nd, ok := val.(*starlark.Dict)
    if !ok {
        return fmt.Errorf("got %s result, want dict", val.Type())
    }

    // lock the shared dict
    s.Lock()
    defer s.Unlock()

    // A zero-value SharedDict has no dictionary yet; create it the same way SetKey
    // does rather than dereferencing nil below.
    if s.dict == nil {
        if s.frozen {
            return fmt.Errorf("frozen %s", s.Type())
        }
        s.dict = starlark.NewDict(defaultSharedDictSize)
    }

    // merge the new dict into the shared dict
    for _, r := range nd.Items() {
        if len(r) < 2 {
            continue
        }
        if e := s.dict.SetKey(r[0], r[1]); e != nil {
            return e
        }
    }
    return nil
}

var (
    // customSharedDictMethods holds the additional builtins a SharedDict exposes as
    // attributes, keyed by attribute name. Attr binds the matching entry to the
    // underlying dictionary, and these entries are consulted before the dictionary's
    // own attributes.
    customSharedDictMethods = map[string]*starlark.Builtin{
        "len":       starlark.NewBuiltin("len", sharedDictLen),
        "perform":   starlark.NewBuiltin("perform", sharedDictPerform),
        "to_dict":   starlark.NewBuiltin("to_dict", sharedDictToDict),
        "to_json":   starlark.NewBuiltin("to_json", sharedDictToJSON),
        "from_json": starlark.NewBuiltin("from_json", sharedDictFromJSON),
    }
)

// sharedDictLen implements the "len" method: it takes no arguments and returns the
// number of entries in the receiver dictionary as a Starlark int.
func sharedDictLen(_ *starlark.Thread, b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
    if err := starlark.UnpackPositionalArgs(b.Name(), args, kwargs, 0); err != nil {
        return nil, err
    }

    recv := b.Receiver().(*starlark.Dict)
    return starlark.MakeInt(recv.Len()), nil
}

// sharedDictPerform implements the "perform" method: it calls the function given as
// the single argument "fn" with the receiver dictionary and returns whatever that
// call returns. A non-callable argument results in an error.
func sharedDictPerform(thread *starlark.Thread, b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
    // unpack the function to run
    var pr starlark.Value
    if err := starlark.UnpackArgs(b.Name(), args, kwargs, "fn", &pr); err != nil {
        return nil, err
    }

    // the dictionary the function operates on
    d := b.Receiver().(*starlark.Dict)

    // only callables can be performed
    fn, callable := pr.(starlark.Callable)
    if !callable {
        return nil, fmt.Errorf("%s: not callable type: %s", b.Name(), pr.Type())
    }
    return starlark.Call(thread, fn, starlark.Tuple{d}, nil)
}

// sharedDictToDict implements the "to_dict" method: it takes no arguments and returns
// a shallow clone of the receiver dictionary as a plain Starlark dict.
func sharedDictToDict(thread *starlark.Thread, b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
    if err := starlark.UnpackPositionalArgs(b.Name(), args, kwargs, 0); err != nil {
        return nil, err
    }
    // get the receiver
    od := b.Receiver().(*starlark.Dict)

    // Clone the dictionary. The error is checked explicitly so that a failed clone
    // yields a true nil starlark.Value rather than a nil *starlark.Dict wrapped in a
    // non-nil interface.
    cloned, err := CloneDict(od)
    if err != nil {
        return nil, err
    }
    return cloned, nil
}

// sharedDictToJSON implements the "to_json" method: it takes no arguments and returns
// the result of calling the Starlark json module's "encode" function on the receiver
// dictionary.
func sharedDictToJSON(thread *starlark.Thread, b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
    // this method accepts no arguments
    if err := starlark.UnpackPositionalArgs(b.Name(), args, kwargs, 0); err != nil {
        return nil, err
    }

    // the dictionary to encode
    d := b.Receiver().(*starlark.Dict)

    // look up json.encode in the standard json module
    member, found := stdjson.Module.Members["encode"]
    if !found {
        return nil, fmt.Errorf("json.encode not found")
    }
    encode := member.(*starlark.Builtin)

    // let the encoder produce the JSON value
    return starlark.Call(thread, encode, starlark.Tuple{d}, nil)
}

// sharedDictFromJSON implements the "from_json" method: it decodes the JSON given as
// the single argument "x" (string or bytes), requires the result to be a dictionary,
// merges its entries into the receiver dictionary, and returns the decoded dictionary.
// Entries already stored stay in place if a later entry fails to be stored.
func sharedDictFromJSON(thread *starlark.Thread, b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
    // unpack the JSON input
    var s tps.StringOrBytes
    if err := starlark.UnpackArgs(b.Name(), args, kwargs, "x", &s); err != nil {
        return nil, err
    }

    // decode it into a Starlark value
    decoded, err := DecodeStarlarkJSON(s.GoBytes())
    if err != nil {
        return nil, err
    }

    // only JSON objects can be merged
    incoming, isDict := decoded.(*starlark.Dict)
    if !isDict {
        return nil, fmt.Errorf("got %s, want dict", decoded.Type())
    }

    // copy every decoded entry into the receiver
    target := b.Receiver().(*starlark.Dict)
    for _, pair := range incoming.Items() {
        if len(pair) < 2 {
            continue
        }
        key, val := pair[0], pair[1]
        if setErr := target.SetKey(key, val); setErr != nil {
            return nil, setErr
        }
    }

    // hand back the decoded dictionary
    return incoming, nil
}

// CloneDict returns a shallow clone of the given dictionary: a new dictionary holding
// the same keys and values. It is safe to call with a nil dictionary, in which case a
// new empty dictionary is returned.
func CloneDict(od *starlark.Dict) (*starlark.Dict, error) {
    if od == nil {
        return starlark.NewDict(defaultSharedDictSize), nil
    }

    clone := starlark.NewDict(od.Len())
    items := od.Items()
    for i := range items {
        pair := items[i]
        if len(pair) < 2 {
            continue
        }
        if err := clone.SetKey(pair[0], pair[1]); err != nil {
            return nil, err
        }
    }
    return clone, nil
}