manager/state/store/tasks.go
package store
import (
"strconv"
"strings"
memdb "github.com/hashicorp/go-memdb"
"github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/api/naming"
)
const tableTask = "task"
func init() {
register(ObjectStoreConfig{
Table: &memdb.TableSchema{
Name: tableTask,
Indexes: map[string]*memdb.IndexSchema{
indexID: {
Name: indexID,
Unique: true,
Indexer: api.TaskIndexerByID{},
},
indexName: {
Name: indexName,
AllowMissing: true,
Indexer: taskIndexerByName{},
},
indexRuntime: {
Name: indexRuntime,
AllowMissing: true,
Indexer: taskIndexerByRuntime{},
},
indexServiceID: {
Name: indexServiceID,
AllowMissing: true,
Indexer: taskIndexerByServiceID{},
},
indexNodeID: {
Name: indexNodeID,
AllowMissing: true,
Indexer: taskIndexerByNodeID{},
},
indexSlot: {
Name: indexSlot,
AllowMissing: true,
Indexer: taskIndexerBySlot{},
},
indexDesiredState: {
Name: indexDesiredState,
Indexer: taskIndexerByDesiredState{},
},
indexTaskState: {
Name: indexTaskState,
Indexer: taskIndexerByTaskState{},
},
indexNetwork: {
Name: indexNetwork,
AllowMissing: true,
Indexer: taskIndexerByNetwork{},
},
indexSecret: {
Name: indexSecret,
AllowMissing: true,
Indexer: taskIndexerBySecret{},
},
indexConfig: {
Name: indexConfig,
AllowMissing: true,
Indexer: taskIndexerByConfig{},
},
indexVolumeAttachment: {
Name: indexVolumeAttachment,
AllowMissing: true,
Indexer: taskIndexerByVolumeAttachment{},
},
indexCustom: {
Name: indexCustom,
Indexer: api.TaskCustomIndexer{},
AllowMissing: true,
},
},
},
Save: func(tx ReadTx, snapshot *api.StoreSnapshot) error {
var err error
snapshot.Tasks, err = FindTasks(tx, All)
return err
},
Restore: func(tx Tx, snapshot *api.StoreSnapshot) error {
toStoreObj := make([]api.StoreObject, len(snapshot.Tasks))
for i, x := range snapshot.Tasks {
toStoreObj[i] = x
}
return RestoreTable(tx, tableTask, toStoreObj)
},
ApplyStoreAction: func(tx Tx, sa api.StoreAction) error {
switch v := sa.Target.(type) {
case *api.StoreAction_Task:
obj := v.Task
switch sa.Action {
case api.StoreActionKindCreate:
return CreateTask(tx, obj)
case api.StoreActionKindUpdate:
return UpdateTask(tx, obj)
case api.StoreActionKindRemove:
return DeleteTask(tx, obj.ID)
}
}
return errUnknownStoreAction
},
})
}
// CreateTask adds a new task to the store.
// Returns ErrExist if the ID is already taken.
func CreateTask(tx Tx, t *api.Task) error {
return tx.create(tableTask, t)
}
// UpdateTask updates an existing task in the store.
// Returns ErrNotExist if the node doesn't exist.
func UpdateTask(tx Tx, t *api.Task) error {
return tx.update(tableTask, t)
}
// DeleteTask removes a task from the store.
// Returns ErrNotExist if the task doesn't exist.
func DeleteTask(tx Tx, id string) error {
return tx.delete(tableTask, id)
}
// GetTask looks up a task by ID.
// Returns nil if the task doesn't exist.
func GetTask(tx ReadTx, id string) *api.Task {
t := tx.get(tableTask, id)
if t == nil {
return nil
}
return t.(*api.Task)
}
// FindTasks selects a set of tasks and returns them.
func FindTasks(tx ReadTx, by By) ([]*api.Task, error) {
checkType := func(by By) error {
switch by.(type) {
case byName, byNamePrefix, byIDPrefix, byRuntime, byDesiredState, byTaskState, byNode, byService, bySlot, byReferencedNetworkID, byReferencedSecretID, byReferencedConfigID, byVolumeAttachment, byCustom, byCustomPrefix:
return nil
default:
return ErrInvalidFindBy
}
}
taskList := []*api.Task{}
appendResult := func(o api.StoreObject) {
taskList = append(taskList, o.(*api.Task))
}
err := tx.find(tableTask, by, checkType, appendResult)
return taskList, err
}
type taskIndexerByName struct{}
func (ti taskIndexerByName) FromArgs(args ...interface{}) ([]byte, error) {
return fromArgs(args...)
}
func (ti taskIndexerByName) FromObject(obj interface{}) (bool, []byte, error) {
t := obj.(*api.Task)
name := naming.Task(t)
// Add the null character as a terminator
return true, []byte(strings.ToLower(name) + "\x00"), nil
}
func (ti taskIndexerByName) PrefixFromArgs(args ...interface{}) ([]byte, error) {
return prefixFromArgs(args...)
}
type taskIndexerByRuntime struct{}
func (ti taskIndexerByRuntime) FromArgs(args ...interface{}) ([]byte, error) {
return fromArgs(args...)
}
func (ti taskIndexerByRuntime) FromObject(obj interface{}) (bool, []byte, error) {
t := obj.(*api.Task)
r, err := naming.Runtime(t.Spec)
if err != nil {
return false, nil, nil
}
return true, []byte(r + "\x00"), nil
}
func (ti taskIndexerByRuntime) PrefixFromArgs(args ...interface{}) ([]byte, error) {
return prefixFromArgs(args...)
}
type taskIndexerByServiceID struct{}
func (ti taskIndexerByServiceID) FromArgs(args ...interface{}) ([]byte, error) {
return fromArgs(args...)
}
func (ti taskIndexerByServiceID) FromObject(obj interface{}) (bool, []byte, error) {
t := obj.(*api.Task)
// Add the null character as a terminator
val := t.ServiceID + "\x00"
return true, []byte(val), nil
}
type taskIndexerByNodeID struct{}
func (ti taskIndexerByNodeID) FromArgs(args ...interface{}) ([]byte, error) {
return fromArgs(args...)
}
func (ti taskIndexerByNodeID) FromObject(obj interface{}) (bool, []byte, error) {
t := obj.(*api.Task)
// Add the null character as a terminator
val := t.NodeID + "\x00"
return true, []byte(val), nil
}
type taskIndexerBySlot struct{}
func (ti taskIndexerBySlot) FromArgs(args ...interface{}) ([]byte, error) {
return fromArgs(args...)
}
func (ti taskIndexerBySlot) FromObject(obj interface{}) (bool, []byte, error) {
t := obj.(*api.Task)
// Add the null character as a terminator
val := t.ServiceID + "\x00" + strconv.FormatUint(t.Slot, 10) + "\x00"
return true, []byte(val), nil
}
type taskIndexerByDesiredState struct{}
func (ti taskIndexerByDesiredState) FromArgs(args ...interface{}) ([]byte, error) {
return fromArgs(args...)
}
func (ti taskIndexerByDesiredState) FromObject(obj interface{}) (bool, []byte, error) {
t := obj.(*api.Task)
// Add the null character as a terminator
return true, []byte(strconv.FormatInt(int64(t.DesiredState), 10) + "\x00"), nil
}
type taskIndexerByNetwork struct{}
func (ti taskIndexerByNetwork) FromArgs(args ...interface{}) ([]byte, error) {
return fromArgs(args...)
}
func (ti taskIndexerByNetwork) FromObject(obj interface{}) (bool, [][]byte, error) {
t := obj.(*api.Task)
var networkIDs [][]byte
for _, na := range t.Spec.Networks {
// Add the null character as a terminator
networkIDs = append(networkIDs, []byte(na.Target+"\x00"))
}
return len(networkIDs) != 0, networkIDs, nil
}
type taskIndexerBySecret struct{}
func (ti taskIndexerBySecret) FromArgs(args ...interface{}) ([]byte, error) {
return fromArgs(args...)
}
func (ti taskIndexerBySecret) FromObject(obj interface{}) (bool, [][]byte, error) {
t := obj.(*api.Task)
container := t.Spec.GetContainer()
if container == nil {
return false, nil, nil
}
var secretIDs [][]byte
for _, secretRef := range container.Secrets {
// Add the null character as a terminator
secretIDs = append(secretIDs, []byte(secretRef.SecretID+"\x00"))
}
return len(secretIDs) != 0, secretIDs, nil
}
type taskIndexerByConfig struct{}
func (ti taskIndexerByConfig) FromArgs(args ...interface{}) ([]byte, error) {
return fromArgs(args...)
}
func (ti taskIndexerByConfig) FromObject(obj interface{}) (bool, [][]byte, error) {
t, ok := obj.(*api.Task)
if !ok {
panic("unexpected type passed to FromObject")
}
container := t.Spec.GetContainer()
if container == nil {
return false, nil, nil
}
var configIDs [][]byte
for _, configRef := range container.Configs {
// Add the null character as a terminator
configIDs = append(configIDs, []byte(configRef.ConfigID+"\x00"))
}
return len(configIDs) != 0, configIDs, nil
}
type taskIndexerByVolumeAttachment struct{}
func (ti taskIndexerByVolumeAttachment) FromArgs(args ...interface{}) ([]byte, error) {
return fromArgs(args...)
}
func (ti taskIndexerByVolumeAttachment) FromObject(obj interface{}) (bool, [][]byte, error) {
t, ok := obj.(*api.Task)
if !ok {
panic("unexpected type passed to FromObject")
}
var volumeIDs [][]byte
for _, v := range t.Volumes {
volumeIDs = append(volumeIDs, []byte(v.ID+"\x00"))
}
return len(volumeIDs) != 0, volumeIDs, nil
}
type taskIndexerByTaskState struct{}
func (ts taskIndexerByTaskState) FromArgs(args ...interface{}) ([]byte, error) {
return fromArgs(args...)
}
func (ts taskIndexerByTaskState) FromObject(obj interface{}) (bool, []byte, error) {
t := obj.(*api.Task)
// Add the null character as a terminator
return true, []byte(strconv.FormatInt(int64(t.Status.State), 10) + "\x00"), nil
}