handler.go
package httpcache import ( "encoding/json" "fmt" "reflect" "strings" "time" "net/http" "github.com/caddyserver/caddy/v2" "github.com/caddyserver/caddy/v2/modules/caddyhttp" "github.com/hashicorp/consul/api" "github.com/sillygod/cdp-cache/backends" "github.com/sillygod/cdp-cache/extends/distributed" "github.com/sillygod/cdp-cache/pkg/helper" "go.uber.org/zap") const ( cacheHit = "hit" cacheMiss = "miss" cacheSkip = "skip" cacheBypass = "bypass") var ( contextKeysToPreserve = [...]caddy.CtxKey{ caddy.ReplacerCtxKey, "path_prefix", "mitm", } cache *HTTPCache config *Config) func init() { caddy.RegisterModule(Handler{})} // getHandlerCache is a singleton of HTTPCachefunc getHandlerCache() *HTTPCache { l.RLock() defer l.RUnlock() return cache} // Handler is a http handler as a middleware to cache the responsetype Handler struct { Config *Config `json:"config,omitempty"` Cache *HTTPCache `json:"-"` URLLocks *URLLock `json:"-"` DistributedRaw json.RawMessage `json:"distributed,omitempty" caddy:"namespace=distributed inline_key=distributed"` Distributed *distributed.ConsulService `json:"-"` logger *zap.Logger} func (h *Handler) addStatusHeaderIfConfigured(w http.ResponseWriter, status string) { if h.Config.StatusHeader != "" { w.Header().Set(h.Config.StatusHeader, status) }} func (h *Handler) respond(w http.ResponseWriter, entry *Entry, cacheStatus string) error { h.addStatusHeaderIfConfigured(w, cacheStatus) copyHeaders(entry.Response.snapHeader, w.Header()) // when the request method is head, we don't need ot perform write body if entry.Request.Method == "HEAD" { w.WriteHeader(entry.Response.Code) return nil } err := entry.WriteBodyTo(w) return err} func popOrNil(h *Handler, errChan chan error) (err error) { select { case err := <-errChan: if err != nil { h.logger.Error(fmt.Sprintf("popOrNil: %s", err.Error())) } return err default: return nil } } func (h *Handler) fetchUpstream(req *http.Request, next caddyhttp.Handler, key string) (*Entry, error) { // Create a new empty response response := NewResponse() errChan := make(chan error, 1) // Do the upstream fetching in background go func(req *http.Request, response *Response) { upstreamError := next.ServeHTTP(response, req) errChan <- upstreamError response.Close() }(req, response) // Wait headers to be sent response.WaitHeaders() // Create a new CacheEntry return NewEntry(key, req, response, h.Config), popOrNil(h, errChan)} // CaddyModule returns the Caddy module informationfunc (Handler) CaddyModule() caddy.ModuleInfo { return caddy.ModuleInfo{ ID: "http.handlers.http_cache", New: func() caddy.Module { return new(Handler) }, }} func (h *Handler) provisionRuleMatchers() error { for _, raw := range h.Config.RuleMatchersRaws { switch raw.Type { case MatcherTypePath: var content *PathRuleMatcher err := json.Unmarshal(raw.Data, &content) if err != nil { return err } h.Config.RuleMatchers = append(h.Config.RuleMatchers, content) case MatcherTypeHeader: var content *HeaderRuleMatcher err := json.Unmarshal(raw.Data, &content) if err != nil { return err } h.Config.RuleMatchers = append(h.Config.RuleMatchers, content) } } return nil} func handleDeleteKey(data interface{}) error { // the type is api.KVPairs, // https://learn.hashicorp.com/tutorials/consul/distributed-semaphore?in=consul/developer-configuration // implement a distributed lock to handle del kvs, ok := data.(api.KVPairs) if !ok { return fmt.Errorf("non expected data type: %s", reflect.TypeOf(data)) } for _, kv := range kvs { myIP, _ := helper.IPAddr() ip := string(kv.Value) // and to exclude self-trigger event if kv.Session != "" && ip != myIP.String() { caddy.Log().Named("distributed cache"). Debug(fmt.Sprintf("perform cache delete: ip: %s=%s , content: %+v\n", ip, myIP.String(), kv)) key := kv.Key // handle the dispatching delete key event, need to trim the prefix // to get the original key if strings.HasPrefix(kv.Key, distributed.Keyprefix) { key = strings.TrimLeft(kv.Key, distributed.Keyprefix+"/") } cache := getHandlerCache() cache.Del(key) } } return nil} func (h *Handler) provisionDistributed(ctx caddy.Context) error { if h.DistributedRaw != nil { // Register the watch handlers before the distributed module is provisioned wh := distributed.WatchHandler{ Pg: distributed.GetKeyPrefixParams(distributed.Keyprefix), Callback: handleDeleteKey, } distributed.RegisterWatchHandler(wh) val, err := ctx.LoadModule(h, "DistributedRaw") // this will call provision if err != nil { return fmt.Errorf("loading distributed module: %s", err.Error()) } h.Distributed = val.(*distributed.ConsulService) } return nil} // Provision setups the configsfunc (h *Handler) Provision(ctx caddy.Context) error { h.logger = ctx.Logger(h) if h.Config == nil { h.Config = getDefaultConfig() } err := h.provisionRuleMatchers() if err != nil { return err } // NOTE: A dirty work to assign the config and cache to global vars // There will be the corresponding functions to get each of them. // Therefore, we can call its Del to purge the cache via the admin interface distributedOn := h.DistributedRaw != nil cache = NewHTTPCache(h.Config, distributedOn) h.Cache = cache h.URLLocks = NewURLLock(h.Config) // Some type of the backends need extra initialization. switch h.Config.Type { case inMemory: if err := backends.InitGroupCacheRes(h.Config.CacheMaxMemorySize); err != nil { return err } case redis: return h.provisionRedisCache() } // load the guest module distributed err = h.provisionDistributed(ctx) if err != nil { return err } config = h.Config return nil} func (h *Handler) provisionRedisCache() error { opts, err := backends.ParseRedisConfig(h.Config.RedisConnectionSetting) if err != nil { return err } if err := backends.InitRedisClient(opts.Addr, opts.Password, opts.DB); err != nil { return err } return nil} // Validate validates httpcache's configuration.func (h *Handler) Validate() error { return nil} // Cleanup release the resourcesfunc (h *Handler) Cleanup() error { var err error if h.Config.Type == inMemory { err = backends.ReleaseGroupCacheRes() } return err} Method `Handler.ServeHTTP` has a Cognitive Complexity of 31 (exceeds 20 allowed). Consider refactoring.
Method `Handler.ServeHTTP` has 14 return statements (exceeds 4 allowed).func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyhttp.Handler) error { // add a log here to record the elapsed time (from receiving the request to send the response) start := time.Now() upstreamDuration := time.Duration(0) TODO found // TODO: think a proper way to log these info // research why can not write the log into files. defer func(h *Handler, t time.Time) { if upstreamDuration == 0 { duration := time.Since(t) h.logger.Debug("cache handler", zap.String("host", r.Host), // find a way to get upstream zap.String("method", r.Method), zap.String("uri", r.RequestURI), zap.Duration("request time", duration)) } else { duration := time.Since(t) h.logger.Debug("cache handler", zap.String("host", r.Host), // find a way to get upstream zap.String("method", r.Method), zap.String("uri", r.RequestURI), zap.Duration("request time", duration), zap.Duration("upstream request time", upstreamDuration)) } }(h, start) if !shouldUseCache(r, h.Config) { h.addStatusHeaderIfConfigured(w, cacheBypass) return next.ServeHTTP(w, r) } key := getKey(h.Config.CacheKeyTemplate, r) lock := h.URLLocks.Acquire(key) defer lock.Unlock() previousEntry, exists := h.Cache.Get(key, r, false) // First case: CACHE HIT // The response exists in cache and is public // It should be served as saved if exists && previousEntry.isPublic { if err := h.respond(w, previousEntry, cacheHit); err == nil { return nil } else if _, ok := err.(backends.NoPreCollectError); ok { // if the err is No pre collect, just return nil w.WriteHeader(previousEntry.Response.Code) return nil } } // Check whether the key exists in the groupcahce when the // distributed cache is enabled. // Currently, only support the memory backends if h.Distributed != nil { // new an entry without fetching the upstream response := NewResponse() entry := NewEntry(key, r, response, h.Config) err := entry.setBackend(r.Context(), h.Config) if err != nil { return caddyhttp.Error(http.StatusInternalServerError, err) } h.Cache.Put(r, entry, h.Config) response.Close() // NOTE: should set the content-length to the header manually when distributed // cache is enabled because we get the content from the other peer. // In this case, the snapHeader will not contain the content-length info if err = h.respond(w, entry, cacheHit); err == nil { return nil } // when error is NoPreCollectError, we need to fetch the resource from the // upstream. if e, ok := err.(backends.NoPreCollectError); !ok { return caddyhttp.Error(entry.Response.Code, e) } } // Second case: CACHE SKIP // The response is in cache but it is not public // It should NOT be served from cache // It should be fetched from upstream and check the new headers // To check if the new response changes to public // Third case: CACHE MISS // The response is not in cache // It should be fetched from upstream and save it in cache t := time.Now() entry, err := h.fetchUpstream(r, next, key) upstreamDuration = time.Since(t) if entry.Response.Code >= 500 { // using stale entry when available previousEntry, exists := h.Cache.Get(key, r, true) if exists && previousEntry.isPublic { if err := h.respond(w, previousEntry, cacheHit); err == nil { return nil } else if _, ok := err.(backends.NoPreCollectError); ok { // if the err is No pre collect, just return nil w.WriteHeader(previousEntry.Response.Code) return nil } } } if err != nil { return caddyhttp.Error(entry.Response.Code, err) } // Case when response was private but now is public if entry.isPublic { err := entry.setBackend(r.Context(), h.Config) if err != nil { return caddyhttp.Error(http.StatusInternalServerError, err) } h.Cache.Put(r, entry, h.Config) err = h.respond(w, entry, cacheMiss) if err != nil { h.logger.Error("cache handler", zap.Error(err)) return caddyhttp.Error(entry.Response.Code, err) } return nil } err = h.respond(w, entry, cacheSkip) if err != nil { h.logger.Error("cache handler", zap.Error(err)) return caddyhttp.Error(entry.Response.Code, err) } return nil} var ( _ caddyhttp.MiddlewareHandler = (*Handler)(nil) _ caddy.Provisioner = (*Handler)(nil) _ caddy.Validator = (*Handler)(nil))