slackscot.go
package slackscot
import (
"context"
"fmt"
"github.com/alexandre-normand/slackscot/config"
"github.com/alexandre-normand/slackscot/schedule"
"github.com/hashicorp/golang-lru"
"github.com/marcsantiago/gocron"
"github.com/slack-go/slack"
"github.com/spf13/cast"
"github.com/spf13/viper"
"go.opentelemetry.io/otel/metric"
otel "go.opentelemetry.io/otel/metric/global"
"io"
"log"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
)
const (
defaultLogPrefix = "slackscot: "
defaultLogFlag = log.Lshortfile | log.LstdFlags
)
// Action types
const (
commandType = "command"
hearActionType = "hearAction"
)
// Slackscot represents what defines a Slack Mascot (mostly, a name and its plugins)
type Slackscot struct {
name string
config *viper.Viper
defaultAction Answerer
plugins []*Plugin
triggeringMsgToResponse *lru.ARCCache
// Runtime configuration options
namespaceCommands bool
// Slack options to apply on Run()
slackOpts []slack.Option
// Command identification
cmdMatcher CommandMatcher
// Bot matching for messages from us
botMatcher SelfMatcher
// Default for cmdMatcher and botMatcher
selfIdentity selfIdentity
// Logger
log *sLogger
// Resources to close on shutdown
closers []io.Closer
// Test mode which defines whether or not the bot reacts to terminationEvents
testMode bool
// Termination channel
terminationCh chan bool
meter metric.Meter
slackLatencyMillis int64
*partitionRouter
*instrumenter
}
// Plugin represents a plugin (its name, action definitions and slackscot injected services)
//
// Set NamespaceCommands to true if the plugin's commands are to be namespaced by slackscot.
// This means that commands will be first checked for a cmdPrefix that matches the plugin name.
// Since that's all handled by slackscot, a plugin should be written with matching only
// considering what comes after the namespace. For example, a plugin with name make would have
// a coffee command be something like
//
// Match: func(m *IncomingMessage) bool {
// return strings.HasPrefix(m.NormalizedText, "coffee ")
// },
// Usage: "coffee `<when>`",
// Description: "Make coffee",
// Answer: func(m *IncomingMessage) *Answer {
// when := strings.TrimPrefix(m.NormalizedText, "coffee ")
// return &Answer{Text: fmt.Sprintf("coffee will be reading %s", when))}}
// }
//
// In this example, if namespacing is enabled, a user would trigger the command with a message such as:
// <@slackscotID> make coffee in 10 minutes
// Note that the plugin itself doesn't need to concern itself with the namespace in the matching or answering
// as the NormalizedText has been formatted to be stripped of namespacing whether or not that's enabled and slackscot
// will have made sure the namespace matched if enabled.
//
// At runtime, instances of slackscot can request to disregard namespacing with OptionNoPluginNamespacing (for example, to run a single plugin and simplify usage).
type Plugin struct {
Name string
NamespaceCommands bool // Set to true for slackscot-managed namespacing of commands where the namespace/cmdPrefix to all commands is set to the plugin name
Commands []ActionDefinition
HearActions []ActionDefinition
ScheduledActions []ScheduledActionDefinition
// Those slackscot services are injected post-creation when slackscot is called.
// A plugin shouldn't rely on those being available during creation
UserInfoFinder UserInfoFinder
Logger SLogger
EmojiReactor EmojiReactor
FileUploader FileUploader
RealTimeMsgSender RealTimeMessageSender
// The slack.Client is injected post-creation. It gives access to all the https://godoc.org/github.com/slack-go/slack#Client.
// Plugin writers might want to check out https://godoc.org/github.com/slack-go/slack/slacktest to create a slack test server in order
// to mock a slack server to test plugins using the SlackClient.
SlackClient *slack.Client
}
// ActionDefinition represents how an action is triggered, published, used and described
// along with defining the function defining its behavior
type ActionDefinition struct {
// Indicates whether the action should be omitted from the help message
Hidden bool
// Matcher that will determine whether or not the action should be triggered
Match Matcher
// Usage example
Usage string
// Help description for the action
Description string
// Function to execute if the Matcher matches
Answer Answerer
}
// Matcher is the function that determines whether or not an action should be triggered based on a IncomingMessage (which
// includes a slack.Msg and a normalized text content. Note that a match doesn't guarantee that the action should
// actually respond with anything once invoked
type Matcher func(m *IncomingMessage) bool
// Answerer is what gets executed when an ActionDefinition is triggered. To signal the absence of an answer, an action
// should return nil
type Answerer func(m *IncomingMessage) *Answer
// ActionDefinitionWithID holds an action definition along with its identifier string
type ActionDefinitionWithID struct {
ActionDefinition
id string
}
// ScheduledActionDefinition represents when a scheduled action is triggered as well
// as what it does and how
type ScheduledActionDefinition struct {
// Indicates whether the action should be omitted from the help message
Hidden bool
// Schedule definition determining when the action runs
Schedule schedule.Definition
// Help description for the scheduled action
Description string
// ScheduledAction is the function that is invoked when the schedule activates
Action ScheduledAction
}
// ScheduledAction is what gets executed when a ScheduledActionDefinition is triggered (by its ScheduleDefinition)
// In order to do anything, a plugin should define its scheduled actions functions with itself as a receiver
// so the function has access to the injected services
type ScheduledAction func()
// SlackMessageID holds the elements that form a unique message identifier for slack. Technically, slack also uses
// the workspace id as the first part of that unique identifier but since an instance of slackscot only lives within
// a single workspace, that part is left out
type SlackMessageID struct {
channelID string
timestamp string
}
// IsMsgModifiable returns true if this slack message id can be used to update/delete the message.
// In practice, ephemeral messages don't have a channel ID and can't be deleted/updated so this would
// be a case where IsMsgModifiable would return false
func (sid SlackMessageID) IsMsgModifiable() bool {
return sid.channelID != "" && sid.timestamp != ""
}
// String returns the string representation of a SlackMessageID
func (sid SlackMessageID) String() string {
return fmt.Sprintf("%s/%s", sid.channelID, sid.timestamp)
}
// responseStrategy defines how a slack.OutgoingMessage is generated from an Answer
type responseStrategy func(m IncomingMessage, answer *Answer) slack.OutgoingMessage
// IncomingMessage holds data for an incoming slack message. In addition to a slack.Msg, it also has
// a normalized text that is the original text stripped from the "<@Mention>" cmdPrefix when a message
// is addressed to a slackscot instance. Since commands are usually received either via direct message
// (without @Mention) or on channels with @Mention, the normalized text is useful there to allow plugins
// to have a single version to do Match and Answer against
type IncomingMessage struct {
// The original slack.Msg text stripped from the "<@Mention>" cmdPrefix, if applicable
NormalizedText string
slack.Msg
}
// OutgoingMessage holds a plugin generated slack outgoing message along with the plugin identifier
type OutgoingMessage struct {
slack.OutgoingMessage
// Answer from plugins/internal commands
Answer
// The identifier of the source of the outgoing message. The format being: <pluginName>.command[<commandIndex>] (for a command) or <pluginName>.hearAction[actionIndex] (for an hear action)
pluginActionID string
}
// runDependencies represents all runtime dependencies. Note that they're mostly satisfied by slack.RTM or slack.Client
// but having dependencies used as the smaller interfaces keeps the rest of the code cleaner and easier to test
type runDependencies struct {
chatDriver chatDriver
userInfoFinder UserInfoFinder
emojiReactor EmojiReactor
fileUploader FileUploader
selfInfoFinder selfInfoFinder
realTimeMsgSender RealTimeMessageSender
slackClient *slack.Client
}
// CommandMatcher is the interface that wraps methods required for defining how commands
// are matched and how the corresponding slack user interface is generated
type CommandMatcher interface {
// Return True if the message is a command
IsCmd(msg slack.Msg) bool
// Prefix to use with help text
UsagePrefix() string
// TrimPrefix is the cmdPrefix that should be removed from non-DM commands
TrimPrefix(string) string
fmt.Stringer
}
// SelfMatcher is the interface defining how slackscot determines that a message
// originates from itself
type SelfMatcher interface {
// IsBot returns true if the message is from the bot
IsBot(msg slack.Msg) bool
fmt.Stringer
}
// Identify @mention based commands
type selfIdentity struct {
// Caching self identity used during message processing/filtering
id string
botID string
name string
userPrefix string
}
func (s *selfIdentity) IsCmd(msg slack.Msg) bool {
return strings.HasPrefix(msg.Text, s.userPrefix)
}
func (s *selfIdentity) UsagePrefix() string {
return ""
}
func (s *selfIdentity) TrimPrefix(text string) string {
return strings.TrimPrefix(text, s.userPrefix)
}
func (s *selfIdentity) IsBot(msg slack.Msg) bool {
return msg.User == s.id || msg.BotID == s.id || msg.BotID == s.botID
}
func (s *selfIdentity) String() string {
return fmt.Sprintf("Self-Ident{%v %v %v %v}", s.id, s.botID, s.name, s.userPrefix)
}
// Option defines an option for a Slackscot
type Option func(*Slackscot)
// OptionLog sets a logger for Slackscot
func OptionLog(logger *log.Logger) Option {
return func(s *Slackscot) {
s.log.logger = logger
}
}
// OptionWithSlackOption adds a slack.Option to apply on the slack client
func OptionWithSlackOption(opt slack.Option) Option {
return func(s *Slackscot) {
s.slackOpts = append(s.slackOpts, opt)
}
}
// OptionNoPluginNamespacing disables plugin command namespacing for this instance. This means
// that namespacing plugin candidates will run without any extra plugin name matching required
// This is useful to simplify command usage for instances running a single plugin
func OptionNoPluginNamespacing() Option {
return func(s *Slackscot) {
s.namespaceCommands = false
}
}
// OptionLogfile sets a logfile for Slackscot while using the other default logging cmdPrefix and options
func OptionLogfile(logfile *os.File) Option {
return func(s *Slackscot) {
s.log.logger = log.New(logfile, defaultLogPrefix, defaultLogFlag)
s.slackOpts = append(s.slackOpts, slack.OptionLog(s.log.logger))
}
}
// OptionTestMode sets the instance in test mode which instructs it to react to a goodbye event to terminate
// its execution. It is meant to be used for testing only and mostly in conjunction with github.com/slack-go/slack/slacktest.
// Very importantly, the termination message must be formed correctly so that the slackscot instance terminates
// correctly for tests to actually terminate.
//
// Here's an example:
//
// testServer := slacktest.NewTestServer()
// testServer.Handle("/channels.create", slacktest.Websocket(func(conn *websocket.Conn) {
// // Trigger a termination on any API call to channels.create
// slacktest.RTMServerSendGoodbye(conn)
// }))
// testServer.Start()
// defer testServer.Stop()
//
// termination := make(chan bool)
// s, err := New("BobbyTables", config.NewViperWithDefaults(), OptionWithSlackOption(slack.OptionAPIURL(testServer.GetAPIURL())), OptionTestMode(termination))
// require.NoError(t, err)
//
// tp := newTestPlugin()
// s.RegisterPlugin(tp)
//
// go s.Run()
//
// // TODO: Use the testserver to send events and messages and assert your plugin's behavior
//
// // Send this event to the testServer's websocket. This gets transformed into a
// // slack.DisconnectedEvent with Cause equal to slack.ErrRTMGoodbye that slackscot will
// // interpret as a signal to self-terminate
// testServer.SendToWebsocket("{\"type\":\"goodbye\"}")
//
// // Wait for slackscot to terminate
// <-termination
func OptionTestMode(terminationCh chan bool) Option {
return func(s *Slackscot) {
s.testMode = true
s.terminationCh = terminationCh
}
}
//OptionCommandPrefix sets a cmdPrefix to all commands that is used instead of at-mentioning the bot
func OptionCommandPrefix(cmdPrefix string) Option {
return func(s *Slackscot) {
pc := new(prefixedCommand)
pc.prefix = cmdPrefix
s.cmdMatcher = pc
}
}
type prefixedCommand struct {
prefix string
}
func (pc *prefixedCommand) IsCmd(msg slack.Msg) bool {
return strings.HasPrefix(msg.Text, pc.prefix)
}
func (pc *prefixedCommand) UsagePrefix() string {
return pc.prefix
}
func (pc *prefixedCommand) TrimPrefix(text string) string {
return strings.TrimPrefix(text, pc.prefix)
}
func (pc *prefixedCommand) String() string {
return fmt.Sprintf("Prefixed-Command{%v}", pc.prefix)
}
// NewSlackscot creates a new slackscot from an array of plugins and a name
//
// Deprecated: Use New instead. Will be removed in 2.0.0
func NewSlackscot(name string, v *viper.Viper, options ...Option) (s *Slackscot, err error) {
return New(name, v, options...)
}
// New creates a new slackscot from an array of plugins and a name
func New(name string, v *viper.Viper, options ...Option) (s *Slackscot, err error) {
s = new(Slackscot)
s.triggeringMsgToResponse, err = lru.NewARC(v.GetInt(config.ResponseCacheSizeKey))
if err != nil {
return nil, err
}
v = config.LayerConfigWithDefaults(v)
s.name = name
s.config = v
s.namespaceCommands = true
s.testMode = false
s.closers = make([]io.Closer, 0)
s.defaultAction = defaultAction
s.log = NewSLogger(log.New(os.Stdout, defaultLogPrefix, defaultLogFlag), v.GetBool(config.DebugKey))
partitionCount := s.config.GetInt(config.MessageProcessingPartitionCount)
if !isPowerOfTwo(partitionCount) {
return nil, fmt.Errorf("%s config should be a power of two but was [%d]", config.MessageProcessingPartitionCount, partitionCount)
}
s.slackOpts = make([]slack.Option, 0)
s.slackOpts = append(s.slackOpts, slack.OptionDebug(s.config.GetBool(config.DebugKey)))
s.slackOpts = append(s.slackOpts, slack.OptionLog(log.New(s.log.logger.Writer(), "slack: ", defaultLogFlag)))
s.botMatcher = &s.selfIdentity
s.cmdMatcher = &s.selfIdentity
s.meter = otel.GetMeterProvider().Meter("github.com/alexandre-normand/slackscot")
for _, opt := range options {
opt(s)
}
s.instrumenter, err = newInstrumenter(name, s.meter, s.reportLatency)
if err != nil {
return nil, err
}
s.partitionRouter, err = newPartitionRouter(partitionCount, s.config.GetInt(config.MessageProcessingBufferedMessageCount), s.log, s.instrumenter)
if err != nil {
return nil, err
}
return s, nil
}
// defaultAnswer for a message directed to slackscot that isn't matching any known plugin command
func defaultAction(m *IncomingMessage) *Answer {
return &Answer{Text: fmt.Sprintf("I don't understand. Ask me for \"%s\" to get a list of things I do", helpPluginName)}
}
func (s *Slackscot) reportLatency(ctx context.Context, result metric.Int64ObserverResult) {
result.Observe(s.slackLatencyMillis)
}
// Close closes all closers of this slackscot. The first error that occurs
// during a Close is returned but regardless, all closers are attempted
// to be closed
func (s *Slackscot) Close() (err error) {
for _, c := range s.closers {
if err == nil {
err = c.Close()
} else {
c.Close()
}
}
return err
}
// RegisterPlugin registers a plugin with the Slackscot engine. This should be invoked
// prior to calling Run
func (s *Slackscot) RegisterPlugin(p *Plugin) {
s.plugins = append(s.plugins, p)
}
// Run starts the Slackscot and loops until the process is interrupted
func (s *Slackscot) Run() (err error) {
sc := slack.New(
s.config.GetString(config.TokenKey),
s.slackOpts...,
)
// This will initiate the connection to the slack RTM and start the reception of messages
rtm := sc.NewRTM()
go rtm.ManageConnection()
// Load time zone location for the scheduler, we just log the error here since we fail to start
// but we're in a go routine. Hopefully, this should be sufficient for users to figure out the bad
// configuration
timeLoc, err := config.GetTimeLocation(s.config)
if err != nil {
return err
}
// Start scheduling of all plugins' scheduled actions
go s.startActionScheduler(timeLoc)
// runInternal is blocking call so it's running in a goroutine. The way slackscot would usually terminate
// in a production scenario is by its process getting killed which would result in a last message sent on the termination channel
if s.terminationCh != nil {
// Start the main processing and send the termination to the externally defined termination channel (so a test can block and wait for processing after sending all of its test messages)
go s.runInternal(rtm.IncomingEvents, &runDependencies{chatDriver: NewchatDriverWithTelemetry(sc, s.name, s.instrumenter.meter), userInfoFinder: NewUserInfoFinderWithTelemetry(sc, s.name, s.instrumenter.meter), emojiReactor: NewEmojiReactorWithTelemetry(sc, s.name, s.instrumenter.meter), fileUploader: NewFileUploaderWithTelemetry(NewFileUploader(sc), s.name, s.instrumenter.meter), selfInfoFinder: rtm, realTimeMsgSender: rtm, slackClient: sc})
} else {
// This is production and the lifecycle is managed here so we create the termination channel and wait for the termination signal
s.terminationCh = make(chan bool)
go s.runInternal(rtm.IncomingEvents, &runDependencies{chatDriver: NewchatDriverWithTelemetry(sc, s.name, s.instrumenter.meter), userInfoFinder: NewUserInfoFinderWithTelemetry(sc, s.name, s.instrumenter.meter), emojiReactor: NewEmojiReactorWithTelemetry(sc, s.name, s.instrumenter.meter), fileUploader: NewFileUploaderWithTelemetry(NewFileUploader(sc), s.name, s.instrumenter.meter), selfInfoFinder: rtm, realTimeMsgSender: rtm, slackClient: sc})
// Wait for termination
<-s.terminationCh
}
return nil
}
// runInternal handles all incoming events and acts as the main loop. It will essentially
// always process events as long as the process isn't interrupted. Normally, this happens
// by a kill signal being sent and slackscot gets notified and closes the events channel which
// terminates this loop and shuts down gracefully
func (s *Slackscot) runInternal(events <-chan slack.RTMEvent, deps *runDependencies) {
// Ensure we send a termination signal on the channel to unblock the main thread and exit
defer func() {
s.terminationCh <- true
}()
// Register to receive a notification for a termination signal which will, in turn, send a termination message to the
// termination channel
go s.watchForTerminationSignalToAbort()
// Start by adding the help command now that we know all plugins have been registered
helpPlugin := s.newHelpPlugin(VERSION)
s.RegisterPlugin(&helpPlugin.Plugin)
// Inject services into plugins before starting to process events
s.injectServicesToPlugins(deps.userInfoFinder, s.log, deps.emojiReactor, deps.fileUploader, deps.realTimeMsgSender, deps.slackClient)
// start all worker go routines
for i := range s.messageQueues {
go s.processMessages(deps.chatDriver, s.messageQueues[i], s.workerTerminationSignals[i])
}
for msg := range events {
switch e := msg.Data.(type) {
case *slack.ConnectedEvent:
s.log.Printf("Infos: %v\n", e.Info)
s.log.Printf("Connection counter: %d\n", e.ConnectionCount)
err := s.cacheSelfIdentity(deps.selfInfoFinder, deps.userInfoFinder)
if err != nil {
s.log.Printf("Error getting self identity: %s", err.Error())
return
}
case *slack.MessageEvent:
s.coreMetrics.msgsSeen.Add(context.Background(), 1)
s.routeMessageEvent(*e)
case *slack.LatencyReport:
s.slackLatencyMillis = e.Value.Milliseconds()
s.log.Printf("Current latency: %v\n", e.Value)
case *slack.RTMError:
s.log.Printf("Error: %s\n", e.Error())
case *slack.InvalidAuthEvent:
s.log.Printf("Invalid credentials\n")
return
case *slack.DisconnectedEvent:
if s.testMode && e.Cause != nil && e.Cause == slack.ErrRTMGoodbye {
s.log.Printf("Received termination event in test mode, terminating\n")
// Close all processing queues and wait for the terminations
for _, wq := range s.messageQueues {
close(wq)
}
// Wait for all workers to terminate processing
for _, tc := range s.workerTerminationSignals {
<-tc
}
return
}
default:
// Ignoring other messages
}
}
}
// injectServicesToPlugins assembles/creates the services and injects them in all plugins
func (s *Slackscot) injectServicesToPlugins(loadingUserInfoFinder UserInfoFinder, logger SLogger, emojiReactor EmojiReactor, fileUploader FileUploader, msgSender RealTimeMessageSender, slackClient *slack.Client) (err error) {
userInfoFinder, err := NewCachingUserInfoFinder(s.config, loadingUserInfoFinder, logger)
if err != nil {
return err
}
for _, p := range s.plugins {
p.Logger = logger
p.UserInfoFinder = userInfoFinder
p.EmojiReactor = emojiReactor
p.FileUploader = fileUploader
p.RealTimeMsgSender = msgSender
p.SlackClient = slackClient
}
return nil
}
// watchForTerminationSignalToAbort waits for a SIGTERM or SIGINT and sends a termination signal on the termination channel to finish
// the main Run() loop and terminate cleanly. Note that this is meant to run in a go routine given that this is blocking
func (s *Slackscot) watchForTerminationSignalToAbort() {
tSignals := make(chan os.Signal, 1)
// Register to be notified of termination signals so we can abort
signal.Notify(tSignals, syscall.SIGINT, syscall.SIGTERM)
sig := <-tSignals
s.log.Debugf("Received termination signal [%s], closing RTM's incoming events channel to terminate processing\n", sig)
s.terminationCh <- true
}
// getActionID returns a formatted identifier for an action. It includes the plugin name,
// the action type (command or hear action) and its index within the list of such actions for the plugin
//
// The identifier remains the same for the duration of an execution but might change if the slackscot instance
// reorders/replaces actions. Since the identifier isn't used for any durable functionality at the moment, this seems
// adequate. If this ever changes, we might formalize an action identifier that could be generated by users and validated
// to be unique.
func getActionID(pluginName string, actionType string, index int) (actionID string) {
return fmt.Sprintf("%s.%s[%d]", pluginName, actionType, index)
}
// cacheSelfIdentity gets "our" identity and keeps the id.id and id.name to avoid having to look it up every time
func (s *Slackscot) cacheSelfIdentity(selfInfoFinder selfInfoFinder, userInfoFinder UserInfoFinder) (err error) {
s.selfIdentity.id = selfInfoFinder.GetInfo().User.ID
s.selfIdentity.name = selfInfoFinder.GetInfo().User.Name
user, err := userInfoFinder.GetUserInfo(s.selfIdentity.id)
if err != nil {
return err
}
s.selfIdentity.botID = user.Profile.BotID
s.selfIdentity.userPrefix = fmt.Sprintf("<@%s> ", s.selfIdentity.id)
s.log.Debugf("Caching self id [%s], self name [%s], self bot ID [%s] and self cmdPrefix [%s]\n", s.selfIdentity.id, s.selfIdentity.name, s.selfIdentity.botID, s.selfIdentity.userPrefix)
return nil
}
// startActionScheduler creates all ScheduledActionDefinition from all plugins and registers them with the scheduler
// Very importantly, it also starts the scheduler
func (s *Slackscot) startActionScheduler(timeLoc *time.Location) {
gocron.ChangeLoc(timeLoc)
sc := gocron.NewScheduler()
for _, p := range s.plugins {
if p.ScheduledActions != nil {
for _, sa := range p.ScheduledActions {
j, err := schedule.NewJob(sc, sa.Schedule)
if err == nil {
s.log.Debugf("Adding job [%v] to scheduler\n", j)
err = j.Do(sa.Action)
}
if err != nil {
s.log.Printf("Error: failed to schedule job for scheduled action ['%s' - %s]: %v\n", sa.Schedule, sa.Description, err)
}
}
}
}
_, t := sc.NextRun()
s.log.Debugf("Starting scheduler with first job scheduled at [%s]\n", t)
// TODO: consider keeping track of the scheduler to stop it if it starts to appear necessary
<-sc.Start()
}
// processMessages processes messages from a queue and sends a termination signal on terminationChan when done
func (s *Slackscot) processMessages(driver chatDriver, queue chan slack.MessageEvent, terminationChan chan bool) {
for msg := range queue {
// reply_to is an field set to 1 sent by slack when a sent message has been acknowledged and should be considered
// officially sent to others. Therefore, we ignore all of those since it's mostly for clients/UI to show status
isReply := msg.ReplyTo > 0
s.log.Debugf("Processing event: %v", msg)
if !isReply && msg.Type == "message" {
if msg.SubType == "message_deleted" {
d := measure(func() {
s.processDeletedMessage(driver, msg)
})
c := s.coreMetrics.msgsProcessed[deleteMsgType]
c.Add(context.Background(), 1)
m := s.coreMetrics.msgProcessingLatencyMillis[deleteMsgType]
m.Record(context.Background(), d.Milliseconds())
} else {
if msg.SubType == "message_changed" {
d := measure(func() {
s.processUpdatedMessage(driver, msg)
})
c := s.coreMetrics.msgsProcessed[updateMsgType]
c.Add(context.Background(), 1)
m := s.coreMetrics.msgProcessingLatencyMillis[updateMsgType]
m.Record(context.Background(), d.Milliseconds())
} else if msg.SubType != "message_replied" {
d := measure(func() {
s.processNewMessage(driver, msg)
})
c := s.coreMetrics.msgsProcessed[newMsgType]
c.Add(context.Background(), 1)
m := s.coreMetrics.msgProcessingLatencyMillis[newMsgType]
m.Record(context.Background(), d.Milliseconds())
}
}
}
}
terminationChan <- true
}
// getOriginalMessageID returns the message ID of the original message if it's linked
// to a previous one or the self message id otherwise
func getOriginalMessageID(m slack.MessageEvent) (originalID SlackMessageID) {
if m.SubMessage != nil {
return SlackMessageID{channelID: m.Channel, timestamp: m.SubMessage.Timestamp}
}
return SlackMessageID{channelID: m.Channel, timestamp: m.Timestamp}
}
// getAgeOriginalMsg returns the age of an updated message as defined by the time elapsed between the message
// update (from the time of the current event) and the original message. If there's no previous message, the
// age is 0.
func getAgeOriginalMsg(m slack.MessageEvent) (age time.Duration, err error) {
updatedTime, err := strconv.ParseFloat(m.Timestamp, 64)
if err != nil {
return time.Duration(0), err
}
originalTime, err := strconv.ParseFloat(m.SubMessage.Timestamp, 64)
if err != nil {
return time.Duration(0), err
}
ageInSeconds := updatedTime - originalTime
return time.Duration(int64(ageInSeconds)) * time.Second, nil
}
// processUpdatedMessage processes changed messages. This is a more complicated scenario but slackscot handles it by doing the following:
// 1. If the message age is older than the config.MaxAgeHandledMessages threshold, the message update is ignored
// 2. If the message isn't present in the triggering message cache, we process it as we would any other regular new message (check if it triggers an action and sends responses accordingly)
// 3. If the message is present in cache, we had pre-existing responses so we handle this by updating responses on a plugin action basis. A plugin action that isn't triggering anymore gets its previous
// response deleted while a still triggering response will result in a message update. Newly triggered actions will be sent out as new messages.
// 4. The new state of responses replaces the previous one for the triggering message in the cache
func (s *Slackscot) processUpdatedMessage(driver chatDriver, m slack.MessageEvent) {
incomingMessageID := SlackMessageID{channelID: m.Channel, timestamp: m.Timestamp}
editedMsgID := getOriginalMessageID(m)
maxAgeThreshold := s.config.GetDuration(config.MaxAgeHandledMessages)
msgAge, err := getAgeOriginalMsg(m)
if err != nil {
s.log.Printf("Unable to determine max age for message [%v]: %s", m, err.Error())
return
}
if msgAge > maxAgeThreshold {
s.log.Debugf("Updated message: [%s] has an age of [%s] but the max age for handled messages is [%s]. Skipping...", editedMsgID, msgAge, maxAgeThreshold)
return
}
s.log.Debugf("Updated message: [%s], does cache contain it => [%t]", editedMsgID, s.triggeringMsgToResponse.Contains(editedMsgID))
if cachedResponses, exists := s.triggeringMsgToResponse.Get(editedMsgID); exists {
s.processUpdatedMessageWithCachedResponses(driver, m, editedMsgID, cachedResponses.(map[string]SlackMessageID))
} else {
outMsgs := s.routeMessage(m)
s.sendOutgoingMessages(driver, incomingMessageID, outMsgs)
}
}
// processUpdatedMessageWithCachedResponses handles a message update for which we still have cached responses in cache. This is where we take care of deleting responses that are no longer
// triggering the action they're coming from, updating the reactions for still triggering plugin actions as well as sending new reactions for plugin actions that are now triggering
func (s *Slackscot) processUpdatedMessageWithCachedResponses(driver chatDriver, m slack.MessageEvent, editedMsgID SlackMessageID, cachedResponses map[string]SlackMessageID) {
newResponseByActionID := make(map[string]SlackMessageID)
outMsgs := s.routeMessage(m)
s.log.Debugf("Detected %d existing responses to message [%s]\n", len(cachedResponses), editedMsgID)
for _, o := range outMsgs {
// We had a previous response for that same plugin action so edit it instead of posting a new message
if r, ok := cachedResponses[o.pluginActionID]; ok {
s.log.Debugf("Trying to update response at [%s] with message [%s]\n", r, o.OutgoingMessage.Text)
rID, err := s.updateExistingMessage(driver, r, o)
if err != nil {
s.log.Printf("Unable to update message [%s] to triggering message [%s]: %v\n", r, editedMsgID, err)
} else {
// Add the new updated message to the new responses
newResponseByActionID[o.pluginActionID] = rID
// Remove entries for plugin actions as we process them so that we can detect afterwards if a plugin isn't triggering
// anymore (to delete those responses).
delete(cachedResponses, o.pluginActionID)
}
} else {
s.log.Debugf("New response triggered to updated message [%s] [%s]: [%s]\n", o.OutgoingMessage.Text, r, o.OutgoingMessage.Text)
// It's a new message for that action so post it as a new message
rID, err := s.sendNewMessage(driver, o, editedMsgID.timestamp)
if err != nil {
s.log.Printf("Unable to send new message to updated message [%s]: %v\n", r, err)
} else if rID.IsMsgModifiable() {
// Add the new updated message to the new responses if it can be modified later
newResponseByActionID[o.pluginActionID] = rID
}
}
}
// Delete any previous triggered responses that aren't triggering anymore
for pa, r := range cachedResponses {
s.log.Debugf("Deleting previous response [%s] on a now non-triggered plugin action [%s]\n", r, pa)
driver.DeleteMessage(r.channelID, r.timestamp)
}
// Since the updated message now has new responses, update the entry with those or remove if no actions are triggered
if len(newResponseByActionID) > 0 {
s.log.Debugf("Updating responses to edited message [%s]\n", editedMsgID)
s.triggeringMsgToResponse.Add(editedMsgID, newResponseByActionID)
} else {
s.log.Debugf("Deleting entry for edited message [%s] since no more triggered response\n", editedMsgID)
s.triggeringMsgToResponse.Remove(editedMsgID)
}
}
// processDeletedMessage handles a deleted message. Slackscot cares about those in order to
// delete any previous responses triggered by that now inexistant message
func (s *Slackscot) processDeletedMessage(deleter messageDeleter, msgEvent slack.MessageEvent) {
deletedMessageID := SlackMessageID{channelID: msgEvent.Channel, timestamp: msgEvent.DeletedTimestamp}
s.log.Debugf("Message deleted: [%s] and cache contains: [%s]", deletedMessageID, s.triggeringMsgToResponse.Keys())
if existingResponses, exists := s.triggeringMsgToResponse.Get(deletedMessageID); exists {
byAction := existingResponses.(map[string]SlackMessageID)
for _, v := range byAction {
// Delete existing response since the triggering message was deleted
_, _, err := deleter.DeleteMessage(v.channelID, v.timestamp)
if err != nil {
s.log.Printf("Error deleting existing response to triggering message [%s]: %s: %v", deletedMessageID, v, err)
}
}
s.triggeringMsgToResponse.Remove(deletedMessageID)
}
}
// processNewMessage handles a regular new message and sends any triggered response
func (s *Slackscot) processNewMessage(msgSender messageSender, m slack.MessageEvent) {
incomingMessageID := SlackMessageID{channelID: m.Channel, timestamp: m.Timestamp}
outMsgs := s.routeMessage(m)
s.sendOutgoingMessages(msgSender, incomingMessageID, outMsgs)
}
// sendOutgoingMessages sends out any triggered plugin responses and keeps track of those in the internal cache
func (s *Slackscot) sendOutgoingMessages(sender messageSender, incomingMessageID SlackMessageID, outMsgs []OutgoingMessage) {
newResponseByActionID := make(map[string]SlackMessageID)
for _, o := range outMsgs {
// Send the message and keep track of our response in cache to be able to update it as needed later
rID, err := s.sendNewMessage(sender, o, incomingMessageID.timestamp)
if err != nil {
s.log.Printf("Unable to send new message triggered by [%s]: %v\n", incomingMessageID, err)
} else if rID.IsMsgModifiable() {
// Add the new updated message to the new responses if it's one that can be modified later
newResponseByActionID[o.pluginActionID] = rID
}
}
if len(newResponseByActionID) > 0 {
s.log.Debugf("Adding responses to triggering message [%s]: %s", incomingMessageID, newResponseByActionID)
// Add current responses for that triggering message
s.triggeringMsgToResponse.Add(incomingMessageID, newResponseByActionID)
}
}
// sendNewMessage sends a new outgoingMsg and waits for the response to return that message's identifier
func (s *Slackscot) sendNewMessage(sender messageSender, o OutgoingMessage, defaultThreadTS string) (rID SlackMessageID, err error) {
s.log.Printf("Sending new message: %s", o.OutgoingMessage.Text)
sendOpts := ApplyAnswerOpts(o.Options...)
options := []slack.MsgOption{slack.MsgOptionText(o.OutgoingMessage.Text, false), slack.MsgOptionAsUser(true)}
if s.config.GetBool(config.ThreadedRepliesKey) || cast.ToBool(sendOpts[ThreadedReplyOpt]) {
if threadTS := cast.ToString(sendOpts[ThreadTimestamp]); threadTS != "" {
options = append(options, slack.MsgOptionTS(threadTS))
} else {
options = append(options, slack.MsgOptionTS(defaultThreadTS))
}
if s.config.GetBool(config.BroadcastThreadedRepliesKey) || cast.ToBool(sendOpts[BroadcastOpt]) {
options = append(options, slack.MsgOptionBroadcast())
}
}
// Add ephemeral option if present
if userID, ok := sendOpts[EphemeralAnswerToOpt]; ok {
options = append(options, slack.MsgOptionPostEphemeral(userID))
}
// Add any block kit content blocks, if any
if len(o.ContentBlocks) > 0 {
options = append(options, slack.MsgOptionBlocks(o.ContentBlocks...))
}
channelID, newOutgoingMsgTimestamp, _, err := sender.SendMessage(o.OutgoingMessage.Channel, options...)
rID = SlackMessageID{channelID: channelID, timestamp: newOutgoingMsgTimestamp}
return rID, err
}
// updateExistingMessage updates an existing message with the content of a newly triggered OutgoingMessage
func (s *Slackscot) updateExistingMessage(updater messageUpdater, r SlackMessageID, o OutgoingMessage) (rID SlackMessageID, err error) {
options := []slack.MsgOption{slack.MsgOptionText(o.OutgoingMessage.Text, false), slack.MsgOptionAsUser(true)}
// Add any block kit content blocks, if any
if len(o.ContentBlocks) > 0 {
options = append(options, slack.MsgOptionBlocks(o.ContentBlocks...))
}
channelID, newOutgoingMsgTimestamp, _, err := updater.UpdateMessage(r.channelID, r.timestamp, options...)
rID = SlackMessageID{channelID: channelID, timestamp: newOutgoingMsgTimestamp}
return rID, err
}
// normalizeIncomingMessage normalizes a main message event and its sub message to form what would be an intuitive message to process for
// a bot. When it's a regular message (no SubMessage), a copy is returned unchanged. For other cases (like message updates),
// a message with the new updated text (since we're talking about a changed message) along with the channel being the one where the message
// is visible and with the user correctly set to the person who updated/sent the message. We also take the timestamp of the original message to make
// it convenient for plugins using the timestamp to know that they're looking at the same one they've seen before. Regarding this timestamp, we sort of treat
// is like the identifier that it is which would be initialized when first posted.
//
// Essentially, take everything from the main message except for the text, user and timestamp that is set on the SubMessage, if present.
func normalizeIncomingMessage(m slack.MessageEvent) (normalized slack.Msg) {
normalized = m.Msg
if m.SubMessage != nil {
normalized.Text = m.SubMessage.Text
normalized.User = m.SubMessage.User
normalized.Timestamp = m.SubMessage.Timestamp
}
return normalized
}
// resolveThreadTimestamp returns the proper thread timestamp to use for a new message.
// In the case of a response to a message on a thread, that value would be the original
// thread timestamp. Otherwise, this would be the timestamp of the message responded to.
// The function also returns whether or not the incoming message is a threaded message
// which would indicate that we want any answer to get posted to that thread instead of the
// main channel
func resolveThreadTimestamp(m slack.Msg) (threadTs string, isThreadedMessage bool) {
if m.ThreadTimestamp != "" {
return m.ThreadTimestamp, true
}
return m.Timestamp, false
}
// routeMessage handles routing the message to commands or hear actions according to the context
// The rules are the following:
// 1. If the message is on a channel with a direct mention to us (@name), we route to commands
// 2. If the message is a direct message to us, we route to commands
// 3. If the message is on a channel without mention (regular conversation), we route to hear actions
func (s *Slackscot) routeMessage(me slack.MessageEvent) (responses []OutgoingMessage) {
m := normalizeIncomingMessage(me)
responses = make([]OutgoingMessage, 0)
// Ignore messages_replied and messages send by "us"
if s.botMatcher.IsBot(m) {
s.log.Debugf("Ignoring message from user [%s] / bot ID [%s] because that's \"us\" [%s]", m.User, m.BotID, s.botMatcher)
return responses
}
// Try commands or hear actions depending on the format of the message
if s.isCommand(m) {
replyStrategy := reply
if isDirectMessage(m) {
replyStrategy = directReply
}
for _, p := range s.plugins {
matchedNamespace, inMsg := s.newCmdInMsgWithNormalizedText(p, m)
if matchedNamespace {
outMsgs := s.tryPluginActions(p.Name, commandType, p.Commands, inMsg, replyStrategy)
responses = append(responses, outMsgs...)
}
}
// Use default answer if this was a message formatted as a command for which we didn't have any answer to
if len(responses) == 0 {
responses = append(responses, defaultAnswer(s.defaultAction, s.newIncomingMsgWithNormalizedText(m), replyStrategy))
}
} else {
for _, p := range s.plugins {
inMsg := s.newIncomingMsgWithNormalizedText(m)
outMsgs := s.tryPluginActions(p.Name, hearActionType, p.HearActions, inMsg, send)
responses = append(responses, outMsgs...)
}
}
return responses
}
// defaultAnswer returns the answer by invocation of the default action
func defaultAnswer(answerDefault Answerer, inMsg IncomingMessage, rs responseStrategy) (o OutgoingMessage) {
answer := answerDefault(&inMsg)
answer.useExistingThreadIfAny(&inMsg)
slackOutMsg := rs(inMsg, answer)
return newOutMessageForAnswer(slackOutMsg, "default", *answer)
}
// newCmdInMsgWithNormalizedText creates a new IncomingMessage for a command and generates the normalized text for plugins
// to have a normalized view of the message regardless of context. For commands part of a Plugin with NamespaceCommands,
// the normalized text removes the namespace if the proper namespace is found. If not, matchedNamespace is false
// and the normalized text is the same as what newIncomingMsgWithNormalizedText would return
func (s *Slackscot) newCmdInMsgWithNormalizedText(p *Plugin, m slack.Msg) (matchedNamespace bool, inMsg IncomingMessage) {
inMsg = s.newIncomingMsgWithNormalizedText(m)
matchedNamespace = true
if p != nil && s.namespaceCommands && p.NamespaceCommands {
namespacePrefix := fmt.Sprintf("%s ", p.Name)
if matchedNamespace = strings.HasPrefix(inMsg.NormalizedText, namespacePrefix); matchedNamespace {
inMsg.NormalizedText = strings.TrimPrefix(inMsg.NormalizedText, namespacePrefix)
}
}
return matchedNamespace, inMsg
}
// newIncomingMsgWithNormalizedText creates a new IncomingMessage and generates the normalized text for plugins
// to have a normalized view of the message regardless of context. This includes having the text stripped of the "<@user>"
// for commands sent via a directed message on a channel
func (s *Slackscot) newIncomingMsgWithNormalizedText(m slack.Msg) (inMsg IncomingMessage) {
inMsg.NormalizedText = m.Text
inMsg.Msg = m
if isCmd, isDirectMsg := s.cmdMatcher.IsCmd(m), isDirectMessage(m); isCmd && !isDirectMsg {
inMsg.NormalizedText = s.cmdMatcher.TrimPrefix(m.Text)
}
return inMsg
}
// isCommand returns true if the slack message is to be interpreted as a command rather than a normal message
// subject to be handled by hear actions
func (s *Slackscot) isCommand(m slack.Msg) (isCommand bool) {
return s.cmdMatcher.IsCmd(m) || isDirectMessage(m)
}
// isDirectMessage returns true if the slack message is to be interpreted as a command rather than a normal message
// subject to be handled by hear actions
func isDirectMessage(m slack.Msg) (isDirectMsg bool) {
return strings.HasPrefix(m.Channel, "D")
}
// useExistingThreadIfAny sets the option on an Answer to reply in the existing thread if there is one
func (a *Answer) useExistingThreadIfAny(m *IncomingMessage) {
// If the message we're reacting to is happening on an existing thread, make sure we reply on that
// thread too and avoid the awkward situation of responding on the parent channel
threadTimestamp, threaded := resolveThreadTimestamp(m.Msg)
if threaded {
a.Options = append(a.Options, AnswerInExistingThread(threadTimestamp))
}
}
// tryPluginActions loops over all action definitions and invokes its action if the incoming message matches it's regular expression
// Note that more than one action can be triggered during the processing of a single message
func (s *Slackscot) tryPluginActions(pluginName string, actionType string, actions []ActionDefinition, m IncomingMessage, rs responseStrategy) (outMsgs []OutgoingMessage) {
before := time.Now()
outMsgs = make([]OutgoingMessage, 0)
for i, action := range actions {
matches := action.Match(&m)
if matches {
answer := action.Answer(&m)
if answer != nil {
answer.useExistingThreadIfAny(&m)
slackOutMsg := rs(m, answer)
outMsg := newOutMessageForAnswer(slackOutMsg, getActionID(pluginName, actionType, i), *answer)
outMsgs = append(outMsgs, outMsg)
}
}
}
pm, err := s.getOrCreatePluginMetrics(pluginName)
if err != nil {
s.log.Printf("Error creating plugin metrics for plugin [%s], skipping instrumentation measurements: %s", pluginName, err.Error())
} else {
ctx := context.Background()
pm.processingTimeMillis.Record(ctx, time.Since(before).Milliseconds())
pm.reactionCount.Add(ctx, int64(len(outMsgs)))
}
return outMsgs
}
// newOutMessageForAnswer creates a new internal OutgoingMessage for the given Answer
func newOutMessageForAnswer(o slack.OutgoingMessage, id string, answer Answer) (om OutgoingMessage) {
return OutgoingMessage{OutgoingMessage: o, pluginActionID: id, Answer: answer}
}
// newSlackOutgoingMessage creates a new slack.OutgoingMessage for a given channelID and text content
func newSlackOutgoingMessage(channelID string, text string) slack.OutgoingMessage {
return slack.OutgoingMessage{
Type: "message",
Channel: channelID,
Text: text,
}
}
// reply sends a reply to the user (using @user) who sent the message on the channel it was sent on
func reply(replyToMsg IncomingMessage, answer *Answer) slack.OutgoingMessage {
return newSlackOutgoingMessage(replyToMsg.Channel, fmt.Sprintf("<@%s>: %s", replyToMsg.User, answer.Text))
}
// directReply sends a reply to a direct message
func directReply(replyToMsg IncomingMessage, answer *Answer) slack.OutgoingMessage {
// Force a non-threaded reply since we're in a direct conversation. Instead of overriding
// all existing options, we just add the one to override the threading here
answer.Options = append(answer.Options, AnswerWithoutThreading())
return send(replyToMsg, answer)
}
// send creates a message to be sent on the same channel as received (which can be a direct message since
// slack internally uses a channel id for private conversations)
func send(replyToMsg IncomingMessage, answer *Answer) slack.OutgoingMessage {
return newSlackOutgoingMessage(replyToMsg.Channel, answer.Text)
}