internal/cmds/cmds.go
package cmds
import "strings"
const (
optInTag = uint16(1 << 15)
blockTag = uint16(1 << 14)
readonly = uint16(1 << 13)
noRetTag = uint16(1<<12) | readonly // make noRetTag can also be retried
mtGetTag = uint16(1<<11) | readonly // make mtGetTag can also be retried
scrRoTag = uint16(1<<10) | readonly // make scrRoTag can also be retried
// InitSlot indicates that the command be sent to any redis node in cluster
// When SendToReplicas is set, InitSlot command will be sent to primary node
InitSlot = uint16(1 << 14)
// NoSlot indicates that the command has no key slot specified
NoSlot = uint16(1 << 15)
)
var (
// OptInCmd is predefined CLIENT CACHING YES
OptInCmd = Completed{
cs: newCommandSlice([]string{"CLIENT", "CACHING", "YES"}),
cf: optInTag,
}
// MultiCmd is predefined MULTI
MultiCmd = Completed{
cs: newCommandSlice([]string{"MULTI"}),
}
// ExecCmd is predefined EXEC
ExecCmd = Completed{
cs: newCommandSlice([]string{"EXEC"}),
}
// RoleCmd is predefined ROLE
RoleCmd = Completed{
cs: newCommandSlice([]string{"ROLE"}),
}
// UnsubscribeCmd is predefined UNSUBSCRIBE
UnsubscribeCmd = Completed{
cs: newCommandSlice([]string{"UNSUBSCRIBE"}),
cf: noRetTag,
}
// PUnsubscribeCmd is predefined PUNSUBSCRIBE
PUnsubscribeCmd = Completed{
cs: newCommandSlice([]string{"PUNSUBSCRIBE"}),
cf: noRetTag,
}
// SUnsubscribeCmd is predefined SUNSUBSCRIBE
SUnsubscribeCmd = Completed{
cs: newCommandSlice([]string{"SUNSUBSCRIBE"}),
cf: noRetTag,
}
// PingCmd is predefined PING
PingCmd = Completed{
cs: newCommandSlice([]string{"PING"}),
}
// SlotCmd is predefined CLUSTER SLOTS
SlotCmd = Completed{
cs: newCommandSlice([]string{"CLUSTER", "SLOTS"}),
}
// ShardsCmd is predefined CLUSTER SHARDS
ShardsCmd = Completed{
cs: newCommandSlice([]string{"CLUSTER", "SHARDS"}),
}
// AskingCmd is predefined CLUSTER ASKING
AskingCmd = Completed{
cs: newCommandSlice([]string{"ASKING"}),
}
// SentinelSubscribe is predefined SUBSCRIBE ASKING
SentinelSubscribe = Completed{
cs: newCommandSlice([]string{"SUBSCRIBE", "+sentinel", "+slave", "-sdown", "+sdown", "+switch-master", "+reboot"}),
cf: noRetTag,
}
// SentinelUnSubscribe is predefined UNSUBSCRIBE ASKING
SentinelUnSubscribe = Completed{
cs: newCommandSlice([]string{"UNSUBSCRIBE", "+sentinel", "+slave", "-sdown", "+sdown", "+switch-master", "+reboot"}),
cf: noRetTag,
}
)
// ToBlock marks the command with blockTag
func ToBlock(c *Completed) {
c.cf |= blockTag
}
// Incomplete represents an incomplete Redis command. It should then be completed by calling the Build().
type Incomplete struct {
cs *CommandSlice
cf int16 // use int16 instead of uint16 to make a difference with Completed
ks uint16
}
// Completed represents a completed Redis command, should be created by the Build() of command builder.
type Completed struct {
cs *CommandSlice
cf uint16 // cmd flag
ks uint16 // key slot
}
// Pin prevents a Completed to be recycled
func (c Completed) Pin() Completed {
c.cs.r = 1
return c
}
// IsEmpty checks if it is an empty command.
func (c *Completed) IsEmpty() bool {
return c.cs == nil || len(c.cs.s) == 0
}
// IsOptIn checks if it is client side caching opt-int command.
func (c *Completed) IsOptIn() bool {
return c.cf&optInTag == optInTag
}
// IsBlock checks if it is blocking command which needs to be process by dedicated connection.
func (c *Completed) IsBlock() bool {
return c.cf&blockTag == blockTag
}
// NoReply checks if it is one of the SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE or PUNSUBSCRIBE commands.
func (c *Completed) NoReply() bool {
return c.cf&noRetTag == noRetTag
}
// IsReadOnly checks if it is readonly command and can be retried when network error.
func (c *Completed) IsReadOnly() bool {
return c.cf&readonly == readonly
}
// IsWrite checks if it is not readonly command.
func (c *Completed) IsWrite() bool {
return !c.IsReadOnly()
}
// Commands returns the commands as []string.
// Note that the returned []string should not be modified
// and should not be read after passing into the Client interface, because it will be recycled.
func (c *Completed) Commands() []string {
return c.cs.s
}
// Slot returns the command key slot
func (c *Completed) Slot() uint16 {
return c.ks
}
// SetSlot returns a new completed command with its key slot be overridden
func (c Completed) SetSlot(key string) Completed {
if c.ks&NoSlot == NoSlot {
c.ks = NoSlot | slot(key)
} else {
c.ks = slot(key)
}
return c
}
// Cacheable represents a completed Redis command which supports server-assisted client side caching,
// and it should be created by the Cache() of command builder.
type Cacheable Completed
// Pin prevents a Cacheable to be recycled
func (c Cacheable) Pin() Cacheable {
c.cs.r = 1
return c
}
// Slot returns the command key slot
func (c *Cacheable) Slot() uint16 {
return c.ks
}
// Commands returns the commands as []string.
// Note that the returned []string should not be modified
// and should not be read after passing into the Client interface, because it will be recycled.
func (c *Cacheable) Commands() []string {
return c.cs.s
}
// IsMGet returns if the command is MGET
func (c *Cacheable) IsMGet() bool {
return c.cf == mtGetTag
}
// MGetCacheCmd returns the cache command of the MGET singular command
func MGetCacheCmd(c Cacheable) string {
if c.cs.s[0][0] == 'J' {
return "JSON.GET" + c.cs.s[len(c.cs.s)-1]
}
return "GET"
}
// MGetCacheKey returns the cache key of the MGET singular command
func MGetCacheKey(c Cacheable, i int) string {
return c.cs.s[i+1]
}
// CacheKey returns the cache key used by the server-assisted client side caching
func CacheKey(c Cacheable) (key, command string) {
if len(c.cs.s) == 2 {
return c.cs.s[1], c.cs.s[0]
}
kp := 1
if c.cf == scrRoTag {
if c.cs.s[2] != "1" {
panic(multiKeyCacheErr)
}
kp = 3
}
length := 0
for i, v := range c.cs.s {
if i == kp {
continue
}
length += len(v)
}
sb := strings.Builder{}
sb.Grow(length)
for i, v := range c.cs.s {
if i == kp {
key = v
} else {
sb.WriteString(v)
}
}
return key, sb.String()
}
// CompletedCS get the underlying *CommandSlice
func CompletedCS(c Completed) *CommandSlice {
return c.cs
}
// CacheableCS get the underlying *CommandSlice
func CacheableCS(c Cacheable) *CommandSlice {
return c.cs
}
// NewCompleted creates an arbitrary Completed command.
func NewCompleted(ss []string) Completed {
return Completed{cs: newCommandSlice(ss)}
}
// NewBlockingCompleted creates an arbitrary blocking Completed command.
func NewBlockingCompleted(ss []string) Completed {
return Completed{cs: newCommandSlice(ss), cf: blockTag}
}
// NewReadOnlyCompleted creates an arbitrary readonly Completed command.
func NewReadOnlyCompleted(ss []string) Completed {
return Completed{cs: newCommandSlice(ss), cf: readonly}
}
// NewMGetCompleted creates an arbitrary readonly Completed command.
func NewMGetCompleted(ss []string) Completed {
return Completed{cs: newCommandSlice(ss), cf: mtGetTag}
}
// MGets groups keys by their slot and returns multi MGET commands
func MGets(keys []string) map[uint16]Completed {
return slotMCMDs("MGET", keys, mtGetTag)
}
// MDels groups keys by their slot and returns multi DEL commands
func MDels(keys []string) map[uint16]Completed {
return slotMCMDs("DEL", keys, 0)
}
// MSets groups keys by their slot and returns multi MSET commands
func MSets(kvs map[string]string) map[uint16]Completed {
return slotMSets("MSET", kvs)
}
// MSetNXs groups keys by their slot and returns multi MSETNX commands
func MSetNXs(kvs map[string]string) map[uint16]Completed {
return slotMSets("MSETNX", kvs)
}
// JsonMGets groups keys by their slot and returns multi JSON.MGET commands
func JsonMGets(keys []string, path string) map[uint16]Completed {
ret := slotMCMDs("JSON.MGET", keys, mtGetTag)
for _, jsonmget := range ret {
jsonmget.cs.s = append(jsonmget.cs.s, path)
jsonmget.cs.l++
}
return ret
}
// JsonMSets groups keys by their slot and returns multi JSON.MSET commands
func JsonMSets(kvs map[string]string, path string) map[uint16]Completed {
ret := make(map[uint16]Completed, 8)
for key, value := range kvs {
var cs *CommandSlice
ks := slot(key)
if cp, ok := ret[ks]; ok {
cs = cp.cs
} else {
cs = get()
cs.s = append(cs.s, "JSON.MSET")
cs.l = 1
ret[ks] = Completed{cs: cs, ks: ks}
}
cs.s = append(cs.s, key, path, value)
cs.l += 3
}
return ret
}
func slotMCMDs(cmd string, keys []string, cf uint16) map[uint16]Completed {
ret := make(map[uint16]Completed, 8)
for _, key := range keys {
var cs *CommandSlice
ks := slot(key)
if cp, ok := ret[ks]; ok {
cs = cp.cs
} else {
cs = get()
cs.s = append(cs.s, cmd)
cs.l = 1
ret[ks] = Completed{cs: cs, cf: cf, ks: ks}
}
cs.s = append(cs.s, key)
cs.l++
}
return ret
}
func slotMSets(cmd string, kvs map[string]string) map[uint16]Completed {
ret := make(map[uint16]Completed, 8)
for key, value := range kvs {
var cs *CommandSlice
ks := slot(key)
if cp, ok := ret[ks]; ok {
cs = cp.cs
} else {
cs = get()
cs.s = append(cs.s, cmd)
cs.l = 1
ret[ks] = Completed{cs: cs, ks: ks}
}
cs.s = append(cs.s, key, value)
cs.l += 2
}
return ret
}
// NewMultiCompleted creates multiple arbitrary Completed commands.
func NewMultiCompleted(cs [][]string) []Completed {
ret := make([]Completed, len(cs))
for i, c := range cs {
ret[i] = NewCompleted(c)
}
return ret
}
func check(prev, new uint16) uint16 {
if prev == InitSlot || prev == new {
return new
}
panic(multiKeySlotErr)
}
const multiKeySlotErr = "multi key command with different key slots are not allowed"
const multiKeyCacheErr = "client side caching for scripting only supports numkeys=1"